Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
9d34837
feat(odin): Register fetch logs action and validate input dates
vikramlc Jun 19, 2026
a480f65
fix(odin): Fix lint issue and added type error
vikramlc Jun 19, 2026
8670c5e
feat(odin): Build candidate log file list
vikramlc Jun 19, 2026
471e5e1
refactor(odin): Fix gemini comments
vikramlc Jun 19, 2026
acbecd5
refactor(odin): Add logger and validate actions order
vikramlc Jun 24, 2026
90a2cbd
refactor(odin): Added logging and renaming fields
vikramlc Jun 24, 2026
03be2d1
refactor(odin): Resolve merge conflicts
vikramlc Jun 24, 2026
eca3f93
Merge branch 'EDGE-607-implement-bounded-reader' into EDG-371-upload-…
vikramlc Jun 25, 2026
97a064e
feat(odin): Add support for streaming log files to cdf files
vikramlc Jun 25, 2026
d99a0cb
refactor(odin): Resolve merge conflicts
vikramlc Jun 29, 2026
6598517
Merge branch 'EDGE-607-implement-bounded-reader' into EDG-371-upload-…
vikramlc Jun 29, 2026
556aee1
refactor(odin): Remove duplicate code snippets
vikramlc Jun 29, 2026
c028fac
Merge branch 'EDGE-607-implement-bounded-reader' into EDG-371-upload-…
vikramlc Jun 29, 2026
0ab05e9
refactor(odin): Add with open for optimisation
vikramlc Jun 29, 2026
3d85fbf
refactor(odin): Remove thread pooling and refactor code
vikramlc Jun 30, 2026
c308409
refactor(odin): Remove unnecessary code
vikramlc Jun 30, 2026
7164668
fix(odin): Add seek in bounded reader and add tests
vikramlc Jul 1, 2026
bd85924
Merge branch 'EDGE-607-implement-bounded-reader' into EDG-371-upload-…
vikramlc Jul 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cognite/extractorutils/unstable/core/_bounded_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ def __len__(self) -> int:
def tell(self) -> int:
    """Return the current offset inside the bounded window (size minus bytes still remaining)."""
    consumed = self._size - self._remaining
    return consumed

def seek(self, offset: int, whence: int = 0) -> int:
    """Move the read position inside the bounded window and return the new offset.

    The target is resolved against the *bounded* window rather than the wrapped
    stream, so the returned position never exceeds the bound and always agrees
    with ``tell()``:

    * ``whence=0`` — ``offset`` is absolute.
    * ``whence=1`` — ``offset`` is relative to the current bounded position.
    * ``whence=2`` — ``offset`` is relative to the bound (``self._size``), not to
      the real end of the wrapped stream, which may have grown past the bound.

    Targets beyond the bound are clamped to it. Negative targets are passed to
    the wrapped stream unchanged, which rejects them as it did before.

    NOTE(review): like the ``_size - pos`` bookkeeping below, this assumes the
    bounded window starts at offset 0 of the wrapped stream — confirm against
    the constructor.

    Raises:
        ValueError: If ``whence`` is not 0, 1 or 2.
    """
    if whence == 0:  # io.SEEK_SET
        target = offset
    elif whence == 1:  # io.SEEK_CUR
        target = (self._size - self._remaining) + offset
    elif whence == 2:  # io.SEEK_END
        target = self._size + offset
    else:
        raise ValueError(f"invalid whence ({whence}, should be 0, 1 or 2)")

    # Never position the wrapped stream past the bound: bytes beyond it are
    # outside the window and must not become reachable through seek().
    target = min(target, self._size)

    pos = self._stream.seek(target, 0)
    self._remaining = max(0, self._size - pos)
    return pos

def seekable(self) -> bool:
    """Report whether the wrapped stream supports seeking."""
    underlying = self._stream
    return underlying.seekable()

@property
def closed(self) -> bool:
    """Whether the wrapped stream has been closed."""
    underlying = self._stream
    return underlying.closed
Expand Down
149 changes: 147 additions & 2 deletions cognite/extractorutils/unstable/core/_log_upload_action.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
"""Built-in ``fetch_logs`` action: streams rotated log files to CDF Files."""

import json
import logging
from collections import Counter
from dataclasses import dataclass
from datetime import date, timedelta, timezone
from datetime import datetime as dt
from pathlib import Path
from typing import BinaryIO, Literal

from cognite.client import CogniteClient

from cognite.extractorutils.unstable.configuration.models import ExtractorConfig, LogFileHandlerConfig
from cognite.extractorutils.unstable.core._bounded_reader import BoundedReader
from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError

# Module-level logger used by the fetch_logs helpers in this file.
_logger = logging.getLogger(__name__)

MAX_DATE_RANGE_DAYS = 7
"""Maximum number of calendar days a single ``fetch_logs`` invocation may cover."""

# 4 GiB — CDF single-request upload limit.
MAX_FILE_SIZE_BYTES = 4 * 1024**3

_FETCH_LOGS_DESCRIPTION = (
    "Upload rotated log files to CDF Files for a given date range. "
    f"At most {MAX_DATE_RANGE_DAYS} days per invocation."
)
Expand All @@ -28,6 +36,15 @@ class LogFileCandidate:
is_current: bool # True when path is the live (unrotated) file.log


@dataclass
class _FileUploadResult:
    """Outcome of attempting to upload a single log file to CDF Files."""

    # Calendar date the log file covers.
    log_date: date
    # External ID the file is (or would have been) stored under in CDF Files.
    file_external_id: str
    # Final state of the attempt.
    status: Literal["uploaded", "skipped_too_large", "failed"]
    # File size in bytes; left at 0 when the producer does not supply it.
    size_bytes: int = 0
    # Exception text for a failed attempt, otherwise None.
    error: str | None = None


def _today_utc() -> date:
    """Return the current calendar date in UTC."""
    now_utc = dt.now(tz=timezone.utc)
    return now_utc.date()

Expand Down Expand Up @@ -100,6 +117,67 @@ def _build_candidate_files(
return candidates, skipped


def _file_external_id(integration_external_id: str, log_date: date) -> str:
    """Build the CDF Files external ID for one integration's log file of a given date."""
    iso_day = log_date.isoformat()
    return f"extractor-logs-{integration_external_id}-{iso_day}"


def _upload_candidate(
    candidate: LogFileCandidate,
    integration_external_id: str,
    cdf_client: CogniteClient,
    snapshot_size: int | None,
) -> _FileUploadResult:
    """Upload one candidate log file to CDF Files.

    Every outcome — success, skip or failure — is logged and reported through the
    returned ``_FileUploadResult``; exceptions from ``stat``, ``open`` and the
    upload call are caught rather than propagated.

    Args:
        candidate: The log file to upload together with the date it covers.
        integration_external_id: Used to derive the external ID of the CDF file.
        cdf_client: Client whose ``files.upload_bytes`` performs the upload.
        snapshot_size: Read ceiling in bytes. When given, it is used as the file
            size and the file is wrapped in a ``BoundedReader`` limited to that
            many bytes. When ``None``, the size comes from ``stat()`` and the
            file object is passed to the upload directly.

    Returns:
        A result whose status is ``"uploaded"``, ``"skipped_too_large"`` (size
        above ``MAX_FILE_SIZE_BYTES``) or ``"failed"`` (with ``error`` holding the
        exception text).
    """
    external_id = _file_external_id(integration_external_id, candidate.log_date)

    try:
        actual_size = snapshot_size if snapshot_size is not None else candidate.path.stat().st_size
    except OSError as e:
        # Log here as well so a failed stat is as visible as a failed upload.
        _logger.error("fetch_logs: failed to stat %s — %s", candidate.path.name, e)
        return _FileUploadResult(
            log_date=candidate.log_date, file_external_id=external_id, status="failed", error=str(e)
        )

    if actual_size > MAX_FILE_SIZE_BYTES:
        _logger.warning(
            "fetch_logs: skipping %s (%d bytes) — exceeds MAX_FILE_SIZE_BYTES (%d bytes)",
            candidate.path.name,
            actual_size,
            MAX_FILE_SIZE_BYTES,
        )
        return _FileUploadResult(
            log_date=candidate.log_date,
            file_external_id=external_id,
            status="skipped_too_large",
            size_bytes=actual_size,
        )

    try:
        with open(candidate.path, "rb") as f:
            # With a snapshot, cap the read at the snapshot size.
            reader: BinaryIO = BoundedReader(f, snapshot_size) if snapshot_size is not None else f  # type: ignore[assignment]
            cdf_client.files.upload_bytes(
                content=reader,
                name=f"{external_id}.log",
                external_id=external_id,
                mime_type="text/plain",
                overwrite=True,
            )
        _logger.info("fetch_logs: uploaded %s (%d bytes)", external_id, actual_size)
        return _FileUploadResult(
            log_date=candidate.log_date,
            file_external_id=external_id,
            status="uploaded",
            size_bytes=actual_size,
        )
    except Exception as e:
        # Deliberately broad: one failing file is reported as a result, not raised.
        _logger.error("fetch_logs: failed to upload %s — %s", external_id, e)
        return _FileUploadResult(
            log_date=candidate.log_date,
            file_external_id=external_id,
            status="failed",
            error=str(e),
        )


def fetch_logs_action(ctx: ActionContext) -> None:
"""Validate parameters and upload rotated log files for the requested date range to CDF Files."""
params = ctx.call_metadata or {}
Expand Down Expand Up @@ -144,11 +222,78 @@ def fetch_logs_action(ctx: ActionContext) -> None:
error_type="no_file_handler_configured",
)

candidates, skipped = _build_candidate_files(log_file_path, start_date, end_date, today)
candidates, skipped_dates = _build_candidate_files(log_file_path, start_date, end_date, today)
_logger.info(
"fetch_logs: %d candidate file(s) for %s to %s; %d date(s) skipped",
len(candidates),
start_date,
end_date,
len(skipped),
len(skipped_dates),
)

# Snapshot the current-day file size BEFORE starting uploads.
# This gives a fixed read ceiling for BoundedReader — bytes written after this
# point are excluded from the upload, preventing Content-Length mismatches.
snapshot_size: int | None = None
current_candidate = next((c for c in candidates if c.is_current), None)
if current_candidate is not None:
try:
snapshot_size = current_candidate.path.stat().st_size
_logger.info(
"fetch_logs: current-day snapshot %d bytes (%s)",
snapshot_size,
current_candidate.path.name,
)
except OSError as e:
_logger.warning("fetch_logs: could not snapshot current-day file size — %s", e)

integration_external_id = ctx._extractor.connection_config.integration.external_id
cdf_client = ctx._extractor.cognite_client

upload_results: list[_FileUploadResult] = [
_upload_candidate(
candidate,
integration_external_id,
cdf_client,
snapshot_size if candidate.is_current else None,
)
for candidate in candidates
]

counts = Counter(r.status for r in upload_results)

# Per-file entries: upload results (sorted by date) + missing dates (skipped by candidate builder)
files_list: list[dict[str, str]] = []
for r in upload_results:
entry: dict[str, str] = {
"date": str(r.log_date),
"file_external_id": r.file_external_id,
"status": r.status,
}
if r.size_bytes:
entry["size_bytes"] = str(r.size_bytes)
if r.error:
entry["error"] = r.error
files_list.append(entry)
files_list.extend(
{
"date": str(skipped_date),
"file_external_id": _file_external_id(integration_external_id, skipped_date),
"status": "skipped",
}
for skipped_date in sorted(skipped_dates)
)
files_list.sort(key=lambda e: e["date"])

total_skipped = len(skipped_dates) + counts["skipped_too_large"]

ctx.set_result(
f"{counts['uploaded']} of {num_days} log files uploaded to CDF Files",
metadata={
"total_files": str(num_days),
"uploaded_files": str(counts["uploaded"]),
"skipped_files": str(total_skipped),
"failed_files": str(counts["failed"]),
"files": json.dumps(files_list),
},
)
7 changes: 7 additions & 0 deletions cognite/extractorutils/unstable/core/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ def __init__(
self._extractor = extractor
self.external_id = external_id
self.call_metadata = call_metadata
self._result_message: str | None = None
self._result_metadata: dict[str, str] | None = None

self._logger = logging.getLogger(f"{self._extractor.EXTERNAL_ID}.action.{self._action.name.replace(' ', '')}")

Expand All @@ -61,6 +63,11 @@ def _new_error(
task_name=task_name if task_name is not None else self._action.name,
)

def set_result(self, message: str, *, metadata: dict[str, str] | None = None) -> None:
"""Record the result for a successful action completion."""
self._result_message = message
self._result_metadata = metadata


class ActionError(Exception):
"""Deliberate action failure with structured metadata for Odin result reporting."""
Expand Down
7 changes: 6 additions & 1 deletion cognite/extractorutils/unstable/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,12 @@ def _handle_custom_action(self, action: Action) -> None:
try:
custom.target(ctx)
self._checkin_worker.queue_action_update(
ActionUpdate(external_id=action.external_id, status=ActionStatus.succeeded)
ActionUpdate(
external_id=action.external_id,
status=ActionStatus.succeeded,
result_message=ctx._result_message,
result_metadata=ctx._result_metadata,
)
)
except ActionError as e:
self._checkin_worker.queue_action_update(
Expand Down
15 changes: 15 additions & 0 deletions tests/test_unstable/test_action_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,18 @@ def test_start_registers_handle_actions_as_dispatcher() -> None:
registered = extractor._checkin_worker.set_action_dispatcher.call_args[0][0]
assert registered.__func__.__name__ == "_handle_actions"
assert registered.__self__ is extractor


def test_set_result_propagates_to_succeeded_action_update() -> None:
    """A result recorded via ``ctx.set_result`` is carried on the succeeded ``ActionUpdate``."""
    extractor = _make_extractor()

    def action_with_result(ctx: ActionContext) -> None:
        ctx.set_result("3 files uploaded", metadata={"total_files": "3", "uploaded_files": "3"})

    custom_action = CustomAction(name="upload", target=action_with_result)
    extractor.add_action(custom_action)

    pending = Action(external_id="act-r", action_name="upload", status=ActionStatus.pending)
    extractor._dispatch_single_action(pending)

    # First positional argument of every queue_action_update(...) call.
    queued_calls = extractor._checkin_worker.queue_action_update.call_args_list
    updates = [queued.args[0] for queued in queued_calls]
    succeeded = next(update for update in updates if update.status == ActionStatus.succeeded)

    assert succeeded.result_message == "3 files uploaded"
    assert succeeded.result_metadata == {"total_files": "3", "uploaded_files": "3"}
27 changes: 27 additions & 0 deletions tests/test_unstable/test_bounded_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,33 @@ def test_tell_tracks_bytes_consumed(tmp_path: Path) -> None:
assert reader.tell() == 8


def test_seek_rewinds_for_retry(tmp_path: Path) -> None:
    """Seeking back to offset 0 lets the same leading bytes be read a second time."""
    source = _make_file(tmp_path, b"hello world")
    with open(source, "rb") as raw:
        bounded = BoundedReader(raw, 8)

        first_pass = bounded.read(5)
        assert first_pass == b"hello"

        rewound_to = bounded.seek(0)
        assert rewound_to == 0
        assert bounded.tell() == 0

        second_pass = bounded.read(5)
        assert second_pass == b"hello"


def test_seek_relative(tmp_path: Path) -> None:
    """A ``whence=1`` seek moves relative to the current offset."""
    source = _make_file(tmp_path, b"hello world")
    with open(source, "rb") as raw:
        bounded = BoundedReader(raw, 8)
        bounded.read(3)  # advance to offset 3 before the relative seek

        landed_at = bounded.seek(2, 1)
        assert landed_at == 5
        assert bounded.tell() == 5

        # bytes 5-7 of b"hello world" within snapshot of 8
        tail = bounded.read(3)
        assert tail == b" wo"


def test_seekable_mirrors_underlying_stream(tmp_path: Path) -> None:
    """``BoundedReader.seekable()`` reports the same answer as the file it wraps."""
    source = _make_file(tmp_path, b"data")
    with open(source, "rb") as raw:
        bounded = BoundedReader(raw, 4)
        reported = bounded.seekable()
        assert reported == raw.seekable()


def test_stream_shorter_than_snapshot_partial(tmp_path: Path) -> None:
# File has 5 bytes but snapshot declares 10 (e.g. file was truncated after snapshot).
# _remaining decrements by len(data) not to_read, so tell() reflects actual bytes read.
Expand Down
Loading