# agent.py (modify get_tools_async and other parts as needed)
import os
import asyncio
from dotenv import load_dotenv
from google.genai import types
from google.adk.agents.llm_agent import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.artifacts.in_memory_artifact_service import InMemoryArtifactService # Optional
from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset, SseServerParams, StdioServerParameters
from qwen import llm
# Load settings from the project's .env file (replace the path with your own).
load_dotenv('ADK/.env')
# AMap API key read from the environment; None when the variable is unset.
# (A `global` statement is meaningless at module scope, so none is used here.)
Amaps_api_key = os.getenv('AMAP_MAPS_API_KEY')
# Use the AMap (高德地图) MCP server
async def get_tools_async():
    """Step 1: Get the tools exposed by the AMap (高德地图) MCP server.

    Starts ``@amap/amap-maps-mcp-server`` through ``npx`` using stdio
    connection parameters and hands it the API key via the
    ``AMAP_MAPS_API_KEY`` environment variable.

    Returns:
        The ``(tools, exit_stack)`` pair produced by
        ``MCPToolset.from_server``. ``async_main`` closes ``exit_stack``
        with ``aclose()`` once the run is over.

    Raises:
        ValueError: If the module-level ``Amaps_api_key`` is empty or unset.
    """
    # Fail early with a clear message instead of handing a missing key to
    # the npx child process.
    if not Amaps_api_key:
        raise ValueError(
            "AMAP_MAPS_API_KEY is not set; add it to your .env file or environment."
        )

    print("Attempting to connect to MCP 高德 Maps server...")
    tools, exit_stack = await MCPToolset.from_server(
        connection_params=StdioServerParameters(
            command='npx',
            args=["-y", "@amap/amap-maps-mcp-server"],
            # Pass the API key as an environment variable to the npx process
            env={"AMAP_MAPS_API_KEY": Amaps_api_key},
        )
    )
    print("MCP Toolset created successfully.")
    return tools, exit_stack


# --- Step 2: Agent Definition ---
async def get_agent_async():
    """Create an ADK agent equipped with tools from the MCP server.

    Returns:
        A ``(root_agent, exit_stack)`` pair: the configured ``LlmAgent`` and
        the exit stack obtained from ``get_tools_async``, passed through so
        the caller can close it.
    """
    tools, exit_stack = await get_tools_async()
    try:
        print(f"Fetched {len(tools)} tools from MCP server.")
        root_agent = LlmAgent(
            model=llm,  # Adjust if needed
            name='maps_assistant',
            instruction='Help user with mapping and directions using available tools.',
            tools=tools,
        )
    except Exception:
        # The caller never receives exit_stack on failure, so close it here
        # rather than leaving the MCP connection open.
        await exit_stack.aclose()
        raise
    return root_agent, exit_stack


# --- Step 3: Main Execution Logic (modify query) ---
async def async_main():
    """Run one hard-coded map query through the agent and print the answer.

    Creates in-memory session and artifact services, builds the agent via
    ``get_agent_async``, streams the runner's events, and prints the text of
    the final response (or a fallback message if none arrives). The MCP exit
    stack is closed even when the run raises.
    """
    session_service = InMemorySessionService()
    artifacts_service = InMemoryArtifactService()  # Optional
    session = session_service.create_session(
        state={}, app_name='mcp_maps_app', user_id='user_maps'
    )

    # TODO: Use specific addresses for reliable results with this server
    query = "杭州站的位置在哪里"
    print(f"User Query: '{query}'")
    content = types.Content(role='user', parts=[types.Part(text=query)])

    root_agent, exit_stack = await get_agent_async()
    try:
        runner = Runner(
            app_name='mcp_maps_app',
            agent=root_agent,
            artifact_service=artifacts_service,  # Optional
            session_service=session_service,
        )

        print("Running agent...")
        final_response_content = "No final response received."
        events_async = runner.run_async(
            session_id=session.id, user_id=session.user_id, new_message=content
        )

        # Extract the complete result from the event stream.
        async for event in events_async:
            # Debugging aid: print(f"Event received: {event}")
            if event.is_final_response() and event.content and event.content.parts:
                # For output_schema, the content is the JSON string itself
                final_response_content = event.content.parts[0].text
        print(final_response_content)
    finally:
        # Always shut the MCP server connection down, even if the run failed.
        print("Closing MCP server connection...")
        await exit_stack.aclose()
        print("Cleanup complete.")


if __name__ == '__main__':
    try:
        asyncio.run(async_main())
    except Exception as e:
        # Top-level boundary: report the failure instead of a raw traceback.
        print(f"An error occurred: {e}")
# Run output (运行结果):