乐闻世界logo
搜索文章和话题

如何在 MCP 中实现流式处理?

2月19日 21:34

MCP 的流式处理能力允许实时传输大量数据或长时间运行的操作结果。以下是详细的实现方法:

流式处理基础

MCP 支持两种流式处理模式:

  1. 服务器推送流:服务器主动推送数据
  2. 客户端请求流:客户端请求流式响应

1. 流式工具定义

python
import asyncio
from typing import AsyncIterator

from mcp.server import Server
from mcp.types import Tool

server = Server("my-mcp-server")


@server.tool(
    name="stream_data",
    description="流式返回大量数据"
)
async def stream_data(
    count: int,
    batch_size: int = 10
) -> AsyncIterator[dict]:
    """Stream ``count`` integers to the client in batches.

    Yields ``{"type": "data", ...}`` chunks containing each batch and a
    progress ratio in [0, 1], followed by a final ``{"type": "done"}``
    chunk carrying the total item count.
    """
    for start in range(0, count, batch_size):
        # The last batch may be partial; clamp its end so the reported
        # progress never exceeds 1.0 (the original overshot on the
        # final batch when count was not a multiple of batch_size).
        end = min(start + batch_size, count)
        batch = list(range(start, end))
        yield {
            "type": "data",
            "batch": batch,
            "progress": end / count
        }
        # Simulate per-batch processing latency.
        await asyncio.sleep(0.1)
    # Completion signal so consumers know the stream ended normally.
    yield {
        "type": "done",
        "total": count
    }

2. 流式响应处理器

python
import asyncio
from typing import Any, AsyncIterator, Dict


class StreamProcessor:
    """Processes MCP streaming responses and tracks per-stream status."""

    def __init__(self):
        # stream_id -> {"status": "active"|"completed"|"error",
        #               "start_time": float, ["error": str]}
        self.active_streams = {}

    async def process_stream(
        self,
        stream_id: str,
        stream: AsyncIterator[Dict[str, Any]]
    ) -> AsyncIterator[Dict[str, Any]]:
        """Re-yield each chunk of *stream* after normalisation.

        Exceptions raised by the upstream iterator are converted into a
        final ``{"type": "error"}`` chunk instead of propagating. The
        tracking entry is removed in ``finally``, so
        ``get_stream_status`` only reports streams still in flight.
        """
        self.active_streams[stream_id] = {
            "status": "active",
            "start_time": asyncio.get_event_loop().time()
        }
        try:
            async for chunk in stream:
                # Normalise the chunk and stamp it with the loop time.
                processed = await self._process_chunk(chunk)
                if processed.get("type") == "done":
                    self.active_streams[stream_id]["status"] = "completed"
                yield processed
        except Exception as e:
            # Surface the failure to the consumer rather than raising.
            self.active_streams[stream_id]["status"] = "error"
            self.active_streams[stream_id]["error"] = str(e)
            yield {
                "type": "error",
                "error": str(e)
            }
        finally:
            # Drop tracking state regardless of how the stream ended.
            if stream_id in self.active_streams:
                del self.active_streams[stream_id]

    async def _process_chunk(
        self,
        chunk: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Normalise a single chunk, adding a timestamp.

        "data" chunks are re-keyed (``batch`` -> ``data``); "done"
        chunks keep their total; anything else passes through untouched.
        """
        chunk_type = chunk.get("type")
        if chunk_type == "data":
            return {
                "type": "data",
                "data": chunk.get("batch"),
                "progress": chunk.get("progress"),
                "timestamp": asyncio.get_event_loop().time()
            }
        elif chunk_type == "done":
            return {
                "type": "done",
                "total": chunk.get("total"),
                "timestamp": asyncio.get_event_loop().time()
            }
        # Unknown chunk types pass through unchanged.
        return chunk

    def get_stream_status(self, stream_id: str) -> Dict[str, Any]:
        """Return the status record for an in-flight stream."""
        return self.active_streams.get(stream_id, {
            "status": "not_found"
        })

3. 流式数据聚合器

python
class StreamAggregator:
    """Buffers streamed data payloads and reduces them with a callback."""

    def __init__(self):
        self.buffers = {}
        self.aggregators = {}

    async def aggregate_stream(
        self,
        stream_id: str,
        stream: AsyncIterator[Dict[str, Any]],
        aggregation_func: callable
    ) -> Dict[str, Any]:
        """Consume *stream*, collecting "data" payloads until "done".

        Returns an ``{"type": "aggregated"}`` result built by
        *aggregation_func* over the collected payloads, or an
        ``{"type": "error"}`` result if the stream ends without a
        "done" chunk. The buffer is always discarded afterwards.
        """
        collected = []
        self.buffers[stream_id] = collected
        try:
            async for item in stream:
                kind = item.get("type")
                if kind == "data":
                    collected.append(item.get("data"))
                    continue
                if kind == "done":
                    # Completion: reduce everything collected so far.
                    return {
                        "type": "aggregated",
                        "result": aggregation_func(collected),
                        "count": len(collected)
                    }
            # Stream exhausted without a "done" marker.
            return {
                "type": "error",
                "error": "Stream ended without completion"
            }
        finally:
            self.buffers.pop(stream_id, None)

    def get_buffer(self, stream_id: str) -> list:
        """Return the live buffer for *stream_id* (empty if unknown)."""
        return self.buffers.get(stream_id, [])

4. 流式进度跟踪

python
class StreamProgressTracker:
    """Tracks processed/total counts and estimates ETA for streams."""

    def __init__(self):
        # stream_id -> {"total", "processed", "start_time", "last_update"}
        self.progress = {}

    def track_stream(
        self,
        stream_id: str,
        total_items: int
    ):
        """Start tracking a stream expected to yield *total_items* items."""
        now = asyncio.get_event_loop().time()
        self.progress[stream_id] = {
            "total": total_items,
            "processed": 0,
            "start_time": now,
            "last_update": now
        }

    def update_progress(
        self,
        stream_id: str,
        processed: int
    ):
        """Record that *processed* items are done; unknown ids are ignored."""
        if stream_id not in self.progress:
            return
        self.progress[stream_id]["processed"] = processed
        self.progress[stream_id]["last_update"] = \
            asyncio.get_event_loop().time()

    def get_progress(self, stream_id: str) -> Dict[str, Any]:
        """Return a progress snapshot, or ``{"status": "not_found"}``."""
        if stream_id not in self.progress:
            return {
                "status": "not_found"
            }
        info = self.progress[stream_id]
        total = info["total"]
        # Bug fix: guard against total == 0 (an empty stream previously
        # raised ZeroDivisionError); treat an empty stream as complete.
        percentage = (info["processed"] / total) * 100 if total else 100.0
        return {
            "total": total,
            "processed": info["processed"],
            "percentage": percentage,
            "elapsed": asyncio.get_event_loop().time() - info["start_time"],
            "estimated_remaining": self._estimate_remaining(info)
        }

    def _estimate_remaining(self, info: Dict[str, Any]) -> float:
        """Estimate seconds remaining from the observed processing rate."""
        if info["processed"] == 0:
            # No throughput data yet; cannot extrapolate.
            return 0.0
        elapsed = asyncio.get_event_loop().time() - info["start_time"]
        if elapsed <= 0:
            # Clock has not advanced yet; avoid dividing by zero.
            return 0.0
        rate = info["processed"] / elapsed
        if rate == 0:
            return 0.0
        remaining = (info["total"] - info["processed"]) / rate
        return remaining

5. 流式错误处理

python
class StreamErrorHandler:
    """Dispatches stream errors to handlers registered by exception name."""

    def __init__(self):
        # Exception class name -> async handler(error, stream_id).
        self.error_handlers = {}

    def register_handler(
        self,
        error_type: str,
        handler: callable
    ):
        """Register *handler* for exceptions whose type name is *error_type*."""
        self.error_handlers[error_type] = handler

    async def handle_error(
        self,
        error: Exception,
        stream_id: str
    ) -> Dict[str, Any]:
        """Run the handler matching *error*'s type name, if one exists.

        Falls back to a generic error payload when no handler is
        registered; a handler that itself raises is reported as a
        handler failure rather than propagated.
        """
        error_type = type(error).__name__
        handler = self.error_handlers.get(error_type)
        if handler is None:
            # No registered handler: describe the error generically.
            return {
                "type": "error",
                "error": str(error),
                "error_type": error_type
            }
        try:
            outcome = await handler(error, stream_id)
        except Exception as e:
            return {
                "type": "error",
                "error": f"Error handler failed: {str(e)}"
            }
        return {
            "type": "handled",
            "result": outcome
        }

6. 流式资源管理

python
import asyncio
from typing import Any


class StreamResourceManager:
    """Allocates, looks up and releases per-stream resources."""

    def __init__(self):
        # stream_id -> {resource_type: resource}
        self.resources = {}

    def allocate_resource(
        self,
        stream_id: str,
        resource_type: str,
        resource: Any
    ):
        """Attach *resource* to *stream_id* under *resource_type*."""
        if stream_id not in self.resources:
            self.resources[stream_id] = {}
        self.resources[stream_id][resource_type] = resource

    def get_resource(
        self,
        stream_id: str,
        resource_type: str
    ) -> Any:
        """Return the matching resource, or None if unknown."""
        if stream_id not in self.resources:
            return None
        return self.resources[stream_id].get(resource_type)

    def release_resources(self, stream_id: str):
        """Release every resource attached to *stream_id*.

        Sync resources are closed via ``close()``; async context
        managers are exited via ``__aexit__``. Safe to call both from
        inside and outside a running event loop.
        """
        if stream_id not in self.resources:
            return
        resources = self.resources[stream_id]
        for resource_type, resource in resources.items():
            if hasattr(resource, 'close'):
                resource.close()
            elif hasattr(resource, '__aexit__'):
                # Bug fix: the original tested for __aenter__ but then
                # called __aexit__; test the attribute actually used.
                coro = resource.__aexit__(None, None, None)
                try:
                    loop = asyncio.get_running_loop()
                except RuntimeError:
                    # No running loop (sync caller): drive the coroutine
                    # to completion ourselves instead of crashing inside
                    # asyncio.create_task.
                    asyncio.run(coro)
                else:
                    loop.create_task(coro)
        del self.resources[stream_id]

7. 流式性能监控

python
class StreamPerformanceMonitor:
    """Collects throughput and error metrics for active streams."""

    def __init__(self):
        # stream_id -> {"start_time", "chunks", "bytes", "errors"}
        self.metrics = {}

    def start_monitoring(self, stream_id: str):
        """Initialise counters for *stream_id*."""
        self.metrics[stream_id] = {
            "start_time": asyncio.get_event_loop().time(),
            "chunks": 0,
            "bytes": 0,
            "errors": 0
        }

    def record_chunk(self, stream_id: str, size: int):
        """Count one chunk of *size* bytes; unknown ids are ignored."""
        entry = self.metrics.get(stream_id)
        if entry is None:
            return
        entry["chunks"] += 1
        entry["bytes"] += size

    def record_error(self, stream_id: str):
        """Count one error; unknown ids are ignored."""
        entry = self.metrics.get(stream_id)
        if entry is None:
            return
        entry["errors"] += 1

    def get_metrics(self, stream_id: str) -> Dict[str, Any]:
        """Return raw counters plus derived per-second rates."""
        entry = self.metrics.get(stream_id)
        if entry is None:
            return {
                "status": "not_found"
            }
        elapsed = asyncio.get_event_loop().time() - entry["start_time"]
        chunk_count = entry["chunks"]
        byte_count = entry["bytes"]
        error_count = entry["errors"]
        # Rates fall back to 0 when no time has elapsed / no chunks seen.
        return {
            "elapsed": elapsed,
            "chunks": chunk_count,
            "bytes": byte_count,
            "errors": error_count,
            "chunks_per_second": chunk_count / elapsed if elapsed > 0 else 0,
            "bytes_per_second": byte_count / elapsed if elapsed > 0 else 0,
            "error_rate": error_count / chunk_count if chunk_count > 0 else 0
        }

最佳实践:

  1. 合理分块:根据网络条件和数据特性选择合适的块大小
  2. 进度反馈:提供清晰的进度信息,改善用户体验
  3. 错误恢复:实现错误恢复机制,提高系统鲁棒性
  4. 资源管理:及时释放流资源,避免内存泄漏
  5. 性能监控:监控流性能,及时发现和解决问题
  6. 超时控制:设置合理的超时时间,防止无限等待

通过完善的流式处理机制,可以高效处理大量数据和长时间运行的操作。

标签:MCP