5月28日 06:51

MCP 流式处理如何实现?Streamable HTTP 传输与进度通知完整指南

MCP(Model Context Protocol)的流式处理能力是远程部署和实时交互的核心基础。2025 年 3 月规范引入 Streamable HTTP 传输后,MCP 的流式架构发生了根本性变化。本文将基于最新规范,从传输层到应用层,系统讲解 MCP 流式处理的实现方法。

MCP 流式处理的架构基础

MCP 定义了两种标准传输方式:

传输方式适用场景连接模式
stdio本地通信(同机器)标准输入输出管道
Streamable HTTP远程通信(跨网络)单端点 HTTP + 按需 SSE

Streamable HTTP 的核心设计是单一端点、按需流式:所有通信通过一个 HTTP 端点(如 /mcp)完成,服务器可以灵活选择返回普通 JSON 响应还是升级为 SSE 流式响应。

Streamable HTTP 传输实现

端点设计

Streamable HTTP 使用单一端点同时支持 POST 和 GET 请求:

  • POST /mcp:客户端发送 JSON-RPC 消息,服务器根据请求内容决定响应方式
  • GET /mcp:客户端建立 SSE 监听流,接收服务器主动推送的通知

客户端请求格式

客户端发起请求时,必须在 Accept 头中同时声明支持 JSON 和 SSE:

http
POST /mcp HTTP/1.1 Content-Type: application/json Accept: application/json, text/event-stream Mcp-Session-Id: session-abc123 { "jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": { "name": "analyze_data", "arguments": {"dataset": "large_corpus"} } }

注意:2026 规范要求请求携带 Mcp-MethodMcp-Name 头部,便于负载均衡器和网关进行路由(SEP-2243)。

服务器响应策略

服务器根据请求特性动态选择响应方式:

场景一:普通 JSON 响应(短操作,立即返回结果)

http
HTTP/1.1 200 OK Content-Type: application/json { "jsonrpc": "2.0", "id": 1, "result": { "content": [{"type": "text", "text": "分析完成:共 1,024 条记录"}] } }

场景二:SSE 流式响应(长操作,需要进度反馈)

http
HTTP/1.1 200 OK Content-Type: text/event-stream event: message data: {"jsonrpc":"2.0","method":"notifications/progress","params":{"progressToken":"op-123","progress":25,"total":100}} event: message data: {"jsonrpc":"2.0","method":"notifications/progress","params":{"progressToken":"op-123","progress":75,"total":100}} event: message data: {"jsonrpc":"2.0","id":1,"result":{"content":[{"type":"text","text":"分析完成:共 1,024 条记录"}]}}

Python SDK 流式工具实现

安装与初始化

bash
pip install mcp

带进度通知的流式工具

MCP Python SDK 通过 request_context 提供进度通知能力:

python
from mcp.server import Server from mcp import types import asyncio server = Server("stream-demo") @server.call_tool() async def analyze_large_dataset( name: str, arguments: dict ) -> list[types.TextContent]: context = server.request_context progress_token = context.meta.progressToken if context.meta else None dataset = arguments.get("dataset", "") total_steps = 10 results = [] for step in range(total_steps): chunk_result = await process_chunk(dataset, step) results.append(chunk_result) if progress_token: await context.session.send_progress_notification( progress_token=progress_token, progress=step + 1, total=total_steps, ) await asyncio.sleep(0.1) summary = f"分析完成:处理了 {len(results)} 个数据块" return [types.TextContent(type="text", text=summary)] async def process_chunk(dataset: str, step: int) -> dict: return {"step": step, "status": "done"}

客户端接收进度通知

客户端通过 progress_callback 参数接收进度更新:

python
from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client async def call_with_progress(): server_params = StdioServerParameters( command="python", args=["-m", "stream_demo_server"], ) async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: await session.initialize() def on_progress(progress: float, total: float | None): percent = (progress / total * 100) if total else 0 print(f"进度: {percent:.0f}%") result = await session.call_tool( "analyze_large_dataset", arguments={"dataset": "large_corpus"}, progress_callback=on_progress, ) print(f"结果: {result.content[0].text}")

服务器主动推送与通知

通知类型

MCP 定义了多种服务器通知类型,均可通过 SSE 流推送到客户端:

通知方法用途典型场景
notifications/progress操作进度长时间运行的任务
notifications/tools/list_changed工具列表变更动态注册/注销工具
notifications/resources/updated资源内容更新文件变化通知
notifications/message日志消息调试和监控

发送工具列表变更通知

python
@server.call_tool() async def register_dynamic_tool(name: str, arguments: dict) -> list[types.TextContent]: # 注册新工具逻辑... await server.request_context.session.send_notification( types.ServerNotification( method="notifications/tools/list_changed", ) ) return [types.TextContent(type="text", text=f"工具 {arguments['tool_name']} 已注册")]

断点续传与连接恢复

Streamable HTTP 通过 Last-Event-ID 头部支持 SSE 流的断点续传:

http
GET /mcp HTTP/1.1 Accept: text/event-stream Last-Event-ID: evt-42 Mcp-Session-Id: session-abc123

服务器收到带有 Last-Event-ID 的请求后,应从指定事件之后继续推送,确保客户端不会丢失中间状态的通知。

实现要点:

  • 每个 SSE 事件应携带唯一 ID
  • 服务器需要维护最近的事件缓冲区
  • 超出缓冲范围时,建议客户端重新初始化会话

与旧版 SSE 传输的对比

特性旧版 HTTP+SSEStreamable HTTP
端点数量2 个(/sse + /messages1 个(/mcp
连接方式必须先建立 SSE 长连接按需升级为 SSE
无状态服务器不支持支持(可部署 Serverless)
断点续传不支持通过 Last-Event-ID 支持
基础设施兼容部分代理/CDN 不兼容标准 HTTP,完全兼容

旧版 SSE 传输仅建议用于兼容未升级到 2025-03-26 规范的老客户端。

最佳实践

1. 合理选择响应模式:短操作直接返回 JSON,长操作才升级为 SSE 流。避免对所有请求都建立流式连接。

2. 提供进度反馈:耗时超过 1 秒的操作,应通过 notifications/progress 提供进度更新,改善用户体验。

3. 实现断点续传:为 SSE 事件分配唯一 ID,维护事件缓冲区,支持客户端从断点恢复。

4. 资源清理:及时关闭完成的 SSE 流,避免服务器端资源泄漏。设置合理的超时时间。

5. 会话管理:妥善管理 Mcp-Session-Id,确保在负载均衡场景下会话亲和性。

6. 监控与日志:通过 notifications/message 发送日志级别消息,便于调试和性能监控。

通过合理运用 Streamable HTTP 传输和进度通知机制,可以构建高效、可靠的 MCP 流式处理系统,满足实时交互和大规模数据处理的需求。

标签:MCP