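Implementing error handling and retry mechanisms in MCP is key to keeping the system stable and reliable. The implementation strategies are described below:
Error handling strategy
1. Error classification
- Retryable errors: network timeouts, temporary service unavailability, rate limiting, and similar transient failures
- Non-retryable errors: invalid parameters, insufficient permissions, missing resources, and similar permanent failures
- Business errors: errors tied to business logic, which need case-specific handling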
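The three categories above can be sketched as a small classifier. This is only an illustration: the `ErrorCategory` enum, the `classify_error` helper, the specific retryable codes, and the convention that positive codes denote business errors are all assumptions, not something MCP prescribes.

```python
from enum import Enum


class ErrorCategory(Enum):
    RETRYABLE = "retryable"
    NON_RETRYABLE = "non_retryable"
    BUSINESS = "business"


# Assumed, application-chosen codes for transient failures.
RETRYABLE_CODES = {-32000, -32001, -32002}


def classify_error(error: dict) -> ErrorCategory:
    """Map a JSON-RPC error object to one of the three categories."""
    code = error.get("code")
    if code in RETRYABLE_CODES:
        return ErrorCategory.RETRYABLE
    # Assumed convention: positive codes are application business errors.
    if isinstance(code, int) and code > 0:
        return ErrorCategory.BUSINESS
    # Everything else (bad params, permissions, unknown codes) is not retried.
    return ErrorCategory.NON_RETRYABLE
```

Defaulting unknown codes to non-retryable is a conservative choice: it avoids retrying a request that can never succeed.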
2. Error response format
```json
{
  "jsonrpc": "2.0",
  "id": "req-123",
  "error": {
    "code": -32000,
    "message": "Server error",
    "data": {
      "retryable": true,
      "retryAfter": 5,
      "details": "Database connection timed out"
    }
  }
}
```
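On the client side, a response in this shape can be turned into retry hints. The sketch below is a minimal illustration; `RetryHint` and `parse_error_response` are hypothetical names, and it assumes `retryable` and `retryAfter` live under `error.data` as in the example response.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class RetryHint:
    code: int
    message: str
    retryable: bool
    retry_after: Optional[float]


def parse_error_response(raw: str) -> Optional[RetryHint]:
    """Return retry hints from a JSON-RPC response, or None if it has no error."""
    response = json.loads(raw)
    error = response.get("error")
    if error is None:
        return None
    # JSON-RPC allows "data" to be any value, so only read hints from objects.
    data = error.get("data")
    if not isinstance(data, dict):
        data = {}
    return RetryHint(
        code=error["code"],
        message=error["message"],
        retryable=bool(data.get("retryable", False)),
        retry_after=data.get("retryAfter"),
    )
```

When the server sends no `retryable` flag, the sketch treats the error as non-retryable, which keeps the client from retrying blindly.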
3. Error handling implementation
```python
from typing import Optional
import asyncio


class MCPErrorHandler:
    def __init__(self):
        # Codes from the JSON-RPC range reserved for implementation-defined
        # server errors (-32000 to -32099)
        self.retryable_codes = [
            -32000,  # Server error
            -32001,  # Timeout
            -32002,  # Rate limit
        ]

    def is_retryable(self, error: dict) -> bool:
        """Return True if the error can be retried."""
        error_code = error.get("code")
        return error_code in self.retryable_codes

    def get_retry_delay(self, error: dict) -> int:
        """Return the retry delay from the error data, defaulting to 1."""
        error_data = error.get("data", {})
        return error_data.get("retryAfter", 1)
```
Retry mechanism
4. Exponential backoff retry
```python
import asyncio
import random


async def exponential_backoff_retry(
    func,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 32.0
):
    """Retry an async callable with exponential backoff."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        try:
            return await func()
        except Exception:
            # Out of attempts: re-raise the last error to the caller
            if attempt == max_retries - 1:
                raise

            # Exponential delay plus random jitter, capped at max_delay
            delay = min(
                base_delay * (2 ** attempt) + random.uniform(0, 1),
                max_delay
            )
            await asyncio.sleep(delay)
```
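To make the growth of the delay concrete, the following sketch computes only the deterministic part of the formula, with jitter left out. `backoff_schedule` is a hypothetical helper written for this illustration.

```python
def backoff_schedule(attempts: int, base_delay: float = 1.0,
                     max_delay: float = 32.0) -> list:
    """Return the un-jittered delay for each attempt index."""
    return [min(base_delay * (2 ** attempt), max_delay)
            for attempt in range(attempts)]


print(backoff_schedule(7))  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 32.0]
```

The delay doubles on each attempt until it reaches the cap. Note that with `max_retries = 3`, only the first two delays are ever slept, because the final failure is raised immediately.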
5. Smart retry strategy
```python
import asyncio
import random
from typing import Callable, Optional


class RetryStrategy:
    def __init__(
        self,
        max_retries: int = 3,
        backoff_factor: float = 2.0,
        jitter: bool = True
    ):
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
        self.jitter = jitter

    async def execute_with_retry(
        self,
        func,
        is_retryable: Optional[Callable[[Exception], bool]] = None
    ):
        """Run func, retrying only on errors that is_retryable accepts."""
        for attempt in range(self.max_retries):
            try:
                return await func()
            except Exception as e:
                # Out of attempts: re-raise the last error
                if attempt == self.max_retries - 1:
                    raise

                # Non-retryable errors are raised immediately
                if is_retryable and not is_retryable(e):
                    raise

                delay = self._calculate_delay(attempt)
                await asyncio.sleep(delay)

    def _calculate_delay(self, attempt: int) -> float:
        """Compute the retry delay, with up to 10% jitter if enabled."""
        delay = self.backoff_factor ** attempt
        if self.jitter:
            delay += random.uniform(0, delay * 0.1)
        return delay
```
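A retry strategy can also take the server's `retryAfter` hint into account. The sketch below shows one possible way to combine the hint with the computed backoff; `MCPError` and `choose_delay` are hypothetical names, and taking the larger of the two values is an assumed policy, not a requirement.

```python
class MCPError(Exception):
    """Hypothetical exception wrapping a JSON-RPC error object."""

    def __init__(self, error: dict):
        super().__init__(error.get("message", "MCP error"))
        self.error = error


def choose_delay(exc: Exception, attempt: int,
                 backoff_factor: float = 2.0) -> float:
    """Pick the larger of the server's retryAfter hint and the backoff delay."""
    backoff = backoff_factor ** attempt
    if isinstance(exc, MCPError):
        data = exc.error.get("data")
        if isinstance(data, dict):
            hint = data.get("retryAfter")
            if isinstance(hint, (int, float)):
                # Never retry sooner than the server asked for.
                return max(float(hint), backoff)
    return backoff
```

For a rate-limit error carrying `retryAfter: 5` on the first attempt, this returns 5.0 rather than the 1.0 that plain backoff would give.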
Circuit breaker pattern
6. Implementing a circuit breaker
```python
from enum import Enum
import time


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None

    async def call(self, func):
        """Call func through the circuit breaker."""
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                # Let one trial call through after the recovery timeout
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = await func()
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _should_attempt_reset(self) -> bool:
        """Return True once the recovery timeout has elapsed."""
        if self.last_failure_time is None:
            return False
        elapsed = time.time() - self.last_failure_time
        return elapsed >= self.recovery_timeout

    def _on_success(self):
        """Reset the failure count and close the circuit after a trial call."""
        self.failure_count = 0
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED

    def _on_failure(self):
        """Record the failure and open the circuit at the threshold."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
```
Monitoring and logging
7. Error monitoring
```python
class ErrorMonitor:
    def __init__(self):
        self.error_counts = {}
        self.error_rates = {}

    def record_error(self, error_type: str):
        """Record one occurrence of an error type."""
        self.error_counts[error_type] = \
            self.error_counts.get(error_type, 0) + 1

    def get_error_rate(self, error_type: str) -> float:
        """Return this error type's share of all recorded errors."""
        total = sum(self.error_counts.values())
        if total == 0:
            return 0.0
        return self.error_counts.get(error_type, 0) / total
```
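An error rate measured against all calls, rather than against other errors, is often more useful for alerting. The following is a minimal sketch under that assumption; `WindowedErrorRate`, its window size, and its alert threshold are hypothetical and would need tuning for a real workload.

```python
from collections import deque


class WindowedErrorRate:
    """Track the failure rate over the most recent `window` calls."""

    def __init__(self, window: int = 100, alert_threshold: float = 0.5):
        # deque with maxlen drops the oldest outcome automatically
        self.outcomes = deque(maxlen=window)
        self.alert_threshold = alert_threshold

    def record(self, failed: bool) -> None:
        """Record one call outcome (True means the call failed)."""
        self.outcomes.append(failed)

    def error_rate(self) -> float:
        """Return failures divided by calls within the window."""
        if not self.outcomes:
            return 0.0
        return sum(self.outcomes) / len(self.outcomes)

    def should_alert(self) -> bool:
        """Return True when the rate reaches the alert threshold."""
        return self.error_rate() >= self.alert_threshold
```

Recording successes as well as failures is what makes the result a true rate; a bounded window keeps old incidents from masking the current state.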
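Best practices
- Distinguish error types: correctly identify which errors are retryable and which are not
- Set retry parameters sensibly: tune the retry count and delay to the business scenario
- Implement a circuit breaker: prevent cascading failures
- Log in detail: record every error and every retry attempt
- Monitor and alert: track the error rate in real time and set up alerts
- Degrade gracefully: provide a fallback when a service is unavailable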
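The logging and graceful-degradation practices above can be combined in a small wrapper. This is only a sketch: `call_with_fallback` and the logger name `mcp.retry` are hypothetical, and a real fallback might serve cached data or a reduced feature set.

```python
import asyncio
import logging

logger = logging.getLogger("mcp.retry")


async def call_with_fallback(primary, fallback):
    """Await primary(); if it raises, log the error and await fallback()."""
    try:
        return await primary()
    except Exception as exc:
        # Record the failure so it shows up in monitoring before degrading.
        logger.warning("Primary call failed, using fallback: %s", exc)
        return await fallback()
```

Both arguments are zero-argument async callables, so the wrapper can sit around a call that already has retries applied and only degrade once those are exhausted.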
With these strategies, you can build a robust MCP system that handles a wide range of error conditions effectively.