Skip to content
Merged
101 changes: 99 additions & 2 deletions cognite/extractorutils/unstable/core/_log_upload_action.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
"""Built-in ``fetch_logs`` action: streams rotated log files to CDF Files."""

import logging
from datetime import date
from dataclasses import dataclass
from datetime import date, timedelta, timezone
from datetime import datetime as dt
from pathlib import Path

from cognite.extractorutils.unstable.configuration.models import ExtractorConfig, LogFileHandlerConfig
from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError

_logger = logging.getLogger(__name__)
Expand All @@ -15,6 +19,19 @@
)


@dataclass(frozen=True)
class LogFileCandidate:
    """
    Immutable record pairing a calendar date with the log file resolved for it.

    Instances describe a file that exists and has content.
    """

    # Calendar date the log file belongs to.
    log_date: date
    # Location of the log file on disk.
    path: Path
    # True when ``path`` is the live (unrotated) file.log rather than a dated copy.
    is_current: bool


def _today_utc() -> date:
    """Return the current calendar date as observed in UTC."""
    now_utc = dt.now(tz=timezone.utc)
    return now_utc.date()


def _parse_date(raw: str, field: str) -> date:
try:
return date.fromisoformat(raw)
Expand All @@ -25,6 +42,64 @@ def _parse_date(raw: str, field: str) -> date:
) from None


def _resolve_log_file_path(config: ExtractorConfig) -> Path | None:
    """
    Find the base log file path configured for the extractor.

    Only the first ``LogFileHandlerConfig`` entry in ``config.log_handlers`` is considered.

    Returns:
        That handler's ``path``, or ``None`` when no file handler is configured.
    """
    file_handler_paths = (
        handler.path for handler in config.log_handlers if isinstance(handler, LogFileHandlerConfig)
    )
    return next(file_handler_paths, None)


def _build_candidate_files(
    base_path: Path,
    start_date: date,
    end_date: date,
    today: date,
) -> tuple[list[LogFileCandidate], list[date]]:
    """
    Enumerate log files for [start_date, end_date] and partition into candidates vs skipped.

    Rotated files follow the naming convention ``<base_path>.YYYY-MM-DD``.
    The live file (``base_path``) is used for ``today``; all other dates use the rotated name.

    A date is skipped when its file does not exist, is inaccessible (any ``OSError``), is not a
    regular file (for example a directory that happens to carry the expected name), or is empty.
    This covers the brief rotation race window: if the action is dispatched right after midnight
    before ``TimedRotatingFileHandler`` has renamed ``file.log`` to ``file.log.YYYY-MM-DD``,
    the rotated file will not be found and that date will appear in ``skipped``. Retrying after
    rotation completes (within seconds) will pick it up.

    Args:
        base_path: Path of the live log file; rotated names are derived from its file name.
        start_date: First date (inclusive) to look up.
        end_date: Last date (inclusive) to look up. An ``end_date`` earlier than ``start_date``
            yields two empty lists.
        today: The date that maps to the live file instead of a rotated name.

    Returns:
        (candidates, skipped): candidates are regular files that exist and have content;
        skipped contains dates for which no usable file was found. Both are in date order.
    """
    # Function-scope import: only the file-type check below needs it.
    from stat import S_ISREG

    candidates: list[LogFileCandidate] = []
    skipped: list[date] = []

    # A reversed range gives a non-positive day count, so the loop body never runs.
    for offset in range((end_date - start_date).days + 1):
        current = start_date + timedelta(days=offset)
        is_current = current == today
        path = base_path if is_current else base_path.parent / f"{base_path.name}.{current.isoformat()}"

        # Single stat() call: existence, type and size come from the same snapshot.
        try:
            file_stat = path.stat()
        except OSError as e:
            _logger.warning("fetch_logs: skipping %s for %s — %s", path, current, e)
            skipped.append(current)
            continue

        if not S_ISREG(file_stat.st_mode):
            # Directories and other special files report a size but cannot be uploaded as logs.
            _logger.warning("fetch_logs: skipping %s for %s — not a regular file", path, current)
            skipped.append(current)
        elif file_stat.st_size == 0:
            _logger.debug("fetch_logs: skipping %s for %s — file is empty", path, current)
            skipped.append(current)
        else:
            candidates.append(LogFileCandidate(log_date=current, path=path, is_current=is_current))

    return candidates, skipped


def fetch_logs_action(ctx: ActionContext) -> None:
"""Validate parameters and upload rotated log files for the requested date range to CDF Files."""
params = ctx.call_metadata or {}
Expand All @@ -46,6 +121,14 @@ def fetch_logs_action(ctx: ActionContext) -> None:
error_type="invalid_date_range",
)

today = _today_utc()
Comment thread
vikramlc-cognite marked this conversation as resolved.

if end_date > today:
raise ActionError(
f"end_date ({end_date}) cannot be in the future (today is {today})",
error_type="invalid_date_range",
)

num_days = (end_date - start_date).days + 1
if num_days > MAX_DATE_RANGE_DAYS:
raise ActionError(
Expand All @@ -54,4 +137,18 @@ def fetch_logs_action(ctx: ActionContext) -> None:
error_type="invalid_date_range",
)

_logger.info("fetch_logs: uploading logs for %s to %s (%d day(s))", start_date, end_date, num_days)
log_file_path = _resolve_log_file_path(ctx.application_config)
if log_file_path is None:
raise ActionError(
"No file log handler configured; add a 'file' type log handler to enable log uploads",
error_type="no_file_handler_configured",
)

candidates, skipped = _build_candidate_files(log_file_path, start_date, end_date, today)
_logger.info(
"fetch_logs: %d candidate file(s) for %s to %s; %d date(s) skipped",
len(candidates),
start_date,
end_date,
len(skipped),
)
12 changes: 9 additions & 3 deletions cognite/extractorutils/unstable/core/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

import logging
from collections.abc import Callable
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Generic

from cognite.extractorutils.unstable.configuration.models import ConfigType
from cognite.extractorutils.unstable.core.errors import Error, ErrorLevel
from cognite.extractorutils.unstable.core.logger import CogniteLogger

Expand All @@ -15,7 +16,7 @@
__all__ = ["ActionContext", "ActionError", "ActionTarget", "CustomAction"]


class ActionContext(CogniteLogger):
class ActionContext(Generic[ConfigType], CogniteLogger):
"""
Context for a custom action invocation.

Expand All @@ -28,7 +29,7 @@ class ActionContext(CogniteLogger):
def __init__(
self,
action: "CustomAction",
extractor: "Extractor",
extractor: "Extractor[ConfigType]",
external_id: str,
call_metadata: dict[str, str] | None = None,
) -> None:
Expand All @@ -40,6 +41,11 @@ def __init__(

self._logger = logging.getLogger(f"{self._extractor.EXTERNAL_ID}.action.{self._action.name.replace(' ', '')}")

@property
def application_config(self) -> ConfigType:
    """Application configuration, read from the extractor this context was created with."""
    extractor = self._extractor
    return extractor.application_config

def _new_error(
self,
level: ErrorLevel,
Expand Down
184 changes: 164 additions & 20 deletions tests/test_unstable/test_log_upload_action.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,36 @@
from datetime import date, timedelta
from pathlib import Path
from unittest.mock import MagicMock

import pytest

from cognite.extractorutils.unstable.configuration.models import (
LogFileHandlerConfig,
LogLevel,
)
from cognite.extractorutils.unstable.core._dto import Action, ActionStatus, ActionUpdate
from cognite.extractorutils.unstable.core._log_upload_action import MAX_DATE_RANGE_DAYS
from cognite.extractorutils.unstable.core._log_upload_action import (
MAX_DATE_RANGE_DAYS,
_build_candidate_files,
_resolve_log_file_path,
)
from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError, CustomAction
from cognite.extractorutils.unstable.core.base import FullConfig

from .conftest import TestConfig, TestExtractor

_PAST_TODAY = date(2026, 6, 19)

def _make_extractor(log_path: Path | None = None) -> TestExtractor:
    """
    Build a ``TestExtractor`` backed by a mocked connection config.

    Args:
        log_path: When given, the application config receives one INFO-level file log handler
            pointing at this path. When ``None``, no ``log_handlers`` argument is passed to
            ``TestConfig``.

    Returns:
        The extractor, constructed with the assembled ``FullConfig`` and a ``MagicMock`` as its
        second argument.
    """
    conn = MagicMock()
    conn.integration.external_id = "test-integration"
    config_kwargs: dict = {"parameter_one": 1, "parameter_two": "a"}
    if log_path is not None:
        config_kwargs["log_handlers"] = [LogFileHandlerConfig(type="file", path=log_path, level=LogLevel.INFO)]
    full_config = FullConfig(
        connection_config=conn,
        application_config=TestConfig(**config_kwargs),
        current_config_revision=1,
    )
    return TestExtractor(full_config, MagicMock())
Expand Down Expand Up @@ -111,8 +125,9 @@ def test_parse_date_non_string_type_raises_action_error() -> None:
},
str(MAX_DATE_RANGE_DAYS),
),
({"start_date": "2020-01-01", "end_date": "2099-01-01"}, "future"),
],
ids=["end_before_start", "exceeds_max_days"],
ids=["end_before_start", "exceeds_max_days", "end_date_in_future"],
)
def test_invalid_date_range_reports_invalid_date_range(
call_metadata: dict[str, str], message_contains: str | None
Expand All @@ -125,22 +140,6 @@ def test_invalid_date_range_reports_invalid_date_range(
assert message_contains in (failed.result_message or "")


@pytest.mark.parametrize(
"start_str,end_str",
[
("2026-06-10", "2026-06-10"),
("2026-06-01", str(date(2026, 6, 1) + timedelta(days=MAX_DATE_RANGE_DAYS - 1))),
],
ids=["single_day", "exact_max_days"],
)
def test_valid_date_range_succeeds(start_str: str, end_str: str) -> None:
extractor = _make_extractor()
updates = _dispatch(extractor, {"start_date": start_str, "end_date": end_str})
statuses = [u.status for u in updates]
assert ActionStatus.succeeded in statuses
assert ActionStatus.failed not in statuses


Comment thread
vikramlc-cognite marked this conversation as resolved.
def test_action_error_details_included_in_result_metadata() -> None:
extractor = _make_extractor()

Expand All @@ -153,3 +152,148 @@ def raise_with_details(ctx: ActionContext) -> None:
failed = _failed_update(_queued_updates(extractor))
assert failed.result_metadata == {"error_type": "unexpected_error", "error_detail": "inner detail"}
assert failed.result_message == "boom"


def test_resolve_log_file_path_returns_none_for_console_only_config() -> None:
    """A ``TestConfig`` built without explicit log handlers resolves to no log file path."""
    console_only = TestConfig(parameter_one=1, parameter_two="a")
    resolved = _resolve_log_file_path(console_only)
    assert resolved is None


@pytest.mark.parametrize(
    "handler_names,expected_name",
    [
        (["extractor.log"], "extractor.log"),
        (["first.log", "second.log"], "first.log"),
    ],
    ids=["single_handler", "multiple_handlers_returns_first"],
)
def test_resolve_log_file_path_with_file_handlers(tmp_path: Path, handler_names: list[str], expected_name: str) -> None:
    """The path of the first configured file handler is returned, even when several exist."""
    file_handlers = []
    for name in handler_names:
        file_handlers.append(LogFileHandlerConfig(type="file", path=tmp_path / name, level=LogLevel.INFO))
    config = TestConfig(parameter_one=1, parameter_two="a", log_handlers=file_handlers)

    expected_path = tmp_path / expected_name
    assert _resolve_log_file_path(config) == expected_path


def test_existing_rotated_files_become_candidates(tmp_path: Path) -> None:
    """Three consecutive non-empty rotated files all become non-current candidates."""
    base = tmp_path / "extractor.log"
    first_day = date(2026, 6, 1)
    last_day = date(2026, 6, 3)
    # One non-empty rotated file per day in the three-day range.
    for offset in range(3):
        day = first_day + timedelta(days=offset)
        (tmp_path / f"extractor.log.{day.isoformat()}").write_bytes(b"data")

    candidates, skipped = _build_candidate_files(base, first_day, last_day, _PAST_TODAY)

    assert len(candidates) == 3
    assert skipped == []
    assert not any(c.is_current for c in candidates)


@pytest.mark.parametrize("create_file", [False, True], ids=["missing", "empty"])
def test_unusable_file_goes_to_skipped(tmp_path: Path, create_file: bool) -> None:
    """Missing files (including the rotation race window) and 0-byte files are reported as skipped."""
    log_day = date(2026, 6, 1)
    rotated = tmp_path / f"extractor.log.{log_day.isoformat()}"
    if create_file:
        rotated.touch()  # exists, but holds 0 bytes

    candidates, skipped = _build_candidate_files(tmp_path / "extractor.log", log_day, log_day, _PAST_TODAY)

    assert candidates == []
    assert skipped == [log_day]


def test_today_uses_live_file_not_rotated_name(tmp_path: Path) -> None:
    """When the requested date equals ``today`` the live base file is the candidate."""
    live_file = tmp_path / "extractor.log"
    live_file.write_bytes(b"today data")

    candidates, skipped = _build_candidate_files(live_file, _PAST_TODAY, _PAST_TODAY, _PAST_TODAY)

    assert len(candidates) == 1
    only = candidates[0]
    assert only.path == live_file
    assert only.is_current is True
    assert skipped == []

Comment thread
vikramlc-cognite marked this conversation as resolved.

def test_mixed_range_partitions_correctly(tmp_path: Path) -> None:
    """A gap in the middle of the range lands in ``skipped`` while its neighbours are candidates."""
    base = tmp_path / "extractor.log"
    # Only the first and last day of the three-day range get a rotated file.
    for day_with_log in ("2026-06-01", "2026-06-03"):
        (tmp_path / f"extractor.log.{day_with_log}").write_bytes(b"data")

    candidates, skipped = _build_candidate_files(base, date(2026, 6, 1), date(2026, 6, 3), _PAST_TODAY)

    assert len(candidates) == 2
    assert skipped == [date(2026, 6, 2)]


@pytest.mark.parametrize(
    "start,end",
    [
        (_PAST_TODAY, _PAST_TODAY),
        (date(2026, 6, 1), date(2026, 6, 1) + timedelta(days=MAX_DATE_RANGE_DAYS - 1)),
    ],
    ids=["single_today", "exact_max_historical"],
)
def test_boundary_ranges_produce_correct_candidate_count(tmp_path: Path, start: date, end: date) -> None:
    """
    Every day of a boundary-sized range yields exactly one candidate and nothing is skipped.

    The fixture files are created per day using the same rule as the code under test (live file
    for ``_PAST_TODAY``, dated name otherwise), so the test does not depend on whether the
    historical range happens to reach ``_PAST_TODAY`` for the configured ``MAX_DATE_RANGE_DAYS``.
    """
    base = tmp_path / "extractor.log"
    expected = (end - start).days + 1

    for offset in range(expected):
        day = start + timedelta(days=offset)
        # The "today" date resolves to the live file; all other dates resolve to a rotated name.
        target = base if day == _PAST_TODAY else tmp_path / f"extractor.log.{day.isoformat()}"
        target.write_bytes(b"data")

    candidates, skipped = _build_candidate_files(base, start, end, _PAST_TODAY)

    assert len(candidates) == expected
    assert skipped == []


def test_no_file_handler_reports_no_file_handler_configured() -> None:
    """Dispatching against an extractor built without a log path fails with the dedicated error type."""
    extractor = _make_extractor()
    call_metadata = {"start_date": "2026-06-01", "end_date": "2026-06-07"}

    failed = _failed_update(_dispatch(extractor, call_metadata))

    assert failed.result_metadata == {"error_type": "no_file_handler_configured"}


@pytest.mark.parametrize("create_log_file", [True, False], ids=["files_present", "all_files_missing"])
def test_fetch_logs_action_with_file_handler_succeeds(tmp_path: Path, create_log_file: bool) -> None:
    """With a file handler configured the action succeeds whether or not the rotated file exists."""
    if create_log_file:
        (tmp_path / "extractor.log.2026-06-10").write_bytes(b"log data")
    extractor = _make_extractor(log_path=tmp_path / "extractor.log")

    updates = _dispatch(extractor, {"start_date": "2026-06-10", "end_date": "2026-06-10"})

    seen_statuses = [update.status for update in updates]
    assert ActionStatus.succeeded in seen_statuses
    assert ActionStatus.failed not in seen_statuses


def test_valid_exact_max_days_range_succeeds_at_dispatch(tmp_path: Path) -> None:
    """A range of exactly MAX_DATE_RANGE_DAYS is accepted — one more day would be rejected."""
    first_day = date(2026, 6, 1)
    last_day = first_day + timedelta(days=MAX_DATE_RANGE_DAYS - 1)
    extractor = _make_extractor(log_path=tmp_path / "extractor.log")

    updates = _dispatch(extractor, {"start_date": str(first_day), "end_date": str(last_day)})

    seen_statuses = [update.status for update in updates]
    assert ActionStatus.succeeded in seen_statuses
    assert ActionStatus.failed not in seen_statuses


def test_range_spanning_rotated_and_live_file(tmp_path: Path) -> None:
    """Common case: start_date = yesterday (rotated file), end_date = today (live file)."""
    base = tmp_path / "extractor.log"
    yesterday = _PAST_TODAY - timedelta(days=1)
    base.write_bytes(b"today data")
    (tmp_path / f"extractor.log.{yesterday.isoformat()}").write_bytes(b"yesterday data")

    candidates, skipped = _build_candidate_files(base, yesterday, _PAST_TODAY, _PAST_TODAY)

    assert skipped == []
    assert len(candidates) == 2
    # Two candidates with distinct dates: index them by date for direct lookup.
    by_date = {candidate.log_date: candidate for candidate in candidates}
    rotated = by_date[yesterday]
    live = by_date[_PAST_TODAY]
    assert rotated.is_current is False
    assert live.is_current is True
    assert live.path == base
Loading