Skip to content

feat(streaming): stream tool call argument deltas in TemporalStreamingModel#355

Open
vkalmathscale wants to merge 6 commits into
nextfrom
vkalmath/stream-tool-call-arg-deltas
Open

feat(streaming): stream tool call argument deltas in TemporalStreamingModel#355
vkalmathscale wants to merge 6 commits into
nextfrom
vkalmath/stream-tool-call-arg-deltas

Conversation

@vkalmathscale

@vkalmathscale vkalmathscale commented May 12, 2026

Copy link
Copy Markdown

Summary

TemporalStreamingModel already streams text deltas and reasoning summary deltas to Redis via StreamingTaskMessageContext, but ResponseFunctionCallArgumentsDeltaEvent was being silently buffered into function_calls_in_progress[...]['arguments'] with no per-delta publish. Consumers only saw the completed tool call surface later (after the activity returned, via downstream hooks if any).

For write-heavy tools — write_file, apply_patch, anything that puts a 2–20KB string into a single argument — the model spends multiple seconds generating the argument body, and the UI sees nothing until the entire activity finishes. The result is a frozen UI followed by an abrupt jump when the activity returns.

This PR threads tool-call argument deltas through the same streaming machinery used for text and reasoning, riding on the CoalescingBuffer + StreamingMode infrastructure added in #333. The buffer's merge helpers already key on tool_call_id for ToolRequestDelta, so coalescing, mode dispatch, and opt-out are inherited from existing infra.

Design

TemporalStreamingModel now opens a streaming_task_message_context per function call (keyed off the call's output_index), with initial_content=ToolRequestContent(...) and the model's configured streaming_mode. Three event handlers participate:

Event Behavior added
ResponseOutputItemAddedEvent (type=function_call) Open the per-call streaming context and stash it on function_calls_in_progress[output_index]['context'].
ResponseFunctionCallArgumentsDeltaEvent Emit StreamTaskMessageDelta(delta=ToolRequestDelta(arguments_delta=..., tool_call_id=..., name=...)) into the per-call context. The coalescing buffer merges consecutive deltas with the same tool_call_id.
ResponseOutputItemDoneEvent (type=function_call) Parse the accumulated args (with a graceful empty-dict fallback on JSONDecodeError), emit a final StreamTaskMessageFull(content=ToolRequestContent(...)), and close the context.

End-of-loop cleanup defensively closes any function-call contexts that didn't see a Done event (truncated stream or mid-stream exception).

ModelResponse output is unchanged: output_items still receives the same complete ResponseFunctionToolCall. Activity determinism is unaffected — streaming is a side effect.

What this does NOT change

  • Text and reasoning streaming paths are untouched.
  • StreamingMode is already the on/off knob. No new flag. streaming_mode="off" suppresses tool-arg deltas the same way it suppresses text deltas. "per_token" publishes immediately; "coalesced" (default) batches at 50ms / 128 chars.
  • TemporalStreamingHooks.on_tool_start is unchanged. It still fires after the activity returns and still emits a ToolRequestContent Full message via the stream_lifecycle_content activity. See Caveats.

Caveats

  1. Overlap with TemporalStreamingHooks.on_tool_start. Users who pass TemporalStreamingHooks to Runner.run will now see two persisted task_messages per tool call: one created by the model (delta stream + final Full) and one created by the hook (Full only). Both land on the same Redis topic task:{task_id} with different parent_task_message.ids, so a default UI will render two cards for the same logical tool call.

    This needs a follow-up to decide which path owns the canonical ToolRequest emission. Options for review discussion:

    • Silence on_tool_start's Full emit when the model is also emitting (auto-detect via a workflow-instance flag, mirroring how _task_id / _trace_id are threaded today).
    • Remove on_tool_start's Full emit entirely in a follow-up major bump (the model becomes the single source of truth for ToolRequest events).

    Until that follow-up, users who want streamed tool args without duplicate emits should subclass TemporalStreamingHooks and override on_tool_start to a no-op.

  2. Coalescing windows still apply. With the default 50ms / 128-char window, tool args render in ~50ms-granularity chunks rather than per-token. This is the same tradeoff already made for text streaming in perf(streaming): coalesce per-token publishes to Redis (50ms / 128-char window) #333, and the right default for write-heavy tools (UX value is "watch the artifact appear", not "see each token").

  3. Malformed argument JSON. If the model produces invalid JSON for the args (truncated stream, hallucinated structure), the path logs a WARNING and emits arguments={} in the final ToolRequestContent. The raw delta stream is preserved on the consumer side regardless — only the structured final view falls back.

Test plan

  • Two new unit tests in test_streaming_model.py::TestStreamingModelFunctionCallArgsStreaming:
    • Happy path: well-formed args produce one streaming context opened with ToolRequestContent, one StreamTaskMessageDelta(ToolRequestDelta) per ArgumentsDelta event preserving the delta text, and one final StreamTaskMessageFull(ToolRequestContent) with parsed args.
    • Malformed args: emits arguments={} in the final Full and logs a WARNING.
  • Full test_streaming_model.py suite passes (42/42).
  • ruff check clean on both modified files.
  • Manual smoke: deploy to a dev environment with an agent that calls a write-heavy tool, confirm UI sees tool args streaming in coalesced batches.
  • Manual smoke: streaming_mode="off" suppresses tool-arg deltas (only the final persisted message exists on close).

cc reviewers familiar with #333's CoalescingBuffer design.

Greptile Summary

This PR threads tool-call argument deltas through the existing streaming machinery in TemporalStreamingModel, so write-heavy tools like write_file publish incremental ToolRequestDelta updates to Redis instead of buffering the entire argument body until the activity returns. The implementation follows the established CoalescingBuffer + StreamingMode pattern from #333.

  • Per-call context lifecycle: a streaming_task_message_context is opened on ResponseOutputItemAddedEvent, each ResponseFunctionCallArgumentsDeltaEvent emits a StreamTaskMessageDelta(ToolRequestDelta), and ResponseOutputItemDoneEvent emits a final StreamTaskMessageFull(ToolRequestContent) then closes the context with call_data['context'] = None in a finally block — correctly preventing the orphan-cleanup loop from double-closing.
  • Known caveat acknowledged: when TemporalStreamingHooks is also active, two ToolRequest task messages will be persisted per call (on_tool_start still fires); the PR flags this as a follow-up to resolve ownership of the canonical emission.
  • Test coverage: happy-path (delta-per-chunk + parsed final Full) and malformed-JSON fallback (empty-dict + WARNING log) are both exercised; close() call count is not asserted, leaving the double-close guard untested against future regressions.

Confidence Score: 4/5

Safe to merge for the streaming side-effect path; the actual tool-call execution and ModelResponse output are unchanged.

The core event-handling logic is correct and follows established patterns. The finally: call_data['context'] = None block properly prevents double-close, and JSON parse failures fall back gracefully. The main gap is that the new tests do not assert close() is called exactly once per function call, so the double-close guard has no regression test. The acknowledged TemporalStreamingHooks duplicate-emission caveat is a known UX concern, not a data-correctness issue.

The test file would benefit from a close() call-count assertion to lock in the double-close prevention; the production model file itself looks correct.

Important Files Changed

Filename Overview
src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py Adds per-function-call streaming contexts for tool argument deltas. Core logic is correct: context opens on ResponseOutputItemAddedEvent, deltas streamed, final Full emitted and context closed (with context=None set in a finally block to prevent double-close) on ResponseOutputItemDoneEvent. Orphan cleanup after the event loop is correctly guarded by the None check.
src/agentex/lib/core/temporal/plugins/openai_agents/tests/test_streaming_model.py Two new tests cover the happy path (delta-per-chunk + final Full with parsed args) and malformed JSON fallback. _install_real_task_message correctly replaces the shared MagicMock task_message to allow pydantic-validated StreamTaskMessage construction. Tests do not assert close() call count, leaving the double-close guard unverified.

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant API as OpenAI Streaming API
    participant TSM as TemporalStreamingModel
    participant CTX as StreamingContext (per call)
    participant BUF as CoalescingBuffer
    participant REDIS as Redis (task:{task_id})

    API->>TSM: ResponseOutputItemAddedEvent (function_call)
    TSM->>CTX: "open context (initial_content=ToolRequestContent(args={}))"
    CTX->>REDIS: persist initial task_message

    loop N × ResponseFunctionCallArgumentsDeltaEvent
        API->>TSM: ArgumentsDelta(delta, output_index)
        TSM->>TSM: "call_data['arguments'] += delta"
        TSM->>CTX: stream_update(StreamTaskMessageDelta(ToolRequestDelta))
        CTX->>BUF: merge delta (keyed on tool_call_id)
        BUF-->>REDIS: publish (50ms / 128-char window or per-token)
    end

    API->>TSM: ResponseFunctionCallArgumentsDoneEvent
    TSM->>TSM: "call_data['arguments'] = final_args (canonical override)"

    API->>TSM: ResponseOutputItemDoneEvent (function_call)
    TSM->>TSM: "json.loads(raw_args) parsed_args (fallback={} on JSONDecodeError)"
    TSM->>CTX: stream_update(StreamTaskMessageFull(ToolRequestContent(parsed_args)))
    CTX->>REDIS: persist final message
    TSM->>CTX: "close() [finally: call_data['context']=None]"

    API->>TSM: ResponseCompletedEvent
    TSM->>TSM: "output_items = response.output (authoritative list)"

    Note over TSM: Orphan cleanup loop — context=None, no-op for clean calls
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
    participant API as OpenAI Streaming API
    participant TSM as TemporalStreamingModel
    participant CTX as StreamingContext (per call)
    participant BUF as CoalescingBuffer
    participant REDIS as Redis (task:{task_id})

    API->>TSM: ResponseOutputItemAddedEvent (function_call)
    TSM->>CTX: "open context (initial_content=ToolRequestContent(args={}))"
    CTX->>REDIS: persist initial task_message

    loop N × ResponseFunctionCallArgumentsDeltaEvent
        API->>TSM: ArgumentsDelta(delta, output_index)
        TSM->>TSM: "call_data['arguments'] += delta"
        TSM->>CTX: stream_update(StreamTaskMessageDelta(ToolRequestDelta))
        CTX->>BUF: merge delta (keyed on tool_call_id)
        BUF-->>REDIS: publish (50ms / 128-char window or per-token)
    end

    API->>TSM: ResponseFunctionCallArgumentsDoneEvent
    TSM->>TSM: "call_data['arguments'] = final_args (canonical override)"

    API->>TSM: ResponseOutputItemDoneEvent (function_call)
    TSM->>TSM: "json.loads(raw_args) parsed_args (fallback={} on JSONDecodeError)"
    TSM->>CTX: stream_update(StreamTaskMessageFull(ToolRequestContent(parsed_args)))
    CTX->>REDIS: persist final message
    TSM->>CTX: "close() [finally: call_data['context']=None]"

    API->>TSM: ResponseCompletedEvent
    TSM->>TSM: "output_items = response.output (authoritative list)"

    Note over TSM: Orphan cleanup loop — context=None, no-op for clean calls
Loading

Fix All in Cursor Fix All in Claude Code Fix All in Codex

Prompt To Fix All With AI
Fix the following 2 code review issues. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 2
src/agentex/lib/core/temporal/plugins/openai_agents/tests/test_streaming_model.py:936-1005
**`close()` call count not asserted — double-close regression not guarded**

The `finally: call_data['context'] = None` block in the production handler is the only barrier against double-close. The tests confirm that `stream_update` receives the right payloads but never assert that `ctx.close` is called exactly once. If that `finally` line is accidentally removed or the orphan-cleanup guard is loosened in a future refactor, the double-close would silently reappear with no failing test to catch it.

Adding `ctx.close.assert_called_once()` (or `assert ctx.close.call_count == 1`) at the end of both happy-path tests would lock in the invariant.

### Issue 2 of 2
src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py:723-749
**Streaming context opened even when `streaming_mode="off"` — always emits one persisted Full message**

With `streaming_mode="off"`, the coalescing buffer suppresses per-delta publishes but `streaming_task_message_context` is still opened and a `StreamTaskMessageFull` with the final `ToolRequestContent` is always emitted before `close()`. This means a stored `task_message` is written to Redis for every function call regardless of the mode setting.

This is consistent with the existing text-streaming path and the PR description calls it out, but it means `streaming_mode="off"` is a misnomer for tool calls — it silences incremental updates while still persisting a final message. Worth confirming this is the intended contract, particularly for callers who use `off` expecting zero tool-call emissions from this path.

Reviews (4): Last reviewed commit: "Merge remote-tracking branch 'origin/nex..." | Re-trigger Greptile

…gModel

Wire ResponseFunctionCallArgumentsDeltaEvent into the streaming layer
introduced in #333, so write-heavy tools (write_file, apply_patch) no
longer freeze the UI for the duration of argument generation.

The model now opens a per-function-call streaming context with a
ToolRequestContent placeholder, emits ToolRequestDelta updates for each
argument delta, and finalizes with a StreamTaskMessageFull containing
the parsed arguments on ResponseOutputItemDoneEvent. Coalescing and mode
dispatch are inherited from the existing streaming infrastructure -- no
new flags or surface area.

ModelResponse output is unchanged; activity determinism is unaffected.
End-of-loop cleanup defensively closes any function-call contexts that
didn't see a Done event (truncated stream or mid-stream exception).

Adds two tests covering the happy path (well-formed JSON args -> deltas
+ parsed Full) and the malformed-args fallback (invalid JSON -> empty
dict + WARNING log).
Logging raw_args[:200] could leak partial file contents, PII, or
secrets from write_file / apply_patch arguments into production log
pipelines. Switch to logging only bounded metadata (tool name + raw
arg byte count).

The existing malformed-args test still passes since it asserts on the
"Failed to parse tool call arguments" prefix, which is preserved.
@vkalmathscale vkalmathscale changed the base branch from main to next May 19, 2026 17:33
…call-arg-deltas

# Conflicts:
#	src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants