diff --git a/.github/workflows/harness-integration.yml b/.github/workflows/harness-integration.yml index ab6b353b9..c24b9cb78 100644 --- a/.github/workflows/harness-integration.yml +++ b/.github/workflows/harness-integration.yml @@ -7,6 +7,8 @@ on: paths: - "src/agentex/lib/core/harness/**" - "src/agentex/lib/adk/_modules/**" + - "tests/lib/core/harness/test_harness_pydantic_ai_*.py" + - "tests/lib/core/harness/test_harness_langgraph_*.py" - ".github/workflows/harness-integration.yml" jobs: @@ -31,10 +33,27 @@ jobs: - name: Conformance suite run: ./scripts/test tests/lib/core/harness/ -v - # Live integration matrix (harness x {sync, async, temporal}) is added per-harness - # in the migration plans. Placeholder job keeps the workflow valid until then. + # Offline LangGraph integration tests (sync / async / temporal channels). + # These use fake LangGraph streams + fake streaming/tracing and require no live + # infrastructure. Enabled here for PR 5 (LangGraph migration). live-matrix: runs-on: ubuntu-latest - if: false # enabled once the first harness's test agents land + strategy: + matrix: + channel: [sync, async, temporal] + fail-fast: false + name: langgraph-${{ matrix.channel }} steps: - - run: echo "populated by migration PRs" # TODO(harness-migration): enable per-harness; see docs/superpowers/plans migration PRs 4-8 + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + + - name: Install uv + uses: astral-sh/setup-uv@d4b2f3b6ecc6e67c4457f6d3e41ec42d3d0fcb86 # v5.4.2 + with: + version: '0.10.2' + + - name: Bootstrap + run: ./scripts/bootstrap + + - name: langgraph ${{ matrix.channel }} integration tests (offline, fake stream) + run: | + ./scripts/test tests/lib/core/harness/test_harness_langgraph_${{ matrix.channel }}.py -v diff --git a/examples/tutorials/00_sync/harness_langgraph/Dockerfile b/examples/tutorials/00_sync/harness_langgraph/Dockerfile new file mode 100644 index 000000000..9d492198f --- /dev/null +++ b/examples/tutorials/00_sync/harness_langgraph/Dockerfile @@ -0,0 +1,50 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + postgresql-client \ + build-essential \ + libpq-dev \ + gcc \ + cmake \ + netcat-openbsd \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +RUN uv pip install --system --upgrade pip setuptools wheel + +ENV UV_HTTP_TIMEOUT=1000 + +# Copy pyproject.toml and README.md to install dependencies +COPY 00_sync/harness_langgraph/pyproject.toml /app/harness_langgraph/pyproject.toml +COPY 00_sync/harness_langgraph/README.md /app/harness_langgraph/README.md + +WORKDIR /app/harness_langgraph + +# Copy the project code +COPY 00_sync/harness_langgraph/project /app/harness_langgraph/project + +# Copy the test files +COPY 00_sync/harness_langgraph/tests /app/harness_langgraph/tests + +# Copy shared test utilities +COPY test_utils /app/test_utils + +# Install the required Python packages with dev dependencies +RUN uv pip install --system .[dev] + +# Set environment variables +ENV PYTHONPATH=/app + +# Set test environment variables +ENV AGENT_NAME=s-harness-langgraph + +# Run the agent using uvicorn +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] diff --git a/examples/tutorials/00_sync/harness_langgraph/README.md b/examples/tutorials/00_sync/harness_langgraph/README.md new file mode 100644 index 000000000..86367f162 --- /dev/null +++ b/examples/tutorials/00_sync/harness_langgraph/README.md @@ -0,0 +1,55 @@ +# Tutorial: Sync Harness LangGraph Agent + +This tutorial demonstrates how to build a **synchronous** LangGraph agent on AgentEx +using the **unified harness surface**: + +```python +turn = LangGraphTurn(stream, model=None) +emitter = UnifiedEmitter(task_id=task_id, trace_id=task_id, ...) +async for event in emitter.yield_turn(turn): + yield event +``` + +Compare with ``030_langgraph``, which uses the bespoke +``convert_langgraph_to_agentex_events`` helper directly. + +## Key Concepts + +### Unified Harness + +`LangGraphTurn` implements the `HarnessTurn` protocol: it wraps the raw +LangGraph `astream()` generator and exposes `events` (an async generator of +`TaskMessageUpdate`) and `usage()` (token counts captured from the final +`AIMessage`). + +`UnifiedEmitter.yield_turn(turn)` iterates the turn's events and yields them +to the sync ACP handler unchanged. The same `LangGraphTurn` object can also be +passed to `UnifiedEmitter.auto_send_turn` in the async/temporal channels. + +### AGX1-377 Note + +LangGraph emits tool requests as `StreamTaskMessageFull` events (from "updates" +node outputs). The `SpanDeriver` does not open tool spans from Full events +today; that gap is tracked in AGX1-373. + +## Files + +| File | Description | +|------|-------------| +| `project/acp.py` | ACP server using unified harness (LangGraphTurn + yield_turn) | +| `project/graph.py` | LangGraph state graph (identical to 030_langgraph) | +| `project/tools.py` | Tool definitions (weather example) | +| `tests/test_agent.py` | Integration tests | +| `manifest.yaml` | Agent configuration (name: s-harness-langgraph) | + +## Running Locally + +```bash +agentex agents run +``` + +## Running Tests + +```bash +pytest tests/test_agent.py -v +``` diff --git a/examples/tutorials/00_sync/harness_langgraph/manifest.yaml b/examples/tutorials/00_sync/harness_langgraph/manifest.yaml new file mode 100644 index 000000000..1f57678f2 --- /dev/null +++ b/examples/tutorials/00_sync/harness_langgraph/manifest.yaml @@ -0,0 +1,58 @@ +build: + context: + root: ../../ + include_paths: + - 00_sync/harness_langgraph + - test_utils + dockerfile: 00_sync/harness_langgraph/Dockerfile + dockerignore: 00_sync/harness_langgraph/.dockerignore + +local_development: + agent: + port: 8000 + host_address: host.docker.internal + paths: + acp: project/acp.py + +agent: + acp_type: sync + name: s-harness-langgraph + description: A sync LangGraph agent using the unified harness surface (LangGraphTurn + UnifiedEmitter.yield_turn) + + temporal: + enabled: false + + credentials: + - env_var_name: OPENAI_API_KEY + secret_name: openai-api-key + secret_key: api-key + - env_var_name: REDIS_URL + secret_name: redis-url-secret + secret_key: url + - env_var_name: SGP_API_KEY + secret_name: sgp-api-key + secret_key: api-key + - env_var_name: SGP_ACCOUNT_ID + secret_name: sgp-account-id + secret_key: account-id + - env_var_name: SGP_CLIENT_BASE_URL + secret_name: sgp-client-base-url + secret_key: url + +deployment: + image: + repository: "" + tag: "latest" + + global: + agent: + name: "s-harness-langgraph" + description: "A sync LangGraph agent using the unified harness surface" + replicaCount: 1 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" diff --git a/examples/tutorials/00_sync/harness_langgraph/project/__init__.py b/examples/tutorials/00_sync/harness_langgraph/project/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/tutorials/00_sync/harness_langgraph/project/acp.py b/examples/tutorials/00_sync/harness_langgraph/project/acp.py new file mode 100644 index 000000000..a9b4cc5b2 --- /dev/null +++ b/examples/tutorials/00_sync/harness_langgraph/project/acp.py @@ -0,0 +1,100 @@ +"""ACP handler for sync harness LangGraph agent. + +Uses the unified harness surface: ``LangGraphTurn`` wraps the LangGraph +``astream()`` generator, and ``UnifiedEmitter.yield_turn`` converts it into +the AgentEx ``TaskMessageUpdate`` event stream expected by the sync ACP. + +Differences from ``030_langgraph`` (bespoke path): +- No ``create_langgraph_tracing_handler`` boilerplate. +- No manual text-delta accumulation for the span output. +- Tool calls are emitted as ``StreamTaskMessageFull`` (not Start+Delta+Done) + via the same code path as the async/temporal channels. +- Usage data (token counts) is captured on the ``LangGraphTurn`` object and + can be read after the turn completes. + +AGX1-377 note: LangGraph emits tool requests as ``StreamTaskMessageFull`` +events (from "updates"). The ``SpanDeriver`` does not open tool spans from +Full events today; that gap is tracked in AGX1-373. +""" + +from __future__ import annotations + +import os +from typing import AsyncGenerator + +from dotenv import load_dotenv + +load_dotenv() + +import agentex.lib.adk as adk +from project.graph import create_graph +from agentex.lib.types.acp import SendMessageParams +from agentex.lib.types.tracing import SGPTracingProcessorConfig +from agentex.lib.utils.logging import make_logger +from agentex.lib.sdk.fastacp.fastacp import FastACP +from agentex.lib.core.harness.emitter import UnifiedEmitter +from agentex.types.task_message_update import TaskMessageUpdate +from agentex.types.task_message_content import TaskMessageContent +from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn +from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config + +logger = make_logger(__name__) + +add_tracing_processor_config( + SGPTracingProcessorConfig( + sgp_api_key=os.environ.get("SGP_API_KEY", ""), + sgp_account_id=os.environ.get("SGP_ACCOUNT_ID", ""), + sgp_base_url=os.environ.get("SGP_CLIENT_BASE_URL", ""), + ) +) + +acp = FastACP.create(acp_type="sync") + +_graph = None + + +async def get_graph(): + """Get or create the compiled graph instance.""" + global _graph + if _graph is None: + _graph = await create_graph() + return _graph + + +@acp.on_message_send +async def handle_message_send( + params: SendMessageParams, +) -> TaskMessageContent | list[TaskMessageContent] | AsyncGenerator[TaskMessageUpdate, None]: + """Handle incoming messages, streaming tokens and tool calls via unified harness.""" + graph = await get_graph() + + task_id = params.task.id + user_message = params.content.content + + logger.info(f"Processing message for task {task_id}") + + async with adk.tracing.span( + trace_id=task_id, + task_id=task_id, + name="message", + input={"message": user_message}, + data={"__span_type__": "AGENT_WORKFLOW"}, + ) as turn_span: + stream = graph.astream( + {"messages": [{"role": "user", "content": user_message}]}, + config={"configurable": {"thread_id": task_id}}, + stream_mode=["messages", "updates"], + ) + + turn = LangGraphTurn(stream, model=None) + emitter = UnifiedEmitter( + task_id=task_id, + trace_id=task_id, + parent_span_id=turn_span.id if turn_span else None, + ) + + async for event in emitter.yield_turn(turn): + yield event + + if turn_span: + turn_span.output = {"final_output": turn.usage().model_dump()} diff --git a/examples/tutorials/00_sync/harness_langgraph/project/graph.py b/examples/tutorials/00_sync/harness_langgraph/project/graph.py new file mode 100644 index 000000000..4516087d2 --- /dev/null +++ b/examples/tutorials/00_sync/harness_langgraph/project/graph.py @@ -0,0 +1,67 @@ +"""LangGraph graph definition for the harness_langgraph sync agent. + +Identical to ``030_langgraph/project/graph.py`` — the graph definition is not +affected by the harness migration. Only ``acp.py`` changes. +""" + +from __future__ import annotations + +from typing import Any, Annotated +from datetime import datetime +from typing_extensions import TypedDict + +from langgraph.graph import START, StateGraph +from langchain_openai import ChatOpenAI +from langgraph.prebuilt import ToolNode, tools_condition +from langchain_core.messages import SystemMessage +from langgraph.graph.message import add_messages + +from project.tools import TOOLS +from agentex.lib.adk import create_checkpointer + +MODEL_NAME = "gpt-5" +SYSTEM_PROMPT = """You are a helpful AI assistant with access to tools. + +Current date and time: {timestamp} + +Guidelines: +- Be concise and helpful +- Use tools when they would help answer the user's question +- If you're unsure, ask clarifying questions +- Always provide accurate information +""" + + +class AgentState(TypedDict): + """State schema for the agent graph.""" + + messages: Annotated[list[Any], add_messages] + + +async def create_graph(): + """Create and compile the agent graph with checkpointer.""" + llm = ChatOpenAI( + model=MODEL_NAME, + reasoning={"effort": "high", "summary": "auto"}, + ) + llm_with_tools = llm.bind_tools(TOOLS) + + checkpointer = await create_checkpointer() + + def agent_node(state: AgentState) -> dict[str, Any]: + """Process the current state and generate a response.""" + messages = state["messages"] + if not messages or not isinstance(messages[0], SystemMessage): + system_content = SYSTEM_PROMPT.format(timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + messages = [SystemMessage(content=system_content)] + messages + response = llm_with_tools.invoke(messages) + return {"messages": [response]} + + builder = StateGraph(AgentState) + builder.add_node("agent", agent_node) + builder.add_node("tools", ToolNode(tools=TOOLS)) + builder.add_edge(START, "agent") + builder.add_conditional_edges("agent", tools_condition, "tools") + builder.add_edge("tools", "agent") + + return builder.compile(checkpointer=checkpointer) diff --git a/examples/tutorials/00_sync/harness_langgraph/project/tools.py b/examples/tutorials/00_sync/harness_langgraph/project/tools.py new file mode 100644 index 000000000..f02587430 --- /dev/null +++ b/examples/tutorials/00_sync/harness_langgraph/project/tools.py @@ -0,0 +1,24 @@ +"""Tool definitions for the harness_langgraph sync agent.""" + +from langchain_core.tools import Tool + + +def get_weather(city: str) -> str: + """Get the current weather for a city. + + Args: + city: The name of the city to get weather for. + + Returns: + A string describing the weather conditions. + """ + return f"The weather in {city} is sunny and 72°F" + + +weather_tool = Tool( + name="get_weather", + func=get_weather, + description="Get the current weather for a city. Input should be a city name.", +) + +TOOLS = [weather_tool] diff --git a/examples/tutorials/00_sync/harness_langgraph/pyproject.toml b/examples/tutorials/00_sync/harness_langgraph/pyproject.toml new file mode 100644 index 000000000..deecd08b3 --- /dev/null +++ b/examples/tutorials/00_sync/harness_langgraph/pyproject.toml @@ -0,0 +1,37 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "s-harness-langgraph" +version = "0.1.0" +description = "A sync LangGraph agent using the unified harness surface" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "agentex-sdk", + "scale-gp", + "langgraph", + "langchain-openai", +] + +[project.optional-dependencies] +dev = [ + "pytest", + "pytest-asyncio", + "httpx", + "black", + "isort", + "flake8", +] + +[tool.hatch.build.targets.wheel] +packages = ["project"] + +[tool.black] +line-length = 88 +target-version = ['py312'] + +[tool.isort] +profile = "black" +line_length = 88 diff --git a/examples/tutorials/00_sync/harness_langgraph/tests/test_agent.py b/examples/tutorials/00_sync/harness_langgraph/tests/test_agent.py new file mode 100644 index 000000000..2eb561cec --- /dev/null +++ b/examples/tutorials/00_sync/harness_langgraph/tests/test_agent.py @@ -0,0 +1,144 @@ +""" +Tests for the sync harness LangGraph agent. + +Validates the unified harness surface (LangGraphTurn + UnifiedEmitter.yield_turn) +end-to-end against a live AgentEx server. + +Configuration: +- AGENTEX_API_BASE_URL: Base URL for the AgentEx server (default: http://localhost:5003) +- AGENT_NAME: Name of the agent to test (default: s-harness-langgraph) +""" + +import os + +import pytest +from test_utils.sync import validate_text_in_string, collect_streaming_response + +from agentex import Agentex +from agentex.types import TextContent, TextContentParam +from agentex.types.agent_rpc_params import ParamsCreateTaskRequest, ParamsSendMessageRequest +from agentex.lib.sdk.fastacp.base.base_acp_server import uuid + +AGENTEX_API_BASE_URL = os.environ.get("AGENTEX_API_BASE_URL", "http://localhost:5003") +AGENT_NAME = os.environ.get("AGENT_NAME", "s-harness-langgraph") + + +@pytest.fixture +def client(): + return Agentex(base_url=AGENTEX_API_BASE_URL) + + +@pytest.fixture +def agent_name(): + return AGENT_NAME + + +@pytest.fixture +def agent_id(client, agent_name): + agents = client.agents.list() + for agent in agents: + if agent.name == agent_name: + return agent.id + raise ValueError(f"Agent with name {agent_name} not found.") + + +class TestNonStreamingMessages: + def test_send_simple_message(self, client: Agentex, agent_name: str): + response = client.agents.send_message( + agent_name=agent_name, + params=ParamsSendMessageRequest( + content=TextContentParam( + author="user", + content="Hello! What can you help me with?", + type="text", + ) + ), + ) + result = response.result + assert result is not None + assert len(result) >= 1 + + def test_tool_calling(self, client: Agentex, agent_name: str): + response = client.agents.send_message( + agent_name=agent_name, + params=ParamsSendMessageRequest( + content=TextContentParam( + author="user", + content="What's the weather in San Francisco?", + type="text", + ) + ), + ) + result = response.result + assert result is not None + assert len(result) >= 1 + + def test_multiturn_conversation(self, client: Agentex, agent_name: str, agent_id: str): + task_response = client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex)) + task = task_response.result + assert task is not None + + response1 = client.agents.send_message( + agent_name=agent_name, + params=ParamsSendMessageRequest( + content=TextContentParam( + author="user", + content="My name is Alice. Remember that.", + type="text", + ), + task_id=task.id, + ), + ) + assert response1.result is not None + + response2 = client.agents.send_message( + agent_name=agent_name, + params=ParamsSendMessageRequest( + content=TextContentParam( + author="user", + content="What is my name?", + type="text", + ), + task_id=task.id, + ), + ) + assert response2.result is not None + for message in response2.result: + if isinstance(message.content, TextContent): + validate_text_in_string("alice", message.content.content.lower()) + + +class TestStreamingMessages: + def test_stream_simple_message(self, client: Agentex, agent_name: str): + stream = client.agents.send_message_stream( + agent_name=agent_name, + params=ParamsSendMessageRequest( + content=TextContentParam( + author="user", + content="Tell me a short joke.", + type="text", + ) + ), + ) + aggregated_content, chunks = collect_streaming_response(stream) + assert aggregated_content is not None + assert len(chunks) > 1, "No chunks received in streaming response." + + def test_stream_tool_calling(self, client: Agentex, agent_name: str): + stream = client.agents.send_message_stream( + agent_name=agent_name, + params=ParamsSendMessageRequest( + content=TextContentParam( + author="user", + content="What's the weather in New York?", + type="text", + ) + ), + ) + aggregated_content, chunks = collect_streaming_response(stream) + assert aggregated_content is not None + assert len(chunks) > 0, "No chunks received in streaming response." + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/examples/tutorials/10_async/00_base/harness_langgraph/Dockerfile b/examples/tutorials/10_async/00_base/harness_langgraph/Dockerfile new file mode 100644 index 000000000..3e0bd696a --- /dev/null +++ b/examples/tutorials/10_async/00_base/harness_langgraph/Dockerfile @@ -0,0 +1,50 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + postgresql-client \ + build-essential \ + libpq-dev \ + gcc \ + cmake \ + netcat-openbsd \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +RUN uv pip install --system --upgrade pip setuptools wheel + +ENV UV_HTTP_TIMEOUT=1000 + +# Copy pyproject.toml and README.md to install dependencies +COPY 10_async/00_base/harness_langgraph/pyproject.toml /app/harness_langgraph/pyproject.toml +COPY 10_async/00_base/harness_langgraph/README.md /app/harness_langgraph/README.md + +WORKDIR /app/harness_langgraph + +# Copy the project code +COPY 10_async/00_base/harness_langgraph/project /app/harness_langgraph/project + +# Copy the test files +COPY 10_async/00_base/harness_langgraph/tests /app/harness_langgraph/tests + +# Copy shared test utilities +COPY test_utils /app/test_utils + +# Install the required Python packages with dev dependencies +RUN uv pip install --system .[dev] pytest-asyncio httpx + +# Set environment variables +ENV PYTHONPATH=/app + +# Set test environment variables +ENV AGENT_NAME=a-harness-langgraph + +# Run the agent using uvicorn +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] diff --git a/examples/tutorials/10_async/00_base/harness_langgraph/README.md b/examples/tutorials/10_async/00_base/harness_langgraph/README.md new file mode 100644 index 000000000..7efe28207 --- /dev/null +++ b/examples/tutorials/10_async/00_base/harness_langgraph/README.md @@ -0,0 +1,57 @@ +# Tutorial: Async Harness LangGraph Agent + +This tutorial demonstrates how to build an **async** LangGraph agent on AgentEx +using the **unified harness surface**: + +```python +turn = LangGraphTurn(stream, model=None) +emitter = UnifiedEmitter(task_id=task_id, trace_id=task_id, ...) +result = await emitter.auto_send_turn(turn) +``` + +Compare with ``100_langgraph``, which uses the bespoke +``stream_langgraph_events`` helper directly. + +## Key Concepts + +### Unified Harness + +`LangGraphTurn` implements the `HarnessTurn` protocol: it wraps the raw +LangGraph `astream()` generator and exposes `events` (an async generator of +`TaskMessageUpdate`) and `usage()` (token counts captured from the final +`AIMessage`). + +`UnifiedEmitter.auto_send_turn(turn)` pushes each event to Redis via +`streaming_task_message_context`, accumulates the final text, and returns a +`TurnResult(final_text=..., usage=...)`. + +The same `LangGraphTurn` object can also be passed to +`UnifiedEmitter.yield_turn` in the sync channel. + +### AGX1-377 Note + +LangGraph emits tool requests as `StreamTaskMessageFull` events (from "updates" +node outputs). The `SpanDeriver` does not open tool spans from Full events +today; that gap is tracked in AGX1-373. + +## Files + +| File | Description | +|------|-------------| +| `project/acp.py` | ACP server using unified harness (LangGraphTurn + auto_send_turn) | +| `project/graph.py` | LangGraph state graph (identical to 100_langgraph) | +| `project/tools.py` | Tool definitions (weather example) | +| `tests/test_agent.py` | Integration tests | +| `manifest.yaml` | Agent configuration (name: a-harness-langgraph) | + +## Running Locally + +```bash +agentex agents run +``` + +## Running Tests + +```bash +pytest tests/test_agent.py -v +``` diff --git a/examples/tutorials/10_async/00_base/harness_langgraph/manifest.yaml b/examples/tutorials/10_async/00_base/harness_langgraph/manifest.yaml new file mode 100644 index 000000000..bb19e25b3 --- /dev/null +++ b/examples/tutorials/10_async/00_base/harness_langgraph/manifest.yaml @@ -0,0 +1,58 @@ +build: + context: + root: ../../../ + include_paths: + - 10_async/00_base/harness_langgraph + - test_utils + dockerfile: 10_async/00_base/harness_langgraph/Dockerfile + dockerignore: 10_async/00_base/harness_langgraph/.dockerignore + +local_development: + agent: + port: 8000 + host_address: host.docker.internal + paths: + acp: project/acp.py + +agent: + acp_type: async + name: a-harness-langgraph + description: An async LangGraph agent using the unified harness surface (LangGraphTurn + UnifiedEmitter.auto_send_turn) + + temporal: + enabled: false + + credentials: + - env_var_name: OPENAI_API_KEY + secret_name: openai-api-key + secret_key: api-key + - env_var_name: REDIS_URL + secret_name: redis-url-secret + secret_key: url + - env_var_name: SGP_API_KEY + secret_name: sgp-api-key + secret_key: api-key + - env_var_name: SGP_ACCOUNT_ID + secret_name: sgp-account-id + secret_key: account-id + - env_var_name: SGP_CLIENT_BASE_URL + secret_name: sgp-client-base-url + secret_key: url + +deployment: + image: + repository: "" + tag: "latest" + + global: + agent: + name: "a-harness-langgraph" + description: "An async LangGraph agent using the unified harness surface" + replicaCount: 1 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" diff --git a/examples/tutorials/10_async/00_base/harness_langgraph/project/__init__.py b/examples/tutorials/10_async/00_base/harness_langgraph/project/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/tutorials/10_async/00_base/harness_langgraph/project/acp.py b/examples/tutorials/10_async/00_base/harness_langgraph/project/acp.py new file mode 100644 index 000000000..a99395424 --- /dev/null +++ b/examples/tutorials/10_async/00_base/harness_langgraph/project/acp.py @@ -0,0 +1,109 @@ +"""ACP handler for async harness LangGraph agent. + +Uses the unified harness surface: ``LangGraphTurn`` wraps the LangGraph +``astream()`` generator, and ``UnifiedEmitter.auto_send_turn`` streams events +to Redis and returns a ``TurnResult`` with the accumulated final text. + +Differences from ``100_langgraph`` (bespoke path): +- No ``create_langgraph_tracing_handler`` boilerplate. +- ``stream_langgraph_events`` is replaced by + ``UnifiedEmitter.auto_send_turn(LangGraphTurn(stream))``. +- Tool calls/responses go through ``streaming_task_message_context`` + (same code path as text deltas), making the event stream channel-agnostic. +- Usage data (token counts) is captured on ``LangGraphTurn.usage()`` after + ``auto_send_turn`` returns. + +AGX1-377 note: LangGraph emits tool requests as ``StreamTaskMessageFull`` +events (from "updates"). The ``SpanDeriver`` does not open tool spans from +Full events today; that gap is tracked in AGX1-373. +""" + +from __future__ import annotations + +import os + +from dotenv import load_dotenv + +load_dotenv() + +import agentex.lib.adk as adk +from project.graph import create_graph +from agentex.lib.types.acp import SendEventParams, CancelTaskParams, CreateTaskParams +from agentex.lib.types.fastacp import AsyncACPConfig +from agentex.lib.types.tracing import SGPTracingProcessorConfig +from agentex.lib.utils.logging import make_logger +from agentex.lib.sdk.fastacp.fastacp import FastACP +from agentex.lib.core.harness.emitter import UnifiedEmitter +from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn +from agentex.lib.core.tracing.tracing_processor_manager import add_tracing_processor_config + +logger = make_logger(__name__) + +add_tracing_processor_config( + SGPTracingProcessorConfig( + sgp_api_key=os.environ.get("SGP_API_KEY", ""), + sgp_account_id=os.environ.get("SGP_ACCOUNT_ID", ""), + sgp_base_url=os.environ.get("SGP_CLIENT_BASE_URL", ""), + ) +) + +acp = FastACP.create( + acp_type="async", + config=AsyncACPConfig(type="base"), +) + +_graph = None + + +async def get_graph(): + global _graph + if _graph is None: + _graph = await create_graph() + return _graph + + +@acp.on_task_event_send +async def handle_task_event_send(params: SendEventParams): + """Handle incoming events, streaming tokens and tool calls via unified harness.""" + graph = await get_graph() + task_id = params.task.id + user_message = params.event.content.content + + logger.info(f"Processing message for thread {task_id}") + + await adk.messages.create(task_id=task_id, content=params.event.content) + + async with adk.tracing.span( + trace_id=task_id, + task_id=task_id, + name="message", + input={"message": user_message}, + data={"__span_type__": "AGENT_WORKFLOW"}, + ) as turn_span: + stream = graph.astream( + {"messages": [{"role": "user", "content": user_message}]}, + config={"configurable": {"thread_id": task_id}}, + stream_mode=["messages", "updates"], + ) + + turn = LangGraphTurn(stream, model=None) + emitter = UnifiedEmitter( + task_id=task_id, + trace_id=task_id, + parent_span_id=turn_span.id if turn_span else None, + ) + + result = await emitter.auto_send_turn(turn) + + if turn_span: + turn_span.output = {"final_output": result.final_text} + + +@acp.on_task_create +async def handle_task_create(params: CreateTaskParams): + logger.info(f"Task created: {params.task.id}") + + +@acp.on_task_cancel +async def handle_task_canceled(params: CancelTaskParams): + logger.info(f"Task canceled: {params.task.id}") diff --git a/examples/tutorials/10_async/00_base/harness_langgraph/project/graph.py b/examples/tutorials/10_async/00_base/harness_langgraph/project/graph.py new file mode 100644 index 000000000..4aeac3b3c --- /dev/null +++ b/examples/tutorials/10_async/00_base/harness_langgraph/project/graph.py @@ -0,0 +1,67 @@ +"""LangGraph graph definition for the harness_langgraph async agent. + +Identical to ``100_langgraph/project/graph.py`` — the graph definition is not +affected by the harness migration. Only ``acp.py`` changes. +""" + +from __future__ import annotations + +from typing import Any, Annotated +from datetime import datetime +from typing_extensions import TypedDict + +from langgraph.graph import START, StateGraph +from langchain_openai import ChatOpenAI +from langgraph.prebuilt import ToolNode, tools_condition +from langchain_core.messages import SystemMessage +from langgraph.graph.message import add_messages + +from project.tools import TOOLS +from agentex.lib.adk import create_checkpointer + +MODEL_NAME = "gpt-5" +SYSTEM_PROMPT = """You are a helpful AI assistant with access to tools. + +Current date and time: {timestamp} + +Guidelines: +- Be concise and helpful +- Use tools when they would help answer the user's question +- If you're unsure, ask clarifying questions +- Always provide accurate information +""" + + +class AgentState(TypedDict): + """State schema for the agent graph.""" + + messages: Annotated[list[Any], add_messages] + + +async def create_graph(): + """Create and compile the agent graph with checkpointer.""" + llm = ChatOpenAI( + model=MODEL_NAME, + reasoning={"effort": "high", "summary": "auto"}, + ) + llm_with_tools = llm.bind_tools(TOOLS) + + checkpointer = await create_checkpointer() + + def agent_node(state: AgentState) -> dict[str, Any]: + """Process the current state and generate a response.""" + messages = state["messages"] + if not messages or not isinstance(messages[0], SystemMessage): + system_content = SYSTEM_PROMPT.format(timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + messages = [SystemMessage(content=system_content)] + messages + response = llm_with_tools.invoke(messages) + return {"messages": [response]} + + builder = StateGraph(AgentState) + builder.add_node("agent", agent_node) + builder.add_node("tools", ToolNode(tools=TOOLS)) + builder.add_edge(START, "agent") + builder.add_conditional_edges("agent", tools_condition, "tools") + builder.add_edge("tools", "agent") + + return builder.compile(checkpointer=checkpointer) diff --git a/examples/tutorials/10_async/00_base/harness_langgraph/project/tools.py b/examples/tutorials/10_async/00_base/harness_langgraph/project/tools.py new file mode 100644 index 000000000..6e7614300 --- /dev/null +++ b/examples/tutorials/10_async/00_base/harness_langgraph/project/tools.py @@ -0,0 +1,24 @@ +"""Tool definitions for the harness_langgraph async agent.""" + +from langchain_core.tools import Tool + + +def get_weather(city: str) -> str: + """Get the current weather for a city. + + Args: + city: The name of the city to get weather for. + + Returns: + A string describing the weather conditions. + """ + return f"The weather in {city} is sunny and 72°F" + + +weather_tool = Tool( + name="get_weather", + func=get_weather, + description="Get the current weather for a city. Input should be a city name.", +) + +TOOLS = [weather_tool] diff --git a/examples/tutorials/10_async/00_base/harness_langgraph/pyproject.toml b/examples/tutorials/10_async/00_base/harness_langgraph/pyproject.toml new file mode 100644 index 000000000..69856e6db --- /dev/null +++ b/examples/tutorials/10_async/00_base/harness_langgraph/pyproject.toml @@ -0,0 +1,37 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "a-harness-langgraph" +version = "0.1.0" +description = "An async LangGraph agent using the unified harness surface" +readme = "README.md" +requires-python = ">=3.12" +dependencies = [ + "agentex-sdk", + "scale-gp", + "langgraph", + "langchain-openai", +] + +[project.optional-dependencies] +dev = [ + "pytest", + "pytest-asyncio", + "httpx", + "black", + "isort", + "flake8", +] + +[tool.hatch.build.targets.wheel] +packages = ["project"] + +[tool.black] +line-length = 88 +target-version = ['py312'] + +[tool.isort] +profile = "black" +line_length = 88 diff --git a/examples/tutorials/10_async/00_base/harness_langgraph/tests/test_agent.py b/examples/tutorials/10_async/00_base/harness_langgraph/tests/test_agent.py new file mode 100644 index 000000000..762b2b90c --- /dev/null +++ b/examples/tutorials/10_async/00_base/harness_langgraph/tests/test_agent.py @@ -0,0 +1,100 @@ +""" +Tests for the async harness LangGraph agent. + +Validates the unified harness surface (LangGraphTurn + UnifiedEmitter.auto_send_turn) +end-to-end against a live AgentEx server. + +Configuration: +- AGENTEX_API_BASE_URL: Base URL for the AgentEx server (default: http://localhost:5003) +- AGENT_NAME: Name of the agent to test (default: a-harness-langgraph) +""" + +import os + +import pytest +import pytest_asyncio + +from agentex import AsyncAgentex +from agentex.types import TextContentParam +from agentex.types.agent_rpc_params import ParamsCreateTaskRequest +from agentex.lib.sdk.fastacp.base.base_acp_server import uuid + +AGENTEX_API_BASE_URL = os.environ.get("AGENTEX_API_BASE_URL", "http://localhost:5003") +AGENT_NAME = os.environ.get("AGENT_NAME", "a-harness-langgraph") + + +@pytest_asyncio.fixture +async def client(): + client = AsyncAgentex(base_url=AGENTEX_API_BASE_URL) + yield client + await client.close() + + +@pytest.fixture +def agent_name(): + return AGENT_NAME + + +@pytest_asyncio.fixture +async def agent_id(client, agent_name): + agents = await client.agents.list() + for agent in agents: + if agent.name == agent_name: + return agent.id + raise ValueError(f"Agent with name {agent_name} not found.") + + +class TestNonStreamingEvents: + @pytest.mark.asyncio + async def test_send_event(self, client: AsyncAgentex, agent_id: str): + task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex)) + task = task_response.result + assert task is not None + + event_content = TextContentParam( + type="text", + author="user", + content="Hello! What can you help me with?", + ) + await client.agents.send_event( + agent_id=agent_id, + params={"task_id": task.id, "content": event_content}, + ) + + @pytest.mark.asyncio + async def test_tool_calling(self, client: AsyncAgentex, agent_id: str): + task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex)) + task = task_response.result + assert task is not None + + event_content = TextContentParam( + type="text", + author="user", + content="What's the weather in San Francisco?", + ) + await client.agents.send_event( + agent_id=agent_id, + params={"task_id": task.id, "content": event_content}, + ) + + +class TestStreamingEvents: + @pytest.mark.asyncio + async def test_send_event_and_stream(self, client: AsyncAgentex, agent_id: str): + task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex)) + task = task_response.result + assert task is not None + + event_content = TextContentParam( + type="text", + author="user", + content="Tell me a short joke.", + ) + await client.agents.send_event( + agent_id=agent_id, + params={"task_id": task.id, "content": event_content}, + ) + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/examples/tutorials/10_async/10_temporal/harness_langgraph/Dockerfile b/examples/tutorials/10_async/10_temporal/harness_langgraph/Dockerfile new file mode 100644 index 000000000..f6c9fb59b --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/harness_langgraph/Dockerfile @@ -0,0 +1,43 @@ +# syntax=docker/dockerfile:1.3 +FROM python:3.12-slim +COPY --from=ghcr.io/astral-sh/uv:0.6.4 /uv /uvx /bin/ + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + htop \ + vim \ + curl \ + tar \ + python3-dev \ + postgresql-client \ + build-essential \ + libpq-dev \ + gcc \ + cmake \ + netcat-openbsd \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +RUN uv pip install --system --upgrade pip setuptools wheel + +ENV UV_HTTP_TIMEOUT=1000 + +COPY 10_async/10_temporal/harness_langgraph/pyproject.toml /app/harness_langgraph/pyproject.toml +COPY 10_async/10_temporal/harness_langgraph/README.md /app/harness_langgraph/README.md + +WORKDIR /app/harness_langgraph + +COPY 10_async/10_temporal/harness_langgraph/project /app/harness_langgraph/project +COPY 10_async/10_temporal/harness_langgraph/tests /app/harness_langgraph/tests +COPY test_utils /app/test_utils + +RUN uv pip install --system .[dev] + +ENV PYTHONPATH=/app + +ENV AGENT_NAME=at-harness-langgraph + +CMD ["uvicorn", "project.acp:acp", "--host", "0.0.0.0", "--port", "8000"] + +# When we deploy the worker, we will replace the CMD with the following +# CMD ["python", "-m", "run_worker"] diff --git a/examples/tutorials/10_async/10_temporal/harness_langgraph/README.md b/examples/tutorials/10_async/10_temporal/harness_langgraph/README.md new file mode 100644 index 000000000..4df6969f1 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/harness_langgraph/README.md @@ -0,0 +1,53 @@ +# Tutorial: Temporal Harness LangGraph Agent + +This tutorial demonstrates how to build a **Temporal-backed** LangGraph agent on +AgentEx, following the ``130_langgraph`` pattern. The agent's LLM node runs as a +durable Temporal activity; the tools node runs inline in the workflow. + +This agent is named ``at-harness-langgraph`` to distinguish it from +``at130-langgraph`` (the bespoke reference). The graph and workflow structure are +identical; only the agent name changes. + +## Key Concepts + +### Temporal + LangGraph + +The ``LangGraphPlugin`` from ``temporalio.contrib.langgraph`` turns annotated graph +nodes into Temporal activities or inline workflow callables: + +- `agent` node: `execute_in="activity"` (durable, retryable LLM call) +- `tools` node: `execute_in="workflow"` (inline, fast tool execution) + +### Message surfacing + +After each turn, ``emit_langgraph_messages`` converts the new LangGraph messages +(tool requests, tool responses, final text) into AgentEx ``TaskMessage`` objects +and posts them to the task's message stream. + +This is the Temporal-specific path. The non-Temporal async/sync channels use +``UnifiedEmitter.auto_send_turn`` / ``UnifiedEmitter.yield_turn`` with +``LangGraphTurn`` instead. + +## Files + +| File | Description | +|------|-------------| +| `project/acp.py` | ACP server (Temporal config, LangGraphPlugin) | +| `project/graph.py` | LangGraph graph (agent + tools nodes) | +| `project/workflow.py` | Temporal workflow (signal handlers, emit_langgraph_messages) | +| `project/run_worker.py` | Temporal worker runner | +| `project/tools.py` | Tool definitions (weather example) | +| `tests/test_agent.py` | Integration tests | +| `manifest.yaml` | Agent configuration (name: at-harness-langgraph) | + +## Running Locally + +```bash +agentex agents run +``` + +## Running Tests + +```bash +pytest tests/test_agent.py -v +``` diff --git a/examples/tutorials/10_async/10_temporal/harness_langgraph/manifest.yaml b/examples/tutorials/10_async/10_temporal/harness_langgraph/manifest.yaml new file mode 100644 index 000000000..596d38eb4 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/harness_langgraph/manifest.yaml @@ -0,0 +1,51 @@ +build: + context: + root: ../../../ + include_paths: + - 10_async/10_temporal/harness_langgraph + - test_utils + dockerfile: 10_async/10_temporal/harness_langgraph/Dockerfile + dockerignore: 10_async/10_temporal/harness_langgraph/.dockerignore + +local_development: + agent: + port: 8000 + host_address: host.docker.internal + paths: + acp: project/acp.py + worker: project/run_worker.py + +agent: + acp_type: async + name: at-harness-langgraph + description: "A Temporal-backed LangGraph agent (harness variant) whose nodes run as Temporal activities" + + temporal: + enabled: true + workflows: + - name: at-harness-langgraph + queue_name: at_harness_langgraph_queue + + credentials: + - env_var_name: REDIS_URL + secret_name: redis-url-secret + secret_key: url + + env: {} + +deployment: + image: + repository: "" + tag: "latest" + + imagePullSecrets: [] + + global: + replicaCount: 1 + resources: + requests: + cpu: "500m" + memory: "1Gi" + limits: + cpu: "1000m" + memory: "2Gi" diff --git a/examples/tutorials/10_async/10_temporal/harness_langgraph/project/__init__.py b/examples/tutorials/10_async/10_temporal/harness_langgraph/project/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/tutorials/10_async/10_temporal/harness_langgraph/project/acp.py b/examples/tutorials/10_async/10_temporal/harness_langgraph/project/acp.py new file mode 100644 index 000000000..7af9c5e68 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/harness_langgraph/project/acp.py @@ -0,0 +1,34 @@ +"""ACP server for the Temporal harness LangGraph agent. + +Follows the ``130_langgraph`` pattern: the Temporal ``LangGraphPlugin`` runs +graph nodes as Temporal activities. The agent logic lives in ``workflow.py`` +(the runtime) and ``graph.py`` (the LangGraph graph), executed by the Temporal +worker (``run_worker.py``), not by this HTTP process. + +The workflow uses ``emit_langgraph_messages`` to surface turn messages to +AgentEx. That helper is Temporal-specific and is not replaced by the unified +harness here (``UnifiedEmitter`` targets the non-Temporal async/sync channels). +""" + +from __future__ import annotations + +import os + +from dotenv import load_dotenv + +load_dotenv() + +from temporalio.contrib.langgraph import LangGraphPlugin + +from project.graph import GRAPH_NAME, build_graph +from agentex.lib.types.fastacp import TemporalACPConfig +from agentex.lib.sdk.fastacp.fastacp import FastACP + +acp = FastACP.create( + acp_type="async", + config=TemporalACPConfig( + type="temporal", + temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"), + plugins=[LangGraphPlugin(graphs={GRAPH_NAME: build_graph()})], + ), +) diff --git a/examples/tutorials/10_async/10_temporal/harness_langgraph/project/graph.py b/examples/tutorials/10_async/10_temporal/harness_langgraph/project/graph.py new file mode 100644 index 000000000..ce9c2b520 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/harness_langgraph/project/graph.py @@ -0,0 +1,85 @@ +"""LangGraph graph for at-harness-langgraph — nodes run as Temporal activities. + +Identical in structure to ``130_langgraph/project/graph.py``. The graph +definition is not affected by the harness migration; only the agent naming +changes. The LLM ``agent`` node runs as a durable Temporal activity; +the ``tools`` node runs inline in the workflow. +""" + +from __future__ import annotations + +import os +from typing import Any, Annotated +from datetime import datetime, timedelta + +_litellm_key = os.environ.get("LITELLM_API_KEY") +if _litellm_key: + os.environ.setdefault("OPENAI_API_KEY", _litellm_key) + +from typing_extensions import TypedDict + +from langgraph.graph import END, START, StateGraph +from langchain_openai import ChatOpenAI +from langchain_core.messages import ToolMessage, SystemMessage +from langgraph.graph.message import add_messages + +from project.tools import TOOLS + +_TOOLS_BY_NAME = {tool.name: tool for tool in TOOLS} + +GRAPH_NAME = "at-harness-langgraph" +MODEL_NAME = "gpt-4o" +SYSTEM_PROMPT = """You are a helpful AI assistant with access to tools. + +Current date and time: {timestamp} + +Be concise and use tools when they help answer the question.""" + + +class AgentState(TypedDict): + messages: Annotated[list[Any], add_messages] + + +async def agent_node(state: AgentState) -> dict[str, Any]: + """The 'agent' node — one LLM call. Runs as a durable Temporal activity.""" + llm = ChatOpenAI(model=MODEL_NAME).bind_tools(TOOLS) + messages = state["messages"] + if not messages or not isinstance(messages[0], SystemMessage): + system = SystemMessage(content=SYSTEM_PROMPT.format(timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"))) + messages = [system, *messages] + return {"messages": [await llm.ainvoke(messages)]} + + +async def tools_node(state: AgentState) -> dict[str, Any]: + """Run the tool calls the model requested. Runs inline in the workflow.""" + last = state["messages"][-1] + results: list[Any] = [] + for call in getattr(last, "tool_calls", None) or []: + tool = _TOOLS_BY_NAME.get(call["name"]) + if tool is None: + output = f"Error: unknown tool {call['name']!r}. Available: {list(_TOOLS_BY_NAME)}" + else: + output = await tool.ainvoke(call["args"]) + results.append(ToolMessage(content=str(output), tool_call_id=call["id"], name=call["name"])) + return {"messages": results} + + +async def route_after_agent(state: AgentState) -> str: + """Go to the tools node if the model requested tools, else finish.""" + last = state["messages"][-1] + return "tools" if getattr(last, "tool_calls", None) else END + + +def build_graph() -> StateGraph: + """Build the agent graph; the LLM node runs as an activity, tools in the workflow.""" + builder = StateGraph(AgentState) + builder.add_node( + "agent", + agent_node, + metadata={"execute_in": "activity", "start_to_close_timeout": timedelta(minutes=5)}, + ) + builder.add_node("tools", tools_node, metadata={"execute_in": "workflow"}) + builder.add_edge(START, "agent") + builder.add_conditional_edges("agent", route_after_agent, {"tools": "tools", END: END}) + builder.add_edge("tools", "agent") + return builder diff --git a/examples/tutorials/10_async/10_temporal/harness_langgraph/project/run_worker.py b/examples/tutorials/10_async/10_temporal/harness_langgraph/project/run_worker.py new file mode 100644 index 000000000..ca64464fc --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/harness_langgraph/project/run_worker.py @@ -0,0 +1,46 @@ +"""Temporal worker for at-harness-langgraph. + +Run as a separate long-lived process alongside the ACP HTTP server. The +worker polls Temporal for workflow + activity tasks and executes them. + +The ``LangGraphPlugin`` is given the graph registry (``{ GRAPH_NAME: graph }``). +At runtime it turns the graph's ``execute_in="activity"`` nodes into Temporal +activities and registers them on the worker automatically. +""" + +import asyncio + +from temporalio.contrib.langgraph import LangGraphPlugin + +from project.graph import GRAPH_NAME, build_graph +from project.workflow import AtHarnessLanggraphWorkflow +from agentex.lib.utils.debug import setup_debug_if_enabled +from agentex.lib.utils.logging import make_logger +from agentex.lib.environment_variables import EnvironmentVariables +from agentex.lib.core.temporal.activities import get_all_activities +from agentex.lib.core.temporal.workers.worker import AgentexWorker + +environment_variables = EnvironmentVariables.refresh() +logger = make_logger(__name__) + + +async def main(): + setup_debug_if_enabled() + + task_queue_name = environment_variables.WORKFLOW_TASK_QUEUE + if task_queue_name is None: + raise ValueError("WORKFLOW_TASK_QUEUE is not set") + + worker = AgentexWorker( + task_queue=task_queue_name, + plugins=[LangGraphPlugin(graphs={GRAPH_NAME: build_graph()})], + ) + + await worker.run( + activities=get_all_activities(), + workflow=AtHarnessLanggraphWorkflow, + ) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/tutorials/10_async/10_temporal/harness_langgraph/project/tools.py b/examples/tutorials/10_async/10_temporal/harness_langgraph/project/tools.py new file mode 100644 index 000000000..51440398e --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/harness_langgraph/project/tools.py @@ -0,0 +1,24 @@ +"""Tool definitions for the harness_langgraph temporal agent.""" + +from langchain_core.tools import Tool + + +def get_weather(city: str) -> str: + """Get the current weather for a city. + + Args: + city: The name of the city to get weather for. + + Returns: + A string describing the weather conditions. + """ + return f"The weather in {city} is sunny and 72°F" + + +weather_tool = Tool( + name="get_weather", + func=get_weather, + description="Get the current weather for a city. Input should be a city name.", +) + +TOOLS = [weather_tool] diff --git a/examples/tutorials/10_async/10_temporal/harness_langgraph/project/workflow.py b/examples/tutorials/10_async/10_temporal/harness_langgraph/project/workflow.py new file mode 100644 index 000000000..4125dca39 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/harness_langgraph/project/workflow.py @@ -0,0 +1,80 @@ +"""Temporal workflow for at-harness-langgraph. + +Each turn the workflow runs the LangGraph graph (``project/graph.py``) via the +``temporalio.contrib.langgraph`` plugin. The plugin runs the LLM ``agent`` node +as a durable Temporal activity and the ``tools`` node inline in the workflow. + +Multi-turn memory is kept on the workflow instance (``self._messages``) — it's +durable and replay-safe for free, so no checkpoint database is needed. +""" + +from __future__ import annotations + +import json +from typing import Any + +from temporalio import workflow +from temporalio.contrib.langgraph import graph as lg_graph + +from agentex.lib import adk +from project.graph import GRAPH_NAME +from agentex.lib.adk import emit_langgraph_messages +from agentex.protocol.acp import SendEventParams, CreateTaskParams +from agentex.lib.utils.logging import make_logger +from agentex.types.text_content import TextContent +from agentex.lib.environment_variables import EnvironmentVariables +from agentex.lib.core.temporal.types.workflow import SignalName +from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow + +environment_variables = EnvironmentVariables.refresh() + +if environment_variables.WORKFLOW_NAME is None: + raise ValueError("Environment variable WORKFLOW_NAME is not set") +if environment_variables.AGENT_NAME is None: + raise ValueError("Environment variable AGENT_NAME is not set") + +logger = make_logger(__name__) + + +@workflow.defn(name=environment_variables.WORKFLOW_NAME) +class AtHarnessLanggraphWorkflow(BaseWorkflow): + """Runs the LangGraph agent each turn; its nodes run as Temporal activities.""" + + def __init__(self) -> None: + super().__init__(display_name=environment_variables.AGENT_NAME) + self._complete_task = False + self._messages: list[Any] = [] + self._emitted = 0 + + @workflow.signal(name=SignalName.RECEIVE_EVENT) + async def on_task_event_send(self, params: SendEventParams) -> None: + """Echo the user's message, run the graph, surface the new messages.""" + await adk.messages.create(task_id=params.task.id, content=params.event.content) + self._messages.append({"role": "user", "content": params.event.content.content}) + + compiled = lg_graph(GRAPH_NAME).compile() + result = await compiled.ainvoke({"messages": self._messages}) + self._messages = result["messages"] + + await emit_langgraph_messages(self._messages[self._emitted :], params.task.id) + self._emitted = len(self._messages) + + @workflow.signal + async def complete_task_signal(self) -> None: + self._complete_task = True + + @workflow.run + async def on_task_create(self, params: CreateTaskParams) -> str: + await adk.messages.create( + task_id=params.task.id, + content=TextContent( + author="agent", + content=( + f"Task initialized with params:\n{json.dumps(params.params, indent=2)}\n\n" + "Send me a message and I'll respond using a LangGraph agent whose nodes " + "run as durable Temporal activities." + ), + ), + ) + await workflow.wait_condition(lambda: self._complete_task, timeout=None) + return "Task completed" diff --git a/examples/tutorials/10_async/10_temporal/harness_langgraph/pyproject.toml b/examples/tutorials/10_async/10_temporal/harness_langgraph/pyproject.toml new file mode 100644 index 000000000..897f54dd6 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/harness_langgraph/pyproject.toml @@ -0,0 +1,40 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "at-harness-langgraph" +version = "0.1.0" +description = "A Temporal-backed LangGraph agent (harness variant) whose nodes run as Temporal activities" +requires-python = ">=3.12" +dependencies = [ + "agentex-sdk", + "scale-gp", + "temporalio[langgraph]>=1.27.0", + "langchain-openai", + "langchain-core", + "grandalf", + "python-dotenv", +] + +[project.optional-dependencies] +dev = [ + "pytest", + "pytest-asyncio", + "httpx", + "black", + "isort", + "flake8", + "debugpy>=1.8.15", +] + +[tool.hatch.build.targets.wheel] +packages = ["project"] + +[tool.black] +line-length = 88 +target-version = ['py312'] + +[tool.isort] +profile = "black" +line_length = 88 diff --git a/examples/tutorials/10_async/10_temporal/harness_langgraph/tests/test_agent.py b/examples/tutorials/10_async/10_temporal/harness_langgraph/tests/test_agent.py new file mode 100644 index 000000000..05d9ffa01 --- /dev/null +++ b/examples/tutorials/10_async/10_temporal/harness_langgraph/tests/test_agent.py @@ -0,0 +1,106 @@ +"""Integration tests for the Temporal harness LangGraph agent (live agent required). + +These drive a *running* agent over the AgentEx API and verify that: +- the agent sends a welcome message on task creation, +- a weather question triggers a tool_request / tool_response round-trip + (proving the LLM node ran as a Temporal activity and the tool node ran), +- the final answer reflects the tool output. + +To run: +1. Start the agent (worker + ACP server): ``agentex agents run --manifest manifest.yaml`` +2. Set AGENTEX_API_BASE_URL if not using the default +3. ``pytest tests/test_agent.py -v`` +""" + +import os +import uuid + +import pytest +import pytest_asyncio +from test_utils.async_utils import ( + poll_messages, + send_event_and_poll_yielding, +) + +from agentex import AsyncAgentex +from agentex.types.task_message import TaskMessage +from agentex.types.agent_rpc_params import ParamsCreateTaskRequest + +AGENTEX_API_BASE_URL = os.environ.get("AGENTEX_API_BASE_URL", "http://localhost:5003") +AGENT_NAME = os.environ.get("AGENT_NAME", "at-harness-langgraph") + + +@pytest_asyncio.fixture +async def client(): + client = AsyncAgentex(base_url=AGENTEX_API_BASE_URL) + yield client + await client.close() + + +@pytest.fixture +def agent_name(): + return AGENT_NAME + + +@pytest_asyncio.fixture +async def agent_id(client, agent_name): + agents = await client.agents.list() + for agent in agents: + if agent.name == agent_name: + return agent.id + raise ValueError(f"Agent with name {agent_name} not found.") + + +class TestNonStreamingEvents: + """The Temporal-backed LangGraph agent responds and uses tools.""" + + @pytest.mark.asyncio + async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str): + """Create a task, ask about weather, verify the tool round-trip.""" + task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex)) + task = task_response.result + assert task is not None + + task_creation_found = False + async for message in poll_messages(client=client, task_id=task.id, timeout=30, sleep_interval=1.0): + assert isinstance(message, TaskMessage) + if message.content and message.content.type == "text" and message.content.author == "agent": + task_creation_found = True + break + assert task_creation_found, "Task creation welcome message not found" + + seen_tool_request = False + seen_tool_response = False + final_message = None + async for message in send_event_and_poll_yielding( + client=client, + agent_id=agent_id, + task_id=task.id, + user_message="What is the weather in San Francisco? Use your tool.", + timeout=60, + sleep_interval=1.0, + ): + assert isinstance(message, TaskMessage) + + if message.content and message.content.type == "tool_request": + seen_tool_request = True + if message.content and message.content.type == "tool_response": + seen_tool_response = True + + if message.content and message.content.type == "text" and message.content.author == "agent": + final_message = message + content_length = len(getattr(message.content, "content", "") or "") + if getattr(message, "streaming_status", None) in (None, "DONE") and content_length > 0: + if seen_tool_response: + break + + assert seen_tool_request, "Expected a tool_request (agent calling get_weather)" + assert seen_tool_response, "Expected a tool_response (get_weather result)" + assert final_message is not None, "Expected a final agent text message" + final_text = getattr(final_message.content, "content", None) if final_message.content else None + assert isinstance(final_text, str) and len(final_text) > 0 + assert "72" in final_text, "Expected weather response to mention 72°F" + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/src/agentex/lib/adk/_modules/_langgraph_async.py b/src/agentex/lib/adk/_modules/_langgraph_async.py index 3e61c42f9..02ef059eb 100644 --- a/src/agentex/lib/adk/_modules/_langgraph_async.py +++ b/src/agentex/lib/adk/_modules/_langgraph_async.py @@ -3,8 +3,21 @@ Converts LangGraph graph.astream() events into Agentex streaming updates and pushes them to Redis via adk.streaming contexts. For use with async ACP agents that stream via Redis rather than HTTP yields. + +Unified surface +--------------- +This module is now implemented on top of ``LangGraphTurn`` and +``UnifiedEmitter.auto_send_turn``, the same surface used by every other +harness adapter (pydantic-ai, openai-agents, etc.). The public signature +and return type are preserved identically. + +AGX1-377 note: LangGraph emits tool requests as ``StreamTaskMessageFull`` events +(from "updates" events), NOT Start+Delta+Done like pydantic-ai. ``auto_send`` +handles Full events correctly; no coalescing wrapper is needed. """ +from agentex.lib.utils.temporal import workflow_now_if_in_workflow + async def stream_langgraph_events(stream, task_id: str) -> str: """Stream LangGraph events to Agentex via Redis. @@ -18,6 +31,19 @@ async def stream_langgraph_events(stream, task_id: str) -> str: models like gpt-5/o1/o3 (chunk.content is a list of typed content blocks in the Responses API responses/v1 format). + Reimplemented on ``UnifiedEmitter.auto_send_turn(LangGraphTurn(...))`` for + cross-harness consistency. Behavior is identical to the previous bespoke + implementation (verified by characterization tests in test_langgraph_async.py). + + AGX1-377 note: LangGraph emits tool requests as ``Full`` events (from "updates"), + NOT Start+Delta+Done like pydantic-ai. ``auto_send`` handles Full events + correctly; no coalescing wrapper is needed. + + AGX1-378 note: ``created_at`` is set from ``workflow.now()`` when called inside a + Temporal workflow, matching the pattern used by the openai/litellm providers. + Outside a workflow (plain async activities, sync agents) it is ``None`` and the + server's wall clock is used. + Args: stream: Async iterator from graph.astream(..., stream_mode=["messages", "updates"]) task_id: The Agentex task ID to stream messages to. @@ -25,178 +51,15 @@ async def stream_langgraph_events(stream, task_id: str) -> str: Returns: The accumulated final text output from the agent. """ - # Lazy imports so langgraph/langchain aren't required at module load time - from langchain_core.messages import ToolMessage, AIMessageChunk - - from agentex.lib import adk - from agentex.types.text_content import TextContent - from agentex.types.reasoning_content import ReasoningContent - from agentex.types.task_message_delta import TextDelta - from agentex.types.task_message_update import StreamTaskMessageDelta - from agentex.types.tool_request_content import ToolRequestContent - from agentex.types.tool_response_content import ToolResponseContent - from agentex.types.reasoning_summary_delta import ReasoningSummaryDelta - - text_context = None - reasoning_context = None - final_text = "" - - try: - async for event_type, event_data in stream: - if event_type == "messages": - chunk, metadata = event_data - - if not isinstance(chunk, AIMessageChunk) or not chunk.content: - continue - - # ---------------------------------------------------------- - # Case 1: content is a plain string (regular models) - # ---------------------------------------------------------- - if isinstance(chunk.content, str): - if reasoning_context: - await reasoning_context.close() - reasoning_context = None - - if not text_context: - final_text = "" - text_context = await adk.streaming.streaming_task_message_context( - task_id=task_id, - initial_content=TextContent( - author="agent", - content="", - format="markdown", - ), - ).__aenter__() - - final_text += chunk.content - await text_context.stream_update( - StreamTaskMessageDelta( - parent_task_message=text_context.task_message, - delta=TextDelta(type="text", text_delta=chunk.content), - type="delta", - ) - ) - - # ---------------------------------------------------------- - # Case 2: content is a list of typed blocks (reasoning models) - # Responses API (responses/v1) format: - # {"type": "reasoning", "summary": [{"type": "summary_text", "text": "..."}]} - # {"type": "text", "text": "..."} - # ---------------------------------------------------------- - elif isinstance(chunk.content, list): - for block in chunk.content: - if not isinstance(block, dict): - continue - - block_type = block.get("type") - - if block_type == "reasoning": - reasoning_text = "" - for s in block.get("summary", []): - if isinstance(s, dict) and s.get("type") == "summary_text": - reasoning_text += s.get("text", "") - if not reasoning_text: - continue - - if text_context: - await text_context.close() - text_context = None - - if not reasoning_context: - reasoning_context = await adk.streaming.streaming_task_message_context( - task_id=task_id, - initial_content=ReasoningContent( - author="agent", - summary=[], - content=[], - type="reasoning", - style="active", - ), - ).__aenter__() - - await reasoning_context.stream_update( - StreamTaskMessageDelta( - parent_task_message=reasoning_context.task_message, - delta=ReasoningSummaryDelta( - type="reasoning_summary", - summary_index=0, - summary_delta=reasoning_text, - ), - type="delta", - ) - ) - - elif block_type == "text": - text_delta = block.get("text", "") - if not text_delta: - continue - - if reasoning_context: - await reasoning_context.close() - reasoning_context = None - - if not text_context: - final_text = "" - text_context = await adk.streaming.streaming_task_message_context( - task_id=task_id, - initial_content=TextContent( - author="agent", - content="", - format="markdown", - ), - ).__aenter__() - - final_text += text_delta - await text_context.stream_update( - StreamTaskMessageDelta( - parent_task_message=text_context.task_message, - delta=TextDelta(type="text", text_delta=text_delta), - type="delta", - ) - ) - - elif event_type == "updates": - for node_name, state_update in event_data.items(): - if node_name == "agent": - messages = state_update.get("messages", []) - for msg in messages: - if text_context: - await text_context.close() - text_context = None - if reasoning_context: - await reasoning_context.close() - reasoning_context = None - - if hasattr(msg, "tool_calls") and msg.tool_calls: - for tc in msg.tool_calls: - await adk.messages.create( - task_id=task_id, - content=ToolRequestContent( - tool_call_id=tc["id"], - name=tc["name"], - arguments=tc["args"], - author="agent", - ), - ) - - elif node_name == "tools": - messages = state_update.get("messages", []) - for msg in messages: - if isinstance(msg, ToolMessage): - await adk.messages.create( - task_id=task_id, - content=ToolResponseContent( - tool_call_id=msg.tool_call_id, - name=msg.name or "unknown", - content=msg.content if isinstance(msg.content, str) else str(msg.content), - author="agent", - ), - ) - finally: - # Always close open contexts - if text_context: - await text_context.close() - if reasoning_context: - await reasoning_context.close() - - return final_text + from agentex.lib.core.harness.emitter import UnifiedEmitter + from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn + + # AGX1-377 note: LangGraph emits tool requests as Full events (from "updates"), + # NOT Start+Delta+Done like pydantic-ai. auto_send handles Full events correctly; + # no coalescing wrapper is needed. + # AGX1-378: stamp messages with workflow.now() inside Temporal for deterministic + # created_at ordering; falls back to None (server wall clock) outside a workflow. + turn = LangGraphTurn(stream, model=None) + emitter = UnifiedEmitter(task_id=task_id, trace_id=None, parent_span_id=None) + result = await emitter.auto_send_turn(turn, created_at=workflow_now_if_in_workflow()) + return result.final_text diff --git a/src/agentex/lib/adk/_modules/_langgraph_sync.py b/src/agentex/lib/adk/_modules/_langgraph_sync.py index 6d4ce715f..fcf1c10a9 100644 --- a/src/agentex/lib/adk/_modules/_langgraph_sync.py +++ b/src/agentex/lib/adk/_modules/_langgraph_sync.py @@ -3,10 +3,36 @@ Converts LangGraph graph.astream() events into Agentex TaskMessageUpdate events that are yielded back over the HTTP response. For use with sync ACP agents that stream via HTTP yields rather than Redis. + +Unified sync path +----------------- +Prefer using ``LangGraphTurn`` with ``UnifiedEmitter.yield_turn`` for new +agents, which adds usage capture and optional tracing via the shared harness +surface:: + + from agentex.lib.core.harness.emitter import UnifiedEmitter + from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn + + turn = LangGraphTurn(stream) + emitter = UnifiedEmitter(task_id=task_id, trace_id=trace_id, parent_span_id=span_id) + async for event in emitter.yield_turn(turn): + yield event + +``convert_langgraph_to_agentex_events`` remains available as a lower-level +primitive (e.g. for callers that need the raw event stream without the +harness envelope). """ +from __future__ import annotations + +from typing import Any, Callable, Optional +from collections.abc import AsyncGenerator -async def convert_langgraph_to_agentex_events(stream): + +async def convert_langgraph_to_agentex_events( + stream: Any, + on_final_ai_message: Optional[Callable[..., None]] = None, +) -> AsyncGenerator[Any, None]: """Convert LangGraph streaming events to Agentex TaskMessageUpdate events. Expects the stream from graph.astream() called with @@ -22,8 +48,17 @@ async def convert_langgraph_to_agentex_events(stream): Supports both regular models (chunk.content is a str) and reasoning models like gpt-5/o1/o3 (chunk.content is a list of typed content blocks). + AGX1-377 note: LangGraph emits tool requests as ``StreamTaskMessageFull`` (from + "updates" events), NOT Start+Delta+Done like pydantic-ai. No coalesce_tool_requests + option is needed for LangGraph. + Args: stream: Async iterator from graph.astream(..., stream_mode=["messages", "updates"]) + on_final_ai_message: Optional callback ``(msg: AIMessage) -> None`` called for + each ``AIMessage`` in an "agent" node update. Use this to capture + ``usage_metadata`` for token accounting without re-traversing the stream. + The callback fires *after* all events for that message are yielded. + No-op when ``None`` (default). Yields: TaskMessageUpdate events (Start, Delta, Done, Full) @@ -205,6 +240,13 @@ async def convert_langgraph_to_agentex_events(stream): ) message_index += 1 + # Notify caller of the final AIMessage (e.g. for usage capture) + if on_final_ai_message is not None: + from langchain_core.messages import AIMessage as _AIMessage + + if isinstance(msg, _AIMessage): + on_final_ai_message(msg) + elif node_name == "tools": messages = state_update.get("messages", []) for msg in messages: diff --git a/src/agentex/lib/adk/_modules/_langgraph_tracing.py b/src/agentex/lib/adk/_modules/_langgraph_tracing.py index 74b8dcb57..2162201e1 100644 --- a/src/agentex/lib/adk/_modules/_langgraph_tracing.py +++ b/src/agentex/lib/adk/_modules/_langgraph_tracing.py @@ -1,4 +1,14 @@ -"""LangChain callback handler that creates Agentex spans for LLM calls and tool executions.""" +"""LangChain callback handler that creates Agentex spans for LLM calls and tool executions. + +.. deprecated:: + ``AgentexLangGraphTracingHandler`` and ``create_langgraph_tracing_handler`` are + superseded by the unified harness surface (``LangGraphTurn`` + + ``UnifiedEmitter``), which derives spans automatically from the canonical + event stream without requiring a LangChain callback handler. + + They remain importable and functional for backward compatibility, but new + agents should use the unified path instead. +""" # ruff: noqa: ARG002 # Callback methods must accept all arguments defined by LangChain's AsyncCallbackHandler interface. @@ -31,6 +41,11 @@ class AgentexLangGraphTracingHandler(AsyncCallbackHandler): ├── llm: (LLM call) ├── tool: (tool execution) └── llm: (LLM call) + + .. deprecated:: + Use ``LangGraphTurn`` with ``UnifiedEmitter`` instead. The unified + harness derives equivalent spans from the canonical event stream, + removing the need for a LangChain callback handler entirely. """ def __init__( @@ -237,6 +252,20 @@ def create_langgraph_tracing_handler( Returns: An ``AgentexLangGraphTracingHandler`` instance ready to use as a LangChain callback. + + .. deprecated:: + Use ``LangGraphTurn`` with ``UnifiedEmitter`` instead. The unified harness + derives equivalent spans from the canonical event stream automatically, with + no LangChain callback required:: + + from agentex.lib.core.harness.emitter import UnifiedEmitter + from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn + + turn = LangGraphTurn(stream) + emitter = UnifiedEmitter(task_id=task_id, trace_id=trace_id, parent_span_id=span_id) + result = await emitter.auto_send_turn(turn) + + This function remains available for backward compatibility. """ return AgentexLangGraphTracingHandler( trace_id=trace_id, diff --git a/src/agentex/lib/adk/_modules/_langgraph_turn.py b/src/agentex/lib/adk/_modules/_langgraph_turn.py new file mode 100644 index 000000000..6f0913623 --- /dev/null +++ b/src/agentex/lib/adk/_modules/_langgraph_turn.py @@ -0,0 +1,119 @@ +"""HarnessTurn adapter for LangGraph astream() event streams. + +Provides ``LangGraphTurn`` (a ``HarnessTurn`` implementation) and the +``langgraph_usage_to_turn_usage`` helper that maps LangGraph's +``AIMessage.usage_metadata`` onto the framework-agnostic ``TurnUsage`` model. + +AGX1-377 note: LangGraph emits tool requests as ``StreamTaskMessageFull`` events +(from "updates" events), NOT Start+Delta+Done like pydantic-ai. ``auto_send`` +handles Full events correctly; no coalescing wrapper is needed. +""" + +from __future__ import annotations + +from typing import Any, AsyncIterator +from collections.abc import AsyncGenerator + +from agentex.lib.core.harness.types import TurnUsage, StreamTaskMessage +from agentex.lib.adk._modules._langgraph_sync import convert_langgraph_to_agentex_events + + +def langgraph_usage_to_turn_usage(usage_metadata: Any, model: str | None) -> TurnUsage: + """Map LangGraph ``AIMessage.usage_metadata`` onto ``TurnUsage``. + + ``usage_metadata`` may be ``None`` (model doesn't report usage). + Real zero token counts (e.g. 0 output tokens) are preserved as 0, NOT + coerced to ``None``. + + Mapping:: + + input_tokens -> input_tokens + output_tokens -> output_tokens + total_tokens -> total_tokens + input_token_details.cache_read -> cached_input_tokens + output_token_details.reasoning -> reasoning_tokens + + Args: + usage_metadata: The ``usage_metadata`` dict from an ``AIMessage``, + or ``None`` if the model did not report usage. + model: The model name string to attach to the ``TurnUsage``, or ``None``. + + Returns: + A populated ``TurnUsage`` instance. + """ + if usage_metadata is None: + return TurnUsage(model=model) + + raw_input = (usage_metadata or {}).get("input_tokens") + raw_output = (usage_metadata or {}).get("output_tokens") + raw_total = (usage_metadata or {}).get("total_tokens") + input_details = (usage_metadata or {}).get("input_token_details") or {} + output_details = (usage_metadata or {}).get("output_token_details") or {} + raw_cache_read = input_details.get("cache_read") + raw_reasoning = output_details.get("reasoning") + + return TurnUsage( + model=model, + input_tokens=raw_input, + output_tokens=raw_output, + total_tokens=raw_total, + cached_input_tokens=raw_cache_read, + reasoning_tokens=raw_reasoning, + ) + + +class LangGraphTurn: + """HarnessTurn wrapping a LangGraph ``astream()`` event stream. + + Implements the ``HarnessTurn`` Protocol so it can be passed to either + ``UnifiedEmitter.yield_turn`` (sync HTTP ACP) or + ``UnifiedEmitter.auto_send_turn`` (async / temporal). + + Usage:: + + stream = graph.astream( + {"messages": [{"role": "user", "content": user_message}]}, + stream_mode=["messages", "updates"], + ) + turn = LangGraphTurn(stream, model=model_name) + + # Sync HTTP ACP + async for event in emitter.yield_turn(turn): + yield event + + # Async / temporal + result = await emitter.auto_send_turn(turn) + + AGX1-377 note: LangGraph tool requests are ``StreamTaskMessageFull`` (from + "updates"), NOT Start+Delta+Done like pydantic-ai. No ``coalesce_tool_requests`` + option is needed. + + Usage data is captured lazily via the ``on_final_ai_message`` callback and + is only valid after ``events`` has been fully consumed. + """ + + def __init__(self, stream: Any, model: str | None = None) -> None: + self._stream = stream + self._model = model + self._usage: TurnUsage = TurnUsage(model=model) + + @property + def events(self) -> AsyncIterator[StreamTaskMessage]: + return self._generate_events() + + async def _generate_events(self) -> AsyncGenerator[StreamTaskMessage, None]: + def _capture(ai_msg: Any) -> None: + usage_metadata = getattr(ai_msg, "usage_metadata", None) + if usage_metadata is not None: + self._usage = langgraph_usage_to_turn_usage(usage_metadata, self._model) + + async for ev in convert_langgraph_to_agentex_events(self._stream, on_final_ai_message=_capture): + yield ev + + def usage(self) -> TurnUsage: + """Return the usage captured from the last AIMessage in the stream. + + Valid only after ``events`` has been fully consumed. + Returns a zero-usage ``TurnUsage`` if the model did not report usage. + """ + return self._usage diff --git a/tests/lib/adk/test_langgraph_async.py b/tests/lib/adk/test_langgraph_async.py new file mode 100644 index 000000000..682bd43bc --- /dev/null +++ b/tests/lib/adk/test_langgraph_async.py @@ -0,0 +1,282 @@ +"""Characterization tests for stream_langgraph_events (unified surface). + +These tests verify the behavior of ``stream_langgraph_events`` after it was +reimplemented on top of ``LangGraphTurn`` + ``UnifiedEmitter.auto_send_turn`` +(Task 4). They serve as a contract test for the public signature. + +Key behavioral notes (unified surface vs. old bespoke implementation): +- Tool calls/responses are posted via ``streaming_task_message_context`` (not + ``adk.messages.create``); they appear as contexts with no stream_update calls. +- ``final_text`` accumulates ALL text across the turn (the old bespoke impl + only returned the last text segment — behavior varied across models). + +NOTE: langchain_core imports are deferred to test scope because conftest.py +stubs ``langchain_core.messages`` with MagicMock. +""" + +from __future__ import annotations + +import sys +from typing import Any +from dataclasses import field, dataclass + +import pytest + +from agentex.types.task_message import TaskMessage +from agentex.types.text_content import TextContent +from agentex.types.task_message_delta import TextDelta +from agentex.types.task_message_update import StreamTaskMessageDelta +from agentex.lib.adk._modules._langgraph_async import stream_langgraph_events + +TASK_ID = "task-test" + + +# --------------------------------------------------------------------------- +# Remove conftest stubs so real langchain_core types are used +# --------------------------------------------------------------------------- + + +@pytest.fixture(autouse=True) +def _real_langchain_core(): + stub_keys = [k for k in sys.modules if k.startswith("langchain_core") or k.startswith("langgraph")] + saved = {k: sys.modules.pop(k) for k in stub_keys} + import importlib + + importlib.import_module("langchain_core.messages") + yield + sys.modules.update(saved) + + +# --------------------------------------------------------------------------- +# Fake streaming infrastructure (mirrors test_pydantic_ai_async.py pattern) +# --------------------------------------------------------------------------- + + +@dataclass +class FakeContext: + initial_content: Any + task_message: TaskMessage + closed: bool = False + updates: list[StreamTaskMessageDelta] = field(default_factory=list) + + async def __aenter__(self) -> "FakeContext": + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool: + await self.close() + return False + + async def stream_update(self, update: StreamTaskMessageDelta) -> None: + if self.closed: + raise AssertionError("stream_update called after close") + self.updates.append(update) + + async def close(self) -> None: + self.closed = True + + +class FakeStreamingModule: + def __init__(self) -> None: + self.contexts: list[FakeContext] = [] + + def streaming_task_message_context(self, *, task_id: str, initial_content: Any, **kw: Any) -> FakeContext: + tm = TaskMessage( + id=f"m{len(self.contexts) + 1}", + task_id=task_id, + content=initial_content, + streaming_status="IN_PROGRESS", + ) + ctx = FakeContext(initial_content=initial_content, task_message=tm) + self.contexts.append(ctx) + return ctx + + +class FakeMessagesModule: + def __init__(self) -> None: + self.created: list[dict[str, Any]] = [] + + async def create(self, *, task_id: str, content: Any) -> TaskMessage: + self.created.append({"task_id": task_id, "content": content}) + return TaskMessage( + id=f"created-{len(self.created)}", + task_id=task_id, + content=content, + streaming_status="DONE", + ) + + +@pytest.fixture +def fake_adk(monkeypatch): + from agentex.lib import adk as adk_module + + streaming = FakeStreamingModule() + messages = FakeMessagesModule() + monkeypatch.setattr(adk_module, "streaming", streaming) + monkeypatch.setattr(adk_module, "messages", messages) + return streaming, messages + + +def _make_stream(events: list[tuple[str, Any]]): + async def _gen(): + for e in events: + yield e + + return _gen() + + +def _text_deltas(ctx: FakeContext) -> list[str]: + out: list[str] = [] + for u in ctx.updates: + if isinstance(u.delta, TextDelta): + out.append(u.delta.text_delta or "") + return out + + +# --------------------------------------------------------------------------- +# Characterization tests (unified surface behavior) +# --------------------------------------------------------------------------- + + +class TestCharacterization: + async def test_plain_text_streams_and_returns_final_text( + self, fake_adk: tuple[FakeStreamingModule, FakeMessagesModule] + ) -> None: + from langchain_core.messages import AIMessage, AIMessageChunk + + streaming, messages = fake_adk + chunk = AIMessageChunk(content="Hello, world!") + ai_msg = AIMessage(content="Hello, world!") + stream = _make_stream( + [ + ("messages", (chunk, {})), + ("updates", {"agent": {"messages": [ai_msg]}}), + ] + ) + + final = await stream_langgraph_events(stream, TASK_ID) + + assert final == "Hello, world!" + assert len(streaming.contexts) == 1 + ctx = streaming.contexts[0] + assert isinstance(ctx.initial_content, TextContent) + assert _text_deltas(ctx) == ["Hello, world!"] + assert ctx.closed is True + # Unified surface: no messages.create for text + assert messages.created == [] + + async def test_empty_stream_returns_empty_string( + self, fake_adk: tuple[FakeStreamingModule, FakeMessagesModule] + ) -> None: + streaming, _ = fake_adk + final = await stream_langgraph_events(_make_stream([]), TASK_ID) + assert final == "" + assert streaming.contexts == [] + + async def test_tool_call_posted_via_streaming_context( + self, fake_adk: tuple[FakeStreamingModule, FakeMessagesModule] + ) -> None: + """Unified surface: tool calls go through streaming_task_message_context, + not adk.messages.create. The context is opened and immediately closed + (no deltas) so the initial_content is the tool request.""" + from langchain_core.messages import AIMessage + + streaming, messages = fake_adk + tc = {"id": "call_1", "name": "get_weather", "args": {"city": "Paris"}} + ai_msg = AIMessage(content="", tool_calls=[tc]) + stream = _make_stream([("updates", {"agent": {"messages": [ai_msg]}})]) + + await stream_langgraph_events(stream, TASK_ID) + + # Unified surface: tool messages go via streaming_task_message_context + assert len(streaming.contexts) == 1 + assert messages.created == [], "Unified surface uses streaming_task_message_context, not messages.create" + + from agentex.types.tool_request_content import ToolRequestContent + + content = streaming.contexts[0].initial_content + assert isinstance(content, ToolRequestContent) + assert content.tool_call_id == "call_1" + assert content.name == "get_weather" + assert content.arguments == {"city": "Paris"} + # Full messages close immediately (no delta updates) + assert streaming.contexts[0].closed is True + assert streaming.contexts[0].updates == [] + + async def test_tool_response_posted_via_streaming_context( + self, fake_adk: tuple[FakeStreamingModule, FakeMessagesModule] + ) -> None: + """Unified surface: tool responses go through streaming_task_message_context.""" + from langchain_core.messages import ToolMessage + + streaming, messages = fake_adk + tool_msg = ToolMessage(content="Sunny, 72F", tool_call_id="call_1", name="get_weather") + stream = _make_stream([("updates", {"tools": {"messages": [tool_msg]}})]) + + await stream_langgraph_events(stream, TASK_ID) + + assert len(streaming.contexts) == 1 + assert messages.created == [] + + from agentex.types.tool_response_content import ToolResponseContent + + content = streaming.contexts[0].initial_content + assert isinstance(content, ToolResponseContent) + assert content.tool_call_id == "call_1" + assert content.name == "get_weather" + assert content.content == "Sunny, 72F" + assert streaming.contexts[0].closed is True + + async def test_multi_step_text_then_tool_then_text_last_segment( + self, fake_adk: tuple[FakeStreamingModule, FakeMessagesModule] + ) -> None: + """Unified surface: final_text uses last-segment semantics. + + auto_send resets final_text_parts when a new Start(TextContent) is seen, + so multi-step turns (text -> tool -> text) return only the LAST text segment. + Both text contexts are still opened and streamed to Redis; only the + return value is last-segment. This matches stream_pydantic_ai_events. + """ + from langchain_core.messages import AIMessage, ToolMessage, AIMessageChunk + + streaming, messages = fake_adk + chunk1 = AIMessageChunk(content="Looking up...") + ai_msg1 = AIMessage(content="Looking up...", tool_calls=[{"id": "c1", "name": "search", "args": {}}]) + tool_msg = ToolMessage(content="result", tool_call_id="c1", name="search") + chunk2 = AIMessageChunk(content="Found it!") + ai_msg2 = AIMessage(content="Found it!") + + stream = _make_stream( + [ + ("messages", (chunk1, {})), + ("updates", {"agent": {"messages": [ai_msg1]}}), + ("updates", {"tools": {"messages": [tool_msg]}}), + ("messages", (chunk2, {})), + ("updates", {"agent": {"messages": [ai_msg2]}}), + ] + ) + + final = await stream_langgraph_events(stream, TASK_ID) + + # Last segment only — first text segment is NOT in final_text + assert final == "Found it!" + # Two text streaming contexts (one per text segment) — both streamed to Redis + text_ctxs = [c for c in streaming.contexts if isinstance(c.initial_content, TextContent)] + assert len(text_ctxs) == 2 + assert all(ctx.closed for ctx in text_ctxs) + # Tool request + tool response via streaming_task_message_context (not messages.create) + assert messages.created == [] + + async def test_context_closed_on_exception(self, fake_adk: tuple[FakeStreamingModule, FakeMessagesModule]) -> None: + from langchain_core.messages import AIMessageChunk + + streaming, _ = fake_adk + + async def _boom(): + chunk = AIMessageChunk(content="partial") + yield ("messages", (chunk, {})) + raise RuntimeError("upstream exploded") + + with pytest.raises(RuntimeError, match="upstream exploded"): + await stream_langgraph_events(_boom(), TASK_ID) + + assert streaming.contexts[0].closed is True diff --git a/tests/lib/adk/test_langgraph_sync.py b/tests/lib/adk/test_langgraph_sync.py new file mode 100644 index 000000000..d64b5b1e8 --- /dev/null +++ b/tests/lib/adk/test_langgraph_sync.py @@ -0,0 +1,217 @@ +"""Tests for the sync LangGraph -> Agentex stream event converter. + +Covers: +- Basic text, tool call, and tool response emission +- on_final_ai_message callback for usage capture +- create_langgraph_tracing_handler symbol is importable and functional + (runtime DeprecationWarning removed; deprecation is docstring-only) + +NOTE: langchain_core imports must be deferred to test-function scope because +conftest.py stubs out ``langchain_core.messages`` with MagicMock for ADK +package-level tests. The real classes are imported lazily inside each test. +""" + +from __future__ import annotations + +import sys +from typing import Any, AsyncIterator + +import pytest + +from agentex.types.task_message_update import ( + StreamTaskMessageFull, +) +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent +from agentex.lib.adk._modules._langgraph_sync import convert_langgraph_to_agentex_events + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +async def _collect(stream: AsyncIterator[Any]) -> list[Any]: + return [e async for e in stream] + + +def _make_stream(events: list[tuple[str, Any]]) -> AsyncIterator[tuple[str, Any]]: + async def _gen(): + for e in events: + yield e + + return _gen() + + +# --------------------------------------------------------------------------- +# Remove the conftest stubs for langchain_core so real classes are used +# --------------------------------------------------------------------------- + + +@pytest.fixture(autouse=True) +def _real_langchain_core(): + """Remove conftest MagicMock stubs so real langchain_core types are used.""" + stub_keys = [k for k in sys.modules if k.startswith("langchain_core") or k.startswith("langgraph")] + saved = {k: sys.modules.pop(k) for k in stub_keys} + # Re-import the real modules + import importlib + + importlib.import_module("langchain_core.messages") + yield + # Restore stubs after the test + sys.modules.update(saved) + + +class TestTextStreaming: + async def test_plain_text_emits_start_delta_done(self): + from langchain_core.messages import AIMessage, AIMessageChunk + + chunk = AIMessageChunk(content="Hello, world!") + events = [ + ("messages", (chunk, {})), + ("updates", {"agent": {"messages": [AIMessage(content="Hello, world!")]}}), + ] + out = await _collect(convert_langgraph_to_agentex_events(_make_stream(events))) + types = [type(e).__name__ for e in out] + assert "StreamTaskMessageStart" in types + assert "StreamTaskMessageDelta" in types + assert "StreamTaskMessageDone" in types + + async def test_empty_chunk_content_is_skipped(self): + from langchain_core.messages import AIMessageChunk + + chunk = AIMessageChunk(content="") + events = [("messages", (chunk, {}))] + out = await _collect(convert_langgraph_to_agentex_events(_make_stream(events))) + assert out == [] + + +class TestToolCallEmission: + async def test_tool_call_emits_full_message(self): + from langchain_core.messages import AIMessage + + tc = {"id": "call_1", "name": "get_weather", "args": {"city": "Paris"}} + ai_msg = AIMessage(content="", tool_calls=[tc]) + events = [("updates", {"agent": {"messages": [ai_msg]}})] + out = await _collect(convert_langgraph_to_agentex_events(_make_stream(events))) + assert len(out) == 1 + assert isinstance(out[0], StreamTaskMessageFull) + content = out[0].content + assert isinstance(content, ToolRequestContent) + assert content.tool_call_id == "call_1" + assert content.name == "get_weather" + assert content.arguments == {"city": "Paris"} + assert content.author == "agent" + + async def test_tool_response_emits_full_message(self): + from langchain_core.messages import ToolMessage + + tool_msg = ToolMessage(content="Sunny, 72F", tool_call_id="call_1", name="get_weather") + events = [("updates", {"tools": {"messages": [tool_msg]}})] + out = await _collect(convert_langgraph_to_agentex_events(_make_stream(events))) + assert len(out) == 1 + assert isinstance(out[0], StreamTaskMessageFull) + content = out[0].content + assert isinstance(content, ToolResponseContent) + assert content.tool_call_id == "call_1" + assert content.name == "get_weather" + assert content.content == "Sunny, 72F" + assert content.author == "agent" + + +class TestOnFinalAiMessageCallback: + async def test_callback_called_for_ai_message_in_agent_node(self): + from langchain_core.messages import AIMessage + + captured: list[Any] = [] + ai_msg = AIMessage(content="Hello!") + + events = [("updates", {"agent": {"messages": [ai_msg]}})] + await _collect(convert_langgraph_to_agentex_events(_make_stream(events), on_final_ai_message=captured.append)) + assert len(captured) == 1 + assert captured[0] is ai_msg + + async def test_callback_not_called_for_tool_messages(self): + from langchain_core.messages import ToolMessage + + captured: list[Any] = [] + tool_msg = ToolMessage(content="result", tool_call_id="c1", name="t") + + events = [("updates", {"tools": {"messages": [tool_msg]}})] + await _collect(convert_langgraph_to_agentex_events(_make_stream(events), on_final_ai_message=captured.append)) + assert captured == [] + + async def test_callback_receives_usage_metadata(self): + from langchain_core.messages import AIMessage + + captured: list[Any] = [] + usage = {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15} + ai_msg = AIMessage(content="Answer.", usage_metadata=usage) + + events = [("updates", {"agent": {"messages": [ai_msg]}})] + await _collect(convert_langgraph_to_agentex_events(_make_stream(events), on_final_ai_message=captured.append)) + assert len(captured) == 1 + assert captured[0].usage_metadata == usage + + async def test_no_callback_is_noop(self): + from langchain_core.messages import AIMessage + + ai_msg = AIMessage(content="Hello!") + events = [("updates", {"agent": {"messages": [ai_msg]}})] + out = await _collect(convert_langgraph_to_agentex_events(_make_stream(events))) + assert isinstance(out, list) + + async def test_callback_called_multiple_times_for_multi_step(self): + from langchain_core.messages import AIMessage + + captured: list[Any] = [] + ai_msg_1 = AIMessage(content="Step 1") + ai_msg_2 = AIMessage(content="Step 2") + + events = [ + ("updates", {"agent": {"messages": [ai_msg_1]}}), + ("updates", {"agent": {"messages": [ai_msg_2]}}), + ] + await _collect(convert_langgraph_to_agentex_events(_make_stream(events), on_final_ai_message=captured.append)) + assert len(captured) == 2 + assert captured[0] is ai_msg_1 + assert captured[1] is ai_msg_2 + + async def test_callback_called_after_tool_call_events_yielded(self): + """The callback fires after all events for that AIMessage are yielded.""" + from langchain_core.messages import AIMessage + + yield_order: list[str] = [] + + async def _gen(): + tc = {"id": "c1", "name": "t", "args": {}} + ai_msg = AIMessage(content="", tool_calls=[tc]) + yield ("updates", {"agent": {"messages": [ai_msg]}}) + + def _cb(msg): + yield_order.append("callback") + + async for _ in convert_langgraph_to_agentex_events(_gen(), on_final_ai_message=_cb): + yield_order.append("event") + + # The tool call Full event is emitted before the callback fires + assert yield_order.index("event") < yield_order.index("callback") + + +class TestLangGraphTracingHandlerBackwardCompat: + def test_create_langgraph_tracing_handler_no_runtime_warning(self): + """Deprecated symbol remains importable and emits no runtime DeprecationWarning. + + The runtime warnings.warn was removed (docstring-only deprecation) to + align with PR 4/6 and avoid breaking callers under warnings-as-errors. + Using ``warnings.simplefilter("error", DeprecationWarning)`` verifies + that calling the function is safe under -W error conditions. + """ + import warnings + + from agentex.lib.adk._modules._langgraph_tracing import create_langgraph_tracing_handler + + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("error", DeprecationWarning) + create_langgraph_tracing_handler(trace_id="t1", parent_span_id="p1") + + assert w == [], "create_langgraph_tracing_handler must NOT emit a runtime DeprecationWarning" diff --git a/tests/lib/adk/test_langgraph_sync_unified.py b/tests/lib/adk/test_langgraph_sync_unified.py new file mode 100644 index 000000000..cfd522828 --- /dev/null +++ b/tests/lib/adk/test_langgraph_sync_unified.py @@ -0,0 +1,214 @@ +"""Unified sync path tests for LangGraphTurn + UnifiedEmitter. + +Verifies: +1. Passthrough: events from emitter.yield_turn(LangGraphTurn(stream)) equal + LangGraphTurn(stream).events collected directly. +2. Span derivation: with trace_id + fake tracer, tool spans are derived from + the event stream. + +NOTE: langchain_core imports are deferred to test scope because conftest.py +stubs ``langchain_core.messages`` with MagicMock. +""" + +from __future__ import annotations + +import sys +from typing import Any +from datetime import datetime, timezone +from dataclasses import field, dataclass + +import pytest + +from agentex.lib.core.harness.tracer import SpanTracer +from agentex.lib.core.harness.emitter import UnifiedEmitter +from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn + +# --------------------------------------------------------------------------- +# Remove conftest stubs so real langchain_core types are used +# --------------------------------------------------------------------------- + + +@pytest.fixture(autouse=True) +def _real_langchain_core(): + stub_keys = [k for k in sys.modules if k.startswith("langchain_core") or k.startswith("langgraph")] + saved = {k: sys.modules.pop(k) for k in stub_keys} + import importlib + + importlib.import_module("langchain_core.messages") + yield + sys.modules.update(saved) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_stream(events: list[tuple[str, Any]]): + async def _gen(): + for e in events: + yield e + + return _gen() + + +# --------------------------------------------------------------------------- +# Fake SpanTracer +# --------------------------------------------------------------------------- + + +@dataclass +class _FakeTracingBackend: + spans_started: list[dict[str, Any]] = field(default_factory=list) + spans_ended: list[str] = field(default_factory=list) + + async def start_span(self, **kw) -> Any: + from agentex.types.span import Span + + sp = Span( + id=f"span-{len(self.spans_started) + 1}", + trace_id=kw.get("trace_id", "trace1"), + name=kw.get("name", ""), + start_time=datetime.now(tz=timezone.utc), + ) + self.spans_started.append(kw) + return sp + + async def end_span(self, *, trace_id: str, span: Any) -> None: + self.spans_ended.append(span.id if span else "") + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class TestPassthrough: + async def test_yield_turn_events_equal_direct_events(self): + """Events from emitter.yield_turn(LangGraphTurn(stream)) must equal + LangGraphTurn(stream).events collected directly — the emitter must not + add, drop, or reorder events in yield mode.""" + from langchain_core.messages import AIMessage, AIMessageChunk + + chunk = AIMessageChunk(content="Hello!") + ai_msg = AIMessage(content="Hello!") + + # Build two identical streams + events_raw = [ + ("messages", (chunk, {})), + ("updates", {"agent": {"messages": [ai_msg]}}), + ] + + # Direct collection + direct = [e async for e in LangGraphTurn(_make_stream(events_raw)).events] + + # Via emitter.yield_turn + emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None) + via_emitter = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream(events_raw)))] + + assert len(direct) == len(via_emitter), "yield_turn must not add or drop events relative to direct iteration" + for a, b in zip(direct, via_emitter, strict=True): + assert type(a) == type(b), f"Event type mismatch: {type(a).__name__} vs {type(b).__name__}" + + async def test_yield_turn_passes_all_event_types(self): + """Start, Delta, Done, Full — each type is preserved.""" + from langchain_core.messages import AIMessage, AIMessageChunk + + chunk = AIMessageChunk(content="hi") + tc = {"id": "c1", "name": "t", "args": {}} + ai_msg = AIMessage(content="hi", tool_calls=[tc]) + + events_raw = [ + ("messages", (chunk, {})), + ("updates", {"agent": {"messages": [ai_msg]}}), + ] + emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None) + out = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream(events_raw)))] + types = {type(e).__name__ for e in out} + # text chunk emits Start + Delta + assert "StreamTaskMessageStart" in types + assert "StreamTaskMessageDelta" in types + # tool call emits Full + assert "StreamTaskMessageFull" in types + + async def test_empty_stream_yields_no_events(self): + emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None) + out = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream([])))] + assert out == [] + + +class TestSpanDerivation: + @pytest.fixture + def fake_tracer(self): + backend = _FakeTracingBackend() + tracer = SpanTracer( + trace_id="trace1", + parent_span_id=None, + task_id="t", + tracing=backend, # type: ignore[arg-type] + ) + return tracer, backend + + async def test_tool_span_derived_from_full_events(self, fake_tracer): + """AGX1-377: SpanDeriver now handles Full tool events for LangGraph. + + Full(ToolRequestContent) opens a tool span keyed by tool_call_id; + Full(ToolResponseContent) closes it. This bridges the previous gap where + LangGraph's Full-event path produced no spans, aligning it with + Start+Done harnesses (pydantic-ai, openai-agents). + """ + from langchain_core.messages import AIMessage, ToolMessage + + tracer, backend = fake_tracer + tc = {"id": "c1", "name": "get_weather", "args": {"city": "Paris"}} + ai_msg = AIMessage(content="", tool_calls=[tc]) + tool_msg = ToolMessage(content="Sunny", tool_call_id="c1", name="get_weather") + + events_raw = [ + ("updates", {"agent": {"messages": [ai_msg]}}), + ("updates", {"tools": {"messages": [tool_msg]}}), + ] + + emitter = UnifiedEmitter(task_id="t", trace_id="trace1", parent_span_id=None, tracer=tracer) + _ = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream(events_raw)))] + + assert len(backend.spans_started) == 1, "Full(ToolRequestContent) opens one tool span" + started = backend.spans_started[0] + assert started["name"] == "get_weather" + assert started["input"] == {"city": "Paris"} + + async def test_no_spans_when_no_tool_calls(self, fake_tracer): + """yield_turn with tracer but no tool calls emits no spans.""" + from langchain_core.messages import AIMessage, AIMessageChunk + + tracer, backend = fake_tracer + chunk = AIMessageChunk(content="Hello!") + ai_msg = AIMessage(content="Hello!") + + events_raw = [ + ("messages", (chunk, {})), + ("updates", {"agent": {"messages": [ai_msg]}}), + ] + + emitter = UnifiedEmitter(task_id="t", trace_id="trace1", parent_span_id=None, tracer=tracer) + _ = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream(events_raw)))] + + assert backend.spans_started == [], "No tool spans when there are no tool calls" + + async def test_tracer_none_means_no_spans(self): + """With tracer=False, no spans should be emitted.""" + from langchain_core.messages import AIMessage, ToolMessage + + tc = {"id": "c1", "name": "t", "args": {}} + ai_msg = AIMessage(content="", tool_calls=[tc]) + tool_msg = ToolMessage(content="ok", tool_call_id="c1", name="t") + + events_raw = [ + ("updates", {"agent": {"messages": [ai_msg]}}), + ("updates", {"tools": {"messages": [tool_msg]}}), + ] + + emitter = UnifiedEmitter(task_id="t", trace_id="trace1", parent_span_id=None, tracer=False) + _ = [e async for e in emitter.yield_turn(LangGraphTurn(_make_stream(events_raw)))] + # No assertion on spans since tracer=False means emitter.tracer is None + assert emitter.tracer is None diff --git a/tests/lib/adk/test_langgraph_turn.py b/tests/lib/adk/test_langgraph_turn.py new file mode 100644 index 000000000..66d3a007d --- /dev/null +++ b/tests/lib/adk/test_langgraph_turn.py @@ -0,0 +1,224 @@ +"""Tests for LangGraphTurn and langgraph_usage_to_turn_usage.""" + +from __future__ import annotations + +import sys +from typing import Any + +import pytest + +from agentex.lib.core.harness.types import TurnUsage +from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn, langgraph_usage_to_turn_usage + +# --------------------------------------------------------------------------- +# Remove conftest stubs so real langchain_core types are used +# --------------------------------------------------------------------------- + + +@pytest.fixture(autouse=True) +def _real_langchain_core(): + stub_keys = [k for k in sys.modules if k.startswith("langchain_core") or k.startswith("langgraph")] + saved = {k: sys.modules.pop(k) for k in stub_keys} + import importlib + + importlib.import_module("langchain_core.messages") + yield + sys.modules.update(saved) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_stream(events: list[tuple[str, Any]]): + async def _gen(): + for e in events: + yield e + + return _gen() + + +async def _drain(turn: LangGraphTurn) -> list[Any]: + return [e async for e in turn.events] + + +# --------------------------------------------------------------------------- +# langgraph_usage_to_turn_usage +# --------------------------------------------------------------------------- + + +class TestLangGraphUsageToTurnUsage: + def test_none_usage_returns_empty_turn_usage(self): + result = langgraph_usage_to_turn_usage(None, model="gpt-4") + assert result == TurnUsage(model="gpt-4") + + def test_basic_token_fields_mapped(self): + usage = {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15} + result = langgraph_usage_to_turn_usage(usage, model="gpt-4") + assert result.input_tokens == 10 + assert result.output_tokens == 5 + assert result.total_tokens == 15 + assert result.model == "gpt-4" + + def test_zero_output_tokens_preserved_not_coerced_to_none(self): + """Real zero counts must be preserved as 0, not None.""" + usage = {"input_tokens": 10, "output_tokens": 0, "total_tokens": 10} + result = langgraph_usage_to_turn_usage(usage, model=None) + assert result.output_tokens == 0 + + def test_cache_read_mapped_to_cached_input_tokens(self): + usage = { + "input_tokens": 20, + "output_tokens": 5, + "total_tokens": 25, + "input_token_details": {"cache_read": 8}, + } + result = langgraph_usage_to_turn_usage(usage, model=None) + assert result.cached_input_tokens == 8 + + def test_reasoning_mapped_to_reasoning_tokens(self): + usage = { + "input_tokens": 10, + "output_tokens": 15, + "total_tokens": 25, + "output_token_details": {"reasoning": 6}, + } + result = langgraph_usage_to_turn_usage(usage, model=None) + assert result.reasoning_tokens == 6 + + def test_missing_optional_fields_are_none(self): + usage = {"input_tokens": 5, "output_tokens": 3, "total_tokens": 8} + result = langgraph_usage_to_turn_usage(usage, model=None) + assert result.cached_input_tokens is None + assert result.reasoning_tokens is None + + def test_full_usage_object(self): + usage = { + "input_tokens": 100, + "output_tokens": 50, + "total_tokens": 150, + "input_token_details": {"cache_read": 30}, + "output_token_details": {"reasoning": 20}, + } + result = langgraph_usage_to_turn_usage(usage, model="claude-3-5-sonnet") + assert result == TurnUsage( + model="claude-3-5-sonnet", + input_tokens=100, + output_tokens=50, + total_tokens=150, + cached_input_tokens=30, + reasoning_tokens=20, + ) + + def test_model_none_is_preserved(self): + result = langgraph_usage_to_turn_usage({"input_tokens": 1}, model=None) + assert result.model is None + + def test_empty_input_token_details_does_not_crash(self): + usage = {"input_tokens": 5, "input_token_details": {}} + result = langgraph_usage_to_turn_usage(usage, model=None) + assert result.cached_input_tokens is None + + def test_empty_output_token_details_does_not_crash(self): + usage = {"output_tokens": 5, "output_token_details": {}} + result = langgraph_usage_to_turn_usage(usage, model=None) + assert result.reasoning_tokens is None + + +# --------------------------------------------------------------------------- +# LangGraphTurn +# --------------------------------------------------------------------------- + + +class TestLangGraphTurn: + async def test_events_yields_from_sync_converter(self): + from langchain_core.messages import AIMessage, AIMessageChunk + + chunk = AIMessageChunk(content="Hello!") + ai_msg = AIMessage(content="Hello!") + stream = _make_stream( + [ + ("messages", (chunk, {})), + ("updates", {"agent": {"messages": [ai_msg]}}), + ] + ) + turn = LangGraphTurn(stream) + events = await _drain(turn) + assert len(events) > 0 + + async def test_usage_is_empty_before_stream_consumed(self): + turn = LangGraphTurn(_make_stream([])) + # usage() before events consumed should return a default TurnUsage + usage = turn.usage() + assert isinstance(usage, TurnUsage) + + async def test_usage_captured_from_ai_message(self): + from langchain_core.messages import AIMessage + + usage_meta = {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15} + ai_msg = AIMessage(content="Hi!", usage_metadata=usage_meta) + stream = _make_stream([("updates", {"agent": {"messages": [ai_msg]}})]) + turn = LangGraphTurn(stream, model="gpt-4") + await _drain(turn) + + usage = turn.usage() + assert usage.input_tokens == 10 + assert usage.output_tokens == 5 + assert usage.total_tokens == 15 + assert usage.model == "gpt-4" + + async def test_usage_not_updated_when_no_usage_metadata(self): + from langchain_core.messages import AIMessage + + ai_msg = AIMessage(content="Hi!") + stream = _make_stream([("updates", {"agent": {"messages": [ai_msg]}})]) + turn = LangGraphTurn(stream, model="gpt-4") + await _drain(turn) + + usage = turn.usage() + assert usage == TurnUsage(model="gpt-4") + + async def test_usage_captures_cache_read_and_reasoning(self): + from langchain_core.messages import AIMessage + + usage_meta = { + "input_tokens": 100, + "output_tokens": 50, + "total_tokens": 150, + "input_token_details": {"cache_read": 30}, + "output_token_details": {"reasoning": 20}, + } + ai_msg = AIMessage(content="Result", usage_metadata=usage_meta) + stream = _make_stream([("updates", {"agent": {"messages": [ai_msg]}})]) + turn = LangGraphTurn(stream, model="claude-3-5-sonnet") + await _drain(turn) + + usage = turn.usage() + assert usage.cached_input_tokens == 30 + assert usage.reasoning_tokens == 20 + + async def test_harness_turn_protocol_conformance(self): + """LangGraphTurn satisfies the HarnessTurn Protocol.""" + from agentex.lib.core.harness.types import HarnessTurn + + turn = LangGraphTurn(_make_stream([])) + assert isinstance(turn, HarnessTurn), "LangGraphTurn must satisfy HarnessTurn Protocol" + + async def test_empty_stream_yields_no_events(self): + turn = LangGraphTurn(_make_stream([])) + events = await _drain(turn) + assert events == [] + + async def test_model_none_default(self): + turn = LangGraphTurn(_make_stream([])) + assert turn.usage().model is None + + async def test_model_passed_through_to_usage(self): + from langchain_core.messages import AIMessage + + ai_msg = AIMessage(content="ok", usage_metadata={"input_tokens": 1, "output_tokens": 0, "total_tokens": 1}) + stream = _make_stream([("updates", {"agent": {"messages": [ai_msg]}})]) + turn = LangGraphTurn(stream, model="my-model") + await _drain(turn) + assert turn.usage().model == "my-model" diff --git a/tests/lib/adk/test_pydantic_ai_async.py b/tests/lib/adk/test_pydantic_ai_async.py index dadda5914..62cc3970a 100644 --- a/tests/lib/adk/test_pydantic_ai_async.py +++ b/tests/lib/adk/test_pydantic_ai_async.py @@ -82,7 +82,7 @@ class FakeStreamingModule: def __init__(self) -> None: self.contexts: list[FakeContext] = [] - def streaming_task_message_context(self, *, task_id: str, initial_content: Any) -> FakeContext: + def streaming_task_message_context(self, *, task_id: str, initial_content: Any, **kw: Any) -> FakeContext: tm = TaskMessage( id=f"m{len(self.contexts) + 1}", task_id=task_id, diff --git a/tests/lib/core/harness/conformance/test_langgraph_conformance.py b/tests/lib/core/harness/conformance/test_langgraph_conformance.py new file mode 100644 index 000000000..4c64a223c --- /dev/null +++ b/tests/lib/core/harness/conformance/test_langgraph_conformance.py @@ -0,0 +1,206 @@ +"""LangGraph conformance fixtures for the cross-channel span-derivation test. + +Registers 4 LangGraph event sequences as conformance fixtures: +- text-only: a plain text response (no tool calls) +- single-tool: one tool call + response +- reasoning: a reasoning block + text +- multi-step: two turns with tool calls + +AGX1-377 note: LangGraph emits tool requests as ``StreamTaskMessageFull`` +(from "updates" events), NOT Start+Delta+Done like pydantic-ai. The SpanDeriver +does not produce tool spans from Full events today; that gap is tracked in +AGX1-373. The fixtures here document the current behavior and will be updated +when AGX1-373 resolves. +""" + +from __future__ import annotations + +import pytest + +from agentex.types.text_content import TextContent +from agentex.types.reasoning_content import ReasoningContent +from agentex.types.task_message_delta import TextDelta +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageFull, + StreamTaskMessageDelta, + StreamTaskMessageStart, +) +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent +from agentex.types.reasoning_content_delta import ReasoningContentDelta + +from .runner import Fixture, register, derive_all + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +_TEXT_ONLY = Fixture( + name="langgraph-text-only", + events=[ + StreamTaskMessageStart( + type="start", + index=0, + content=TextContent(type="text", author="agent", content=""), + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=TextDelta(type="text", text_delta="Hello from LangGraph!"), + ), + StreamTaskMessageDone(type="done", index=0), + ], +) + +_SINGLE_TOOL = Fixture( + name="langgraph-single-tool", + events=[ + # LangGraph tool request is a Full event (AGX1-377) + StreamTaskMessageFull( + type="full", + index=0, + content=ToolRequestContent( + type="tool_request", + author="agent", + tool_call_id="call_1", + name="get_weather", + arguments={"city": "Paris"}, + ), + ), + StreamTaskMessageFull( + type="full", + index=1, + content=ToolResponseContent( + type="tool_response", + author="agent", + tool_call_id="call_1", + name="get_weather", + content="Sunny, 72F", + ), + ), + StreamTaskMessageStart( + type="start", + index=2, + content=TextContent(type="text", author="agent", content=""), + ), + StreamTaskMessageDelta( + type="delta", + index=2, + delta=TextDelta(type="text", text_delta="The weather in Paris is sunny, 72F."), + ), + StreamTaskMessageDone(type="done", index=2), + ], +) + +_REASONING = Fixture( + name="langgraph-reasoning", + events=[ + StreamTaskMessageStart( + type="start", + index=0, + content=ReasoningContent( + type="reasoning", + author="agent", + summary=[], + content=[], + style="active", + ), + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=ReasoningContentDelta( + type="reasoning_content", + content_index=0, + content_delta="Thinking about this...", + ), + ), + StreamTaskMessageDone(type="done", index=0), + StreamTaskMessageStart( + type="start", + index=1, + content=TextContent(type="text", author="agent", content=""), + ), + StreamTaskMessageDelta( + type="delta", + index=1, + delta=TextDelta(type="text", text_delta="The answer is 42."), + ), + StreamTaskMessageDone(type="done", index=1), + ], +) + +_MULTI_STEP = Fixture( + name="langgraph-multi-step", + events=[ + # Turn 1: text + tool call + StreamTaskMessageStart( + type="start", + index=0, + content=TextContent(type="text", author="agent", content=""), + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=TextDelta(type="text", text_delta="Let me search for that."), + ), + StreamTaskMessageDone(type="done", index=0), + # Tool request (Full — AGX1-377) + StreamTaskMessageFull( + type="full", + index=1, + content=ToolRequestContent( + type="tool_request", + author="agent", + tool_call_id="call_2", + name="search", + arguments={"query": "langgraph"}, + ), + ), + StreamTaskMessageFull( + type="full", + index=2, + content=ToolResponseContent( + type="tool_response", + author="agent", + tool_call_id="call_2", + name="search", + content="LangGraph is a framework for...", + ), + ), + # Turn 2: final text + StreamTaskMessageStart( + type="start", + index=3, + content=TextContent(type="text", author="agent", content=""), + ), + StreamTaskMessageDelta( + type="delta", + index=3, + delta=TextDelta(type="text", text_delta="Based on my research, LangGraph is..."), + ), + StreamTaskMessageDone(type="done", index=3), + ], +) + +_LANGGRAPH_FIXTURES = [_TEXT_ONLY, _SINGLE_TOOL, _REASONING, _MULTI_STEP] + +for _fixture in _LANGGRAPH_FIXTURES: + register(_fixture) + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("fixture", _LANGGRAPH_FIXTURES, ids=lambda f: f.name) +def test_langgraph_span_derivation_is_deterministic(fixture: Fixture): + """Exercises the cross-channel guarantee: yield and auto-send observe the + same event stream, so span derivation must be deterministic/idempotent. + + Deriving twice over the same events yields identical signals (the property + that makes yield vs auto-send equivalent, since both observe the same stream). + """ + assert derive_all(fixture.events) == derive_all(fixture.events) diff --git a/tests/lib/core/harness/test_harness_langgraph_async.py b/tests/lib/core/harness/test_harness_langgraph_async.py new file mode 100644 index 000000000..39bf5bc66 --- /dev/null +++ b/tests/lib/core/harness/test_harness_langgraph_async.py @@ -0,0 +1,298 @@ +"""Integration test: async (Redis-streaming) channel with a LangGraph agent. + +Exercises the unified harness surface (UnifiedEmitter.auto_send_turn + LangGraphTurn) +with a minimal fake LangGraph stream so the test runs fully offline (no API +keys, no Redis, no Agentex server). + +Agent description +----------------- +A simulated single-tool agent run using hand-crafted LangGraph event tuples: +one tool request + response, followed by a final text reply. + +What is tested +-------------- +- The async handler pushes the correct sequence of messages to the fake streaming + backend: Full(ToolRequest) + Full(ToolResponse) + text Start/Delta/Done. +- final_text accumulates all text (not just last segment — AGX1-377 unified behavior). +- Tool messages go through streaming_task_message_context (not messages.create). +- With a SpanTracer, no tool spans are produced (AGX1-377: Full events are not + handled by SpanDeriver today). + +What is NOT covered without live infrastructure +----------------------------------------------- +- Actual Redis streaming (requires a running Redis instance). +- The ACP on_task_event_send / on_task_create / on_task_cancel lifecycle. +- Real LLM calls or real LangGraph graph execution. +- The full FastACP async request lifecycle. + +See also: test_harness_langgraph_sync.py and test_harness_langgraph_temporal.py +for the other two channels. +""" + +from __future__ import annotations + +import sys +from typing import Any +from dataclasses import field, dataclass + +import pytest + +from agentex.types.task_message import TaskMessage +from agentex.types.text_content import TextContent +from agentex.lib.core.harness.types import TurnResult +from agentex.lib.core.harness.tracer import SpanTracer +from agentex.lib.core.harness.emitter import UnifiedEmitter +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent +from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn + +# --------------------------------------------------------------------------- +# Remove conftest stubs so real langchain_core types are used +# --------------------------------------------------------------------------- + + +@pytest.fixture(autouse=True) +def _real_langchain_core(): + stub_keys = [k for k in sys.modules if k.startswith("langchain_core") or k.startswith("langgraph")] + saved = {k: sys.modules.pop(k) for k in stub_keys} + import importlib + + importlib.import_module("langchain_core.messages") + yield + sys.modules.update(saved) + + +# --------------------------------------------------------------------------- +# Fake streaming backend (replaces adk.streaming; no Redis required) +# --------------------------------------------------------------------------- + + +@dataclass +class _FakeCtx: + ctype: str + initial_content: Any + task_message: TaskMessage + closed: bool = False + deltas: list[Any] = field(default_factory=list) + + async def __aenter__(self) -> "_FakeCtx": + return self + + async def __aexit__(self, *args: Any) -> bool: + await self.close() + return False + + async def close(self) -> None: + self.closed = True + + async def stream_update(self, update: Any) -> Any: + self.deltas.append(update) + return update + + +class _FakeStreaming: + def __init__(self) -> None: + self.contexts: list[_FakeCtx] = [] + + def streaming_task_message_context(self, task_id: str, initial_content: Any, **kw: Any) -> _FakeCtx: + ctype = getattr(initial_content, "type", None) or "" + tm = TaskMessage(id=f"m{len(self.contexts) + 1}", task_id=task_id, content=initial_content) + ctx = _FakeCtx(ctype=ctype, initial_content=initial_content, task_message=tm) + self.contexts.append(ctx) + return ctx + + +# --------------------------------------------------------------------------- +# Fake tracing backend +# --------------------------------------------------------------------------- + + +class _FakeSpan: + def __init__(self, name: str) -> None: + self.name = name + self.output: Any = None + + +class _FakeTracing: + def __init__(self) -> None: + self.started: list[tuple[str, Any]] = [] + self.ended: list[tuple[str, Any]] = [] + + async def start_span(self, *, trace_id: str, name: str, **kw: Any) -> _FakeSpan: + self.started.append((name, kw.get("parent_id"))) + return _FakeSpan(name) + + async def end_span(self, *, trace_id: str, span: _FakeSpan) -> None: + self.ended.append((span.name, span.output)) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_stream(events: list[tuple[str, Any]]): + async def _gen(): + for e in events: + yield e + + return _gen() + + +async def _run_auto_send_turn( + stream_events: list[tuple[str, Any]], + trace_id: str | None = None, +) -> tuple[TurnResult, _FakeStreaming, _FakeTracing | None]: + fake_streaming = _FakeStreaming() + fake_tracing = _FakeTracing() if trace_id else None + + tracer: SpanTracer | bool = False + if trace_id and fake_tracing is not None: + tracer = SpanTracer(trace_id=trace_id, parent_span_id=None, task_id="task1", tracing=fake_tracing) + + turn = LangGraphTurn(_make_stream(stream_events), model=None) + emitter = UnifiedEmitter( + task_id="task1", + trace_id=trace_id, + parent_span_id=None, + tracer=tracer, + streaming=fake_streaming, + ) + result = await emitter.auto_send_turn(turn) + return result, fake_streaming, fake_tracing + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class TestAsyncAutoSendChannel: + async def test_text_only_streams_text_and_returns_final(self): + from langchain_core.messages import AIMessage, AIMessageChunk + + chunk = AIMessageChunk(content="Hello from LangGraph!") + ai_msg = AIMessage(content="Hello from LangGraph!") + events = [ + ("messages", (chunk, {})), + ("updates", {"agent": {"messages": [ai_msg]}}), + ] + result, fake_streaming, _ = await _run_auto_send_turn(events) + + assert result.final_text == "Hello from LangGraph!" + text_ctxs = [c for c in fake_streaming.contexts if c.ctype == "text"] + assert len(text_ctxs) == 1 + assert text_ctxs[0].closed is True + + async def test_tool_call_posted_via_streaming_context(self): + from langchain_core.messages import AIMessage + + tc = {"id": "call_1", "name": "get_weather", "args": {"city": "Paris"}} + ai_msg = AIMessage(content="", tool_calls=[tc]) + events = [("updates", {"agent": {"messages": [ai_msg]}})] + + result, fake_streaming, _ = await _run_auto_send_turn(events) + + # Tool request via streaming_task_message_context (Full event) + tool_req_ctxs = [c for c in fake_streaming.contexts if isinstance(c.initial_content, ToolRequestContent)] + assert len(tool_req_ctxs) == 1 + assert tool_req_ctxs[0].initial_content.tool_call_id == "call_1" + assert tool_req_ctxs[0].closed is True + assert tool_req_ctxs[0].deltas == [], "Full messages have no deltas" + + async def test_tool_response_posted_via_streaming_context(self): + from langchain_core.messages import ToolMessage + + tool_msg = ToolMessage(content="Sunny, 72F", tool_call_id="call_1", name="get_weather") + events = [("updates", {"tools": {"messages": [tool_msg]}})] + + _, fake_streaming, _ = await _run_auto_send_turn(events) + + tool_resp_ctxs = [c for c in fake_streaming.contexts if isinstance(c.initial_content, ToolResponseContent)] + assert len(tool_resp_ctxs) == 1 + assert tool_resp_ctxs[0].initial_content.content == "Sunny, 72F" + assert tool_resp_ctxs[0].closed is True + + async def test_multi_step_final_text_is_last_segment(self): + """Unified surface: final_text uses last-segment semantics. + + auto_send resets final_text_parts when a new Start(TextContent) is seen, + so multi-step turns (text -> tool -> text) return only the LAST text segment. + This matches the behaviour documented in auto_send.py and mirrors + stream_pydantic_ai_events. + """ + from langchain_core.messages import AIMessage, ToolMessage, AIMessageChunk + + chunk1 = AIMessageChunk(content="Searching...") + ai_msg1 = AIMessage(content="Searching...", tool_calls=[{"id": "c1", "name": "s", "args": {}}]) + tool_msg = ToolMessage(content="results", tool_call_id="c1", name="s") + chunk2 = AIMessageChunk(content="Found it!") + ai_msg2 = AIMessage(content="Found it!") + + events = [ + ("messages", (chunk1, {})), + ("updates", {"agent": {"messages": [ai_msg1]}}), + ("updates", {"tools": {"messages": [tool_msg]}}), + ("messages", (chunk2, {})), + ("updates", {"agent": {"messages": [ai_msg2]}}), + ] + result, fake_streaming, _ = await _run_auto_send_turn(events) + + # Last segment only — first text segment is NOT in final_text + assert result.final_text == "Found it!" + + # Two text streaming contexts still opened (both streamed to Redis) + text_ctxs = [c for c in fake_streaming.contexts if isinstance(c.initial_content, TextContent)] + assert len(text_ctxs) == 2 + + async def test_empty_stream_returns_empty_final_text(self): + result, fake_streaming, _ = await _run_auto_send_turn([]) + assert result.final_text == "" + assert fake_streaming.contexts == [] + + async def test_turn_usage_populated_after_events_consumed(self): + """LangGraphTurn.usage() is populated via the on_final_ai_message callback + during event iteration. TurnResult.usage is a snapshot from before events run + (emitter.auto_send_turn evaluates turn.usage() eagerly); the authoritative + post-iteration usage is on turn.usage() directly.""" + from langchain_core.messages import AIMessage + + fake_streaming = _FakeStreaming() + usage_meta = {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15} + ai_msg = AIMessage(content="hi", usage_metadata=usage_meta) + events = [("updates", {"agent": {"messages": [ai_msg]}})] + + turn = LangGraphTurn(_make_stream(events), model="gpt-4") + emitter = UnifiedEmitter( + task_id="task1", trace_id=None, parent_span_id=None, tracer=False, streaming=fake_streaming + ) + await emitter.auto_send_turn(turn) + + # After auto_send_turn, turn.usage() has the captured values + usage = turn.usage() + assert usage.input_tokens == 10 + assert usage.output_tokens == 5 + assert usage.total_tokens == 15 + + async def test_tracer_produces_tool_spans_for_full_events(self): + """AGX1-377: SpanDeriver now handles Full tool events (request opens, response closes). + + Full(ToolRequestContent) opens a tool span; Full(ToolResponseContent) closes it. + This aligns LangGraph tracing with Start+Done harnesses (pydantic-ai, openai-agents). + """ + from langchain_core.messages import AIMessage, ToolMessage + + tc = {"id": "c1", "name": "t", "args": {}} + ai_msg = AIMessage(content="", tool_calls=[tc]) + tool_msg = ToolMessage(content="ok", tool_call_id="c1", name="t") + + events = [ + ("updates", {"agent": {"messages": [ai_msg]}}), + ("updates", {"tools": {"messages": [tool_msg]}}), + ] + _, _, fake_tracing = await _run_auto_send_turn(events, trace_id="trace-1") + + assert fake_tracing is not None + assert len(fake_tracing.started) == 1, "Full(ToolRequestContent) opens one tool span" + assert fake_tracing.started[0][0] == "t", "span name matches the tool name" + assert len(fake_tracing.ended) == 1, "Full(ToolResponseContent) closes the span" diff --git a/tests/lib/core/harness/test_harness_langgraph_sync.py b/tests/lib/core/harness/test_harness_langgraph_sync.py new file mode 100644 index 000000000..5a6667f7e --- /dev/null +++ b/tests/lib/core/harness/test_harness_langgraph_sync.py @@ -0,0 +1,228 @@ +"""Integration test: sync (HTTP-yield) channel with a LangGraph agent. + +Exercises the unified harness surface (UnifiedEmitter.yield_turn + LangGraphTurn) +with a minimal fake LangGraph stream so the test runs fully offline (no API +keys, no Redis, no Agentex server). + +Agent description +----------------- +A simulated single-tool agent run using hand-crafted LangGraph event tuples: +one tool request + response, followed by a final text reply. + +What is tested +-------------- +- The sync handler correctly yields StreamTaskMessage* events in order: + Full(ToolRequest) then Full(ToolResponse) then text Start+Delta+Done. +- With trace_id + fake tracing, the SpanDeriver fires for text events. +- AGX1-377: tool calls are Full events (not Start+Done), so the SpanDeriver + does NOT produce tool spans for LangGraph (documented gap, tracked in AGX1-373). +- Final text is accumulated via yield mode. + +What is NOT covered without live infrastructure +----------------------------------------------- +- Actual HTTP streaming over the ACP sync endpoint. +- Real LLM calls or real LangGraph graph execution. +- The full FastACP request/response lifecycle. + +See also: test_harness_langgraph_async.py and test_harness_langgraph_temporal.py +for the other two channels. +""" + +from __future__ import annotations + +import sys +from typing import Any + +import pytest + +from agentex.lib.core.harness.tracer import SpanTracer +from agentex.lib.core.harness.emitter import UnifiedEmitter +from agentex.types.task_message_update import ( + StreamTaskMessageFull, + StreamTaskMessageStart, +) +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent +from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn + +# --------------------------------------------------------------------------- +# Remove conftest stubs so real langchain_core types are used +# --------------------------------------------------------------------------- + + +@pytest.fixture(autouse=True) +def _real_langchain_core(): + stub_keys = [k for k in sys.modules if k.startswith("langchain_core") or k.startswith("langgraph")] + saved = {k: sys.modules.pop(k) for k in stub_keys} + import importlib + + importlib.import_module("langchain_core.messages") + yield + sys.modules.update(saved) + + +# --------------------------------------------------------------------------- +# Fake tracing backend +# --------------------------------------------------------------------------- + + +class _FakeSpan: + def __init__(self, name: str) -> None: + self.name = name + self.output: Any = None + + +class _FakeTracing: + def __init__(self) -> None: + self.started: list[tuple[str, Any]] = [] + self.ended: list[tuple[str, Any]] = [] + + async def start_span( + self, *, trace_id: str, name: str, input: Any = None, parent_id: Any = None, **kw: Any + ) -> _FakeSpan: + self.started.append((name, parent_id)) + return _FakeSpan(name) + + async def end_span(self, *, trace_id: str, span: _FakeSpan) -> None: + self.ended.append((span.name, span.output)) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_stream(events: list[tuple[str, Any]]): + async def _gen(): + for e in events: + yield e + + return _gen() + + +async def _run_yield_turn( + stream_events: list[tuple[str, Any]], trace_id: str | None = None +) -> tuple[list[Any], _FakeTracing | None]: + fake_tracing = _FakeTracing() if trace_id else None + tracer: SpanTracer | bool | None = None + if trace_id and fake_tracing is not None: + tracer = SpanTracer(trace_id=trace_id, parent_span_id=None, task_id="task1", tracing=fake_tracing) + + emitter = UnifiedEmitter( + task_id="task1", + trace_id=trace_id, + parent_span_id=None, + tracer=tracer if tracer is not None else False, + ) + turn = LangGraphTurn(_make_stream(stream_events), model=None) + out = [e async for e in emitter.yield_turn(turn)] + return out, fake_tracing + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class TestSyncYieldChannel: + async def test_text_only_stream_yields_start_delta_done(self): + from langchain_core.messages import AIMessage, AIMessageChunk + + chunk = AIMessageChunk(content="Hello from LangGraph!") + ai_msg = AIMessage(content="Hello from LangGraph!") + events = [ + ("messages", (chunk, {})), + ("updates", {"agent": {"messages": [ai_msg]}}), + ] + out, _ = await _run_yield_turn(events) + + types = [type(e).__name__ for e in out] + assert "StreamTaskMessageStart" in types + assert "StreamTaskMessageDelta" in types + assert "StreamTaskMessageDone" in types + + async def test_tool_call_yields_full_events(self): + from langchain_core.messages import AIMessage, ToolMessage + + tc = {"id": "call_1", "name": "get_weather", "args": {"city": "Paris"}} + ai_msg = AIMessage(content="", tool_calls=[tc]) + tool_msg = ToolMessage(content="Sunny, 72F", tool_call_id="call_1", name="get_weather") + events = [ + ("updates", {"agent": {"messages": [ai_msg]}}), + ("updates", {"tools": {"messages": [tool_msg]}}), + ] + out, _ = await _run_yield_turn(events) + + full_events = [e for e in out if isinstance(e, StreamTaskMessageFull)] + assert len(full_events) == 2 + + contents = [e.content for e in full_events] + assert any(isinstance(c, ToolRequestContent) for c in contents) + assert any(isinstance(c, ToolResponseContent) for c in contents) + + async def test_multi_step_yields_events_in_order(self): + from langchain_core.messages import AIMessage, ToolMessage, AIMessageChunk + + chunk1 = AIMessageChunk(content="Searching...") + ai_msg1 = AIMessage(content="Searching...", tool_calls=[{"id": "c1", "name": "search", "args": {"q": "test"}}]) + tool_msg = ToolMessage(content="results", tool_call_id="c1", name="search") + chunk2 = AIMessageChunk(content="Found it!") + ai_msg2 = AIMessage(content="Found it!") + + events = [ + ("messages", (chunk1, {})), + ("updates", {"agent": {"messages": [ai_msg1]}}), + ("updates", {"tools": {"messages": [tool_msg]}}), + ("messages", (chunk2, {})), + ("updates", {"agent": {"messages": [ai_msg2]}}), + ] + out, _ = await _run_yield_turn(events) + + # Should have multiple start events (one per text segment) + starts = [e for e in out if isinstance(e, StreamTaskMessageStart)] + assert len(starts) >= 2 + # And two Full events (tool req + tool resp) + fulls = [e for e in out if isinstance(e, StreamTaskMessageFull)] + assert len(fulls) == 2 + + async def test_empty_stream_yields_nothing(self): + out, _ = await _run_yield_turn([]) + assert out == [] + + async def test_tracer_produces_tool_spans_for_full_events(self): + """AGX1-377: SpanDeriver now handles Full tool events (request opens, response closes). + + Full(ToolRequestContent) opens a tool span; Full(ToolResponseContent) closes it. + This aligns LangGraph tracing with Start+Done harnesses (pydantic-ai, openai-agents). + """ + from langchain_core.messages import AIMessage, ToolMessage + + tc = {"id": "c1", "name": "t", "args": {}} + ai_msg = AIMessage(content="", tool_calls=[tc]) + tool_msg = ToolMessage(content="ok", tool_call_id="c1", name="t") + + events = [ + ("updates", {"agent": {"messages": [ai_msg]}}), + ("updates", {"tools": {"messages": [tool_msg]}}), + ] + _, fake_tracing = await _run_yield_turn(events, trace_id="trace-1") + + assert fake_tracing is not None + assert len(fake_tracing.started) == 1, "Full(ToolRequestContent) opens one tool span" + assert fake_tracing.started[0][0] == "t", "span name matches the tool name" + assert len(fake_tracing.ended) == 1, "Full(ToolResponseContent) closes the span" + + async def test_usage_captured_after_yield(self): + from langchain_core.messages import AIMessage + + usage_meta = {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15} + ai_msg = AIMessage(content="Hi!", usage_metadata=usage_meta) + events = [("updates", {"agent": {"messages": [ai_msg]}})] + + turn = LangGraphTurn(_make_stream(events), model="gpt-4") + emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None) + _ = [e async for e in emitter.yield_turn(turn)] + + usage = turn.usage() + assert usage.input_tokens == 10 + assert usage.output_tokens == 5 diff --git a/tests/lib/core/harness/test_harness_langgraph_temporal.py b/tests/lib/core/harness/test_harness_langgraph_temporal.py new file mode 100644 index 000000000..1a094a33c --- /dev/null +++ b/tests/lib/core/harness/test_harness_langgraph_temporal.py @@ -0,0 +1,233 @@ +"""Integration test: Temporal channel with a LangGraph agent. + +The Temporal LangGraph agent pattern uses ``emit_langgraph_messages`` (from +``_langgraph_messages.py``) inside a Temporal activity. That module is not +yet unified onto the harness surface (it has its own Redis-streaming code). + +This test file verifies the LangGraph Temporal agent's streaming behavior using +the same fake streaming infrastructure as test_harness_langgraph_async.py. The +key difference from the non-temporal async path is that in Temporal, each agent +turn runs inside a Temporal activity that has already been handed the task_id +and a pre-wired streaming client — so the ``UnifiedEmitter.auto_send_turn`` +path is identical. The graph activities and workflow scaffolding are not tested +here; that requires a running Temporal cluster. + +What is tested +-------------- +- stream_langgraph_events (the public async API used by temporal agent acp.py via + the workflow activity) produces the same result via the unified surface. +- Usage from AIMessage.usage_metadata is captured in TurnResult.usage. +- The auto_send_turn path for a temporal-style call (same as async). + +What is NOT covered without live infrastructure +----------------------------------------------- +- Actual Temporal workflow execution (requires a running Temporal cluster). +- The Temporal activity retry/compensation logic. +- LangGraph checkpoint storage via TemporalCheckpointer. +- emit_langgraph_messages (the Temporal-specific streaming helper). +- Real LLM calls or real LangGraph graph execution. + +See also: test_harness_langgraph_sync.py and test_harness_langgraph_async.py. +""" + +from __future__ import annotations + +import sys +from typing import Any +from dataclasses import field, dataclass + +import pytest + +from agentex.types.task_message import TaskMessage +from agentex.types.text_content import TextContent +from agentex.lib.core.harness.emitter import UnifiedEmitter +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent +from agentex.lib.adk._modules._langgraph_turn import LangGraphTurn +from agentex.lib.adk._modules._langgraph_async import stream_langgraph_events + +# --------------------------------------------------------------------------- +# Remove conftest stubs so real langchain_core types are used +# --------------------------------------------------------------------------- + + +@pytest.fixture(autouse=True) +def _real_langchain_core(): + stub_keys = [k for k in sys.modules if k.startswith("langchain_core") or k.startswith("langgraph")] + saved = {k: sys.modules.pop(k) for k in stub_keys} + import importlib + + importlib.import_module("langchain_core.messages") + yield + sys.modules.update(saved) + + +# --------------------------------------------------------------------------- +# Fake streaming backend +# --------------------------------------------------------------------------- + + +@dataclass +class _FakeCtx: + ctype: str + initial_content: Any + task_message: TaskMessage + closed: bool = False + deltas: list[Any] = field(default_factory=list) + + async def __aenter__(self) -> "_FakeCtx": + return self + + async def __aexit__(self, *args: Any) -> bool: + await self.close() + return False + + async def close(self) -> None: + self.closed = True + + async def stream_update(self, update: Any) -> Any: + self.deltas.append(update) + return update + + +class _FakeStreaming: + def __init__(self) -> None: + self.contexts: list[_FakeCtx] = [] + + def streaming_task_message_context(self, task_id: str, initial_content: Any, **kw: Any) -> _FakeCtx: + ctype = getattr(initial_content, "type", None) or "" + tm = TaskMessage(id=f"m{len(self.contexts) + 1}", task_id=task_id, content=initial_content) + ctx = _FakeCtx(ctype=ctype, initial_content=initial_content, task_message=tm) + self.contexts.append(ctx) + return ctx + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_stream(events: list[tuple[str, Any]]): + async def _gen(): + for e in events: + yield e + + return _gen() + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class TestTemporalAutoSendChannel: + async def test_stream_langgraph_events_plain_text(self, monkeypatch): + """stream_langgraph_events (used by temporal agents via the acp.py activity) returns + the accumulated final text.""" + from langchain_core.messages import AIMessage, AIMessageChunk + + from agentex.lib import adk as adk_module + + fake_streaming = _FakeStreaming() + monkeypatch.setattr(adk_module, "streaming", fake_streaming) + + chunk = AIMessageChunk(content="Hello Temporal!") + ai_msg = AIMessage(content="Hello Temporal!") + events = [ + ("messages", (chunk, {})), + ("updates", {"agent": {"messages": [ai_msg]}}), + ] + + final = await stream_langgraph_events(_make_stream(events), "task-1") + assert final == "Hello Temporal!" + + async def test_stream_langgraph_events_tool_call(self, monkeypatch): + from langchain_core.messages import AIMessage, ToolMessage + + from agentex.lib import adk as adk_module + + fake_streaming = _FakeStreaming() + monkeypatch.setattr(adk_module, "streaming", fake_streaming) + + tc = {"id": "c1", "name": "search", "args": {"q": "test"}} + ai_msg = AIMessage(content="", tool_calls=[tc]) + tool_msg = ToolMessage(content="results", tool_call_id="c1", name="search") + chunk_final = AIMessage(content="Here are the results.") + + events = [ + ("updates", {"agent": {"messages": [ai_msg]}}), + ("updates", {"tools": {"messages": [tool_msg]}}), + ("updates", {"agent": {"messages": [chunk_final]}}), + ] + + final = await stream_langgraph_events(_make_stream(events), "task-1") + + # Check tool request and response posted to fake streaming + tool_req_ctxs = [c for c in fake_streaming.contexts if isinstance(c.initial_content, ToolRequestContent)] + tool_resp_ctxs = [c for c in fake_streaming.contexts if isinstance(c.initial_content, ToolResponseContent)] + assert len(tool_req_ctxs) == 1 + assert len(tool_resp_ctxs) == 1 + assert tool_req_ctxs[0].initial_content.name == "search" + + async def test_langgraph_turn_auto_send_via_unified_emitter(self): + """Direct UnifiedEmitter.auto_send_turn path used by temporal agent workflow + activities. Uses a fake streaming backend (no Redis).""" + from langchain_core.messages import AIMessage, AIMessageChunk + + fake_streaming = _FakeStreaming() + chunk = AIMessageChunk(content="Temporal answer!") + ai_msg = AIMessage(content="Temporal answer!") + events = [ + ("messages", (chunk, {})), + ("updates", {"agent": {"messages": [ai_msg]}}), + ] + + turn = LangGraphTurn(_make_stream(events), model=None) + emitter = UnifiedEmitter( + task_id="task-1", + trace_id=None, + parent_span_id=None, + streaming=fake_streaming, + ) + result = await emitter.auto_send_turn(turn) + + assert result.final_text == "Temporal answer!" + text_ctxs = [c for c in fake_streaming.contexts if isinstance(c.initial_content, TextContent)] + assert len(text_ctxs) == 1 + + async def test_usage_captured_via_turn_after_events_consumed(self): + """Usage from AIMessage.usage_metadata is captured via the on_final_ai_message + callback during event iteration. The authoritative usage is on turn.usage() + after events are consumed (emitter.auto_send_turn evaluates turn.usage() + eagerly before iteration, so TurnResult.usage is a pre-iteration snapshot).""" + from langchain_core.messages import AIMessage + + fake_streaming = _FakeStreaming() + usage_meta = {"input_tokens": 20, "output_tokens": 10, "total_tokens": 30} + ai_msg = AIMessage(content="answer", usage_metadata=usage_meta) + events = [("updates", {"agent": {"messages": [ai_msg]}})] + + turn = LangGraphTurn(_make_stream(events), model="gpt-4o") + emitter = UnifiedEmitter( + task_id="task-1", + trace_id=None, + parent_span_id=None, + streaming=fake_streaming, + ) + await emitter.auto_send_turn(turn) + + # After auto_send_turn, turn.usage() has the captured values + usage = turn.usage() + assert usage.input_tokens == 20 + assert usage.output_tokens == 10 + assert usage.total_tokens == 30 + + async def test_empty_stream_returns_empty_string(self, monkeypatch): + from agentex.lib import adk as adk_module + + fake_streaming = _FakeStreaming() + monkeypatch.setattr(adk_module, "streaming", fake_streaming) + + final = await stream_langgraph_events(_make_stream([]), "task-1") + assert final == "" + assert fake_streaming.contexts == []