fix(streaming): CoalescingBuffer leaks CPU — idle/orphaned ticker spins at 1/FLUSH_INTERVAL forever#418

Draft
eberki-scale wants to merge 1 commit into main from endre/improve-buffer

@eberki-scale

Contributor

Summary

CoalescingBuffer (agentex/lib/core/services/adk/streaming.py) runs a per-instance background ticker that wakes every FLUSH_INTERVAL_S (50 ms) whether or not there is buffered data. Because one buffer + ticker is created per streaming task-message context, any buffer that outlives its stream without a clean close() becomes an orphaned task that polls at 20 Hz forever. In a long-lived worker that handles many streaming tasks, these accumulate and ratchet CPU up until a core is saturated, with no memory growth and no log output, clearing only on process restart.

Symptoms

  • Worker CPU climbs steadily with cumulative streamed tasks and never recovers between idle periods (e.g. ~0.04 cores fresh → ~0.55 after a load run → pinned at the CPU limit after sustained traffic). Latency degrades in lockstep as the dead tickers steal the event loop / GIL from live work.
  • Memory stays flat; nothing is logged.
  • A process restart resets it.

Root cause

# CoalescingBuffer._run (before)
while True:
    try:
        await asyncio.wait_for(self._flush_signal.wait(), timeout=self.FLUSH_INTERVAL_S)
    except asyncio.TimeoutError:
        pass
    async with self._lock:
        self._flush_signal.clear()
        drained = self._drain_locked()   # empty when idle
    ...
    if self._closed:
        return

When the buffer is empty and _closed is False, the wait_for times out every 50 ms, drains nothing, and loops: a permanent 20 Hz polling loop that never sleeps longer than one interval. The loop only exits on _closed, which is set by close(). If close() never runs, or is interrupted by cancellation while awaiting the ticker, the ticker is orphaned and spins indefinitely. With N orphaned/idle buffers, the event loop spends most of its time arming/cancelling TimerHandles.
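The idle wakeups described above can be reproduced in isolation. This is a minimal sketch, not the code in streaming.py: polling_ticker and count_idle_wakeups are hypothetical names, and the interval is shortened so the example runs quickly.

```python
import asyncio


async def polling_ticker(signal: asyncio.Event, interval_s: float, wakeups: list) -> None:
    """Old loop shape: wake every interval_s even if nothing was signalled."""
    while True:
        try:
            await asyncio.wait_for(signal.wait(), timeout=interval_s)
        except asyncio.TimeoutError:
            pass  # idle timeout: nothing to drain, but the task still woke up
        signal.clear()
        wakeups.append(1)  # one loop iteration, standing in for one empty drain


async def count_idle_wakeups(interval_s: float = 0.01, windows: int = 8) -> int:
    """Run the ticker with no data for `windows` intervals and count iterations."""
    signal = asyncio.Event()
    wakeups: list = []
    task = asyncio.create_task(polling_ticker(signal, interval_s, wakeups))
    await asyncio.sleep(interval_s * windows)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return len(wakeups)


if __name__ == "__main__":
    # The signal is never set, yet the count grows with elapsed time.
    print(asyncio.run(count_idle_wakeups()))
```

The count is nonzero even though the event is never set, which is the behaviour the fix removes.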

Evidence (py-spy on an affected worker)

py-spy top:  GIL 69%, Active 73%
 41%  _run (asyncio/events.py)            # event loop running timer callbacks
 22%  _run (.../adk/streaming.py)         # CoalescingBuffer._run
 17%  wait_for (asyncio/tasks.py)
 11%  reschedule/__aenter__ (asyncio/timeouts.py)
  9%  __init__ (asyncio/events.py)        # TimerHandle churn

py-spy dump (event loop thread):
    wait_for (asyncio/tasks.py:506)
    _run (.../adk/streaming.py:190)       # the 50 ms wait_for

CPU was ~linearly proportional to the number of streamed tasks since the last restart; terminating the in-flight tasks/workflows did not release it (the leak is the worker-process asyncio tasks, independent of task/workflow state).
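The linear relationship follows from the wakeup arithmetic: each orphaned ticker contributes 1/FLUSH_INTERVAL_S timer wakeups per second. A small sketch of that calculation, assuming only the 50 ms interval stated in the summary (idle_wakeups_per_second is a hypothetical helper, and the per-wakeup CPU cost is not modelled because the PR does not state it):

```python
FLUSH_INTERVAL_MS = 50  # 50 ms, as stated in the summary


def idle_wakeups_per_second(orphaned_buffers: int, interval_ms: int = FLUSH_INTERVAL_MS) -> int:
    """Timer wakeups per second caused by idle tickers alone."""
    return orphaned_buffers * 1000 // interval_ms


if __name__ == "__main__":
    for n in (1, 100, 1000):
        print(n, idle_wakeups_per_second(n))
    # prints:
    # 1 20
    # 100 2000
    # 1000 20000
```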

Fix

Replace the single _flush_signal + fixed-interval wait with two events so the ticker can park at zero CPU when idle:

  • _wake — set by add() only when the buffer goes empty → non-empty. _run blocks on await self._wake.wait() when idle (no polling).
  • _flush_now — set on first delta / size threshold / close → immediate flush, bypassing the coalescing delay.

_run parks on _wake; on wake it flushes immediately if _flush_now is set, otherwise coalesces for up to FLUSH_INTERVAL_S, drains, re-arms _wake if data arrived during the flush, and exits on _closed. close() additionally force-cancels the ticker if close() itself is cancelled, so it can never be orphaned on the cancellation path.
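The two-event design can be sketched as follows. This is an illustrative stand-in under stated assumptions, not the PR's implementation: ParkingBuffer, the sink callback, the cycles counter, and the MAX_BUFFERED_CHARS value are all hypothetical, and the lock used by the real class is omitted because add() here is synchronous on a single event loop.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple


class ParkingBuffer:
    """Hypothetical stand-in for CoalescingBuffer (not the PR's actual code)."""

    FLUSH_INTERVAL_S = 0.05
    MAX_BUFFERED_CHARS = 64  # assumed value, for illustration only

    def __init__(self, sink: Callable[[str], Awaitable[None]]) -> None:
        self._sink = sink
        self._parts: List[str] = []
        self._size = 0
        self._first = True
        self._closed = False
        self._wake = asyncio.Event()       # buffer went empty -> non-empty, or close()
        self._flush_now = asyncio.Event()  # skip the coalescing delay
        self.cycles = 0                    # drain cycles, for observation only
        self._task = asyncio.create_task(self._run())

    def add(self, delta: str) -> None:
        was_empty = not self._parts
        self._parts.append(delta)
        self._size += len(delta)
        if self._first or self._size >= self.MAX_BUFFERED_CHARS:
            self._first = False
            self._flush_now.set()
        if was_empty:
            self._wake.set()

    async def _run(self) -> None:
        while True:
            await self._wake.wait()  # idle: parked here, no timer armed
            self._wake.clear()
            if not self._flush_now.is_set():
                try:  # coalesce for at most one interval
                    await asyncio.wait_for(self._flush_now.wait(), self.FLUSH_INTERVAL_S)
                except asyncio.TimeoutError:
                    pass
            self._flush_now.clear()
            drained = "".join(self._parts)
            self._parts = []
            self._size = 0
            self.cycles += 1
            if drained:
                await self._sink(drained)
            if self._parts:        # data arrived while the sink was awaited
                self._wake.set()   # re-arm so the next pass picks it up
            elif self._closed:
                return

    async def close(self) -> None:
        self._closed = True
        self._flush_now.set()
        self._wake.set()
        try:
            await self._task
        except asyncio.CancelledError:
            # close() was cancelled: make sure the ticker is not left running.
            self._task.cancel()
            raise


async def demo() -> Tuple[int, str]:
    out: List[str] = []

    async def sink(text: str) -> None:
        out.append(text)

    buf = ParkingBuffer(sink)
    await asyncio.sleep(buf.FLUSH_INTERVAL_S * 4)  # idle period
    idle_cycles = buf.cycles
    buf.add("he")
    buf.add("llo")
    await buf.close()
    return idle_cycles, "".join(out)
```

In this sketch the only timer is the one armed during the coalescing window, so a buffer with no pending data holds no TimerHandle at all.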

Behaviour preserved: first-delta-immediate flush (latency-critical), the MAX_BUFFERED_CHARS early flush, the FLUSH_INTERVAL_S coalescing window for trailing partials, and the exactly-once final drain on close(). The only difference: an idle or orphaned buffer parks instead of polling, and under light streaming (buffer empties between deltas) a delta may flush slightly sooner — coalescing under sustained streaming is unchanged.

Tests

  • New TestCoalescingBufferIdleParks:

    • test_idle_buffer_does_not_spin: no data added → 0 drain cycles over ~8 windows (was ~8).
    • test_orphaned_buffer_parks_after_flush: buffer flushed then never closed → 0 drain cycles afterward.

    Both fail against the old code (~8) and pass against the fix.

  • Existing streaming tests unchanged; the file now reports 33 passed, which matches the 31 existing tests plus the 2 new ones (tests/lib/core/services/adk/test_streaming.py).
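One way to count drain cycles over a fixed number of windows is to wrap the drain method with a counter. The sketch below shows the idea on a hypothetical ParkedTicker rather than the real CoalescingBuffer; count_calls and idle_drain_cycles are illustrative names and are not taken from the test file.

```python
import asyncio


def count_calls(obj, name: str) -> list:
    """Replace obj.<name> with a wrapper that records each call."""
    calls: list = []
    original = getattr(obj, name)

    def wrapper(*args, **kwargs):
        calls.append(1)
        return original(*args, **kwargs)

    setattr(obj, name, wrapper)
    return calls


class ParkedTicker:
    """Hypothetical stand-in: drains only after wake is set."""

    def __init__(self) -> None:
        self.wake = asyncio.Event()

    def drain(self) -> str:
        return ""

    async def run(self) -> None:
        while True:
            await self.wake.wait()  # no timeout, so no idle wakeups
            self.wake.clear()
            self.drain()


async def idle_drain_cycles(window_s: float = 0.01, windows: int = 8) -> int:
    """Leave the ticker idle for `windows` intervals and report drain calls."""
    ticker = ParkedTicker()
    calls = count_calls(ticker, "drain")
    task = asyncio.create_task(ticker.run())
    await asyncio.sleep(window_s * windows)
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return len(calls)
```

Asserting that the returned count is 0 gives a check that fails for any loop that polls on a timer and passes for one that parks.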

Risk

Low. Hot-path semantics (first-flush, size/time coalescing, exactly-once close drain) are covered by the existing 31 tests and unchanged. The change is local to CoalescingBuffer.

@github-actions

This PR is targeting main, but PRs should target the next branch by default.

The main branch is reserved for release-please and Stainless automation. To resolve, pick one of:

  • Re-target the PR to next (recommended). On the PR page, click Edit next to the title and change the base branch to next.
  • Add the target-main label if this is an intentional exception (e.g. an urgent hotfix). The check will re-run and pass.

See CONTRIBUTING.md for the full branch model.
