MCP 流式处理如何实现?Streamable HTTP 传输与进度通知完整指南
MCP(Model Context Protocol)的流式处理能力是远程部署和实时交互的核心基础。2025 年 3 月规范引入 Streamable HTTP 传输后,MCP 的流式架构发生了根本性变化。本文将基于最新规范,从传输层到应用层,系统讲解 MCP 流式处理的实现方法。
MCP 流式处理的架构基础
MCP 定义了两种标准传输方式:
| 传输方式 | 适用场景 | 连接模式 |
|---|---|---|
| stdio | 本地通信(同机器) | 标准输入输出管道 |
| Streamable HTTP | 远程通信(跨网络) | 单端点 HTTP + 按需 SSE |
Streamable HTTP 的核心设计是单一端点、按需流式:所有通信通过一个 HTTP 端点(如 /mcp)完成,服务器可以灵活选择返回普通 JSON 响应还是升级为 SSE 流式响应。
Streamable HTTP 传输实现
端点设计
Streamable HTTP 使用单一端点同时支持 POST 和 GET 请求:
- POST
/mcp:客户端发送 JSON-RPC 消息,服务器根据请求内容决定响应方式 - GET
/mcp:客户端建立 SSE 监听流,接收服务器主动推送的通知
客户端请求格式
客户端发起请求时,必须在 Accept 头中同时声明支持 JSON 和 SSE:
httpPOST /mcp HTTP/1.1 Content-Type: application/json Accept: application/json, text/event-stream Mcp-Session-Id: session-abc123 { "jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": { "name": "analyze_data", "arguments": {"dataset": "large_corpus"} } }
注意:2026 规范要求请求携带 Mcp-Method 和 Mcp-Name 头部,便于负载均衡器和网关进行路由(SEP-2243)。
服务器响应策略
服务器根据请求特性动态选择响应方式:
场景一:普通 JSON 响应(短操作,立即返回结果)
httpHTTP/1.1 200 OK Content-Type: application/json { "jsonrpc": "2.0", "id": 1, "result": { "content": [{"type": "text", "text": "分析完成:共 1,024 条记录"}] } }
场景二:SSE 流式响应(长操作,需要进度反馈)
httpHTTP/1.1 200 OK Content-Type: text/event-stream event: message data: {"jsonrpc":"2.0","method":"notifications/progress","params":{"progressToken":"op-123","progress":25,"total":100}} event: message data: {"jsonrpc":"2.0","method":"notifications/progress","params":{"progressToken":"op-123","progress":75,"total":100}} event: message data: {"jsonrpc":"2.0","id":1,"result":{"content":[{"type":"text","text":"分析完成:共 1,024 条记录"}]}}
Python SDK 流式工具实现
安装与初始化
bashpip install mcp
带进度通知的流式工具
MCP Python SDK 通过 request_context 提供进度通知能力:
pythonfrom mcp.server import Server from mcp import types import asyncio server = Server("stream-demo") @server.call_tool() async def analyze_large_dataset( name: str, arguments: dict ) -> list[types.TextContent]: context = server.request_context progress_token = context.meta.progressToken if context.meta else None dataset = arguments.get("dataset", "") total_steps = 10 results = [] for step in range(total_steps): chunk_result = await process_chunk(dataset, step) results.append(chunk_result) if progress_token: await context.session.send_progress_notification( progress_token=progress_token, progress=step + 1, total=total_steps, ) await asyncio.sleep(0.1) summary = f"分析完成:处理了 {len(results)} 个数据块" return [types.TextContent(type="text", text=summary)] async def process_chunk(dataset: str, step: int) -> dict: return {"step": step, "status": "done"}
客户端接收进度通知
客户端通过 progress_callback 参数接收进度更新:
pythonfrom mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client async def call_with_progress(): server_params = StdioServerParameters( command="python", args=["-m", "stream_demo_server"], ) async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: await session.initialize() def on_progress(progress: float, total: float | None): percent = (progress / total * 100) if total else 0 print(f"进度: {percent:.0f}%") result = await session.call_tool( "analyze_large_dataset", arguments={"dataset": "large_corpus"}, progress_callback=on_progress, ) print(f"结果: {result.content[0].text}")
服务器主动推送与通知
通知类型
MCP 定义了多种服务器通知类型,均可通过 SSE 流推送到客户端:
| 通知方法 | 用途 | 典型场景 |
|---|---|---|
notifications/progress | 操作进度 | 长时间运行的任务 |
notifications/tools/list_changed | 工具列表变更 | 动态注册/注销工具 |
notifications/resources/updated | 资源内容更新 | 文件变化通知 |
notifications/message | 日志消息 | 调试和监控 |
发送工具列表变更通知
python@server.call_tool() async def register_dynamic_tool(name: str, arguments: dict) -> list[types.TextContent]: # 注册新工具逻辑... await server.request_context.session.send_notification( types.ServerNotification( method="notifications/tools/list_changed", ) ) return [types.TextContent(type="text", text=f"工具 {arguments['tool_name']} 已注册")]
断点续传与连接恢复
Streamable HTTP 通过 Last-Event-ID 头部支持 SSE 流的断点续传:
httpGET /mcp HTTP/1.1 Accept: text/event-stream Last-Event-ID: evt-42 Mcp-Session-Id: session-abc123
服务器收到带有 Last-Event-ID 的请求后,应从指定事件之后继续推送,确保客户端不会丢失中间状态的通知。
实现要点:
- 每个 SSE 事件应携带唯一 ID
- 服务器需要维护最近的事件缓冲区
- 超出缓冲范围时,建议客户端重新初始化会话
与旧版 SSE 传输的对比
| 特性 | 旧版 HTTP+SSE | Streamable HTTP |
|---|---|---|
| 端点数量 | 2 个(/sse + /messages) | 1 个(/mcp) |
| 连接方式 | 必须先建立 SSE 长连接 | 按需升级为 SSE |
| 无状态服务器 | 不支持 | 支持(可部署 Serverless) |
| 断点续传 | 不支持 | 通过 Last-Event-ID 支持 |
| 基础设施兼容 | 部分代理/CDN 不兼容 | 标准 HTTP,完全兼容 |
旧版 SSE 传输仅建议用于兼容未升级到 2025-03-26 规范的老客户端。
最佳实践
1. 合理选择响应模式:短操作直接返回 JSON,长操作才升级为 SSE 流。避免对所有请求都建立流式连接。
2. 提供进度反馈:耗时超过 1 秒的操作,应通过 notifications/progress 提供进度更新,改善用户体验。
3. 实现断点续传:为 SSE 事件分配唯一 ID,维护事件缓冲区,支持客户端从断点恢复。
4. 资源清理:及时关闭完成的 SSE 流,避免服务器端资源泄漏。设置合理的超时时间。
5. 会话管理:妥善管理 Mcp-Session-Id,确保在负载均衡场景下会话亲和性。
6. 监控与日志:通过 notifications/message 发送日志级别消息,便于调试和性能监控。
通过合理运用 Streamable HTTP 传输和进度通知机制,可以构建高效、可靠的 MCP 流式处理系统,满足实时交互和大规模数据处理的需求。