Skip to content

feat(openai-agents): migrate onto the unified harness surface#416

Open
declan-scale wants to merge 2 commits into
declan-scale/unified-harness-surfacefrom
declan-scale/pr6-openai
Open

feat(openai-agents): migrate onto the unified harness surface#416
declan-scale wants to merge 2 commits into
declan-scale/unified-harness-surfacefrom
declan-scale/pr6-openai

Conversation

@declan-scale

@declan-scale declan-scale commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

What

PR 6 of the unified-harness-surface series: migrate the OpenAI Agents SDK integration onto the shared harness surface.

Library

  • OpenAITurn (src/agentex/lib/adk/providers/_modules/openai_turn.py): a HarnessTurn adapter that wraps a Runner.run_streamed result. It converts the SDK's native events into the canonical StreamTaskMessage* stream via the existing convert_openai_to_agentex_events, and after the stream is exhausted reads result.raw_responses to aggregate per-response usage into a provider-independent TurnUsage.

    • openai_usage_to_turn_usage(usage, model) maps agents.Usage -> TurnUsage with defensive getattr access so present-but-zero values (e.g. 0 output tokens on a cache hit) survive as 0, not None.
    • _aggregate_usage(raw_responses) sums usage across ModelResponses via Usage.add, skipping responses without usage.
    • Accepts either result= (a streamed run) or stream= (a pre-built canonical stream, for tests); raises ValueError if neither. coalesce_tool_requests is a no-op kept for API parity.
  • OpenAIService.run_agent_streamed_auto_send: replaced the ~270-line inline streaming/reasoning/span loop with UnifiedEmitter.auto_send_turn(OpenAITurn(result=result, model=model)). Guardrail tripwire handling and the RunResultStreaming return type are preserved. The created_at first-message ordering limitation under the unified path is documented in a comment. OpenAITurn is imported lazily inside the method to avoid a circular import at package init.

  • SyncStreamingModel / SyncStreamingProvider: docstring-deprecated (no runtime warning), pointing at the harness pattern.

Tests

  • tests/lib/adk/providers/test_openai_turn.py: usage mapping (full / None / real zeros), _aggregate_usage (empty / single / multiple), events driven by an injected canonical stream, usage() before/after exhaustion (including the result-backed path), and the ValueError guard.
  • tests/lib/core/harness/conformance/test_openai_conformance.py: text-only, tool-call, reasoning, and multi-step canonical fixtures; registers module-locally and parametrizes over its own list to avoid the cross-module global-registry hazard.
  • tests/lib/adk/providers/test_openai_activities.py: updated the streamed-auto-send activity test to the new contract (full tool messages are posted by opening a context with initial_content and closing it, no stream_update).

Tutorials

Three tutorials demonstrating the same OpenAITurn across delivery modes, each with an offline test (no server / Redis / Temporal / API key required):

  • examples/tutorials/00_sync/060_harness_openaiUnifiedEmitter.yield_turn
  • examples/tutorials/10_async/00_base/130_harness_openaiUnifiedEmitter.auto_send_turn
  • examples/tutorials/10_async/10_temporal/140_harness_openaiauto_send_turn inside a custom Temporal activity

Verification

  • ./scripts/lint — clean (ruff + pyright, 0 errors)
  • Full tests/ suite — 1016 passed, 1376 skipped
  • All three tutorial offline tests pass individually

🤖 Generated with Claude Code

Greptile Summary

This PR migrates the OpenAI Agents SDK integration onto the shared unified harness surface, replacing the ~270-line inline streaming/reasoning/span loop in run_agent_streamed_auto_send with OpenAITurn + UnifiedEmitter.auto_send_turn. Three tutorial directories (sync yield, async auto-send, Temporal activity) are added alongside unit tests and conformance fixtures.

  • OpenAITurn is a new HarnessTurn adapter that converts Runner.run_streamed events into the canonical StreamTaskMessage* stream and lazily aggregates per-response token usage from raw_responses after stream exhaustion.
  • OpenAIService.run_agent_streamed_auto_send is simplified from a hand-rolled per-item streaming context loop to a single UnifiedEmitter.auto_send_turn(OpenAITurn(...)) call; guardrail tripwire handling and RunResultStreaming return type are preserved.
  • SyncStreamingModel / SyncStreamingProvider receive docstring-level deprecation notices pointing to the harness pattern.

Confidence Score: 4/5

The core streaming refactor is clean, but token usage will always be empty in TurnResult — the emitter captures it before the stream is consumed.

The ~270-line inline loop is cleanly replaced and all observable behaviour (message delivery, guardrail handling, created_at threading) is preserved. One defect stands out: UnifiedEmitter.auto_send_turn evaluates turn.usage() eagerly — before consuming turn.events — so the lazy raw_responses aggregation that OpenAITurn performs after stream exhaustion never makes it into TurnResult.usage. No existing callers in this PR read turn_result.usage, so the regression is silent today, but any downstream consumer of token usage from the auto-send path will receive null counts. The fix is a two-line change in emitter.py.

src/agentex/lib/core/harness/emitter.py (not in diff) — auto_send_turn needs to call turn.usage() after awaiting auto_send, not before.

Important Files Changed

Filename Overview
src/agentex/lib/adk/providers/_modules/openai_turn.py New OpenAITurn adapter; correctly implements lazy usage (post-stream), but this lazy pattern is incompatible with how emitter.auto_send_turn eagerly captures turn.usage() before exhausting events, leaving TurnResult.usage always empty on the auto_send path.
src/agentex/lib/core/services/adk/providers/openai.py ~270-line inline streaming loop replaced with OpenAITurn + UnifiedEmitter.auto_send_turn; guardrail handling preserved; previous_response_id bug and streaming cleanup are flagged in earlier review threads.
src/agentex/lib/adk/providers/_modules/sync_provider.py Only formatting changes (single→double quotes, blank lines); SyncStreamingModel and SyncStreamingProvider marked deprecated via docstring; no functional changes.
tests/lib/adk/providers/test_openai_turn.py Good coverage of usage mapping (full/None/zeros), _aggregate_usage, events forwarding, and the ValueError guard; tests usage() directly on the turn object rather than through auto_send_turn, so the emitter evaluation-order bug is not caught.
tests/lib/core/harness/conformance/test_openai_conformance.py Module-local fixture list avoids the global-registry ordering hazard; covers text, tool-call, reasoning, and multi-step canonical sequences for span-derivation determinism.
tests/lib/adk/providers/test_openai_activities.py Updated to the unified harness contract; new test verifies created_at threading; raw_responses=[] added to mocks so OpenAITurn usage aggregation does not AttributeError.
examples/tutorials/10_async/10_temporal/140_harness_openai/project/activities.py Correctly wires OpenAITurn + UnifiedEmitter; streaming=None intentionally defers to adk.streaming; multi-turn history concern is already noted in a prior review thread.
examples/tutorials/10_async/00_base/130_harness_openai/project/acp.py Clean async ACP tutorial; correctly omits streaming= (uses real adk.streaming in production); only uses final_text from TurnResult, so empty usage does not affect this example.

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant Caller as Activity / ACP Handler
    participant Emitter as UnifiedEmitter
    participant Turn as OpenAITurn
    participant AutoSend as auto_send()
    participant Runner as Runner.run_streamed
    participant Streaming as adk.streaming

    Caller->>Runner: run_streamed(agent, input)
    Runner-->>Turn: RunResultStreaming (result)
    Caller->>Emitter: auto_send_turn(turn, created_at)
    Note over Emitter,Turn: turn.usage() evaluated HERE (pre-exhaustion)
    Emitter->>AutoSend: "auto_send(turn.events, usage=TurnUsage(model), ...)"
    loop For each canonical event
        AutoSend->>Turn: next event via _iter_events
        Turn->>Runner: stream_events()
        Runner-->>Turn: raw OpenAI event
        Turn-->>AutoSend: "StreamTaskMessage*"
        AutoSend->>Streaming: streaming_task_message_context(...)
    end
    Note over Turn: Stream exhausted, _usage updated from raw_responses
    AutoSend-->>Emitter: "TurnResult(final_text, usage=stale TurnUsage)"
    Emitter-->>Caller: TurnResult with empty token counts
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
    participant Caller as Activity / ACP Handler
    participant Emitter as UnifiedEmitter
    participant Turn as OpenAITurn
    participant AutoSend as auto_send()
    participant Runner as Runner.run_streamed
    participant Streaming as adk.streaming

    Caller->>Runner: run_streamed(agent, input)
    Runner-->>Turn: RunResultStreaming (result)
    Caller->>Emitter: auto_send_turn(turn, created_at)
    Note over Emitter,Turn: turn.usage() evaluated HERE (pre-exhaustion)
    Emitter->>AutoSend: "auto_send(turn.events, usage=TurnUsage(model), ...)"
    loop For each canonical event
        AutoSend->>Turn: next event via _iter_events
        Turn->>Runner: stream_events()
        Runner-->>Turn: raw OpenAI event
        Turn-->>AutoSend: "StreamTaskMessage*"
        AutoSend->>Streaming: streaming_task_message_context(...)
    end
    Note over Turn: Stream exhausted, _usage updated from raw_responses
    AutoSend-->>Emitter: "TurnResult(final_text, usage=stale TurnUsage)"
    Emitter-->>Caller: TurnResult with empty token counts
Loading

Comments Outside Diff (2)

  1. src/agentex/lib/adk/providers/_modules/sync_provider.py, line 564-572 (link)

    P2 Reasoning spans are missed

    The converter starts OpenAI reasoning output as TextContent, but the shared span derivation opens reasoning spans only when the start content has type reasoning. Real OpenAI reasoning streams therefore flow through as text starts, so the unified harness never derives the reasoning span that the new conformance fixture expects.

    Prompt To Fix With AI
    This is a comment left during a code review.
    Path: src/agentex/lib/adk/providers/_modules/sync_provider.py
    Line: 564-572
    
    Comment:
    **Reasoning spans are missed**
    
    The converter starts OpenAI reasoning output as `TextContent`, but the shared span derivation opens reasoning spans only when the start content has type `reasoning`. Real OpenAI reasoning streams therefore flow through as text starts, so the unified harness never derives the reasoning span that the new conformance fixture expects.
    
    How can I resolve this? If you propose a fix, please make it concise.

    Fix in Claude Code

  2. src/agentex/lib/core/services/adk/providers/openai.py, line 794 (link)

    P1 run_agent_streamed_auto_send silently drops previous_response_id

    • Bug
      • previous_response_id is accepted as a parameter on line 681 but is never forwarded to Runner.run_streamed on lines 794-797. The migrated method uses only a 2-branch if/else (max_turns or not), while all three sibling methods use a 4-branch matrix that correctly forwards previous_response_id.
    • Cause
      • During the migration to the unified harness (OpenAITurn + UnifiedEmitter), the Runner.run_streamed call was simplified to 2 branches, dropping the previous_response_id forwarding. The # noqa: ARG002 annotation on line 681 suppressed the linter warning that would have caught the unused argument.
    • Fix
      • Replace the 2-branch if/else at lines 794-797 with the same 4-branch pattern used by run_agent_streamed (lines 632-646), forwarding previous_response_id to Runner.run_streamed when it is not None.
    Artifacts

    Supporting artifact from the T-Rex run

    • Contains supporting evidence from the run (text/markdown; charset=utf-8).

    View artifacts

    T-Rex Ran code and verified through T-Rex

Fix All in Claude Code

Prompt To Fix All With AI
Fix the following 1 code review issue. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 1
src/agentex/lib/adk/providers/_modules/openai_turn.py:138-140
**`TurnResult.usage` is always empty when using `auto_send_turn`**

`UnifiedEmitter.auto_send_turn` evaluates `turn.usage()` as an eager argument before passing `events` to `auto_send` — i.e. `auto_send(..., usage=turn.usage(), ...)` captures the pre-exhaustion value of `TurnUsage(model=model)` with all token counts `None`/`0`. `_iter_events` updates `self._usage` only after the stream is fully consumed (the code below `yield event`), but by then `auto_send` has already been given the old reference and uses it verbatim in the returned `TurnResult`. As a result, every `TurnResult` from the `auto_send` path carries empty token counts regardless of what `raw_responses` contains.

The `HarnessTurn` protocol itself documents the contract: "`usage()` valid only after `events` is exhausted." `auto_send_turn` in `emitter.py` violates this. The fix is there — exhaust events first, then read usage:

```python
# in emitter.py auto_send_turn
result = await auto_send(turn.events, ..., usage=None, ...)
return TurnResult(final_text=result.final_text, usage=turn.usage())
```

This PR is the first to introduce a turn whose usage is populated lazily (post-stream), so the bug surfaces here even though the emitter is pre-existing.

Reviews (3): Last reviewed commit: "fix(openai-agents): restore created_at d..." | Re-trigger Greptile

logger.info(f"Running harness OpenAI agent for task {params.task_id}")

agent = create_agent()
result = Runner.run_streamed(starting_agent=agent, input=params.user_message)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Multi-turn conversation history lost in 140_harness_openai tutorial

  • Bug
    • Each turn creates a fresh agent and passes only the latest user message to Runner.run_streamed, so the LLM has no memory of prior turns.
  • Cause
    • The workflow accumulates no _messages list and RunHarnessAgentParams has no history field; activities.py:47 passes a bare string instead of a message list.
  • Fix
    • Add a _messages list to the workflow (like 130_langgraph), include it in RunHarnessAgentParams, append the assistant reply after each turn, and pass the full list to Runner.run_streamed.
Artifacts

Supporting artifact from the T-Rex run

  • Contains supporting evidence from the run (text/markdown; charset=utf-8).

View artifacts

T-Rex Ran code and verified through T-Rex

declan-scale and others added 2 commits June 18, 2026 17:03
Add OpenAITurn, a HarnessTurn adapter that wraps an OpenAI Agents SDK
streamed run (Runner.run_streamed) and converts its native events into the
canonical StreamTaskMessage* stream via convert_openai_to_agentex_events,
aggregating per-response usage into a provider-independent TurnUsage after
stream exhaustion. Defensive getattr access preserves real zeros.

Refactor OpenAIService.run_agent_streamed_auto_send to drive delivery,
tracing, and usage through UnifiedEmitter.auto_send_turn(OpenAITurn(...)),
replacing the ~270-line inline streaming loop. Guardrail tripwire handling
and the RunResultStreaming return type are preserved; the created_at
first-message ordering limitation under the unified path is documented.

Docstring-deprecate SyncStreamingModel/SyncStreamingProvider (no runtime
warning).

Add unit tests for OpenAITurn + usage mapping, OpenAI conformance fixtures
(module-local registry), update the streamed-auto-send activity test to the
new full-message contract, and add three tutorials (sync 060, async 130,
temporal 140) demonstrating OpenAITurn with yield_turn / auto_send_turn,
each with an offline test.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…378)

Thread the workflow-supplied created_at through
UnifiedEmitter.auto_send_turn(turn, created_at=created_at) so the first
agent message of the turn is stamped with the deterministic timestamp
(e.g. workflow.now()) just as the original inline loop did before the
unified-harness migration.

The foundation (b4b8b33) wired auto_send_turn to accept and forward
created_at to every streaming_task_message_context call. This commit
connects the call site in run_agent_streamed_auto_send to that new
parameter, restoring the behaviour that the migration comment documented
as a known trade-off.

Update the stale limitation comment to reflect the fix. Add
test_run_agent_streamed_auto_send_forwards_created_at, which drives
the activity through a fake stream with a pinned datetime and asserts
every streaming context receives that datetime.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@declan-scale declan-scale force-pushed the declan-scale/pr6-openai branch from d1c5c65 to ab92b50 Compare June 18, 2026 21:09
@declan-scale

Copy link
Copy Markdown
Contributor Author

@greptile review

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant