面试题手册

梳理高频技术问题,帮助你按主题复习和查漏补缺。

服务端阅读 05月30日 20:38

MCP 错误处理和重试机制应该怎么实现?

MCP 的错误处理和重试机制,关键不是“失败就再试一次”,而是先判断失败能不能重试。参数校验失败、权限不足、工具不存在这类错误重试没有意义;网络抖动、上游 5xx、限流、临时超时才适合重试。把错误分类、超时、退避、熔断和降级放在一起设计,MCP Server 才不会在上游变慢时把自己也拖垮。追问哪些 MCP 错误不应该重试?400 参数错误、401/403 权限错误、工具名不存在、schema 不匹配都不该重试。应该快速返回,并提示调用方修正输入或重新授权。限流错误应该怎么处理?如果上游返回 Retry-After,优先按它等待;没有这个头时再用指数退避。还要给每个用户或会话设置并发上限。熔断和重试是什么关系?重试解决偶发失败,熔断处理持续失败。上游连续超时或 5xx 达到阈值后,应短时间直接失败或走降级。错误日志怎么写才方便排查?每次工具调用都生成 requestId,并在 MCP 响应、应用日志、上游请求日志里贯穿它。敏感参数要脱敏。降级策略有哪些边界?降级只适合可接受旧数据或部分结果的场景。涉及写入、支付、权限变更或不可逆操作时不要自动降级。写段代码async function retry(fn, max = 3) { for (let i = 0; i < max; i++) { try { return await fn(); } catch (e) { if (!e.retryable || i === max - 1) throw e; await new Promise(r => setTimeout(r, Math.min(1000 * 2 ** i, 8000))); } }}
服务端阅读 05月30日 20:38

MCP 数据持久化和缓存策略该怎么设计?

MCP 的持久化和缓存不能只理解成“把数据存起来”。先区分哪些数据必须长期保留,哪些只是为了减少重复计算:工具定义、资源索引、用户会话、授权状态通常需要持久化;工具调用结果、资源快照、schema 解析结果更适合缓存。分清这条线,才不会把缓存当数据库,也不会把数据库拖成临时变量仓库。追问MCP 哪些数据一定要持久化?工具清单、资源元数据、授权状态和会话恢复信息最好持久化,因为它们影响协议行为和用户体验。单次工具调用中间结果不一定落库,除非要审计或断点续跑。Redis 缓存和数据库版本怎么配合?关键记录带 version 或 updatedAt,缓存 key 或 value 里也保存这个版本。读取时发现版本不一致就丢弃缓存,这比单纯依赖 TTL 更可靠。持久化会带来哪些安全边界?不要把 access token、用户输入的敏感资源和工具返回原文无脑落库。确实要保存时,应加密、设置保留周期,并在日志里只记录 request id。缓存最常见的坑是什么?权限变化后缓存没失效,可能导致用户还能看到旧资源;失败结果缓存太久,也会让故障恢复后仍然返回错误。单机和多实例有什么取舍?单机可以用 SQLite 加内存缓存;多实例必须把会话和缓存外置到 Redis 或数据库,否则请求打到不同实例时会状态丢失。写段代码async function getToolSchema(name: string) { const key = `mcp:schema:${name}`; const cached = await redis.get(key); if (cached) return JSON.parse(cached); const schema = await store.loadToolSchema(name); await redis.set(key, JSON.stringify(schema), { EX: 300 }); return schema;}
服务端阅读 05月28日 06:55

如何在 MCP 中实现会话管理和上下文维护?

MCP 的会话管理和上下文维护直接影响多轮对话的连贯性和服务端的可扩展性。面试中这道题考察的是你对 MCP 协议层的理解深度,而不只是写一个内存字典。核心答案MCP 会话管理分三个层级:无状态会话(每次请求携带完整上下文)、服务端托管会话(服务器维护状态,客户端持 session ID)、客户端托管会话(上下文存储在客户端,随请求发出)。选择哪种取决于你的部署架构——单机用服务端托管最简单,K8s 水平扩展优先无状态或客户端托管。上下文维护的关键在于:会话生命周期管理(创建/恢复/迁移/销毁)、上下文窗口的裁剪与摘要、以及状态转换的合法性校验。下面逐个拆解。MCP 协议层的会话机制MCP 基于 JSON-RPC 2.0 通信,传输层支持 Stdio(本地进程)和 HTTP+SSE(远程服务)。每个 client-server 连接是一对一的,客户端负责会话的完整生命周期——包括超时、重连和关闭。2025 年引入 Streamable HTTP 后,MCP 服务可以部署为远程服务,但有状态会话在水平扩展时暴露了问题:服务端内存中的 session 导致负载均衡必须使用 sticky routing,跨 Pod 部署需要 Redis 做会话映射。这也是 2026 MCP 规范重点解决的架构问题。会话创建与生命周期import uuidimport jsonimport osfrom datetime import datetime, timedeltafrom typing import Dict, List, Optional, Anyfrom enum import Enumfrom collections import defaultdictclass SessionState(Enum): ACTIVE = "active" IDLE = "idle" SUSPENDED = "suspended" CLOSED = "closed"class Session: def __init__(self, session_id: str, initial_context: dict = None): self.session_id = session_id self.context = initial_context or {} self.state = SessionState.ACTIVE self.created_at = datetime.now() self.last_activity = datetime.now() self.message_history: List[dict] = [] self.metadata: dict = {} def to_dict(self) -> dict: return { "session_id": self.session_id, "context": self.context, "state": self.state.value, "created_at": self.created_at.isoformat(), "last_activity": self.last_activity.isoformat(), "message_history": self.message_history, "metadata": self.metadata, } @classmethod def from_dict(cls, data: dict) -> "Session": session = cls(data["session_id"], data.get("context", {})) session.state = SessionState(data["state"]) session.created_at = datetime.fromisoformat(data["created_at"]) session.last_activity = datetime.fromisoformat(data["last_activity"]) session.message_history = data.get("message_history", []) session.metadata = data.get("metadata", {}) return sessionclass SessionManager: def __init__(self, timeout_seconds: int = 3600): self.sessions: Dict[str, Session] = {} self.session_timeout = timeout_seconds def create_session(self, initial_context: dict = None) -> str: session_id = str(uuid.uuid4()) self.sessions[session_id] = Session(session_id, initial_context) return session_id def get_session(self, session_id: str) -> Optional[Session]: session = self.sessions.get(session_id) if not session: return None if self._is_expired(session): self.close_session(session_id) return None return session def update_session(self, session_id: str, **updates) -> None: session = self.get_session(session_id) if not session: raise ValueError(f"Session {session_id} does not exist or expired") for key, value in updates.items(): if hasattr(session, key): setattr(session, key, value) session.last_activity = datetime.now() def close_session(self, session_id: str) -> None: session = self.sessions.pop(session_id, None) if session: session.state = SessionState.CLOSED def _is_expired(self, session: Session) -> bool: elapsed = (datetime.now() - session.last_activity).total_seconds() return elapsed > self.session_timeout def cleanup_expired(self) -> int: expired_ids = [ sid for sid, s in self.sessions.items() if self._is_expired(s) ] for sid in expired_ids: self.close_session(sid) return len(expired_ids)这里和简单字典实现的关键区别:Session 是独立实体,支持序列化/反序列化,为后续持久化和迁移打基础。cleanup_expired 是生产环境必须的定时清理逻辑。上下文管理器class ContextManager: def __init__(self, session_manager: SessionManager): self.session_manager = session_manager def set_context(self, session_id: str, key: str, value: Any) -> None: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") session.context[key] = value session.last_activity = datetime.now() def get_context(self, session_id: str, key: str, default: Any = None) -> Any: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") return session.context.get(key, default) def update_context(self, session_id: str, updates: dict) -> None: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") session.context.update(updates) session.last_activity = datetime.now() def clear_context(self, session_id: str) -> None: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") session.context = {} session.last_activity = datetime.now() def get_context_size(self, session_id: str) -> int: session = self.session_manager.get_session(session_id) if not session: return 0 return len(json.dumps(session.context, ensure_ascii=False).encode("utf-8")) def trim_context(self, session_id: str, max_bytes: int = 8192) -> dict: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") trimmed = {} current_size = 0 for key in reversed(list(session.context.keys())): value_json = json.dumps({key: session.context[key]}, ensure_ascii=False) entry_size = len(value_json.encode("utf-8")) if current_size + entry_size > max_bytes: break trimmed[key] = session.context[key] current_size += entry_size removed_keys = set(session.context.keys()) - set(trimmed.keys()) session.context = trimmed session.last_activity = datetime.now() return {"removed_keys": list(removed_keys), "remaining_bytes": current_size}trim_context 是上下文维护中容易被忽略但生产必须的逻辑——LLM 的上下文窗口有限,当上下文膨胀时需要裁剪。简单的 LRU 策略优先丢弃最旧的数据。消息历史管理class MessageHistoryManager: def __init__(self, session_manager: SessionManager, max_history: int = 100): self.session_manager = session_manager self.max_history = max_history def add_message( self, session_id: str, role: str, content: str, metadata: dict = None ) -> None: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") message = { "role": role, "content": content, "timestamp": datetime.now().isoformat(), "metadata": metadata or {}, } session.message_history.append(message) if len(session.message_history) > self.max_history: session.message_history = session.message_history[-self.max_history:] session.last_activity = datetime.now() def get_history(self, session_id: str, limit: int = None) -> List[dict]: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") history = session.message_history return history[-limit:] if limit else history def get_conversation_summary(self, session_id: str) -> dict: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") history = session.message_history return { "total_messages": len(history), "user_messages": sum(1 for m in history if m["role"] == "user"), "assistant_messages": sum(1 for m in history if m["role"] == "assistant"), "first_message_time": history[0]["timestamp"] if history else None, "last_message_time": history[-1]["timestamp"] if history else None, } def summarize_history(self, session_id: str, keep_recent: int = 10) -> str: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") if len(session.message_history) <= keep_recent: return "" old_messages = session.message_history[:-keep_recent] summary_parts = [] for msg in old_messages: summary_parts.append(f"[{msg['role']}]: {msg['content'][:100]}") summary = "\n".join(summary_parts) session.message_history = [ { "role": "system", "content": f"Earlier conversation summary:\n{summary}", "timestamp": old_messages[-1]["timestamp"], "metadata": {"type": "summary"}, } ] + session.message_history[-keep_recent:] session.last_activity = datetime.now() return summarysummarize_history 解决的是长对话场景下的上下文窗口溢出问题。保留最近消息的同时将早期内容压缩为摘要,这是实际 MCP 应用中常见的需求——特别是在对话持续几十轮的情况下。状态机管理class StateMachine: TRANSITIONS = { SessionState.ACTIVE: [SessionState.IDLE, SessionState.SUSPENDED, SessionState.CLOSED], SessionState.IDLE: [SessionState.ACTIVE, SessionState.CLOSED], SessionState.SUSPENDED: [SessionState.ACTIVE, SessionState.CLOSED], SessionState.CLOSED: [], } def __init__(self, session_manager: SessionManager): self.session_manager = session_manager def transition(self, session_id: str, new_state: SessionState) -> None: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") current = session.state if new_state not in self.TRANSITIONS.get(current, []): raise ValueError( f"Invalid transition: {current.value} -> {new_state.value}" ) session.state = new_state session.last_activity = datetime.now() def get_state(self, session_id: str) -> SessionState: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") return session.state def can_transition(self, session_id: str, new_state: SessionState) -> bool: session = self.session_manager.get_session(session_id) if not session: return False return new_state in self.TRANSITIONS.get(session.state, [])状态机保证会话不会从 CLOSED 状态复活,也不会跳过中间状态直接 SUSPENDED。这在多实例部署时尤其重要——状态转换必须是原子的、可审计的。会话持久化内存存储只适合开发和单机部署。生产环境必须持久化,否则进程重启所有会话丢失。class SessionPersistence: def __init__(self, storage_path: str): self.storage_path = storage_path os.makedirs(storage_path, exist_ok=True) def save_session(self, session: Session) -> None: file_path = os.path.join(self.storage_path, f"{session.session_id}.json") with open(file_path, "w", encoding="utf-8") as f: json.dump(session.to_dict(), f, indent=2, ensure_ascii=False) def load_session(self, session_id: str) -> Optional[Session]: file_path = os.path.join(self.storage_path, f"{session_id}.json") if not os.path.exists(file_path): return None with open(file_path, "r", encoding="utf-8") as f: data = json.load(f) return Session.from_dict(data) def delete_session(self, session_id: str) -> None: file_path = os.path.join(self.storage_path, f"{session_id}.json") if os.path.exists(file_path): os.remove(file_path) def list_sessions(self) -> List[str]: return [ f[:-5] for f in os.listdir(self.storage_path) if f.endswith(".json") ]class RedisSessionPersistence: def __init__(self, redis_url: str = "redis://localhost:6379", key_prefix: str = "mcp:session:"): import redis self.client = redis.from_url(redis_url) self.key_prefix = key_prefix self.default_ttl = 3600 def save_session(self, session: Session, ttl: int = None) -> None: key = f"{self.key_prefix}{session.session_id}" data = json.dumps(session.to_dict(), ensure_ascii=False) self.client.setex(key, ttl or self.default_ttl, data) def load_session(self, session_id: str) -> Optional[Session]: key = f"{self.key_prefix}{session_id}" data = self.client.get(key) if not data: return None return Session.from_dict(json.loads(data)) def delete_session(self, session_id: str) -> None: key = f"{self.key_prefix}{session_id}" self.client.delete(key) def extend_ttl(self, session_id: str, ttl: int = None) -> None: key = f"{self.key_prefix}{session_id}" self.client.expire(key, ttl or self.default_ttl)Redis 方案解决的是水平扩展场景:多个 MCP 服务实例共享同一个 Redis,任何实例都能读取任意会话的状态。TTL 自动过期比定时清理更可靠。extend_ttl 在用户活跃时续期,实现"空闲超时"而非"绝对超时"。会话监控与指标class SessionAnalytics: def __init__(self, session_manager: SessionManager): self.session_manager = session_manager self.metrics: Dict[str, int] = defaultdict(int) self.event_log: List[dict] = [] def track_event(self, event_type: str, session_id: str = None, detail: str = None) -> None: self.metrics[event_type] += 1 self.event_log.append({ "event_type": event_type, "session_id": session_id, "detail": detail, "timestamp": datetime.now().isoformat(), }) def get_metrics(self) -> dict: return dict(self.metrics) def get_session_statistics(self) -> dict: sessions = self.session_manager.sessions if not sessions: return { "total_sessions": 0, "active_sessions": 0, "idle_sessions": 0, "average_duration_seconds": 0.0, "total_messages": 0, } durations = [] active_count = 0 idle_count = 0 total_messages = 0 for session in sessions.values(): duration = (session.last_activity - session.created_at).total_seconds() durations.append(duration) total_messages += len(session.message_history) if session.state == SessionState.ACTIVE: active_count += 1 elif session.state == SessionState.IDLE: idle_count += 1 return { "total_sessions": len(sessions), "active_sessions": active_count, "idle_sessions": idle_count, "average_duration_seconds": sum(durations) / len(durations), "total_messages": total_messages, } def get_recent_events(self, limit: int = 50) -> List[dict]: return self.event_log[-limit:]监控不只是统计数字。event_log 记录每一次状态变更,可以用来排查"会话为什么丢失"这类线上问题。生产部署的三种会话模型2026 MCP 规范明确提出了三种会话模型,选择取决于架构:无状态模型:每次请求客户端发送完整上下文。负载均衡最友好,任何实例都能处理任何请求。缺点是网络开销大,上下文窗口大时不实际。服务端托管模型:服务端维护状态,客户端只持 session ID。实现最简单,但水平扩展需要 sticky routing 或共享存储(Redis)。适合中小规模部署。客户端托管模型:上下文存在客户端,随请求发出。结合无状态服务端,两者优势兼得——服务端无状态可水平扩展,客户端有完整控制权。缺点是客户端逻辑更复杂。实际选择建议:如果 MCP 服务只部署 1-2 个实例,服务端托管 + 文件持久化足够。如果需要 K8s 弹性伸缩,客户端托管或无状态模型更合适。Redis 持久化是折中方案——服务端托管但状态外置到 Redis。追问:MCP 会话和普通 HTTP Session 有什么区别?MCP 会话不是 HTTP Session。HTTP Session 基于浏览器 Cookie 机制,而 MCP 会话是 JSON-RPC 层面的连接生命周期管理。MCP 客户端在一个会话内可以发起多次请求/通知,会话 ID 关联的是对话上下文而非浏览器状态。此外,MCP 会话需要维护消息历史用于 LLM 的上下文窗口,这是普通 HTTP Session 不需要考虑的。
服务端阅读 05月28日 06:54

如何实现一个 MCP 服务器?有哪些最佳实践?

MCP 服务器的核心概念MCP(Model Context Protocol)是一个开放协议,让大语言模型能够通过标准化接口连接外部数据源和工具。理解服务器实现之前,需要先厘清三个核心概念:Tool(工具):服务器暴露给客户端的可调用函数,接受结构化参数并返回结果Resource(资源):服务器提供的可读数据源,客户端可以按 URI 访问Prompt(提示模板):服务器预设的提示词模板,客户端可以复用服务器与客户端之间通过 Capability 协商 建立连接——客户端在初始化时声明自己支持哪些能力,服务器也声明自己提供哪些能力,双方只使用共同支持的功能。选择传输方式MCP 支持两种传输协议,选择哪种直接影响部署架构:Stdio 传输:客户端以子进程方式启动服务器,通过标准输入/输出通信。适合本地工具集成,延迟低,无需处理网络和认证问题。Claude Desktop 和 Cursor 的本地服务器都走这条路径。Streamable HTTP 传输:服务器以独立 HTTP 服务运行,客户端通过 POST 请求通信。适合远程部署、多客户端共享、需要认证的场景。注意,旧版基于 SSE 的 HTTP 传输已被废弃,新项目应直接使用 Streamable HTTP。# Stdio 模式启动mcp.run(transport="stdio")# HTTP 模式启动mcp.run(transport="streamable-http", host="0.0.0.0", port=8080)用 Python 实现一个 MCP 服务器Python SDK(mcp 包)提供了 FastMCP 高阶 API,可以快速搭建服务器。安装依赖pip install "mcp[cli]"定义工具工具是 MCP 服务器的核心能力。每个工具需要声明名称、描述和参数 Schema:from mcp.server.fastmcp import FastMCPmcp = FastMCP("my-server")@mcp.tool()def search_docs(query: str, limit: int = 5) -> str: """在文档库中搜索相关内容,返回匹配结果摘要""" # 实际场景中对接搜索引擎或向量数据库 results = do_search(query, limit) return format_results(results)关键要点:工具函数的 docstring 会成为客户端看到的工具描述,直接影响模型能否正确调用,务必写得准确具体。参数类型注解会自动转换为 JSON Schema。定义资源资源用于暴露可读数据,客户端通过 URI 主动拉取:@mcp.resource("config://app/settings")def get_settings() -> str: """返回当前应用配置""" return json.dumps(load_settings(), ensure_ascii=False)@mcp.resource("data://users/{user_id}/profile")def get_user_profile(user_id: str) -> str: """按用户 ID 返回用户资料""" profile = fetch_profile(user_id) return json.dumps(profile, ensure_ascii=False)URI 模板支持路径参数(如 {user_id}),SDK 会自动解析并传入函数。定义提示模板提示模板是预设的 Prompt,客户端可以按名称获取:@mcp.prompt()def code_review(code: str) -> str: """代码审查提示模板""" return f"请审查以下代码,关注安全性和性能问题:\n\n```\n{code}\n```"启动服务器if __name__ == "__main__": mcp.run()mcp run 命令默认使用 stdio 传输。也可通过 CLI 指定:mcp run server.py --transport streamable-http用 TypeScript 实现一个 MCP 服务器TypeScript SDK 是官方维护的首选 SDK,生态最完整:import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";import { z } from "zod";const server = new McpServer({ name: "my-server", version: "1.0.0",});server.tool( "search_docs", "在文档库中搜索相关内容",{ query: z.string(), limit: z.number().default(5) },async ({ query, limit }) => { const results = await doSearch(query, limit); return { content: [{ type: "text", text: formatResults(results) }] }; });const transport = new StdioServerTransport();await server.connect(transport);TypeScript SDK 使用 zod 定义参数 Schema,类型安全且自动生成 JSON Schema。生产环境的最佳实践错误处理不要让异常穿透到客户端变成不友好的报错。在每个工具函数中做防御性处理:@mcp.tool()def query_database(sql: str) -> str: """执行只读 SQL 查询""" try: if not sql.strip().upper().startswith("SELECT"): return "错误:仅允许 SELECT 查询" result = execute_readonly_query(sql) return json.dumps(result, ensure_ascii=False) except ConnectionError: return "错误:数据库连接失败,请稍后重试" except Exception as e: return f"查询执行出错:{type(e).__name__}"注意上面捕获了具体异常类型,对用户屏蔽了内部实现细节。输入验证永远不要信任客户端传入的参数,即使模型通常不会构造恶意输入:@mcp.tool()def run_command(cmd: str) -> str: """执行预定义的系统命令(仅允许白名单命令)""" # 不要直接拼接执行用户输入! allowed = {"list_files", "check_status", "get_version"} if cmd not in allowed: return f"错误:不支持的命令,允许的命令:{', '.join(allowed)}" return execute_safe(cmd)认证与安全远程部署的 MCP 服务器必须实现认证。2026 协议推荐使用 OAuth 2.1:为每个客户端颁发独立的 access token,绑定最小权限 scope工具级别的权限控制:不同 scope 对应不同的工具访问权限传输层强制 HTTPS,禁止明文 HTTP本地 stdio 服务器依赖操作系统用户隔离,不需要额外认证异步与性能工具函数如果涉及 I/O 操作(网络请求、文件读写、数据库查询),应该使用异步实现:@mcp.tool()async def fetch_url(url: str) -> str: """获取 URL 内容""" async with httpx.AsyncClient(timeout=10) as client: resp = await client.get(url) return resp.text[:5000] # 限制返回长度其他性能策略:对频繁调用的工具实现结果缓存,避免重复计算为耗时操作设置超时,防止客户端无限等待大量数据分页返回,不要一次吐出全部内容日志与监控生产服务器必须有可观测性:import logginglogger = logging.getLogger("mcp-server")@mcp.tool()def search_docs(query: str) -> str: logger.info(f"search_docs called: query={query!r}") result = do_search(query) logger.info(f"search_docs returned {len(result)} results") return result建议记录:每次工具调用的入参摘要、执行耗时、异常信息。远程服务器还应实现健康检查端点供负载均衡器探活。常见陷阱在工具中执行任意代码:绝不能用 eval()、exec() 或直接拼接 shell 命令。这是最严重的安全漏洞,恶意输入可以完全控制服务器。忽略 Capability 协商:服务器声明的能力必须与实际实现一致。声明了 tools capability 却没有注册任何工具,或者工具描述与实际行为不符,都会导致客户端调用失败。过度暴露工具:一个服务器注册几十个工具会占用客户端大量上下文窗口。按功能域拆分成多个独立服务器,每个只暴露相关工具,客户端按需连接。同步阻塞:在异步框架中使用同步的阻塞 I/O 会拖慢整个服务器。使用 asyncio.to_thread 将同步调用包装为异步,或直接使用异步库。面试追问Q:MCP 的 Stdio 传输和 HTTP 传输分别适合什么场景?Stdio 适合本地单客户端场景,客户端以子进程方式启动服务器,零配置、低延迟、无需认证。HTTP 适合远程部署、多客户端共享、需要认证和网关管理的生产环境。2026 协议推荐新项目使用 Streamable HTTP 替代已废弃的 SSE 传输。Q:如何在 MCP 服务器中防止工具被滥用?三层防线:输入验证拒绝非法参数,OAuth scope 控制工具访问权限,速率限制防止高频调用。核心原则是永远不信任客户端输入,白名单优于黑名单,最小权限优于便利性。Q:一个 MCP 服务器应该注册多少个工具?建议控制在 10 个以内。工具过多会占用客户端上下文窗口,影响模型选择准确性。按功能域拆分多个服务器,客户端按需连接,比一个臃肿服务器更易维护。
服务端阅读 05月28日 06:51

MCP性能优化有哪些核心策略?从协议层到工程实践的完整方案

MCP 的性能瓶颈在哪里?优化之前,先定位瓶颈。MCP(Model Context Protocol)基于 JSON-RPC 2.0 协议通信,性能问题主要集中在三个层面:通信延迟:每次工具调用都是一次完整的请求-响应周期,批量场景下延迟叠加严重序列化开销:JSON 编解码在高吞吐场景下消耗 15%-20% 的 CPU 时间,远高于 FlatBuffers 等二进制方案上下文膨胀:随着对话轮次增加,上下文窗口不断膨胀,传输和解析成本线性增长实测数据:在单节点 5000 QPS 场景下,JSON-RPC 2.0 的序列化开销比 Cap'n Proto 高约 18%,这对延迟敏感型应用影响显著。协议层:减少通信开销批量工具调用MCP 支持在单次请求中发起多个工具调用,减少网络往返:{ "jsonrpc": "2.0", "method": "tools/call", "params": { "calls": [ {"name": "read_file", "arguments": {"path": "/config.yaml"}}, {"name": "list_directory", "arguments": {"path": "/data"}} ] }}批量调用可将 5 次独立请求的延迟从 ~500ms 压缩到 ~120ms(假设单次 RTT 100ms)。二进制序列化替代MCP 社区正在推进 Transport Evolution 工作组,探索 JSON-RPC 2.0 的二进制扩展。在生产环境中,可以引入中间层将 JSON 序列化替换为 MessagePack 或 Protocol Buffers:import msgpackdef serialize_mcp_message(msg: dict) -> bytes: """用 MessagePack 替代 JSON 序列化,减少 40%-60% 体积""" return msgpack.packb(msg, use_bin_type=True)def deserialize_mcp_message(data: bytes) -> dict: return msgpack.unpackb(data, raw=False)连接复用与流式响应优先使用 HTTP/2 或 SSE(Server-Sent Events)传输层,避免反复建立 TCP 连接。对长时间运行的工具调用,使用流式响应配合 progressToken 逐步返回结果,而不是阻塞等待完整输出。缓存层:避免重复计算结果缓存对幂等性工具调用(如读取配置文件、查询静态数据)实施结果缓存。LRU 缓存在工具调用场景下命中率通常可达 60% 以上:from functools import lru_cachefrom typing import Anyclass ToolCache: def __init__(self, maxsize: int = 512): self._cache = lru_cache(maxsize=maxsize) @lru_cache(maxsize=512) def get_tool_result(self, tool_name: str, args_key: str) -> Any: """缓存工具执行结果,args_key 为参数的稳定哈希""" return self._execute_tool(tool_name, args_key)元数据缓存工具列表(tools/list)和资源描述(resources/list)变化频率低,应在客户端侧缓存,避免每次会话重新拉取。MCP Server Cards(.well-known URL)可进一步加速服务发现,在 100+ 服务器环境中降低 30% 初始连接延迟。智能失效策略缓存不是永久的。结合 TTL 和事件驱动两种失效机制:短周期数据用 TTL(如 60s),配置变更通过 notifications/resources/updated 主动推送失效。并发层:提升吞吐量异步 I/O 模型MCP Server 应采用异步编程模型。Python 推荐 asyncio,Node.js 天然异步,Go 用 goroutine:import asyncioasync def handle_batch_calls(calls: list[dict]) -> list[dict]: """并行执行独立的工具调用""" tasks = [execute_single_call(call) for call in calls] results = await asyncio.gather(*tasks, return_exceptions=True) return [ r if not isinstance(r, Exception) else {"error": str(r)} for r in results ]无锁并发控制高并发场景下,锁竞争会成为瓶颈。优先使用无锁数据结构(如 asyncio.Queue)或乐观并发控制,避免线程/协程间的阻塞等待。C++ 实现可考虑 lock-free queue 或 RCU(Read-Copy-Update)模式。连接池管理对外部资源(数据库、HTTP API)建立连接池,设置合理的 min_size 和 max_size。连接池耗尽时排队等待而非无限创建新连接,防止资源泄漏拖垮整个服务。上下文管理:控制膨胀上下文膨胀是 MCP 特有的性能问题。随着对话推进,发送给模型的上下文越来越大,直接影响 token 计费和响应延迟。上下文裁剪策略滑动窗口:只保留最近 N 轮对话,丢弃早期历史摘要压缩:对超过阈值的早期对话生成摘要,替代原文传入相关性过滤:根据当前查询,只检索相关的上下文片段(RAG 思路)def prune_context(messages: list[dict], max_tokens: int = 4000) -> list[dict]: """滑动窗口 + 摘要压缩""" total = sum(estimate_tokens(m["content"]) for m in messages) if total <= max_tokens: return messages # 保留最近 3 轮,对更早的内容生成摘要 recent = messages[-6:] # 最近 3 轮(user+assistant 各1) older = messages[:-6] summary = generate_summary(older) return [{"role": "system", "content": f"历史摘要:{summary}"}] + recent资源按需加载不要在初始化时加载全部资源。使用 resources/read 按需获取,配合 URI 模板(如 file:///data/{category}/{id})实现懒加载。监控与调优:数据驱动关键指标| 指标 | 告警阈值 | 说明 ||------|----------|------|| 工具调用 P99 延迟 | > 500ms | 单次调用耗时过长 || 序列化耗时占比 | > 15% | 需要考虑二进制替代 || 上下文 token 数 | > 8000 | 膨胀严重,需裁剪 || 缓存命中率 | 1% | 排查工具实现问题 |基准测试优化前后都要跑基准测试。推荐使用 locust 或 k6 模拟并发工具调用场景,记录 QPS、P50/P95/P99 延迟的变化。APM 集成接入 OpenTelemetry 或 SkyWalking,对工具调用的完整链路进行追踪。重点关注:从 MCP 客户端发出请求到收到响应的全链路耗时分解。生产部署优化水平扩展MCP Server 设计为无状态,支持水平扩展。在 Kubernetes 中配置 HPA(Horizontal Pod Autoscaler),基于 CPU 使用率或自定义指标(如请求队列深度)自动扩缩容。健康检查与故障隔离实现 /health 端点,配合负载均衡器的健康检查自动剔除异常实例。设置合理的超时时间,避免慢请求阻塞整个连接池。区域部署对延迟敏感的场景,在多个地理区域部署 MCP Server 实例,客户端就近接入。MCP Gateway 可统一入口,内部路由到最近的实例。性能优化不是一次性工程。从定位瓶颈开始,依次优化协议层、缓存层、并发层和上下文管理,用监控数据验证每一步的效果,才能持续保持 MCP 系统在高负载下的稳定响应。
服务端阅读 05月28日 06:51

MCP 协议的核心架构包含哪些关键组件?

MCP 协议的三层架构MCP(Model Context Protocol)采用传输层、会话层、能力层三层解耦设计,各层职责独立、协同工作,保障协议的兼容性与扩展性。传输层负责底层通信,支持 Stdio、HTTP Streamable、WebSocket 等多种传输方式。Stdio 适用于本地进程间通信,HTTP Streamable 是 2026 年规范升级后的推荐方式,解决了早期长连接不稳定的问题。会话层负责连接鉴权与心跳维护。2026 年 3 月规范更新后,Auth 认证机制从草案进入正式版,支持 OAuth 2.0 标准流程,取代了此前被广泛诟病的明文密码方案。能力层负责工具声明、调用和事件通知,是协议的核心业务层。三大核心角色:Host、Client、ServerMCP 的运行时架构由三个角色组成:Host(主机):发起连接的 LLM 应用程序,如 Claude Desktop、VS Code 中的 AI 扩展。一个 Host 可以管理多个 Client。Client(客户端):在 Host 内部运行,与 Server 保持 1:1 连接,负责将请求转换为结构化的 JSON-RPC 2.0 消息,管理会话生命周期(超时、重连、中断)。Server(服务器):向外暴露能力,提供工具、资源和提示。Server 可以连接数据库、文件系统、API 等外部服务。三者关系:Host 包含多个 Client,每个 Client 连接一个 Server,形成星形拓扑。核心原语:Tools、Resources、Prompts能力层定义了三种原语,是 Server 向 Client 暴露能力的唯一方式:Tools(工具)——可执行的操作,类似定义明确的函数。每个工具包含名称、描述和 JSON Schema 定义的参数。LLM 根据工具描述自主决定何时调用。{ "name": "query_database", "description": "执行 SQL 查询并返回结果", "inputSchema": { "type": "object", "properties": { "sql": { "type": "string", "description": "SQL 查询语句" } }, "required": ["sql"] }}Resources(资源)——可读取的上下文对象,如文件内容、数据库记录、日志条目。Client 主动拉取,Server 不主动推送。Prompts(提示模板)——结构化的工作流模板,预设工具调用序列和参数,引导 LLM 按特定步骤完成任务。核心工作流程完整的 MCP 交互分为四步:工具注册:Server 启动后声明自身提供的工具、资源和提示,包含参数定义与权限信息。连接鉴权:Client 与 Server 建立连接,完成身份校验(OAuth 2.0 流程)。能力发现:Client 获取 Server 支持的工具清单与调用规则,LLM 据此决定可调用哪些工具。工具调用与结果返回:LLM 发出工具调用指令,Server 执行后将结果通过 JSON-RPC 响应返回 Client。MCP 协议的核心优势MCP 相比自定义工具接口的差异化价值在于:标准化交互:统一的 JSON-RPC 2.0 消息格式,无需为每个工具单独开发适配逻辑。跨框架兼容:支持 LangChain、CrewAI 等主流 Agent 框架互通,一套 Server 实现多框架接入。动态能力发现:工具即插即用,Server 上线后 Client 自动感知新增能力。内置安全机制:OAuth 2.0 鉴权 + 加密传输 + 权限粒度控制。2026 年 4 月,Meta、Docker 相继宣布支持 MCP,Linux Foundation 成立 AAIF(AI Agent Interoperability Foundation)接管协议规范,MCP 正在成为 AI Agent 互联互通的事实标准。追问:MCP 与 Function Calling 有什么区别?Function Calling 是单模型厂商的私有能力,调用格式和参数定义因模型而异,无法跨模型复用。MCP 是开放协议,定义了标准化的能力声明、发现和调用机制,Server 实现一次即可被任何支持 MCP 的 Client 接入。类比来说,Function Calling 像"专车接口",MCP 像"USB 接口"——前者绑定特定实现,后者面向通用适配。
服务端阅读 05月28日 06:51

MCP 流式处理如何实现?Streamable HTTP 传输与进度通知完整指南

MCP(Model Context Protocol)的流式处理能力是远程部署和实时交互的核心基础。2025 年 3 月规范引入 Streamable HTTP 传输后,MCP 的流式架构发生了根本性变化。本文将基于最新规范,从传输层到应用层,系统讲解 MCP 流式处理的实现方法。MCP 流式处理的架构基础MCP 定义了两种标准传输方式:| 传输方式 | 适用场景 | 连接模式 ||---------|---------|---------|| stdio | 本地通信(同机器) | 标准输入输出管道 || Streamable HTTP | 远程通信(跨网络) | 单端点 HTTP + 按需 SSE |Streamable HTTP 的核心设计是单一端点、按需流式:所有通信通过一个 HTTP 端点(如 /mcp)完成,服务器可以灵活选择返回普通 JSON 响应还是升级为 SSE 流式响应。Streamable HTTP 传输实现端点设计Streamable HTTP 使用单一端点同时支持 POST 和 GET 请求:POST /mcp:客户端发送 JSON-RPC 消息,服务器根据请求内容决定响应方式GET /mcp:客户端建立 SSE 监听流,接收服务器主动推送的通知客户端请求格式客户端发起请求时,必须在 Accept 头中同时声明支持 JSON 和 SSE:POST /mcp HTTP/1.1Content-Type: application/jsonAccept: application/json, text/event-streamMcp-Session-Id: session-abc123{ "jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": { "name": "analyze_data", "arguments": {"dataset": "large_corpus"} }}注意:2026 规范要求请求携带 Mcp-Method 和 Mcp-Name 头部,便于负载均衡器和网关进行路由(SEP-2243)。服务器响应策略服务器根据请求特性动态选择响应方式:场景一:普通 JSON 响应(短操作,立即返回结果)HTTP/1.1 200 OKContent-Type: application/json{ "jsonrpc": "2.0", "id": 1, "result": { "content": [{"type": "text", "text": "分析完成:共 1,024 条记录"}] }}场景二:SSE 流式响应(长操作,需要进度反馈)HTTP/1.1 200 OKContent-Type: text/event-streamevent: messagedata: {"jsonrpc":"2.0","method":"notifications/progress","params":{"progressToken":"op-123","progress":25,"total":100}}event: messagedata: {"jsonrpc":"2.0","method":"notifications/progress","params":{"progressToken":"op-123","progress":75,"total":100}}event: messagedata: {"jsonrpc":"2.0","id":1,"result":{"content":[{"type":"text","text":"分析完成:共 1,024 条记录"}]}}Python SDK 流式工具实现安装与初始化pip install mcp带进度通知的流式工具MCP Python SDK 通过 request_context 提供进度通知能力:from mcp.server import Serverfrom mcp import typesimport asyncioserver = Server("stream-demo")@server.call_tool()async def analyze_large_dataset( name: str, arguments: dict) -> list[types.TextContent]: context = server.request_context progress_token = context.meta.progressToken if context.meta else None dataset = arguments.get("dataset", "") total_steps = 10 results = [] for step in range(total_steps): chunk_result = await process_chunk(dataset, step) results.append(chunk_result) if progress_token: await context.session.send_progress_notification( progress_token=progress_token, progress=step + 1, total=total_steps, ) await asyncio.sleep(0.1) summary = f"分析完成:处理了 {len(results)} 个数据块" return [types.TextContent(type="text", text=summary)]async def process_chunk(dataset: str, step: int) -> dict: return {"step": step, "status": "done"}客户端接收进度通知客户端通过 progress_callback 参数接收进度更新:from mcp import ClientSession, StdioServerParametersfrom mcp.client.stdio import stdio_clientasync def call_with_progress(): server_params = StdioServerParameters( command="python", args=["-m", "stream_demo_server"], ) async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: await session.initialize() def on_progress(progress: float, total: float | None): percent = (progress / total * 100) if total else 0 print(f"进度: {percent:.0f}%") result = await session.call_tool( "analyze_large_dataset", arguments={"dataset": "large_corpus"}, progress_callback=on_progress, ) print(f"结果: {result.content[0].text}")服务器主动推送与通知通知类型MCP 定义了多种服务器通知类型,均可通过 SSE 流推送到客户端:| 通知方法 | 用途 | 典型场景 ||---------|------|---------|| notifications/progress | 操作进度 | 长时间运行的任务 || notifications/tools/list_changed | 工具列表变更 | 动态注册/注销工具 || notifications/resources/updated | 资源内容更新 | 文件变化通知 || notifications/message | 日志消息 | 调试和监控 |发送工具列表变更通知@server.call_tool()async def register_dynamic_tool(name: str, arguments: dict) -> list[types.TextContent]: # 注册新工具逻辑... await server.request_context.session.send_notification( types.ServerNotification( method="notifications/tools/list_changed", ) ) return [types.TextContent(type="text", text=f"工具 {arguments['tool_name']} 已注册")]断点续传与连接恢复Streamable HTTP 通过 Last-Event-ID 头部支持 SSE 流的断点续传:GET /mcp HTTP/1.1Accept: text/event-streamLast-Event-ID: evt-42Mcp-Session-Id: session-abc123服务器收到带有 Last-Event-ID 的请求后,应从指定事件之后继续推送,确保客户端不会丢失中间状态的通知。实现要点:每个 SSE 事件应携带唯一 ID服务器需要维护最近的事件缓冲区超出缓冲范围时,建议客户端重新初始化会话与旧版 SSE 传输的对比| 特性 | 旧版 HTTP+SSE | Streamable HTTP ||------|-------------|----------------|| 端点数量 | 2 个(/sse + /messages) | 1 个(/mcp) || 连接方式 | 必须先建立 SSE 长连接 | 按需升级为 SSE || 无状态服务器 | 不支持 | 支持(可部署 Serverless) || 断点续传 | 不支持 | 通过 Last-Event-ID 支持 || 基础设施兼容 | 部分代理/CDN 不兼容 | 标准 HTTP,完全兼容 |旧版 SSE 传输仅建议用于兼容未升级到 2025-03-26 规范的老客户端。最佳实践1. 合理选择响应模式:短操作直接返回 JSON,长操作才升级为 SSE 流。避免对所有请求都建立流式连接。2. 提供进度反馈:耗时超过 1 秒的操作,应通过 notifications/progress 提供进度更新,改善用户体验。3. 实现断点续传:为 SSE 事件分配唯一 ID,维护事件缓冲区,支持客户端从断点恢复。4. 资源清理:及时关闭完成的 SSE 流,避免服务器端资源泄漏。设置合理的超时时间。5. 会话管理:妥善管理 Mcp-Session-Id,确保在负载均衡场景下会话亲和性。6. 监控与日志:通过 notifications/message 发送日志级别消息,便于调试和性能监控。通过合理运用 Streamable HTTP 传输和进度通知机制,可以构建高效、可靠的 MCP 流式处理系统,满足实时交互和大规模数据处理的需求。
服务端阅读 05月28日 06:49

MCP 和 OpenAI Function Calling、LangChain Tools 的本质区别是什么?

MCP 采用 JSON-RPC 2.0 作为底层通信协议,实现了协议与实现的彻底分离。这意味着无论是 Claude、GPT 还是本地模型,只要实现了 MCP 客户端规范,就能连接任意 MCP 服务器——就像 USB-C 统一了充电接口一样,MCP 统一了 AI 与外部工具的交互方式。核心区别:协议 vs API vs 框架三者的本质定位完全不同:MCP 是协议:定义的是通信规范(请求/响应格式、传输方式、能力协商),不依赖任何特定语言或框架,类似于 HTTP 之于 WebOpenAI Function Calling 是 API 能力:OpenAI 模型的一项功能,函数定义嵌入在 API 请求的 tools 参数中,离开 OpenAI API 就无法使用LangChain Tools 是框架抽象:Python/TypeScript 的类定义,工具逻辑在进程内直接调用,适合快速编排但与 LangChain 生态绑定这决定了它们在工具发现、供应商锁定、企业治理三个维度上的根本差异。工具发现:运行时 vs 编译时这是最容易被忽视但影响最大的区别。MCP 的运行时发现:客户端通过 tools/list RPC 调用获取当前可用工具列表,服务器可以在不重启的情况下动态增减工具。对于拥有上百个工具、分属不同团队管理的企业环境,这意味着新工具上线无需重新部署 AI 应用。OpenAI Function Calling 的编译时定义:每次 API 请求都必须在 tools 参数中完整描述所有可用函数。增加一个工具?修改代码、重新部署。工具多了?Token 消耗线性增长。LangChain Tools 的启动时注册:工具以 Python 类注册到 Agent,添加新工具需要修改代码并重启应用。# OpenAI Function Calling — 每次请求都要传完整工具列表response = client.chat.completions.create( model="gpt-4", messages=[{"role": "user", "content": "查一下北京天气"}], tools=[{ "type": "function", "function": { "name": "get_weather", "description": "获取指定城市天气", "parameters": { "type": "object", "properties": {"city": {"type": "string"}}, "required": ["city"] } } }])# MCP — 工具由服务器管理,客户端动态发现async with ClientSession(...) as session: tools = await session.list_tools() # 运行时获取 result = await session.call_tool("get_weather", {"city": "北京"})供应商锁定:零依赖 vs 强绑定| 维度 | MCP | OpenAI FC | LangChain Tools ||------|-----|-----------|-----------------|| 模型兼容 | Claude/GPT/Gemini/本地模型均可用 | 仅 OpenAI 模型 | 多模型但需适配 || 切换成本 | 改一行模型配置即可 | 重写工具定义和调用逻辑 | 改 Agent 初始化参数 || 传输协议 | stdio / HTTP+SSE / WebSocket | 仅 HTTPS API | 进程内调用 |MCP 的跨模型能力不是理论上的——到 2026 年初,已有 97M+ 月度 SDK 下载量,300+ 社区构建的 MCP Server 覆盖 GitHub、Slack、PostgreSQL、Stripe 等主流服务。OpenAI 和 Google 均已官方支持 MCP,协议已被捐赠给 Linux 基金会下的 Agentic AI Foundation。企业治理:MCP 的差异化优势MCP 是三者中唯一在设计层面考虑企业级治理的:权限控制:支持 OPA 等策略引擎,可按用户/角色/场景控制工具访问审计追踪:每次工具调用都有完整的请求-响应记录计量计费:内置用量统计,支持按工具调用次数或 Token 消耗计费多租户:同一 MCP Server 可根据客户端身份返回不同的工具列表OpenAI Function Calling 和 LangChain Tools 要实现同等能力,需要自行搭建中间层。性能与延迟工具调用开销本身可忽略,瓶颈始终在 LLM 推理(100ms-10s):LangChain Tools:进程内调用,0ms 额外开销OpenAI FC:本地执行函数,0ms 额外开销MCP:经网关代理,亚毫秒级开销实际场景中这点差异无关紧要,但在超低延迟要求下需注意 MCP 的网络跳数。实际迁移路径很多团队的自然演进路径是:先用 OpenAI Function Calling 或 LangChain 快速验证想法,当面临多模型需求、工具数量增长、或合规审计要求时引入 MCP。好消息是三者并非互斥——LangChain MCP 适配器允许 LangChain Agent 原生调用 MCP 工具,OpenAI 也已支持 MCP 协议,迁移成本远低于预期。面试追问方向Q:MCP 的 JSON-RPC 2.0 为什么不用 REST?REST 是请求-响应模式,MCP 需要双向通信(如服务器主动推送资源更新、进度通知),JSON-RPC 2.0 在 stdio 和 SSE 传输上更自然。Q:MCP 和 Google A2A 协议的关系?A2A(Agent-to-Agent)解决的是 Agent 之间的协作通信,MCP 解决的是 Agent 与工具/数据的连接。两者互补而非竞争,A2A 的 Agent 可以通过 MCP 获取工具能力。Q:已有 OpenAI FC 工具,如何迁移到 MCP?将函数定义转为 MCP Server 的 tools/list 返回值,函数实现包装为 tools/call handler,客户端用 MCP SDK 替换 OpenAI API 调用。核心逻辑无需重写。
服务端阅读 05月28日 06:49

MCP 与传统函数调用机制有什么区别和优势?

什么是 MCP 和传统函数调用MCP(Model Context Protocol)是 Anthropic 于 2024 年底推出的开放协议,旨在标准化 AI 模型与外部工具、数据源之间的交互方式。传统函数调用(Function Calling)则是 OpenAI 在 2023 年引入的机制,让大模型在推理过程中主动调用预定义函数获取结果。表面上看两者都解决"AI 调用外部工具"的问题,但在架构理念、交互模式和工程实践上存在根本差异。通信协议:私有 API vs 开放标准传统函数调用本质上是各家模型厂商的私有 API 约定。OpenAI 有自己的 function calling 格式,Google Gemini 有 functionDeclarations,Anthropic 有 tool_use——格式不兼容,开发者同一套工具逻辑要写多遍适配代码。MCP 则定义了一套基于 JSON-RPC 2.0 的开放协议标准,包含三个核心原语:Tools:模型可调用的函数,类似传统函数调用中的 functionResources:应用向模型暴露的可读数据源,如文件、数据库记录Prompts:预定义的提示模板,可接受动态参数这意味着只要工具端实现一次 MCP Server,任何支持 MCP 的客户端(Claude Desktop、Cursor、ChatGPT、Gemini 等)都能直接调用。2026 年的数据显示,92% 的新发布 Agent 框架已内置 MCP 支持。工具发现:硬编码 vs 动态注册传统函数调用中,工具列表在请求时硬编码传入。模型只能看到开发者预先定义好的函数签名,运行时无法获取新工具,环境变化需要改代码重新部署。MCP 支持运行时动态工具发现。客户端连接 MCP Server 后,通过 tools/list 方法自动获取可用工具清单。当服务端新增工具,客户端无需任何改动即可感知并使用。// 传统方式:工具定义硬编码在请求中const tools = [ { name: "query_database", parameters: { ... } }, { name: "send_email", parameters: { ... } }];response = await openai.chat.completions.create({ model: "gpt-4", messages: messages, tools: tools // 每次请求都带上完整列表});// MCP 方式:客户端自动发现工具const mcpClient = new MCPClient();await mcpClient.connect(new StdioTransport("db-query-server"));const availableTools = await mcpClient.listTools();// 服务端新增工具后,listTools() 自动返回最新列表上下文管理:无状态 vs 有状态传统函数调用是无状态的。每次调用都是独立的请求-响应循环,模型本身负责在对话历史中维护上下文。当对话过长超出上下文窗口,只能依赖开发者自行实现摘要或检索机制。MCP 内置了上下文管理能力。通过 Resources 原语,MCP Server 可以按需向模型暴露数据,模型不需要把所有信息塞进上下文窗口,而是在需要时通过 resources/read 动态加载。这有效缓解了长上下文场景下的 token 消耗问题。扩展性:逐个适配 vs 一次开发多处复用在传统模式下,为 GPT-4 写的数据库查询工具,换到 Claude 或 Gemini 就需要重新适配函数定义格式。每增加一个模型提供商,集成成本线性增长。MCP 的架构将工具提供方(MCP Server)和模型消费方(MCP Client)彻底解耦。开发者只需实现一次 MCP Server,所有支持 MCP 的客户端都能接入。2025 年的统计显示,MCP 将单次工具集成的平均耗时从 18 小时降至 4.2 小时。安全与权限传统函数调用的安全机制完全依赖开发者自行实现,缺乏统一标准,容易出现疏漏。MCP 在协议层面内置了安全设计:OAuth 2.1 认证:2025 年 MCP 规范更新后,远程 MCP Server 采用 OAuth 2.1 作为标准认证流程权限声明:工具可以声明所需权限,客户端在调用前进行授权检查沙箱隔离:MCP Server 以独立进程运行,天然具备进程级隔离传输机制:HTTP 请求 vs 双向通信传统函数调用基于 HTTP 请求-响应模式,每次调用都是一轮同步交互,模型发出函数调用请求后必须等待结果返回才能继续推理。MCP 支持双向通信:Stdio 传输:本地场景下通过标准输入输出通信,零网络开销Streamable HTTP:2025 年推出的新传输方式,取代了原先的 SSE,支持无状态化部署,服务端扩缩容对客户端透明双向通信使得 MCP 可以支持更复杂的交互模式,如工具执行进度的实时反馈、长时间任务的中断与恢复。什么时候用函数调用,什么时候用 MCP两者并非完全替代关系,适用场景各有侧重:适合传统函数调用的场景:工具数量少(3-5 个),且不打算跨客户端复用快速原型验证,不需要复杂的安全和权限体系单一模型提供商的封闭系统内使用适合 MCP 的场景:工具需要被多个 AI 客户端或 Agent 复用需要动态工具发现和运行时扩展对安全认证和权限管理有明确要求团队协作场景,不同开发者维护不同工具MCP 的发展趋势MCP 在 2025-2026 年经历了爆发式增长。GitHub 星标数三个月突破 25000,MCP Server 注册数量从 2025 年初的 1200 个增长到 2026 年 4 月的 9400+。OpenAI、Google、Microsoft 三大厂商均已宣布支持 MCP,78% 的企业 AI 团队已在生产环境中使用 MCP。2025 年底,Anthropic 将 MCP 捐赠给 Linux 基金会下的 Agentic AI Foundation,OpenAI 和 Block 作为联合创始人参与治理。2026 年的路线图聚焦于无状态化,目标是让 MCP Server 重启或扩缩容时客户端完全无感。对于面试而言,理解 MCP 与传统函数调用在协议标准化、工具发现、上下文管理和安全机制上的本质区别,比背诵八条差异点更有价值。面试官更看重你能否讲清楚"为什么 MCP 出现"这个架构演进逻辑。
服务端阅读 05月28日 06:49

MCP 在实际项目中有哪些应用场景?

MCP(Model Context Protocol)作为连接 AI 模型与外部工具的标准协议,已经在众多实际项目中落地。下面从开发者在工作中最常遇到的场景出发,逐一分析 MCP 的具体用法和实现思路。数据库查询与分析这是 MCP 最直接也最常用的场景。通过搭建一个 MCP 数据库 Server,LLM 就能直接理解自然语言查询意图,生成并执行 SQL,返回结构化结果。核心实现思路:在 MCP Server 中暴露 query 工具,接收自然语言描述,内部转换为 SQL 执行。关键是要在工具描述中提供表结构和字段含义,这样 LLM 才能生成准确的 SQL。实际应用举例:运营人员用自然语言问"上个月客单价最高的前 10 个城市",LLM 自动生成聚合查询并返回结果。相比传统的 BI 看板,这种方式零配置、更灵活,尤其适合探索性分析。需要注意的安全问题:必须设置只读权限,限制返回行数,防止 LLM 生成 DROP、DELETE 等危险操作。生产环境中建议增加查询审计日志。第三方 API 集成企业系统经常需要对接支付、物流、地图等外部服务。传统做法是为每个 API 写适配代码,换一个服务就要重写一遍。MCP 的做法是为每类服务封装一个 MCP Server,定义统一的工具接口。举个例子,支付场景可以封装一个 create_payment 工具和一个 query_payment_status 工具,LLM 只需调用工具即可完成支付流程编排,无需关心底层是支付宝还是微信支付。当需要更换支付渠道时,只改 Server 端实现,上层调用逻辑完全不变。这种方式特别适合 SaaS 产品:Forrester 预测 2026 年 30% 的企业应用供应商将推出自己的 MCP Server,让 AI 代理能直接与其数据交互。文件系统与代码仓库操作让 LLM 安全地读写文件是开发工具类应用的基础能力。通过 MCP 文件系统 Server,可以控制访问范围(如限定目录)、文件类型(如只允许读写 .md 和 .json),以及操作权限(如只读)。典型用例:代码生成工具读取项目模板文件、替换变量后写入新文件;文档处理工具批量转换 Markdown 到其他格式;日志分析工具扫描指定目录下的日志文件并提取关键信息。Cursor、Claude Code 等 IDE 工具已经在底层通过 MCP 实现了文件读写能力,开发者可能每天都在用而不自知。知识库与 RAG 检索MCP 天然适合对接向量数据库和知识库。Server 端封装 embedding 和检索逻辑,暴露 search_knowledge 工具,LLM 在回答用户问题时自动调用工具获取相关上下文,再基于检索结果生成回答。与直接调用向量数据库 API 相比,MCP 的优势在于标准化:同一套工具定义可以被 Claude、GPT、DeepSeek 等不同模型复用,无需为每个宿主写适配代码。这也印证了 MCP 解决的核心问题——将 N×M 的集成复杂度降为 N+M。实际部署要点:工具描述中要说明知识库覆盖的主题范围,这样 LLM 才知道什么时候该调用检索、什么时候用自己的知识就够。同时建议在返回结果中加入来源标注,便于用户核实信息。自动化工作流编排当任务涉及多个工具的顺序调用或条件分支时,MCP 也能发挥作用。Server 端可以将多个原子工具编排成一个更高层的组合工具,也可以让 LLM 自行决定调用顺序。一个实际案例:电商客服场景中,LLM 先调用订单查询工具获取订单状态,再根据状态决定是调用物流查询工具还是退款工具。整个流程对用户来说就是一段自然对话,但背后是多个 MCP 工具的协同调用。AutoBlogging.Pro 的案例更有说服力:从自定义 OpenAI 函数调用迁移到 MCP 架构后,新工具集成的部署时间从 3 天缩短到 11 分钟。监控与告警通过 MCP 对接 Prometheus、Grafana 等监控平台,LLM 可以实时查询系统指标、分析异常模式、甚至触发告警。相比传统的固定阈值告警,LLM 能理解"最近 CPU 使用率持续上升但尚未触发阈值"这种模糊描述,给出预警建议。实现上,MCP Server 封装监控平台的查询 API,暴露 query_metrics 和 list_alerts 等工具。LLM 根据用户描述选择合适的工具和参数,再将结果翻译成人类可读的分析报告。MCP 应用架构的核心要点不管具体场景如何变化,MCP 应用的架构模式是统一的:Host:发起连接的 AI 应用,如 Claude Desktop、Cursor IDEClient:维护与 Server 的 1:1 连接,处理协议细节Server:暴露工具(Tools)、资源(Resources)和提示词(Prompts)三种能力通信基于 JSON-RPC 2.0,支持 stdio(本地进程通信)和 SSE(远程 HTTP 通信)两种传输方式。开发者的主要工作是编写 Server 端的工具逻辑,Host 和 Client 都是现成的。工具设计的关键原则:工具描述要精确,参数 Schema 要完整,返回格式要结构化。描述越清晰,LLM 选择和调用工具的准确率越高。截至 2026 年初,MCP SDK 月下载量已达 9700 万次,社区构建了超过 200 个开源 MCP Server,覆盖 GitHub、Slack、PostgreSQL、Stripe、Figma 等主流平台。MCP 已经成为 AI 应用连接外部工具的事实标准。
服务端阅读 05月28日 06:45

MCP 如何实现多租户隔离?从协议机制到工程落地

MCP(Model Context Protocol)在多租户场景下面临核心挑战:单一 MCP 服务器如何同时为多个组织或客户提供隔离的、安全的服务?这涉及数据隔离、资源隔离、认证授权和性能隔离四个层面。下面从协议机制和工程实现两个维度展开。MCP 协议层面的多租户机制MCP 采用 Client-Server 架构,通过 JSON-RPC 2.0 通信。多租户支持需要在协议层面解决三个问题:租户身份传递:MCP 规范本身不定义租户字段,但通过 meta 字段可以携带租户标识。OpenAI Agents SDK 的 tool_meta_resolver 就利用这个机制,在每次工具调用时解析请求元数据中的 tenant_id。会话级隔离:每个 MCP Client 与 Server 建立独立会话(session),会话本身天然具有隔离性。多租户实现可以将租户上下文绑定到会话上,确保同一会话内的所有操作都在租户上下文中执行。OAuth 2.0 认证:远程 MCP Server 支持 OAuth 2.0 流程,通过 token 中的 tenant_id claim 实现租户识别。Microsoft Entra 的多租户应用注册模式就是典型实践——不同组织的用户通过各自 Azure AD 租户认证后访问同一个 MCP Server。租户上下文管理租户上下文是多租户架构的基础设施,它需要在请求的整个生命周期中保持可用:from dataclasses import dataclass, fieldfrom typing import Optionalimport contextvars@dataclassclass TenantContext: tenant_id: str tenant_name: str user_id: str permissions: list = field(default_factory=list) quotas: dict = field(default_factory=dict)current_tenant: contextvars.ContextVar[Optional[TenantContext]] = \ contextvars.ContextVar('current_tenant', default=None)class TenantContextManager: def __init__(self): self._contexts: dict[str, TenantContext] = {} def register(self, tenant_id: str, tenant_name: str, user_id: str, permissions: list = None, quotas: dict = None) -> TenantContext: ctx = TenantContext( tenant_id=tenant_id, tenant_name=tenant_name, user_id=user_id, permissions=permissions or [], quotas=quotas or self._default_quotas(), ) self._contexts[tenant_id] = ctx return ctx def activate(self, tenant_id: str) -> None: ctx = self._contexts.get(tenant_id) if ctx is None: raise KeyError(f"租户 {tenant_id} 未注册") current_tenant.set(ctx) def deactivate(self) -> None: current_tenant.set(None) def current(self) -> TenantContext: ctx = current_tenant.get() if ctx is None: raise RuntimeError("当前无活跃租户上下文") return ctx @staticmethod def _default_quotas() -> dict: return { "max_tools": 100, "max_resources": 1000, "max_requests_per_minute": 1000, "max_storage_mb": 1024, }使用 contextvars.ContextVar 而非线程局部变量,是因为 MCP Server 通常运行在 asyncio 事件循环中,ContextVar 能正确支持协程间的上下文隔离。activate / deactivate 的设计使得中间件可以在请求进入时设置上下文、请求结束时清理,避免上下文泄漏。数据隔离的三种策略多租户数据隔离有三种主流方案,各有取舍:方案一:共享数据库 + 行级隔离(tenant_id 列过滤)所有租户数据存同一张表,通过 tenant_id 列区分。实现简单,但需要在每次查询中都加入 tenant_id 过滤条件,遗漏即造成数据泄漏。方案二:共享数据库 + Schema 隔离每个租户拥有独立的数据库 Schema,表结构相同但数据物理隔离。安全性高于行级隔离,但 Schema 迁移管理复杂。方案三:独立数据库每个租户使用独立的数据库实例,隔离性最强,但运维成本和资源开销最大。以下是基于方案一的行级隔离实现,配合 SQLAlchemy 全局自动过滤:from sqlalchemy import create_engine, Column, String, Integer, Text, Indexfrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy.orm import sessionmaker, scoped_session, SessionBase = declarative_base()class TenantData(Base): __tablename__ = 'tenant_data' id = Column(Integer, primary_key=True) tenant_id = Column(String(50), nullable=False, index=True) data_key = Column(String(100), nullable=False) data_value = Column(Text) __table_args__ = ( Index('idx_tenant_key', 'tenant_id', 'data_key', unique=True), )class TenantSession(Session): """自动注入租户过滤的 Session 子类""" def __init__(self, tenant_id: str, *args, **kwargs): super().__init__(*args, **kwargs) self._tenant_id = tenant_id def query(self, *entities, **kwargs): q = super().query(*entities, **kwargs) for entity in entities: if hasattr(entity, 'tenant_id'): q = q.filter(entity.tenant_id == self._tenant_id) return qclass MultiTenantDatabase: def __init__(self, database_url: str): self.engine = create_engine(database_url) Base.metadata.create_all(self.engine) def session(self, tenant_id: str) -> TenantSession: return TenantSession( tenant_id, bind=self.engine, ) def save(self, tenant_id: str, key: str, value: str): with self.session(tenant_id) as s: existing = s.query(TenantData).filter( TenantData.tenant_id == tenant_id, TenantData.data_key == key, ).first() if existing: existing.data_value = value else: s.add(TenantData( tenant_id=tenant_id, data_key=key, data_value=value, )) s.commit()关键设计点:TenantSession 继承自 Session 并在 query 方法中自动注入 tenant_id 过滤,从根本上杜绝了忘记加过滤条件导致的数据泄漏。写入时通过 Session 的 before_flush 事件确保 tenant_id 被正确填入。资源配额与速率限制不同租户的付费等级不同,需要精确控制每个租户可用的工具数量、存储空间和请求频率:import timefrom collections import defaultdictfrom dataclasses import dataclass, field@dataclassclass QuotaConfig: max_tools: int = 100 max_resources: int = 1000 max_requests_per_minute: int = 1000 max_storage_mb: int = 1024class QuotaManager: def __init__(self): self._configs: dict[str, QuotaConfig] = {} self._usage: dict[str, dict[str, int]] = defaultdict( lambda: defaultdict(int) ) self._rate_windows: dict[str, list[float]] = defaultdict(list) def configure(self, tenant_id: str, config: QuotaConfig): self._configs[tenant_id] = config def config(self, tenant_id: str) -> QuotaConfig: return self._configs.get(tenant_id, QuotaConfig()) def consume(self, tenant_id: str, resource: str, amount: int = 1) -> bool: cfg = self.config(tenant_id) limit = getattr(cfg, resource, None) if limit is None: return True if self._usage[tenant_id][resource] + amount > limit: return False self._usage[tenant_id][resource] += amount return True def release(self, tenant_id: str, resource: str, amount: int = 1): self._usage[tenant_id][resource] = max( 0, self._usage[tenant_id][resource] - amount ) def check_rate(self, tenant_id: str) -> bool: cfg = self.config(tenant_id) now = time.monotonic() window = self._rate_windows[tenant_id] cutoff = now - 60 self._rate_windows[tenant_id] = [ t for t in window if t > cutoff ] if len(self._rate_windows[tenant_id]) >= cfg.max_requests_per_minute: return False self._rate_windows[tenant_id].append(now) return TrueQuotaManager 区分了资源配额(如工具数量、存储空间)和速率限制(每分钟请求数)。前者是持久消耗型,用 consume / release 管理;后者是滑动窗口型,用 check_rate 在每次请求前校验。time.monotonic() 而非 time.time() 避免了系统时钟回拨导致的速率限制失效。租户级别的工具与资源注册MCP Server 中的工具(Tools)和资源(Resources)需要按租户可见性进行隔离。核心思路是:全局工具对所有租户可见,租户专属工具仅对拥有权限的租户展示:from mcp.server import Serverfrom functools import wrapsfrom collections import defaultdictclass MultiTenantServer(Server): def __init__(self, name: str, ctx_manager: TenantContextManager): super().__init__(name) self._ctx = ctx_manager self._global_tools: dict[str, dict] = {} self._tenant_tools: dict[str, dict[str, dict]] = defaultdict(dict) self._global_resources: dict[str, dict] = {} self._tenant_resources: dict[str, dict[str, dict]] = defaultdict(dict) def register_tool(self, name: str, handler, description: str, tenant_id: str = None): entry = {"handler": handler, "description": description} if tenant_id: self._tenant_tools[tenant_id][name] = entry else: self._global_tools[name] = entry def register_resource(self, uri: str, handler, name: str, description: str, tenant_id: str = None): entry = {"handler": handler, "name": name, "description": description} if tenant_id: self._tenant_resources[tenant_id][uri] = entry else: self._global_resources[uri] = entry async def list_tools(self) -> list[dict]: ctx = current_tenant.get() if ctx is None: return [] tools = [ {"name": n, "description": t["description"]} for n, t in self._global_tools.items() ] for n, t in self._tenant_tools.get(ctx.tenant_id, {}).items(): tools.append({"name": n, "description": t["description"]}) return tools async def call_tool(self, name: str, arguments: dict): ctx = current_tenant.get() if ctx is None: raise PermissionError("未找到租户上下文") entry = self._tenant_tools.get(ctx.tenant_id, {}).get(name) if entry is None: entry = self._global_tools.get(name) if entry is None: raise KeyError(f"工具 {name} 不存在") if "admin" not in ctx.permissions: required = entry.get("required_permission") if required and required not in ctx.permissions: raise PermissionError( f"租户 {ctx.tenant_id} 无权使用工具 {name}" ) return await entry["handler"](arguments)list_tools 只返回当前租户可见的工具列表,call_tool 在执行前校验租户权限。查找顺序是"租户专属优先,全局兜底",这样同名工具可以被租户覆盖以提供定制行为。认证与授权MCP 远程 Server 支持 OAuth 2.0,多租户认证的推荐做法是将租户身份编码在 JWT token 中:import jwtfrom datetime import datetime, timedelta, timezonefrom typing import Anyclass TenantAuthenticator: def __init__(self, secret_key: str, algorithm: str = "HS256"): self._secret = secret_key self._algo = algorithm def issue_token(self, tenant_id: str, user_id: str, permissions: list[str], expires_in: int = 3600) -> str: now = datetime.now(timezone.utc) payload = { "sub": user_id, "tid": tenant_id, "perms": permissions, "iat": now, "exp": now + timedelta(seconds=expires_in), } return jwt.encode(payload, self._secret, algorithm=self._algo) def verify(self, token: str) -> dict[str, Any]: try: return jwt.decode( token, self._secret, algorithms=[self._algo] ) except jwt.ExpiredSignatureError: raise ValueError("令牌已过期") except jwt.InvalidTokenError: raise ValueError("无效令牌") def authenticate(self, token: str) -> TenantContext: claims = self.verify(token) return TenantContext( tenant_id=claims["tid"], tenant_name=claims.get("tenant_name", claims["tid"]), user_id=claims["sub"], permissions=claims.get("perms", []), )JWT 中用 tid(tenant id)而非 tenant_id,是为了缩短 token 体积。authenticate 方法直接返回 TenantContext,中间件可以一行代码完成"验证 token + 激活租户上下文"。在实际部署中,如果 MCP Server 对接 Microsoft Entra 等 IdP,则 tid 可以直接映射为 Entra JWT 中的 tid claim(即 Azure AD 租户 ID),无需自行签发 token。租户监控与审计多租户环境下需要按租户维度收集指标和审计日志,既用于运营分析,也为故障排查和安全审计提供依据:from datetime import datetime, timedelta, timezonefrom collections import defaultdictclass TenantMonitor: def __init__(self, max_records: int = 5000): self._max = max_records self._metrics: dict[str, dict[str, list[dict]]] = \ defaultdict(lambda: defaultdict(list)) self._audit_log: list[dict] = [] def record(self, tenant_id: str, metric: str, value: float): entry = { "value": value, "ts": datetime.now(timezone.utc).isoformat(), } buf = self._metrics[tenant_id][metric] buf.append(entry) if len(buf) > self._max: self._metrics[tenant_id][metric] = buf[-self._max:] def audit(self, tenant_id: str, action: str, detail: str = ""): self._audit_log.append({ "tenant_id": tenant_id, "action": action, "detail": detail, "ts": datetime.now(timezone.utc).isoformat(), }) if len(self._audit_log) > 10000: self._audit_log = self._audit_log[-10000:] def aggregate(self, tenant_id: str, metric: str, since: datetime = None) -> dict: records = self._metrics[tenant_id].get(metric, []) if since: records = [ r for r in records if datetime.fromisoformat(r["ts"]) >= since ] if not records: return {} values = [r["value"] for r in records] return { "count": len(values), "sum": sum(values), "avg": sum(values) / len(values), "min": min(values), "max": max(values), } def report(self, tenant_id: str, days: int = 7) -> dict: since = datetime.now(timezone.utc) - timedelta(days=days) return { "tenant_id": tenant_id, "period_days": days, "metrics": { m: self.aggregate(tenant_id, m, since) for m in self._metrics.get(tenant_id, {}) }, }审计日志记录了"哪个租户在什么时间做了什么操作",当发生安全事件时可以快速追溯。指标聚合支持按时间范围过滤,便于生成租户维度的运营报告。多租户 MCP Server 的中间件集成将上述组件串联起来,MCP Server 的请求处理流程如下:请求到达,中间件从 HTTP Header 或 OAuth token 中提取租户身份TenantAuthenticator.authenticate() 验证 token 并构建 TenantContextTenantContextManager.activate() 激活租户上下文QuotaManager.check_rate() 校验速率限制执行工具/资源操作(自动带租户隔离)TenantMonitor.record() 记录指标,audit() 记录审计日志TenantContextManager.deactivate() 清理租户上下文async def tenant_middleware(request, handler): token = request.headers.get("Authorization", "").removeprefix("Bearer ") try: ctx = authenticator.authenticate(token) ctx_manager.activate(ctx.tenant_id) if not quota_manager.check_rate(ctx.tenant_id): return {"error": "rate_limit_exceeded"}, 429 monitor.audit(ctx.tenant_id, "request", request.path) result = await handler(request) monitor.record(ctx.tenant_id, "request_count", 1) return result except ValueError as e: return {"error": str(e)}, 401 finally: ctx_manager.deactivate()中间件模式确保租户上下文的生命周期管理集中在一处,业务逻辑无需关心租户切换细节。finally 块中的 deactivate() 保证了即使请求处理异常,租户上下文也不会泄漏到下一个请求。隔离方案选择建议不同规模和场景适合不同的隔离策略:| 场景 | 推荐方案 | 理由 ||------|---------|------|| 初创期/租户少于20 | 行级隔离 | 实现简单,运维成本低 || 中等规模/20-200租户 | Schema 隔离 | 兼顾安全性和管理成本 || 企业级/200+租户 | 独立数据库 | 合规要求高,隔离性最强 || 混合场景 | 行级隔离 + 热点租户独立库 | 平衡成本与性能 |选择时还需考虑:是否需要支持租户级别的数据库备份恢复、是否面临合规审计要求、单个租户的数据量是否足以影响整体性能。多租户是 MCP Server 走向生产环境的关键能力。核心在于让租户身份贯穿请求全生命周期——从认证到数据访问再到监控审计,每一层都必须感知租户边界。
服务端阅读 05月28日 06:44

MCP 错误处理与重试机制怎么做?从错误码到断路器的实战方案

MCP 服务上线第一天就翻了车——tool 调用超时、server 不响应、客户端疯狂重试把整个链路打崩。这是很多团队上生产时的真实经历,问题不在 MCP 本身,而在于没做好错误处理和重试。MCP 基于 JSON-RPC 2.0 协议通信,错误处理的核心在于:区分哪些错能重试、哪些不能,以及重试时怎么避免把服务打崩。下面从 MCP 协议本身的错误体系讲起,再到重试策略、断路器和降级方案的实战配置。MCP 协议的错误码体系MCP 定义了两层错误码:标准 JSON-RPC 错误和协议扩展错误。搞不清这两层的区别,重试逻辑就是瞎写。标准 JSON-RPC 错误(客户端和服务端都可能返回):| 错误码 | 含义 | 能否重试 ||--------|------|----------|| -32700 | Parse error,请求体格式错误 | 不能 || -32600 | Invalid Request,请求不合法 | 不能 || -32601 | Method not found | 不能 || -32602 | Invalid params | 不能 || -32603 | Internal error | 视情况 |MCP 扩展错误(code < -32000):| 错误码 | 含义 | 能否重试 ||--------|------|----------|| -32000 | Server error,服务端通用错误 | 可能可以 || -32001 | 请求超时 | 可以 || -32002 | 速率限制(Rate limit) | 可以(需等 retryAfter) |关键点:-32xxx 范围的错误才是"可能重试"的候选,-32xxx 以下的参数类错误重试也没用——你传错了参数,重试 100 次还是错。MCP 的错误响应里可以带 data 字段,里面放 retryable 和 retryAfter,就是告诉客户端"这个错能重试,等多久再试":{ "jsonrpc": "2.0", "id": "req-123", "error": { "code": -32001, "message": "Request timeout", "data": { "retryable": true, "retryAfter": 3 } }}所以第一步是:在错误处理逻辑里,根据错误码判断是否可重试,而不是一刀切全部重试。Stdio 和 HTTP/SSE Transport 的错误处理差异MCP 支持两种 transport,错误处理方式完全不同,搞混了会踩坑。Stdio transport:没有重试机制。进程挂了就是挂了,错误写 stderr,进程退出码非零表示失败。客户端能做的是重启进程然后重放请求——但要注意,重启后的 server 是全新状态,之前的 session 已经丢了。我见过有人重启后拿旧 session_id 去请求,服务端根本不认,排查了半天才发现是 session 丢失的问题。HTTP/SSE transport:有 HTTP 状态码可以判断。400 系列是客户端问题(不用重试),500 系列是服务端问题(可以重试),503 通常带 Retry-After header。SSE 流断开时,用最后一个 event ID 重连可以续上,不需要从头开始——这是 SSE transport 相比 Stdio 的一个明显优势。import httpximport asyncioclass MCPRetryableError(Exception): passclass MCPClientError(Exception): passasync def call_mcp_with_http(method: str, params: dict): async with httpx.AsyncClient() as client: resp = await client.post( "http://mcp-server:8080/mcp", json={"jsonrpc": "2.0", "id": "1", "method": method, "params": params} ) if resp.status_code == 429: retry_after = int(resp.headers.get("Retry-After", 5)) await asyncio.sleep(retry_after) return await call_mcp_with_http(method, params) elif resp.status_code >= 500: raise MCPRetryableError(resp.json()) elif resp.status_code >= 400: raise MCPClientError(resp.json()) return resp.json()指数退避重试:别让重试变成雪崩"超时了再试一次"是最直觉的做法,但问题在于:如果 100 个客户端同时超时、同时重试,服务端刚缓过来又被压垮。这就是 thundering herd 问题,在 MCP 多客户端场景下特别常见。指数退避的思路是每次重试等更久:第 1 次等 1 秒,第 2 次等 2 秒,第 3 次等 4 秒。再加上随机抖动(jitter),让各客户端的重试时间错开:import randomimport asyncioasync def retry_with_backoff(func, max_retries=3, base_delay=1.0, max_delay=32.0): last_error = None for attempt in range(max_retries): try: return await func() except MCPRetryableError as e: last_error = e if attempt == max_retries - 1: break delay = min(base_delay * (2 ** attempt) + random.uniform(0, 1), max_delay) await asyncio.sleep(delay) raise last_error参数选择的经验值(MCP 场景下):base_delay:0.5-1 秒。MCP 服务如果只是临时过载,1 秒内通常能恢复max_delay:16-32 秒。超过 30 秒还没恢复,大概率不是临时问题,再重试没意义max_retries:3 次。对 MCP 来说够了,更多重试不如触发断路器jitter:必须加。不加 jitter 的退避在多客户端场景下等于没退避,所有客户端会在同一时刻重试断路器模式:连续失败时及时止损重试解决的是"临时故障",但如果是持续故障(比如 MCP server 的下游数据库挂了),越重试越雪上加霜。断路器的作用是:失败次数超过阈值后直接拒绝请求,不再调用服务端,给对方恢复的时间。断路器有三个状态:CLOSED(正常):请求正常通过,同时统计失败次数OPEN(熔断):直接拒绝,不调用服务端。等超时后进入半开HALF_OPEN(试探):放一个请求过去试探,成功则恢复 CLOSED,失败则继续 OPENimport timefrom enum import Enumclass CircuitState(Enum): CLOSED = "closed" OPEN = "open" HALF_OPEN = "half_open"class MCPCircuitBreaker: def __init__(self, failure_threshold=5, recovery_timeout=30.0): self.failure_threshold = failure_threshold self.recovery_timeout = recovery_timeout self.state = CircuitState.CLOSED self.failure_count = 0 self.last_failure_time = 0 async def call(self, func): if self.state == CircuitState.OPEN: if time.time() - self.last_failure_time >= self.recovery_timeout: self.state = CircuitState.HALF_OPEN else: raise Exception("Circuit breaker OPEN, fast fail") try: result = await func() self.failure_count = 0 if self.state == CircuitState.HALF_OPEN: self.state = CircuitState.CLOSED return result except Exception: self.failure_count += 1 self.last_failure_time = time.time() if self.failure_count >= self.failure_threshold: self.state = CircuitState.OPEN raiseMCP 场景下断路器参数建议:failure_threshold:5 次。MCP server 连续 5 次都失败,基本可以判定服务异常recovery_timeout:30 秒。比普通 HTTP 服务短,因为 MCP 调用通常是轻量级的,恢复较快重试 + 断路器:生产环境必须组合使用实际生产中两者必须配合:重试处理临时抖动,断路器处理持续故障。请求先过断路器,断路器放行后才进入重试逻辑:breaker = MCPCircuitBreaker(failure_threshold=5, recovery_timeout=30.0)async def resilient_mcp_call(method: str, params: dict): async def _call(): return await mcp_client.call(method, params) async def _guarded_call(): return await breaker.call(_call) return await retry_with_backoff(_guarded_call, max_retries=3)注意顺序:先断路器后重试。反过来会出问题——断路器 OPEN 时,重试逻辑还会等 3 次超时才放弃,白白浪费时间。断路器在前可以快速失败,客户端立刻知道"别等了"。优雅降级:服务彻底不可用时的兜底重试和断路器都救不了的场面——MCP server 完全挂了——系统不能直接崩溃,需要有降级方案:async def mcp_call_with_fallback(method: str, params: dict, fallback=None): try: return await resilient_mcp_call(method, params) except Exception: if fallback is not None: return fallback raise常见的降级策略:缓存回退:MCP tool 返回的数据如果短期内不变,缓存上次成功结果直接返回默认值兜底:非核心功能(如推荐、个性化)返回默认值,只有核心功能才报错备用服务:同一个 tool 如果有备用 server,切换过去核心思路:把 MCP 调用分为"必须成功"和"可以降级"两类,前者才需要严格的重试和断路器,后者用 fallback 兜底。2026 MCP Roadmap:重试语义即将标准化MCP 官方在 2026 路线图中明确将 retry semantics 列为重点完善领域,目前的方向:标准化的重试语义:定义哪些错误应该重试、重试策略如何声明,而不是让每个客户端自己实现Streamable HTTP transport:新 transport 原生支持负载均衡和水平扩展,server 重启对客户端无感,从架构层面减少需要重试的场景无状态化 session:session 创建、恢复、迁移标准化后,server 扩缩容时客户端不需要重连重试当前的"手动实现重试+断路器"是过渡方案,后续 MCP SDK 很可能会内置这些能力。但在标准化落地之前,上面这套方案是生产环境必备的防护网。
服务端阅读 05月28日 06:44

MCP 服务器怎么部署到生产环境?从 Docker 到 K8s 的完整方案

MCP(Model Context Protocol)已成为 AI 应用连接外部工具和数据的标准协议,2026 年活跃公共 MCP 服务器超过 10,000 个,每月 SDK 下载量接近 1 亿次。但把 MCP 从本地开发推到生产环境,需要处理传输安全、认证鉴权、容器编排、监控告警等一系列问题。本文从实际部署经验出发,给出从 Docker 单机到 Kubernetes 集群的完整方案。MCP 生产部署的关键决策在写任何配置文件之前,先做三个决策:1. 选择传输协议MCP 支持两种传输方式:Stdio 和 Streamable HTTP。Stdio 适合本地开发和测试,生产环境必须使用 Streamable HTTP(2025 年 3 月已替代旧的 HTTP+SSE)。远程部署时,Streamable HTTP 支持负载均衡、反向代理和标准 HTTP 基础设施。2. 选择认证方式2025 年修订的 MCP 规范推荐 OAuth 2.1 作为 HTTP 传输的标准认证方案。三种生产认证模式:单服务多用户:MCP 服务器自身管理用户身份,适合独立工具类服务委托身份:MCP 服务器将用户身份透传给下游 API,适合企业内部服务审计追踪:需要在下游 API 调用中携带用户身份证据,适合合规要求高的场景认证设计要前置——事后改造成本是前期设计的 2-3 倍。3. 确定部署架构根据流量和可用性要求选择:单实例 + 反向代理:日请求 < 10 万多实例 + 负载均衡:日请求 10-100 万K8s 集群 + HPA 自动扩缩:日请求 > 100 万Docker 容器化部署编写 DockerfileMCP 服务器通常基于 Python 或 Node.js,以下以 Python 为例:FROM python:3.11-slimWORKDIR /app# 先复制依赖文件,利用 Docker 缓存层COPY requirements.txt .RUN pip install --no-cache-dir -r requirements.txt# 再复制应用代码COPY . .# 非 root 用户运行RUN useradd -m mcpUSER mcpEXPOSE 8000HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ CMD curl -f http://localhost:8000/health || exit 1CMD ["python", "-m", "mcp.server", "--host", "0.0.0.0", "--port", "8000"]注意点:使用非 root 用户运行、先复制 requirements.txt 利用缓存层、必须配置健康检查。docker-compose 本地编排开发和小规模部署用 docker-compose 即可:version: '3.8'services: mcp-server: build: . ports: - "8000:8000" environment: - MCP_TRANSPORT=streamable-http - MCP_HOST=0.0.0.0 - MCP_PORT=8000 - DATABASE_URL=postgresql://user:pass@db:5432/mcp - REDIS_URL=redis://redis:6379 - LOG_LEVEL=info depends_on: db: condition: service_healthy redis: condition: service_healthy restart: unless-stopped healthcheck: test: ["CMD", "curl", "-f", "http://localhost:8000/health"] interval: 30s timeout: 10s retries: 3 db: image: postgres:15 environment: POSTGRES_DB: mcp POSTGRES_USER: user POSTGRES_PASSWORD: pass volumes: - postgres_data:/var/lib/postgresql/data healthcheck: test: ["CMD-SHELL", "pg_isready -U user"] interval: 5s timeout: 5s retries: 5 redis: image: redis:7-alpine volumes: - redis_data:/data healthcheck: test: ["CMD", "redis-cli", "ping"] interval: 5svolumes: postgres_data: redis_data:关键改进:添加 condition: service_healthy 确保 MCP 服务器在依赖服务就绪后才启动,避免启动时连接失败。Kubernetes 生产部署Deployment 和 ServiceapiVersion: apps/v1kind: Deploymentmetadata: name: mcp-serverspec: replicas: 3 selector: matchLabels: app: mcp-server template: metadata: labels: app: mcp-server spec: containers: - name: mcp-server image: your-registry/mcp-server:latest ports: - containerPort: 8000 env: - name: MCP_TRANSPORT value: "streamable-http" - name: DATABASE_URL valueFrom: secretKeyRef: name: mcp-secrets key: database-url - name: SECRET_KEY valueFrom: secretKeyRef: name: mcp-secrets key: secret-key resources: requests: memory: "256Mi" cpu: "250m" limits: memory: "512Mi" cpu: "500m" livenessProbe: httpGet: path: /health port: 8000 initialDelaySeconds: 30 periodSeconds: 10 readinessProbe: httpGet: path: /ready port: 8000 initialDelaySeconds: 5 periodSeconds: 5---apiVersion: v1kind: Servicemetadata: name: mcp-serverspec: selector: app: mcp-server ports: - protocol: TCP port: 80 targetPort: 8000 type: LoadBalancerHPA 自动扩缩apiVersion: autoscaling/v2kind: HorizontalPodAutoscalermetadata: name: mcp-server-hpaspec: scaleTargetRef: apiVersion: apps/v1 kind: Deployment name: mcp-server minReplicas: 3 maxReplicas: 10 metrics: - type: Resource resource: name: cpu target: type: Utilization averageUtilization: 70 - type: Resource resource: name: memory target: type: Utilization averageUtilization: 80网络策略与安全生产环境必须限制 MCP 服务器的网络访问范围:apiVersion: networking.k8s.io/v1kind: NetworkPolicymetadata: name: mcp-server-netpolspec: podSelector: matchLabels: app: mcp-server policyTypes: - Ingress - Egress ingress: - from: - namespaceSelector: matchLabels: name: api-gateway ports: - protocol: TCP port: 8000 egress: - to: - podSelector: matchLabels: app: postgres ports: - protocol: TCP port: 5432 - to: - podSelector: matchLabels: app: redis ports: - protocol: TCP port: 6379MCP 安全加固实践MCP 部署到生产环境,安全是最容易被忽视的环节。2025 年 4 月安全研究人员已披露 MCP 存在提示注入、工具权限组合攻击等风险。工具级别的访问控制# 为每个工具设置独立的权限和速率限制TOOL_CONFIG = { "read_file": { "scope": "readonly", "rate_limit": 100, # 每分钟请求数 "kill_switch": "feature_flag.read_file.disabled" }, "write_file": { "scope": "mutation", "rate_limit": 20, "kill_switch": "feature_flag.write_file.disabled" }, "execute_command": { "scope": "dangerous", "rate_limit": 5, "kill_switch": "feature_flag.execute_command.disabled", "requires_confirmation": True }}三个关键安全实践:Per-tool kill-switch:通过功能开关单独禁用某个工具,不影响其他功能读写分离限流:读操作和写操作使用不同的速率限制阈值危险操作确认:执行命令等高风险工具需要二次确认OAuth 2.1 认证实现from authlib.integrations.starlette_client import OAuthoauth = OAuth()oauth.register( name='mcp_auth', server_metadata_url='https://auth.example.com/.well-known/openid-configuration', client_id='mcp-server-client', client_secret='YOUR_CLIENT_SECRET', client_kwargs={'scope': 'openid profile email'})# 在 MCP 服务器中验证 tokenasync def verify_token(request): token = request.headers.get('Authorization', '').replace('Bearer ', '') if not token: raise HTTPException(status_code=401, detail="Missing token") try: claims = await oauth.mcp_auth.parse_id_token(token) return claims except Exception: raise HTTPException(status_code=401, detail="Invalid token")监控与可观测性Prometheus 指标采集MCP 服务器需要暴露三类核心指标:from prometheus_client import Counter, Histogram, Gauge, start_http_server# 请求指标REQUEST_COUNT = Counter( 'mcp_requests_total', 'Total MCP requests', ['method', 'tool_name', 'status'])REQUEST_DURATION = Histogram( 'mcp_request_duration_seconds', 'Request duration', ['tool_name'])# 连接指标ACTIVE_CONNECTIONS = Gauge( 'mcp_active_connections', 'Active SSE connections')# 工具调用指标TOOL_CALLS = Counter( 'mcp_tool_calls_total', 'Tool invocation count', ['tool_name', 'status'])TOOL_ERRORS = Counter( 'mcp_tool_errors_total', 'Tool error count', ['tool_name', 'error_type'])def start_metrics_server(port=9090): start_http_server(port)告警规则# Prometheus alerting rulesgroups:- name: mcp-alerts rules: - alert: MCPHighErrorRate expr: rate(mcp_tool_errors_total[5m]) / rate(mcp_tool_calls_total[5m]) > 0.1 for: 5m labels: severity: warning annotations: summary: "MCP tool error rate exceeds 10%" - alert: MCPSlowResponse expr: histogram_quantile(0.95, rate(mcp_request_duration_seconds_bucket[5m])) > 5 for: 10m labels: severity: warning annotations: summary: "MCP P95 latency exceeds 5s" - alert: MCPConnectionSaturation expr: mcp_active_connections > 80 for: 5m labels: severity: critical annotations: summary: "MCP active connections approaching limit"结构化日志import structloglogger = structlog.get_logger()# 每次工具调用记录结构化日志async def handle_tool_call(tool_name: str, arguments: dict, user_id: str): log = logger.bind(tool=tool_name, user_id=user_id) log.info("tool_call_started", arguments_keys=list(arguments.keys())) try: result = await execute_tool(tool_name, arguments) log.info("tool_call_completed", result_size=len(str(result))) return result except Exception as e: log.error("tool_call_failed", error_type=type(e).__name__, error_message=str(e)) raise结构化日志让排查问题更高效:按 toolname 过滤、追踪 userid 的操作链、统计错误类型分布。配置管理生产环境禁止硬编码,使用环境变量 + 配置中心:from pydantic_settings import BaseSettingsclass MCPSettings(BaseSettings): # 传输配置 transport: str = "streamable-http" host: str = "0.0.0.0" port: int = 8000 # 认证配置 auth_enabled: bool = True oauth_issuer: str = "" oauth_audience: str = "" # 数据库配置 database_url: str = "" database_pool_size: int = 10 # Redis 配置 redis_url: str = "redis://localhost:6379" cache_ttl: int = 3600 # 安全配置 secret_key: str = "" max_connections: int = 100 request_timeout: int = 30 rate_limit_per_minute: int = 60 # 日志配置 log_level: str = "INFO" log_format: str = "json" class Config: env_file = ".env" env_prefix = "MCP_"settings = MCPSettings()K8s 环境中通过 ConfigMap 管理非敏感配置,Secret 管理密钥和凭证:apiVersion: v1kind: ConfigMapmetadata: name: mcp-configdata: MCP_TRANSPORT: "streamable-http" MCP_HOST: "0.0.0.0" MCP_PORT: "8000" MCP_LOG_LEVEL: "info" MCP_LOG_FORMAT: "json" MCP_RATE_LIMIT_PER_MINUTE: "60"---apiVersion: v1kind: Secretmetadata: name: mcp-secretstype: OpaquestringData: database-url: "postgresql://user:pass@db:5432/mcp" secret-key: "your-secret-key" oauth-issuer: "https://auth.example.com"CI/CD 自动化部署# .github/workflows/deploy.ymlname: Deploy MCP Serveron: push: branches: [main]jobs: test: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - uses: actions/setup-python@v5 with: python-version: '3.11' - run: pip install -r requirements.txt pytest pytest-cov - run: pytest --cov=mcp --cov-report=xml build: needs: test runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - name: Build and push Docker image run: | docker build -t mcp-server:${{ github.sha }} . docker push your-registry/mcp-server:${{ github.sha }} docker tag mcp-server:${{ github.sha }} your-registry/mcp-server:latest docker push your-registry/mcp-server:latest deploy: needs: build runs-on: ubuntu-latest if: github.ref == 'refs/heads/main' steps: - name: Deploy to Kubernetes uses: azure/k8s-deploy@v4 with: manifests: k8s/ images: your-registry/mcp-server:${{ github.sha }} kubeconfig: ${{ secrets.KUBE_CONFIG }}备份与灾难恢复定期备份策略:#!/bin/bash# backup.sh - 每日自动备份set -euo pipefailBACKUP_DIR="/backups/$(date +%Y%m%d)"mkdir -p "$BACKUP_DIR"# 数据库备份pg_dump "$DATABASE_URL" | gzip > "$BACKUP_DIR/db.sql.gz"# 配置备份kubectl get configmap mcp-config -o yaml > "$BACKUP_DIR/configmap.yaml"kubectl get secret mcp-secrets -o yaml > "$BACKUP_DIR/secrets.yaml"# 清理 7 天前的备份find /backups -maxdepth 1 -mtime +7 -exec rm -rf {} +echo "Backup completed: $BACKUP_DIR"灾难恢复清单:数据库每日全量备份 + WAL 归档实现增量恢复K8s 配置通过 GitOps 管理,可从 Git 仓库重建镜像版本明确打标签,避免 latest 标签导致回滚困难定期演练恢复流程,验证 RTO 和 RPO 指标常见问题排查| 症状 | 可能原因 | 排查方法 ||------|---------|---------|| 工具调用超时 | 下游 API 响应慢 | 检查 REQUESTDURATION 的 P99 值,确认超时配置 || 连接数突增 | 客户端未正确关闭连接 | 检查 ACTIVECONNECTIONS 趋势,确认连接池配置 || 工具调用报错率升高 | 下游服务异常 | 按 errortype 分组查看 TOOLERRORS,检查下游健康状态 || OOM Kill | 内存泄漏 | 检查 Pod 内存使用趋势,增加 limits 或修复泄漏 || 认证失败 | Token 过期或密钥轮换 | 检查 OAuth issuer 配置,验证 JWKS 端点可达 |总结MCP 生产部署的核心要点:传输协议必须使用 Streamable HTTP,不要用 Stdio认证方案前置设计,OAuth 2.1 是标准选择安全上要实现 per-tool kill-switch 和读写分离限流监控覆盖请求量、延迟、错误率、连接数四个维度日志必须结构化,便于聚合分析配置通过环境变量注入,敏感信息用 Secret 管理CI/CD 流水线确保每次变更可追溯、可回滚备份和恢复流程必须定期演练
服务端阅读 05月28日 06:44

MCP 如何与微服务架构结合?

MCP(Model Context Protocol)与微服务架构的结合,是构建可扩展 AI 应用系统的关键路径。MCP 本身采用客户端-服务器架构,天然契合微服务的拆分与独立部署理念。本文从实际架构设计出发,讲清楚 MCP 在微服务中怎么拆、怎么连、怎么管。为什么 MCP 适合微服务架构MCP 解决的核心问题是 AI 系统与外部工具集成的 N×M 复杂度——每个 AI 应用对接每个工具需要单独适配。MCP 将其简化为 N+M:每个 AI 应用实现一次客户端协议,每个工具实现一次服务器协议,即可互操作。这与微服务架构解决的问题是同构的:微服务将单体应用拆分为独立服务,通过标准化协议通信;MCP 将 AI 能力拆分为独立工具服务器,通过标准化协议对接。两者叠加,可以构建出高度解耦、独立演进的 AI 微服务系统。MCP 微服务架构的三层设计┌─────────────────────────────────────────────┐│ API Gateway / BFF ││ (认证、限流、路由、协议转换) │├─────────────────────────────────────────────┤│ MCP Client Layer ││ ┌──────────┐ ┌──────────┐ ┌──────────┐ ││ │ Client A │ │ Client B │ │ Client C │ ││ └────┬─────┘ └────┬─────┘ └────┬─────┘ │├────────┼────────────┼────────────┼──────────┤│ └────────────┼────────────┘ ││ MCP Server Layer ││ ┌──────────┐ ┌──────────┐ ┌──────────┐ ││ │ DB MCP │ │ API MCP │ │ File MCP │ ││ │ Server │ │ Server │ │ Server │ ││ └──────────┘ └──────────┘ └──────────┘ │├─────────────────────────────────────────────┤│ Infrastructure Layer ││ Consul / etcd / Kafka / Redis / Prometheus│└─────────────────────────────────────────────┘第一层:MCP Server——按业务域拆分拆分粒度是第一个决策点。原则是按业务能力拆分,而非按技术层拆分:| 拆分方式 | 示例 | 判断 ||---------|------|------|| 按业务域 | 用户服务 MCP、订单服务 MCP、支付服务 MCP | 正确:边界清晰,独立演进 || 按技术层 | 数据库 MCP、缓存 MCP、消息队列 MCP | 错误:职责混杂,变更耦合 |每个 MCP Server 独立部署,暴露三类能力:Tools:可调用的操作(如 create_order、query_user)Resources:可读取的数据(如 user://123/profile)Prompts:预定义的交互模板(如 analyze_order_trend)# MCP Server 注册示例 - 按业务域拆分from mcp.server import Serverorder_server = Server("order-service")@order_server.tool()async def create_order(user_id: str, items: list) -> dict: # 创建订单,调用订单微服务的内部 API return await order_service.create(user_id, items)@order_server.resource("order://{order_id}")async def get_order(order_id: str) -> str: # 获取订单详情 order = await order_service.get(order_id) return order.to_json()第二层:MCP Client——编排与上下文管理MCP Client 是 AI 应用与 MCP Server 之间的桥梁。在微服务架构中,Client 需要处理三件事:服务发现、连接管理和上下文聚合。from mcp.client import Clientclass MCPClientManager: # 管理多个 MCP Server 连接 def __init__(self, service_discovery): self.discovery = service_discovery self.clients: dict[str, Client] = {} async def connect(self, server_name: str) -> Client: # 通过服务发现连接 MCP Server endpoint = await self.discovery.resolve(server_name) client = Client(f"{endpoint}/mcp") await client.connect() self.clients[server_name] = client return client async def aggregate_context(self, query: str) -> str: # 聚集多个 Server 的上下文 contexts = [] for name, client in self.clients.items(): resources = await client.list_resources() for r in resources: if self._is_relevant(r, query): content = await client.read_resource(r.uri) contexts.append(f"[{name}] {content}") return "\n\n".join(contexts)第三层:基础设施——服务发现与配置MCP Server 作为微服务,需要注册到服务发现组件。关键区别在于:MCP Server 的健康检查不仅要检查 HTTP 端口,还要验证 MCP 协议握手是否正常。# MCP Server 注册到 Consul(含协议级健康检查)import consuldef register_mcp_server(consul_client, server_name, address, port): consul_client.agent.service.register( name=server_name, address=address, port=port, tags=["mcp", "v2"], # 标记为 MCP 服务 check=consul.Check.http( url=f"http://{address}:{port}/health", interval="10s", header={"MCP-Protocol-Version": ["2025-03-26"]} ) )三个关键集成模式模式一:API Gateway 统一入口在微服务架构中,API Gateway 是统一入口。MCP 的加入需要 Gateway 支持协议转换——外部请求通过 REST/GraphQL 进入,Gateway 将其翻译为 MCP 协议调用后端服务。客户端 → API Gateway → MCP Client → MCP Server │ ├─ REST → MCP Tool Call ├─ GraphQL → MCP Resource Query └─ SSE → MCP Streamable HTTP注意:MCP 2026 规范已将 HTTP Transport 升级为 Streamable HTTP,支持长连接和流式响应,Gateway 需要支持 SSE 透传。模式二:MCP Server 作为 Sidecar对于已有微服务,不需要重写代码。采用 Sidecar 模式,在现有服务旁边部署一个 MCP 适配器,将微服务的 API 翻译为 MCP 协议:┌──────────────────────┐│ Pod ││ ┌───────┐ ┌───────┐││ │ User │ │ MCP │││ │ Svc │←│Proxy │││ │:8080 │ │:3000 │││ └───────┘ └───────┘│└──────────────────────┘这种模式的优势:零侵入改造现有服务,MCP Proxy 独立升级,不影响业务逻辑。# Sidecar 模式的 MCP Proxyfrom mcp.server import Serverimport httpxproxy = Server("user-service-mcp-proxy")backend = httpx.AsyncClient(base_url="http://localhost:8080")@proxy.tool()async def get_user(user_id: str) -> dict: # 代理到用户微服务的 GET /users/{id} resp = await backend.get(f"/users/{user_id}") return resp.json()@proxy.tool()async def update_user(user_id: str, data: dict) -> dict: # 代理到用户微服务的 PUT /users/{id} resp = await backend.put(f"/users/{user_id}", json=data) return resp.json()模式三:事件驱动的 MCP 编排微服务间的异步通信通过消息队列。MCP 可以订阅 Kafka Topic,当事件到达时主动推送上下文给 AI 应用:# MCP Server 订阅 Kafka 事件from aiokafka import AIOKafkaConsumerimport json@order_server.resource("order://events/recent")async def recent_order_events() -> str: # 返回最近的订单事件流 consumer = AIOKafkaConsumer( "order-events", bootstrap_servers="kafka:9092", auto_offset_reset="latest" ) await consumer.start() events = [] async for msg in consumer: events.append(json.loads(msg.value)) if len(events) >= 10: break await consumer.stop() return json.dumps(events, ensure_ascii=False)安全:MCP 微服务必须解决的三个问题1. 服务间认证MCP 2026 规范的 Auth 机制已从草案进入正式版。每个 MCP Server 必须验证 Client 的 OAuth 2.0 Token,而非简单信任网络边界:from mcp.server import Serverfrom authlib.integrations.starlette import OAuth2Proxyserver = Server("secure-service", auth_provider=OAuth2Proxy( issuer="https://auth.example.com", audience="mcp-api"))# 未认证的请求将被拒绝@server.tool()async def sensitive_operation(data: dict) -> dict: # 只有携带有效 Token 的 Client 才能调用 return await process(data)2. 上下文隔离MCP 的安全边界设计要求:Server 不能读取完整对话历史,只能收到必要的上下文。在微服务环境中,这意味着每个 MCP Server 只能访问自己业务域的数据,跨域调用必须通过 Client 编排。3. 限流与熔断AI 调用工具的频率可能远超人类操作。MCP Server 必须实现限流,防止 AI Agent 的重试风暴击穿后端服务:from circuitbreaker import circuit@circuit(failure_threshold=5, recovery_timeout=30)@server.tool()async def query_database(sql: str) -> list: # 带熔断的数据库查询 return await db.execute(sql)可观测性:追踪 AI 调用链在微服务中追踪一次用户请求已经复杂,加入 MCP 后链路更长。需要端到端的 Trace ID 贯穿:用户请求 → API Gateway → MCP Client → MCP Server → 后端微服务。from opentelemetry import trace, contexttracer = trace.get_tracer("mcp-service")@server.tool()async def create_order(user_id: str, items: list) -> dict: with tracer.start_as_current_span("mcp.tool.create_order") as span: span.set_attribute("mcp.server", "order-service") span.set_attribute("mcp.tool", "create_order") span.set_attribute("user.id", user_id) result = await order_service.create(user_id, items) span.set_attribute("order.id", result["id"]) return result决策清单:什么时候用 MCP 微服务| 场景 | 建议 ||------|------|| 单个 AI 应用 + 少于 5 个工具 | 不需要微服务,单体 MCP Server 足够 || 多个 AI 应用共享工具集 | 拆分为独立 MCP Server,统一注册 || 工具调用需要独立扩缩容 | 按业务域拆分 MCP Server,独立部署 || 已有微服务需要 AI 化 | Sidecar 模式,零侵入接入 || AI Agent 需要跨域编排 | MCP Client 聚合上下文,统一编排 |总结MCP 与微服务的结合不是简单叠加,而是架构层面的对齐:MCP 的客户端-服务器模型对应微服务的服务拆分与通信,MCP 的工具/资源/提示对应微服务的 API/数据/事件。核心原则是按业务域拆分 MCP Server、用 Sidecar 零侵入改造现有服务、通过 MCP Client 编排跨域调用。安全上必须启用 OAuth 2.0 认证和上下文隔离,可观测性上要保证 AI 调用链的端到端追踪。
服务端阅读 05月28日 06:42

MCP 工具定义和参数验证怎么做?

MCP(Model Context Protocol)的工具定义和参数验证,是构建可靠 MCP 服务的基础。如果工具定义不清晰或参数验证不严格,LLM 调用时可能传入错误参数,导致服务崩溃或返回异常结果。本文从 MCP 协议规范出发,详解工具定义结构、JSON Schema 参数约束、Pydantic 验证实践,以及安全防护要点。工具定义的核心结构MCP 工具由三个必填字段组成:name(工具名称)、description(功能描述)和 inputSchema(输入参数的 JSON Schema)。{ "name": "query_database", "description": "在指定数据库中执行 SQL 查询,仅支持 SELECT 语句。结果以表格形式返回,最多 1000 行,超时 30 秒。", "inputSchema": { "type": "object", "properties": { "sql": { "type": "string", "description": "SQL 查询语句,仅支持 SELECT", "minLength": 1, "maxLength": 5000 }, "database": { "type": "string", "enum": ["production", "staging", "analytics"], "description": "目标数据库名称" } }, "required": ["sql"], "additionalProperties": false }}关键要点:additionalProperties: false 是安全必备项。不加这个约束,LLM 可能传入预期之外的参数字段,引发未知行为description 要具体且包含边界信息:说明工具能做什么、不能做什么、返回格式、超时限制等,帮助 LLM 准确选择工具enum 约束枚举值:对于有限选项的参数,用 enum 替代自由字符串,减少 LLM 幻觉导致的错误调用JSON Schema 支持的参数类型MCP 的 inputSchema 遵循 JSON Schema 规范,支持以下类型:| 类型 | 说明 | 常用约束 ||------|------|----------|| string | 字符串 | minLength, maxLength, pattern, format, enum || number | 数值(整数或浮点) | minimum, maximum, exclusiveMinimum, exclusiveMaximum || integer | 整数 | minimum, maximum, multipleOf || boolean | 布尔值 | default || array | 数组 | minItems, maxItems, items, uniqueItems || object | 对象 | properties, required, additionalProperties |嵌套对象示例实际场景中,参数经常需要嵌套结构:{ "name": "search_documents", "inputSchema": { "type": "object", "properties": { "query": { "type": "string", "description": "搜索关键词", "minLength": 1, "maxLength": 500 }, "filters": { "type": "object", "properties": { "category": {"type": "string"}, "date_range": { "type": "object", "properties": { "start": {"type": "string", "format": "date"}, "end": {"type": "string", "format": "date"} }, "required": ["start", "end"], "additionalProperties": false } }, "additionalProperties": false }, "sort": { "type": "string", "enum": ["relevance", "date", "popularity"], "default": "relevance" }, "limit": { "type": "integer", "minimum": 1, "maximum": 100, "default": 10 } }, "required": ["query"], "additionalProperties": false }}注意每一层 object 都应设置 additionalProperties: false,包括嵌套的 filters 和 date_range。用 Pydantic 实现参数验证手写验证器容易遗漏边界条件,MCP Python SDK 推荐使用 Pydantic 自动生成 schema 并完成验证:from pydantic import BaseModel, Field, field_validatorfrom enum import Enumclass DatabaseName(str, Enum): PRODUCTION = "production" STAGING = "staging" ANALYTICS = "analytics"class QueryParams(BaseModel): sql: str = Field( ..., min_length=1, max_length=5000, description="SQL 查询语句,仅支持 SELECT" ) database: DatabaseName = Field( default=DatabaseName.PRODUCTION, description="目标数据库名称" ) @field_validator("sql") @classmethod def validate_sql_read_only(cls, v: str) -> str: upper = v.strip().upper() forbidden = ["INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "CREATE"] for keyword in forbidden: if keyword in upper: raise ValueError(f"不允许执行 {keyword} 操作,仅支持 SELECT 查询") return v# Pydantic 模型自动生成 JSON Schemaschema = QueryParams.model_json_schema()Pydantic 的优势:类型自动推断:Python 类型注解直接映射为 JSON Schema 类型验证规则内聚:约束和自定义验证逻辑与数据模型放在一起,维护方便错误信息清晰:验证失败时返回结构化错误,包含字段路径和具体原因与 MCP SDK 集成:@mcp.tool() 装饰器可直接使用 Pydantic 模型作为参数类型与 MCP SDK 集成示例from mcp.server.fastmcp import FastMCPmcp = FastMCP("database-service")@mcp.tool()async def query_database(params: QueryParams) -> str: """在指定数据库中执行 SQL 查询""" # params 已经过 Pydantic 验证,可以直接使用 result = await execute_sql(params.sql, params.database.value) return format_result(result)format 格式验证JSON Schema 的 format 字段可以对字符串做语义验证:from pydantic import BaseModel, Field, EmailStr, HttpUrlfrom datetime import dateclass UserProfile(BaseModel): email: EmailStr = Field(description="用户邮箱") website: HttpUrl = Field(default=None, description="个人网站") birthday: date = Field(description="出生日期")常用 format 值:| format | 说明 | 示例 ||--------|------|------|| date | 日期 | 2026-05-28 || date-time | 日期时间 | 2026-05-28T10:30:00Z || email | 邮箱地址 | user@example.com || uri | URI | https://example.com || uuid | UUID | 550e8400-e29b-41d4-a716-446655440000 |工具调用的错误处理参数验证失败时,应返回结构化错误信息,而非抛出异常让服务崩溃:from mcp.server.fastmcp import FastMCPfrom pydantic import ValidationErrormcp = FastMCP("database-service")@mcp.tool()async def query_database(sql: str, database: str = "production") -> str: """在指定数据库中执行 SQL 查询,仅支持 SELECT""" try: params = QueryParams(sql=sql, database=database) except ValidationError as e: return f"参数验证失败: {e.json()}" try: result = await execute_sql(params.sql, params.database.value) return format_result(result) except TimeoutError: return "查询超时(30 秒限制),请简化查询或添加更严格的过滤条件" except Exception as e: return f"查询执行失败: {type(e).__name__}: {str(e)}"安全防护要点工具定义和参数验证不只是功能需求,更是安全防线。LLM 生成的参数不可信任,以下实践必须遵守:每个 object 都加 additionalProperties: false:防止 LLM 注入未定义字段字符串参数设 maxLength:防止超长输入导致缓冲区溢出或 DoS数值参数设 minimum/maximum:防止极端值导致计算异常敏感操作加确认机制:涉及写入、删除等操作,在服务端实现二次确认对工具定义做哈希校验:防止恶意 MCP 服务在审核后篡改工具 schema(tool poisoning 攻击)日志记录所有调用:记录工具名、参数(脱敏)、结果、时间戳,便于审计追踪总结| 要点 | 做法 ||------|------|| 工具定义 | 必填 name + description + inputSchema,description 要具体 || 参数约束 | 用 JSON Schema 的 type、enum、minLength 等做声明式约束 || 代码验证 | 优先用 Pydantic 模型,自动生成 schema + 运行时验证 || 安全防护 | additionalProperties: false + maxLength + 哈希校验 + 审计日志 || 错误处理 | 捕获 ValidationError,返回结构化错误,不要让服务崩溃 |
服务端阅读 05月28日 06:41

MCP 协议到底是什么?一文讲透 Model Context Protocol 原理与实战

MCP(Model Context Protocol,模型上下文协议)是 Anthropic 于 2024 年底推出的开放标准协议,现已成为 AI 应用与外部工具、数据源交互的事实标准。2025 年底 MCP 捐赠给 Linux 基金会后,OpenAI、Google、Microsoft、AWS 等厂商全面支持,SDK 月下载量突破 9700 万,GitHub Stars 超过 81000。如果把 AI 模型比作电脑,那 MCP 就是 USB-C 接口——一个通用连接器,让你写一次工具就能在所有 AI 客户端上运行。MCP 解决了什么问题?在 MCP 出现之前,每接入一个新工具,开发者就要为每个 AI 模型单独写一套集成代码。如果你有 M 个模型和 N 个工具,就要维护 M×N 个集成。MCP 把这个复杂度降到了 M+N——模型端和工具端各实现一次协议即可。核心痛点:每个工具厂商各自为政,集成方式不统一相同功能在不同 AI 客户端上需要重复开发工具发现、权限管理、错误处理缺少标准MCP 通过定义统一的通信协议、工具注册规范和资源发现机制,一次性解决了这些问题。三大核心原语MCP 定义了三种基本能力单元,所有功能都围绕它们构建:1. Tools(工具)Tools 是模型可以调用的操作,有副作用。比如查询数据库、发送消息、创建文件。{ "name": "query_database", "description": "执行 SQL 查询并返回结果", "inputSchema": { "type": "object", "properties": { "sql": { "type": "string", "description": "SQL 查询语句" } }, "required": ["sql"] }}2. Resources(资源)Resources 是只读的上下文数据,模型可以主动拉取。比如代码仓库内容、数据库表结构、日志文件。{ "uri": "file:///project/README.md", "name": "项目说明文档", "mimeType": "text/markdown"}3. Prompts(提示模板)Prompts 是可复用的提示词模板,支持参数化。比如代码审查模板、Bug 分析模板。{ "name": "code_review", "description": "代码审查模板", "arguments": [ { "name": "language", "required": true }, { "name": "focus_area", "required": false } ]}技术架构详解MCP 的通信基于 JSON-RPC 2.0,整体架构分为三层:┌─────────────┐ ┌─────────────┐ ┌─────────────┐│ Host │ │ Server │ │ Data Source ││ (AI 客户端) │◄───►│ (MCP 服务) │◄───►│ (外部系统) │└─────────────┘ └─────────────┘ └─────────────┘ Client MCP APIHost(宿主):运行 AI 模型的应用程序,如 Claude Desktop、Cursor、Cline。一个 Host 可以同时连接多个 Server。Client(客户端):Host 内部与 Server 建立连接的组件,每个 Client 维护一个与 Server 的会话。Server(服务端):提供 Tools、Resources、Prompts 的程序,暴露能力给 Client 使用。传输方式MCP 支持两种传输方式:stdio:本地开发场景,Server 作为子进程运行,通过标准输入输出通信Streamable HTTP:生产环境场景,Server 作为独立服务运行,支持负载均衡和 CDN 部署Streamable HTTP 是 2025 年 11 月引入的新传输方式,取代了早期的 SSE 方案,更契合云原生架构。认证机制2025 年 6 月起,MCP 规范引入 OAuth 2.1 作为远程 Server 的标准认证方案。这意味着:远程 MCP Server 不再使用裸 API Key客户端通过 OAuth 2.1 流程获取 access token支持细粒度的权限范围控制(scope)MCP vs Function Calling:有什么区别?这是一个常见误区。MCP 和 Function Calling 不是竞争关系:| 对比维度 | Function Calling | MCP ||---------|-----------------|-----|| 定位 | 模型 API 层 | 集成协议层 || 作用 | 让模型输出结构化调用 | 标准化工具注册和通信 || 范围 | 单个模型调用 | 跨客户端、跨工具 || 关系 | 底层能力 | 上层协议 |简单说,Function Calling 是模型知道"怎么调用工具",MCP 是定义了"工具怎么被所有模型发现和调用"。现代架构中两者配合使用。快速上手:5 分钟搭建一个 MCP Server以 Python 为例,使用 FastMCP 框架:from mcp.server.fastmcp import FastMCPmcp = FastMCP("my-tools")@mcp.tool()def get_weather(city: str) -> str: """获取指定城市的天气信息""" return f"{city}:晴,25°C"@mcp.resource("config://app")def get_config() -> str: """返回应用配置""" return '{"version": "1.0", "debug": false}'@mcp.prompt()def code_review(code: str) -> str: """代码审查提示模板""" return f"请审查以下代码,关注安全性和性能:\n\n{code}"if __name__ == "__main__": mcp.run()在 Claude Desktop 的配置文件中添加:{ "mcpServers": { "my-tools": { "command": "python", "args": ["server.py"] } }}重启 Claude Desktop 后,模型就能自动发现并使用你定义的工具了。2026 年生态现状MCP 生态在 2026 年已非常成熟:主流 AI 客户端支持:Claude Desktop、Claude Code、Cursor、Cline、Continue、OpenClaw、Codex、Zed、Chatbox 等全部原生支持 MCP。开源 Server 生态:500+ 个公开 MCP Server,覆盖 GitHub、Slack、PostgreSQL、Stripe、Figma、Docker、Kubernetes、Notion、Linear、Chrome DevTools 等主流工具。2026 路线图方向:无状态化。当前 Server 是有状态的,每个连接维持 session,限制了横向扩展。2026 年将把 session 的创建、恢复、迁移标准化,使 MCP Server 能像无状态 Web 服务一样水平扩展。常见问题MCP 是免费的吗?是的,MCP 是开源协议,任何人都可以免费使用和实现。MCP 只支持 Claude 吗?不是。MCP 是开放标准,所有主流 AI 客户端都已支持。MCP 和 LangChain 的区别?LangChain 是应用框架,提供编排和链式调用;MCP 是通信协议,定义工具如何被发现和调用。两者互补。远程 MCP Server 安全吗?2025 年规范已引入 OAuth 2.1 认证,安全性有标准保障。但安全责任在 Host 端——Host 控制用户授权、凭证范围和工具白名单。
服务端阅读 05月28日 06:40

MCP 的生态系统包含哪些组件和工具?

MCP(Model Context Protocol)的生态系统在 2026 年已从实验性协议成长为 AI 工具集成的基础设施层。本文基于最新数据,系统梳理 MCP 生态中的核心组件与工具。协议核心:三大原语MCP 服务器通过三种原语向外暴露能力:Resources(资源):LLM 可访问的只读数据源,如数据库连接、文件系统内容。资源由 URI 标识,客户端可按需订阅更新。Tools(工具):LLM 可调用的函数,支持结构化输入输出、外部系统集成。每个工具声明名称、描述和 JSON Schema 参数定义。Prompts(提示模板):可复用的提示模板,支持参数化,主机可渲染后发送给 LLM。最新稳定协议版本为 2025-11-25,关键更新包括 OAuth 2.1 认证、结构化输出、工具中的资源链接,以及 Streamable HTTP 替代 SSE 传输。官方 SDKMCP 提供多语言官方 SDK,由不同技术伙伴协作维护:| SDK | 维护方 | 成熟度 | 说明 ||-----|--------|--------|------|| TypeScript SDK | Anthropic | 稳定(v1.x) | 下载量最高,v2 预计 2026 Q1 稳定发布 || Python SDK | Anthropic | 稳定 | 适合快速脚本和数据科学场景 || Kotlin SDK | JetBrains | 稳定 | 面向 JVM 生态,与 JetBrains IDE 深度集成 || C# SDK | Microsoft | 稳定 | .NET 生态官方支持 || Go SDK | Google | 开发中 | 稳定版预计 2026 年 8 月发布,围绕核心包 mcp 构建 |所有 SDK 均在 GitHub modelcontextprotocol 组织下开源。截至 2026 年 3 月,SDK 月下载量已达 9700 万次。服务器生态注册表与目录MCP 服务器通过多个注册表发现和分发:PulseMCP:11,840+ 服务器,人工审核目录,质量过滤严格Glama:21,000+ 服务器,最大体量目录,支持可视化预览Smithery:7,000+ 服务器,类应用商店界面,支持一键安装和远程托管Official MCP Registry:约 2,000 服务器,Anthropic 支持的官方注册表,面向程序化发现MCP.so:19,700+ 社区提交服务器按场景分类的主流服务器开发与 DevOps:GitHub MCP(代码搜索、PR 管理)、Docker Hub(容器管理)、Kubernetes(集群状态、部署)、Jira/Linear(工单管理)、Datadog/Grafana(可观测性)数据与数据库:PostgreSQL/MySQL/SQLite(直连查询与 Schema 检查)、Snowflake/BigQuery(云数仓)、dbt(数据模型文档)生产力与协作:Slack MCP(OAuth 2.1,Slack 官方托管)、Google Drive MCP(OAuth 2.1)、Notion MCP(OAuth 2.1)、Microsoft 365/Outlook MCPCRM 与业务应用:HubSpot MCP(OAuth 2.1)、Salesforce MCP(OAuth 2.1)、Linear MCP(OAuth 2.1 托管)AI 与 LLM 工具:Anthropic Claude(元 MCP,支持 Claude-in-Claude 工作流)、OpenAI MCP(GPT 模型 API 访问)、Pinecone/Weaviate(向量数据库 RAG 工作流)搜索与信息获取:Exa MCP(当前使用最广泛的搜索服务器,专为 AI Agent 语义查询设计)OAuth 2.1 已成为 CRM 和生产力类服务器的标准认证模式,尤其是由服务商自身托管的官方服务器。客户端生态截至 2026 年,已有 300+ MCP 客户端,覆盖主流编辑器、聊天应用和企业平台:编辑器集成:VS Code(GitHub Copilot)、Cursor、Windsurf、Zed、JetBrains AI聊天应用:Claude Desktop、ChatGPT开发平台:Replit、Continue、Sourcegraph Cody云平台:AWS Bedrock、Azure AI、GCP Vertex 均提供托管 MCP 端点基础设施组件随着生态规模化,围绕 MCP 的基础设施层也在快速形成:MCP Gateway:代理层,为 MCP 流量添加认证、限流、缓存和可观测性MCP 开发平台:无需管理基础设施即可构建、测试和部署 MCP 服务器MCP 安全产品:服务器发现、权限审计与合规工具MCP Inspector:官方可视化测试与调试工具治理与标准化2025 年 12 月,MCP 治理权移交给 Linux 基金会 下的 Agentic AI Foundation (AAIF),标志着 MCP 从 Anthropic 内部项目转变为厂商中立的开放标准。2026 年路线图重点方向包括:无状态重构:2026-07-28 发布候选版本,为长期发展奠定基础技能发现与分发:实验性技能扩展,通过 MCP 原语实现技能的自动发现MCP Apps 协议:嵌入 AI 聊天机器人的 UI 标准多模态交互:整合语音、手势、眼动追踪等交互方式市场规模MCP 服务器市场预计到 2026 年达到 104 亿美元,年复合增长率 24.7%。超过 10,000 个公开 MCP 服务器,80% 的财富 500 强企业通过 MCP 部署 AI Agent 生产工作流。如何选择和使用发现服务器:质量优先用 PulseMCP,覆盖面优先用 Glama,一键安装用 Smithery选择 SDK:Web 项目选 TypeScript,数据/脚本选 Python,JVM 选 Kotlin,.NET 选 C#测试调试:使用 MCP Inspector 可视化检查服务器行为部署上线:使用 Docker 镜像或 Smithery 托管方案,OAuth 2.1 保护端点安全MCP 生态系统的快速成熟,使其成为连接 AI 模型与外部工具事实上的标准协议。无论你是构建 AI 应用的开发者,还是寻求 AI 集成的企业,MCP 生态都已具备生产可用的完整工具链。
服务端阅读 05月28日 06:34

MCP 的版本管理和兼容性如何处理?

MCP 版本号机制与 SemVer 的区别MCP 协议没有采用常见的语义化版本号(SemVer,如 1.2.3),而是使用 日期格式版本号,例如 2025-03-26、2024-11-05。这种选择并非随意,而是反映了协议的迭代节奏:版本号直接标识该版本最后一次引入破坏性变更的日期。MCP 规范定义了三种版本状态:| 状态 | 含义 ||------|------|| Draft | 草稿阶段,尚未准备好用于生产 || Current | 当前版本,可正常使用,仍可接收向后兼容的更新 || Final | 已完结的历史版本,不再变更 |关键区别在于:当更新保持向后兼容时,协议版本号不会递增。这意味着同一版本号下的规范可能在细节上有所完善,但不会破坏已有实现。只有引入不兼容变更时,才会产生新的日期版本号。初始化阶段的版本协商流程MCP 的版本协商发生在客户端与服务器的 初始化握手阶段,这是连接建立后的第一个交互。整个流程如下:客户端发送 initialize 请求,其中包含 protocolVersion 字段,声明自己支持的协议版本服务器收到请求后,从自身支持的版本列表中选择一个兼容版本,在响应中返回该版本号和自身的能力声明客户端收到响应后,如果支持服务器返回的版本,发送 initialized 通知确认就绪如果客户端不支持服务器返回的版本,应当主动断开连接// 客户端发送 initialize 请求{ "jsonrpc": "2.0", "method": "initialize", "params": { "protocolVersion": "2025-03-26", "capabilities": { "tools": { "listChanged": true }, "resources": { "subscribe": true } }, "clientInfo": { "name": "my-mcp-client", "version": "1.0.0" } }}// 服务器响应{ "protocolVersion": "2025-03-26", "capabilities": { "tools": { "listChanged": true }, "resources": {} }, "serverInfo": { "name": "my-mcp-server", "version": "2.1.0" }}需要注意的是:在初始化握手完成之前,双方不能发送任何功能请求(如列出工具、读取资源等)。对于 HTTP 传输,服务器还要求客户端在后续所有请求中携带 MCP-Protocol-Version 头,以便中间代理正确路由。能力协商:比版本号更灵活的兼容机制版本号只能粗粒度地判断兼容性,MCP 引入了 能力协商(Capabilities Negotiation) 来实现更精细的兼容控制。在初始化握手时,客户端和服务器各自声明自己支持的能力集合(capabilities)。只有双方都声明支持的功能,才能在会话中使用。这意味着:新版客户端连接旧版服务器时,新版功能自动禁用,但已有功能正常工作旧版客户端连接新版服务器时,服务器不会调用客户端不支持的特性双方可以独立演进,只要遵守"不使用对方未声明的能力"这一约定// 能力协商后的安全调用模式interface NegotiatedCapabilities { client: ClientCapabilities; server: ServerCapabilities;}function safeCall( caps: NegotiatedCapabilities, feature: string): boolean { // 只有双方都支持才返回 true return caps.client[feature] !== undefined && caps.server[feature] !== undefined;}这种设计让 MCP 的兼容性保障从"版本号对齐"升级为"能力对齐",大幅降低了版本升级的摩擦。正式弃用策略与功能生命周期MCP 在 2026 年引入了正式的弃用策略(SEP-2596),建立了明确的三阶段功能生命周期:Active → Deprecated → Removed核心规则:最小弃用窗口期为 12 个月——从标记为 Deprecated 到最早可能被移除,至少间隔一年弃用仅为"注解级别"(annotation-only):被标记为 Deprecated 的方法、类型和能力标志在当前版本和弃用后一年内的所有规范版本中继续正常工作每个弃用项必须提供明确的替代方案以 2026-07-28 发布候选版本为例,三个核心功能被标记为弃用:| 被弃用功能 | 替代方案 ||-----------|---------|| roots | 使用工具参数、资源 URI 或服务器配置 || sampling | 直接集成 LLM 提供商 API || logging | stdio 传输使用 stderr;结构化可观测性使用 OpenTelemetry |这种渐进式弃用策略让开发者有充足时间迁移,而不是突然面对破坏性变更。向后兼容性保证与破坏性变更处理MCP 1.x 系列承诺向后兼容性,以下核心组件被认为是稳定的:核心消息格式(JSON-RPC 2.0)传输协议(stdio、Streamable HTTP)工具/资源/提示的基本生命周期错误类型层级能力协商机制当必须引入破坏性变更时,MCP 采用以下处理方式:1. 双格式过渡期在传输协议迁移中(如从 HTTP+SSE 迁移到 Streamable HTTP),客户端被期望在过渡期内同时支持新旧两种格式,确保不会因服务器升级而断连。2. 错误码迁移错误码的变更也遵循渐进策略。例如,资源缺失的错误码从 MCP 自定义的 -32002 迁移到 JSON-RPC 标准的 -32602(Invalid Params),通过 SEP 流程明确迁移时间和方式。3. 扩展优先原则新功能优先作为可选扩展发布,而非直接加入核心规范。扩展不会修改或破坏已有核心功能,开发者可以选择性采用。扩展框架如何降低版本升级成本MCP 的扩展框架(Extensions Framework)是版本管理的重要组成部分,它让协议能够在不频繁升级核心版本的前提下持续演进。扩展具备四个关键特性:可选采用:服务器和客户端实现者可以自行决定是否采纳某个扩展纯粹增量:扩展只增加功能,不修改也不破坏核心协议功能独立版本控制:扩展可以跟随核心 MCP 版本周期,也可以按需独立发版可组合:扩展之间模块化设计,可以同时使用多个扩展而不冲突这种架构意味着:当你只需要核心功能时,版本升级的压力很小;当你需要新特性时,通过扩展获取,而不必等待核心规范的大版本更新。生产环境中的版本迁移实践在实际项目中处理 MCP 版本迁移时,以下策略被证明是有效的:1. 版本范围锁定在项目配置中明确声明兼容的协议版本范围,而非依赖最新版本:{ "mcp": { "protocolVersion": "2025-03-26", "minProtocolVersion": "2024-11-05" }}2. 兼容性测试集成将 MCP 版本兼容性测试纳入 CI/CD 流程,在部署前自动验证客户端与目标服务器版本的兼容性。重点关注:初始化握手是否成功、能力协商后双方功能是否正常、已弃用功能是否有替代方案。3. 弃用监控建立弃用功能清单,定期对照 MCP 规范更新检查项目中是否使用了已弃用的 API。利用 MCP 服务器在 initialize 响应中返回的版本信息,自动检测弃用风险。4. 渐进式迁移当新版本发布时,不要一次性全量迁移。先在新实例上验证新版本的兼容性,确认无问题后再逐步替换旧实例。对于 HTTP 传输场景,确保所有客户端都更新到支持新版本后再下线旧版服务器。5. 关注 SEP 提案MCP 的变更通过 Specification Enhancement Proposals(SEP)提出和推进。关注与自身项目相关的 SEP,可以在变更正式生效前提前准备,避免被动应对。
服务端阅读 05月28日 06:32

如何对 MCP Server 进行测试?测试策略与最佳实践详解

MCP(Model Context Protocol)作为 AI 应用与外部工具交互的标准协议,在 2026 年已获得广泛采用——SDK 月下载量超过 9700 万次,Anthropic、OpenAI、Google、Microsoft、AWS 等主流厂商均已支持。随着 MCP Server 数量激增,如何系统地测试 MCP Server 成为开发者必须掌握的技能。本文将从测试层次、实战工具、代码示例和最佳实践四个维度,详细介绍 MCP Server 的测试策略。一、MCP 测试的层次结构有效的 MCP 测试应遵循测试金字塔原则,从底层到顶层依次覆盖:| 层次 | 目标 | 速度 | 覆盖范围 ||------|------|------|----------|| 单元测试 | 验证单个 Tool/Resource 函数的逻辑 | 快(毫秒级) | 代码路径 || 协议测试 | 验证 Server 是否正确实现 MCP 协议 | 中 | 协议边界 || 集成测试 | 验证 Client-Server 端到端通信 | 较慢 | 交互链路 || 性能测试 | 验证并发和负载下的稳定性 | 慢 | 性能指标 || 安全测试 | 验证认证、授权和注入防护 | 中 | 安全边界 |关键原则:大量单元测试打基础,适量协议测试守边界,少量端到端测试保信心。二、单元测试:直接测试 Tool 函数单元测试是最快的反馈环。对于 MCP Server,你无需启动真实服务器,直接测试 @mcp.tool() 装饰的函数即可。2.1 安装依赖pip install mcp pytest pytest-asyncio2.2 被测 Server 示例# server.pyfrom mcp.server.fastmcp import FastMCP, ToolErrorfrom typing import Optionalmcp = FastMCP("calculator")@mcp.tool()def add(a: int, b: int) -> int: """Add two numbers together.""" return a + b@mcp.tool()def divide(a: float, b: float) -> float: """Divide a by b. Raises error if b is zero.""" if b == 0: raise ToolError("Division by zero is not allowed") return a / b@mcp.tool()def search_documents(query: str, max_results: Optional[int] = 10) -> list: """Search documents by query string.""" return [{"id": 1, "title": f"Result for {query}", "score": 0.95}]2.3 单元测试编写# test_unit.pyimport pytestfrom server import add, divide, search_documentsdef test_add(): assert add(2, 3) == 5 assert add(-1, 1) == 0def test_add_type_coercion(): with pytest.raises(Exception): add("not_a_number", 1)def test_divide(): assert divide(10, 2) == 5.0def test_divide_by_zero(): from mcp.server.fastmcp import ToolError with pytest.raises(ToolError, match="Division by zero"): divide(1, 0)@pytest.mark.parametrize("query,expected_len", [ ("test", 1), ("MCP protocol", 1),])def test_search_documents(query, expected_len): results = search_documents(query) assert len(results) == expected_len assert "title" in results[0]要点:单元测试直接调用 Python 函数,不走 MCP 协议,速度极快。重点覆盖正常路径、边界条件和错误处理。三、协议测试:使用 FastMCP Client 验证协议边界单元测试无法发现协议层的问题——比如 Tool 注册是否正确、参数 Schema 是否完整、返回格式是否符合规范。这时需要用 FastMCP Client 进行协议级测试。3.1 In-Process 协议测试FastMCP 提供了 FastMCPTransport,允许在不启动真实服务器的情况下,通过内存中的协议层进行测试:# test_protocol.pyimport pytestfrom mcp.server.fastmcp import FastMCPfrom fastmcp.client import Clientfrom fastmcp.client.transports import FastMCPTransportfrom server import mcp@pytest.fixtureasync def mcp_client(): """创建连接到真实 Server 实例的 Client""" async with Client(mcp) as client: yield client@pytest.mark.asyncioasync def test_tool_registration(mcp_client): """验证所有 Tool 都正确注册""" tools = await mcp_client.list_tools() tool_names = [t.name for t in tools] assert "add" in tool_names assert "divide" in tool_names assert "search_documents" in tool_names@pytest.mark.asyncioasync def test_tool_schema(mcp_client): """验证 Tool 的参数 Schema 描述正确""" tools = await mcp_client.list_tools() add_tool = next(t for t in tools if t.name == "add") assert "properties" in add_tool.inputSchema assert "a" in add_tool.inputSchema["properties"] assert "b" in add_tool.inputSchema["properties"]@pytest.mark.asyncioasync def test_call_tool_via_protocol(mcp_client): """通过协议层调用 Tool 并验证返回""" result = await mcp_client.call_tool("add", arguments={"a": 2, "b": 3}) assert result.data is not None@pytest.mark.asyncioasync def test_tool_error_handling(mcp_client): """验证 ToolError 是否正确通过协议传播""" with pytest.raises(Exception): await mcp_client.call_tool("divide", arguments={"a": 1, "b": 0})要点:FastMCPTransport 让你在进程内完成协议级验证,无需启动子进程或网络端口,兼顾了速度和真实性。3.2 Snapshot 测试:锁定 Tool 接口使用 pytest-inline-snapshot 可以锁定 Tool 列表和 Schema,防止意外变更:# test_snapshot.pyfrom inline_snapshot import snapshot@pytest.mark.asyncioasync def test_tool_list_unchanged(mcp_client): """Tool 列表不应发生意外变更""" tools = await mcp_client.list_tools() tool_info = [(t.name, t.description) for t in tools] assert tool_info == snapshot()首次运行时使用 pytest --inline-snapshot=fix,create 生成快照,后续运行会自动比对。四、集成测试:使用 MCP Inspector 交互式验证MCP Inspector 是官方提供的交互式测试工具,适合开发阶段手动验证 Server 行为。4.1 启动 Inspector# 方式一:通过 MCP CLI(推荐)mcp dev server.py# 方式二:直接使用 npxnpx @modelcontextprotocol/inspector@latest python server.pyInspector 会自动打开浏览器界面,提供以下功能:Tools 面板:列出所有注册的 Tool,支持逐个测试调用Resources 面板:查看和读取 Server 暴露的资源Prompts 面板:测试预定义的 Prompt 模板消息日志:实时查看 JSON-RPC 请求和响应4.2 使用配置文件测试对于需要环境变量的 Server,可以创建配置文件:{ "mcpServers": { "my-server": { "command": "python", "args": ["server.py"], "env": { "API_KEY": "test-key-123" } } }}在 Inspector 中加载此配置,即可模拟真实部署环境。五、性能测试:验证并发和负载能力AI Agent 可能会快速连续调用 Tool,你的 Server 需要承受并发压力。5.1 使用 locust 进行负载测试# locustfile.pyfrom locust import HttpUser, task, betweenclass MCPLoadTest(HttpUser): wait_time = between(0.5, 2) host = "http://localhost:3001" @task(3) def call_add_tool(self): """测试 add Tool 的并发调用""" payload = { "jsonrpc": "2.0", "method": "tools/call", "params": { "name": "add", "arguments": {"a": 1, "b": 2} }, "id": 1 } self.client.post("/mcp", json=payload) @task(1) def list_tools(self): """测试 Tool 列表查询""" payload = { "jsonrpc": "2.0", "method": "tools/list", "params": {}, "id": 2 } self.client.post("/mcp", json=payload)运行方式:locust -f locustfile.py --users 50 --spawn-rate 105.2 关键性能指标| 指标 | 建议阈值 | 说明 ||------|----------|------|| 单次 Tool 调用延迟 | < 500ms | AI Agent 对延迟敏感 || 并发 50 用户 P99 | < 2s | 峰值场景下的兜底 || 错误率 | < 1% | 稳定性基线 || 内存泄漏 | 无 | 长时间运行不增长 |六、安全测试:认证、授权与注入防护MCP 的动态交互特性引入了独特的安全挑战。根据 MCP 安全规范,需关注六大威胁向量。6.1 认证测试# test_security.pyimport pytest@pytest.mark.asyncioasync def test_unauthorized_access(): """未认证请求应被拒绝""" # 使用无效 token 的 Client 调用受保护 Tool # 预期返回 401/403 pass # 根据实际认证中间件实现@pytest.mark.asyncioasync def test_token_validation(): """过期和无效 token 应被正确识别""" pass # 根据实际认证方案实现6.2 输入注入测试@pytest.mark.asyncioasync def test_path_traversal_prevention(mcp_client): """路径遍历攻击应被阻止""" malicious_paths = [ "../../../etc/passwd", "..\\..\\windows\\system32", "/proc/self/environ" ] for path in malicious_paths: result = await mcp_client.call_tool( "read_file", arguments={"filepath": path} ) assert "error" in str(result).lower() or result.isError6.3 速率限制测试@pytest.mark.asyncioasync def test_rate_limiting(): """超出速率限制应返回 429""" # 快速发送大量请求 # 验证在限制阈值后收到 429 Too Many Requests pass # 根据实际限流策略实现七、Mock 外部依赖当 Tool 依赖外部 API 时,使用 Mock 隔离测试:# test_with_mocks.pyimport pytestfrom unittest.mock import AsyncMock, patch@pytest.mark.asyncioasync def test_search_with_mocked_api(mcp_client): """使用 Mock 测试依赖外部 API 的 Tool""" mock_response = [{"id": 1, "title": "Mocked Result"}] with patch("server.httpx.AsyncClient.get") as mock_get: mock_get.return_value.json.return_value = mock_response result = await mcp_client.call_tool( "search_documents", arguments={"query": "test"} ) assert result is not None mock_get.assert_called_once()八、CI/CD 集成将 MCP 测试集成到持续集成流水线中:# .github/workflows/test.ymlname: MCP Server Testson: [push, pull_request]jobs: test: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - uses: actions/setup-python@v5 with: python-version: '3.12' - name: Install dependencies run: pip install mcp[cli] pytest pytest-asyncio pytest-cov - name: Run unit tests run: pytest test_unit.py -v --cov=server --cov-report=xml - name: Run protocol tests run: pytest test_protocol.py -v - name: Coverage check run: pytest --cov-fail-under=80九、最佳实践总结遵循测试金字塔:70% 单元测试 + 20% 协议测试 + 10% 端到端测试优先使用 FastMCPTransport:在不启动真实服务器的情况下完成协议验证,速度快且可靠用 MCP Inspector 辅助开发:写自动化测试前先用 Inspector 手动验证行为Snapshot 锁定接口:防止 Tool Schema 意外变更导致 Client 端故障Mock 外部依赖:隔离测试,避免因第三方 API 不稳定导致测试闪烁覆盖安全边界:认证、授权、输入校验、速率限制必须测试CI 集成:每次提交自动运行测试,覆盖率目标 80%性能基线:设定延迟和并发阈值,定期跑负载测试十、测试工具速查表| 工具 | 用途 | 安装命令 ||------|------|----------|| pytest | 单元/协议测试 | pip install pytest pytest-asyncio || FastMCPTransport | 进程内协议测试 | pip install mcp || MCP Inspector | 交互式手动测试 | mcp dev server.py || pytest-cov | 覆盖率报告 | pip install pytest-cov || pytest-inline-snapshot | 接口快照测试 | pip install pytest-inline-snapshot || locust | 负载测试 | pip install locust || mcpmock | Mock MCP Server | pip install mcpmock |通过以上分层测试策略,你可以系统性地保障 MCP Server 的质量、安全性和性能,从开发阶段到生产环境全程覆盖。
服务端阅读 05月28日 06:28

MCP(Model Context Protocol)安全性设计有哪些关键机制?

OAuth 2.1 三方授权架构MCP 规范(2025-06-18 版本)将安全模型建立在 OAuth 2.1 之上,采用经典的三方架构:MCP Client 充当 OAuth 2.1 Client,负责发起授权请求MCP Server 充当 OAuth 2.1 Resource Server,验证 Access Token 后提供资源Authorization Server 负责用户认证、授权同意和令牌签发授权流程从 Client 向 Server 发起连接开始。若 Server 要求认证,返回 401 Unauthorized 并在 WWW-Authenticate 头中提供 Protected Resource Metadata(PRM)文档地址。Client 通过 PRM 发现 Authorization Server 的端点信息,完成 Dynamic Client Registration(RFC 7591)后,引导用户浏览器打开 /authorize 端点完成登录和授权。授权码通过 PKCE 机制交换为 Access Token,后续请求携带 Bearer Token 访问资源。对于本地 STDIO 传输,MCP 规范明确要求不走 OAuth 流程,而是通过环境变量注入凭证或复用第三方库的认证态,避免在本地进程间引入不必要的网络授权开销。工具调用安全与 Tool Poisoning 防护MCP 的核心能力是让 LLM 调用外部工具,这也带来了 Tool Poisoning 威胁——恶意 MCP Server 可以在 tool description 中嵌入 Prompt Injection 指令,操控 LLM 的行为。防护措施包括:工具描述哈希锁定:首次审批时记录 tool description 的哈希值,后续调用前校验哈希,防止"rug pull"式描述篡改工具版本钉扎:锁定已审批的 MCP Server 版本,避免自动更新引入恶意变更最小 Scope 授权:从最小的权限范围起步(如 mcp:tools),仅在业务需要时逐步提升,拒绝通配符 ScopeHuman-in-the-loop:涉及写入、删除、发送等破坏性操作时,强制要求用户显式确认沙箱隔离与权限最小化每个 MCP Server 应运行在独立的容器化沙箱中,网络出口仅限白名单地址。关键实践:默认阻断出站 DNS,防止数据通过 DNS 隧道外泄文件系统挂载采用只读 + 白名单路径策略,Server 只能访问明确授权的目录设置执行超时和资源配额(CPU / 内存 / 磁盘),防止无限循环或资源耗尽使用 MCP Gateway 统一管理 Server 白名单、访问控制和集中日志通信加密与证书验证远程 MCP 连接强制使用 TLS,并实施完整的证书链验证和吊销检查(OCSP/CRL)。协议基于 JSON-RPC 2.0,在传输层之上不依赖额外的消息级加密。生产环境中建议:禁止自签名证书,所有证书必须由可信 CA 签发开启 HSTS 防止协议降级攻击定期轮换 TLS 证书,自动化证书管理采用 ACME 协议输入验证与 Prompt Injection 防御MCP 场景下的输入验证比传统 Web 应用更复杂,因为 LLM 的自然语言输入天然难以结构化验证。核心防御策略:参数 Schema 强校验:所有 tool input 必须声明 JSON Schema,Server 端严格按 Schema 验证,拒绝不合规参数Prompt Injection 检测:在用户输入到达 LLM 前插入风险标签,识别越权指令、数据外泄企图等异常模式上下文隔离:不同来源的 Prompt 片段用隔离标记包裹,防止跨上下文的指令注入命令执行白名单:禁止 LLM 直接执行 Shell 命令,必须通过预定义的工具接口间接操作审计日志与异常监控MCP 的安全审计要求记录每一次工具调用的完整上下文:工具名称、调用参数、LLM 决策过程、返回结果均需记录每条日志携带 Correlation ID,支持跨 Server 调用链的端到端追踪日志推送至 SIEM 系统,配合规则引擎实时检测异常行为错误消息禁止泄露敏感信息(Token、密钥、内部路径),统一返回脱敏后的安全提示数据隐私与合规MCP 规范在数据层面强调用户同意和最小暴露原则:Host 在向 Server 暴露用户数据前必须获得用户明确同意Server 不得在未授权情况下将资源数据传输给第三方日志中的敏感字段(API Key、Token、个人身份信息)必须脱敏处理提供与 GDPR、CCPA、ISO 27001 等标准的合规映射能力,辅助生成评估报告面试追问Q: MCP 的 OAuth 2.1 授权和传统 Web 应用的 OAuth 有什么区别?MCP 中 Server 同时承担 Resource Server 角色,Client 需要通过 Protected Resource Metadata(RFC 9728)发现 Authorization Server,而非硬编码授权端点。此外 MCP 强制使用 PKCE、支持 Dynamic Client Registration,且本地 STDIO 传输完全不走 OAuth 流程,改用环境变量注入凭证——这是与传统 Web OAuth 最大的架构差异。Q: 如果 MCP Server 被植入恶意 tool description,如何检测和防御?检测层面,通过 tool description 哈希比对发现描述被篡改,配合 SIEM 实时监控异常工具调用模式。防御层面,首次审批时锁定描述哈希和 Server 版本,运行时采用最小 Scope 策略限制工具权限,破坏性操作强制 Human-in-the-loop 确认,同时通过沙箱隔离限制 Server 的实际执行能力。