乐闻世界logo
搜索文章和话题

MCP

模型上下文协议(Model Context Protocol,MCP) 解决了大型语言模型(LLM)与外部数据源和工具连接的难题,堪称 AI 领域的“万能遥控器”。 由 Anthropic 开源发布,MCP 在现有的函数调用机制基础上创新,免去了 LLM 与各类应用间定制集成的繁琐工作。 这意味着开发者无需为每种 AI 模型与外部系统的组合重新设计接口,能够更高效地构建功能更强大、上下文感知更精准的应用。
MCP
查看更多相关内容
MCP 的数据持久化和缓存策略有哪些?MCP 的数据持久化和缓存策略对于提高系统性能和可靠性至关重要。以下是详细的实现方法和最佳实践: **数据持久化架构** MCP 数据持久化应考虑以下方面: 1. **存储类型**:选择合适的存储类型(关系型、文档型、键值型等) 2. **数据模型**:设计合理的数据模型 3. **持久化策略**:实现高效的数据持久化策略 4. **缓存策略**:实现多层缓存策略 5. **数据一致性**:确保数据一致性 6. **备份恢复**:实现数据备份和恢复机制 **1. 数据模型设计** ```python from dataclasses import dataclass from typing import Optional, Dict, Any, List from datetime import datetime from enum import Enum class DataType(Enum): """数据类型""" TOOL = "tool" RESOURCE = "resource" PROMPT = "prompt" SESSION = "session" METADATA = "metadata" @dataclass class DataRecord: """数据记录""" id: str data_type: DataType content: Dict[str, Any] created_at: datetime updated_at: datetime version: int = 1 metadata: Dict[str, Any] = None def __post_init__(self): if self.metadata is None: self.metadata = {} class DataModel: """数据模型""" def __init__(self): self.records: Dict[str, DataRecord] = {} def create_record( self, data_type: DataType, content: Dict[str, Any], metadata: Dict[str, Any] = None ) -> DataRecord: """创建数据记录""" record_id = self._generate_id(data_type) now = datetime.now() record = DataRecord( id=record_id, data_type=data_type, content=content, created_at=now, updated_at=now, metadata=metadata or {} ) self.records[record_id] = record return record def update_record( self, record_id: str, content: Dict[str, Any] = None, metadata: Dict[str, Any] = None ) -> Optional[DataRecord]: """更新数据记录""" if record_id not in self.records: return None record = self.records[record_id] if content: record.content.update(content) if metadata: record.metadata.update(metadata) record.updated_at = datetime.now() record.version += 1 return record def get_record(self, record_id: str) -> Optional[DataRecord]: """获取数据记录""" return self.records.get(record_id) def delete_record(self, record_id: str) -> bool: """删除数据记录""" if record_id in self.records: del self.records[record_id] return True return False def query_records( self, data_type: DataType = None, filters: Dict[str, Any] = None ) -> List[DataRecord]: """查询数据记录""" records = list(self.records.values()) if data_type: records = [r for r in records if r.data_type == data_type] if filters: for key, value in filters.items(): records = [ r for r in records if self._match_filter(r, key, value) ] return records def _generate_id(self, data_type: DataType) -> str: """生成记录 ID""" import uuid return f"{data_type.value}_{uuid.uuid4().hex}" def _match_filter( self, record: DataRecord, key: str, value: Any ) -> bool: """匹配过滤条件""" # 检查内容 if key in record.content: return record.content[key] == value # 检查元数据 if key in record.metadata: return record.metadata[key] == value return False ``` **2. 持久化存储实现** ```python from abc import ABC, abstractmethod from typing import Dict, List, Optional import json import os from pathlib import Path class PersistenceStorage(ABC): """持久化存储基类""" @abstractmethod async def save_record(self, record: DataRecord) -> bool: """保存记录""" pass @abstractmethod async def load_record(self, record_id: str) -> Optional[DataRecord]: """加载记录""" pass @abstractmethod async def delete_record(self, record_id: str) -> bool: """删除记录""" pass @abstractmethod async def query_records( self, data_type: DataType = None, filters: Dict[str, Any] = None ) -> List[DataRecord]: """查询记录""" pass class FileStorage(PersistenceStorage): """文件存储""" def __init__(self, storage_dir: str = "data"): self.storage_dir = Path(storage_dir) self.storage_dir.mkdir(parents=True, exist_ok=True) def _get_file_path(self, record_id: str) -> Path: """获取文件路径""" return self.storage_dir / f"{record_id}.json" async def save_record(self, record: DataRecord) -> bool: """保存记录""" file_path = self._get_file_path(record.id) try: data = { "id": record.id, "data_type": record.data_type.value, "content": record.content, "created_at": record.created_at.isoformat(), "updated_at": record.updated_at.isoformat(), "version": record.version, "metadata": record.metadata } with open(file_path, 'w') as f: json.dump(data, f, indent=2) return True except Exception as e: print(f"保存记录失败: {e}") return False async def load_record(self, record_id: str) -> Optional[DataRecord]: """加载记录""" file_path = self._get_file_path(record_id) if not file_path.exists(): return None try: with open(file_path, 'r') as f: data = json.load(f) record = DataRecord( id=data["id"], data_type=DataType(data["data_type"]), content=data["content"], created_at=datetime.fromisoformat(data["created_at"]), updated_at=datetime.fromisoformat(data["updated_at"]), version=data["version"], metadata=data.get("metadata", {}) ) return record except Exception as e: print(f"加载记录失败: {e}") return None async def delete_record(self, record_id: str) -> bool: """删除记录""" file_path = self._get_file_path(record_id) if file_path.exists(): file_path.unlink() return True return False async def query_records( self, data_type: DataType = None, filters: Dict[str, Any] = None ) -> List[DataRecord]: """查询记录""" records = [] for file_path in self.storage_dir.glob("*.json"): try: with open(file_path, 'r') as f: data = json.load(f) record = DataRecord( id=data["id"], data_type=DataType(data["data_type"]), content=data["content"], created_at=datetime.fromisoformat(data["created_at"]), updated_at=datetime.fromisoformat(data["updated_at"]), version=data["version"], metadata=data.get("metadata", {}) ) # 应用过滤条件 if data_type and record.data_type != data_type: continue if filters: match = True for key, value in filters.items(): if key in record.content and record.content[key] != value: match = False break if key in record.metadata and record.metadata[key] != value: match = False break if not match: continue records.append(record) except Exception as e: print(f"加载记录失败 {file_path}: {e}") return records class DatabaseStorage(PersistenceStorage): """数据库存储""" def __init__(self, database_url: str): self.database_url = database_url self._initialize_database() def _initialize_database(self): """初始化数据库""" from sqlalchemy import create_engine, Column, String, Integer, Text, DateTime from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker Base = declarative_base() class DataRecordTable(Base): __tablename__ = 'data_records' id = Column(String(100), primary_key=True) data_type = Column(String(50), nullable=False, index=True) content = Column(Text, nullable=False) created_at = Column(DateTime, nullable=False) updated_at = Column(DateTime, nullable=False) version = Column(Integer, default=1) metadata = Column(Text) self.engine = create_engine(self.database_url) Base.metadata.create_all(self.engine) self.SessionLocal = sessionmaker( autocommit=False, autoflush=False, bind=self.engine ) self.DataRecordTable = DataRecordTable async def save_record(self, record: DataRecord) -> bool: """保存记录""" session = self.SessionLocal() try: db_record = self.DataRecordTable( id=record.id, data_type=record.data_type.value, content=json.dumps(record.content), created_at=record.created_at, updated_at=record.updated_at, version=record.version, metadata=json.dumps(record.metadata) ) session.merge(db_record) session.commit() return True except Exception as e: session.rollback() print(f"保存记录失败: {e}") return False finally: session.close() async def load_record(self, record_id: str) -> Optional[DataRecord]: """加载记录""" session = self.SessionLocal() try: db_record = session.query(self.DataRecordTable).filter( self.DataRecordTable.id == record_id ).first() if not db_record: return None record = DataRecord( id=db_record.id, data_type=DataType(db_record.data_type), content=json.loads(db_record.content), created_at=db_record.created_at, updated_at=db_record.updated_at, version=db_record.version, metadata=json.loads(db_record.metadata) if db_record.metadata else {} ) return record except Exception as e: print(f"加载记录失败: {e}") return None finally: session.close() async def delete_record(self, record_id: str) -> bool: """删除记录""" session = self.SessionLocal() try: session.query(self.DataRecordTable).filter( self.DataRecordTable.id == record_id ).delete() session.commit() return True except Exception as e: session.rollback() print(f"删除记录失败: {e}") return False finally: session.close() async def query_records( self, data_type: DataType = None, filters: Dict[str, Any] = None ) -> List[DataRecord]: """查询记录""" session = self.SessionLocal() try: query = session.query(self.DataRecordTable) if data_type: query = query.filter( self.DataRecordTable.data_type == data_type.value ) db_records = query.all() records = [] for db_record in db_records: record = DataRecord( id=db_record.id, data_type=DataType(db_record.data_type), content=json.loads(db_record.content), created_at=db_record.created_at, updated_at=db_record.updated_at, version=db_record.version, metadata=json.loads(db_record.metadata) if db_record.metadata else {} ) # 应用过滤条件 if filters: match = True for key, value in filters.items(): if key in record.content and record.content[key] != value: match = False break if key in record.metadata and record.metadata[key] != value: match = False break if not match: continue records.append(record) return records except Exception as e: print(f"查询记录失败: {e}") return [] finally: session.close() ``` **3. 缓存策略实现** ```python from typing import Optional, Dict, Any, List from abc import ABC, abstractmethod import time class CacheStrategy(ABC): """缓存策略基类""" @abstractmethod async def get(self, key: str) -> Optional[Any]: """获取缓存值""" pass @abstractmethod async def set(self, key: str, value: Any, ttl: int = None): """设置缓存值""" pass @abstractmethod async def delete(self, key: str) -> bool: """删除缓存值""" pass @abstractmethod async def clear(self): """清空缓存""" pass class MemoryCache(CacheStrategy): """内存缓存""" def __init__(self, max_size: int = 1000, default_ttl: int = 300): self.max_size = max_size self.default_ttl = default_ttl self.cache: Dict[str, tuple] = {} async def get(self, key: str) -> Optional[Any]: """获取缓存值""" if key not in self.cache: return None value, timestamp, ttl = self.cache[key] # 检查是否过期 if ttl and time.time() - timestamp > ttl: del self.cache[key] return None return value async def set(self, key: str, value: Any, ttl: int = None): """设置缓存值""" # 检查缓存大小 if len(self.cache) >= self.max_size: self._evict() ttl = ttl or self.default_ttl self.cache[key] = (value, time.time(), ttl) async def delete(self, key: str) -> bool: """删除缓存值""" if key in self.cache: del self.cache[key] return True return False async def clear(self): """清空缓存""" self.cache.clear() def _evict(self): """淘汰缓存项""" # 简单实现:随机淘汰 if self.cache: key = next(iter(self.cache)) del self.cache[key] class RedisCache(CacheStrategy): """Redis 缓存""" def __init__(self, redis_url: str = "redis://localhost:6379/0"): import redis.asyncio as aioredis self.redis = aioredis.from_url(redis_url) async def get(self, key: str) -> Optional[Any]: """获取缓存值""" try: value = await self.redis.get(key) if value: return json.loads(value) return None except Exception as e: print(f"获取缓存失败: {e}") return None async def set(self, key: str, value: Any, ttl: int = None): """设置缓存值""" try: serialized_value = json.dumps(value) if ttl: await self.redis.setex(key, ttl, serialized_value) else: await self.redis.set(key, serialized_value) except Exception as e: print(f"设置缓存失败: {e}") async def delete(self, key: str) -> bool: """删除缓存值""" try: result = await self.redis.delete(key) return result > 0 except Exception as e: print(f"删除缓存失败: {e}") return False async def clear(self): """清空缓存""" try: await self.redis.flushdb() except Exception as e: print(f"清空缓存失败: {e}") class MultiLevelCache: """多级缓存""" def __init__( self, l1_cache: CacheStrategy, l2_cache: CacheStrategy = None ): self.l1_cache = l1_cache self.l2_cache = l2_cache async def get(self, key: str) -> Optional[Any]: """获取缓存值""" # 先从 L1 缓存获取 value = await self.l1_cache.get(key) if value is not None: return value # 从 L2 缓存获取 if self.l2_cache: value = await self.l2_cache.get(key) if value is not None: # 回填 L1 缓存 await self.l1_cache.set(key, value) return value return None async def set(self, key: str, value: Any, ttl: int = None): """设置缓存值""" # 同时设置 L1 和 L2 缓存 await self.l1_cache.set(key, value, ttl) if self.l2_cache: await self.l2_cache.set(key, value, ttl) async def delete(self, key: str) -> bool: """删除缓存值""" # 同时删除 L1 和 L2 缓存 l1_deleted = await self.l1_cache.delete(key) l2_deleted = True if self.l2_cache: l2_deleted = await self.l2_cache.delete(key) return l1_deleted or l2_deleted async def clear(self): """清空缓存""" await self.l1_cache.clear() if self.l2_cache: await self.l2_cache.clear() ``` **4. 数据持久化管理器** ```python from typing import Optional, Dict, Any, List class DataPersistenceManager: """数据持久化管理器""" def __init__( self, storage: PersistenceStorage, cache: CacheStrategy = None ): self.storage = storage self.cache = cache self.data_model = DataModel() async def save_record( self, data_type: DataType, content: Dict[str, Any], metadata: Dict[str, Any] = None, use_cache: bool = True ) -> Optional[DataRecord]: """保存记录""" # 创建记录 record = self.data_model.create_record( data_type, content, metadata ) # 持久化存储 success = await self.storage.save_record(record) if not success: return None # 更新缓存 if use_cache and self.cache: await self.cache.set(record.id, record) return record async def load_record( self, record_id: str, use_cache: bool = True ) -> Optional[DataRecord]: """加载记录""" # 先从缓存获取 if use_cache and self.cache: record = await self.cache.get(record_id) if record: return record # 从存储加载 record = await self.storage.load_record(record_id) if record and use_cache and self.cache: # 更新缓存 await self.cache.set(record.id, record) return record async def update_record( self, record_id: str, content: Dict[str, Any] = None, metadata: Dict[str, Any] = None, use_cache: bool = True ) -> Optional[DataRecord]: """更新记录""" # 更新数据模型 record = self.data_model.update_record( record_id, content, metadata ) if not record: return None # 持久化存储 success = await self.storage.save_record(record) if not success: return None # 更新缓存 if use_cache and self.cache: await self.cache.set(record.id, record) return record async def delete_record( self, record_id: str, use_cache: bool = True ) -> bool: """删除记录""" # 从存储删除 success = await self.storage.delete_record(record_id) if not success: return False # 从缓存删除 if use_cache and self.cache: await self.cache.delete(record_id) # 从数据模型删除 self.data_model.delete_record(record_id) return True async def query_records( self, data_type: DataType = None, filters: Dict[str, Any] = None ) -> List[DataRecord]: """查询记录""" return await self.storage.query_records(data_type, filters) ``` **5. 数据备份和恢复** ```python import shutil from typing import Optional from datetime import datetime class BackupManager: """备份管理器""" def __init__(self, storage_dir: str = "data", backup_dir: str = "backups"): self.storage_dir = Path(storage_dir) self.backup_dir = Path(backup_dir) self.backup_dir.mkdir(parents=True, exist_ok=True) async def create_backup(self, backup_name: str = None) -> Optional[str]: """创建备份""" if not backup_name: backup_name = datetime.now().strftime("%Y%m%d_%H%M%S") backup_path = self.backup_dir / backup_name try: # 创建备份目录 backup_path.mkdir(parents=True, exist_ok=True) # 复制数据文件 for file_path in self.storage_dir.glob("*.json"): shutil.copy2(file_path, backup_path / file_path.name) # 创建备份元数据 metadata = { "backup_name": backup_name, "created_at": datetime.now().isoformat(), "file_count": len(list(backup_path.glob("*.json"))) } metadata_path = backup_path / "backup_metadata.json" with open(metadata_path, 'w') as f: json.dump(metadata, f, indent=2) return backup_name except Exception as e: print(f"创建备份失败: {e}") return None async def restore_backup(self, backup_name: str) -> bool: """恢复备份""" backup_path = self.backup_dir / backup_name if not backup_path.exists(): print(f"备份不存在: {backup_name}") return False try: # 清空当前数据目录 for file_path in self.storage_dir.glob("*.json"): file_path.unlink() # 复制备份文件 for file_path in backup_path.glob("*.json"): if file_path.name != "backup_metadata.json": shutil.copy2(file_path, self.storage_dir / file_path.name) return True except Exception as e: print(f"恢复备份失败: {e}") return False async def list_backups(self) -> List[Dict[str, Any]]: """列出所有备份""" backups = [] for backup_path in self.backup_dir.iterdir(): if not backup_path.is_dir(): continue metadata_path = backup_path / "backup_metadata.json" if metadata_path.exists(): with open(metadata_path, 'r') as f: metadata = json.load(f) backups.append(metadata) return backups async def delete_backup(self, backup_name: str) -> bool: """删除备份""" backup_path = self.backup_dir / backup_name if backup_path.exists(): shutil.rmtree(backup_path) return True return False ``` **最佳实践:** 1. **分层存储**:根据数据访问频率选择合适的存储类型 2. **缓存策略**:实现多级缓存,提高访问速度 3. **数据一致性**:确保缓存和存储的数据一致性 4. **定期备份**:定期创建数据备份,防止数据丢失 5. **监控告警**:监控存储和缓存的健康状态 6. **性能优化**:优化数据访问和存储性能 通过完善的数据持久化和缓存策略,可以确保 MCP 系统的高性能和可靠性。
服务端 · 2月21日 15:51
MCP 的错误处理和重试机制如何实现?MCP 的错误处理和重试机制对于确保系统稳定性和可靠性至关重要。以下是详细的错误处理策略和重试机制实现: **错误处理架构** MCP 错误处理应考虑以下方面: 1. **错误分类**:区分不同类型的错误 2. **错误传播**:正确传播错误信息 3. **错误恢复**:实现错误恢复机制 4. **重试策略**:智能的重试策略 5. **熔断机制**:防止级联故障 6. **降级策略**:在故障时提供降级服务 **1. 错误分类和定义** ```python from enum import Enum from typing import Optional, Dict, Any from dataclasses import dataclass class ErrorType(Enum): """错误类型""" VALIDATION_ERROR = "validation_error" AUTHENTICATION_ERROR = "authentication_error" AUTHORIZATION_ERROR = "authorization_error" NOT_FOUND_ERROR = "not_found_error" CONFLICT_ERROR = "conflict_error" RATE_LIMIT_ERROR = "rate_limit_error" INTERNAL_ERROR = "internal_error" EXTERNAL_SERVICE_ERROR = "external_service_error" TIMEOUT_ERROR = "timeout_error" NETWORK_ERROR = "network_error" class ErrorSeverity(Enum): """错误严重程度""" LOW = "low" MEDIUM = "medium" HIGH = "high" CRITICAL = "critical" @dataclass class MCPError(Exception): """MCP 错误基类""" error_type: ErrorType message: str code: int details: Dict[str, Any] = None severity: ErrorSeverity = ErrorSeverity.MEDIUM retryable: bool = False def __post_init__(self): if self.details is None: self.details = {} super().__init__(self.message) def to_dict(self) -> Dict[str, Any]: """转换为字典""" return { "error_type": self.error_type.value, "message": self.message, "code": self.code, "details": self.details, "severity": self.severity.value, "retryable": self.retryable } class ValidationError(MCPError): """验证错误""" def __init__(self, message: str, details: Dict[str, Any] = None): super().__init__( error_type=ErrorType.VALIDATION_ERROR, message=message, code=400, details=details, severity=ErrorSeverity.LOW, retryable=False ) class AuthenticationError(MCPError): """认证错误""" def __init__(self, message: str = "Authentication failed"): super().__init__( error_type=ErrorType.AUTHENTICATION_ERROR, message=message, code=401, severity=ErrorSeverity.HIGH, retryable=False ) class AuthorizationError(MCPError): """授权错误""" def __init__(self, message: str = "Access denied"): super().__init__( error_type=ErrorType.AUTHORIZATION_ERROR, message=message, code=403, severity=ErrorSeverity.HIGH, retryable=False ) class NotFoundError(MCPError): """未找到错误""" def __init__(self, resource: str, identifier: str): super().__init__( error_type=ErrorType.NOT_FOUND_ERROR, message=f"{resource} not found: {identifier}", code=404, details={"resource": resource, "identifier": identifier}, severity=ErrorSeverity.LOW, retryable=False ) class RateLimitError(MCPError): """速率限制错误""" def __init__(self, message: str = "Rate limit exceeded", retry_after: int = 60): super().__init__( error_type=ErrorType.RATE_LIMIT_ERROR, message=message, code=429, details={"retry_after": retry_after}, severity=ErrorSeverity.MEDIUM, retryable=True ) class InternalError(MCPError): """内部错误""" def __init__(self, message: str = "Internal server error"): super().__init__( error_type=ErrorType.INTERNAL_ERROR, message=message, code=500, severity=ErrorSeverity.CRITICAL, retryable=True ) class ExternalServiceError(MCPError): """外部服务错误""" def __init__(self, service: str, message: str): super().__init__( error_type=ErrorType.EXTERNAL_SERVICE_ERROR, message=f"{service} error: {message}", code=502, details={"service": service}, severity=ErrorSeverity.HIGH, retryable=True ) class TimeoutError(MCPError): """超时错误""" def __init__(self, operation: str, timeout: float): super().__init__( error_type=ErrorType.TIMEOUT_ERROR, message=f"{operation} timed out after {timeout}s", code=504, details={"operation": operation, "timeout": timeout}, severity=ErrorSeverity.HIGH, retryable=True ) ``` **2. 错误处理器** ```python from typing import Callable, Optional, Dict, Any import logging import traceback class ErrorHandler: """错误处理器""" def __init__(self, logger: logging.Logger = None): self.logger = logger or logging.getLogger(__name__) self.error_handlers: Dict[ErrorType, Callable] = {} self.error_reporters: List[Callable] = [] def register_handler( self, error_type: ErrorType, handler: Callable ): """注册错误处理器""" self.error_handlers[error_type] = handler def register_reporter(self, reporter: Callable): """注册错误报告器""" self.error_reporters.append(reporter) async def handle_error( self, error: Exception, context: Dict[str, Any] = None ) -> Dict[str, Any]: """处理错误""" # 记录错误 await self._log_error(error, context) # 报告错误 await self._report_error(error, context) # 转换为 MCP 错误 mcp_error = self._convert_to_mcp_error(error) # 调用特定错误处理器 if mcp_error.error_type in self.error_handlers: try: result = await self.error_handlers[mcp_error.error_type]( mcp_error, context ) return result except Exception as e: self.logger.error(f"错误处理器失败: {e}") # 返回默认错误响应 return mcp_error.to_dict() async def _log_error( self, error: Exception, context: Dict[str, Any] = None ): """记录错误""" if isinstance(error, MCPError): self.logger.error( f"MCP Error: {error.error_type.value} - {error.message}", extra={ "error_code": error.code, "error_details": error.details, "context": context } ) else: self.logger.error( f"Unexpected error: {str(error)}", exc_info=True, extra={"context": context} ) async def _report_error( self, error: Exception, context: Dict[str, Any] = None ): """报告错误""" for reporter in self.error_reporters: try: await reporter(error, context) except Exception as e: self.logger.error(f"错误报告器失败: {e}") def _convert_to_mcp_error(self, error: Exception) -> MCPError: """转换为 MCP 错误""" if isinstance(error, MCPError): return error # 根据异常类型转换 if isinstance(error, ValueError): return ValidationError(str(error)) elif isinstance(error, PermissionError): return AuthorizationError(str(error)) elif isinstance(error, TimeoutError): return TimeoutError("operation", 0) else: return InternalError(str(error)) # 错误报告器示例 class ErrorReporter: """错误报告器""" def __init__(self, error_service_url: str): self.error_service_url = error_service_url async def report_error( self, error: Exception, context: Dict[str, Any] = None ): """报告错误到错误服务""" import aiohttp error_data = { "error": str(error), "error_type": type(error).__name__, "context": context or {}, "timestamp": datetime.now().isoformat() } try: async with aiohttp.ClientSession() as session: async with session.post( self.error_service_url, json=error_data ) as response: if response.status != 200: self.logger.error( f"报告错误失败: {response.status}" ) except Exception as e: self.logger.error(f"报告错误失败: {e}") ``` **3. 重试机制** ```python import asyncio from typing import Callable, Optional, Type import time class RetryStrategy: """重试策略基类""" async def should_retry( self, attempt: int, error: Exception ) -> bool: """判断是否应该重试""" raise NotImplementedError async def get_delay(self, attempt: int) -> float: """获取重试延迟""" raise NotImplementedError class FixedDelayRetry(RetryStrategy): """固定延迟重试""" def __init__(self, max_attempts: int = 3, delay: float = 1.0): self.max_attempts = max_attempts self.delay = delay async def should_retry( self, attempt: int, error: Exception ) -> bool: """判断是否应该重试""" if attempt >= self.max_attempts: return False if isinstance(error, MCPError): return error.retryable return True async def get_delay(self, attempt: int) -> float: """获取重试延迟""" return self.delay class ExponentialBackoffRetry(RetryStrategy): """指数退避重试""" def __init__( self, max_attempts: int = 5, initial_delay: float = 1.0, max_delay: float = 60.0, backoff_factor: float = 2.0 ): self.max_attempts = max_attempts self.initial_delay = initial_delay self.max_delay = max_delay self.backoff_factor = backoff_factor async def should_retry( self, attempt: int, error: Exception ) -> bool: """判断是否应该重试""" if attempt >= self.max_attempts: return False if isinstance(error, MCPError): return error.retryable return True async def get_delay(self, attempt: int) -> float: """获取重试延迟""" delay = self.initial_delay * (self.backoff_factor ** attempt) return min(delay, self.max_delay) class RetryManager: """重试管理器""" def __init__(self, retry_strategy: RetryStrategy): self.retry_strategy = retry_strategy async def execute_with_retry( self, func: Callable, *args, **kwargs ) -> Any: """带重试执行函数""" attempt = 0 last_error = None while True: attempt += 1 try: result = await func(*args, **kwargs) return result except Exception as error: last_error = error # 判断是否应该重试 should_retry = await self.retry_strategy.should_retry( attempt, error ) if not should_retry: raise error # 获取重试延迟 delay = await self.retry_strategy.get_delay(attempt) # 等待后重试 await asyncio.sleep(delay) raise last_error # 重试装饰器 def retry( max_attempts: int = 3, delay: float = 1.0, backoff_factor: float = 2.0, max_delay: float = 60.0 ): """重试装饰器""" def decorator(func: Callable): retry_strategy = ExponentialBackoffRetry( max_attempts=max_attempts, initial_delay=delay, max_delay=max_delay, backoff_factor=backoff_factor ) retry_manager = RetryManager(retry_strategy) @wraps(func) async def wrapper(*args, **kwargs): return await retry_manager.execute_with_retry( func, *args, **kwargs ) return wrapper return decorator ``` **4. 熔断机制** ```python from enum import Enum from typing import Callable, Optional import asyncio class CircuitState(Enum): """熔断器状态""" CLOSED = "closed" # 正常状态 OPEN = "open" # 熔断状态 HALF_OPEN = "half_open" # 半开状态 class CircuitBreaker: """熔断器""" def __init__( self, failure_threshold: int = 5, success_threshold: int = 2, timeout: float = 60.0 ): self.failure_threshold = failure_threshold self.success_threshold = success_threshold self.timeout = timeout self.state = CircuitState.CLOSED self.failure_count = 0 self.success_count = 0 self.last_failure_time: Optional[float] = None self.lock = asyncio.Lock() async def execute( self, func: Callable, *args, **kwargs ) -> Any: """执行函数""" async with self.lock: # 检查熔断器状态 if self.state == CircuitState.OPEN: # 检查是否应该尝试恢复 if time.time() - self.last_failure_time > self.timeout: self.state = CircuitState.HALF_OPEN self.success_count = 0 else: raise MCPError( error_type=ErrorType.INTERNAL_ERROR, message="Circuit breaker is OPEN", code=503, retryable=True ) try: result = await func(*args, **kwargs) # 成功执行 async with self.lock: if self.state == CircuitState.HALF_OPEN: self.success_count += 1 if self.success_count >= self.success_threshold: self.state = CircuitState.CLOSED self.failure_count = 0 elif self.state == CircuitState.CLOSED: self.failure_count = 0 return result except Exception as error: # 执行失败 async with self.lock: self.failure_count += 1 self.last_failure_time = time.time() if self.failure_count >= self.failure_threshold: self.state = CircuitState.OPEN raise error def get_state(self) -> CircuitState: """获取熔断器状态""" return self.state def reset(self): """重置熔断器""" async with self.lock: self.state = CircuitState.CLOSED self.failure_count = 0 self.success_count = 0 self.last_failure_time = None # 熔断器装饰器 def circuit_breaker( failure_threshold: int = 5, success_threshold: int = 2, timeout: float = 60.0 ): """熔断器装饰器""" def decorator(func: Callable): breaker = CircuitBreaker( failure_threshold=failure_threshold, success_threshold=success_threshold, timeout=timeout ) @wraps(func) async def wrapper(*args, **kwargs): return await breaker.execute(func, *args, **kwargs) return wrapper return decorator ``` **5. 降级策略** ```python from typing import Callable, Optional, Dict, Any import asyncio class FallbackStrategy: """降级策略基类""" async def execute_fallback( self, error: Exception, context: Dict[str, Any] = None ) -> Any: """执行降级逻辑""" raise NotImplementedError class CacheFallback(FallbackStrategy): """缓存降级""" def __init__(self, cache: Dict[str, Any]): self.cache = cache async def execute_fallback( self, error: Exception, context: Dict[str, Any] = None ) -> Any: """从缓存获取数据""" cache_key = context.get("cache_key") if context else None if cache_key and cache_key in self.cache: return self.cache[cache_key] raise error class DefaultFallback(FallbackStrategy): """默认值降级""" def __init__(self, default_value: Any): self.default_value = default_value async def execute_fallback( self, error: Exception, context: Dict[str, Any] = None ) -> Any: """返回默认值""" return self.default_value class FallbackManager: """降级管理器""" def __init__(self): self.fallback_strategies: Dict[ErrorType, FallbackStrategy] = {} self.default_fallback: Optional[FallbackStrategy] = None def register_fallback( self, error_type: ErrorType, fallback: FallbackStrategy ): """注册降级策略""" self.fallback_strategies[error_type] = fallback def set_default_fallback(self, fallback: FallbackStrategy): """设置默认降级策略""" self.default_fallback = fallback async def execute_with_fallback( self, func: Callable, context: Dict[str, Any] = None, *args, **kwargs ) -> Any: """带降级执行函数""" try: return await func(*args, **kwargs) except Exception as error: # 转换为 MCP 错误 if not isinstance(error, MCPError): error = InternalError(str(error)) # 查找对应的降级策略 fallback = self.fallback_strategies.get( error.error_type, self.default_fallback ) if fallback: try: return await fallback.execute_fallback(error, context) except Exception as fallback_error: raise fallback_error raise error # 降级装饰器 def fallback( error_type: ErrorType = None, default_value: Any = None ): """降级装饰器""" def decorator(func: Callable): fallback_manager = FallbackManager() if error_type and default_value is not None: fallback_manager.register_fallback( error_type, DefaultFallback(default_value) ) @wraps(func) async def wrapper(*args, **kwargs): return await fallback_manager.execute_with_fallback( func, None, *args, **kwargs ) return wrapper return decorator ``` **6. 综合错误处理示例** ```python from mcp.server import Server class RobustMCPServer(Server): """健壮的 MCP 服务器""" def __init__(self, name: str): super().__init__(name) # 初始化错误处理组件 self.error_handler = ErrorHandler() self.retry_manager = RetryManager(ExponentialBackoffRetry()) self.circuit_breaker = CircuitBreaker() self.fallback_manager = FallbackManager() # 配置错误处理 self._setup_error_handling() def _setup_error_handling(self): """设置错误处理""" # 注册错误处理器 self.error_handler.register_handler( ErrorType.VALIDATION_ERROR, self._handle_validation_error ) self.error_handler.register_handler( ErrorType.RATE_LIMIT_ERROR, self._handle_rate_limit_error ) # 注册降级策略 self.fallback_manager.register_fallback( ErrorType.EXTERNAL_SERVICE_ERROR, CacheFallback({}) ) async def _handle_validation_error( self, error: ValidationError, context: Dict[str, Any] ) -> Dict[str, Any]: """处理验证错误""" return { "error": error.to_dict(), "suggestions": self._get_validation_suggestions(error.details) } async def _handle_rate_limit_error( self, error: RateLimitError, context: Dict[str, Any] ) -> Dict[str, Any]: """处理速率限制错误""" retry_after = error.details.get("retry_after", 60) return { "error": error.to_dict(), "retry_after": retry_after, "message": f"请等待 {retry_after} 秒后重试" } def _get_validation_suggestions( self, details: Dict[str, Any] ) -> List[str]: """获取验证建议""" suggestions = [] # 根据错误详情提供建议 # ... return suggestions @retry(max_attempts=3, delay=1.0) @circuit_breaker(failure_threshold=5, timeout=60.0) @fallback(error_type=ErrorType.EXTERNAL_SERVICE_ERROR, default_value={}) async def call_external_service( self, service_url: str, params: Dict[str, Any] ) -> Dict[str, Any]: """调用外部服务""" try: # 调用外部服务 # ... pass except Exception as error: # 转换为 MCP 错误 raise ExternalServiceError("external", str(error)) ``` **最佳实践:** 1. **错误分类**:正确分类错误类型,便于针对性处理 2. **重试策略**:根据错误类型选择合适的重试策略 3. **熔断机制**:防止级联故障,保护系统稳定性 4. **降级策略**:在故障时提供降级服务,保证基本功能 5. **错误日志**:详细记录错误信息,便于问题排查 6. **监控告警**:监控错误率,及时发现问题 通过完善的错误处理和重试机制,可以确保 MCP 系统的稳定性和可靠性。
服务端 · 2月21日 15:51
MCP 的生态系统和社区支持有哪些?MCP 的生态系统和社区支持对于其发展和采用至关重要。以下是详细的生态系统分析和社区参与方式: **MCP 生态系统架构** MCP 生态系统包括以下组成部分: 1. **核心协议**:MCP 协议规范和实现 2. **客户端库**:各种编程语言的客户端库 3. **服务器实现**:不同平台的服务器实现 4. **工具和插件**:扩展 MCP 功能的工具和插件 5. **文档和教程**:学习资源和最佳实践 6. **社区贡献**:开源项目和社区活动 **1. MCP 客户端库** ```python # Python 客户端库示例 from typing import Dict, Any, Optional, List import asyncio import json class MCPClient: """MCP 客户端""" def __init__(self, server_url: str): self.server_url = server_url self.session_id: Optional[str] = None self.capabilities: Dict[str, Any] = {} async def connect(self) -> bool: """连接到 MCP 服务器""" try: # 初始化连接 response = await self._send_request({ "jsonrpc": "2.0", "method": "initialize", "params": { "protocolVersion": "2024-11-05", "capabilities": { "tools": {}, "resources": {}, "prompts": {} } }, "id": 1 }) if "error" in response: print(f"连接失败: {response['error']}") return False # 保存会话信息 self.session_id = response.get("result", {}).get("sessionId") self.capabilities = response.get("result", {}).get("capabilities", {}) # 发送 initialized 通知 await self._send_notification({ "jsonrpc": "2.0", "method": "notifications/initialized" }) return True except Exception as e: print(f"连接错误: {e}") return False async def list_tools(self) -> List[Dict[str, Any]]: """列出可用工具""" response = await self._send_request({ "jsonrpc": "2.0", "method": "tools/list", "id": 2 }) if "error" in response: print(f"获取工具列表失败: {response['error']}") return [] return response.get("result", {}).get("tools", []) async def call_tool( self, name: str, arguments: Dict[str, Any] ) -> Any: """调用工具""" response = await self._send_request({ "jsonrpc": "2.0", "method": "tools/call", "params": { "name": name, "arguments": arguments }, "id": 3 }) if "error" in response: raise Exception(f"工具调用失败: {response['error']}") return response.get("result") async def list_resources(self) -> List[Dict[str, Any]]: """列出可用资源""" response = await self._send_request({ "jsonrpc": "2.0", "method": "resources/list", "id": 4 }) if "error" in response: print(f"获取资源列表失败: {response['error']}") return [] return response.get("result", {}).get("resources", []) async def read_resource(self, uri: str) -> Any: """读取资源""" response = await self._send_request({ "jsonrpc": "2.0", "method": "resources/read", "params": { "uri": uri }, "id": 5 }) if "error" in response: raise Exception(f"读取资源失败: {response['error']}") return response.get("result") async def list_prompts(self) -> List[Dict[str, Any]]: """列出可用提示词""" response = await self._send_request({ "jsonrpc": "2.0", "method": "prompts/list", "id": 6 }) if "error" in response: print(f"获取提示词列表失败: {response['error']}") return [] return response.get("result", {}).get("prompts", []) async def get_prompt( self, name: str, arguments: Dict[str, Any] = None ) -> Any: """获取提示词""" response = await self._send_request({ "jsonrpc": "2.0", "method": "prompts/get", "params": { "name": name, "arguments": arguments or {} }, "id": 7 }) if "error" in response: raise Exception(f"获取提示词失败: {response['error']}") return response.get("result") async def _send_request(self, request: Dict[str, Any]) -> Dict[str, Any]: """发送请求""" # 实现实际的请求发送逻辑 # 这里使用模拟实现 print(f"发送请求: {json.dumps(request, indent=2)}") # 模拟响应 return { "jsonrpc": "2.0", "id": request["id"], "result": {} } async def _send_notification(self, notification: Dict[str, Any]): """发送通知""" print(f"发送通知: {json.dumps(notification, indent=2)}") async def disconnect(self): """断开连接""" await self._send_notification({ "jsonrpc": "2.0", "method": "shutdown" }) self.session_id = None self.capabilities = {} ``` **2. MCP 服务器实现** ```python # MCP 服务器实现示例 from mcp.server import Server from typing import Dict, Any, List class MyMCPServer(Server): """自定义 MCP 服务器""" def __init__(self, name: str = "my-mcp-server"): super().__init__(name) self._setup_tools() self._setup_resources() self._setup_prompts() def _setup_tools(self): """设置工具""" @self.tool( name="calculate", description="执行数学计算" ) async def calculate( expression: str, operation: str = "evaluate" ) -> str: """计算工具""" try: if operation == "evaluate": result = eval(expression) return f"计算结果: {result}" else: return "不支持的操作" except Exception as e: return f"计算错误: {str(e)}" @self.tool( name="search", description="搜索信息" ) async def search( query: str, limit: int = 10 ) -> List[Dict[str, Any]]: """搜索工具""" # 实现搜索逻辑 results = [ { "title": f"结果 {i}", "url": f"https://example.com/{i}", "snippet": f"这是关于 {query} 的结果 {i}" } for i in range(min(limit, 10)) ] return results def _setup_resources(self): """设置资源""" @self.resource( uri="config://settings", name="配置设置", description="服务器配置" ) async def get_config() -> Dict[str, Any]: """获取配置""" return { "version": "1.0.0", "features": ["tools", "resources", "prompts"], "limits": { "max_requests_per_minute": 100, "max_concurrent_requests": 10 } } @self.resource( uri="data://statistics", name="统计数据", description="服务器统计信息" ) async def get_statistics() -> Dict[str, Any]: """获取统计信息""" return { "total_requests": 1000, "successful_requests": 950, "failed_requests": 50, "average_response_time": 0.5 } def _setup_prompts(self): """设置提示词""" @self.prompt( name="code_review", description="代码审查提示词" ) async def code_review_prompt( language: str = "Python", focus: str = "performance" ) -> str: """代码审查提示词""" return f""" 请审查以下 {language} 代码,重点关注 {focus}: 1. 代码质量和可读性 2. {focus} 相关的问题 3. 潜在的 bug 和安全问题 4. 改进建议 请提供详细的审查意见和改进建议。 """ @self.prompt( name="documentation", description="文档生成提示词" ) async def documentation_prompt( doc_type: str = "API", format: str = "Markdown" ) -> str: """文档生成提示词""" return f""" 请为以下代码生成 {doc_type} 文档,使用 {format} 格式: 1. 功能概述 2. 参数说明 3. 返回值说明 4. 使用示例 5. 注意事项 确保文档清晰、准确、易于理解。 """ # 启动服务器 async def start_server(): """启动 MCP 服务器""" server = MyMCPServer() # 启动服务器 await server.start() print("MCP 服务器已启动") # 保持服务器运行 try: while True: await asyncio.sleep(1) except KeyboardInterrupt: print("正在关闭服务器...") await server.stop() print("服务器已关闭") if __name__ == "__main__": asyncio.run(start_server()) ``` **3. MCP 社区项目** ```python # 社区贡献的 MCP 工具和插件示例 class MCPCommunityTools: """社区 MCP 工具集合""" @staticmethod def get_popular_tools() -> List[Dict[str, Any]]: """获取热门工具""" return [ { "name": "mcp-database", "description": "数据库操作工具", "author": "community", "stars": 150, "url": "https://github.com/example/mcp-database" }, { "name": "mcp-filesystem", "description": "文件系统操作工具", "author": "community", "stars": 120, "url": "https://github.com/example/mcp-filesystem" }, { "name": "mcp-web-scraper", "description": "网页抓取工具", "author": "community", "stars": 100, "url": "https://github.com/example/mcp-web-scraper" }, { "name": "mcp-api-client", "description": "API 客户端工具", "author": "community", "stars": 90, "url": "https://github.com/example/mcp-api-client" } ] @staticmethod def get_server_implementations() -> List[Dict[str, Any]]: """获取服务器实现""" return [ { "name": "mcp-server-python", "language": "Python", "description": "Python MCP 服务器实现", "version": "1.0.0" }, { "name": "mcp-server-nodejs", "language": "JavaScript", "description": "Node.js MCP 服务器实现", "version": "1.0.0" }, { "name": "mcp-server-go", "language": "Go", "description": "Go MCP 服务器实现", "version": "0.9.0" }, { "name": "mcp-server-rust", "language": "Rust", "description": "Rust MCP 服务器实现", "version": "0.8.0" } ] @staticmethod def get_client_libraries() -> List[Dict[str, Any]]: """获取客户端库""" return [ { "name": "mcp-client-python", "language": "Python", "description": "Python MCP 客户端库", "version": "1.0.0", "pip": "pip install mcp-client" }, { "name": "mcp-client-js", "language": "JavaScript", "description": "JavaScript MCP 客户端库", "version": "1.0.0", "npm": "npm install @mcp/client" }, { "name": "mcp-client-java", "language": "Java", "description": "Java MCP 客户端库", "version": "0.9.0", "maven": "implementation 'com.mcp:client:0.9.0'" } ] ``` **4. MCP 文档和教程** ```python class MCPDocumentation: """MCP 文档资源""" @staticmethod def get_official_docs() -> List[Dict[str, Any]]: """获取官方文档""" return [ { "title": "MCP 协议规范", "url": "https://spec.modelcontextprotocol.io", "description": "MCP 协议的完整技术规范", "language": "en" }, { "title": "快速入门指南", "url": "https://docs.modelcontextprotocol.io/quickstart", "description": "快速开始使用 MCP 的指南", "language": "zh" }, { "title": "服务器实现指南", "url": "https://docs.modelcontextprotocol.io/server", "description": "如何实现 MCP 服务器", "language": "zh" }, { "title": "客户端开发指南", "url": "https://docs.modelcontextprotocol.io/client", "description": "如何开发 MCP 客户端", "language": "zh" } ] @staticmethod def get_tutorials() -> List[Dict[str, Any]]: """获取教程""" return [ { "title": "构建第一个 MCP 服务器", "url": "https://docs.modelcontextprotocol.io/tutorials/first-server", "description": "从零开始构建 MCP 服务器", "difficulty": "beginner", "duration": "30 minutes" }, { "title": "MCP 工具开发", "url": "https://docs.modelcontextprotocol.io/tutorials/tool-development", "description": "开发自定义 MCP 工具", "difficulty": "intermediate", "duration": "1 hour" }, { "title": "MCP 集成最佳实践", "url": "https://docs.modelcontextprotocol.io/tutorials/best-practices", "description": "MCP 集成的最佳实践", "difficulty": "advanced", "duration": "2 hours" } ] @staticmethod def get_examples() -> List[Dict[str, Any]]: """获取示例代码""" return [ { "title": "简单计算器服务器", "url": "https://github.com/modelcontextprotocol/examples/tree/main/calculator", "description": "一个简单的计算器 MCP 服务器示例", "language": "Python" }, { "title": "文件系统服务器", "url": "https://github.com/modelcontextprotocol/examples/tree/main/filesystem", "description": "文件系统操作 MCP 服务器示例", "language": "Python" }, { "title": "数据库集成服务器", "url": "https://github.com/modelcontextprotocol/examples/tree/main/database", "description": "数据库集成 MCP 服务器示例", "language": "Python" } ] ``` **5. MCP 社区参与** ```python class MCPCommunity: """MCP 社区参与""" @staticmethod def get_community_channels() -> List[Dict[str, Any]]: """获取社区渠道""" return [ { "name": "GitHub", "url": "https://github.com/modelcontextprotocol", "description": "MCP GitHub 组织", "type": "code" }, { "name": "Discord", "url": "https://discord.gg/mcp", "description": "MCP Discord 社区", "type": "chat" }, { "name": "Twitter", "url": "https://twitter.com/modelcontext", "description": "MCP Twitter 账号", "type": "social" }, { "name": "Reddit", "url": "https://reddit.com/r/modelcontextprotocol", "description": "MCP Reddit 社区", "type": "forum" } ] @staticmethod def get_contribution_guidelines() -> Dict[str, Any]: """获取贡献指南""" return { "code_of_conduct": "https://github.com/modelcontextprotocol/.github/blob/main/CODE_OF_CONDUCT.md", "contributing": "https://github.com/modelcontextprotocol/.github/blob/main/CONTRIBUTING.md", "pull_request_template": "https://github.com/modelcontextprotocol/.github/blob/main/PULL_REQUEST_TEMPLATE.md", "issue_template": "https://github.com/modelcontextprotocol/.github/blob/main/ISSUE_TEMPLATE.md" } @staticmethod def get_ways_to_contribute() -> List[Dict[str, Any]]: """获取贡献方式""" return [ { "type": "代码贡献", "description": "提交代码改进和 bug 修复", "difficulty": "intermediate" }, { "type": "文档改进", "description": "改进和翻译文档", "difficulty": "beginner" }, { "type": "问题报告", "description": "报告 bug 和提出功能请求", "difficulty": "beginner" }, { "type": "工具开发", "description": "开发新的 MCP 工具和插件", "difficulty": "advanced" }, { "type": "社区支持", "description": "在社区中回答问题和提供帮助", "difficulty": "intermediate" } ] ``` **最佳实践:** 1. **参与社区**:积极参与 MCP 社区讨论和贡献 2. **学习资源**:充分利用官方文档和教程资源 3. **开源贡献**:为 MCP 生态系统贡献代码和工具 4. **分享经验**:分享使用 MCP 的经验和最佳实践 5. **反馈改进**:提供反馈帮助改进 MCP 协议和工具 6. **持续学习**:跟踪 MCP 的最新发展和更新 通过积极参与 MCP 生态系统和社区,可以更好地利用 MCP 的功能并为社区做出贡献。
服务端 · 2月21日 15:15
MCP 的性能监控和优化有哪些策略?MCP 的性能监控和优化对于确保系统高效运行至关重要。以下是详细的监控策略和优化方法: **性能监控架构** MCP 性能监控应考虑以下方面: 1. **指标收集**:收集系统运行的关键指标 2. **性能分析**:分析性能瓶颈和优化机会 3. **实时监控**:实时监控系统状态和性能 4. **告警机制**:及时发现和通知性能问题 5. **优化策略**:基于监控数据进行性能优化 **1. 指标收集系统** ```python from dataclasses import dataclass from typing import Dict, List, Optional from datetime import datetime import time @dataclass class Metric: """性能指标""" name: str value: float timestamp: datetime tags: Dict[str, str] = None def __post_init__(self): if self.tags is None: self.tags = {} class MetricsCollector: """指标收集器""" def __init__(self): self.metrics: List[Metric] = [] self.counters: Dict[str, int] = {} self.gauges: Dict[str, float] = {} self.histograms: Dict[str, List[float]] = {} def increment_counter(self, name: str, value: int = 1, tags: Dict[str, str] = None): """增加计数器""" key = self._make_key(name, tags) self.counters[key] = self.counters.get(key, 0) + value # 记录指标 self._record_metric(name, self.counters[key], tags) def set_gauge(self, name: str, value: float, tags: Dict[str, str] = None): """设置仪表值""" key = self._make_key(name, tags) self.gauges[key] = value # 记录指标 self._record_metric(name, value, tags) def record_histogram(self, name: str, value: float, tags: Dict[str, str] = None): """记录直方图值""" key = self._make_key(name, tags) if key not in self.histograms: self.histograms[key] = [] self.histograms[key].append(value) # 限制历史记录大小 if len(self.histograms[key]) > 1000: self.histograms[key] = self.histograms[key][-1000:] # 记录指标 self._record_metric(name, value, tags) def _record_metric(self, name: str, value: float, tags: Dict[str, str] = None): """记录指标""" metric = Metric( name=name, value=value, timestamp=datetime.now(), tags=tags or {} ) self.metrics.append(metric) # 限制指标历史大小 if len(self.metrics) > 10000: self.metrics = self.metrics[-10000:] def _make_key(self, name: str, tags: Dict[str, str] = None) -> str: """生成指标键""" if not tags: return name tag_str = ",".join([f"{k}={v}" for k, v in sorted(tags.items())]) return f"{name}{{{tag_str}}}" def get_metrics( self, name: str = None, since: datetime = None ) -> List[Metric]: """获取指标""" metrics = self.metrics if name: metrics = [m for m in metrics if m.name == name] if since: metrics = [m for m in metrics if m.timestamp >= since] return metrics def get_histogram_stats( self, name: str, tags: Dict[str, str] = None ) -> Optional[Dict]: """获取直方图统计""" key = self._make_key(name, tags) if key not in self.histograms: return None values = self.histograms[key] if not values: return None sorted_values = sorted(values) return { "count": len(values), "sum": sum(values), "avg": sum(values) / len(values), "min": min(values), "max": max(values), "p50": self._percentile(sorted_values, 50), "p90": self._percentile(sorted_values, 90), "p95": self._percentile(sorted_values, 95), "p99": self._percentile(sorted_values, 99), } def _percentile(self, sorted_values: List[float], percentile: int) -> float: """计算百分位数""" if not sorted_values: return 0.0 index = int(len(sorted_values) * percentile / 100) return sorted_values[min(index, len(sorted_values) - 1)] ``` **2. 性能分析器** ```python from functools import wraps from typing import Callable, Optional import time class PerformanceProfiler: """性能分析器""" def __init__(self, metrics_collector: MetricsCollector): self.metrics_collector = metrics_collector def profile_function( self, metric_name: str, tags: Dict[str, str] = None ): """函数性能分析装饰器""" def decorator(func: Callable): @wraps(func) async def async_wrapper(*args, **kwargs): start_time = time.time() try: result = await func(*args, **kwargs) # 记录执行时间 execution_time = time.time() - start_time self.metrics_collector.record_histogram( f"{metric_name}_duration", execution_time, tags ) # 记录成功计数 self.metrics_collector.increment_counter( f"{metric_name}_success", tags=tags ) return result except Exception as e: # 记录失败计数 self.metrics_collector.increment_counter( f"{metric_name}_error", tags=tags ) raise e @wraps(func) def sync_wrapper(*args, **kwargs): start_time = time.time() try: result = func(*args, **kwargs) # 记录执行时间 execution_time = time.time() - start_time self.metrics_collector.record_histogram( f"{metric_name}_duration", execution_time, tags ) # 记录成功计数 self.metrics_collector.increment_counter( f"{metric_name}_success", tags=tags ) return result except Exception as e: # 记录失败计数 self.metrics_collector.increment_counter( f"{metric_name}_error", tags=tags ) raise e # 根据函数类型返回对应的包装器 if asyncio.iscoroutinefunction(func): return async_wrapper else: return sync_wrapper return decorator def profile_context( self, metric_name: str, tags: Dict[str, str] = None ): """上下文管理器性能分析""" class ProfileContext: def __init__(self, profiler, name, tags): self.profiler = profiler self.name = name self.tags = tags self.start_time = None def __enter__(self): self.start_time = time.time() return self def __exit__(self, exc_type, exc_val, exc_tb): execution_time = time.time() - self.start_time self.profiler.metrics_collector.record_histogram( f"{self.name}_duration", execution_time, self.tags ) if exc_type is None: self.profiler.metrics_collector.increment_counter( f"{self.name}_success", tags=self.tags ) else: self.profiler.metrics_collector.increment_counter( f"{self.name}_error", tags=self.tags ) return False return ProfileContext(self, metric_name, tags) ``` **3. 实时监控系统** ```python import asyncio from typing import Dict, List, Callable, Optional class RealTimeMonitor: """实时监控器""" def __init__( self, metrics_collector: MetricsCollector, check_interval: int = 5 ): self.metrics_collector = metrics_collector self.check_interval = check_interval self.alerts: List[Dict] = [] self.alert_rules: List[Dict] = [] self.subscribers: List[Callable] = [] self.running = False def add_alert_rule( self, name: str, metric_name: str, condition: str, threshold: float, severity: str = "warning" ): """添加告警规则""" self.alert_rules.append({ "name": name, "metric_name": metric_name, "condition": condition, "threshold": threshold, "severity": severity }) def subscribe(self, callback: Callable): """订阅告警""" self.subscribers.append(callback) def unsubscribe(self, callback: Callable): """取消订阅""" if callback in self.subscribers: self.subscribers.remove(callback) async def start(self): """启动监控""" self.running = True while self.running: await self.check_alerts() await asyncio.sleep(self.check_interval) async def stop(self): """停止监控""" self.running = False async def check_alerts(self): """检查告警""" for rule in self.alert_rules: try: alert = await self._evaluate_rule(rule) if alert: self.alerts.append(alert) await self._notify_subscribers(alert) except Exception as e: print(f"检查告警规则失败: {rule['name']}, 错误: {e}") async def _evaluate_rule(self, rule: Dict) -> Optional[Dict]: """评估告警规则""" metric_name = rule["metric_name"] condition = rule["condition"] threshold = rule["threshold"] # 获取最近的指标 recent_metrics = self.metrics_collector.get_metrics( metric_name, since=datetime.now() - timedelta(minutes=1) ) if not recent_metrics: return None # 计算聚合值 values = [m.value for m in recent_metrics] avg_value = sum(values) / len(values) # 检查条件 triggered = False if condition == "greater_than": triggered = avg_value > threshold elif condition == "less_than": triggered = avg_value < threshold elif condition == "equals": triggered = avg_value == threshold elif condition == "not_equals": triggered = avg_value != threshold if triggered: return { "rule_name": rule["name"], "metric_name": metric_name, "current_value": avg_value, "threshold": threshold, "condition": condition, "severity": rule["severity"], "timestamp": datetime.now() } return None async def _notify_subscribers(self, alert: Dict): """通知订阅者""" for callback in self.subscribers: try: await callback(alert) except Exception as e: print(f"通知订阅者失败: {e}") def get_recent_alerts( self, since: datetime = None, severity: str = None ) -> List[Dict]: """获取最近的告警""" alerts = self.alerts if since: alerts = [a for a in alerts if a["timestamp"] >= since] if severity: alerts = [a for a in alerts if a["severity"] == severity] return alerts ``` **4. 性能优化策略** ```python from typing import Dict, List, Optional import asyncio from functools import lru_cache class PerformanceOptimizer: """性能优化器""" def __init__(self, metrics_collector: MetricsCollector): self.metrics_collector = metrics_collector self.optimization_strategies = {} def register_strategy( self, name: str, strategy: Callable, priority: int = 0 ): """注册优化策略""" self.optimization_strategies[name] = { "strategy": strategy, "priority": priority } async def optimize(self, context: Dict) -> Dict: """执行优化""" results = {} # 按优先级排序策略 sorted_strategies = sorted( self.optimization_strategies.items(), key=lambda x: x[1]["priority"], reverse=True ) for name, strategy_info in sorted_strategies: try: result = await strategy_info["strategy"](context) results[name] = result except Exception as e: results[name] = {"error": str(e)} return results # 缓存优化策略 class CacheOptimizer: """缓存优化器""" def __init__(self, max_size: int = 1000, ttl: int = 300): self.max_size = max_size self.ttl = ttl self.cache: Dict[str, tuple] = {} def get(self, key: str) -> Optional[any]: """获取缓存值""" if key not in self.cache: return None value, timestamp = self.cache[key] # 检查是否过期 if time.time() - timestamp > self.ttl: del self.cache[key] return None return value def set(self, key: str, value: any): """设置缓存值""" # 检查缓存大小 if len(self.cache) >= self.max_size: self._evict() self.cache[key] = (value, time.time()) def _evict(self): """淘汰缓存项""" # 简单实现:随机淘汰 if self.cache: key = next(iter(self.cache)) del self.cache[key] # 连接池优化 class ConnectionPoolOptimizer: """连接池优化器""" def __init__(self, min_size: int = 5, max_size: int = 20): self.min_size = min_size self.max_size = max_size self.pool = asyncio.Queue() self.created = 0 self.lock = asyncio.Lock() async def acquire(self) -> any: """获取连接""" try: # 尝试从池中获取连接 connection = await asyncio.wait_for( self.pool.get(), timeout=1.0 ) return connection except asyncio.TimeoutError: # 池中没有可用连接,创建新连接 async with self.lock: if self.created < self.max_size: connection = await self._create_connection() self.created += 1 return connection # 等待其他连接释放 return await self.pool.get() async def release(self, connection: any): """释放连接""" await self.pool.put(connection) async def _create_connection(self) -> any: """创建新连接""" # 实现具体的连接创建逻辑 pass # 批处理优化 class BatchProcessor: """批处理器""" def __init__(self, batch_size: int = 100, timeout: float = 1.0): self.batch_size = batch_size self.timeout = timeout self.buffer: List = [] self.lock = asyncio.Lock() async def add(self, item: any): """添加项目到批处理""" async with self.lock: self.buffer.append(item) if len(self.buffer) >= self.batch_size: await self._flush() async def _flush(self): """刷新缓冲区""" if not self.buffer: return batch = self.buffer.copy() self.buffer.clear() # 处理批次 await self._process_batch(batch) async def _process_batch(self, batch: List): """处理批次""" # 实现具体的批处理逻辑 pass async def start_periodic_flush(self): """启动定期刷新""" while True: await asyncio.sleep(self.timeout) async with self.lock: await self._flush() ``` **5. 性能报告生成器** ```python from typing import Dict, List from datetime import datetime, timedelta class PerformanceReportGenerator: """性能报告生成器""" def __init__(self, metrics_collector: MetricsCollector): self.metrics_collector = metrics_collector def generate_report( self, start_time: datetime, end_time: datetime ) -> Dict: """生成性能报告""" report = { "period": { "start": start_time, "end": end_time }, "summary": {}, "metrics": {}, "alerts": [], "recommendations": [] } # 生成摘要 report["summary"] = self._generate_summary(start_time, end_time) # 生成指标详情 report["metrics"] = self._generate_metrics_detail( start_time, end_time ) # 生成优化建议 report["recommendations"] = self._generate_recommendations( report["metrics"] ) return report def _generate_summary( self, start_time: datetime, end_time: datetime ) -> Dict: """生成摘要""" summary = { "total_requests": 0, "successful_requests": 0, "failed_requests": 0, "average_response_time": 0.0, "p95_response_time": 0.0, "p99_response_time": 0.0 } # 获取请求指标 success_metrics = self.metrics_collector.get_metrics( "request_success", since=start_time ) error_metrics = self.metrics_collector.get_metrics( "request_error", since=start_time ) duration_metrics = self.metrics_collector.get_metrics( "request_duration", since=start_time ) summary["successful_requests"] = len(success_metrics) summary["failed_requests"] = len(error_metrics) summary["total_requests"] = ( summary["successful_requests"] + summary["failed_requests"] ) if duration_metrics: durations = [m.value for m in duration_metrics] summary["average_response_time"] = sum(durations) / len(durations) summary["p95_response_time"] = self._percentile(durations, 95) summary["p99_response_time"] = self._percentile(durations, 99) return summary def _generate_metrics_detail( self, start_time: datetime, end_time: datetime ) -> Dict: """生成指标详情""" metrics_detail = {} # 获取所有指标名称 metric_names = set(m.name for m in self.metrics_collector.metrics) for metric_name in metric_names: metrics = self.metrics_collector.get_metrics( metric_name, since=start_time ) if not metrics: continue values = [m.value for m in metrics] metrics_detail[metric_name] = { "count": len(values), "sum": sum(values), "avg": sum(values) / len(values), "min": min(values), "max": max(values), "p50": self._percentile(values, 50), "p90": self._percentile(values, 90), "p95": self._percentile(values, 95), "p99": self._percentile(values, 99), } return metrics_detail def _generate_recommendations( self, metrics_detail: Dict ) -> List[str]: """生成优化建议""" recommendations = [] # 检查响应时间 if "request_duration" in metrics_detail: avg_response_time = metrics_detail["request_duration"]["avg"] p95_response_time = metrics_detail["request_duration"]["p95"] if avg_response_time > 1.0: recommendations.append( "平均响应时间超过 1 秒,建议优化慢查询或增加缓存" ) if p95_response_time > 5.0: recommendations.append( "P95 响应时间超过 5 秒,建议检查性能瓶颈" ) # 检查错误率 if "request_error" in metrics_detail: error_count = metrics_detail["request_error"]["count"] if error_count > 100: recommendations.append( "错误数量较多,建议检查错误日志并修复问题" ) return recommendations def _percentile(self, sorted_values: List[float], percentile: int) -> float: """计算百分位数""" if not sorted_values: return 0.0 sorted_values = sorted(sorted_values) index = int(len(sorted_values) * percentile / 100) return sorted_values[min(index, len(sorted_values) - 1)] ``` **最佳实践:** 1. **全面监控**:监控所有关键指标,包括请求、响应时间、错误率等 2. **实时告警**:设置合理的告警阈值,及时发现性能问题 3. **性能分析**:定期分析性能数据,识别优化机会 4. **缓存优化**:合理使用缓存减少重复计算和查询 5. **连接池**:使用连接池优化数据库和网络连接 6. **批处理**:使用批处理提高吞吐量 通过完善的性能监控和优化,可以确保 MCP 系统高效稳定运行。
服务端 · 2月19日 21:43
MCP 的版本管理和兼容性如何处理?MCP 的版本管理和兼容性对于确保系统的稳定性和可维护性至关重要。以下是详细的版本管理策略和兼容性处理方法: **版本管理策略** MCP 版本管理应考虑以下方面: 1. **语义化版本控制**:使用语义化版本号(SemVer) 2. **向后兼容性**:确保新版本向后兼容旧版本 3. **废弃策略**:明确的功能废弃和移除流程 4. **迁移指南**:提供详细的版本迁移指南 5. **版本协商**:客户端和服务器的版本协商机制 **1. 语义化版本控制** ```python from dataclasses import dataclass from typing import Optional import re @dataclass class Version: """版本号""" major: int minor: int patch: int prerelease: Optional[str] = None build_metadata: Optional[str] = None def __str__(self) -> str: version_str = f"{self.major}.{self.minor}.{self.patch}" if self.prerelease: version_str += f"-{self.prerelease}" if self.build_metadata: version_str += f"+{self.build_metadata}" return version_str @classmethod def parse(cls, version_str: str) -> 'Version': """解析版本字符串""" # 匹配语义化版本格式 pattern = r'^(\d+)\.(\d+)\.(\d+)(?:-([0-9A-Za-z-]+(?:\.[0-9A-Za-z-]+)*))?(?:\+([0-9A-Za-z-]+(?:\.[0-9A-Za-z-]+)*))?$' match = re.match(pattern, version_str) if not match: raise ValueError(f"无效的版本格式: {version_str}") major = int(match.group(1)) minor = int(match.group(2)) patch = int(match.group(3)) prerelease = match.group(4) build_metadata = match.group(5) return cls( major=major, minor=minor, patch=patch, prerelease=prerelease, build_metadata=build_metadata ) def is_compatible(self, other: 'Version') -> bool: """检查版本兼容性""" # 主版本号相同,次版本号向后兼容 if self.major != other.major: return False # 如果当前版本 >= 其他版本,则兼容 if (self.minor, self.patch) >= (other.minor, other.patch): return True return False def __lt__(self, other: 'Version') -> bool: """版本比较""" if self.major != other.major: return self.major < other.major if self.minor != other.minor: return self.minor < other.minor if self.patch != other.patch: return self.patch < other.patch # 预发布版本比较 if self.prerelease and not other.prerelease: return True if not self.prerelease and other.prerelease: return False if self.prerelease and other.prerelease: return self.prerelease < other.prerelease return False def __eq__(self, other: 'Version') -> bool: """版本相等""" return ( self.major == other.major and self.minor == other.minor and self.patch == other.patch and self.prerelease == other.prerelease ) # 当前 MCP 版本 MCP_VERSION = Version(1, 0, 0) ``` **2. 版本协商机制** ```python from typing import Dict, List, Optional class VersionNegotiator: """版本协商器""" def __init__(self, server_version: Version): self.server_version = server_version self.supported_versions: List[Version] = [ Version(1, 0, 0), Version(0, 9, 0), ] def negotiate_version( self, client_versions: List[Version] ) -> Optional[Version]: """协商最佳版本""" # 找到客户端支持的最高版本 client_versions_sorted = sorted(client_versions, reverse=True) for client_version in client_versions_sorted: # 检查服务器是否支持此版本 for server_version in self.supported_versions: if server_version.is_compatible(client_version): return server_version # 没有找到兼容版本 return None def get_server_info(self) -> Dict: """获取服务器版本信息""" return { "version": str(self.server_version), "supported_versions": [ str(v) for v in self.supported_versions ] } class MCPClient: """MCP 客户端""" def __init__(self, supported_versions: List[Version]): self.supported_versions = supported_versions self.negotiated_version: Optional[Version] = None async def connect(self, server_info: Dict) -> bool: """连接到服务器并协商版本""" server_version = Version.parse(server_info["version"]) server_supported_versions = [ Version.parse(v) for v in server_info["supported_versions"] ] # 创建临时协商器 negotiator = VersionNegotiator(server_version) # 协商版本 self.negotiated_version = negotiator.negotiate_version( self.supported_versions ) if not self.negotiated_version: print("无法协商兼容的版本") return False print(f"协商版本: {self.negotiated_version}") return True ``` **3. 功能废弃管理** ```python from enum import Enum from typing import Dict, List, Optional from datetime import datetime class DeprecationStatus(Enum): """废弃状态""" STABLE = "stable" DEPRECATED = "deprecated" REMOVED = "removed" @dataclass class DeprecationInfo: """废弃信息""" status: DeprecationStatus deprecated_in: Optional[Version] = None removed_in: Optional[Version] = None replacement: Optional[str] = None message: Optional[str] = None class DeprecationManager: """废弃管理器""" def __init__(self): self.deprecations: Dict[str, DeprecationInfo] = {} def deprecate_feature( self, feature_name: str, deprecated_in: Version, removed_in: Version, replacement: str = None, message: str = None ): """标记功能为废弃""" self.deprecations[feature_name] = DeprecationInfo( status=DeprecationStatus.DEPRECATED, deprecated_in=deprecated_in, removed_in=removed_in, replacement=replacement, message=message ) def remove_feature(self, feature_name: str, removed_in: Version): """移除功能""" if feature_name in self.deprecations: self.deprecations[feature_name].status = DeprecationStatus.REMOVED self.deprecations[feature_name].removed_in = removed_in def check_feature( self, feature_name: str, current_version: Version ) -> tuple: """检查功能状态""" if feature_name not in self.deprecations: return True, None info = self.deprecations[feature_name] if info.status == DeprecationStatus.REMOVED: return False, f"功能 {feature_name} 已在版本 {info.removed_in} 中移除" if info.status == DeprecationStatus.DEPRECATED: if current_version >= info.removed_in: return False, f"功能 {feature_name} 已在版本 {info.removed_in} 中移除" warning = f"功能 {feature_name} 已废弃,将在版本 {info.removed_in} 中移除" if info.replacement: warning += f",请使用 {info.replacement}" return True, warning return True, None def get_deprecation_warnings( self, current_version: Version ) -> List[str]: """获取所有废弃警告""" warnings = [] for feature_name, info in self.deprecations.items(): if info.status == DeprecationStatus.DEPRECATED: if current_version < info.removed_in: warning = f"功能 {feature_name} 已废弃" if info.replacement: warning += f",请使用 {info.replacement}" warnings.append(warning) return warnings ``` **4. 版本迁移指南** ```python from typing import Dict, List, Callable class MigrationGuide: """版本迁移指南""" def __init__(self): self.migrations: Dict[Version, List[Migration]] = {} def add_migration( self, from_version: Version, to_version: Version, migration_func: Callable, description: str ): """添加迁移步骤""" if from_version not in self.migrations: self.migrations[from_version] = [] migration = Migration( from_version=from_version, to_version=to_version, func=migration_func, description=description ) self.migrations[from_version].append(migration) def get_migration_path( self, from_version: Version, to_version: Version ) -> List[Migration]: """获取迁移路径""" if from_version == to_version: return [] # 简单实现:直接迁移 if from_version in self.migrations: for migration in self.migrations[from_version]: if migration.to_version == to_version: return [migration] # 复杂实现:查找多步迁移路径 return self._find_migration_path(from_version, to_version) def _find_migration_path( self, from_version: Version, to_version: Version, visited: set = None ) -> List[Migration]: """递归查找迁移路径""" if visited is None: visited = set() if from_version in visited: return [] visited.add(from_version) if from_version == to_version: return [] if from_version not in self.migrations: return [] for migration in self.migrations[from_version]: path = self._find_migration_path( migration.to_version, to_version, visited.copy() ) if path is not None: return [migration] + path return [] async def execute_migration( self, from_version: Version, to_version: Version, context: Dict ) -> bool: """执行迁移""" migration_path = self.get_migration_path(from_version, to_version) if not migration_path: print(f"无法找到从 {from_version} 到 {to_version} 的迁移路径") return False print(f"开始迁移: {from_version} -> {to_version}") for migration in migration_path: print(f"执行迁移: {migration.description}") try: await migration.func(context) print(f"迁移完成: {migration.description}") except Exception as e: print(f"迁移失败: {migration.description}, 错误: {e}") return False print(f"迁移完成: {from_version} -> {to_version}") return True @dataclass class Migration: """迁移步骤""" from_version: Version to_version: Version func: Callable description: str ``` **5. 版本兼容性检查** ```python from typing import Dict, List, Tuple class CompatibilityChecker: """兼容性检查器""" def __init__(self, current_version: Version): self.current_version = current_version self.compatibility_matrix: Dict[Version, List[Version]] = {} def add_compatibility( self, server_version: Version, compatible_client_versions: List[Version] ): """添加兼容性规则""" self.compatibility_matrix[server_version] = compatible_client_versions def check_compatibility( self, client_version: Version ) -> Tuple[bool, Optional[str]]: """检查客户端版本是否兼容""" # 检查服务器是否支持此版本 if self.current_version in self.compatibility_matrix: compatible_versions = self.compatibility_matrix[self.current_version] for compatible_version in compatible_versions: if client_version.is_compatible(compatible_version): return True, None # 使用默认兼容性规则 if self.current_version.is_compatible(client_version): return True, None return False, f"客户端版本 {client_version} 与服务器版本 {self.current_version} 不兼容" def get_compatible_versions(self) -> List[Version]: """获取所有兼容的客户端版本""" compatible_versions = [] if self.current_version in self.compatibility_matrix: compatible_versions = self.compatibility_matrix[self.current_version] return compatible_versions ``` **6. 版本信息 API** ```python from fastapi import FastAPI from typing import Dict class VersionInfoAPI: """版本信息 API""" def __init__( self, current_version: Version, supported_versions: List[Version], deprecation_manager: DeprecationManager, migration_guide: MigrationGuide ): self.current_version = current_version self.supported_versions = supported_versions self.deprecation_manager = deprecation_manager self.migration_guide = migration_guide def setup_routes(self, app: FastAPI): """设置路由""" @app.get("/version") async def get_version() -> Dict: """获取当前版本信息""" return { "version": str(self.current_version), "supported_versions": [str(v) for v in self.supported_versions], "deprecation_warnings": self.deprecation_manager.get_deprecation_warnings( self.current_version ) } @app.get("/version/compatibility/{client_version}") async def check_compatibility(client_version: str) -> Dict: """检查客户端版本兼容性""" checker = CompatibilityChecker(self.current_version) try: version = Version.parse(client_version) except ValueError: return { "compatible": False, "message": "无效的版本格式" } compatible, message = checker.check_compatibility(version) return { "compatible": compatible, "message": message, "server_version": str(self.current_version) } @app.get("/version/migration/{from_version}/{to_version}") async def get_migration_path( from_version: str, to_version: str ) -> Dict: """获取迁移路径""" try: from_ver = Version.parse(from_version) to_ver = Version.parse(to_version) except ValueError: return { "success": False, "message": "无效的版本格式" } path = self.migration_guide.get_migration_path(from_ver, to_ver) return { "success": True, "migration_path": [ { "from": str(m.from_version), "to": str(m.to_version), "description": m.description } for m in path ] } ``` **最佳实践:** 1. **语义化版本**:严格遵循语义化版本控制规范 2. **向后兼容**:确保新版本向后兼容旧版本 3. **渐进废弃**:提前通知功能废弃,给用户迁移时间 4. **文档完善**:提供详细的版本迁移指南 5. **版本协商**:实现客户端和服务器的版本协商机制 6. **测试覆盖**:为每个版本编写兼容性测试 通过完善的版本管理和兼容性处理,可以确保 MCP 系统的稳定性和可维护性。
服务端 · 2月19日 21:43
MCP 的未来发展趋势是什么?有哪些挑战和机遇?MCP 作为新兴的 AI 集成协议,具有广阔的发展前景和潜力。以下是 MCP 的未来发展趋势: **1. 标准化推进** - **行业认可**:获得更多 AI 模型提供商和企业的认可 - **协议完善**:持续完善协议规范,解决现有局限性 - **标准化组织**:可能提交给标准化组织(如 W3C、IETF)进行标准化 - **互操作性增强**:与现有协议(如 OpenAPI、GraphQL)的互操作性 **2. 生态系统扩展** - **更多语言支持**:扩展到 Rust、Java、C#、PHP 等更多编程语言 - **服务器生态**:社区贡献更多针对特定领域的 MCP 服务器 - **客户端集成**:更多 AI 应用和平台原生支持 MCP - **工具库丰富**:提供更多预构建的工具和资源 **3. 性能优化** - **协议优化**:引入二进制协议、压缩、批量操作等优化 - **异步增强**:更强大的异步和流式处理能力 - **缓存机制**:智能缓存策略减少重复计算 - **边缘计算**:支持边缘节点部署,降低延迟 **4. 安全性增强** - **高级认证**:支持 OAuth 2.0、SAML 等企业级认证 - **细粒度权限**:更精细的访问控制和权限管理 - **安全审计**:完整的安全审计和合规支持 - **加密增强**:端到端加密和密钥管理 **5. 功能扩展** - **实时通信**:支持 WebSocket 等实时双向通信 - **流式处理**:更好的流式数据处理能力 - **事件驱动**:支持事件订阅和推送机制 - **多模态支持**:增强对图像、音频、视频等多模态数据的支持 **6. 企业级特性** - **多租户支持**:完善的多租户隔离和管理 - **高可用性**:内置高可用和灾难恢复机制 - **可观测性**:完整的监控、日志和追踪能力 - **治理工具**:提供企业级治理和管理工具 **7. AI 模型集成** - **更多模型支持**:支持更多开源和商业 AI 模型 - **模型适配器**:提供模型适配器简化集成 - **性能优化**:针对不同模型的性能优化 - **成本控制**:智能的成本控制和优化 **8. 开发者体验** - **更好的工具**:更强大的开发、测试和调试工具 - **文档完善**:更全面和易懂的文档 - **示例丰富**:更多实际应用场景的示例 - **社区支持**:活跃的社区支持和交流 **9. 应用场景拓展** - **企业应用**:更多企业级应用场景 - **物联网**:IoT 设备和系统的集成 - **边缘 AI**:边缘计算和 AI 的结合 - **自动化**:更广泛的自动化应用 **10. 挑战和机遇** **挑战:** - 与现有协议的竞争和兼容性 - 社区建设和生态发展 - 性能和可扩展性的平衡 - 安全性和易用性的权衡 **机遇:** - 成为 AI 集成的行业标准 - 推动AI应用的大规模部署 - 促进AI技术的民主化 - 创造新的商业模式和机会 **预测:** 未来 2-3 年内,MCP 有望成为 AI 模型与外部系统集成的主流标准之一,被广泛采用于企业级应用、开发工具和各种 AI 产品中。其开放性和标准化特性将推动整个 AI 生态系统的发展。 开发者现在学习和采用 MCP,将能够在未来的 AI 应用开发中占据有利位置。
服务端 · 2月19日 21:41
MCP 如何与其他 AI 框架(如 LangChain、LlamaIndex)集成?MCP 可以与各种 AI 框架和工具集成,扩展其功能和应用场景。以下是详细的集成方法和最佳实践: **集成架构设计** MCP 集成应考虑以下方面: 1. **框架兼容性**:确保与目标框架的兼容性 2. **性能影响**:最小化对系统性能的影响 3. **功能完整性**:保持 MCP 和框架功能的完整性 4. **错误处理**:正确处理集成过程中的错误 5. **配置管理**:统一管理集成配置 **1. 与 LangChain 集成** ```python from langchain.agents import AgentExecutor, create_openai_tools_agent from langchain_openai import ChatOpenAI from langchain.tools import Tool from mcp.server import Server class MCPLangChainIntegration: def __init__(self, mcp_server: Server): self.mcp_server = mcp_server self.langchain_tools = [] async def convert_mcp_to_langchain_tools(self) -> list: """将 MCP 工具转换为 LangChain 工具""" mcp_tools = await self.mcp_server.list_tools() langchain_tools = [] for tool_info in mcp_tools: tool = Tool( name=tool_info["name"], description=tool_info["description"], func=self._create_tool_wrapper(tool_info["name"]) ) langchain_tools.append(tool) return langchain_tools def _create_tool_wrapper(self, tool_name: str): """创建工具包装器""" async def wrapper(**kwargs): result = await self.mcp_server.call_tool( tool_name, kwargs ) return result return wrapper async def create_langchain_agent(self): """创建 LangChain Agent""" # 转换 MCP 工具 tools = await self.convert_mcp_to_langchain_tools() # 创建 LLM llm = ChatOpenAI( model="gpt-4", temperature=0 ) # 创建 Agent agent = create_openai_tools_agent(llm, tools) # 创建 AgentExecutor agent_executor = AgentExecutor( agent=agent, tools=tools, verbose=True ) return agent_executor async def run_agent(self, query: str): """运行 Agent""" agent_executor = await self.create_langchain_agent() result = await agent_executor.ainvoke({ "input": query }) return result["output"] ``` **2. 与 LlamaIndex 集成** ```python from llama_index.core import VectorStoreIndex, SimpleDirectoryReader from llama_index.core.tools import QueryEngineTool, ToolMetadata from llama_index.core.agent import ReActAgent from mcp.server import Server class MCPLlamaIndexIntegration: def __init__(self, mcp_server: Server): self.mcp_server = mcp_server async def create_mcp_query_engine(self, tool_name: str): """创建 MCP 查询引擎""" async def query_engine_fn(query: str) -> str: result = await self.mcp_server.call_tool( tool_name, {"query": query} ) return result return query_engine_fn async def create_llamaindex_tools(self) -> list: """创建 LlamaIndex 工具""" mcp_tools = await self.mcp_server.list_tools() tools = [] for tool_info in mcp_tools: query_engine = await self.create_mcp_query_engine( tool_info["name"] ) tool = QueryEngineTool( query_engine=query_engine, metadata=ToolMetadata( name=tool_info["name"], description=tool_info["description"] ) ) tools.append(tool) return tools async def create_react_agent(self): """创建 ReAct Agent""" tools = await self.create_llamaindex_tools() agent = ReActAgent.from_tools( tools=tools, verbose=True ) return agent async def query_with_agent(self, query: str): """使用 Agent 查询""" agent = await self.create_react_agent() response = agent.query(query) return response.response ``` **3. 与 AutoGPT 集成** ```python from autogpt.agent.agent import Agent from autogpt.config import Config from autogpt.models.command import Command from mcp.server import Server class MCPAutoGPTIntegration: def __init__(self, mcp_server: Server): self.mcp_server = mcp_server self.command_registry = {} async def register_mcp_commands(self): """注册 MCP 命令""" mcp_tools = await self.mcp_server.list_tools() for tool_info in mcp_tools: command = Command( name=tool_info["name"], description=tool_info["description"], function=self._create_command_function(tool_info["name"]) ) self.command_registry[tool_info["name"]] = command def _create_command_function(self, tool_name: str): """创建命令函数""" async def command_function(**kwargs): result = await self.mcp_server.call_tool( tool_name, kwargs ) return result return command_function async def create_autogpt_agent(self, config: Config): """创建 AutoGPT Agent""" # 注册 MCP 命令 await self.register_mcp_commands() # 创建 Agent agent = Agent( ai_name="MCP-Agent", ai_role="Assistant", commands=self.command_registry, config=config ) return agent async def run_autogpt_task(self, task: str): """运行 AutoGPT 任务""" config = Config() agent = await self.create_autogpt_agent(config) result = await agent.run(task) return result ``` **4. 与 FastAPI 集成** ```python from fastapi import FastAPI, HTTPException, Depends from fastapi.middleware.cors import CORSMiddleware from mcp.server import Server from pydantic import BaseModel class MCPFastAPIIntegration: def __init__(self, mcp_server: Server): self.mcp_server = mcp_server self.app = FastAPI(title="MCP API") self._setup_middleware() self._setup_routes() def _setup_middleware(self): """设置中间件""" self.app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) def _setup_routes(self): """设置路由""" @self.app.get("/tools") async def list_tools(): """列出所有工具""" tools = await self.mcp_server.list_tools() return {"tools": tools} @self.app.post("/tools/{tool_name}/call") async def call_tool(tool_name: str, params: dict): """调用工具""" try: result = await self.mcp_server.call_tool( tool_name, params ) return {"result": result} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @self.app.get("/resources") async def list_resources(): """列出所有资源""" resources = await self.mcp_server.list_resources() return {"resources": resources} @self.app.get("/resources/{uri}") async def read_resource(uri: str): """读取资源""" try: content = await self.mcp_server.read_resource(uri) return {"content": content} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @self.app.get("/prompts") async def list_prompts(): """列出所有提示词""" prompts = await self.mcp_server.list_prompts() return {"prompts": prompts} @self.app.get("/health") async def health_check(): """健康检查""" return {"status": "healthy"} def get_app(self) -> FastAPI: """获取 FastAPI 应用""" return self.app ``` **5. 与 WebSocket 集成** ```python import asyncio import json from fastapi import WebSocket from mcp.server import Server class MCPWebSocketIntegration: def __init__(self, mcp_server: Server): self.mcp_server = mcp_server self.active_connections = [] async def handle_websocket(self, websocket: WebSocket): """处理 WebSocket 连接""" await websocket.accept() self.active_connections.append(websocket) try: while True: # 接收消息 data = await websocket.receive_text() message = json.loads(data) # 处理消息 response = await self._handle_message(message) # 发送响应 await websocket.send_text(json.dumps(response)) except Exception as e: print(f"WebSocket 错误: {e}") finally: self.active_connections.remove(websocket) async def _handle_message(self, message: dict) -> dict: """处理消息""" message_type = message.get("type") if message_type == "list_tools": tools = await self.mcp_server.list_tools() return {"type": "tools_list", "data": tools} elif message_type == "call_tool": result = await self.mcp_server.call_tool( message["tool_name"], message.get("params", {}) ) return {"type": "tool_result", "data": result} elif message_type == "list_resources": resources = await self.mcp_server.list_resources() return {"type": "resources_list", "data": resources} else: return {"type": "error", "message": "未知消息类型"} async def broadcast_message(self, message: dict): """广播消息到所有连接""" message_text = json.dumps(message) for connection in self.active_connections: try: await connection.send_text(message_text) except Exception as e: print(f"发送消息失败: {e}") ``` **6. 与 GraphQL 集成** ```python import strawberry from strawberry.types import Info from mcp.server import Server @strawberry.type class MCPTool: name: str description: str @strawberry.type class MCPResource: uri: str name: str description: str @strawberry.type class Query: @strawberry.field async def tools(self, info: Info) -> list[MCPTool]: """获取所有工具""" mcp_server = info.context["mcp_server"] tools = await mcp_server.list_tools() return [ MCPTool( name=tool["name"], description=tool["description"] ) for tool in tools ] @strawberry.field async def resources(self, info: Info) -> list[MCPResource]: """获取所有资源""" mcp_server = info.context["mcp_server"] resources = await mcp_server.list_resources() return [ MCPResource( uri=resource["uri"], name=resource["name"], description=resource["description"] ) for resource in resources ] @strawberry.type class Mutation: @strawberry.mutation async def call_tool( self, tool_name: str, params: dict, info: Info ) -> str: """调用工具""" mcp_server = info.context["mcp_server"] result = await mcp_server.call_tool(tool_name, params) return str(result) class MCPGraphQLIntegration: def __init__(self, mcp_server: Server): self.mcp_server = mcp_server self.schema = strawberry.Schema( query=Query, mutation=Mutation ) async def execute_query(self, query: str, variables: dict = None): """执行 GraphQL 查询""" context = {"mcp_server": self.mcp_server} result = await self.schema.execute( query, variable_values=variables, context_value=context ) if result.errors: return { "errors": [str(error) for error in result.errors] } return {"data": result.data} ``` **7. 与 gRPC 集成** ```python import grpc from concurrent import futures import mcp_pb2 import mcp_pb2_grpc from mcp.server import Server class MCPServicer(mcp_pb2_grpc.MCPServicer): def __init__(self, mcp_server: Server): self.mcp_server = mcp_server async def ListTools( self, request: mcp_pb2.ListToolsRequest, context: grpc.ServicerContext ) -> mcp_pb2.ListToolsResponse: """列出工具""" tools = await self.mcp_server.list_tools() tool_protos = [ mcp_pb2.Tool( name=tool["name"], description=tool["description"] ) for tool in tools ] return mcp_pb2.ListToolsResponse(tools=tool_protos) async def CallTool( self, request: mcp_pb2.CallToolRequest, context: grpc.ServicerContext ) -> mcp_pb2.CallToolResponse: """调用工具""" params = dict(request.params) result = await self.mcp_server.call_tool( request.tool_name, params ) return mcp_pb2.CallToolResponse(result=str(result)) class MCPGRPCIntegration: def __init__(self, mcp_server: Server, port: int = 50051): self.mcp_server = mcp_server self.port = port self.server = None async def start_server(self): """启动 gRPC 服务器""" self.server = grpc.aio.server( futures.ThreadPoolExecutor(max_workers=10) ) mcp_pb2_grpc.add_MCPServicer_to_server( MCPServicer(self.mcp_server), self.server ) self.server.add_insecure_port(f'[::]:{self.port}') await self.server.start() print(f"gRPC 服务器启动在端口 {self.port}") async def stop_server(self): """停止 gRPC 服务器""" if self.server: await self.server.stop(0) print("gRPC 服务器已停止") ``` **最佳实践:** 1. **异步处理**:使用异步编程避免阻塞 2. **错误处理**:正确处理集成过程中的错误 3. **性能优化**:缓存频繁调用的结果 4. **日志记录**:记录所有集成操作 5. **测试覆盖**:编写集成测试确保功能正常 6. **文档完善**:提供清晰的集成文档 通过与其他 AI 框架和工具的集成,可以扩展 MCP 的功能和应用场景。
服务端 · 2月19日 21:40
MCP 的安全性设计有哪些关键机制?MCP 的安全性设计包含多个层面,确保 AI 模型与外部系统的交互是安全可控的: **1. 认证和授权机制** - **身份认证**:支持多种认证方式(API Key、OAuth、JWT 等) - **访问控制**:基于角色的权限管理(RBAC) - **令牌管理**:安全的令牌生成、验证和刷新机制 - **多租户支持**:隔离不同用户或租户的数据和资源 **2. 通信安全** - **加密传输**:强制使用 TLS/SSL 加密所有通信 - **证书验证**:严格的证书验证和吊销检查 - **安全协议**:基于 JSON-RPC 2.0 的安全扩展 - **防止中间人攻击**:完整的证书链验证 **3. 输入验证和清理** - **参数验证**:严格验证所有输入参数的类型和格式 - **SQL 注入防护**:使用参数化查询,防止 SQL 注入 - **XSS 防护**:清理和转义用户输入,防止跨站脚本攻击 - **命令注入防护**:限制和验证系统命令执行 **4. 资源访问控制** - **文件系统隔离**:限制可访问的文件路径和权限 - **网络访问限制**:白名单机制控制外部网络访问 - **资源配额**:限制 CPU、内存、磁盘等资源使用 - **操作审计**:记录所有资源访问和修改操作 **5. 执行环境安全** - **沙箱隔离**:在隔离的沙箱环境中执行代码 - **权限最小化**:只授予必要的最小权限 - **超时控制**:设置执行超时,防止无限循环 - **资源限制**:限制内存、CPU 等资源使用 **6. 错误处理和日志** - **安全错误消息**:不泄露敏感信息的错误提示 - **详细日志记录**:记录所有操作和安全事件 - **审计追踪**:完整的操作审计链 - **异常监控**:实时监控异常行为 **7. 数据保护** - **数据加密**:敏感数据加密存储和传输 - **数据脱敏**:日志和错误消息中的敏感数据脱敏 - **数据隔离**:不同用户数据的严格隔离 - **数据备份**:安全的数据备份和恢复机制 **8. 速率限制和防护** - **请求限流**:防止 API 滥用和 DDoS 攻击 - **并发控制**:限制并发请求数量 - **黑名单机制**:阻止恶意 IP 或用户 - **异常检测**:检测和阻止异常行为模式 **安全最佳实践:** 1. 定期进行安全审计和渗透测试 2. 及时更新依赖库和框架 3. 实施最小权限原则 4. 建立安全事件响应流程 5. 提供安全配置指南和文档 通过这些多层安全机制,MCP 能够在提供强大功能的同时,确保系统的安全性和可靠性。
服务端 · 2月19日 21:40
如何对 MCP 进行测试?有哪些测试策略和最佳实践?MCP 的测试策略对于确保系统质量和可靠性至关重要。以下是详细的测试方法和最佳实践: **测试层次结构** MCP 测试应涵盖以下层次: 1. **单元测试**:测试单个函数和组件 2. **集成测试**:测试组件之间的交互 3. **端到端测试**:测试完整的用户场景 4. **性能测试**:测试系统性能和可扩展性 5. **安全测试**:测试安全漏洞和防护机制 **1. 单元测试** ```python import pytest from unittest.mock import Mock, AsyncMock from mcp.server import Server class TestMCPTools: @pytest.fixture def server(self): """创建测试服务器实例""" return Server("test-server") @pytest.mark.asyncio async def test_tool_registration(self, server): """测试工具注册""" @server.tool( name="test_tool", description="测试工具" ) async def test_tool(param: str) -> str: return f"Result: {param}" # 验证工具已注册 tools = await server.list_tools() assert any(t["name"] == "test_tool" for t in tools) @pytest.mark.asyncio async def test_tool_execution(self, server): """测试工具执行""" @server.tool( name="calculate", description="计算工具" ) async def calculate(a: int, b: int) -> int: return a + b # 执行工具 result = await server.call_tool("calculate", {"a": 2, "b": 3}) assert result == 5 @pytest.mark.asyncio async def test_parameter_validation(self, server): """测试参数验证""" @server.tool( name="validate", description="参数验证工具", inputSchema={ "type": "object", "properties": { "email": { "type": "string", "format": "email" } }, "required": ["email"] } ) async def validate(email: str) -> str: return f"Valid: {email}" # 测试有效参数 result = await server.call_tool( "validate", {"email": "test@example.com"} ) assert "Valid" in result # 测试无效参数 with pytest.raises(ValueError): await server.call_tool("validate", {"email": "invalid"}) ``` **2. 集成测试** ```python class TestMCPIntegration: @pytest.mark.asyncio async def test_client_server_communication(self): """测试客户端-服务器通信""" from mcp.client import Client from mcp.server import Server # 创建服务器 server = Server("integration-test-server") @server.tool(name="echo", description="回显工具") async def echo(message: str) -> str: return message # 启动服务器 await server.start() try: # 创建客户端 client = Client("http://localhost:8000") # 测试通信 result = await client.call_tool("echo", {"message": "Hello"}) assert result == "Hello" finally: await server.stop() @pytest.mark.asyncio async def test_resource_access(self): """测试资源访问""" server = Server("resource-test-server") @server.resource( uri="file:///test.txt", name="测试文件", description="测试资源" ) async def test_resource() -> str: return "Test content" await server.start() try: client = Client("http://localhost:8000") # 读取资源 content = await client.read_resource("file:///test.txt") assert content == "Test content" finally: await server.stop() ``` **3. 端到端测试** ```python class TestMCPEndToEnd: @pytest.mark.asyncio async def test_complete_workflow(self): """测试完整工作流""" # 模拟用户场景:查询数据库并生成报告 server = Server("e2e-test-server") @server.tool(name="query_db", description="查询数据库") async def query_db(query: str) -> list: return [{"id": 1, "name": "Test"}] @server.tool(name="generate_report", description="生成报告") async def generate_report(data: list) -> str: return f"Report: {len(data)} items" await server.start() try: client = Client("http://localhost:8000") # 执行工作流 data = await client.call_tool("query_db", {"query": "SELECT *"}) report = await client.call_tool("generate_report", {"data": data}) assert "1 items" in report finally: await server.stop() ``` **4. 性能测试** ```python import asyncio import time from locust import HttpUser, task, between class MCPPerformanceTest(HttpUser): wait_time = between(1, 3) def on_start(self): """测试开始时的初始化""" self.client = Client(self.host) @task def tool_call_performance(self): """测试工具调用性能""" start_time = time.time() result = self.client.call_tool("test_tool", {"param": "value"}) elapsed = time.time() - start_time # 断言响应时间 assert elapsed < 1.0, f"响应时间过长: {elapsed}s" @task def concurrent_requests(self): """测试并发请求""" async def make_request(): return self.client.call_tool("test_tool", {"param": "value"}) # 并发执行 10 个请求 tasks = [make_request() for _ in range(10)] results = asyncio.run(asyncio.gather(*tasks)) # 验证所有请求都成功 assert all(results) ``` **5. 安全测试** ```python class TestMCPSecurity: @pytest.mark.asyncio async def test_authentication(self): """测试认证机制""" server = Server("security-test-server") # 配置认证 server.set_authenticator(lambda token: token == "valid-token") @server.tool(name="secure_tool", description="安全工具") async def secure_tool() -> str: return "Secure data" await server.start() try: # 测试有效令牌 client = Client("http://localhost:8000", token="valid-token") result = await client.call_tool("secure_tool", {}) assert result == "Secure data" # 测试无效令牌 client_invalid = Client("http://localhost:8000", token="invalid-token") with pytest.raises(AuthenticationError): await client_invalid.call_tool("secure_tool", {}) finally: await server.stop() @pytest.mark.asyncio async def test_sql_injection_prevention(self): """测试 SQL 注入防护""" server = Server("sql-injection-test-server") @server.tool(name="query", description="查询工具") async def query(sql: str) -> list: # 应该使用参数化查询 return execute_safe_query(sql) # 测试 SQL 注入尝试 malicious_sql = "SELECT * FROM users WHERE '1'='1'" result = await server.call_tool("query", {"sql": malicious_sql}) # 验证注入被阻止 assert result == [] @pytest.mark.asyncio async def test_rate_limiting(self): """测试速率限制""" server = Server("rate-limit-test-server") # 配置速率限制 server.set_rate_limit(max_requests=10, window=60) @server.tool(name="limited_tool", description="受限工具") async def limited_tool() -> str: return "Success" # 快速发送多个请求 for i in range(15): try: await server.call_tool("limited_tool", {}) except RateLimitError: # 预期的速率限制错误 assert i >= 10 break else: pytest.fail("未触发速率限制") ``` **6. Mock 和 Stub** ```python from unittest.mock import Mock, patch class TestMCPWithMocks: @pytest.mark.asyncio async def test_with_external_dependency_mock(self): """使用 Mock 测试外部依赖""" server = Server("mock-test-server") @server.tool(name="fetch_data", description="获取数据") async def fetch_data(url: str) -> dict: # Mock 外部 API 调用 with patch('requests.get') as mock_get: mock_get.return_value.json.return_value = { "data": "mocked" } response = requests.get(url) return response.json() result = await server.call_tool( "fetch_data", {"url": "http://api.example.com"} ) assert result == {"data": "mocked"} mock_get.assert_called_once() ``` **7. 测试覆盖率** ```python # 使用 pytest-cov 生成覆盖率报告 # 运行命令: pytest --cov=mcp --cov-report=html class TestCoverage: @pytest.mark.asyncio async def test_all_code_paths(self): """测试所有代码路径""" server = Server("coverage-test-server") @server.tool(name="complex_tool", description="复杂工具") async def complex_tool(condition: bool) -> str: if condition: return "Branch A" else: return "Branch B" # 测试所有分支 result_a = await server.call_tool("complex_tool", {"condition": True}) assert result_a == "Branch A" result_b = await server.call_tool("complex_tool", {"condition": False}) assert result_b == "Branch B" ``` **最佳实践:** 1. **测试金字塔**:大量单元测试,适量集成测试,少量端到端测试 2. **独立性**:每个测试应该独立运行,不依赖其他测试 3. **可重复性**:测试结果应该可重复,不受环境因素影响 4. **快速反馈**:单元测试应该快速执行,提供快速反馈 5. **持续集成**:将测试集成到 CI/CD 流程中 6. **覆盖率目标**:设定合理的代码覆盖率目标(如 80%) 通过完善的测试策略,可以确保 MCP 系统的质量和可靠性。
服务端 · 2月19日 21:40
MCP 的插件系统是如何工作的?MCP 的插件系统允许开发者扩展 MCP 服务器的功能,无需修改核心代码。以下是详细的插件架构和实现方法: **插件架构设计** MCP 插件系统应考虑以下方面: 1. **插件发现**:自动发现和加载插件 2. **插件生命周期**:管理插件的加载、初始化、卸载 3. **插件隔离**:确保插件之间相互隔离 4. **插件通信**:提供插件间的通信机制 5. **插件安全**:限制插件的权限和资源访问 **1. 插件接口定义** ```python from abc import ABC, abstractmethod from typing import Dict, Any, List class MCPPlugin(ABC): """MCP 插件基类""" def __init__(self, config: Dict[str, Any] = None): self.config = config or {} self.name = self.__class__.__name__ self.version = getattr(self, 'VERSION', '1.0.0') @abstractmethod async def initialize(self, server): """初始化插件""" pass @abstractmethod async def shutdown(self): """关闭插件""" pass @abstractmethod def get_tools(self) -> List[Dict[str, Any]]: """获取插件提供的工具""" return [] @abstractmethod def get_resources(self) -> List[Dict[str, Any]]: """获取插件提供的资源""" return [] @abstractmethod def get_prompts(self) -> List[Dict[str, Any]]: """获取插件提供的提示词""" return [] def get_metadata(self) -> Dict[str, Any]: """获取插件元数据""" return { "name": self.name, "version": self.version, "description": getattr(self, 'DESCRIPTION', ''), "author": getattr(self, 'AUTHOR', ''), "dependencies": getattr(self, 'DEPENDENCIES', []) } ``` **2. 插件管理器** ```python import importlib import inspect import os from pathlib import Path from typing import Dict, List, Type class PluginManager: def __init__(self, server): self.server = server self.plugins: Dict[str, MCPPlugin] = {} self.plugin_directories: List[str] = [] def add_plugin_directory(self, directory: str): """添加插件目录""" if directory not in self.plugin_directories: self.plugin_directories.append(directory) async def discover_plugins(self): """发现插件""" discovered_plugins = [] for directory in self.plugin_directories: plugin_path = Path(directory) if not plugin_path.exists(): continue # 遍历插件目录 for item in plugin_path.iterdir(): if item.is_file() and item.suffix == '.py': # 单文件插件 discovered_plugins.append(str(item)) elif item.is_dir() and (item / '__init__.py').exists(): # 包插件 discovered_plugins.append(str(item)) return discovered_plugins async def load_plugin(self, plugin_path: str) -> bool: """加载插件""" try: # 动态导入插件模块 spec = importlib.util.spec_from_file_location( "plugin_module", plugin_path ) if spec is None or spec.loader is None: return False module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) # 查找插件类 plugin_class = None for name, obj in inspect.getmembers(module): if (inspect.isclass(obj) and issubclass(obj, MCPPlugin) and obj is not MCPPlugin): plugin_class = obj break if plugin_class is None: return False # 创建插件实例 plugin = plugin_class() # 初始化插件 await plugin.initialize(self.server) # 注册插件 self.plugins[plugin.name] = plugin # 注册插件提供的工具、资源、提示词 self._register_plugin_tools(plugin) self._register_plugin_resources(plugin) self._register_plugin_prompts(plugin) return True except Exception as e: print(f"加载插件失败 {plugin_path}: {e}") return False def _register_plugin_tools(self, plugin: MCPPlugin): """注册插件工具""" tools = plugin.get_tools() for tool_info in tools: @self.server.tool( name=tool_info["name"], description=tool_info["description"] ) async def tool_wrapper(**kwargs): return await tool_info["function"](**kwargs) def _register_plugin_resources(self, plugin: MCPPlugin): """注册插件资源""" resources = plugin.get_resources() for resource_info in resources: @self.server.resource( uri=resource_info["uri"], name=resource_info["name"], description=resource_info["description"] ) async def resource_wrapper(): return await resource_info["function"]() def _register_plugin_prompts(self, plugin: MCPPlugin): """注册插件提示词""" prompts = plugin.get_prompts() for prompt_info in prompts: @self.server.prompt( name=prompt_info["name"], description=prompt_info["description"] ) async def prompt_wrapper(**kwargs): return await prompt_info["function"](**kwargs) async def unload_plugin(self, plugin_name: str) -> bool: """卸载插件""" if plugin_name not in self.plugins: return False plugin = self.plugins[plugin_name] try: # 关闭插件 await plugin.shutdown() # 从插件列表中移除 del self.plugins[plugin_name] return True except Exception as e: print(f"卸载插件失败 {plugin_name}: {e}") return False async def reload_plugin(self, plugin_name: str) -> bool: """重新加载插件""" if plugin_name not in self.plugins: return False # 先卸载 await self.unload_plugin(plugin_name) # 重新加载(需要记录插件路径) # 这里简化处理,实际需要保存插件路径 return True def get_plugin_info(self, plugin_name: str) -> Dict[str, Any]: """获取插件信息""" if plugin_name not in self.plugins: return {} plugin = self.plugins[plugin_name] return plugin.get_metadata() def list_plugins(self) -> List[Dict[str, Any]]: """列出所有插件""" return [ plugin.get_metadata() for plugin in self.plugins.values() ] ``` **3. 示例插件实现** ```python # plugins/database_plugin.py class DatabasePlugin(MCPPlugin): """数据库插件""" VERSION = "1.0.0" DESCRIPTION = "提供数据库查询和管理功能" AUTHOR = "Your Name" DEPENDENCIES = ["sqlalchemy"] def __init__(self, config: Dict[str, Any] = None): super().__init__(config) self.db_connection = None async def initialize(self, server): """初始化数据库连接""" from sqlalchemy import create_engine db_url = self.config.get("database_url", "sqlite:///mcp.db") self.db_connection = create_engine(db_url) print(f"数据库插件 {self.name} 已初始化") async def shutdown(self): """关闭数据库连接""" if self.db_connection: self.db_connection.dispose() print(f"数据库插件 {self.name} 已关闭") def get_tools(self) -> List[Dict[str, Any]]: """获取数据库工具""" return [ { "name": "query_database", "description": "执行数据库查询", "function": self._query_database }, { "name": "execute_sql", "description": "执行 SQL 语句", "function": self._execute_sql } ] def get_resources(self) -> List[Dict[str, Any]]: """获取数据库资源""" return [ { "uri": "db://schema", "name": "数据库模式", "description": "数据库表结构", "function": self._get_schema } ] def get_prompts(self) -> List[Dict[str, Any]]: """获取数据库提示词""" return [ { "name": "generate_query", "description": "生成 SQL 查询提示词", "function": self._generate_query_prompt } ] async def _query_database(self, query: str) -> str: """执行数据库查询""" from sqlalchemy import text with self.db_connection.connect() as conn: result = conn.execute(text(query)) rows = result.fetchall() return f"查询结果: {len(rows)} 行" async def _execute_sql(self, sql: str) -> str: """执行 SQL 语句""" from sqlalchemy import text with self.db_connection.connect() as conn: conn.execute(text(sql)) conn.commit() return "SQL 执行成功" async def _get_schema(self) -> str: """获取数据库模式""" from sqlalchemy import inspect inspector = inspect(self.db_connection) tables = inspector.get_table_names() return f"数据库表: {', '.join(tables)}" async def _generate_query_prompt(self, table_name: str) -> str: """生成查询提示词""" return f""" 请为表 {table_name} 生成 SQL 查询语句。 要求: 1. 只使用 SELECT 查询 2. 包含适当的 WHERE 条件 3. 添加 LIMIT 限制结果数量 """ ``` **4. 插件配置** ```python import json from pathlib import Path class PluginConfig: def __init__(self, config_dir: str = "config/plugins"): self.config_dir = Path(config_dir) self.config_dir.mkdir(parents=True, exist_ok=True) def load_config(self, plugin_name: str) -> Dict[str, Any]: """加载插件配置""" config_file = self.config_dir / f"{plugin_name}.json" if not config_file.exists(): return {} with open(config_file, 'r') as f: return json.load(f) def save_config( self, plugin_name: str, config: Dict[str, Any] ): """保存插件配置""" config_file = self.config_dir / f"{plugin_name}.json" with open(config_file, 'w') as f: json.dump(config, f, indent=2) def get_all_configs(self) -> Dict[str, Dict[str, Any]]: """获取所有插件配置""" configs = {} for config_file in self.config_dir.glob("*.json"): plugin_name = config_file.stem with open(config_file, 'r') as f: configs[plugin_name] = json.load(f) return configs ``` **5. 插件依赖管理** ```python from typing import Dict, List, Set class PluginDependencyManager: def __init__(self): self.dependencies: Dict[str, Set[str]] = {} self.dependents: Dict[str, Set[str]] = {} def add_dependency(self, plugin_name: str, dependency: str): """添加依赖关系""" if plugin_name not in self.dependencies: self.dependencies[plugin_name] = set() self.dependencies[plugin_name].add(dependency) if dependency not in self.dependents: self.dependents[dependency] = set() self.dependents[dependency].add(plugin_name) def get_load_order(self, plugins: List[str]) -> List[str]: """获取插件加载顺序(拓扑排序)""" visited = set() result = [] def visit(plugin_name: str): if plugin_name in visited: return visited.add(plugin_name) # 先加载依赖 for dep in self.dependencies.get(plugin_name, []): visit(dep) result.append(plugin_name) for plugin_name in plugins: visit(plugin_name) return result def check_circular_dependency(self) -> bool: """检查循环依赖""" visited = set() recursion_stack = set() def has_cycle(plugin_name: str) -> bool: visited.add(plugin_name) recursion_stack.add(plugin_name) for dep in self.dependencies.get(plugin_name, []): if dep not in visited: if has_cycle(dep): return True elif dep in recursion_stack: return True recursion_stack.remove(plugin_name) return False for plugin_name in self.dependencies: if plugin_name not in visited: if has_cycle(plugin_name): return True return False ``` **6. 插件沙箱** ```python import sys from typing import Any class PluginSandbox: def __init__(self): self.restricted_modules = { 'os', 'sys', 'subprocess', 'socket', 'pickle', 'shelve', 'marshal' } def create_sandbox(self, plugin_name: str) -> dict: """创建插件沙箱环境""" safe_globals = { '__builtins__': self._create_safe_builtins(), '__name__': f'plugin_{plugin_name}', } return safe_globals def _create_safe_builtins(self) -> dict: """创建安全的内置函数""" safe_builtins = {} # 允许的安全内置函数 allowed_builtins = [ 'abs', 'all', 'any', 'bool', 'dict', 'enumerate', 'filter', 'float', 'int', 'len', 'list', 'map', 'max', 'min', 'range', 'set', 'sorted', 'str', 'sum', 'tuple', 'type', 'zip' ] for name in allowed_builtins: if hasattr(__builtins__, name): safe_builtins[name] = getattr(__builtins__, name) return safe_builtins def execute_in_sandbox( self, code: str, sandbox_env: dict ) -> Any: """在沙箱中执行代码""" try: exec(code, sandbox_env) return True except Exception as e: print(f"沙箱执行失败: {e}") return False ``` **最佳实践:** 1. **插件隔离**:确保插件之间相互隔离,避免冲突 2. **依赖管理**:正确处理插件依赖关系 3. **错误处理**:插件错误不应影响核心系统 4. **版本兼容**:支持插件版本管理和兼容性检查 5. **安全限制**:限制插件的权限和资源访问 6. **文档完善**:为每个插件提供清晰的文档 通过完善的插件系统,可以灵活扩展 MCP 服务器的功能,满足各种定制化需求。
服务端 · 2月19日 21:39