Description
Describe the Bug
When deploying an agent with a custom BaseLlm client using adk deploy agent_engine, the deployment succeeds, but querying the deployed agent fails with the following error:
"Agent Engine Error: Default method query not found. Available methods are: ['async_delete_session', 'get_session', 'delete_session', 'create_session', 'async_create_session', 'async_search_memory', 'async_get_session', 'list_sessions', 'async_list_sessions', 'async_add_session_to_memory']."
This indicates that the stream_query and query methods were never registered on the deployed agent; only session and memory management methods are available.
Important Notes:
The same agent functions correctly in ADK Playground (adk web agents), recognizing intents and triggering tools.
The agent also works perfectly when deployed using agent_engines.create() inline, with all code in a single file.
The failure specifically occurs when deploying via the adk deploy agent_engine CLI command.
To Reproduce
Install Dependencies
requirements.txt:
google-adk>=1.0.0
google-cloud-aiplatform[adk,agent_engines]>=1.60.0
google-cloud-bigquery>=3.0.0
google-auth>=2.0.0
pydantic>=2.7.0
httpx>=0.27.0
tenacity>=8.2.0
python-dotenv>=1.0.0
Install using: pip install -r requirements.txt
Create Agent with Custom BaseLlm Client
Project Structure:
agents/
├── __init__.py
├── agent.py
├── toolbox_agent.py
├── requirements.txt
├── .env
├── clients/
│   ├── __init__.py
│   └── custom_client.py
└── tools/
    ├── __init__.py
    └── memory_callbacks.py
agents/__init__.py:
from .agent import root_agent
__all__ = ['root_agent']
agents/clients/custom_client.py:
import os
import uuid
import time
import httpx
import logging
from typing import AsyncGenerator, Optional, List, Dict, Any
from google.adk.models import BaseLlm, LlmRequest, LlmResponse
from google.genai.types import Content, Part
logger = logging.getLogger(__name__)
class CustomAPIClient(BaseLlm):
    """Custom ADK Model Client for internal API gateway."""

    model: str = "custom-proxy"
    target_model: str = "gemini-2.0-flash-lite-001"
    provider: str = "CUSTOM"
    base_url: str = "https://api.example.com/v2/chat/generation/"
    environment: str = "DEV"
    client_id: Optional[str] = None
    client_secret: Optional[str] = None

    @classmethod
    def supported_models(cls) -> list[str]:
        return ["custom/.*"]

    def __init__(self, **data):
        super().__init__(**data)
        self.client_id = os.environ.get("CLIENT_ID")
        self.client_secret = os.environ.get("CLIENT_SECRET")
        if self.model.startswith("custom/"):
            self.target_model = self.model.split("custom/", 1)[1]

    def __getstate__(self):
        """Custom pickle serialization."""
        state = self.__dict__.copy()
        state['__pydantic_fields_set__'] = getattr(self, '__pydantic_fields_set__', set())
        state['__pydantic_extra__'] = getattr(self, '__pydantic_extra__', None)
        state['__pydantic_private__'] = getattr(self, '__pydantic_private__', None)
        return state

    def __setstate__(self, state):
        """Custom pickle deserialization."""
        object.__setattr__(self, '__pydantic_fields_set__', state.pop('__pydantic_fields_set__', set()))
        object.__setattr__(self, '__pydantic_extra__', state.pop('__pydantic_extra__', None))
        object.__setattr__(self, '__pydantic_private__', state.pop('__pydantic_private__', None))
        self.__dict__.update(state)
        self.__dict__['client_id'] = os.environ.get("CLIENT_ID")
        self.__dict__['client_secret'] = os.environ.get("CLIENT_SECRET")
    async def generate_content_async(
        self,
        llm_request: LlmRequest,
        stream: bool = False
    ) -> AsyncGenerator[LlmResponse, None]:
        """Generate response from custom API gateway."""
        if not self.client_id or not self.client_secret:
            self.client_id = os.environ.get("CLIENT_ID")
            self.client_secret = os.environ.get("CLIENT_SECRET")
        if not self.client_id or not self.client_secret:
            raise ValueError("CLIENT_ID and CLIENT_SECRET must be set")
        messages = self._convert_contents(llm_request.contents)
        payload = {
            "provider": "GEMINI",
            "model": self.target_model,
            "messages": messages,
            "params": {"stream": stream, "temperature": 0.7, "maxOutputTokens": 2048},
            "infos": {
                "environment": self.environment,
                "conversationId": str(uuid.uuid4()),
            }
        }
        headers = {
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "Content-Type": "application/json",
        }
        async with httpx.AsyncClient(timeout=60.0) as http_client:
            response = await http_client.post(self.base_url, headers=headers, json=payload)
            response.raise_for_status()
            data = response.json()
            yield self._parse_response(data)
    def _convert_contents(self, contents: List[Content]) -> List[Dict]:
        converted = []
        for content in contents:
            parts = []
            if content.parts:
                for part in content.parts:
                    if part.text:
                        parts.append({"type": "text", "text": part.text})
                    elif part.function_call:
                        parts.append({
                            "type": "functionCall",
                            "name": part.function_call.name,
                            "parameters": part.function_call.args
                        })
                    elif part.function_response:
                        parts.append({
                            "name": part.function_response.name,
                            "response": part.function_response.response
                        })
            role = content.role
            if any(p.get("response") for p in parts):
                role = "tool"
            converted.append({"role": role, "parts": parts})
        return converted
    def _parse_response(self, data: Dict) -> LlmResponse:
        content_parts = []
        raw_parts = data.get("message", {}).get("parts", [])
        for part in raw_parts:
            if part.get("type") == "text":
                content_parts.append(Part(text=part.get("text")))
            elif part.get("type") == "functionCall":
                from google.genai.types import FunctionCall
                content_parts.append(Part(
                    function_call=FunctionCall(
                        name=part.get("name"),
                        args=part.get("parameters")
                    )
                ))
        return LlmResponse(content=Content(parts=content_parts, role="model"))
agents/agent.py:
from google.adk.agents.llm_agent import Agent
from google.adk.tools.preload_memory_tool import PreloadMemoryTool
from google.adk.models import LLMRegistry
from .toolbox_agent import toolbox_agent
from .tools.memory_callbacks import auto_save_session_to_memory_callback
from .clients.custom_client import CustomAPIClient
# Register the custom model client
LLMRegistry.register(CustomAPIClient)
custom_model = CustomAPIClient(
    provider="CUSTOM",
    model="custom/gemini-2.0-flash-lite-001",
    environment="DEV"
)

root_agent = Agent(
    model=custom_model,
    name='Orchestrator',
    description='Main coordinator that routes tasks to specialized agents',
    instruction="""You are an intelligent orchestrator agent.
You have a specialized ToolboxAgent for BigQuery operations. When users ask about:
- Listing datasets or tables
- Running SQL queries
Use transfer_to_agent('ToolboxAgent') automatically.
For general questions, answer directly.""",
    sub_agents=[toolbox_agent],
    tools=[PreloadMemoryTool()],
    after_agent_callback=auto_save_session_to_memory_callback,
)
agents/tools/memory_callbacks.py:
async def auto_save_session_to_memory_callback(callback_context):
    """Automatically save session to memory after each agent interaction."""
    memory_service = callback_context._invocation_context.memory_service
    if memory_service is not None:
        await memory_service.add_session_to_memory(
            callback_context._invocation_context.session
        )
agents/.env:
CLIENT_ID=your_client_id
CLIENT_SECRET=your_client_secret
Deploy with adk CLI
adk deploy agent_engine \
--project YOUR_PROJECT_ID \
--region europe-west1 \
--staging_bucket gs://your-bucket \
--display_name "My Agent" \
agents
Deployment output (SUCCESS):
Staging all files in: agents_tmpxxx
Copying agent source code...
Copying agent source code complete.
Resolving files and dependencies...
Reading environment variables from agents/.env
Initializing Vertex AI...
Vertex AI initialized.
Created agents_tmpxxx/agent_engine_app.py
Files and dependencies resolved
Deploying to agent engine...
✅ Created agent engine: projects/xxx/locations/europe-west1/reasoningEngines/xxx
Cleaning up the temp folder: agents_tmpxxx
Query the Deployed Agent
import vertexai
from vertexai import agent_engines
PROJECT_ID = "your-project-id"
REGION = "europe-west1"
AGENT_ENGINE_ID = "1234567890"
vertexai.init(project=PROJECT_ID, location=REGION)
resource_name = f"projects/{PROJECT_ID}/locations/{REGION}/reasoningEngines/{AGENT_ENGINE_ID}"
agent = agent_engines.get(resource_name)
session = agent.create_session(user_id="test-user")
# This fails
for event in agent.stream_query(
    user_id="test-user",
    session_id=session["id"],
    message="Hello, who are you?",
):
    print(event)
Error
Error Details: "detail":"Agent Engine Error: Default method query not found. Available methods are: ['async_delete_session', 'get_session', 'delete_session', 'create_session', 'async_create_session', 'async_search_memory', 'async_get_session', 'list_sessions', 'async_list_sessions', 'async_add_session_to_memory']."
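For anyone reproducing this, the set of operations the engine actually registered can be inspected from the client side. This is a sketch that assumes the `operation_schemas()` accessor on the returned engine object (available in recent google-cloud-aiplatform releases); project and engine IDs are placeholders:

```python
import vertexai
from vertexai import agent_engines

vertexai.init(project="your-project-id", location="europe-west1")

agent = agent_engines.get(
    "projects/your-project-id/locations/europe-west1/reasoningEngines/1234567890"
)

# A healthy ADK deployment should list query/stream_query-style operations
# alongside the session and memory helpers; here only the latter appear.
for schema in agent.operation_schemas():
    print(schema.get("name"))
```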
Expected Behavior
The deployed agent should have stream_query and query methods available, consistent with its behavior when:
Running locally with adk web agents.
Deploying with inline agent_engines.create().
Desktop Environment
OS: macOS Sequoia 15.16.1 / Ubuntu (GitHub Actions)
Python Version: 3.11.x / 3.12.x
ADK Version: 1.22.1
Model Information
Using LiteLLM: No
Model Used: Custom BaseLlm implementation that proxies to gemini-2.0-flash-lite-001 via an internal API gateway.
Additional Context
What Works:
adk web agents (local playground): Agent responds correctly, tools trigger, intents recognized.
Inline deployment with agent_engines.create() where all code is in a single file: Query methods work correctly.
What Doesn't Work:
adk deploy agent_engine CLI command: Deployment succeeds but query methods are not registered.
Hypothesis
The adk deploy agent_engine CLI may not be packaging or serializing custom BaseLlm implementations correctly. When cloudpickle serializes the agent:
Inline code is serialized by value (the full code is embedded in the pickle).
Imported classes are serialized by reference (only the import path is recorded).
This discrepancy could leave the custom model client unrestorable at runtime, which in turn could prevent the query methods from being registered.
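The by-value versus by-reference distinction can be demonstrated in isolation. The sketch below uses a throwaway module name, fake_custom_client, to stand in for agents/clients/custom_client.py, and cloudpickle's register_pickle_by_value to force by-value serialization:

```python
import pickle
import sys
import types

import cloudpickle

# Throwaway module standing in for agents/clients/custom_client.py.
mod = types.ModuleType("fake_custom_client")
exec("class CustomAPIClient:\n    pass\n", mod.__dict__)
sys.modules["fake_custom_client"] = mod

# An importable class is pickled by reference: the payload records only
# "fake_custom_client.CustomAPIClient", not the class body.
by_ref = cloudpickle.dumps(mod.CustomAPIClient)

# Opting the module into by-value pickling embeds the code itself.
cloudpickle.register_pickle_by_value(mod)
by_val = cloudpickle.dumps(mod.CustomAPIClient)

# Simulate the remote runtime, where the module is not importable.
del sys.modules["fake_custom_client"]

restored = pickle.loads(by_val)  # works: the class travelled by value
print(restored.__name__)         # CustomAPIClient
try:
    pickle.loads(by_ref)         # fails: there is no module to import from
except ModuleNotFoundError as exc:
    print(f"by-reference restore failed: {exc}")
```

If the CLI-generated agent_engine_app.py imports the agent package by reference, the remote runtime would hit exactly this restore failure.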
Workaround
Using an inline deployment script with agent_engines.create() directly works. However, this negates the benefits of a clean project structure enabled by adk deploy.
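For reference, the working inline deployment looks roughly like this. It is a sketch, not the exact script: it assumes the AdkApp wrapper and agent_engines.create() call from recent google-cloud-aiplatform releases, with CustomAPIClient and root_agent defined earlier in the same file so cloudpickle serializes them by value:

```python
import vertexai
from vertexai import agent_engines
from vertexai.preview.reasoning_engines import AdkApp

# ... CustomAPIClient and root_agent defined inline above this point ...

vertexai.init(
    project="your-project-id",
    location="europe-west1",
    staging_bucket="gs://your-bucket",
)

app = AdkApp(agent=root_agent)

# Because everything lives in one file, the custom client is pickled by
# value, and the deployed engine exposes query and stream_query as expected.
remote = agent_engines.create(
    app,
    display_name="My Agent",
    requirements=[
        "google-adk>=1.0.0",
        "google-cloud-aiplatform[adk,agent_engines]>=1.60.0",
    ],
)
```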