MCP 可以与各种 AI 框架和工具集成,扩展其功能和应用场景。以下是详细的集成方法和最佳实践:
集成架构设计
MCP 集成应考虑以下方面:
- 框架兼容性:确保与目标框架的兼容性
- 性能影响:最小化对系统性能的影响
- 功能完整性:保持 MCP 和框架功能的完整性
- 错误处理:正确处理集成过程中的错误
- 配置管理:统一管理集成配置
1. 与 LangChain 集成
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain.tools import Tool

from mcp.server import Server


class MCPLangChainIntegration:
    """Expose an MCP server's tools to a LangChain OpenAI-tools agent.

    Every tool advertised by the MCP server is converted into a LangChain
    ``Tool`` whose coroutine forwards the call back to the server; the
    resulting tool list is wired into an ``AgentExecutor``.
    """

    def __init__(self, mcp_server: Server):
        self.mcp_server = mcp_server
        self.langchain_tools = []

    async def convert_mcp_to_langchain_tools(self) -> list:
        """Convert MCP tools into LangChain ``Tool`` objects."""
        mcp_tools = await self.mcp_server.list_tools()
        langchain_tools = []
        for tool_info in mcp_tools:
            # BUGFIX: the wrapper is a coroutine function, so it must be
            # registered via ``coroutine=``. Passing it as ``func`` would make
            # LangChain call it synchronously and hand the model an un-awaited
            # coroutine object instead of the tool result.
            tool = Tool(
                name=tool_info["name"],
                description=tool_info["description"],
                func=None,
                coroutine=self._create_tool_wrapper(tool_info["name"]),
            )
            langchain_tools.append(tool)
        return langchain_tools

    def _create_tool_wrapper(self, tool_name: str):
        """Create an async wrapper that forwards a tool call to the MCP server."""
        async def wrapper(**kwargs):
            result = await self.mcp_server.call_tool(
                tool_name,
                kwargs
            )
            return result
        return wrapper

    async def create_langchain_agent(self):
        """Create a LangChain agent executor backed by the MCP tools."""
        # Convert the MCP tools first so the agent and executor share the list.
        tools = await self.convert_mcp_to_langchain_tools()

        llm = ChatOpenAI(
            model="gpt-4",
            temperature=0
        )

        # BUGFIX: create_openai_tools_agent requires a prompt containing an
        # ``agent_scratchpad`` placeholder; the original two-argument call
        # raises a TypeError.
        prompt = ChatPromptTemplate.from_messages([
            ("system", "You are a helpful assistant."),
            ("human", "{input}"),
            ("placeholder", "{agent_scratchpad}"),
        ])
        agent = create_openai_tools_agent(llm, tools, prompt)

        agent_executor = AgentExecutor(
            agent=agent,
            tools=tools,
            verbose=True
        )
        return agent_executor

    async def run_agent(self, query: str):
        """Run the agent on *query* and return the final answer text."""
        agent_executor = await self.create_langchain_agent()
        result = await agent_executor.ainvoke({
            "input": query
        })
        return result["output"]
2. 与 LlamaIndex 集成
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.core.tools import QueryEngineTool, ToolMetadata
from llama_index.core.agent import ReActAgent

from mcp.server import Server


class MCPLlamaIndexIntegration:
    """Adapt an MCP server's tools for use with a LlamaIndex ReAct agent."""

    def __init__(self, mcp_server: Server):
        self.mcp_server = mcp_server

    async def create_mcp_query_engine(self, tool_name: str):
        """Return an async callable routing a query string to *tool_name*.

        NOTE(review): ``QueryEngineTool`` normally expects a query-engine
        object rather than a bare coroutine function — confirm this adapter
        is accepted by the installed llama-index version.
        """
        async def query_engine_fn(query: str) -> str:
            return await self.mcp_server.call_tool(tool_name, {"query": query})

        return query_engine_fn

    async def create_llamaindex_tools(self) -> list:
        """Build one ``QueryEngineTool`` per tool advertised by the server."""
        tools = []
        for info in await self.mcp_server.list_tools():
            engine = await self.create_mcp_query_engine(info["name"])
            meta = ToolMetadata(
                name=info["name"],
                description=info["description"],
            )
            tools.append(QueryEngineTool(query_engine=engine, metadata=meta))
        return tools

    async def create_react_agent(self):
        """Assemble a ReAct agent over the converted MCP tools."""
        return ReActAgent.from_tools(
            tools=await self.create_llamaindex_tools(),
            verbose=True,
        )

    async def query_with_agent(self, query: str):
        """Answer *query* through the ReAct agent and return the response text."""
        agent = await self.create_react_agent()
        return agent.query(query).response
3. 与 AutoGPT 集成
from autogpt.agent.agent import Agent
from autogpt.config import Config
from autogpt.models.command import Command

from mcp.server import Server


class MCPAutoGPTIntegration:
    """Register MCP tools as AutoGPT commands and drive an AutoGPT agent."""

    def __init__(self, mcp_server: Server):
        self.mcp_server = mcp_server
        self.command_registry = {}

    async def register_mcp_commands(self):
        """Mirror every MCP tool into ``self.command_registry``."""
        for info in await self.mcp_server.list_tools():
            name = info["name"]
            self.command_registry[name] = Command(
                name=name,
                description=info["description"],
                function=self._create_command_function(name),
            )

    def _create_command_function(self, tool_name: str):
        """Build an async command callable that proxies to the MCP server."""
        async def command_function(**kwargs):
            return await self.mcp_server.call_tool(tool_name, kwargs)

        return command_function

    async def create_autogpt_agent(self, config: Config):
        """Create an AutoGPT agent wired to the registered MCP commands."""
        await self.register_mcp_commands()
        return Agent(
            ai_name="MCP-Agent",
            ai_role="Assistant",
            commands=self.command_registry,
            config=config,
        )

    async def run_autogpt_task(self, task: str):
        """Run *task* with a freshly created agent and return its result."""
        agent = await self.create_autogpt_agent(Config())
        return await agent.run(task)
4. 与 FastAPI 集成
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from mcp.server import Server
from pydantic import BaseModel


class MCPFastAPIIntegration:
    """Expose an MCP server over HTTP via a FastAPI application.

    Routes cover tool listing/calling, resource listing/reading, prompt
    listing and a liveness probe.
    """

    def __init__(self, mcp_server: Server):
        self.mcp_server = mcp_server
        self.app = FastAPI(title="MCP API")
        self._setup_middleware()
        self._setup_routes()

    def _setup_middleware(self):
        """Attach CORS middleware.

        NOTE(review): ``allow_origins=["*"]`` combined with
        ``allow_credentials=True`` is rejected by browsers per the CORS
        spec (a wildcard origin cannot be used with credentials) — list
        explicit origins in production.
        """
        self.app.add_middleware(
            CORSMiddleware,
            allow_origins=["*"],
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )

    def _setup_routes(self):
        """Register the HTTP routes on ``self.app``."""

        @self.app.get("/tools")
        async def list_tools():
            """List all tools exposed by the MCP server."""
            tools = await self.mcp_server.list_tools()
            return {"tools": tools}

        @self.app.post("/tools/{tool_name}/call")
        async def call_tool(tool_name: str, params: dict):
            """Call a tool; failures surface as HTTP 500 with the message."""
            try:
                result = await self.mcp_server.call_tool(
                    tool_name,
                    params
                )
                return {"result": result}
            except Exception as e:
                raise HTTPException(status_code=500, detail=str(e))

        @self.app.get("/resources")
        async def list_resources():
            """List all resources exposed by the MCP server."""
            resources = await self.mcp_server.list_resources()
            return {"resources": resources}

        # BUGFIX: MCP resource URIs (e.g. "file:///a/b") contain slashes, so
        # a plain "{uri}" parameter would never match them; the ":path"
        # converter captures the full remainder of the request path.
        @self.app.get("/resources/{uri:path}")
        async def read_resource(uri: str):
            """Read a single resource by URI."""
            try:
                content = await self.mcp_server.read_resource(uri)
                return {"content": content}
            except Exception as e:
                raise HTTPException(status_code=500, detail=str(e))

        @self.app.get("/prompts")
        async def list_prompts():
            """List all prompts exposed by the MCP server."""
            prompts = await self.mcp_server.list_prompts()
            return {"prompts": prompts}

        @self.app.get("/health")
        async def health_check():
            """Liveness probe."""
            return {"status": "healthy"}

    def get_app(self) -> FastAPI:
        """Return the configured FastAPI application."""
        return self.app
5. 与 WebSocket 集成
import asyncio
import json

from fastapi import WebSocket

from mcp.server import Server


class MCPWebSocketIntegration:
    """Serve MCP operations over a JSON-message WebSocket protocol."""

    def __init__(self, mcp_server: Server):
        self.mcp_server = mcp_server
        self.active_connections = []

    async def handle_websocket(self, websocket: WebSocket):
        """Accept a connection and serve request/response pairs until it drops."""
        await websocket.accept()
        self.active_connections.append(websocket)
        try:
            while True:
                # One JSON request per text frame; response is sent back on
                # the same socket.
                data = await websocket.receive_text()
                message = json.loads(data)
                response = await self._handle_message(message)
                await websocket.send_text(json.dumps(response))
        except Exception as e:
            print(f"WebSocket 错误: {e}")
        finally:
            # BUGFIX: the connection may already have been pruned by
            # broadcast_message(); remove() on a missing element raises
            # ValueError, which would mask the original error.
            if websocket in self.active_connections:
                self.active_connections.remove(websocket)

    async def _handle_message(self, message: dict) -> dict:
        """Dispatch one decoded client message to the MCP server."""
        message_type = message.get("type")
        if message_type == "list_tools":
            tools = await self.mcp_server.list_tools()
            return {"type": "tools_list", "data": tools}
        elif message_type == "call_tool":
            result = await self.mcp_server.call_tool(
                message["tool_name"],
                message.get("params", {})
            )
            return {"type": "tool_result", "data": result}
        elif message_type == "list_resources":
            resources = await self.mcp_server.list_resources()
            return {"type": "resources_list", "data": resources}
        else:
            return {"type": "error", "message": "未知消息类型"}

    async def broadcast_message(self, message: dict):
        """Send *message* to every live connection, pruning dead ones.

        BUGFIX: iterate over a copy so removing a dead connection does not
        skip elements of the list being iterated; previously failed
        connections were never removed and every later broadcast retried
        them.
        """
        message_text = json.dumps(message)
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message_text)
            except Exception as e:
                print(f"发送消息失败: {e}")
                # Drop the broken connection so later broadcasts skip it.
                if connection in self.active_connections:
                    self.active_connections.remove(connection)
6. 与 GraphQL 集成
import strawberry
from strawberry.types import Info

from mcp.server import Server


@strawberry.type
class MCPTool:
    name: str
    description: str


@strawberry.type
class MCPResource:
    uri: str
    name: str
    description: str


@strawberry.type
class Query:
    @strawberry.field
    async def tools(self, info: Info) -> list[MCPTool]:
        """Fetch every tool exposed by the MCP server."""
        server = info.context["mcp_server"]
        listed = await server.list_tools()
        return [
            MCPTool(name=t["name"], description=t["description"])
            for t in listed
        ]

    @strawberry.field
    async def resources(self, info: Info) -> list[MCPResource]:
        """Fetch every resource exposed by the MCP server."""
        server = info.context["mcp_server"]
        listed = await server.list_resources()
        return [
            MCPResource(
                uri=r["uri"],
                name=r["name"],
                description=r["description"],
            )
            for r in listed
        ]


@strawberry.type
class Mutation:
    @strawberry.mutation
    async def call_tool(
        self,
        tool_name: str,
        params: dict,
        info: Info
    ) -> str:
        """Invoke an MCP tool and return its stringified result.

        NOTE(review): plain ``dict`` is not a GraphQL scalar — confirm the
        installed strawberry version maps it (e.g. to the JSON scalar).
        """
        server = info.context["mcp_server"]
        outcome = await server.call_tool(tool_name, params)
        return str(outcome)


class MCPGraphQLIntegration:
    """Wrap the strawberry schema and run GraphQL documents against MCP."""

    def __init__(self, mcp_server: Server):
        self.mcp_server = mcp_server
        self.schema = strawberry.Schema(
            query=Query,
            mutation=Mutation
        )

    async def execute_query(self, query: str, variables: dict = None):
        """Execute a GraphQL document; return either data or error strings."""
        outcome = await self.schema.execute(
            query,
            variable_values=variables,
            context_value={"mcp_server": self.mcp_server},
        )
        if outcome.errors:
            return {"errors": [str(err) for err in outcome.errors]}
        return {"data": outcome.data}
7. 与 gRPC 集成
import grpc
from concurrent import futures

import mcp_pb2
import mcp_pb2_grpc

from mcp.server import Server


class MCPServicer(mcp_pb2_grpc.MCPServicer):
    """gRPC servicer that forwards requests to the underlying MCP server."""

    def __init__(self, mcp_server: Server):
        self.mcp_server = mcp_server

    async def ListTools(
        self,
        request: mcp_pb2.ListToolsRequest,
        context: grpc.ServicerContext
    ) -> mcp_pb2.ListToolsResponse:
        """Return every MCP tool as a protobuf ``Tool`` message."""
        listed = await self.mcp_server.list_tools()
        protos = [
            mcp_pb2.Tool(name=t["name"], description=t["description"])
            for t in listed
        ]
        return mcp_pb2.ListToolsResponse(tools=protos)

    async def CallTool(
        self,
        request: mcp_pb2.CallToolRequest,
        context: grpc.ServicerContext
    ) -> mcp_pb2.CallToolResponse:
        """Invoke the named tool and wrap its stringified result."""
        arguments = dict(request.params)
        outcome = await self.mcp_server.call_tool(
            request.tool_name,
            arguments
        )
        return mcp_pb2.CallToolResponse(result=str(outcome))


class MCPGRPCIntegration:
    """Own the lifecycle of an asyncio gRPC server exposing MCP."""

    def __init__(self, mcp_server: Server, port: int = 50051):
        self.mcp_server = mcp_server
        self.port = port
        self.server = None

    async def start_server(self):
        """Create, register and start the gRPC server on ``self.port``."""
        self.server = grpc.aio.server(
            futures.ThreadPoolExecutor(max_workers=10)
        )
        mcp_pb2_grpc.add_MCPServicer_to_server(
            MCPServicer(self.mcp_server),
            self.server
        )
        self.server.add_insecure_port(f'[::]:{self.port}')
        await self.server.start()
        print(f"gRPC 服务器启动在端口 {self.port}")

    async def stop_server(self):
        """Stop the server immediately (zero grace period) if it is running."""
        if self.server:
            await self.server.stop(0)
            print("gRPC 服务器已停止")
最佳实践:
- 异步处理:使用异步编程避免阻塞
- 错误处理:正确处理集成过程中的错误
- 性能优化:缓存频繁调用的结果
- 日志记录:记录所有集成操作
- 测试覆盖:编写集成测试确保功能正常
- 文档完善:提供清晰的集成文档
通过与其他 AI 框架和工具的集成,可以扩展 MCP 的功能和应用场景。