MCP
模型上下文协议(Model Context Protocol,MCP) 解决了大型语言模型(LLM)与外部数据源和工具连接的难题,堪称 AI 领域的“万能遥控器”。
由 Anthropic 开源发布,MCP 在现有的函数调用机制基础上创新,免去了 LLM 与各类应用间定制集成的繁琐工作。
这意味着开发者无需为每种 AI 模型与外部系统的组合重新设计接口,能够更高效地构建功能更强大、上下文感知更精准的应用。

查看更多相关内容
MCP 的数据持久化和缓存策略有哪些?MCP 的数据持久化和缓存策略对于提高系统性能和可靠性至关重要。以下是详细的实现方法和最佳实践:
**数据持久化架构**
MCP 数据持久化应考虑以下方面:
1. **存储类型**:选择合适的存储类型(关系型、文档型、键值型等)
2. **数据模型**:设计合理的数据模型
3. **持久化策略**:实现高效的数据持久化策略
4. **缓存策略**:实现多层缓存策略
5. **数据一致性**:确保数据一致性
6. **备份恢复**:实现数据备份和恢复机制
**1. 数据模型设计**
```python
from dataclasses import dataclass
from typing import Optional, Dict, Any, List
from datetime import datetime
from enum import Enum
class DataType(Enum):
"""数据类型"""
TOOL = "tool"
RESOURCE = "resource"
PROMPT = "prompt"
SESSION = "session"
METADATA = "metadata"
@dataclass
class DataRecord:
"""数据记录"""
id: str
data_type: DataType
content: Dict[str, Any]
created_at: datetime
updated_at: datetime
version: int = 1
metadata: Dict[str, Any] = None
def __post_init__(self):
if self.metadata is None:
self.metadata = {}
class DataModel:
"""数据模型"""
def __init__(self):
self.records: Dict[str, DataRecord] = {}
def create_record(
self,
data_type: DataType,
content: Dict[str, Any],
metadata: Dict[str, Any] = None
) -> DataRecord:
"""创建数据记录"""
record_id = self._generate_id(data_type)
now = datetime.now()
record = DataRecord(
id=record_id,
data_type=data_type,
content=content,
created_at=now,
updated_at=now,
metadata=metadata or {}
)
self.records[record_id] = record
return record
def update_record(
self,
record_id: str,
content: Dict[str, Any] = None,
metadata: Dict[str, Any] = None
) -> Optional[DataRecord]:
"""更新数据记录"""
if record_id not in self.records:
return None
record = self.records[record_id]
if content:
record.content.update(content)
if metadata:
record.metadata.update(metadata)
record.updated_at = datetime.now()
record.version += 1
return record
def get_record(self, record_id: str) -> Optional[DataRecord]:
"""获取数据记录"""
return self.records.get(record_id)
def delete_record(self, record_id: str) -> bool:
"""删除数据记录"""
if record_id in self.records:
del self.records[record_id]
return True
return False
def query_records(
self,
data_type: DataType = None,
filters: Dict[str, Any] = None
) -> List[DataRecord]:
"""查询数据记录"""
records = list(self.records.values())
if data_type:
records = [r for r in records if r.data_type == data_type]
if filters:
for key, value in filters.items():
records = [
r for r in records
if self._match_filter(r, key, value)
]
return records
def _generate_id(self, data_type: DataType) -> str:
"""生成记录 ID"""
import uuid
return f"{data_type.value}_{uuid.uuid4().hex}"
def _match_filter(
self,
record: DataRecord,
key: str,
value: Any
) -> bool:
"""匹配过滤条件"""
# 检查内容
if key in record.content:
return record.content[key] == value
# 检查元数据
if key in record.metadata:
return record.metadata[key] == value
return False
```
**2. 持久化存储实现**
```python
from abc import ABC, abstractmethod
from typing import Dict, List, Optional
import json
import os
from pathlib import Path
class PersistenceStorage(ABC):
"""持久化存储基类"""
@abstractmethod
async def save_record(self, record: DataRecord) -> bool:
"""保存记录"""
pass
@abstractmethod
async def load_record(self, record_id: str) -> Optional[DataRecord]:
"""加载记录"""
pass
@abstractmethod
async def delete_record(self, record_id: str) -> bool:
"""删除记录"""
pass
@abstractmethod
async def query_records(
self,
data_type: DataType = None,
filters: Dict[str, Any] = None
) -> List[DataRecord]:
"""查询记录"""
pass
class FileStorage(PersistenceStorage):
"""文件存储"""
def __init__(self, storage_dir: str = "data"):
self.storage_dir = Path(storage_dir)
self.storage_dir.mkdir(parents=True, exist_ok=True)
def _get_file_path(self, record_id: str) -> Path:
"""获取文件路径"""
return self.storage_dir / f"{record_id}.json"
async def save_record(self, record: DataRecord) -> bool:
"""保存记录"""
file_path = self._get_file_path(record.id)
try:
data = {
"id": record.id,
"data_type": record.data_type.value,
"content": record.content,
"created_at": record.created_at.isoformat(),
"updated_at": record.updated_at.isoformat(),
"version": record.version,
"metadata": record.metadata
}
with open(file_path, 'w') as f:
json.dump(data, f, indent=2)
return True
except Exception as e:
print(f"保存记录失败: {e}")
return False
async def load_record(self, record_id: str) -> Optional[DataRecord]:
"""加载记录"""
file_path = self._get_file_path(record_id)
if not file_path.exists():
return None
try:
with open(file_path, 'r') as f:
data = json.load(f)
record = DataRecord(
id=data["id"],
data_type=DataType(data["data_type"]),
content=data["content"],
created_at=datetime.fromisoformat(data["created_at"]),
updated_at=datetime.fromisoformat(data["updated_at"]),
version=data["version"],
metadata=data.get("metadata", {})
)
return record
except Exception as e:
print(f"加载记录失败: {e}")
return None
async def delete_record(self, record_id: str) -> bool:
"""删除记录"""
file_path = self._get_file_path(record_id)
if file_path.exists():
file_path.unlink()
return True
return False
async def query_records(
self,
data_type: DataType = None,
filters: Dict[str, Any] = None
) -> List[DataRecord]:
"""查询记录"""
records = []
for file_path in self.storage_dir.glob("*.json"):
try:
with open(file_path, 'r') as f:
data = json.load(f)
record = DataRecord(
id=data["id"],
data_type=DataType(data["data_type"]),
content=data["content"],
created_at=datetime.fromisoformat(data["created_at"]),
updated_at=datetime.fromisoformat(data["updated_at"]),
version=data["version"],
metadata=data.get("metadata", {})
)
# 应用过滤条件
if data_type and record.data_type != data_type:
continue
if filters:
match = True
for key, value in filters.items():
if key in record.content and record.content[key] != value:
match = False
break
if key in record.metadata and record.metadata[key] != value:
match = False
break
if not match:
continue
records.append(record)
except Exception as e:
print(f"加载记录失败 {file_path}: {e}")
return records
class DatabaseStorage(PersistenceStorage):
"""数据库存储"""
def __init__(self, database_url: str):
self.database_url = database_url
self._initialize_database()
def _initialize_database(self):
"""初始化数据库"""
from sqlalchemy import create_engine, Column, String, Integer, Text, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class DataRecordTable(Base):
__tablename__ = 'data_records'
id = Column(String(100), primary_key=True)
data_type = Column(String(50), nullable=False, index=True)
content = Column(Text, nullable=False)
created_at = Column(DateTime, nullable=False)
updated_at = Column(DateTime, nullable=False)
version = Column(Integer, default=1)
metadata = Column(Text)
self.engine = create_engine(self.database_url)
Base.metadata.create_all(self.engine)
self.SessionLocal = sessionmaker(
autocommit=False,
autoflush=False,
bind=self.engine
)
self.DataRecordTable = DataRecordTable
async def save_record(self, record: DataRecord) -> bool:
"""保存记录"""
session = self.SessionLocal()
try:
db_record = self.DataRecordTable(
id=record.id,
data_type=record.data_type.value,
content=json.dumps(record.content),
created_at=record.created_at,
updated_at=record.updated_at,
version=record.version,
metadata=json.dumps(record.metadata)
)
session.merge(db_record)
session.commit()
return True
except Exception as e:
session.rollback()
print(f"保存记录失败: {e}")
return False
finally:
session.close()
async def load_record(self, record_id: str) -> Optional[DataRecord]:
"""加载记录"""
session = self.SessionLocal()
try:
db_record = session.query(self.DataRecordTable).filter(
self.DataRecordTable.id == record_id
).first()
if not db_record:
return None
record = DataRecord(
id=db_record.id,
data_type=DataType(db_record.data_type),
content=json.loads(db_record.content),
created_at=db_record.created_at,
updated_at=db_record.updated_at,
version=db_record.version,
metadata=json.loads(db_record.metadata) if db_record.metadata else {}
)
return record
except Exception as e:
print(f"加载记录失败: {e}")
return None
finally:
session.close()
async def delete_record(self, record_id: str) -> bool:
"""删除记录"""
session = self.SessionLocal()
try:
session.query(self.DataRecordTable).filter(
self.DataRecordTable.id == record_id
).delete()
session.commit()
return True
except Exception as e:
session.rollback()
print(f"删除记录失败: {e}")
return False
finally:
session.close()
async def query_records(
self,
data_type: DataType = None,
filters: Dict[str, Any] = None
) -> List[DataRecord]:
"""查询记录"""
session = self.SessionLocal()
try:
query = session.query(self.DataRecordTable)
if data_type:
query = query.filter(
self.DataRecordTable.data_type == data_type.value
)
db_records = query.all()
records = []
for db_record in db_records:
record = DataRecord(
id=db_record.id,
data_type=DataType(db_record.data_type),
content=json.loads(db_record.content),
created_at=db_record.created_at,
updated_at=db_record.updated_at,
version=db_record.version,
metadata=json.loads(db_record.metadata) if db_record.metadata else {}
)
# 应用过滤条件
if filters:
match = True
for key, value in filters.items():
if key in record.content and record.content[key] != value:
match = False
break
if key in record.metadata and record.metadata[key] != value:
match = False
break
if not match:
continue
records.append(record)
return records
except Exception as e:
print(f"查询记录失败: {e}")
return []
finally:
session.close()
```
**3. 缓存策略实现**
```python
from typing import Optional, Dict, Any, List
from abc import ABC, abstractmethod
import time
class CacheStrategy(ABC):
"""缓存策略基类"""
@abstractmethod
async def get(self, key: str) -> Optional[Any]:
"""获取缓存值"""
pass
@abstractmethod
async def set(self, key: str, value: Any, ttl: int = None):
"""设置缓存值"""
pass
@abstractmethod
async def delete(self, key: str) -> bool:
"""删除缓存值"""
pass
@abstractmethod
async def clear(self):
"""清空缓存"""
pass
class MemoryCache(CacheStrategy):
"""内存缓存"""
def __init__(self, max_size: int = 1000, default_ttl: int = 300):
self.max_size = max_size
self.default_ttl = default_ttl
self.cache: Dict[str, tuple] = {}
async def get(self, key: str) -> Optional[Any]:
"""获取缓存值"""
if key not in self.cache:
return None
value, timestamp, ttl = self.cache[key]
# 检查是否过期
if ttl and time.time() - timestamp > ttl:
del self.cache[key]
return None
return value
async def set(self, key: str, value: Any, ttl: int = None):
"""设置缓存值"""
# 检查缓存大小
if len(self.cache) >= self.max_size:
self._evict()
ttl = ttl or self.default_ttl
self.cache[key] = (value, time.time(), ttl)
async def delete(self, key: str) -> bool:
"""删除缓存值"""
if key in self.cache:
del self.cache[key]
return True
return False
async def clear(self):
"""清空缓存"""
self.cache.clear()
def _evict(self):
"""淘汰缓存项"""
# 简单实现:随机淘汰
if self.cache:
key = next(iter(self.cache))
del self.cache[key]
class RedisCache(CacheStrategy):
"""Redis 缓存"""
def __init__(self, redis_url: str = "redis://localhost:6379/0"):
import redis.asyncio as aioredis
self.redis = aioredis.from_url(redis_url)
async def get(self, key: str) -> Optional[Any]:
"""获取缓存值"""
try:
value = await self.redis.get(key)
if value:
return json.loads(value)
return None
except Exception as e:
print(f"获取缓存失败: {e}")
return None
async def set(self, key: str, value: Any, ttl: int = None):
"""设置缓存值"""
try:
serialized_value = json.dumps(value)
if ttl:
await self.redis.setex(key, ttl, serialized_value)
else:
await self.redis.set(key, serialized_value)
except Exception as e:
print(f"设置缓存失败: {e}")
async def delete(self, key: str) -> bool:
"""删除缓存值"""
try:
result = await self.redis.delete(key)
return result > 0
except Exception as e:
print(f"删除缓存失败: {e}")
return False
async def clear(self):
"""清空缓存"""
try:
await self.redis.flushdb()
except Exception as e:
print(f"清空缓存失败: {e}")
class MultiLevelCache:
"""多级缓存"""
def __init__(
self,
l1_cache: CacheStrategy,
l2_cache: CacheStrategy = None
):
self.l1_cache = l1_cache
self.l2_cache = l2_cache
async def get(self, key: str) -> Optional[Any]:
"""获取缓存值"""
# 先从 L1 缓存获取
value = await self.l1_cache.get(key)
if value is not None:
return value
# 从 L2 缓存获取
if self.l2_cache:
value = await self.l2_cache.get(key)
if value is not None:
# 回填 L1 缓存
await self.l1_cache.set(key, value)
return value
return None
async def set(self, key: str, value: Any, ttl: int = None):
"""设置缓存值"""
# 同时设置 L1 和 L2 缓存
await self.l1_cache.set(key, value, ttl)
if self.l2_cache:
await self.l2_cache.set(key, value, ttl)
async def delete(self, key: str) -> bool:
"""删除缓存值"""
# 同时删除 L1 和 L2 缓存
l1_deleted = await self.l1_cache.delete(key)
l2_deleted = True
if self.l2_cache:
l2_deleted = await self.l2_cache.delete(key)
return l1_deleted or l2_deleted
async def clear(self):
"""清空缓存"""
await self.l1_cache.clear()
if self.l2_cache:
await self.l2_cache.clear()
```
**4. 数据持久化管理器**
```python
from typing import Optional, Dict, Any, List
class DataPersistenceManager:
"""数据持久化管理器"""
def __init__(
self,
storage: PersistenceStorage,
cache: CacheStrategy = None
):
self.storage = storage
self.cache = cache
self.data_model = DataModel()
async def save_record(
self,
data_type: DataType,
content: Dict[str, Any],
metadata: Dict[str, Any] = None,
use_cache: bool = True
) -> Optional[DataRecord]:
"""保存记录"""
# 创建记录
record = self.data_model.create_record(
data_type,
content,
metadata
)
# 持久化存储
success = await self.storage.save_record(record)
if not success:
return None
# 更新缓存
if use_cache and self.cache:
await self.cache.set(record.id, record)
return record
async def load_record(
self,
record_id: str,
use_cache: bool = True
) -> Optional[DataRecord]:
"""加载记录"""
# 先从缓存获取
if use_cache and self.cache:
record = await self.cache.get(record_id)
if record:
return record
# 从存储加载
record = await self.storage.load_record(record_id)
if record and use_cache and self.cache:
# 更新缓存
await self.cache.set(record.id, record)
return record
async def update_record(
self,
record_id: str,
content: Dict[str, Any] = None,
metadata: Dict[str, Any] = None,
use_cache: bool = True
) -> Optional[DataRecord]:
"""更新记录"""
# 更新数据模型
record = self.data_model.update_record(
record_id,
content,
metadata
)
if not record:
return None
# 持久化存储
success = await self.storage.save_record(record)
if not success:
return None
# 更新缓存
if use_cache and self.cache:
await self.cache.set(record.id, record)
return record
async def delete_record(
self,
record_id: str,
use_cache: bool = True
) -> bool:
"""删除记录"""
# 从存储删除
success = await self.storage.delete_record(record_id)
if not success:
return False
# 从缓存删除
if use_cache and self.cache:
await self.cache.delete(record_id)
# 从数据模型删除
self.data_model.delete_record(record_id)
return True
async def query_records(
self,
data_type: DataType = None,
filters: Dict[str, Any] = None
) -> List[DataRecord]:
"""查询记录"""
return await self.storage.query_records(data_type, filters)
```
**5. 数据备份和恢复**
```python
import shutil
from typing import Optional
from datetime import datetime
class BackupManager:
"""备份管理器"""
def __init__(self, storage_dir: str = "data", backup_dir: str = "backups"):
self.storage_dir = Path(storage_dir)
self.backup_dir = Path(backup_dir)
self.backup_dir.mkdir(parents=True, exist_ok=True)
async def create_backup(self, backup_name: str = None) -> Optional[str]:
"""创建备份"""
if not backup_name:
backup_name = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = self.backup_dir / backup_name
try:
# 创建备份目录
backup_path.mkdir(parents=True, exist_ok=True)
# 复制数据文件
for file_path in self.storage_dir.glob("*.json"):
shutil.copy2(file_path, backup_path / file_path.name)
# 创建备份元数据
metadata = {
"backup_name": backup_name,
"created_at": datetime.now().isoformat(),
"file_count": len(list(backup_path.glob("*.json")))
}
metadata_path = backup_path / "backup_metadata.json"
with open(metadata_path, 'w') as f:
json.dump(metadata, f, indent=2)
return backup_name
except Exception as e:
print(f"创建备份失败: {e}")
return None
async def restore_backup(self, backup_name: str) -> bool:
"""恢复备份"""
backup_path = self.backup_dir / backup_name
if not backup_path.exists():
print(f"备份不存在: {backup_name}")
return False
try:
# 清空当前数据目录
for file_path in self.storage_dir.glob("*.json"):
file_path.unlink()
# 复制备份文件
for file_path in backup_path.glob("*.json"):
if file_path.name != "backup_metadata.json":
shutil.copy2(file_path, self.storage_dir / file_path.name)
return True
except Exception as e:
print(f"恢复备份失败: {e}")
return False
async def list_backups(self) -> List[Dict[str, Any]]:
"""列出所有备份"""
backups = []
for backup_path in self.backup_dir.iterdir():
if not backup_path.is_dir():
continue
metadata_path = backup_path / "backup_metadata.json"
if metadata_path.exists():
with open(metadata_path, 'r') as f:
metadata = json.load(f)
backups.append(metadata)
return backups
async def delete_backup(self, backup_name: str) -> bool:
"""删除备份"""
backup_path = self.backup_dir / backup_name
if backup_path.exists():
shutil.rmtree(backup_path)
return True
return False
```
**最佳实践:**
1. **分层存储**:根据数据访问频率选择合适的存储类型
2. **缓存策略**:实现多级缓存,提高访问速度
3. **数据一致性**:确保缓存和存储的数据一致性
4. **定期备份**:定期创建数据备份,防止数据丢失
5. **监控告警**:监控存储和缓存的健康状态
6. **性能优化**:优化数据访问和存储性能
通过完善的数据持久化和缓存策略,可以确保 MCP 系统的高性能和可靠性。
服务端 · 2月21日 15:51
MCP 的错误处理和重试机制如何实现?MCP 的错误处理和重试机制对于确保系统稳定性和可靠性至关重要。以下是详细的错误处理策略和重试机制实现:
**错误处理架构**
MCP 错误处理应考虑以下方面:
1. **错误分类**:区分不同类型的错误
2. **错误传播**:正确传播错误信息
3. **错误恢复**:实现错误恢复机制
4. **重试策略**:智能的重试策略
5. **熔断机制**:防止级联故障
6. **降级策略**:在故障时提供降级服务
**1. 错误分类和定义**
```python
from enum import Enum
from typing import Optional, Dict, Any
from dataclasses import dataclass
class ErrorType(Enum):
"""错误类型"""
VALIDATION_ERROR = "validation_error"
AUTHENTICATION_ERROR = "authentication_error"
AUTHORIZATION_ERROR = "authorization_error"
NOT_FOUND_ERROR = "not_found_error"
CONFLICT_ERROR = "conflict_error"
RATE_LIMIT_ERROR = "rate_limit_error"
INTERNAL_ERROR = "internal_error"
EXTERNAL_SERVICE_ERROR = "external_service_error"
TIMEOUT_ERROR = "timeout_error"
NETWORK_ERROR = "network_error"
class ErrorSeverity(Enum):
"""错误严重程度"""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class MCPError(Exception):
"""MCP 错误基类"""
error_type: ErrorType
message: str
code: int
details: Dict[str, Any] = None
severity: ErrorSeverity = ErrorSeverity.MEDIUM
retryable: bool = False
def __post_init__(self):
if self.details is None:
self.details = {}
super().__init__(self.message)
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
return {
"error_type": self.error_type.value,
"message": self.message,
"code": self.code,
"details": self.details,
"severity": self.severity.value,
"retryable": self.retryable
}
class ValidationError(MCPError):
"""验证错误"""
def __init__(self, message: str, details: Dict[str, Any] = None):
super().__init__(
error_type=ErrorType.VALIDATION_ERROR,
message=message,
code=400,
details=details,
severity=ErrorSeverity.LOW,
retryable=False
)
class AuthenticationError(MCPError):
"""认证错误"""
def __init__(self, message: str = "Authentication failed"):
super().__init__(
error_type=ErrorType.AUTHENTICATION_ERROR,
message=message,
code=401,
severity=ErrorSeverity.HIGH,
retryable=False
)
class AuthorizationError(MCPError):
"""授权错误"""
def __init__(self, message: str = "Access denied"):
super().__init__(
error_type=ErrorType.AUTHORIZATION_ERROR,
message=message,
code=403,
severity=ErrorSeverity.HIGH,
retryable=False
)
class NotFoundError(MCPError):
"""未找到错误"""
def __init__(self, resource: str, identifier: str):
super().__init__(
error_type=ErrorType.NOT_FOUND_ERROR,
message=f"{resource} not found: {identifier}",
code=404,
details={"resource": resource, "identifier": identifier},
severity=ErrorSeverity.LOW,
retryable=False
)
class RateLimitError(MCPError):
"""速率限制错误"""
def __init__(self, message: str = "Rate limit exceeded", retry_after: int = 60):
super().__init__(
error_type=ErrorType.RATE_LIMIT_ERROR,
message=message,
code=429,
details={"retry_after": retry_after},
severity=ErrorSeverity.MEDIUM,
retryable=True
)
class InternalError(MCPError):
"""内部错误"""
def __init__(self, message: str = "Internal server error"):
super().__init__(
error_type=ErrorType.INTERNAL_ERROR,
message=message,
code=500,
severity=ErrorSeverity.CRITICAL,
retryable=True
)
class ExternalServiceError(MCPError):
"""外部服务错误"""
def __init__(self, service: str, message: str):
super().__init__(
error_type=ErrorType.EXTERNAL_SERVICE_ERROR,
message=f"{service} error: {message}",
code=502,
details={"service": service},
severity=ErrorSeverity.HIGH,
retryable=True
)
class TimeoutError(MCPError):
"""超时错误"""
def __init__(self, operation: str, timeout: float):
super().__init__(
error_type=ErrorType.TIMEOUT_ERROR,
message=f"{operation} timed out after {timeout}s",
code=504,
details={"operation": operation, "timeout": timeout},
severity=ErrorSeverity.HIGH,
retryable=True
)
```
**2. 错误处理器**
```python
from typing import Callable, Optional, Dict, Any
import logging
import traceback
class ErrorHandler:
"""错误处理器"""
def __init__(self, logger: logging.Logger = None):
self.logger = logger or logging.getLogger(__name__)
self.error_handlers: Dict[ErrorType, Callable] = {}
self.error_reporters: List[Callable] = []
def register_handler(
self,
error_type: ErrorType,
handler: Callable
):
"""注册错误处理器"""
self.error_handlers[error_type] = handler
def register_reporter(self, reporter: Callable):
"""注册错误报告器"""
self.error_reporters.append(reporter)
async def handle_error(
self,
error: Exception,
context: Dict[str, Any] = None
) -> Dict[str, Any]:
"""处理错误"""
# 记录错误
await self._log_error(error, context)
# 报告错误
await self._report_error(error, context)
# 转换为 MCP 错误
mcp_error = self._convert_to_mcp_error(error)
# 调用特定错误处理器
if mcp_error.error_type in self.error_handlers:
try:
result = await self.error_handlers[mcp_error.error_type](
mcp_error,
context
)
return result
except Exception as e:
self.logger.error(f"错误处理器失败: {e}")
# 返回默认错误响应
return mcp_error.to_dict()
async def _log_error(
self,
error: Exception,
context: Dict[str, Any] = None
):
"""记录错误"""
if isinstance(error, MCPError):
self.logger.error(
f"MCP Error: {error.error_type.value} - {error.message}",
extra={
"error_code": error.code,
"error_details": error.details,
"context": context
}
)
else:
self.logger.error(
f"Unexpected error: {str(error)}",
exc_info=True,
extra={"context": context}
)
async def _report_error(
self,
error: Exception,
context: Dict[str, Any] = None
):
"""报告错误"""
for reporter in self.error_reporters:
try:
await reporter(error, context)
except Exception as e:
self.logger.error(f"错误报告器失败: {e}")
def _convert_to_mcp_error(self, error: Exception) -> MCPError:
"""转换为 MCP 错误"""
if isinstance(error, MCPError):
return error
# 根据异常类型转换
if isinstance(error, ValueError):
return ValidationError(str(error))
elif isinstance(error, PermissionError):
return AuthorizationError(str(error))
elif isinstance(error, TimeoutError):
return TimeoutError("operation", 0)
else:
return InternalError(str(error))
# 错误报告器示例
class ErrorReporter:
"""错误报告器"""
def __init__(self, error_service_url: str):
self.error_service_url = error_service_url
async def report_error(
self,
error: Exception,
context: Dict[str, Any] = None
):
"""报告错误到错误服务"""
import aiohttp
error_data = {
"error": str(error),
"error_type": type(error).__name__,
"context": context or {},
"timestamp": datetime.now().isoformat()
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(
self.error_service_url,
json=error_data
) as response:
if response.status != 200:
self.logger.error(
f"报告错误失败: {response.status}"
)
except Exception as e:
self.logger.error(f"报告错误失败: {e}")
```
**3. 重试机制**
```python
import asyncio
from typing import Callable, Optional, Type
import time
class RetryStrategy:
"""重试策略基类"""
async def should_retry(
self,
attempt: int,
error: Exception
) -> bool:
"""判断是否应该重试"""
raise NotImplementedError
async def get_delay(self, attempt: int) -> float:
"""获取重试延迟"""
raise NotImplementedError
class FixedDelayRetry(RetryStrategy):
"""固定延迟重试"""
def __init__(self, max_attempts: int = 3, delay: float = 1.0):
self.max_attempts = max_attempts
self.delay = delay
async def should_retry(
self,
attempt: int,
error: Exception
) -> bool:
"""判断是否应该重试"""
if attempt >= self.max_attempts:
return False
if isinstance(error, MCPError):
return error.retryable
return True
async def get_delay(self, attempt: int) -> float:
"""获取重试延迟"""
return self.delay
class ExponentialBackoffRetry(RetryStrategy):
"""指数退避重试"""
def __init__(
self,
max_attempts: int = 5,
initial_delay: float = 1.0,
max_delay: float = 60.0,
backoff_factor: float = 2.0
):
self.max_attempts = max_attempts
self.initial_delay = initial_delay
self.max_delay = max_delay
self.backoff_factor = backoff_factor
async def should_retry(
self,
attempt: int,
error: Exception
) -> bool:
"""判断是否应该重试"""
if attempt >= self.max_attempts:
return False
if isinstance(error, MCPError):
return error.retryable
return True
async def get_delay(self, attempt: int) -> float:
"""获取重试延迟"""
delay = self.initial_delay * (self.backoff_factor ** attempt)
return min(delay, self.max_delay)
class RetryManager:
"""重试管理器"""
def __init__(self, retry_strategy: RetryStrategy):
self.retry_strategy = retry_strategy
async def execute_with_retry(
self,
func: Callable,
*args,
**kwargs
) -> Any:
"""带重试执行函数"""
attempt = 0
last_error = None
while True:
attempt += 1
try:
result = await func(*args, **kwargs)
return result
except Exception as error:
last_error = error
# 判断是否应该重试
should_retry = await self.retry_strategy.should_retry(
attempt,
error
)
if not should_retry:
raise error
# 获取重试延迟
delay = await self.retry_strategy.get_delay(attempt)
# 等待后重试
await asyncio.sleep(delay)
raise last_error
# 重试装饰器
def retry(
max_attempts: int = 3,
delay: float = 1.0,
backoff_factor: float = 2.0,
max_delay: float = 60.0
):
"""重试装饰器"""
def decorator(func: Callable):
retry_strategy = ExponentialBackoffRetry(
max_attempts=max_attempts,
initial_delay=delay,
max_delay=max_delay,
backoff_factor=backoff_factor
)
retry_manager = RetryManager(retry_strategy)
@wraps(func)
async def wrapper(*args, **kwargs):
return await retry_manager.execute_with_retry(
func,
*args,
**kwargs
)
return wrapper
return decorator
```
**4. 熔断机制**
```python
from enum import Enum
from typing import Callable, Optional
import asyncio
class CircuitState(Enum):
"""熔断器状态"""
CLOSED = "closed" # 正常状态
OPEN = "open" # 熔断状态
HALF_OPEN = "half_open" # 半开状态
class CircuitBreaker:
"""熔断器"""
def __init__(
self,
failure_threshold: int = 5,
success_threshold: int = 2,
timeout: float = 60.0
):
self.failure_threshold = failure_threshold
self.success_threshold = success_threshold
self.timeout = timeout
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count = 0
self.last_failure_time: Optional[float] = None
self.lock = asyncio.Lock()
async def execute(
self,
func: Callable,
*args,
**kwargs
) -> Any:
"""执行函数"""
async with self.lock:
# 检查熔断器状态
if self.state == CircuitState.OPEN:
# 检查是否应该尝试恢复
if time.time() - self.last_failure_time > self.timeout:
self.state = CircuitState.HALF_OPEN
self.success_count = 0
else:
raise MCPError(
error_type=ErrorType.INTERNAL_ERROR,
message="Circuit breaker is OPEN",
code=503,
retryable=True
)
try:
result = await func(*args, **kwargs)
# 成功执行
async with self.lock:
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
self.state = CircuitState.CLOSED
self.failure_count = 0
elif self.state == CircuitState.CLOSED:
self.failure_count = 0
return result
except Exception as error:
# 执行失败
async with self.lock:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
raise error
def get_state(self) -> CircuitState:
"""获取熔断器状态"""
return self.state
def reset(self):
"""重置熔断器"""
async with self.lock:
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count = 0
self.last_failure_time = None
# 熔断器装饰器
def circuit_breaker(
failure_threshold: int = 5,
success_threshold: int = 2,
timeout: float = 60.0
):
"""熔断器装饰器"""
def decorator(func: Callable):
breaker = CircuitBreaker(
failure_threshold=failure_threshold,
success_threshold=success_threshold,
timeout=timeout
)
@wraps(func)
async def wrapper(*args, **kwargs):
return await breaker.execute(func, *args, **kwargs)
return wrapper
return decorator
```
**5. 降级策略**
```python
from typing import Callable, Optional, Dict, Any
import asyncio
class FallbackStrategy:
"""降级策略基类"""
async def execute_fallback(
self,
error: Exception,
context: Dict[str, Any] = None
) -> Any:
"""执行降级逻辑"""
raise NotImplementedError
class CacheFallback(FallbackStrategy):
"""缓存降级"""
def __init__(self, cache: Dict[str, Any]):
self.cache = cache
async def execute_fallback(
self,
error: Exception,
context: Dict[str, Any] = None
) -> Any:
"""从缓存获取数据"""
cache_key = context.get("cache_key") if context else None
if cache_key and cache_key in self.cache:
return self.cache[cache_key]
raise error
class DefaultFallback(FallbackStrategy):
"""默认值降级"""
def __init__(self, default_value: Any):
self.default_value = default_value
async def execute_fallback(
self,
error: Exception,
context: Dict[str, Any] = None
) -> Any:
"""返回默认值"""
return self.default_value
class FallbackManager:
"""降级管理器"""
def __init__(self):
self.fallback_strategies: Dict[ErrorType, FallbackStrategy] = {}
self.default_fallback: Optional[FallbackStrategy] = None
def register_fallback(
self,
error_type: ErrorType,
fallback: FallbackStrategy
):
"""注册降级策略"""
self.fallback_strategies[error_type] = fallback
def set_default_fallback(self, fallback: FallbackStrategy):
"""设置默认降级策略"""
self.default_fallback = fallback
async def execute_with_fallback(
self,
func: Callable,
context: Dict[str, Any] = None,
*args,
**kwargs
) -> Any:
"""带降级执行函数"""
try:
return await func(*args, **kwargs)
except Exception as error:
# 转换为 MCP 错误
if not isinstance(error, MCPError):
error = InternalError(str(error))
# 查找对应的降级策略
fallback = self.fallback_strategies.get(
error.error_type,
self.default_fallback
)
if fallback:
try:
return await fallback.execute_fallback(error, context)
except Exception as fallback_error:
raise fallback_error
raise error
# 降级装饰器
def fallback(
error_type: ErrorType = None,
default_value: Any = None
):
"""降级装饰器"""
def decorator(func: Callable):
fallback_manager = FallbackManager()
if error_type and default_value is not None:
fallback_manager.register_fallback(
error_type,
DefaultFallback(default_value)
)
@wraps(func)
async def wrapper(*args, **kwargs):
return await fallback_manager.execute_with_fallback(
func,
None,
*args,
**kwargs
)
return wrapper
return decorator
```
**6. 综合错误处理示例**
```python
from mcp.server import Server
class RobustMCPServer(Server):
"""健壮的 MCP 服务器"""
def __init__(self, name: str):
super().__init__(name)
# 初始化错误处理组件
self.error_handler = ErrorHandler()
self.retry_manager = RetryManager(ExponentialBackoffRetry())
self.circuit_breaker = CircuitBreaker()
self.fallback_manager = FallbackManager()
# 配置错误处理
self._setup_error_handling()
def _setup_error_handling(self):
"""设置错误处理"""
# 注册错误处理器
self.error_handler.register_handler(
ErrorType.VALIDATION_ERROR,
self._handle_validation_error
)
self.error_handler.register_handler(
ErrorType.RATE_LIMIT_ERROR,
self._handle_rate_limit_error
)
# 注册降级策略
self.fallback_manager.register_fallback(
ErrorType.EXTERNAL_SERVICE_ERROR,
CacheFallback({})
)
async def _handle_validation_error(
self,
error: ValidationError,
context: Dict[str, Any]
) -> Dict[str, Any]:
"""处理验证错误"""
return {
"error": error.to_dict(),
"suggestions": self._get_validation_suggestions(error.details)
}
async def _handle_rate_limit_error(
self,
error: RateLimitError,
context: Dict[str, Any]
) -> Dict[str, Any]:
"""处理速率限制错误"""
retry_after = error.details.get("retry_after", 60)
return {
"error": error.to_dict(),
"retry_after": retry_after,
"message": f"请等待 {retry_after} 秒后重试"
}
def _get_validation_suggestions(
self,
details: Dict[str, Any]
) -> List[str]:
"""获取验证建议"""
suggestions = []
# 根据错误详情提供建议
# ...
return suggestions
@retry(max_attempts=3, delay=1.0)
@circuit_breaker(failure_threshold=5, timeout=60.0)
@fallback(error_type=ErrorType.EXTERNAL_SERVICE_ERROR, default_value={})
async def call_external_service(
self,
service_url: str,
params: Dict[str, Any]
) -> Dict[str, Any]:
"""调用外部服务"""
try:
# 调用外部服务
# ...
pass
except Exception as error:
# 转换为 MCP 错误
raise ExternalServiceError("external", str(error))
```
**最佳实践:**
1. **错误分类**:正确分类错误类型,便于针对性处理
2. **重试策略**:根据错误类型选择合适的重试策略
3. **熔断机制**:防止级联故障,保护系统稳定性
4. **降级策略**:在故障时提供降级服务,保证基本功能
5. **错误日志**:详细记录错误信息,便于问题排查
6. **监控告警**:监控错误率,及时发现问题
通过完善的错误处理和重试机制,可以确保 MCP 系统的稳定性和可靠性。
服务端 · 2月21日 15:51
MCP 的生态系统和社区支持有哪些?MCP 的生态系统和社区支持对于其发展和采用至关重要。以下是详细的生态系统分析和社区参与方式:
**MCP 生态系统架构**
MCP 生态系统包括以下组成部分:
1. **核心协议**:MCP 协议规范和实现
2. **客户端库**:各种编程语言的客户端库
3. **服务器实现**:不同平台的服务器实现
4. **工具和插件**:扩展 MCP 功能的工具和插件
5. **文档和教程**:学习资源和最佳实践
6. **社区贡献**:开源项目和社区活动
**1. MCP 客户端库**
```python
# Python 客户端库示例
from typing import Dict, Any, Optional, List
import asyncio
import json
class MCPClient:
"""MCP 客户端"""
def __init__(self, server_url: str):
self.server_url = server_url
self.session_id: Optional[str] = None
self.capabilities: Dict[str, Any] = {}
async def connect(self) -> bool:
"""连接到 MCP 服务器"""
try:
# 初始化连接
response = await self._send_request({
"jsonrpc": "2.0",
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {
"tools": {},
"resources": {},
"prompts": {}
}
},
"id": 1
})
if "error" in response:
print(f"连接失败: {response['error']}")
return False
# 保存会话信息
self.session_id = response.get("result", {}).get("sessionId")
self.capabilities = response.get("result", {}).get("capabilities", {})
# 发送 initialized 通知
await self._send_notification({
"jsonrpc": "2.0",
"method": "notifications/initialized"
})
return True
except Exception as e:
print(f"连接错误: {e}")
return False
async def list_tools(self) -> List[Dict[str, Any]]:
"""列出可用工具"""
response = await self._send_request({
"jsonrpc": "2.0",
"method": "tools/list",
"id": 2
})
if "error" in response:
print(f"获取工具列表失败: {response['error']}")
return []
return response.get("result", {}).get("tools", [])
async def call_tool(
self,
name: str,
arguments: Dict[str, Any]
) -> Any:
"""调用工具"""
response = await self._send_request({
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": name,
"arguments": arguments
},
"id": 3
})
if "error" in response:
raise Exception(f"工具调用失败: {response['error']}")
return response.get("result")
async def list_resources(self) -> List[Dict[str, Any]]:
"""列出可用资源"""
response = await self._send_request({
"jsonrpc": "2.0",
"method": "resources/list",
"id": 4
})
if "error" in response:
print(f"获取资源列表失败: {response['error']}")
return []
return response.get("result", {}).get("resources", [])
async def read_resource(self, uri: str) -> Any:
"""读取资源"""
response = await self._send_request({
"jsonrpc": "2.0",
"method": "resources/read",
"params": {
"uri": uri
},
"id": 5
})
if "error" in response:
raise Exception(f"读取资源失败: {response['error']}")
return response.get("result")
async def list_prompts(self) -> List[Dict[str, Any]]:
"""列出可用提示词"""
response = await self._send_request({
"jsonrpc": "2.0",
"method": "prompts/list",
"id": 6
})
if "error" in response:
print(f"获取提示词列表失败: {response['error']}")
return []
return response.get("result", {}).get("prompts", [])
async def get_prompt(
self,
name: str,
arguments: Dict[str, Any] = None
) -> Any:
"""获取提示词"""
response = await self._send_request({
"jsonrpc": "2.0",
"method": "prompts/get",
"params": {
"name": name,
"arguments": arguments or {}
},
"id": 7
})
if "error" in response:
raise Exception(f"获取提示词失败: {response['error']}")
return response.get("result")
async def _send_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""发送请求"""
# 实现实际的请求发送逻辑
# 这里使用模拟实现
print(f"发送请求: {json.dumps(request, indent=2)}")
# 模拟响应
return {
"jsonrpc": "2.0",
"id": request["id"],
"result": {}
}
async def _send_notification(self, notification: Dict[str, Any]):
"""发送通知"""
print(f"发送通知: {json.dumps(notification, indent=2)}")
async def disconnect(self):
"""断开连接"""
await self._send_notification({
"jsonrpc": "2.0",
"method": "shutdown"
})
self.session_id = None
self.capabilities = {}
```
**2. MCP 服务器实现**
```python
# MCP 服务器实现示例
from mcp.server import Server
from typing import Dict, Any, List
class MyMCPServer(Server):
"""自定义 MCP 服务器"""
def __init__(self, name: str = "my-mcp-server"):
super().__init__(name)
self._setup_tools()
self._setup_resources()
self._setup_prompts()
def _setup_tools(self):
"""设置工具"""
@self.tool(
name="calculate",
description="执行数学计算"
)
async def calculate(
expression: str,
operation: str = "evaluate"
) -> str:
"""计算工具"""
try:
if operation == "evaluate":
result = eval(expression)
return f"计算结果: {result}"
else:
return "不支持的操作"
except Exception as e:
return f"计算错误: {str(e)}"
@self.tool(
name="search",
description="搜索信息"
)
async def search(
query: str,
limit: int = 10
) -> List[Dict[str, Any]]:
"""搜索工具"""
# 实现搜索逻辑
results = [
{
"title": f"结果 {i}",
"url": f"https://example.com/{i}",
"snippet": f"这是关于 {query} 的结果 {i}"
}
for i in range(min(limit, 10))
]
return results
def _setup_resources(self):
"""设置资源"""
@self.resource(
uri="config://settings",
name="配置设置",
description="服务器配置"
)
async def get_config() -> Dict[str, Any]:
"""获取配置"""
return {
"version": "1.0.0",
"features": ["tools", "resources", "prompts"],
"limits": {
"max_requests_per_minute": 100,
"max_concurrent_requests": 10
}
}
@self.resource(
uri="data://statistics",
name="统计数据",
description="服务器统计信息"
)
async def get_statistics() -> Dict[str, Any]:
"""获取统计信息"""
return {
"total_requests": 1000,
"successful_requests": 950,
"failed_requests": 50,
"average_response_time": 0.5
}
def _setup_prompts(self):
"""设置提示词"""
@self.prompt(
name="code_review",
description="代码审查提示词"
)
async def code_review_prompt(
language: str = "Python",
focus: str = "performance"
) -> str:
"""代码审查提示词"""
return f"""
请审查以下 {language} 代码,重点关注 {focus}:
1. 代码质量和可读性
2. {focus} 相关的问题
3. 潜在的 bug 和安全问题
4. 改进建议
请提供详细的审查意见和改进建议。
"""
@self.prompt(
name="documentation",
description="文档生成提示词"
)
async def documentation_prompt(
doc_type: str = "API",
format: str = "Markdown"
) -> str:
"""文档生成提示词"""
return f"""
请为以下代码生成 {doc_type} 文档,使用 {format} 格式:
1. 功能概述
2. 参数说明
3. 返回值说明
4. 使用示例
5. 注意事项
确保文档清晰、准确、易于理解。
"""
# 启动服务器
async def start_server():
"""启动 MCP 服务器"""
server = MyMCPServer()
# 启动服务器
await server.start()
print("MCP 服务器已启动")
# 保持服务器运行
try:
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
print("正在关闭服务器...")
await server.stop()
print("服务器已关闭")
if __name__ == "__main__":
asyncio.run(start_server())
```
**3. MCP 社区项目**
```python
# 社区贡献的 MCP 工具和插件示例
class MCPCommunityTools:
"""社区 MCP 工具集合"""
@staticmethod
def get_popular_tools() -> List[Dict[str, Any]]:
"""获取热门工具"""
return [
{
"name": "mcp-database",
"description": "数据库操作工具",
"author": "community",
"stars": 150,
"url": "https://github.com/example/mcp-database"
},
{
"name": "mcp-filesystem",
"description": "文件系统操作工具",
"author": "community",
"stars": 120,
"url": "https://github.com/example/mcp-filesystem"
},
{
"name": "mcp-web-scraper",
"description": "网页抓取工具",
"author": "community",
"stars": 100,
"url": "https://github.com/example/mcp-web-scraper"
},
{
"name": "mcp-api-client",
"description": "API 客户端工具",
"author": "community",
"stars": 90,
"url": "https://github.com/example/mcp-api-client"
}
]
@staticmethod
def get_server_implementations() -> List[Dict[str, Any]]:
"""获取服务器实现"""
return [
{
"name": "mcp-server-python",
"language": "Python",
"description": "Python MCP 服务器实现",
"version": "1.0.0"
},
{
"name": "mcp-server-nodejs",
"language": "JavaScript",
"description": "Node.js MCP 服务器实现",
"version": "1.0.0"
},
{
"name": "mcp-server-go",
"language": "Go",
"description": "Go MCP 服务器实现",
"version": "0.9.0"
},
{
"name": "mcp-server-rust",
"language": "Rust",
"description": "Rust MCP 服务器实现",
"version": "0.8.0"
}
]
@staticmethod
def get_client_libraries() -> List[Dict[str, Any]]:
"""获取客户端库"""
return [
{
"name": "mcp-client-python",
"language": "Python",
"description": "Python MCP 客户端库",
"version": "1.0.0",
"pip": "pip install mcp-client"
},
{
"name": "mcp-client-js",
"language": "JavaScript",
"description": "JavaScript MCP 客户端库",
"version": "1.0.0",
"npm": "npm install @mcp/client"
},
{
"name": "mcp-client-java",
"language": "Java",
"description": "Java MCP 客户端库",
"version": "0.9.0",
"maven": "implementation 'com.mcp:client:0.9.0'"
}
]
```
**4. MCP 文档和教程**
```python
class MCPDocumentation:
"""MCP 文档资源"""
@staticmethod
def get_official_docs() -> List[Dict[str, Any]]:
"""获取官方文档"""
return [
{
"title": "MCP 协议规范",
"url": "https://spec.modelcontextprotocol.io",
"description": "MCP 协议的完整技术规范",
"language": "en"
},
{
"title": "快速入门指南",
"url": "https://docs.modelcontextprotocol.io/quickstart",
"description": "快速开始使用 MCP 的指南",
"language": "zh"
},
{
"title": "服务器实现指南",
"url": "https://docs.modelcontextprotocol.io/server",
"description": "如何实现 MCP 服务器",
"language": "zh"
},
{
"title": "客户端开发指南",
"url": "https://docs.modelcontextprotocol.io/client",
"description": "如何开发 MCP 客户端",
"language": "zh"
}
]
@staticmethod
def get_tutorials() -> List[Dict[str, Any]]:
"""获取教程"""
return [
{
"title": "构建第一个 MCP 服务器",
"url": "https://docs.modelcontextprotocol.io/tutorials/first-server",
"description": "从零开始构建 MCP 服务器",
"difficulty": "beginner",
"duration": "30 minutes"
},
{
"title": "MCP 工具开发",
"url": "https://docs.modelcontextprotocol.io/tutorials/tool-development",
"description": "开发自定义 MCP 工具",
"difficulty": "intermediate",
"duration": "1 hour"
},
{
"title": "MCP 集成最佳实践",
"url": "https://docs.modelcontextprotocol.io/tutorials/best-practices",
"description": "MCP 集成的最佳实践",
"difficulty": "advanced",
"duration": "2 hours"
}
]
@staticmethod
def get_examples() -> List[Dict[str, Any]]:
"""获取示例代码"""
return [
{
"title": "简单计算器服务器",
"url": "https://github.com/modelcontextprotocol/examples/tree/main/calculator",
"description": "一个简单的计算器 MCP 服务器示例",
"language": "Python"
},
{
"title": "文件系统服务器",
"url": "https://github.com/modelcontextprotocol/examples/tree/main/filesystem",
"description": "文件系统操作 MCP 服务器示例",
"language": "Python"
},
{
"title": "数据库集成服务器",
"url": "https://github.com/modelcontextprotocol/examples/tree/main/database",
"description": "数据库集成 MCP 服务器示例",
"language": "Python"
}
]
```
**5. MCP 社区参与**
```python
class MCPCommunity:
"""MCP 社区参与"""
@staticmethod
def get_community_channels() -> List[Dict[str, Any]]:
"""获取社区渠道"""
return [
{
"name": "GitHub",
"url": "https://github.com/modelcontextprotocol",
"description": "MCP GitHub 组织",
"type": "code"
},
{
"name": "Discord",
"url": "https://discord.gg/mcp",
"description": "MCP Discord 社区",
"type": "chat"
},
{
"name": "Twitter",
"url": "https://twitter.com/modelcontext",
"description": "MCP Twitter 账号",
"type": "social"
},
{
"name": "Reddit",
"url": "https://reddit.com/r/modelcontextprotocol",
"description": "MCP Reddit 社区",
"type": "forum"
}
]
@staticmethod
def get_contribution_guidelines() -> Dict[str, Any]:
"""获取贡献指南"""
return {
"code_of_conduct": "https://github.com/modelcontextprotocol/.github/blob/main/CODE_OF_CONDUCT.md",
"contributing": "https://github.com/modelcontextprotocol/.github/blob/main/CONTRIBUTING.md",
"pull_request_template": "https://github.com/modelcontextprotocol/.github/blob/main/PULL_REQUEST_TEMPLATE.md",
"issue_template": "https://github.com/modelcontextprotocol/.github/blob/main/ISSUE_TEMPLATE.md"
}
@staticmethod
def get_ways_to_contribute() -> List[Dict[str, Any]]:
"""获取贡献方式"""
return [
{
"type": "代码贡献",
"description": "提交代码改进和 bug 修复",
"difficulty": "intermediate"
},
{
"type": "文档改进",
"description": "改进和翻译文档",
"difficulty": "beginner"
},
{
"type": "问题报告",
"description": "报告 bug 和提出功能请求",
"difficulty": "beginner"
},
{
"type": "工具开发",
"description": "开发新的 MCP 工具和插件",
"difficulty": "advanced"
},
{
"type": "社区支持",
"description": "在社区中回答问题和提供帮助",
"difficulty": "intermediate"
}
]
```
**最佳实践:**
1. **参与社区**:积极参与 MCP 社区讨论和贡献
2. **学习资源**:充分利用官方文档和教程资源
3. **开源贡献**:为 MCP 生态系统贡献代码和工具
4. **分享经验**:分享使用 MCP 的经验和最佳实践
5. **反馈改进**:提供反馈帮助改进 MCP 协议和工具
6. **持续学习**:跟踪 MCP 的最新发展和更新
通过积极参与 MCP 生态系统和社区,可以更好地利用 MCP 的功能并为社区做出贡献。
服务端 · 2月21日 15:15
MCP 的性能监控和优化有哪些策略?MCP 的性能监控和优化对于确保系统高效运行至关重要。以下是详细的监控策略和优化方法:
**性能监控架构**
MCP 性能监控应考虑以下方面:
1. **指标收集**:收集系统运行的关键指标
2. **性能分析**:分析性能瓶颈和优化机会
3. **实时监控**:实时监控系统状态和性能
4. **告警机制**:及时发现和通知性能问题
5. **优化策略**:基于监控数据进行性能优化
**1. 指标收集系统**
```python
from dataclasses import dataclass
from typing import Dict, List, Optional
from datetime import datetime
import time
@dataclass
class Metric:
"""性能指标"""
name: str
value: float
timestamp: datetime
tags: Dict[str, str] = None
def __post_init__(self):
if self.tags is None:
self.tags = {}
class MetricsCollector:
"""指标收集器"""
def __init__(self):
self.metrics: List[Metric] = []
self.counters: Dict[str, int] = {}
self.gauges: Dict[str, float] = {}
self.histograms: Dict[str, List[float]] = {}
def increment_counter(self, name: str, value: int = 1, tags: Dict[str, str] = None):
"""增加计数器"""
key = self._make_key(name, tags)
self.counters[key] = self.counters.get(key, 0) + value
# 记录指标
self._record_metric(name, self.counters[key], tags)
def set_gauge(self, name: str, value: float, tags: Dict[str, str] = None):
"""设置仪表值"""
key = self._make_key(name, tags)
self.gauges[key] = value
# 记录指标
self._record_metric(name, value, tags)
def record_histogram(self, name: str, value: float, tags: Dict[str, str] = None):
"""记录直方图值"""
key = self._make_key(name, tags)
if key not in self.histograms:
self.histograms[key] = []
self.histograms[key].append(value)
# 限制历史记录大小
if len(self.histograms[key]) > 1000:
self.histograms[key] = self.histograms[key][-1000:]
# 记录指标
self._record_metric(name, value, tags)
def _record_metric(self, name: str, value: float, tags: Dict[str, str] = None):
"""记录指标"""
metric = Metric(
name=name,
value=value,
timestamp=datetime.now(),
tags=tags or {}
)
self.metrics.append(metric)
# 限制指标历史大小
if len(self.metrics) > 10000:
self.metrics = self.metrics[-10000:]
def _make_key(self, name: str, tags: Dict[str, str] = None) -> str:
"""生成指标键"""
if not tags:
return name
tag_str = ",".join([f"{k}={v}" for k, v in sorted(tags.items())])
return f"{name}{{{tag_str}}}"
def get_metrics(
self,
name: str = None,
since: datetime = None
) -> List[Metric]:
"""获取指标"""
metrics = self.metrics
if name:
metrics = [m for m in metrics if m.name == name]
if since:
metrics = [m for m in metrics if m.timestamp >= since]
return metrics
def get_histogram_stats(
self,
name: str,
tags: Dict[str, str] = None
) -> Optional[Dict]:
"""获取直方图统计"""
key = self._make_key(name, tags)
if key not in self.histograms:
return None
values = self.histograms[key]
if not values:
return None
sorted_values = sorted(values)
return {
"count": len(values),
"sum": sum(values),
"avg": sum(values) / len(values),
"min": min(values),
"max": max(values),
"p50": self._percentile(sorted_values, 50),
"p90": self._percentile(sorted_values, 90),
"p95": self._percentile(sorted_values, 95),
"p99": self._percentile(sorted_values, 99),
}
def _percentile(self, sorted_values: List[float], percentile: int) -> float:
"""计算百分位数"""
if not sorted_values:
return 0.0
index = int(len(sorted_values) * percentile / 100)
return sorted_values[min(index, len(sorted_values) - 1)]
```
**2. 性能分析器**
```python
from functools import wraps
from typing import Callable, Optional
import time
class PerformanceProfiler:
"""性能分析器"""
def __init__(self, metrics_collector: MetricsCollector):
self.metrics_collector = metrics_collector
def profile_function(
self,
metric_name: str,
tags: Dict[str, str] = None
):
"""函数性能分析装饰器"""
def decorator(func: Callable):
@wraps(func)
async def async_wrapper(*args, **kwargs):
start_time = time.time()
try:
result = await func(*args, **kwargs)
# 记录执行时间
execution_time = time.time() - start_time
self.metrics_collector.record_histogram(
f"{metric_name}_duration",
execution_time,
tags
)
# 记录成功计数
self.metrics_collector.increment_counter(
f"{metric_name}_success",
tags=tags
)
return result
except Exception as e:
# 记录失败计数
self.metrics_collector.increment_counter(
f"{metric_name}_error",
tags=tags
)
raise e
@wraps(func)
def sync_wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
# 记录执行时间
execution_time = time.time() - start_time
self.metrics_collector.record_histogram(
f"{metric_name}_duration",
execution_time,
tags
)
# 记录成功计数
self.metrics_collector.increment_counter(
f"{metric_name}_success",
tags=tags
)
return result
except Exception as e:
# 记录失败计数
self.metrics_collector.increment_counter(
f"{metric_name}_error",
tags=tags
)
raise e
# 根据函数类型返回对应的包装器
if asyncio.iscoroutinefunction(func):
return async_wrapper
else:
return sync_wrapper
return decorator
def profile_context(
self,
metric_name: str,
tags: Dict[str, str] = None
):
"""上下文管理器性能分析"""
class ProfileContext:
def __init__(self, profiler, name, tags):
self.profiler = profiler
self.name = name
self.tags = tags
self.start_time = None
def __enter__(self):
self.start_time = time.time()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
execution_time = time.time() - self.start_time
self.profiler.metrics_collector.record_histogram(
f"{self.name}_duration",
execution_time,
self.tags
)
if exc_type is None:
self.profiler.metrics_collector.increment_counter(
f"{self.name}_success",
tags=self.tags
)
else:
self.profiler.metrics_collector.increment_counter(
f"{self.name}_error",
tags=self.tags
)
return False
return ProfileContext(self, metric_name, tags)
```
**3. 实时监控系统**
```python
import asyncio
from typing import Dict, List, Callable, Optional
class RealTimeMonitor:
"""实时监控器"""
def __init__(
self,
metrics_collector: MetricsCollector,
check_interval: int = 5
):
self.metrics_collector = metrics_collector
self.check_interval = check_interval
self.alerts: List[Dict] = []
self.alert_rules: List[Dict] = []
self.subscribers: List[Callable] = []
self.running = False
def add_alert_rule(
self,
name: str,
metric_name: str,
condition: str,
threshold: float,
severity: str = "warning"
):
"""添加告警规则"""
self.alert_rules.append({
"name": name,
"metric_name": metric_name,
"condition": condition,
"threshold": threshold,
"severity": severity
})
def subscribe(self, callback: Callable):
"""订阅告警"""
self.subscribers.append(callback)
def unsubscribe(self, callback: Callable):
"""取消订阅"""
if callback in self.subscribers:
self.subscribers.remove(callback)
async def start(self):
"""启动监控"""
self.running = True
while self.running:
await self.check_alerts()
await asyncio.sleep(self.check_interval)
async def stop(self):
"""停止监控"""
self.running = False
async def check_alerts(self):
"""检查告警"""
for rule in self.alert_rules:
try:
alert = await self._evaluate_rule(rule)
if alert:
self.alerts.append(alert)
await self._notify_subscribers(alert)
except Exception as e:
print(f"检查告警规则失败: {rule['name']}, 错误: {e}")
async def _evaluate_rule(self, rule: Dict) -> Optional[Dict]:
"""评估告警规则"""
metric_name = rule["metric_name"]
condition = rule["condition"]
threshold = rule["threshold"]
# 获取最近的指标
recent_metrics = self.metrics_collector.get_metrics(
metric_name,
since=datetime.now() - timedelta(minutes=1)
)
if not recent_metrics:
return None
# 计算聚合值
values = [m.value for m in recent_metrics]
avg_value = sum(values) / len(values)
# 检查条件
triggered = False
if condition == "greater_than":
triggered = avg_value > threshold
elif condition == "less_than":
triggered = avg_value < threshold
elif condition == "equals":
triggered = avg_value == threshold
elif condition == "not_equals":
triggered = avg_value != threshold
if triggered:
return {
"rule_name": rule["name"],
"metric_name": metric_name,
"current_value": avg_value,
"threshold": threshold,
"condition": condition,
"severity": rule["severity"],
"timestamp": datetime.now()
}
return None
async def _notify_subscribers(self, alert: Dict):
"""通知订阅者"""
for callback in self.subscribers:
try:
await callback(alert)
except Exception as e:
print(f"通知订阅者失败: {e}")
def get_recent_alerts(
self,
since: datetime = None,
severity: str = None
) -> List[Dict]:
"""获取最近的告警"""
alerts = self.alerts
if since:
alerts = [a for a in alerts if a["timestamp"] >= since]
if severity:
alerts = [a for a in alerts if a["severity"] == severity]
return alerts
```
**4. 性能优化策略**
```python
from typing import Dict, List, Optional
import asyncio
from functools import lru_cache
class PerformanceOptimizer:
"""性能优化器"""
def __init__(self, metrics_collector: MetricsCollector):
self.metrics_collector = metrics_collector
self.optimization_strategies = {}
def register_strategy(
self,
name: str,
strategy: Callable,
priority: int = 0
):
"""注册优化策略"""
self.optimization_strategies[name] = {
"strategy": strategy,
"priority": priority
}
async def optimize(self, context: Dict) -> Dict:
"""执行优化"""
results = {}
# 按优先级排序策略
sorted_strategies = sorted(
self.optimization_strategies.items(),
key=lambda x: x[1]["priority"],
reverse=True
)
for name, strategy_info in sorted_strategies:
try:
result = await strategy_info["strategy"](context)
results[name] = result
except Exception as e:
results[name] = {"error": str(e)}
return results
# 缓存优化策略
class CacheOptimizer:
"""缓存优化器"""
def __init__(self, max_size: int = 1000, ttl: int = 300):
self.max_size = max_size
self.ttl = ttl
self.cache: Dict[str, tuple] = {}
def get(self, key: str) -> Optional[any]:
"""获取缓存值"""
if key not in self.cache:
return None
value, timestamp = self.cache[key]
# 检查是否过期
if time.time() - timestamp > self.ttl:
del self.cache[key]
return None
return value
def set(self, key: str, value: any):
"""设置缓存值"""
# 检查缓存大小
if len(self.cache) >= self.max_size:
self._evict()
self.cache[key] = (value, time.time())
def _evict(self):
"""淘汰缓存项"""
# 简单实现:随机淘汰
if self.cache:
key = next(iter(self.cache))
del self.cache[key]
# 连接池优化
class ConnectionPoolOptimizer:
"""连接池优化器"""
def __init__(self, min_size: int = 5, max_size: int = 20):
self.min_size = min_size
self.max_size = max_size
self.pool = asyncio.Queue()
self.created = 0
self.lock = asyncio.Lock()
async def acquire(self) -> any:
"""获取连接"""
try:
# 尝试从池中获取连接
connection = await asyncio.wait_for(
self.pool.get(),
timeout=1.0
)
return connection
except asyncio.TimeoutError:
# 池中没有可用连接,创建新连接
async with self.lock:
if self.created < self.max_size:
connection = await self._create_connection()
self.created += 1
return connection
# 等待其他连接释放
return await self.pool.get()
async def release(self, connection: any):
"""释放连接"""
await self.pool.put(connection)
async def _create_connection(self) -> any:
"""创建新连接"""
# 实现具体的连接创建逻辑
pass
# 批处理优化
class BatchProcessor:
"""批处理器"""
def __init__(self, batch_size: int = 100, timeout: float = 1.0):
self.batch_size = batch_size
self.timeout = timeout
self.buffer: List = []
self.lock = asyncio.Lock()
async def add(self, item: any):
"""添加项目到批处理"""
async with self.lock:
self.buffer.append(item)
if len(self.buffer) >= self.batch_size:
await self._flush()
async def _flush(self):
"""刷新缓冲区"""
if not self.buffer:
return
batch = self.buffer.copy()
self.buffer.clear()
# 处理批次
await self._process_batch(batch)
async def _process_batch(self, batch: List):
"""处理批次"""
# 实现具体的批处理逻辑
pass
async def start_periodic_flush(self):
"""启动定期刷新"""
while True:
await asyncio.sleep(self.timeout)
async with self.lock:
await self._flush()
```
**5. 性能报告生成器**
```python
from typing import Dict, List
from datetime import datetime, timedelta
class PerformanceReportGenerator:
"""性能报告生成器"""
def __init__(self, metrics_collector: MetricsCollector):
self.metrics_collector = metrics_collector
def generate_report(
self,
start_time: datetime,
end_time: datetime
) -> Dict:
"""生成性能报告"""
report = {
"period": {
"start": start_time,
"end": end_time
},
"summary": {},
"metrics": {},
"alerts": [],
"recommendations": []
}
# 生成摘要
report["summary"] = self._generate_summary(start_time, end_time)
# 生成指标详情
report["metrics"] = self._generate_metrics_detail(
start_time,
end_time
)
# 生成优化建议
report["recommendations"] = self._generate_recommendations(
report["metrics"]
)
return report
def _generate_summary(
self,
start_time: datetime,
end_time: datetime
) -> Dict:
"""生成摘要"""
summary = {
"total_requests": 0,
"successful_requests": 0,
"failed_requests": 0,
"average_response_time": 0.0,
"p95_response_time": 0.0,
"p99_response_time": 0.0
}
# 获取请求指标
success_metrics = self.metrics_collector.get_metrics(
"request_success",
since=start_time
)
error_metrics = self.metrics_collector.get_metrics(
"request_error",
since=start_time
)
duration_metrics = self.metrics_collector.get_metrics(
"request_duration",
since=start_time
)
summary["successful_requests"] = len(success_metrics)
summary["failed_requests"] = len(error_metrics)
summary["total_requests"] = (
summary["successful_requests"] +
summary["failed_requests"]
)
if duration_metrics:
durations = [m.value for m in duration_metrics]
summary["average_response_time"] = sum(durations) / len(durations)
summary["p95_response_time"] = self._percentile(durations, 95)
summary["p99_response_time"] = self._percentile(durations, 99)
return summary
def _generate_metrics_detail(
self,
start_time: datetime,
end_time: datetime
) -> Dict:
"""生成指标详情"""
metrics_detail = {}
# 获取所有指标名称
metric_names = set(m.name for m in self.metrics_collector.metrics)
for metric_name in metric_names:
metrics = self.metrics_collector.get_metrics(
metric_name,
since=start_time
)
if not metrics:
continue
values = [m.value for m in metrics]
metrics_detail[metric_name] = {
"count": len(values),
"sum": sum(values),
"avg": sum(values) / len(values),
"min": min(values),
"max": max(values),
"p50": self._percentile(values, 50),
"p90": self._percentile(values, 90),
"p95": self._percentile(values, 95),
"p99": self._percentile(values, 99),
}
return metrics_detail
def _generate_recommendations(
self,
metrics_detail: Dict
) -> List[str]:
"""生成优化建议"""
recommendations = []
# 检查响应时间
if "request_duration" in metrics_detail:
avg_response_time = metrics_detail["request_duration"]["avg"]
p95_response_time = metrics_detail["request_duration"]["p95"]
if avg_response_time > 1.0:
recommendations.append(
"平均响应时间超过 1 秒,建议优化慢查询或增加缓存"
)
if p95_response_time > 5.0:
recommendations.append(
"P95 响应时间超过 5 秒,建议检查性能瓶颈"
)
# 检查错误率
if "request_error" in metrics_detail:
error_count = metrics_detail["request_error"]["count"]
if error_count > 100:
recommendations.append(
"错误数量较多,建议检查错误日志并修复问题"
)
return recommendations
def _percentile(self, sorted_values: List[float], percentile: int) -> float:
"""计算百分位数"""
if not sorted_values:
return 0.0
sorted_values = sorted(sorted_values)
index = int(len(sorted_values) * percentile / 100)
return sorted_values[min(index, len(sorted_values) - 1)]
```
**最佳实践:**
1. **全面监控**:监控所有关键指标,包括请求、响应时间、错误率等
2. **实时告警**:设置合理的告警阈值,及时发现性能问题
3. **性能分析**:定期分析性能数据,识别优化机会
4. **缓存优化**:合理使用缓存减少重复计算和查询
5. **连接池**:使用连接池优化数据库和网络连接
6. **批处理**:使用批处理提高吞吐量
通过完善的性能监控和优化,可以确保 MCP 系统高效稳定运行。
服务端 · 2月19日 21:43
MCP 的版本管理和兼容性如何处理?MCP 的版本管理和兼容性对于确保系统的稳定性和可维护性至关重要。以下是详细的版本管理策略和兼容性处理方法:
**版本管理策略**
MCP 版本管理应考虑以下方面:
1. **语义化版本控制**:使用语义化版本号(SemVer)
2. **向后兼容性**:确保新版本向后兼容旧版本
3. **废弃策略**:明确的功能废弃和移除流程
4. **迁移指南**:提供详细的版本迁移指南
5. **版本协商**:客户端和服务器的版本协商机制
**1. 语义化版本控制**
```python
from dataclasses import dataclass
from typing import Optional
import re
@dataclass
class Version:
"""版本号"""
major: int
minor: int
patch: int
prerelease: Optional[str] = None
build_metadata: Optional[str] = None
def __str__(self) -> str:
version_str = f"{self.major}.{self.minor}.{self.patch}"
if self.prerelease:
version_str += f"-{self.prerelease}"
if self.build_metadata:
version_str += f"+{self.build_metadata}"
return version_str
@classmethod
def parse(cls, version_str: str) -> 'Version':
"""解析版本字符串"""
# 匹配语义化版本格式
pattern = r'^(\d+)\.(\d+)\.(\d+)(?:-([0-9A-Za-z-]+(?:\.[0-9A-Za-z-]+)*))?(?:\+([0-9A-Za-z-]+(?:\.[0-9A-Za-z-]+)*))?$'
match = re.match(pattern, version_str)
if not match:
raise ValueError(f"无效的版本格式: {version_str}")
major = int(match.group(1))
minor = int(match.group(2))
patch = int(match.group(3))
prerelease = match.group(4)
build_metadata = match.group(5)
return cls(
major=major,
minor=minor,
patch=patch,
prerelease=prerelease,
build_metadata=build_metadata
)
def is_compatible(self, other: 'Version') -> bool:
"""检查版本兼容性"""
# 主版本号相同,次版本号向后兼容
if self.major != other.major:
return False
# 如果当前版本 >= 其他版本,则兼容
if (self.minor, self.patch) >= (other.minor, other.patch):
return True
return False
def __lt__(self, other: 'Version') -> bool:
"""版本比较"""
if self.major != other.major:
return self.major < other.major
if self.minor != other.minor:
return self.minor < other.minor
if self.patch != other.patch:
return self.patch < other.patch
# 预发布版本比较
if self.prerelease and not other.prerelease:
return True
if not self.prerelease and other.prerelease:
return False
if self.prerelease and other.prerelease:
return self.prerelease < other.prerelease
return False
def __eq__(self, other: 'Version') -> bool:
"""版本相等"""
return (
self.major == other.major and
self.minor == other.minor and
self.patch == other.patch and
self.prerelease == other.prerelease
)
# 当前 MCP 版本
MCP_VERSION = Version(1, 0, 0)
```
**2. 版本协商机制**
```python
from typing import Dict, List, Optional
class VersionNegotiator:
"""版本协商器"""
def __init__(self, server_version: Version):
self.server_version = server_version
self.supported_versions: List[Version] = [
Version(1, 0, 0),
Version(0, 9, 0),
]
def negotiate_version(
self,
client_versions: List[Version]
) -> Optional[Version]:
"""协商最佳版本"""
# 找到客户端支持的最高版本
client_versions_sorted = sorted(client_versions, reverse=True)
for client_version in client_versions_sorted:
# 检查服务器是否支持此版本
for server_version in self.supported_versions:
if server_version.is_compatible(client_version):
return server_version
# 没有找到兼容版本
return None
def get_server_info(self) -> Dict:
"""获取服务器版本信息"""
return {
"version": str(self.server_version),
"supported_versions": [
str(v) for v in self.supported_versions
]
}
class MCPClient:
"""MCP 客户端"""
def __init__(self, supported_versions: List[Version]):
self.supported_versions = supported_versions
self.negotiated_version: Optional[Version] = None
async def connect(self, server_info: Dict) -> bool:
"""连接到服务器并协商版本"""
server_version = Version.parse(server_info["version"])
server_supported_versions = [
Version.parse(v)
for v in server_info["supported_versions"]
]
# 创建临时协商器
negotiator = VersionNegotiator(server_version)
# 协商版本
self.negotiated_version = negotiator.negotiate_version(
self.supported_versions
)
if not self.negotiated_version:
print("无法协商兼容的版本")
return False
print(f"协商版本: {self.negotiated_version}")
return True
```
**3. 功能废弃管理**
```python
from enum import Enum
from typing import Dict, List, Optional
from datetime import datetime
class DeprecationStatus(Enum):
"""废弃状态"""
STABLE = "stable"
DEPRECATED = "deprecated"
REMOVED = "removed"
@dataclass
class DeprecationInfo:
"""废弃信息"""
status: DeprecationStatus
deprecated_in: Optional[Version] = None
removed_in: Optional[Version] = None
replacement: Optional[str] = None
message: Optional[str] = None
class DeprecationManager:
"""废弃管理器"""
def __init__(self):
self.deprecations: Dict[str, DeprecationInfo] = {}
def deprecate_feature(
self,
feature_name: str,
deprecated_in: Version,
removed_in: Version,
replacement: str = None,
message: str = None
):
"""标记功能为废弃"""
self.deprecations[feature_name] = DeprecationInfo(
status=DeprecationStatus.DEPRECATED,
deprecated_in=deprecated_in,
removed_in=removed_in,
replacement=replacement,
message=message
)
def remove_feature(self, feature_name: str, removed_in: Version):
"""移除功能"""
if feature_name in self.deprecations:
self.deprecations[feature_name].status = DeprecationStatus.REMOVED
self.deprecations[feature_name].removed_in = removed_in
def check_feature(
self,
feature_name: str,
current_version: Version
) -> tuple:
"""检查功能状态"""
if feature_name not in self.deprecations:
return True, None
info = self.deprecations[feature_name]
if info.status == DeprecationStatus.REMOVED:
return False, f"功能 {feature_name} 已在版本 {info.removed_in} 中移除"
if info.status == DeprecationStatus.DEPRECATED:
if current_version >= info.removed_in:
return False, f"功能 {feature_name} 已在版本 {info.removed_in} 中移除"
warning = f"功能 {feature_name} 已废弃,将在版本 {info.removed_in} 中移除"
if info.replacement:
warning += f",请使用 {info.replacement}"
return True, warning
return True, None
def get_deprecation_warnings(
self,
current_version: Version
) -> List[str]:
"""获取所有废弃警告"""
warnings = []
for feature_name, info in self.deprecations.items():
if info.status == DeprecationStatus.DEPRECATED:
if current_version < info.removed_in:
warning = f"功能 {feature_name} 已废弃"
if info.replacement:
warning += f",请使用 {info.replacement}"
warnings.append(warning)
return warnings
```
**4. 版本迁移指南**
```python
from typing import Dict, List, Callable
class MigrationGuide:
"""版本迁移指南"""
def __init__(self):
self.migrations: Dict[Version, List[Migration]] = {}
def add_migration(
self,
from_version: Version,
to_version: Version,
migration_func: Callable,
description: str
):
"""添加迁移步骤"""
if from_version not in self.migrations:
self.migrations[from_version] = []
migration = Migration(
from_version=from_version,
to_version=to_version,
func=migration_func,
description=description
)
self.migrations[from_version].append(migration)
def get_migration_path(
self,
from_version: Version,
to_version: Version
) -> List[Migration]:
"""获取迁移路径"""
if from_version == to_version:
return []
# 简单实现:直接迁移
if from_version in self.migrations:
for migration in self.migrations[from_version]:
if migration.to_version == to_version:
return [migration]
# 复杂实现:查找多步迁移路径
return self._find_migration_path(from_version, to_version)
def _find_migration_path(
self,
from_version: Version,
to_version: Version,
visited: set = None
) -> List[Migration]:
"""递归查找迁移路径"""
if visited is None:
visited = set()
if from_version in visited:
return []
visited.add(from_version)
if from_version == to_version:
return []
if from_version not in self.migrations:
return []
for migration in self.migrations[from_version]:
path = self._find_migration_path(
migration.to_version,
to_version,
visited.copy()
)
if path is not None:
return [migration] + path
return []
async def execute_migration(
self,
from_version: Version,
to_version: Version,
context: Dict
) -> bool:
"""执行迁移"""
migration_path = self.get_migration_path(from_version, to_version)
if not migration_path:
print(f"无法找到从 {from_version} 到 {to_version} 的迁移路径")
return False
print(f"开始迁移: {from_version} -> {to_version}")
for migration in migration_path:
print(f"执行迁移: {migration.description}")
try:
await migration.func(context)
print(f"迁移完成: {migration.description}")
except Exception as e:
print(f"迁移失败: {migration.description}, 错误: {e}")
return False
print(f"迁移完成: {from_version} -> {to_version}")
return True
@dataclass
class Migration:
"""迁移步骤"""
from_version: Version
to_version: Version
func: Callable
description: str
```
**5. 版本兼容性检查**
```python
from typing import Dict, List, Tuple
class CompatibilityChecker:
"""兼容性检查器"""
def __init__(self, current_version: Version):
self.current_version = current_version
self.compatibility_matrix: Dict[Version, List[Version]] = {}
def add_compatibility(
self,
server_version: Version,
compatible_client_versions: List[Version]
):
"""添加兼容性规则"""
self.compatibility_matrix[server_version] = compatible_client_versions
def check_compatibility(
self,
client_version: Version
) -> Tuple[bool, Optional[str]]:
"""检查客户端版本是否兼容"""
# 检查服务器是否支持此版本
if self.current_version in self.compatibility_matrix:
compatible_versions = self.compatibility_matrix[self.current_version]
for compatible_version in compatible_versions:
if client_version.is_compatible(compatible_version):
return True, None
# 使用默认兼容性规则
if self.current_version.is_compatible(client_version):
return True, None
return False, f"客户端版本 {client_version} 与服务器版本 {self.current_version} 不兼容"
def get_compatible_versions(self) -> List[Version]:
"""获取所有兼容的客户端版本"""
compatible_versions = []
if self.current_version in self.compatibility_matrix:
compatible_versions = self.compatibility_matrix[self.current_version]
return compatible_versions
```
**6. 版本信息 API**
```python
from fastapi import FastAPI
from typing import Dict
class VersionInfoAPI:
"""版本信息 API"""
def __init__(
self,
current_version: Version,
supported_versions: List[Version],
deprecation_manager: DeprecationManager,
migration_guide: MigrationGuide
):
self.current_version = current_version
self.supported_versions = supported_versions
self.deprecation_manager = deprecation_manager
self.migration_guide = migration_guide
def setup_routes(self, app: FastAPI):
"""设置路由"""
@app.get("/version")
async def get_version() -> Dict:
"""获取当前版本信息"""
return {
"version": str(self.current_version),
"supported_versions": [str(v) for v in self.supported_versions],
"deprecation_warnings": self.deprecation_manager.get_deprecation_warnings(
self.current_version
)
}
@app.get("/version/compatibility/{client_version}")
async def check_compatibility(client_version: str) -> Dict:
"""检查客户端版本兼容性"""
checker = CompatibilityChecker(self.current_version)
try:
version = Version.parse(client_version)
except ValueError:
return {
"compatible": False,
"message": "无效的版本格式"
}
compatible, message = checker.check_compatibility(version)
return {
"compatible": compatible,
"message": message,
"server_version": str(self.current_version)
}
@app.get("/version/migration/{from_version}/{to_version}")
async def get_migration_path(
from_version: str,
to_version: str
) -> Dict:
"""获取迁移路径"""
try:
from_ver = Version.parse(from_version)
to_ver = Version.parse(to_version)
except ValueError:
return {
"success": False,
"message": "无效的版本格式"
}
path = self.migration_guide.get_migration_path(from_ver, to_ver)
return {
"success": True,
"migration_path": [
{
"from": str(m.from_version),
"to": str(m.to_version),
"description": m.description
}
for m in path
]
}
```
**最佳实践:**
1. **语义化版本**:严格遵循语义化版本控制规范
2. **向后兼容**:确保新版本向后兼容旧版本
3. **渐进废弃**:提前通知功能废弃,给用户迁移时间
4. **文档完善**:提供详细的版本迁移指南
5. **版本协商**:实现客户端和服务器的版本协商机制
6. **测试覆盖**:为每个版本编写兼容性测试
通过完善的版本管理和兼容性处理,可以确保 MCP 系统的稳定性和可维护性。
服务端 · 2月19日 21:43
MCP 的未来发展趋势是什么?有哪些挑战和机遇?MCP 作为新兴的 AI 集成协议,具有广阔的发展前景和潜力。以下是 MCP 的未来发展趋势:
**1. 标准化推进**
- **行业认可**:获得更多 AI 模型提供商和企业的认可
- **协议完善**:持续完善协议规范,解决现有局限性
- **标准化组织**:可能提交给标准化组织(如 W3C、IETF)进行标准化
- **互操作性增强**:与现有协议(如 OpenAPI、GraphQL)的互操作性
**2. 生态系统扩展**
- **更多语言支持**:扩展到 Rust、Java、C#、PHP 等更多编程语言
- **服务器生态**:社区贡献更多针对特定领域的 MCP 服务器
- **客户端集成**:更多 AI 应用和平台原生支持 MCP
- **工具库丰富**:提供更多预构建的工具和资源
**3. 性能优化**
- **协议优化**:引入二进制协议、压缩、批量操作等优化
- **异步增强**:更强大的异步和流式处理能力
- **缓存机制**:智能缓存策略减少重复计算
- **边缘计算**:支持边缘节点部署,降低延迟
**4. 安全性增强**
- **高级认证**:支持 OAuth 2.0、SAML 等企业级认证
- **细粒度权限**:更精细的访问控制和权限管理
- **安全审计**:完整的安全审计和合规支持
- **加密增强**:端到端加密和密钥管理
**5. 功能扩展**
- **实时通信**:支持 WebSocket 等实时双向通信
- **流式处理**:更好的流式数据处理能力
- **事件驱动**:支持事件订阅和推送机制
- **多模态支持**:增强对图像、音频、视频等多模态数据的支持
**6. 企业级特性**
- **多租户支持**:完善的多租户隔离和管理
- **高可用性**:内置高可用和灾难恢复机制
- **可观测性**:完整的监控、日志和追踪能力
- **治理工具**:提供企业级治理和管理工具
**7. AI 模型集成**
- **更多模型支持**:支持更多开源和商业 AI 模型
- **模型适配器**:提供模型适配器简化集成
- **性能优化**:针对不同模型的性能优化
- **成本控制**:智能的成本控制和优化
**8. 开发者体验**
- **更好的工具**:更强大的开发、测试和调试工具
- **文档完善**:更全面和易懂的文档
- **示例丰富**:更多实际应用场景的示例
- **社区支持**:活跃的社区支持和交流
**9. 应用场景拓展**
- **企业应用**:更多企业级应用场景
- **物联网**:IoT 设备和系统的集成
- **边缘 AI**:边缘计算和 AI 的结合
- **自动化**:更广泛的自动化应用
**10. 挑战和机遇**
**挑战:**
- 与现有协议的竞争和兼容性
- 社区建设和生态发展
- 性能和可扩展性的平衡
- 安全性和易用性的权衡
**机遇:**
- 成为 AI 集成的行业标准
- 推动AI应用的大规模部署
- 促进AI技术的民主化
- 创造新的商业模式和机会
**预测:**
未来 2-3 年内,MCP 有望成为 AI 模型与外部系统集成的主流标准之一,被广泛采用于企业级应用、开发工具和各种 AI 产品中。其开放性和标准化特性将推动整个 AI 生态系统的发展。
开发者现在学习和采用 MCP,将能够在未来的 AI 应用开发中占据有利位置。
服务端 · 2月19日 21:41
MCP 如何与其他 AI 框架(如 LangChain、LlamaIndex)集成?MCP 可以与各种 AI 框架和工具集成,扩展其功能和应用场景。以下是详细的集成方法和最佳实践:
**集成架构设计**
MCP 集成应考虑以下方面:
1. **框架兼容性**:确保与目标框架的兼容性
2. **性能影响**:最小化对系统性能的影响
3. **功能完整性**:保持 MCP 和框架功能的完整性
4. **错误处理**:正确处理集成过程中的错误
5. **配置管理**:统一管理集成配置
**1. 与 LangChain 集成**
```python
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_openai import ChatOpenAI
from langchain.tools import Tool
from mcp.server import Server
class MCPLangChainIntegration:
def __init__(self, mcp_server: Server):
self.mcp_server = mcp_server
self.langchain_tools = []
async def convert_mcp_to_langchain_tools(self) -> list:
"""将 MCP 工具转换为 LangChain 工具"""
mcp_tools = await self.mcp_server.list_tools()
langchain_tools = []
for tool_info in mcp_tools:
tool = Tool(
name=tool_info["name"],
description=tool_info["description"],
func=self._create_tool_wrapper(tool_info["name"])
)
langchain_tools.append(tool)
return langchain_tools
def _create_tool_wrapper(self, tool_name: str):
"""创建工具包装器"""
async def wrapper(**kwargs):
result = await self.mcp_server.call_tool(
tool_name,
kwargs
)
return result
return wrapper
async def create_langchain_agent(self):
"""创建 LangChain Agent"""
# 转换 MCP 工具
tools = await self.convert_mcp_to_langchain_tools()
# 创建 LLM
llm = ChatOpenAI(
model="gpt-4",
temperature=0
)
# 创建 Agent
agent = create_openai_tools_agent(llm, tools)
# 创建 AgentExecutor
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True
)
return agent_executor
async def run_agent(self, query: str):
"""运行 Agent"""
agent_executor = await self.create_langchain_agent()
result = await agent_executor.ainvoke({
"input": query
})
return result["output"]
```
**2. 与 LlamaIndex 集成**
```python
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.core.tools import QueryEngineTool, ToolMetadata
from llama_index.core.agent import ReActAgent
from mcp.server import Server
class MCPLlamaIndexIntegration:
def __init__(self, mcp_server: Server):
self.mcp_server = mcp_server
async def create_mcp_query_engine(self, tool_name: str):
"""创建 MCP 查询引擎"""
async def query_engine_fn(query: str) -> str:
result = await self.mcp_server.call_tool(
tool_name,
{"query": query}
)
return result
return query_engine_fn
async def create_llamaindex_tools(self) -> list:
"""创建 LlamaIndex 工具"""
mcp_tools = await self.mcp_server.list_tools()
tools = []
for tool_info in mcp_tools:
query_engine = await self.create_mcp_query_engine(
tool_info["name"]
)
tool = QueryEngineTool(
query_engine=query_engine,
metadata=ToolMetadata(
name=tool_info["name"],
description=tool_info["description"]
)
)
tools.append(tool)
return tools
async def create_react_agent(self):
"""创建 ReAct Agent"""
tools = await self.create_llamaindex_tools()
agent = ReActAgent.from_tools(
tools=tools,
verbose=True
)
return agent
async def query_with_agent(self, query: str):
"""使用 Agent 查询"""
agent = await self.create_react_agent()
response = agent.query(query)
return response.response
```
**3. 与 AutoGPT 集成**
```python
from autogpt.agent.agent import Agent
from autogpt.config import Config
from autogpt.models.command import Command
from mcp.server import Server
class MCPAutoGPTIntegration:
def __init__(self, mcp_server: Server):
self.mcp_server = mcp_server
self.command_registry = {}
async def register_mcp_commands(self):
"""注册 MCP 命令"""
mcp_tools = await self.mcp_server.list_tools()
for tool_info in mcp_tools:
command = Command(
name=tool_info["name"],
description=tool_info["description"],
function=self._create_command_function(tool_info["name"])
)
self.command_registry[tool_info["name"]] = command
def _create_command_function(self, tool_name: str):
"""创建命令函数"""
async def command_function(**kwargs):
result = await self.mcp_server.call_tool(
tool_name,
kwargs
)
return result
return command_function
async def create_autogpt_agent(self, config: Config):
"""创建 AutoGPT Agent"""
# 注册 MCP 命令
await self.register_mcp_commands()
# 创建 Agent
agent = Agent(
ai_name="MCP-Agent",
ai_role="Assistant",
commands=self.command_registry,
config=config
)
return agent
async def run_autogpt_task(self, task: str):
"""运行 AutoGPT 任务"""
config = Config()
agent = await self.create_autogpt_agent(config)
result = await agent.run(task)
return result
```
**4. 与 FastAPI 集成**
```python
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from mcp.server import Server
from pydantic import BaseModel
class MCPFastAPIIntegration:
def __init__(self, mcp_server: Server):
self.mcp_server = mcp_server
self.app = FastAPI(title="MCP API")
self._setup_middleware()
self._setup_routes()
def _setup_middleware(self):
"""设置中间件"""
self.app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
def _setup_routes(self):
"""设置路由"""
@self.app.get("/tools")
async def list_tools():
"""列出所有工具"""
tools = await self.mcp_server.list_tools()
return {"tools": tools}
@self.app.post("/tools/{tool_name}/call")
async def call_tool(tool_name: str, params: dict):
"""调用工具"""
try:
result = await self.mcp_server.call_tool(
tool_name,
params
)
return {"result": result}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@self.app.get("/resources")
async def list_resources():
"""列出所有资源"""
resources = await self.mcp_server.list_resources()
return {"resources": resources}
@self.app.get("/resources/{uri}")
async def read_resource(uri: str):
"""读取资源"""
try:
content = await self.mcp_server.read_resource(uri)
return {"content": content}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@self.app.get("/prompts")
async def list_prompts():
"""列出所有提示词"""
prompts = await self.mcp_server.list_prompts()
return {"prompts": prompts}
@self.app.get("/health")
async def health_check():
"""健康检查"""
return {"status": "healthy"}
def get_app(self) -> FastAPI:
"""获取 FastAPI 应用"""
return self.app
```
**5. 与 WebSocket 集成**
```python
import asyncio
import json
from fastapi import WebSocket
from mcp.server import Server
class MCPWebSocketIntegration:
def __init__(self, mcp_server: Server):
self.mcp_server = mcp_server
self.active_connections = []
async def handle_websocket(self, websocket: WebSocket):
"""处理 WebSocket 连接"""
await websocket.accept()
self.active_connections.append(websocket)
try:
while True:
# 接收消息
data = await websocket.receive_text()
message = json.loads(data)
# 处理消息
response = await self._handle_message(message)
# 发送响应
await websocket.send_text(json.dumps(response))
except Exception as e:
print(f"WebSocket 错误: {e}")
finally:
self.active_connections.remove(websocket)
async def _handle_message(self, message: dict) -> dict:
"""处理消息"""
message_type = message.get("type")
if message_type == "list_tools":
tools = await self.mcp_server.list_tools()
return {"type": "tools_list", "data": tools}
elif message_type == "call_tool":
result = await self.mcp_server.call_tool(
message["tool_name"],
message.get("params", {})
)
return {"type": "tool_result", "data": result}
elif message_type == "list_resources":
resources = await self.mcp_server.list_resources()
return {"type": "resources_list", "data": resources}
else:
return {"type": "error", "message": "未知消息类型"}
async def broadcast_message(self, message: dict):
"""广播消息到所有连接"""
message_text = json.dumps(message)
for connection in self.active_connections:
try:
await connection.send_text(message_text)
except Exception as e:
print(f"发送消息失败: {e}")
```
**6. 与 GraphQL 集成**
```python
import strawberry
from strawberry.types import Info
from mcp.server import Server
@strawberry.type
class MCPTool:
name: str
description: str
@strawberry.type
class MCPResource:
uri: str
name: str
description: str
@strawberry.type
class Query:
@strawberry.field
async def tools(self, info: Info) -> list[MCPTool]:
"""获取所有工具"""
mcp_server = info.context["mcp_server"]
tools = await mcp_server.list_tools()
return [
MCPTool(
name=tool["name"],
description=tool["description"]
)
for tool in tools
]
@strawberry.field
async def resources(self, info: Info) -> list[MCPResource]:
"""获取所有资源"""
mcp_server = info.context["mcp_server"]
resources = await mcp_server.list_resources()
return [
MCPResource(
uri=resource["uri"],
name=resource["name"],
description=resource["description"]
)
for resource in resources
]
@strawberry.type
class Mutation:
@strawberry.mutation
async def call_tool(
self,
tool_name: str,
params: dict,
info: Info
) -> str:
"""调用工具"""
mcp_server = info.context["mcp_server"]
result = await mcp_server.call_tool(tool_name, params)
return str(result)
class MCPGraphQLIntegration:
def __init__(self, mcp_server: Server):
self.mcp_server = mcp_server
self.schema = strawberry.Schema(
query=Query,
mutation=Mutation
)
async def execute_query(self, query: str, variables: dict = None):
"""执行 GraphQL 查询"""
context = {"mcp_server": self.mcp_server}
result = await self.schema.execute(
query,
variable_values=variables,
context_value=context
)
if result.errors:
return {
"errors": [str(error) for error in result.errors]
}
return {"data": result.data}
```
**7. 与 gRPC 集成**
```python
import grpc
from concurrent import futures
import mcp_pb2
import mcp_pb2_grpc
from mcp.server import Server
class MCPServicer(mcp_pb2_grpc.MCPServicer):
def __init__(self, mcp_server: Server):
self.mcp_server = mcp_server
async def ListTools(
self,
request: mcp_pb2.ListToolsRequest,
context: grpc.ServicerContext
) -> mcp_pb2.ListToolsResponse:
"""列出工具"""
tools = await self.mcp_server.list_tools()
tool_protos = [
mcp_pb2.Tool(
name=tool["name"],
description=tool["description"]
)
for tool in tools
]
return mcp_pb2.ListToolsResponse(tools=tool_protos)
async def CallTool(
self,
request: mcp_pb2.CallToolRequest,
context: grpc.ServicerContext
) -> mcp_pb2.CallToolResponse:
"""调用工具"""
params = dict(request.params)
result = await self.mcp_server.call_tool(
request.tool_name,
params
)
return mcp_pb2.CallToolResponse(result=str(result))
class MCPGRPCIntegration:
def __init__(self, mcp_server: Server, port: int = 50051):
self.mcp_server = mcp_server
self.port = port
self.server = None
async def start_server(self):
"""启动 gRPC 服务器"""
self.server = grpc.aio.server(
futures.ThreadPoolExecutor(max_workers=10)
)
mcp_pb2_grpc.add_MCPServicer_to_server(
MCPServicer(self.mcp_server),
self.server
)
self.server.add_insecure_port(f'[::]:{self.port}')
await self.server.start()
print(f"gRPC 服务器启动在端口 {self.port}")
async def stop_server(self):
"""停止 gRPC 服务器"""
if self.server:
await self.server.stop(0)
print("gRPC 服务器已停止")
```
**最佳实践:**
1. **异步处理**:使用异步编程避免阻塞
2. **错误处理**:正确处理集成过程中的错误
3. **性能优化**:缓存频繁调用的结果
4. **日志记录**:记录所有集成操作
5. **测试覆盖**:编写集成测试确保功能正常
6. **文档完善**:提供清晰的集成文档
通过与其他 AI 框架和工具的集成,可以扩展 MCP 的功能和应用场景。
服务端 · 2月19日 21:40
MCP 的安全性设计有哪些关键机制?MCP 的安全性设计包含多个层面,确保 AI 模型与外部系统的交互是安全可控的:
**1. 认证和授权机制**
- **身份认证**:支持多种认证方式(API Key、OAuth、JWT 等)
- **访问控制**:基于角色的权限管理(RBAC)
- **令牌管理**:安全的令牌生成、验证和刷新机制
- **多租户支持**:隔离不同用户或租户的数据和资源
**2. 通信安全**
- **加密传输**:强制使用 TLS/SSL 加密所有通信
- **证书验证**:严格的证书验证和吊销检查
- **安全协议**:基于 JSON-RPC 2.0 的安全扩展
- **防止中间人攻击**:完整的证书链验证
**3. 输入验证和清理**
- **参数验证**:严格验证所有输入参数的类型和格式
- **SQL 注入防护**:使用参数化查询,防止 SQL 注入
- **XSS 防护**:清理和转义用户输入,防止跨站脚本攻击
- **命令注入防护**:限制和验证系统命令执行
**4. 资源访问控制**
- **文件系统隔离**:限制可访问的文件路径和权限
- **网络访问限制**:白名单机制控制外部网络访问
- **资源配额**:限制 CPU、内存、磁盘等资源使用
- **操作审计**:记录所有资源访问和修改操作
**5. 执行环境安全**
- **沙箱隔离**:在隔离的沙箱环境中执行代码
- **权限最小化**:只授予必要的最小权限
- **超时控制**:设置执行超时,防止无限循环
- **资源限制**:限制内存、CPU 等资源使用
**6. 错误处理和日志**
- **安全错误消息**:不泄露敏感信息的错误提示
- **详细日志记录**:记录所有操作和安全事件
- **审计追踪**:完整的操作审计链
- **异常监控**:实时监控异常行为
**7. 数据保护**
- **数据加密**:敏感数据加密存储和传输
- **数据脱敏**:日志和错误消息中的敏感数据脱敏
- **数据隔离**:不同用户数据的严格隔离
- **数据备份**:安全的数据备份和恢复机制
**8. 速率限制和防护**
- **请求限流**:防止 API 滥用和 DDoS 攻击
- **并发控制**:限制并发请求数量
- **黑名单机制**:阻止恶意 IP 或用户
- **异常检测**:检测和阻止异常行为模式
**安全最佳实践:**
1. 定期进行安全审计和渗透测试
2. 及时更新依赖库和框架
3. 实施最小权限原则
4. 建立安全事件响应流程
5. 提供安全配置指南和文档
通过这些多层安全机制,MCP 能够在提供强大功能的同时,确保系统的安全性和可靠性。
服务端 · 2月19日 21:40
如何对 MCP 进行测试?有哪些测试策略和最佳实践?MCP 的测试策略对于确保系统质量和可靠性至关重要。以下是详细的测试方法和最佳实践:
**测试层次结构**
MCP 测试应涵盖以下层次:
1. **单元测试**:测试单个函数和组件
2. **集成测试**:测试组件之间的交互
3. **端到端测试**:测试完整的用户场景
4. **性能测试**:测试系统性能和可扩展性
5. **安全测试**:测试安全漏洞和防护机制
**1. 单元测试**
```python
import pytest
from unittest.mock import Mock, AsyncMock
from mcp.server import Server
class TestMCPTools:
@pytest.fixture
def server(self):
"""创建测试服务器实例"""
return Server("test-server")
@pytest.mark.asyncio
async def test_tool_registration(self, server):
"""测试工具注册"""
@server.tool(
name="test_tool",
description="测试工具"
)
async def test_tool(param: str) -> str:
return f"Result: {param}"
# 验证工具已注册
tools = await server.list_tools()
assert any(t["name"] == "test_tool" for t in tools)
@pytest.mark.asyncio
async def test_tool_execution(self, server):
"""测试工具执行"""
@server.tool(
name="calculate",
description="计算工具"
)
async def calculate(a: int, b: int) -> int:
return a + b
# 执行工具
result = await server.call_tool("calculate", {"a": 2, "b": 3})
assert result == 5
@pytest.mark.asyncio
async def test_parameter_validation(self, server):
"""测试参数验证"""
@server.tool(
name="validate",
description="参数验证工具",
inputSchema={
"type": "object",
"properties": {
"email": {
"type": "string",
"format": "email"
}
},
"required": ["email"]
}
)
async def validate(email: str) -> str:
return f"Valid: {email}"
# 测试有效参数
result = await server.call_tool(
"validate",
{"email": "test@example.com"}
)
assert "Valid" in result
# 测试无效参数
with pytest.raises(ValueError):
await server.call_tool("validate", {"email": "invalid"})
```
**2. 集成测试**
```python
class TestMCPIntegration:
@pytest.mark.asyncio
async def test_client_server_communication(self):
"""测试客户端-服务器通信"""
from mcp.client import Client
from mcp.server import Server
# 创建服务器
server = Server("integration-test-server")
@server.tool(name="echo", description="回显工具")
async def echo(message: str) -> str:
return message
# 启动服务器
await server.start()
try:
# 创建客户端
client = Client("http://localhost:8000")
# 测试通信
result = await client.call_tool("echo", {"message": "Hello"})
assert result == "Hello"
finally:
await server.stop()
@pytest.mark.asyncio
async def test_resource_access(self):
"""测试资源访问"""
server = Server("resource-test-server")
@server.resource(
uri="file:///test.txt",
name="测试文件",
description="测试资源"
)
async def test_resource() -> str:
return "Test content"
await server.start()
try:
client = Client("http://localhost:8000")
# 读取资源
content = await client.read_resource("file:///test.txt")
assert content == "Test content"
finally:
await server.stop()
```
**3. 端到端测试**
```python
class TestMCPEndToEnd:
@pytest.mark.asyncio
async def test_complete_workflow(self):
"""测试完整工作流"""
# 模拟用户场景:查询数据库并生成报告
server = Server("e2e-test-server")
@server.tool(name="query_db", description="查询数据库")
async def query_db(query: str) -> list:
return [{"id": 1, "name": "Test"}]
@server.tool(name="generate_report", description="生成报告")
async def generate_report(data: list) -> str:
return f"Report: {len(data)} items"
await server.start()
try:
client = Client("http://localhost:8000")
# 执行工作流
data = await client.call_tool("query_db", {"query": "SELECT *"})
report = await client.call_tool("generate_report", {"data": data})
assert "1 items" in report
finally:
await server.stop()
```
**4. 性能测试**
```python
import asyncio
import time
from locust import HttpUser, task, between
class MCPPerformanceTest(HttpUser):
wait_time = between(1, 3)
def on_start(self):
"""测试开始时的初始化"""
self.client = Client(self.host)
@task
def tool_call_performance(self):
"""测试工具调用性能"""
start_time = time.time()
result = self.client.call_tool("test_tool", {"param": "value"})
elapsed = time.time() - start_time
# 断言响应时间
assert elapsed < 1.0, f"响应时间过长: {elapsed}s"
@task
def concurrent_requests(self):
"""测试并发请求"""
async def make_request():
return self.client.call_tool("test_tool", {"param": "value"})
# 并发执行 10 个请求
tasks = [make_request() for _ in range(10)]
results = asyncio.run(asyncio.gather(*tasks))
# 验证所有请求都成功
assert all(results)
```
**5. 安全测试**
```python
class TestMCPSecurity:
@pytest.mark.asyncio
async def test_authentication(self):
"""测试认证机制"""
server = Server("security-test-server")
# 配置认证
server.set_authenticator(lambda token: token == "valid-token")
@server.tool(name="secure_tool", description="安全工具")
async def secure_tool() -> str:
return "Secure data"
await server.start()
try:
# 测试有效令牌
client = Client("http://localhost:8000", token="valid-token")
result = await client.call_tool("secure_tool", {})
assert result == "Secure data"
# 测试无效令牌
client_invalid = Client("http://localhost:8000", token="invalid-token")
with pytest.raises(AuthenticationError):
await client_invalid.call_tool("secure_tool", {})
finally:
await server.stop()
@pytest.mark.asyncio
async def test_sql_injection_prevention(self):
"""测试 SQL 注入防护"""
server = Server("sql-injection-test-server")
@server.tool(name="query", description="查询工具")
async def query(sql: str) -> list:
# 应该使用参数化查询
return execute_safe_query(sql)
# 测试 SQL 注入尝试
malicious_sql = "SELECT * FROM users WHERE '1'='1'"
result = await server.call_tool("query", {"sql": malicious_sql})
# 验证注入被阻止
assert result == []
@pytest.mark.asyncio
async def test_rate_limiting(self):
"""测试速率限制"""
server = Server("rate-limit-test-server")
# 配置速率限制
server.set_rate_limit(max_requests=10, window=60)
@server.tool(name="limited_tool", description="受限工具")
async def limited_tool() -> str:
return "Success"
# 快速发送多个请求
for i in range(15):
try:
await server.call_tool("limited_tool", {})
except RateLimitError:
# 预期的速率限制错误
assert i >= 10
break
else:
pytest.fail("未触发速率限制")
```
**6. Mock 和 Stub**
```python
from unittest.mock import Mock, patch
class TestMCPWithMocks:
@pytest.mark.asyncio
async def test_with_external_dependency_mock(self):
"""使用 Mock 测试外部依赖"""
server = Server("mock-test-server")
@server.tool(name="fetch_data", description="获取数据")
async def fetch_data(url: str) -> dict:
# Mock 外部 API 调用
with patch('requests.get') as mock_get:
mock_get.return_value.json.return_value = {
"data": "mocked"
}
response = requests.get(url)
return response.json()
result = await server.call_tool(
"fetch_data",
{"url": "http://api.example.com"}
)
assert result == {"data": "mocked"}
mock_get.assert_called_once()
```
**7. 测试覆盖率**
```python
# 使用 pytest-cov 生成覆盖率报告
# 运行命令: pytest --cov=mcp --cov-report=html
class TestCoverage:
@pytest.mark.asyncio
async def test_all_code_paths(self):
"""测试所有代码路径"""
server = Server("coverage-test-server")
@server.tool(name="complex_tool", description="复杂工具")
async def complex_tool(condition: bool) -> str:
if condition:
return "Branch A"
else:
return "Branch B"
# 测试所有分支
result_a = await server.call_tool("complex_tool", {"condition": True})
assert result_a == "Branch A"
result_b = await server.call_tool("complex_tool", {"condition": False})
assert result_b == "Branch B"
```
**最佳实践:**
1. **测试金字塔**:大量单元测试,适量集成测试,少量端到端测试
2. **独立性**:每个测试应该独立运行,不依赖其他测试
3. **可重复性**:测试结果应该可重复,不受环境因素影响
4. **快速反馈**:单元测试应该快速执行,提供快速反馈
5. **持续集成**:将测试集成到 CI/CD 流程中
6. **覆盖率目标**:设定合理的代码覆盖率目标(如 80%)
通过完善的测试策略,可以确保 MCP 系统的质量和可靠性。
服务端 · 2月19日 21:40
MCP 的插件系统是如何工作的?MCP 的插件系统允许开发者扩展 MCP 服务器的功能,无需修改核心代码。以下是详细的插件架构和实现方法:
**插件架构设计**
MCP 插件系统应考虑以下方面:
1. **插件发现**:自动发现和加载插件
2. **插件生命周期**:管理插件的加载、初始化、卸载
3. **插件隔离**:确保插件之间相互隔离
4. **插件通信**:提供插件间的通信机制
5. **插件安全**:限制插件的权限和资源访问
**1. 插件接口定义**
```python
from abc import ABC, abstractmethod
from typing import Dict, Any, List
class MCPPlugin(ABC):
"""MCP 插件基类"""
def __init__(self, config: Dict[str, Any] = None):
self.config = config or {}
self.name = self.__class__.__name__
self.version = getattr(self, 'VERSION', '1.0.0')
@abstractmethod
async def initialize(self, server):
"""初始化插件"""
pass
@abstractmethod
async def shutdown(self):
"""关闭插件"""
pass
@abstractmethod
def get_tools(self) -> List[Dict[str, Any]]:
"""获取插件提供的工具"""
return []
@abstractmethod
def get_resources(self) -> List[Dict[str, Any]]:
"""获取插件提供的资源"""
return []
@abstractmethod
def get_prompts(self) -> List[Dict[str, Any]]:
"""获取插件提供的提示词"""
return []
def get_metadata(self) -> Dict[str, Any]:
"""获取插件元数据"""
return {
"name": self.name,
"version": self.version,
"description": getattr(self, 'DESCRIPTION', ''),
"author": getattr(self, 'AUTHOR', ''),
"dependencies": getattr(self, 'DEPENDENCIES', [])
}
```
**2. 插件管理器**
```python
import importlib
import inspect
import os
from pathlib import Path
from typing import Dict, List, Type
class PluginManager:
def __init__(self, server):
self.server = server
self.plugins: Dict[str, MCPPlugin] = {}
self.plugin_directories: List[str] = []
def add_plugin_directory(self, directory: str):
"""添加插件目录"""
if directory not in self.plugin_directories:
self.plugin_directories.append(directory)
async def discover_plugins(self):
"""发现插件"""
discovered_plugins = []
for directory in self.plugin_directories:
plugin_path = Path(directory)
if not plugin_path.exists():
continue
# 遍历插件目录
for item in plugin_path.iterdir():
if item.is_file() and item.suffix == '.py':
# 单文件插件
discovered_plugins.append(str(item))
elif item.is_dir() and (item / '__init__.py').exists():
# 包插件
discovered_plugins.append(str(item))
return discovered_plugins
async def load_plugin(self, plugin_path: str) -> bool:
"""加载插件"""
try:
# 动态导入插件模块
spec = importlib.util.spec_from_file_location(
"plugin_module",
plugin_path
)
if spec is None or spec.loader is None:
return False
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# 查找插件类
plugin_class = None
for name, obj in inspect.getmembers(module):
if (inspect.isclass(obj) and
issubclass(obj, MCPPlugin) and
obj is not MCPPlugin):
plugin_class = obj
break
if plugin_class is None:
return False
# 创建插件实例
plugin = plugin_class()
# 初始化插件
await plugin.initialize(self.server)
# 注册插件
self.plugins[plugin.name] = plugin
# 注册插件提供的工具、资源、提示词
self._register_plugin_tools(plugin)
self._register_plugin_resources(plugin)
self._register_plugin_prompts(plugin)
return True
except Exception as e:
print(f"加载插件失败 {plugin_path}: {e}")
return False
def _register_plugin_tools(self, plugin: MCPPlugin):
"""注册插件工具"""
tools = plugin.get_tools()
for tool_info in tools:
@self.server.tool(
name=tool_info["name"],
description=tool_info["description"]
)
async def tool_wrapper(**kwargs):
return await tool_info["function"](**kwargs)
def _register_plugin_resources(self, plugin: MCPPlugin):
"""注册插件资源"""
resources = plugin.get_resources()
for resource_info in resources:
@self.server.resource(
uri=resource_info["uri"],
name=resource_info["name"],
description=resource_info["description"]
)
async def resource_wrapper():
return await resource_info["function"]()
def _register_plugin_prompts(self, plugin: MCPPlugin):
"""注册插件提示词"""
prompts = plugin.get_prompts()
for prompt_info in prompts:
@self.server.prompt(
name=prompt_info["name"],
description=prompt_info["description"]
)
async def prompt_wrapper(**kwargs):
return await prompt_info["function"](**kwargs)
async def unload_plugin(self, plugin_name: str) -> bool:
"""卸载插件"""
if plugin_name not in self.plugins:
return False
plugin = self.plugins[plugin_name]
try:
# 关闭插件
await plugin.shutdown()
# 从插件列表中移除
del self.plugins[plugin_name]
return True
except Exception as e:
print(f"卸载插件失败 {plugin_name}: {e}")
return False
async def reload_plugin(self, plugin_name: str) -> bool:
"""重新加载插件"""
if plugin_name not in self.plugins:
return False
# 先卸载
await self.unload_plugin(plugin_name)
# 重新加载(需要记录插件路径)
# 这里简化处理,实际需要保存插件路径
return True
def get_plugin_info(self, plugin_name: str) -> Dict[str, Any]:
"""获取插件信息"""
if plugin_name not in self.plugins:
return {}
plugin = self.plugins[plugin_name]
return plugin.get_metadata()
def list_plugins(self) -> List[Dict[str, Any]]:
"""列出所有插件"""
return [
plugin.get_metadata()
for plugin in self.plugins.values()
]
```
**3. 示例插件实现**
```python
# plugins/database_plugin.py
class DatabasePlugin(MCPPlugin):
"""数据库插件"""
VERSION = "1.0.0"
DESCRIPTION = "提供数据库查询和管理功能"
AUTHOR = "Your Name"
DEPENDENCIES = ["sqlalchemy"]
def __init__(self, config: Dict[str, Any] = None):
super().__init__(config)
self.db_connection = None
async def initialize(self, server):
"""初始化数据库连接"""
from sqlalchemy import create_engine
db_url = self.config.get("database_url", "sqlite:///mcp.db")
self.db_connection = create_engine(db_url)
print(f"数据库插件 {self.name} 已初始化")
async def shutdown(self):
"""关闭数据库连接"""
if self.db_connection:
self.db_connection.dispose()
print(f"数据库插件 {self.name} 已关闭")
def get_tools(self) -> List[Dict[str, Any]]:
"""获取数据库工具"""
return [
{
"name": "query_database",
"description": "执行数据库查询",
"function": self._query_database
},
{
"name": "execute_sql",
"description": "执行 SQL 语句",
"function": self._execute_sql
}
]
def get_resources(self) -> List[Dict[str, Any]]:
"""获取数据库资源"""
return [
{
"uri": "db://schema",
"name": "数据库模式",
"description": "数据库表结构",
"function": self._get_schema
}
]
def get_prompts(self) -> List[Dict[str, Any]]:
"""获取数据库提示词"""
return [
{
"name": "generate_query",
"description": "生成 SQL 查询提示词",
"function": self._generate_query_prompt
}
]
async def _query_database(self, query: str) -> str:
"""执行数据库查询"""
from sqlalchemy import text
with self.db_connection.connect() as conn:
result = conn.execute(text(query))
rows = result.fetchall()
return f"查询结果: {len(rows)} 行"
async def _execute_sql(self, sql: str) -> str:
"""执行 SQL 语句"""
from sqlalchemy import text
with self.db_connection.connect() as conn:
conn.execute(text(sql))
conn.commit()
return "SQL 执行成功"
async def _get_schema(self) -> str:
"""获取数据库模式"""
from sqlalchemy import inspect
inspector = inspect(self.db_connection)
tables = inspector.get_table_names()
return f"数据库表: {', '.join(tables)}"
async def _generate_query_prompt(self, table_name: str) -> str:
"""生成查询提示词"""
return f"""
请为表 {table_name} 生成 SQL 查询语句。
要求:
1. 只使用 SELECT 查询
2. 包含适当的 WHERE 条件
3. 添加 LIMIT 限制结果数量
"""
```
**4. 插件配置**
```python
import json
from pathlib import Path
class PluginConfig:
def __init__(self, config_dir: str = "config/plugins"):
self.config_dir = Path(config_dir)
self.config_dir.mkdir(parents=True, exist_ok=True)
def load_config(self, plugin_name: str) -> Dict[str, Any]:
"""加载插件配置"""
config_file = self.config_dir / f"{plugin_name}.json"
if not config_file.exists():
return {}
with open(config_file, 'r') as f:
return json.load(f)
def save_config(
self,
plugin_name: str,
config: Dict[str, Any]
):
"""保存插件配置"""
config_file = self.config_dir / f"{plugin_name}.json"
with open(config_file, 'w') as f:
json.dump(config, f, indent=2)
def get_all_configs(self) -> Dict[str, Dict[str, Any]]:
"""获取所有插件配置"""
configs = {}
for config_file in self.config_dir.glob("*.json"):
plugin_name = config_file.stem
with open(config_file, 'r') as f:
configs[plugin_name] = json.load(f)
return configs
```
**5. 插件依赖管理**
```python
from typing import Dict, List, Set
class PluginDependencyManager:
def __init__(self):
self.dependencies: Dict[str, Set[str]] = {}
self.dependents: Dict[str, Set[str]] = {}
def add_dependency(self, plugin_name: str, dependency: str):
"""添加依赖关系"""
if plugin_name not in self.dependencies:
self.dependencies[plugin_name] = set()
self.dependencies[plugin_name].add(dependency)
if dependency not in self.dependents:
self.dependents[dependency] = set()
self.dependents[dependency].add(plugin_name)
def get_load_order(self, plugins: List[str]) -> List[str]:
"""获取插件加载顺序(拓扑排序)"""
visited = set()
result = []
def visit(plugin_name: str):
if plugin_name in visited:
return
visited.add(plugin_name)
# 先加载依赖
for dep in self.dependencies.get(plugin_name, []):
visit(dep)
result.append(plugin_name)
for plugin_name in plugins:
visit(plugin_name)
return result
def check_circular_dependency(self) -> bool:
"""检查循环依赖"""
visited = set()
recursion_stack = set()
def has_cycle(plugin_name: str) -> bool:
visited.add(plugin_name)
recursion_stack.add(plugin_name)
for dep in self.dependencies.get(plugin_name, []):
if dep not in visited:
if has_cycle(dep):
return True
elif dep in recursion_stack:
return True
recursion_stack.remove(plugin_name)
return False
for plugin_name in self.dependencies:
if plugin_name not in visited:
if has_cycle(plugin_name):
return True
return False
```
**6. 插件沙箱**
```python
import sys
from typing import Any
class PluginSandbox:
def __init__(self):
self.restricted_modules = {
'os', 'sys', 'subprocess', 'socket',
'pickle', 'shelve', 'marshal'
}
def create_sandbox(self, plugin_name: str) -> dict:
"""创建插件沙箱环境"""
safe_globals = {
'__builtins__': self._create_safe_builtins(),
'__name__': f'plugin_{plugin_name}',
}
return safe_globals
def _create_safe_builtins(self) -> dict:
"""创建安全的内置函数"""
safe_builtins = {}
# 允许的安全内置函数
allowed_builtins = [
'abs', 'all', 'any', 'bool', 'dict', 'enumerate',
'filter', 'float', 'int', 'len', 'list', 'map',
'max', 'min', 'range', 'set', 'sorted', 'str',
'sum', 'tuple', 'type', 'zip'
]
for name in allowed_builtins:
if hasattr(__builtins__, name):
safe_builtins[name] = getattr(__builtins__, name)
return safe_builtins
def execute_in_sandbox(
self,
code: str,
sandbox_env: dict
) -> Any:
"""在沙箱中执行代码"""
try:
exec(code, sandbox_env)
return True
except Exception as e:
print(f"沙箱执行失败: {e}")
return False
```
**最佳实践:**
1. **插件隔离**:确保插件之间相互隔离,避免冲突
2. **依赖管理**:正确处理插件依赖关系
3. **错误处理**:插件错误不应影响核心系统
4. **版本兼容**:支持插件版本管理和兼容性检查
5. **安全限制**:限制插件的权限和资源访问
6. **文档完善**:为每个插件提供清晰的文档
通过完善的插件系统,可以灵活扩展 MCP 服务器的功能,满足各种定制化需求。
服务端 · 2月19日 21:39