-
Notifications
You must be signed in to change notification settings - Fork 6
feat(odin): Implement BoundedReader for safe current-day file streaming #543
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
vikramlc-cognite
wants to merge
6
commits into
master
Choose a base branch
from
EDGE-607-implement-bounded-reader
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+190
−0
Open
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
82d8b6d
feat(odin): Implement bounded reader
vikramlc a20c8c9
refactor(odin): Address gemini comments
vikramlc 1019b85
refactor(odin): Rename parameter
vikramlc 021210e
Merge branch 'master' into EDGE-607-implement-bounded-reader
Yaseen-A-Khan ea58e9b
Merge branch 'master' into EDGE-607-implement-bounded-reader
vikramlc-cognite f903939
test(odin): Added tests
vikramlc File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,56 @@ | ||
| """Bounded binary reader for point-in-time log file uploads.""" | ||
|
|
||
| from typing import BinaryIO | ||
|
|
||
|
|
||
| class BoundedReader: | ||
| """ | ||
| Wraps a binary file handle and limits reads to a byte count captured at snapshot time. | ||
|
|
||
| Implements ``__len__`` so ``requests.utils.super_len()`` bypasses ``os.fstat()`` | ||
| and declares the correct HTTP ``Content-Length``. This is critical when uploading | ||
| the active log file (``file.log``) while the logger is still appending to it: | ||
| without the cap, the upload reads past the declared length and httpx raises | ||
| ``LocalProtocolError: "Too much data for declared Content-Length"``. | ||
|
|
||
| Usage:: | ||
|
|
||
| snapshot_size = os.path.getsize(log_path) | ||
| with BoundedReader(open(log_path, "rb"), snapshot_size) as reader: | ||
| upload_queue.add_io_to_upload_queue(file_meta, lambda: reader, ...) | ||
| """ | ||
|
|
||
| def __init__(self, stream: BinaryIO, max_bytes: int) -> None: | ||
| self._stream = stream | ||
| self._size = max_bytes | ||
| self._remaining = max_bytes | ||
|
|
||
| def __len__(self) -> int: | ||
| # super_len() checks __len__ before os.fstat — returning the fixed snapshot | ||
| # size here ensures Content-Length is declared at the moment of the snapshot, | ||
| # not at the moment the upload resolves the inode size (which may have grown). | ||
| return self._size | ||
|
|
||
| def tell(self) -> int: | ||
| return self._size - self._remaining | ||
|
|
||
| @property | ||
| def closed(self) -> bool: | ||
| return self._stream.closed | ||
|
|
||
| def close(self) -> None: | ||
| self._stream.close() | ||
|
|
||
| def read(self, size: int = -1) -> bytes: | ||
| if self._remaining <= 0: | ||
| return b"" | ||
| to_read = self._remaining if size < 0 else min(size, self._remaining) | ||
| data = self._stream.read(to_read) | ||
| self._remaining -= len(data) | ||
| return data | ||
|
|
||
| def __enter__(self) -> "BoundedReader": | ||
| return self | ||
|
|
||
| def __exit__(self, *_: object) -> None: | ||
| self.close() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,134 @@ | ||
| from pathlib import Path | ||
|
|
||
| import pytest | ||
| from requests.utils import super_len | ||
|
|
||
| from cognite.extractorutils.unstable.core._bounded_reader import BoundedReader | ||
|
|
||
|
|
||
| def _make_file(tmp_path: Path, content: bytes) -> Path: | ||
| p = tmp_path / "f.log" | ||
| p.write_bytes(content) | ||
| return p | ||
|
|
||
|
|
||
| def test_len_returns_snapshot_size_and_is_stable_after_reads(tmp_path: Path) -> None: | ||
| path = _make_file(tmp_path, b"hello world") | ||
| with open(path, "rb") as f: | ||
| reader = BoundedReader(f, 5) | ||
| assert len(reader) == 5 | ||
| reader.read(3) | ||
| assert len(reader) == 5 # must not decrement with reads | ||
|
|
||
|
|
||
| @pytest.mark.parametrize( | ||
| "snapshot,read_arg,expected", | ||
| [ | ||
| (5, -1, b"hello"), # read() with no arg reads up to snapshot | ||
| (5, 3, b"hel"), # read(n) where n < remaining | ||
| (5, 10, b"hello"), # read(n) where n > snapshot — capped at snapshot | ||
| ], | ||
| ids=["read_all", "read_partial", "read_exceeds_snapshot"], | ||
| ) | ||
| def test_read_respects_snapshot_bound(tmp_path: Path, snapshot: int, read_arg: int, expected: bytes) -> None: | ||
| path = _make_file(tmp_path, b"hello world") # 11 bytes — always larger than snapshot | ||
| with open(path, "rb") as f: | ||
| assert BoundedReader(f, snapshot).read(read_arg) == expected | ||
|
|
||
|
|
||
| def test_read_returns_empty_after_exhaustion(tmp_path: Path) -> None: | ||
| path = _make_file(tmp_path, b"hello world") | ||
| with open(path, "rb") as f: | ||
| reader = BoundedReader(f, 5) | ||
| reader.read() | ||
| assert reader.read() == b"" | ||
| assert reader.read(100) == b"" | ||
|
|
||
|
|
||
| def test_zero_snapshot_returns_empty_immediately(tmp_path: Path) -> None: | ||
| path = _make_file(tmp_path, b"data") | ||
| with open(path, "rb") as f: | ||
| reader = BoundedReader(f, 0) | ||
| assert len(reader) == 0 | ||
| assert reader.read() == b"" | ||
|
|
||
|
|
||
| def test_context_manager_closes_underlying_file(tmp_path: Path) -> None: | ||
| path = _make_file(tmp_path, b"data") | ||
| f = open(path, "rb") # noqa: SIM115 | ||
| with BoundedReader(f, 4): | ||
| pass | ||
| assert f.closed | ||
|
|
||
|
|
||
| def test_super_len_reads_len_not_fstat(tmp_path: Path) -> None: | ||
| # Verify that requests.utils.super_len() uses __len__ (snapshot) rather than | ||
| # os.fstat().st_size (live inode size). The file is intentionally larger than | ||
| # the declared snapshot to confirm fstat is not consulted. | ||
| path = _make_file(tmp_path, b"hello world extended") # 20 bytes on disk | ||
| with open(path, "rb") as f: | ||
| reader = BoundedReader(f, 5) | ||
| assert super_len(reader) == 5 | ||
|
|
||
|
|
||
| def test_midnight_rotation_race_file_shorter_than_snapshot(tmp_path: Path) -> None: | ||
| # If TimedRotatingFileHandler rotates between snapshot and file open, the new | ||
| # file.log is empty. BoundedReader returns b"" — the upload layer sees "too little | ||
| # data" against the declared Content-Length. This is acceptable; the caller retries. | ||
| path = _make_file(tmp_path, b"") | ||
| with open(path, "rb") as f: | ||
| reader = BoundedReader(f, 1_000) | ||
| assert len(reader) == 1_000 # snapshot still declared | ||
| assert reader.read() == b"" # but file is empty | ||
|
|
||
|
|
||
| def test_tell_tracks_bytes_consumed(tmp_path: Path) -> None: | ||
| path = _make_file(tmp_path, b"hello world") | ||
| with open(path, "rb") as f: | ||
| reader = BoundedReader(f, 8) | ||
| assert reader.tell() == 0 | ||
| reader.read(3) | ||
| assert reader.tell() == 3 | ||
| reader.read(5) | ||
| assert reader.tell() == 8 | ||
|
|
||
|
|
||
| def test_stream_shorter_than_snapshot_partial(tmp_path: Path) -> None: | ||
| # File has 5 bytes but snapshot declares 10 (e.g. file was truncated after snapshot). | ||
| # _remaining decrements by len(data) not to_read, so tell() reflects actual bytes read. | ||
| path = _make_file(tmp_path, b"hello") | ||
| with open(path, "rb") as f: | ||
| reader = BoundedReader(f, 10) | ||
| assert len(reader) == 10 # snapshot still declared | ||
| data = reader.read(10) | ||
| assert data == b"hello" # only 5 bytes available | ||
| assert reader.tell() == 5 # tracks actual bytes, not requested | ||
| assert reader.read() == b"" | ||
|
|
||
|
|
||
| def test_read_multiple_calls_clamps_final_chunk(tmp_path: Path) -> None: | ||
| path = _make_file(tmp_path, b"hello world") | ||
| with open(path, "rb") as f: | ||
| reader = BoundedReader(f, 7) | ||
| assert reader.read(3) == b"hel" | ||
| assert reader.read(3) == b"lo " | ||
| assert reader.read(3) == b"w" # only 1 byte left — clamped by min(size, remaining) | ||
| assert reader.read(3) == b"" | ||
|
|
||
|
|
||
| def test_read_all_after_partial_consumption(tmp_path: Path) -> None: | ||
| path = _make_file(tmp_path, b"hello") | ||
| with open(path, "rb") as f: | ||
| reader = BoundedReader(f, 5) | ||
| assert reader.read(3) == b"hel" | ||
| assert reader.read(-1) == b"lo" # read() after partial — returns only remaining 2 bytes | ||
|
|
||
|
|
||
| def test_close_and_closed_property(tmp_path: Path) -> None: | ||
| path = _make_file(tmp_path, b"data") | ||
| f = open(path, "rb") # noqa: SIM115 | ||
| reader = BoundedReader(f, 4) | ||
| assert not reader.closed | ||
| reader.close() | ||
| assert reader.closed | ||
| assert f.closed | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.