-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathgoogle_codegen.py
More file actions
304 lines (264 loc) · 12.9 KB
/
Copy pathgoogle_codegen.py
File metadata and controls
304 lines (264 loc) · 12.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
"""Render a ``GoogleDeployPlan`` into a self-contained source package.
Agent Engine deploys by *building a container from source*: agentlift ships a
generated Python package (via ``extra_packages``) that the engine imports
remotely through ``ModuleAgent``. This module produces that package -- and it is
pure: same plan in, same files out, no network, no clock. ``agent.py`` is the
contract the engine runs.
The generated ``agent.py`` imports ONLY ``google-adk`` + ``vertexai`` (never
agentlift), so it builds in the engine's own environment. It constructs the ADK
agents lazily at import (MCP toolsets do not connect until first use, verified),
loads each skill from the bundle shipped alongside it, and exposes:
- ``root_agent`` : the root ``LlmAgent`` (ADK's conventional entrypoint name)
- ``adk_app`` : ``AdkApp(agent=root_agent)`` -- what ``ModuleAgent`` targets
Secrets are never written here. An MCP auth header reads its value from a named
engine env var at runtime (``os.environ.get("AGENTLIFT_MCP_...")``); the value is
populated by the deploy step as an Agent Engine env var, not inlined into source.
"""
from __future__ import annotations
import os
from typing import Any
from .google_plan import GoogleAgentNode, GoogleDeployPlan, web_tool_agent_name
PACKAGE_NAME = "agentlift_engine"
MODULE_NAME = f"{PACKAGE_NAME}.agent"
APP_SYMBOL = "adk_app"
ROOT_SYMBOL = "root_agent"
SKILLS_SUBDIR = "skills"
# Built-in web tools lower to a dedicated single-tool ADK sub-agent wrapped in an
# AgentTool (the shape ADK uses in create_google_search_agent / create_url_context_agent).
# propagate_grounding_metadata surfaces the inner Google-Search / URL-Context grounding
# up to the outer event stream -- the objective signal a live receipt keys "EXERCISED" on.
# Strong, narrow prompts keep each wrapped agent on-task as a tool, not a chat partner.
_WEB_SEARCH_DESCRIPTION = (
"Search the public web with Google Search and return grounded, cited findings. "
"Use this whenever the task needs current or external information you do not already have."
)
_WEB_SEARCH_INSTRUCTION = (
"You are a web search tool backed by Google Search. Given the caller's query, find "
"accurate, up-to-date information and return a concise, factual answer together with the "
"source URLs you relied on. If the search surfaces nothing useful, say so plainly rather "
"than guessing."
)
_WEB_FETCH_DESCRIPTION = (
"Fetch and read the contents of specific URLs and answer strictly from those pages. "
"Use this when the caller provides one or more URLs to retrieve."
)
_WEB_FETCH_INSTRUCTION = (
"You are a URL retrieval tool backed by URL Context. Retrieve the exact URL(s) you are "
"given and answer only from their contents. Always list the source URL(s) you actually "
"read. If a URL cannot be retrieved, say which one failed and why instead of guessing."
)
_HEADER = (
"Generated by `agentlift deploy --target google` -- DO NOT hand-edit.\n"
"Edit the .managed-agents/ folder and redeploy. Imports only google-adk +\n"
"vertexai so it builds in the Agent Engine container. MCP auth headers read\n"
"their values from engine env vars at runtime (never inlined here)."
)
# --------------------------------------------------------------------------- #
# python literal helpers
# --------------------------------------------------------------------------- #
def _pystr(s: str) -> str:
"""A triple-quoted Python string literal that always parses.
Doubles backslashes, escapes any ``\"\"\"`` run, and escapes a trailing quote
so the content can never collide with the closing delimiter. (Covered by an
adversarial codegen test.)"""
body = s.replace("\\", "\\\\").replace('"""', '\\"\\"\\"')
if body.endswith('"'):
body = body[:-1] + '\\"'
return '"""' + body + '"""'
def _var(node: GoogleAgentNode) -> str:
return "agent_" + node.safe_name
def _mcp_toolset_src(recipe) -> str:
"""One ``McpToolset(...)`` constructor call as source text."""
if recipe.auth_env_vars:
items = ", ".join(
f"{header!r}: os.environ.get({env!r}, \"\")"
for header, env in sorted(recipe.auth_env_vars.items())
)
conn = f"StreamableHTTPConnectionParams(url={recipe.url!r}, headers={{{items}}})"
else:
conn = f"StreamableHTTPConnectionParams(url={recipe.url!r})"
tf = f", tool_filter={list(recipe.tool_filter)!r}" if recipe.tool_filter is not None else ""
return f"McpToolset(connection_params={conn}{tf})"
def _web_tool_src(node: GoogleAgentNode, tool: str) -> str:
"""One web built-in lowered to a wrapped tool-agent constructor call.
The wrapped sub-agent is built with ``web_model(...)`` (not ``vertex_model``):
Google Search grounding and URL Context are Gemini built-ins, so the web tool-agent
must run on a web-capable Gemini model even when its parent agent does not -- the
mixed-model invariant a Claude-on-Vertex parent would otherwise break.
"""
name = web_tool_agent_name(node.safe_name, tool)
factory = "_web_search_tool" if tool == "web_search" else "_web_fetch_tool"
return f"{factory}({name!r}, web_model({node.folder_model!r}))"
def _agent_block(node: GoogleAgentNode, var_by_name: dict[str, str]) -> list[str]:
lines = [f"# --- agent: {node.name} ---"]
tool_srcs: list[str] = [_mcp_toolset_src(r) for r in node.mcp]
if node.skills:
skills = ", ".join(f"_skill({name!r})" for name in node.skills)
tool_srcs.append(f"SkillToolset(skills=[{skills}])")
for tool in node.builtin_web: # sorted in the plan: web_fetch before web_search
tool_srcs.append(_web_tool_src(node, tool))
lines.append(f"{_var(node)} = LlmAgent(")
lines.append(f" name={node.safe_name!r},")
lines.append(f" model=vertex_model({node.folder_model!r}),")
lines.append(f" instruction={_pystr(node.instruction)},")
lines.append(f" description={node.description!r},")
if tool_srcs:
lines.append(" tools=[")
for ts in tool_srcs:
lines.append(f" {ts},")
lines.append(" ],")
if node.sub_agents:
subs = ", ".join(var_by_name[s] for s in node.sub_agents)
lines.append(f" sub_agents=[{subs}],")
lines.append(")")
lines.append("")
return lines
def _web_helper_defs(any_search: bool, any_fetch: bool) -> list[str]:
"""Generated factories for the wrapped web tool-agents (emitted only when used)."""
defs: list[str] = []
if any_search or any_fetch:
defs += [
"# Google Search grounding and URL Context are Gemini built-ins, so a wrapped web",
"# tool-agent must run on a web-capable Gemini model even when its parent agent does",
"# not (e.g. a Claude-on-Vertex parent). Independent of vertex_model's Claude policy.",
"def web_model(folder_model):",
' return folder_model if folder_model.startswith("gemini") else DEFAULT_VERTEX_MODEL',
"",
"",
]
if any_search:
defs += [
"def _web_search_tool(name, model):",
" return AgentTool(",
" agent=LlmAgent(",
" name=name,",
" model=model,",
f" description={_pystr(_WEB_SEARCH_DESCRIPTION)},",
f" instruction={_pystr(_WEB_SEARCH_INSTRUCTION)},",
" tools=[GoogleSearchTool()],",
" ),",
" propagate_grounding_metadata=True,",
" )",
"",
"",
]
if any_fetch:
defs += [
"def _web_fetch_tool(name, model):",
" return AgentTool(",
" agent=LlmAgent(",
" name=name,",
" model=model,",
f" description={_pystr(_WEB_FETCH_DESCRIPTION)},",
f" instruction={_pystr(_WEB_FETCH_INSTRUCTION)},",
" tools=[url_context],",
" ),",
" propagate_grounding_metadata=True,",
" )",
"",
"",
]
return defs
def render_agent_py(plan: GoogleDeployPlan) -> str:
var_by_name = {n.name: _var(n) for n in plan.agents}
claude_models = sorted({
n.folder_model for n in plan.agents if n.folder_model.startswith("claude")
})
map_lines = [f" {m!r}: DEFAULT_VERTEX_MODEL," for m in claude_models]
any_search = any("web_search" in n.builtin_web for n in plan.agents)
any_fetch = any("web_fetch" in n.builtin_web for n in plan.agents)
# Web-tool imports ride in only when a node maps a web built-in, so a folder
# without web tools generates an agent.py with no unused symbols.
web_imports: list[str] = []
if any_search or any_fetch:
web_imports.append("from google.adk.tools.agent_tool import AgentTool")
if any_search:
web_imports.append("from google.adk.tools.google_search_tool import GoogleSearchTool")
if any_fetch:
web_imports.append("from google.adk.tools import url_context")
out = [
'"""' + _HEADER + '"""',
"import os",
"",
"from google.adk.agents import LlmAgent",
"from google.adk.tools.mcp_tool import McpToolset, StreamableHTTPConnectionParams",
"from google.adk.tools.skill_toolset import SkillToolset",
"from google.adk.skills import load_skill_from_dir",
*web_imports,
"from vertexai.preview.reasoning_engines import AdkApp",
"",
"_HERE = os.path.dirname(os.path.abspath(__file__))",
"",
"# Model ids change often, so the folder -> Vertex mapping lives in one place.",
"# Override the default for every Claude-origin agent with AGENTLIFT_GOOGLE_MODEL.",
f'DEFAULT_VERTEX_MODEL = os.environ.get("AGENTLIFT_GOOGLE_MODEL", {plan.deploy_model!r})',
"MODEL_MAP = {",
*map_lines,
"}",
"",
"",
"def vertex_model(folder_model):",
" if folder_model in MODEL_MAP:",
" return MODEL_MAP[folder_model]",
' return DEFAULT_VERTEX_MODEL if folder_model.startswith("claude") else folder_model',
"",
"",
"def _skill(name):",
f' return load_skill_from_dir(os.path.join(_HERE, {SKILLS_SUBDIR!r}, name))',
"",
"",
*_web_helper_defs(any_search, any_fetch),
]
for node in plan.agents:
out += _agent_block(node, var_by_name)
root_var = var_by_name[plan.root_agent]
out += [
f"{ROOT_SYMBOL} = {root_var}",
f"{APP_SYMBOL} = AdkApp(agent={ROOT_SYMBOL}, enable_tracing=False)",
"",
]
return "\n".join(out)
# --------------------------------------------------------------------------- #
# package assembly
# --------------------------------------------------------------------------- #
def render_text_files(plan: GoogleDeployPlan) -> dict[str, str]:
"""Text files of the build, keyed by relpath from the build root."""
return {
f"{PACKAGE_NAME}/__init__.py": "",
f"{PACKAGE_NAME}/agent.py": render_agent_py(plan),
"requirements.txt": "\n".join(plan.requirements) + "\n",
}
def skill_file_manifest(plan: GoogleDeployPlan) -> list[tuple[str, str]]:
"""(relpath_in_build, abs_source) for every skill file shipped into the package.
arcname already carries the ``<skill-name>/...`` prefix, so it lands under
``agentlift_engine/skills/<skill-name>/...`` -- exactly where ``_skill(name)``
(``load_skill_from_dir``) looks for it."""
out: list[tuple[str, str]] = []
for bundle in plan.skill_bundles:
for arcname, abs_path in bundle.files:
out.append((f"{PACKAGE_NAME}/{SKILLS_SUBDIR}/{arcname}", abs_path))
return out
def write_package(plan: GoogleDeployPlan, build_dir: str) -> dict[str, Any]:
"""Materialize the package on disk under ``build_dir`` (impure: file IO only).
Returns the handles the deploy step needs: where the package is, the module +
symbol ``ModuleAgent`` targets, and the resolved requirements."""
os.makedirs(build_dir, exist_ok=True)
for rel, text in render_text_files(plan).items():
dest = os.path.join(build_dir, rel.replace("/", os.sep))
os.makedirs(os.path.dirname(dest), exist_ok=True)
with open(dest, "w", encoding="utf-8") as fh:
fh.write(text)
for rel, abs_src in skill_file_manifest(plan):
dest = os.path.join(build_dir, rel.replace("/", os.sep))
os.makedirs(os.path.dirname(dest), exist_ok=True)
with open(abs_src, "rb") as src, open(dest, "wb") as dst:
dst.write(src.read())
return {
"build_dir": build_dir,
"package_dir": os.path.join(build_dir, PACKAGE_NAME),
"package_name": PACKAGE_NAME,
"module_name": MODULE_NAME,
"app_symbol": APP_SYMBOL,
"root_symbol": ROOT_SYMBOL,
"requirements": list(plan.requirements),
}