From 9d34837f473a24c184b1113a09ea7353dd4bf29e Mon Sep 17 00:00:00 2001 From: vikramlc Date: Fri, 19 Jun 2026 13:58:11 +0530 Subject: [PATCH 1/7] feat(odin): Register fetch logs action and validate input dates --- .../unstable/core/_log_upload_action.py | 54 +++++++ .../extractorutils/unstable/core/actions.py | 19 ++- cognite/extractorutils/unstable/core/base.py | 23 ++- tests/test_unstable/test_action_dispatch.py | 16 +- .../test_unstable/test_action_registration.py | 35 +++-- tests/test_unstable/test_log_upload_action.py | 145 ++++++++++++++++++ 6 files changed, 275 insertions(+), 17 deletions(-) create mode 100644 cognite/extractorutils/unstable/core/_log_upload_action.py create mode 100644 tests/test_unstable/test_log_upload_action.py diff --git a/cognite/extractorutils/unstable/core/_log_upload_action.py b/cognite/extractorutils/unstable/core/_log_upload_action.py new file mode 100644 index 00000000..9806ec4d --- /dev/null +++ b/cognite/extractorutils/unstable/core/_log_upload_action.py @@ -0,0 +1,54 @@ +"""Built-in ``fetch_logs`` action: streams rotated log files to CDF Files.""" + +from datetime import date + +from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError + +MAX_DATE_RANGE_DAYS = 7 +"""Maximum number of calendar days a single ``fetch_logs`` invocation may cover.""" + +_FETCH_LOGS_DESCRIPTION = ( + "Upload rotated log files to CDF Files for a given date range. " + f"At most {MAX_DATE_RANGE_DAYS} days per invocation." +) + + +def _parse_date(raw: str, field: str) -> date: + try: + return date.fromisoformat(raw) + except ValueError: + raise ActionError( + f"Invalid {field} '{raw}': expected ISO 8601 date (YYYY-MM-DD)", + error_type="invalid_parameter", + ) from None + + +def fetch_logs_action(ctx: ActionContext) -> None: + """Validate parameters and upload rotated log files for the requested date range to CDF Files.""" + params = ctx.call_metadata or {} + + start_date_raw = params.get("start_date") + end_date_raw = params.get("end_date") + + if start_date_raw is None: + raise ActionError("Missing required parameter: start_date", error_type="missing_parameter") + if end_date_raw is None: + raise ActionError("Missing required parameter: end_date", error_type="missing_parameter") + + start_date = _parse_date(start_date_raw, "start_date") + end_date = _parse_date(end_date_raw, "end_date") + + if end_date < start_date: + raise ActionError( + f"end_date ({end_date}) must be on or after start_date ({start_date})", + error_type="invalid_date_range", + ) + + num_days = (end_date - start_date).days + 1 + if num_days > MAX_DATE_RANGE_DAYS: + raise ActionError( + f"Date range of {num_days} days exceeds the maximum of {MAX_DATE_RANGE_DAYS} days; " + "use multiple invocations for longer spans", + error_type="invalid_date_range", + ) + diff --git a/cognite/extractorutils/unstable/core/actions.py b/cognite/extractorutils/unstable/core/actions.py index 480f7c11..0193dc67 100644 --- a/cognite/extractorutils/unstable/core/actions.py +++ b/cognite/extractorutils/unstable/core/actions.py @@ -12,7 +12,7 @@ if TYPE_CHECKING: from cognite.extractorutils.unstable.core.base import Extractor -__all__ = ["ActionContext", "ActionTarget", "CustomAction"] +__all__ = ["ActionContext", "ActionError", "ActionTarget", "CustomAction"] class ActionContext(CogniteLogger): @@ -56,6 +56,23 @@ def _new_error( ) +class ActionError(Exception): + """Deliberate action failure with structured metadata for Odin result reporting.""" + + def __init__(self, message: str, *, error_type: str, details: str | None = None) -> None: + super().__init__(message) + self.error_type = error_type + self.details = details + + @property + def result_metadata(self) -> dict[str, str]: + """Structured metadata dict for the action update.""" + meta: dict[str, str] = {"error_type": self.error_type} + if self.details is not None: + meta["error_detail"] = self.details + return meta + + ActionTarget = Callable[["ActionContext"], None] diff --git a/cognite/extractorutils/unstable/core/base.py b/cognite/extractorutils/unstable/core/base.py index a8549b09..0fff2d52 100644 --- a/cognite/extractorutils/unstable/core/base.py +++ b/cognite/extractorutils/unstable/core/base.py @@ -98,8 +98,9 @@ def my_task_function(self, task_context: TaskContext) -> None: from cognite.extractorutils.unstable.core._dto import ( Task as DtoTask, ) +from cognite.extractorutils.unstable.core._log_upload_action import _FETCH_LOGS_DESCRIPTION, fetch_logs_action from cognite.extractorutils.unstable.core._messaging import RuntimeMessage -from cognite.extractorutils.unstable.core.actions import ActionContext, CustomAction +from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError, CustomAction from cognite.extractorutils.unstable.core.checkin_worker import CheckinWorker from cognite.extractorutils.unstable.core.errors import Error, ErrorLevel from cognite.extractorutils.unstable.core.logger import CogniteLogger, RobustFileHandler @@ -205,6 +206,7 @@ def __init__(self, config: FullConfig[ConfigType], checkin_worker: CheckinWorker ) self.__init_tasks__() + self._register_builtin_actions() self.__init_actions__() def _setup_cancellation_watcher(self, cancel_event: MpEvent) -> None: @@ -346,6 +348,16 @@ def __init_tasks__(self) -> None: """ pass + def _register_builtin_actions(self) -> None: + """Register framework-level actions available on every extractor.""" + self.add_action( + CustomAction( + name="fetch_logs", + target=fetch_logs_action, + description=_FETCH_LOGS_DESCRIPTION, + ) + ) + def __init_actions__(self) -> None: """ This method should be overridden by subclasses to register custom actions. @@ -667,6 +679,15 @@ def _handle_custom_action(self, action: Action) -> None: self._checkin_worker.queue_action_update( ActionUpdate(external_id=action.external_id, status=ActionStatus.succeeded) ) + except ActionError as e: + self._checkin_worker.queue_action_update( + ActionUpdate( + external_id=action.external_id, + status=ActionStatus.failed, + result_message=str(e), + result_metadata=e.result_metadata, + ) + ) except Exception as e: self._checkin_worker.queue_action_update( ActionUpdate( diff --git a/tests/test_unstable/test_action_dispatch.py b/tests/test_unstable/test_action_dispatch.py index e885d617..7b61a1ed 100644 --- a/tests/test_unstable/test_action_dispatch.py +++ b/tests/test_unstable/test_action_dispatch.py @@ -6,7 +6,7 @@ import pytest from cognite.extractorutils.unstable.core._dto import Action, ActionStatus, ActionUpdate -from cognite.extractorutils.unstable.core.actions import ActionContext, CustomAction +from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError, CustomAction from cognite.extractorutils.unstable.core.base import FullConfig from cognite.extractorutils.unstable.core.tasks import ScheduledTask, TaskContext @@ -170,6 +170,20 @@ def target(ctx: ActionContext) -> None: assert expected_message in (updates[-1].result_message or "") +def test_action_error_sets_result_metadata_and_keeps_failed_status() -> None: + def target(ctx: ActionContext) -> None: + raise ActionError("bad input", error_type="invalid_parameter") + + extractor = _make_extractor() + extractor.add_action(CustomAction(name="strict", target=target)) + extractor._dispatch_single_action(_make_action("act-err", "strict")) + + updates = _queued_updates(extractor) + failed = next(u for u in updates if u.status == ActionStatus.failed) + assert failed.result_metadata == {"error_type": "invalid_parameter"} + assert failed.result_message == "bad input" + + def test_custom_action_receives_call_metadata_in_context() -> None: received_metadata: list[dict | None] = [] diff --git a/tests/test_unstable/test_action_registration.py b/tests/test_unstable/test_action_registration.py index 6d3142d3..44fa50be 100644 --- a/tests/test_unstable/test_action_registration.py +++ b/tests/test_unstable/test_action_registration.py @@ -39,21 +39,25 @@ def _startup_request(extractor: Extractor) -> StartupRequest: ), ], ) -def test_available_actions_none_without_scheduled_tasks(extra_tasks: list) -> None: +def test_available_actions_without_scheduled_tasks_only_has_builtins(extra_tasks: list) -> None: extractor = _make_extractor() for task in extra_tasks: extractor.add_task(task) - assert _startup_request(extractor).available_actions is None + req = _startup_request(extractor) + assert req.available_actions is not None + # Only built-in actions present — no start/stop actions from scheduled tasks + task_action_names = {a.name for a in req.available_actions if a.type != ActionType.custom} + assert task_action_names == set() -def test_two_scheduled_tasks_produce_four_available_actions() -> None: +def test_two_scheduled_tasks_produce_four_task_start_stop_actions() -> None: extractor = _make_extractor() extractor.add_task(ScheduledTask.from_interval(interval="1h", name="alpha", target=lambda _: None)) extractor.add_task(ScheduledTask.from_interval(interval="2h", name="beta", target=lambda _: None)) req = _startup_request(extractor) assert req.available_actions is not None - assert len(req.available_actions) == 4 - assert {a.name for a in req.available_actions} == {"Start alpha", "Stop alpha", "Start beta", "Stop beta"} + task_action_names = {a.name for a in req.available_actions if a.type != ActionType.custom} + assert task_action_names == {"Start alpha", "Stop alpha", "Start beta", "Stop beta"} @pytest.mark.parametrize( @@ -79,19 +83,22 @@ def test_scheduled_and_custom_actions_combined_ordering() -> None: extractor.add_action(CustomAction(name="flush", target=lambda _: None)) req = _startup_request(extractor) assert req.available_actions is not None - assert len(req.available_actions) == 3 names = [a.name for a in req.available_actions] - assert names == ["Start sync", "Stop sync", "flush"] + # Built-in actions precede scheduled-task start/stop, which precede user custom actions + assert names.index("Start sync") < names.index("flush") + assert names.index("Stop sync") < names.index("flush") + assert "fetch_logs" in names def test_custom_action_appears_with_correct_type_and_description() -> None: extractor = _make_extractor() extractor.add_action(CustomAction(name="flush cache", target=lambda _: None, description="Clears state")) actions = _startup_request(extractor).available_actions - assert actions is not None and len(actions) == 1 - assert actions[0].name == "flush cache" - assert actions[0].type == ActionType.custom - assert actions[0].description == "Clears state" + assert actions is not None + by_name = {a.name: a for a in actions} + assert "flush cache" in by_name + assert by_name["flush cache"].type == ActionType.custom + assert by_name["flush cache"].description == "Clears state" def test_init_actions_hook_called_after_init_tasks_and_can_register_actions() -> None: @@ -107,15 +114,15 @@ def __init_actions__(self) -> None: extractor = _make_extractor(_Ext) assert call_order == ["tasks", "actions"] - assert len(extractor._custom_actions) == 1 - assert extractor._custom_actions[0].name == "ping" + assert any(a.name == "ping" for a in extractor._custom_actions) def test_multiple_add_action_calls_accumulate_in_registration_order() -> None: extractor = _make_extractor() for name in ("a1", "a2", "a3"): extractor.add_action(CustomAction(name=name, target=lambda _: None)) - assert [a.name for a in extractor._custom_actions] == ["a1", "a2", "a3"] + names = [a.name for a in extractor._custom_actions] + assert names.index("a1") < names.index("a2") < names.index("a3") def test_add_action_raises_on_duplicate_name() -> None: diff --git a/tests/test_unstable/test_log_upload_action.py b/tests/test_unstable/test_log_upload_action.py new file mode 100644 index 00000000..3648e476 --- /dev/null +++ b/tests/test_unstable/test_log_upload_action.py @@ -0,0 +1,145 @@ +from datetime import date, timedelta +from unittest.mock import MagicMock + +import pytest + +from cognite.extractorutils.unstable.core._dto import Action, ActionStatus, ActionUpdate +from cognite.extractorutils.unstable.core._log_upload_action import MAX_DATE_RANGE_DAYS +from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError, CustomAction +from cognite.extractorutils.unstable.core.base import FullConfig + +from .conftest import TestConfig, TestExtractor + + +def _make_extractor() -> TestExtractor: + conn = MagicMock() + conn.integration.external_id = "test-integration" + full_config = FullConfig( + connection_config=conn, + application_config=TestConfig(parameter_one=1, parameter_two="a"), + current_config_revision=1, + ) + return TestExtractor(full_config, MagicMock()) + + +def _queued_updates(extractor: TestExtractor) -> list[ActionUpdate]: + return [c[0][0] for c in extractor._checkin_worker.queue_action_update.call_args_list] + + +def _dispatch(extractor: TestExtractor, call_metadata: dict[str, str] | None) -> list[ActionUpdate]: + action = Action( + external_id="act-1", + action_name="fetch_logs", + status=ActionStatus.pending, + call_metadata=call_metadata, + ) + extractor._dispatch_single_action(action) + return _queued_updates(extractor) + + +def _failed_update(updates: list[ActionUpdate]) -> ActionUpdate: + return next(u for u in updates if u.status == ActionStatus.failed) + + +def test_fetch_logs_registered_as_builtin_with_description() -> None: + extractor = _make_extractor() + action = next((a for a in extractor._custom_actions if a.name == "fetch_logs"), None) + assert action is not None + assert action.description + + +def test_registering_fetch_logs_as_user_action_raises() -> None: + extractor = _make_extractor() + with pytest.raises(ValueError, match="fetch_logs"): + extractor.add_action(CustomAction(name="fetch_logs", target=lambda ctx: None)) + + +@pytest.mark.parametrize( + "call_metadata,missing_field", + [ + (None, "start_date"), + ({"end_date": "2026-06-10"}, "start_date"), + ({"start_date": "2026-06-10"}, "end_date"), + ], + ids=["no_metadata", "missing_start_date", "missing_end_date"], +) +def test_missing_required_date_reports_missing_parameter( + call_metadata: dict[str, str] | None, missing_field: str +) -> None: + extractor = _make_extractor() + updates = _dispatch(extractor, call_metadata) + failed = _failed_update(updates) + assert failed.result_metadata == {"error_type": "missing_parameter"} + assert missing_field in (failed.result_message or "") + + +@pytest.mark.parametrize( + "call_metadata,bad_field", + [ + ({"start_date": "not-a-date", "end_date": "2026-06-10"}, "start_date"), + ({"start_date": "2026-06-10", "end_date": "2026/06/11"}, "end_date"), + ({"start_date": "2026-13-01", "end_date": "2026-06-11"}, "start_date"), + ], + ids=["invalid_start", "slash_end", "out_of_range_month"], +) +def test_non_iso_date_reports_invalid_parameter(call_metadata: dict[str, str], bad_field: str) -> None: + extractor = _make_extractor() + updates = _dispatch(extractor, call_metadata) + failed = _failed_update(updates) + assert failed.result_metadata == {"error_type": "invalid_parameter"} + assert bad_field in (failed.result_message or "") + + +@pytest.mark.parametrize( + "call_metadata,message_contains", + [ + ({"start_date": "2026-06-10", "end_date": "2026-06-09"}, None), + ( + { + "start_date": str(date(2026, 6, 1)), + "end_date": str(date(2026, 6, 1) + timedelta(days=MAX_DATE_RANGE_DAYS)), + }, + str(MAX_DATE_RANGE_DAYS), + ), + ], + ids=["end_before_start", "exceeds_max_days"], +) +def test_invalid_date_range_reports_invalid_date_range( + call_metadata: dict[str, str], message_contains: str | None +) -> None: + extractor = _make_extractor() + updates = _dispatch(extractor, call_metadata) + failed = _failed_update(updates) + assert failed.result_metadata == {"error_type": "invalid_date_range"} + if message_contains: + assert message_contains in (failed.result_message or "") + + +@pytest.mark.parametrize( + "start_str,end_str", + [ + ("2026-06-10", "2026-06-10"), + ("2026-06-01", str(date(2026, 6, 1) + timedelta(days=MAX_DATE_RANGE_DAYS - 1))), + ], + ids=["single_day", "exact_max_days"], +) +def test_valid_date_range_succeeds(start_str: str, end_str: str) -> None: + extractor = _make_extractor() + updates = _dispatch(extractor, {"start_date": start_str, "end_date": end_str}) + statuses = [u.status for u in updates] + assert ActionStatus.succeeded in statuses + assert ActionStatus.failed not in statuses + + +def test_action_error_details_included_in_result_metadata() -> None: + extractor = _make_extractor() + + def raise_with_details(ctx: ActionContext) -> None: + raise ActionError("boom", error_type="unexpected_error", details="inner detail") + + extractor.add_action(CustomAction(name="boom-action", target=raise_with_details)) + action = Action(external_id="act-detail", action_name="boom-action", status=ActionStatus.pending) + extractor._dispatch_single_action(action) + failed = _failed_update(_queued_updates(extractor)) + assert failed.result_metadata == {"error_type": "unexpected_error", "error_detail": "inner detail"} + assert failed.result_message == "boom" From a480f65139b95a320aa2fe4f5a46e0d360cee4d0 Mon Sep 17 00:00:00 2001 From: vikramlc Date: Fri, 19 Jun 2026 14:16:33 +0530 Subject: [PATCH 2/7] fix(odin): Fix lint issue and added type error --- .../extractorutils/unstable/core/_log_upload_action.py | 6 ++---- tests/test_unstable/test_log_upload_action.py | 10 ++++++++++ 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/cognite/extractorutils/unstable/core/_log_upload_action.py b/cognite/extractorutils/unstable/core/_log_upload_action.py index 9806ec4d..f395627e 100644 --- a/cognite/extractorutils/unstable/core/_log_upload_action.py +++ b/cognite/extractorutils/unstable/core/_log_upload_action.py @@ -8,15 +8,14 @@ """Maximum number of calendar days a single ``fetch_logs`` invocation may cover.""" _FETCH_LOGS_DESCRIPTION = ( - "Upload rotated log files to CDF Files for a given date range. " - f"At most {MAX_DATE_RANGE_DAYS} days per invocation." + f"Upload rotated log files to CDF Files for a given date range. At most {MAX_DATE_RANGE_DAYS} days per invocation." ) def _parse_date(raw: str, field: str) -> date: try: return date.fromisoformat(raw) - except ValueError: + except (ValueError, TypeError): raise ActionError( f"Invalid {field} '{raw}': expected ISO 8601 date (YYYY-MM-DD)", error_type="invalid_parameter", @@ -51,4 +50,3 @@ def fetch_logs_action(ctx: ActionContext) -> None: "use multiple invocations for longer spans", error_type="invalid_date_range", ) - diff --git a/tests/test_unstable/test_log_upload_action.py b/tests/test_unstable/test_log_upload_action.py index 3648e476..1fc00915 100644 --- a/tests/test_unstable/test_log_upload_action.py +++ b/tests/test_unstable/test_log_upload_action.py @@ -90,6 +90,16 @@ def test_non_iso_date_reports_invalid_parameter(call_metadata: dict[str, str], b assert bad_field in (failed.result_message or "") +def test_parse_date_non_string_type_raises_action_error() -> None: + # Pydantic guards dict[str, str] at the DTO boundary, but _parse_date may be called + # directly, so TypeError from date.fromisoformat must also surface as ActionError. + from cognite.extractorutils.unstable.core._log_upload_action import _parse_date + + with pytest.raises(ActionError) as exc_info: + _parse_date(20260610, "start_date") # type: ignore[arg-type] + assert exc_info.value.error_type == "invalid_parameter" + + @pytest.mark.parametrize( "call_metadata,message_contains", [ From 8670c5edd044ab35496f42a72c6ba6fdefca9d0f Mon Sep 17 00:00:00 2001 From: vikramlc Date: Fri, 19 Jun 2026 15:14:28 +0530 Subject: [PATCH 3/7] feat(odin): Build candidate log file list --- .../unstable/core/_log_upload_action.py | 86 +++++++++- tests/test_unstable/test_log_upload_action.py | 153 +++++++++++++++--- 2 files changed, 219 insertions(+), 20 deletions(-) diff --git a/cognite/extractorutils/unstable/core/_log_upload_action.py b/cognite/extractorutils/unstable/core/_log_upload_action.py index f395627e..001ab9ca 100644 --- a/cognite/extractorutils/unstable/core/_log_upload_action.py +++ b/cognite/extractorutils/unstable/core/_log_upload_action.py @@ -1,7 +1,11 @@ """Built-in ``fetch_logs`` action: streams rotated log files to CDF Files.""" -from datetime import date +from dataclasses import dataclass +from datetime import date, timedelta, timezone +from datetime import datetime as dt +from pathlib import Path +from cognite.extractorutils.unstable.configuration.models import ExtractorConfig, LogFileHandlerConfig from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError MAX_DATE_RANGE_DAYS = 7 @@ -12,6 +16,19 @@ ) +@dataclass(frozen=True) +class LogFileCandidate: + """A log file resolved for a given date that exists and has content.""" + + date: date + path: Path + is_current: bool # True when path is the live (unrotated) file.log + + +def _today_utc() -> date: + return dt.now(tz=timezone.utc).date() + + def _parse_date(raw: str, field: str) -> date: try: return date.fromisoformat(raw) @@ -22,6 +39,63 @@ def _parse_date(raw: str, field: str) -> date: ) from None +def _resolve_log_file_path(config: ExtractorConfig) -> Path | None: + """Return the base log file path from the first file handler in config, or None.""" + for handler in config.log_handlers: + if isinstance(handler, LogFileHandlerConfig): + return handler.path + return None + + +def _build_candidate_files( + base_path: Path, + start_date: date, + end_date: date, + today: date, +) -> tuple[list[LogFileCandidate], list[date]]: + """ + Enumerate log files for [start_date, end_date] and partition into candidates vs skipped. + + Rotated files follow the naming convention ``.YYYY-MM-DD``. + The live file (``base_path``) is used for ``today``; all other dates use the rotated name. + + A file is skipped when it does not exist or is empty. This covers the brief rotation + race window: if the action is dispatched right after midnight before ``TimedRotatingFileHandler`` + has renamed ``file.log`` to ``file.log.YYYY-MM-DD``, the rotated file will not be found and + that date will appear in ``skipped``. Retrying after rotation completes (within seconds) + will pick it up. + + Returns: + (candidates, skipped): candidates are files that exist and have content; + skipped contains dates for which no usable file was found. + """ + candidates: list[LogFileCandidate] = [] + skipped: list[date] = [] + + current = start_date + while current <= end_date: + if current == today: + path = base_path + is_current = True + else: + path = base_path.parent / f"{base_path.name}.{current.isoformat()}" + is_current = False + + try: + size = path.stat().st_size + except FileNotFoundError: + skipped.append(current) + else: + if size > 0: + candidates.append(LogFileCandidate(date=current, path=path, is_current=is_current)) + else: + skipped.append(current) + + current += timedelta(days=1) + + return candidates, skipped + + def fetch_logs_action(ctx: ActionContext) -> None: """Validate parameters and upload rotated log files for the requested date range to CDF Files.""" params = ctx.call_metadata or {} @@ -50,3 +124,13 @@ def fetch_logs_action(ctx: ActionContext) -> None: "use multiple invocations for longer spans", error_type="invalid_date_range", ) + + log_file_path = _resolve_log_file_path(ctx._extractor.application_config) + if log_file_path is None: + raise ActionError( + "No file log handler configured; add a 'file' type log handler to enable log uploads", + error_type="no_file_handler_configured", + ) + + today = _today_utc() + _candidates, _skipped = _build_candidate_files(log_file_path, start_date, end_date, today) diff --git a/tests/test_unstable/test_log_upload_action.py b/tests/test_unstable/test_log_upload_action.py index 1fc00915..e60e8143 100644 --- a/tests/test_unstable/test_log_upload_action.py +++ b/tests/test_unstable/test_log_upload_action.py @@ -1,22 +1,36 @@ from datetime import date, timedelta +from pathlib import Path from unittest.mock import MagicMock import pytest +from cognite.extractorutils.unstable.configuration.models import ( + LogFileHandlerConfig, + LogLevel, +) from cognite.extractorutils.unstable.core._dto import Action, ActionStatus, ActionUpdate -from cognite.extractorutils.unstable.core._log_upload_action import MAX_DATE_RANGE_DAYS +from cognite.extractorutils.unstable.core._log_upload_action import ( + MAX_DATE_RANGE_DAYS, + _build_candidate_files, + _resolve_log_file_path, +) from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError, CustomAction from cognite.extractorutils.unstable.core.base import FullConfig from .conftest import TestConfig, TestExtractor +_PAST_TODAY = date(2026, 6, 19) -def _make_extractor() -> TestExtractor: + +def _make_extractor(log_path: Path | None = None) -> TestExtractor: conn = MagicMock() conn.integration.external_id = "test-integration" + config_kwargs: dict = {"parameter_one": 1, "parameter_two": "a"} + if log_path is not None: + config_kwargs["log_handlers"] = [LogFileHandlerConfig(type="file", path=log_path, level=LogLevel.INFO)] full_config = FullConfig( connection_config=conn, - application_config=TestConfig(parameter_one=1, parameter_two="a"), + application_config=TestConfig(**config_kwargs), current_config_revision=1, ) return TestExtractor(full_config, MagicMock()) @@ -125,22 +139,6 @@ def test_invalid_date_range_reports_invalid_date_range( assert message_contains in (failed.result_message or "") -@pytest.mark.parametrize( - "start_str,end_str", - [ - ("2026-06-10", "2026-06-10"), - ("2026-06-01", str(date(2026, 6, 1) + timedelta(days=MAX_DATE_RANGE_DAYS - 1))), - ], - ids=["single_day", "exact_max_days"], -) -def test_valid_date_range_succeeds(start_str: str, end_str: str) -> None: - extractor = _make_extractor() - updates = _dispatch(extractor, {"start_date": start_str, "end_date": end_str}) - statuses = [u.status for u in updates] - assert ActionStatus.succeeded in statuses - assert ActionStatus.failed not in statuses - - def test_action_error_details_included_in_result_metadata() -> None: extractor = _make_extractor() @@ -153,3 +151,120 @@ def raise_with_details(ctx: ActionContext) -> None: failed = _failed_update(_queued_updates(extractor)) assert failed.result_metadata == {"error_type": "unexpected_error", "error_detail": "inner detail"} assert failed.result_message == "boom" + + +def test_resolve_log_file_path_returns_none_for_console_only_config() -> None: + config = TestConfig(parameter_one=1, parameter_two="a") + assert _resolve_log_file_path(config) is None + + +@pytest.mark.parametrize( + "handler_names,expected_name", + [ + (["extractor.log"], "extractor.log"), + (["first.log", "second.log"], "first.log"), + ], + ids=["single_handler", "multiple_handlers_returns_first"], +) +def test_resolve_log_file_path_with_file_handlers(tmp_path: Path, handler_names: list[str], expected_name: str) -> None: + config = TestConfig( + parameter_one=1, + parameter_two="a", + log_handlers=[LogFileHandlerConfig(type="file", path=tmp_path / n, level=LogLevel.INFO) for n in handler_names], + ) + assert _resolve_log_file_path(config) == tmp_path / expected_name + + +def test_existing_rotated_files_become_candidates(tmp_path: Path) -> None: + base = tmp_path / "extractor.log" + start = date(2026, 6, 1) + end = date(2026, 6, 3) + for d in [start, start + timedelta(days=1), end]: + (tmp_path / f"extractor.log.{d.isoformat()}").write_bytes(b"data") + candidates, skipped = _build_candidate_files(base, start, end, _PAST_TODAY) + assert len(candidates) == 3 + assert skipped == [] + assert all(not c.is_current for c in candidates) + + +@pytest.mark.parametrize( + "create_file", + [False, True], + ids=["missing", "empty"], +) +def test_unusable_file_goes_to_skipped(tmp_path: Path, create_file: bool) -> None: + # Covers missing files (including the rotation race window) and 0-byte files. + base = tmp_path / "extractor.log" + d = date(2026, 6, 1) + if create_file: + (tmp_path / f"extractor.log.{d.isoformat()}").touch() # 0 bytes + candidates, skipped = _build_candidate_files(base, d, d, _PAST_TODAY) + assert candidates == [] + assert skipped == [d] + + +def test_today_uses_live_file_not_rotated_name(tmp_path: Path) -> None: + base = tmp_path / "extractor.log" + base.write_bytes(b"today data") + candidates, skipped = _build_candidate_files(base, _PAST_TODAY, _PAST_TODAY, _PAST_TODAY) + assert len(candidates) == 1 + assert candidates[0].path == base + assert candidates[0].is_current is True + assert skipped == [] + + +def test_mixed_range_partitions_correctly(tmp_path: Path) -> None: + base = tmp_path / "extractor.log" + start = date(2026, 6, 1) + end = date(2026, 6, 3) + (tmp_path / "extractor.log.2026-06-01").write_bytes(b"data") + (tmp_path / "extractor.log.2026-06-03").write_bytes(b"data") + candidates, skipped = _build_candidate_files(base, start, end, _PAST_TODAY) + assert len(candidates) == 2 + assert skipped == [date(2026, 6, 2)] + + +@pytest.mark.parametrize( + "start,end", + [ + (_PAST_TODAY, _PAST_TODAY), + (date(2026, 6, 1), date(2026, 6, 1) + timedelta(days=MAX_DATE_RANGE_DAYS - 1)), + ], + ids=["single_today", "exact_max_historical"], +) +def test_boundary_ranges_produce_correct_candidate_count(tmp_path: Path, start: date, end: date) -> None: + base = tmp_path / "extractor.log" + expected = (end - start).days + 1 + if start == _PAST_TODAY: + base.write_bytes(b"data") + else: + current = start + while current <= end: + (tmp_path / f"extractor.log.{current.isoformat()}").write_bytes(b"data") + current += timedelta(days=1) + candidates, skipped = _build_candidate_files(base, start, end, _PAST_TODAY) + assert len(candidates) == expected + assert skipped == [] + + +def test_no_file_handler_reports_no_file_handler_configured() -> None: + extractor = _make_extractor() + updates = _dispatch(extractor, {"start_date": "2026-06-01", "end_date": "2026-06-07"}) + failed = _failed_update(updates) + assert failed.result_metadata == {"error_type": "no_file_handler_configured"} + + +@pytest.mark.parametrize( + "create_log_file", + [True, False], + ids=["files_present", "all_files_missing"], +) +def test_fetch_logs_action_with_file_handler_succeeds(tmp_path: Path, create_log_file: bool) -> None: + log_path = tmp_path / "extractor.log" + if create_log_file: + (tmp_path / "extractor.log.2026-06-10").write_bytes(b"log data") + extractor = _make_extractor(log_path=log_path) + updates = _dispatch(extractor, {"start_date": "2026-06-10", "end_date": "2026-06-10"}) + statuses = [u.status for u in updates] + assert ActionStatus.succeeded in statuses + assert ActionStatus.failed not in statuses From 471e5e15b1c05cb5371def55b7014d1ee584ca58 Mon Sep 17 00:00:00 2001 From: vikramlc Date: Fri, 19 Jun 2026 16:16:00 +0530 Subject: [PATCH 4/7] refactor(odin): Fix gemini comments --- .../unstable/core/_log_upload_action.py | 21 ++++++++++++------- tests/test_unstable/test_log_upload_action.py | 3 ++- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/cognite/extractorutils/unstable/core/_log_upload_action.py b/cognite/extractorutils/unstable/core/_log_upload_action.py index 001ab9ca..b0e13f4b 100644 --- a/cognite/extractorutils/unstable/core/_log_upload_action.py +++ b/cognite/extractorutils/unstable/core/_log_upload_action.py @@ -59,11 +59,11 @@ def _build_candidate_files( Rotated files follow the naming convention ``.YYYY-MM-DD``. The live file (``base_path``) is used for ``today``; all other dates use the rotated name. - A file is skipped when it does not exist or is empty. This covers the brief rotation - race window: if the action is dispatched right after midnight before ``TimedRotatingFileHandler`` - has renamed ``file.log`` to ``file.log.YYYY-MM-DD``, the rotated file will not be found and - that date will appear in ``skipped``. Retrying after rotation completes (within seconds) - will pick it up. + A file is skipped when it does not exist, is empty, or is inaccessible (any ``OSError``). + This covers the brief rotation race window: if the action is dispatched right after midnight + before ``TimedRotatingFileHandler`` has renamed ``file.log`` to ``file.log.YYYY-MM-DD``, + the rotated file will not be found and that date will appear in ``skipped``. Retrying after + rotation completes (within seconds) will pick it up. Returns: (candidates, skipped): candidates are files that exist and have content; @@ -83,7 +83,7 @@ def _build_candidate_files( try: size = path.stat().st_size - except FileNotFoundError: + except OSError: skipped.append(current) else: if size > 0: @@ -117,6 +117,14 @@ def fetch_logs_action(ctx: ActionContext) -> None: error_type="invalid_date_range", ) + today = _today_utc() + + if end_date > today: + raise ActionError( + f"end_date ({end_date}) cannot be in the future (today is {today})", + error_type="invalid_date_range", + ) + num_days = (end_date - start_date).days + 1 if num_days > MAX_DATE_RANGE_DAYS: raise ActionError( @@ -132,5 +140,4 @@ def fetch_logs_action(ctx: ActionContext) -> None: error_type="no_file_handler_configured", ) - today = _today_utc() _candidates, _skipped = _build_candidate_files(log_file_path, start_date, end_date, today) diff --git a/tests/test_unstable/test_log_upload_action.py b/tests/test_unstable/test_log_upload_action.py index e60e8143..ac21d243 100644 --- a/tests/test_unstable/test_log_upload_action.py +++ b/tests/test_unstable/test_log_upload_action.py @@ -125,8 +125,9 @@ def test_parse_date_non_string_type_raises_action_error() -> None: }, str(MAX_DATE_RANGE_DAYS), ), + ({"start_date": "2020-01-01", "end_date": "2099-01-01"}, "future"), ], - ids=["end_before_start", "exceeds_max_days"], + ids=["end_before_start", "exceeds_max_days", "end_date_in_future"], ) def test_invalid_date_range_reports_invalid_date_range( call_metadata: dict[str, str], message_contains: str | None From acbecd5ab3e6290e227b65cf83c5902ccab217a4 Mon Sep 17 00:00:00 2001 From: vikramlc Date: Wed, 24 Jun 2026 15:03:44 +0530 Subject: [PATCH 5/7] refactor(odin): Add logger and validate actions order --- cognite/extractorutils/unstable/core/_log_upload_action.py | 5 +++++ tests/test_unstable/test_action_registration.py | 5 +++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/cognite/extractorutils/unstable/core/_log_upload_action.py b/cognite/extractorutils/unstable/core/_log_upload_action.py index f395627e..9d6b20e1 100644 --- a/cognite/extractorutils/unstable/core/_log_upload_action.py +++ b/cognite/extractorutils/unstable/core/_log_upload_action.py @@ -1,9 +1,12 @@ """Built-in ``fetch_logs`` action: streams rotated log files to CDF Files.""" +import logging from datetime import date from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError +_logger = logging.getLogger(__name__) + MAX_DATE_RANGE_DAYS = 7 """Maximum number of calendar days a single ``fetch_logs`` invocation may cover.""" @@ -50,3 +53,5 @@ def fetch_logs_action(ctx: ActionContext) -> None: "use multiple invocations for longer spans", error_type="invalid_date_range", ) + + _logger.info("fetch_logs: uploading logs for %s to %s (%d day(s))", start_date, end_date, num_days) diff --git a/tests/test_unstable/test_action_registration.py b/tests/test_unstable/test_action_registration.py index 44fa50be..e8223f09 100644 --- a/tests/test_unstable/test_action_registration.py +++ b/tests/test_unstable/test_action_registration.py @@ -84,10 +84,11 @@ def test_scheduled_and_custom_actions_combined_ordering() -> None: req = _startup_request(extractor) assert req.available_actions is not None names = [a.name for a in req.available_actions] - # Built-in actions precede scheduled-task start/stop, which precede user custom actions + # Ordering: scheduled-task start/stop actions, then _custom_actions in registration order + # (_custom_actions = built-ins registered first, then user-registered actions) assert names.index("Start sync") < names.index("flush") assert names.index("Stop sync") < names.index("flush") - assert "fetch_logs" in names + assert names.index("fetch_logs") < names.index("flush") def test_custom_action_appears_with_correct_type_and_description() -> None: From 90a2cbd74a24b938f787799a0dabd27b33b8417f Mon Sep 17 00:00:00 2001 From: vikramlc Date: Wed, 24 Jun 2026 15:23:35 +0530 Subject: [PATCH 6/7] refactor(odin): Added logging and renaming fields --- .../unstable/core/_log_upload_action.py | 19 ++++++++++--- tests/test_unstable/test_log_upload_action.py | 28 +++++++++++++++++++ 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/cognite/extractorutils/unstable/core/_log_upload_action.py b/cognite/extractorutils/unstable/core/_log_upload_action.py index b0e13f4b..c027e00c 100644 --- a/cognite/extractorutils/unstable/core/_log_upload_action.py +++ b/cognite/extractorutils/unstable/core/_log_upload_action.py @@ -1,5 +1,6 @@ """Built-in ``fetch_logs`` action: streams rotated log files to CDF Files.""" +import logging from dataclasses import dataclass from datetime import date, timedelta, timezone from datetime import datetime as dt @@ -8,6 +9,8 @@ from cognite.extractorutils.unstable.configuration.models import ExtractorConfig, LogFileHandlerConfig from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError +_logger = logging.getLogger(__name__) + MAX_DATE_RANGE_DAYS = 7 """Maximum number of calendar days a single ``fetch_logs`` invocation may cover.""" @@ -20,7 +23,7 @@ class LogFileCandidate: """A log file resolved for a given date that exists and has content.""" - date: date + log_date: date path: Path is_current: bool # True when path is the live (unrotated) file.log @@ -83,11 +86,12 @@ def _build_candidate_files( try: size = path.stat().st_size - except OSError: + except OSError as e: + _logger.warning("fetch_logs: skipping %s for %s — %s", path, current, e) skipped.append(current) else: if size > 0: - candidates.append(LogFileCandidate(date=current, path=path, is_current=is_current)) + candidates.append(LogFileCandidate(log_date=current, path=path, is_current=is_current)) else: skipped.append(current) @@ -140,4 +144,11 @@ def fetch_logs_action(ctx: ActionContext) -> None: error_type="no_file_handler_configured", ) - _candidates, _skipped = _build_candidate_files(log_file_path, start_date, end_date, today) + candidates, skipped = _build_candidate_files(log_file_path, start_date, end_date, today) + _logger.info( + "fetch_logs: %d candidate file(s) for %s to %s; %d date(s) skipped", + len(candidates), + start_date, + end_date, + len(skipped), + ) diff --git a/tests/test_unstable/test_log_upload_action.py b/tests/test_unstable/test_log_upload_action.py index ac21d243..505b541b 100644 --- a/tests/test_unstable/test_log_upload_action.py +++ b/tests/test_unstable/test_log_upload_action.py @@ -269,3 +269,31 @@ def test_fetch_logs_action_with_file_handler_succeeds(tmp_path: Path, create_log statuses = [u.status for u in updates] assert ActionStatus.succeeded in statuses assert ActionStatus.failed not in statuses + + +def test_valid_exact_max_days_range_succeeds_at_dispatch(tmp_path: Path) -> None: + # Exactly MAX_DATE_RANGE_DAYS is the boundary — one more would be rejected. + start = date(2026, 6, 1) + end = start + timedelta(days=MAX_DATE_RANGE_DAYS - 1) + log_path = tmp_path / "extractor.log" + extractor = _make_extractor(log_path=log_path) + updates = _dispatch(extractor, {"start_date": str(start), "end_date": str(end)}) + statuses = [u.status for u in updates] + assert ActionStatus.succeeded in statuses + assert ActionStatus.failed not in statuses + + +def test_range_spanning_rotated_and_live_file(tmp_path: Path) -> None: + # Covers the common case: start_date = yesterday (rotated file), end_date = today (live file). + base = tmp_path / "extractor.log" + yesterday = _PAST_TODAY - timedelta(days=1) + base.write_bytes(b"today data") + (tmp_path / f"extractor.log.{yesterday.isoformat()}").write_bytes(b"yesterday data") + candidates, skipped = _build_candidate_files(base, yesterday, _PAST_TODAY, _PAST_TODAY) + assert skipped == [] + assert len(candidates) == 2 + rotated = next(c for c in candidates if c.log_date == yesterday) + live = next(c for c in candidates if c.log_date == _PAST_TODAY) + assert rotated.is_current is False + assert live.is_current is True + assert live.path == base From 3c56bd05aa8634e4b63123bdae7dc17cf54f84bf Mon Sep 17 00:00:00 2001 From: vikramlc Date: Mon, 29 Jun 2026 12:22:12 +0530 Subject: [PATCH 7/7] refactor(odin): Add extractor application config to action context --- .../unstable/core/_log_upload_action.py | 2 +- cognite/extractorutils/unstable/core/actions.py | 12 +++++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/cognite/extractorutils/unstable/core/_log_upload_action.py b/cognite/extractorutils/unstable/core/_log_upload_action.py index c027e00c..ed7407f2 100644 --- a/cognite/extractorutils/unstable/core/_log_upload_action.py +++ b/cognite/extractorutils/unstable/core/_log_upload_action.py @@ -137,7 +137,7 @@ def fetch_logs_action(ctx: ActionContext) -> None: error_type="invalid_date_range", ) - log_file_path = _resolve_log_file_path(ctx._extractor.application_config) + log_file_path = _resolve_log_file_path(ctx.application_config) if log_file_path is None: raise ActionError( "No file log handler configured; add a 'file' type log handler to enable log uploads", diff --git a/cognite/extractorutils/unstable/core/actions.py b/cognite/extractorutils/unstable/core/actions.py index 0193dc67..dd1c6f1a 100644 --- a/cognite/extractorutils/unstable/core/actions.py +++ b/cognite/extractorutils/unstable/core/actions.py @@ -4,8 +4,9 @@ import logging from collections.abc import Callable -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Generic +from cognite.extractorutils.unstable.configuration.models import ConfigType from cognite.extractorutils.unstable.core.errors import Error, ErrorLevel from cognite.extractorutils.unstable.core.logger import CogniteLogger @@ -15,7 +16,7 @@ __all__ = ["ActionContext", "ActionError", "ActionTarget", "CustomAction"] -class ActionContext(CogniteLogger): +class ActionContext(Generic[ConfigType], CogniteLogger): """ Context for a custom action invocation. @@ -28,7 +29,7 @@ class ActionContext(CogniteLogger): def __init__( self, action: "CustomAction", - extractor: "Extractor", + extractor: "Extractor[ConfigType]", external_id: str, call_metadata: dict[str, str] | None = None, ) -> None: @@ -40,6 +41,11 @@ def __init__( self._logger = logging.getLogger(f"{self._extractor.EXTERNAL_ID}.action.{self._action.name.replace(' ', '')}") + @property + def application_config(self) -> ConfigType: + """The extractor's application configuration.""" + return self._extractor.application_config + def _new_error( self, level: ErrorLevel,