5月30日 20:38
How does MCP implement error handling and retry mechanisms?
Error handling and retry mechanisms for MCP are crucial for ensuring system stability and reliability. Here are detailed error handling strategies and retry mechanism implementations:
Error Handling Architecture
MCP error handling should consider the following aspects:
- Error Classification: Distinguish between different types of errors
- Error Propagation: Properly propagate error information
- Error Recovery: Implement error recovery mechanisms
- Retry Strategy: Intelligent retry strategies
- Circuit Breaker: Prevent cascading failures
- Fallback Strategy: Provide degraded services during failures
1. Error Classification and Definition
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Optional


class ErrorType(Enum):
    """Categories used to classify an MCP error."""

    VALIDATION_ERROR = "validation_error"
    AUTHENTICATION_ERROR = "authentication_error"
    AUTHORIZATION_ERROR = "authorization_error"
    NOT_FOUND_ERROR = "not_found_error"
    CONFLICT_ERROR = "conflict_error"
    RATE_LIMIT_ERROR = "rate_limit_error"
    INTERNAL_ERROR = "internal_error"
    EXTERNAL_SERVICE_ERROR = "external_service_error"
    TIMEOUT_ERROR = "timeout_error"
    NETWORK_ERROR = "network_error"


class ErrorSeverity(Enum):
    """Severity levels attached to an MCP error."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


# eq=False: a plain @dataclass generates __eq__ and therefore sets
# __hash__ to None, which would make every MCP error unhashable.
# Exceptions are compared and hashed by identity instead.
@dataclass(eq=False)
class MCPError(Exception):
    """Base class for all MCP errors.

    Attributes:
        error_type: Category of the error.
        message: Human-readable description; also used as ``str(error)``.
        code: Numeric status code carried by the error.
        details: Extra structured data; ``None`` is replaced by ``{}``.
        severity: Severity level of the error.
        retryable: Whether retry strategies may retry the failed operation.
    """

    error_type: ErrorType
    message: str
    code: int
    details: Optional[Dict[str, Any]] = None
    severity: ErrorSeverity = ErrorSeverity.MEDIUM
    retryable: bool = False

    def __post_init__(self):
        # Give each instance its own dict instead of sharing a default.
        if self.details is None:
            self.details = {}
        # Initialise the Exception side so str(error) yields the message.
        super().__init__(self.message)

    def to_dict(self) -> Dict[str, Any]:
        """Return the error as a plain dictionary (enums as their values)."""
        return {
            "error_type": self.error_type.value,
            "message": self.message,
            "code": self.code,
            "details": self.details,
            "severity": self.severity.value,
            "retryable": self.retryable,
        }


class ValidationError(MCPError):
    """Validation error (code 400, low severity, not retryable)."""

    def __init__(self, message: str, details: Optional[Dict[str, Any]] = None):
        super().__init__(
            error_type=ErrorType.VALIDATION_ERROR,
            message=message,
            code=400,
            details=details,
            severity=ErrorSeverity.LOW,
            retryable=False,
        )


class AuthenticationError(MCPError):
    """Authentication error (code 401, high severity, not retryable)."""

    def __init__(self, message: str = "Authentication failed"):
        super().__init__(
            error_type=ErrorType.AUTHENTICATION_ERROR,
            message=message,
            code=401,
            severity=ErrorSeverity.HIGH,
            retryable=False,
        )


class AuthorizationError(MCPError):
    """Authorization error (code 403, high severity, not retryable)."""

    def __init__(self, message: str = "Access denied"):
        super().__init__(
            error_type=ErrorType.AUTHORIZATION_ERROR,
            message=message,
            code=403,
            severity=ErrorSeverity.HIGH,
            retryable=False,
        )


class NotFoundError(MCPError):
    """Not-found error (code 404, low severity, not retryable).

    The resource name and identifier are recorded in ``details``.
    """

    def __init__(self, resource: str, identifier: str):
        super().__init__(
            error_type=ErrorType.NOT_FOUND_ERROR,
            message=f"{resource} not found: {identifier}",
            code=404,
            details={"resource": resource, "identifier": identifier},
            severity=ErrorSeverity.LOW,
            retryable=False,
        )


class RateLimitError(MCPError):
    """Rate-limit error (code 429, medium severity, retryable).

    ``retry_after`` is stored under ``details["retry_after"]``.
    """

    def __init__(self, message: str = "Rate limit exceeded", retry_after: int = 60):
        super().__init__(
            error_type=ErrorType.RATE_LIMIT_ERROR,
            message=message,
            code=429,
            details={"retry_after": retry_after},
            severity=ErrorSeverity.MEDIUM,
            retryable=True,
        )


class InternalError(MCPError):
    """Internal error (code 500, critical severity, retryable)."""

    def __init__(self, message: str = "Internal server error"):
        super().__init__(
            error_type=ErrorType.INTERNAL_ERROR,
            message=message,
            code=500,
            severity=ErrorSeverity.CRITICAL,
            retryable=True,
        )


class ExternalServiceError(MCPError):
    """External service error (code 502, high severity, retryable).

    The service name is recorded under ``details["service"]``.
    """

    def __init__(self, service: str, message: str):
        super().__init__(
            error_type=ErrorType.EXTERNAL_SERVICE_ERROR,
            message=f"{service} error: {message}",
            code=502,
            details={"service": service},
            severity=ErrorSeverity.HIGH,
            retryable=True,
        )


# NOTE: this class shadows the builtin TimeoutError for every module that
# imports it by name; use builtins.TimeoutError to reach the builtin.
class TimeoutError(MCPError):
    """Timeout error (code 504, high severity, retryable).

    The operation name and timeout are recorded in ``details``.
    """

    def __init__(self, operation: str, timeout: float):
        super().__init__(
            error_type=ErrorType.TIMEOUT_ERROR,
            message=f"{operation} timed out after {timeout}s",
            code=504,
            details={"operation": operation, "timeout": timeout},
            severity=ErrorSeverity.HIGH,
            retryable=True,
        )
2. Error Handler
import asyncio
import builtins
import logging
import traceback
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional


class ErrorHandler:
    """Log, report, and translate exceptions into MCP error responses."""

    def __init__(self, logger: Optional[logging.Logger] = None):
        self.logger = logger or logging.getLogger(__name__)
        # One handler coroutine per error type; later registrations replace
        # earlier ones for the same type.
        self.error_handlers: Dict[ErrorType, Callable] = {}
        # Reporter coroutines, each awaited for every handled error.
        self.error_reporters: List[Callable] = []

    def register_handler(self, error_type: ErrorType, handler: Callable):
        """Register ``handler`` for ``error_type``.

        The handler is awaited as ``handler(mcp_error, context)``.
        """
        self.error_handlers[error_type] = handler

    def register_reporter(self, reporter: Callable):
        """Register a reporter, awaited as ``reporter(error, context)``."""
        self.error_reporters.append(reporter)

    async def handle_error(
        self,
        error: Exception,
        context: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Log and report ``error``, then build a response for it.

        Returns:
            The registered handler's result for the error's type, or
            ``MCPError.to_dict()`` of the converted error when no handler is
            registered or the handler itself raises.
        """
        await self._log_error(error, context)
        await self._report_error(error, context)

        mcp_error = self._convert_to_mcp_error(error)

        handler = self.error_handlers.get(mcp_error.error_type)
        if handler is not None:
            try:
                return await handler(mcp_error, context)
            except Exception as e:
                # A broken handler must not hide the original error; fall
                # through to the default response.
                self.logger.error(f"Error handler failed: {e}")

        return mcp_error.to_dict()

    async def _log_error(
        self,
        error: Exception,
        context: Optional[Dict[str, Any]] = None,
    ):
        """Write ``error`` to the logger with the context attached."""
        if isinstance(error, MCPError):
            self.logger.error(
                f"MCP Error: {error.error_type.value} - {error.message}",
                extra={
                    "error_code": error.code,
                    "error_details": error.details,
                    "context": context,
                },
            )
        else:
            # Pass the exception itself: exc_info=True only works while an
            # exception is being handled, which is not guaranteed here.
            self.logger.error(
                f"Unexpected error: {str(error)}",
                exc_info=error,
                extra={"context": context},
            )

    async def _report_error(
        self,
        error: Exception,
        context: Optional[Dict[str, Any]] = None,
    ):
        """Await every registered reporter; reporter failures are logged."""
        for reporter in self.error_reporters:
            try:
                await reporter(error, context)
            except Exception as e:
                self.logger.error(f"Error reporter failed: {e}")

    def _convert_to_mcp_error(self, error: Exception) -> MCPError:
        """Map an arbitrary exception onto an ``MCPError``.

        ``MCPError`` instances are returned unchanged. ``ValueError`` becomes
        ``ValidationError``, ``PermissionError`` becomes
        ``AuthorizationError``, builtin/asyncio timeouts become the MCP
        ``TimeoutError``, and anything else becomes ``InternalError``.
        """
        if isinstance(error, MCPError):
            return error

        if isinstance(error, ValueError):
            return ValidationError(str(error))
        if isinstance(error, PermissionError):
            return AuthorizationError(str(error))
        # The bare name TimeoutError is the MCP subclass here (it takes
        # operation/timeout arguments), so the builtin and asyncio timeout
        # types must be named explicitly to be recognised.
        if isinstance(error, (builtins.TimeoutError, asyncio.TimeoutError)):
            return TimeoutError("operation", 0)
        return InternalError(str(error))


# Error reporter example
class ErrorReporter:
    """Post error reports to an HTTP error-collection endpoint."""

    def __init__(
        self,
        error_service_url: str,
        logger: Optional[logging.Logger] = None,
    ):
        self.error_service_url = error_service_url
        # report_error logs its own failures, so a logger must exist.
        self.logger = logger or logging.getLogger(__name__)

    async def report_error(
        self,
        error: Exception,
        context: Optional[Dict[str, Any]] = None,
    ):
        """POST a JSON description of ``error`` to the error service.

        Failures (a non-200 status or any exception) are logged, not raised.
        """
        import aiohttp

        error_data = {
            "error": str(error),
            "error_type": type(error).__name__,
            "context": context or {},
            "timestamp": datetime.now().isoformat(),
        }

        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    self.error_service_url,
                    json=error_data,
                ) as response:
                    if response.status != 200:
                        self.logger.error(
                            f"Failed to report error: {response.status}"
                        )
        except Exception as e:
            self.logger.error(f"Failed to report error: {e}")
3. Retry Mechanism
import asyncio
import time
from functools import wraps
from typing import Any, Callable, Optional, Type


class RetryStrategy:
    """Interface for retry policies used by ``RetryManager``."""

    async def should_retry(self, attempt: int, error: Exception) -> bool:
        """Return whether to retry after the ``attempt``-th failure."""
        raise NotImplementedError

    async def get_delay(self, attempt: int) -> float:
        """Return the seconds to wait after the ``attempt``-th failure."""
        raise NotImplementedError


class FixedDelayRetry(RetryStrategy):
    """Retry with the same delay between attempts."""

    def __init__(self, max_attempts: int = 3, delay: float = 1.0):
        self.max_attempts = max_attempts
        self.delay = delay

    async def should_retry(self, attempt: int, error: Exception) -> bool:
        """Retry while attempts remain and the error allows it.

        ``MCPError`` instances decide via ``retryable``; any other exception
        is retried.
        """
        if attempt >= self.max_attempts:
            return False
        if isinstance(error, MCPError):
            return error.retryable
        return True

    async def get_delay(self, attempt: int) -> float:
        """Return the fixed delay regardless of ``attempt``."""
        return self.delay


class ExponentialBackoffRetry(RetryStrategy):
    """Retry with a delay that grows by ``backoff_factor`` per failure."""

    def __init__(
        self,
        max_attempts: int = 5,
        initial_delay: float = 1.0,
        max_delay: float = 60.0,
        backoff_factor: float = 2.0,
    ):
        self.max_attempts = max_attempts
        self.initial_delay = initial_delay
        self.max_delay = max_delay
        self.backoff_factor = backoff_factor

    async def should_retry(self, attempt: int, error: Exception) -> bool:
        """Retry while attempts remain and the error allows it.

        ``MCPError`` instances decide via ``retryable``; any other exception
        is retried.
        """
        if attempt >= self.max_attempts:
            return False
        if isinstance(error, MCPError):
            return error.retryable
        return True

    async def get_delay(self, attempt: int) -> float:
        """Return the backoff delay after the ``attempt``-th failure.

        Attempts are 1-based (``RetryManager`` passes 1 after the first
        failure), so the first wait equals ``initial_delay`` and each later
        wait is multiplied by ``backoff_factor``, capped at ``max_delay``.
        """
        # attempt - 1 so that the first retry waits exactly initial_delay.
        exponent = max(attempt - 1, 0)
        delay = self.initial_delay * (self.backoff_factor ** exponent)
        return min(delay, self.max_delay)


class RetryManager:
    """Run a coroutine function under a ``RetryStrategy``."""

    def __init__(self, retry_strategy: RetryStrategy):
        self.retry_strategy = retry_strategy

    async def execute_with_retry(self, func: Callable, *args, **kwargs) -> Any:
        """Await ``func(*args, **kwargs)``, retrying as the strategy allows.

        Returns:
            The result of the first successful call.

        Raises:
            Exception: The error from the last attempt, once the strategy
                declines to retry.
        """
        attempt = 0
        while True:
            attempt += 1
            try:
                return await func(*args, **kwargs)
            except Exception as error:
                if not await self.retry_strategy.should_retry(attempt, error):
                    # Bare raise keeps the original traceback intact.
                    raise
                delay = await self.retry_strategy.get_delay(attempt)
            await asyncio.sleep(delay)


# Retry decorator
def retry(
    max_attempts: int = 3,
    delay: float = 1.0,
    backoff_factor: float = 2.0,
    max_delay: float = 60.0,
):
    """Decorate a coroutine function with exponential-backoff retries.

    Args:
        max_attempts: Total number of calls, including the first one.
        delay: Wait before the first retry, in seconds.
        backoff_factor: Multiplier applied to the wait after each failure.
        max_delay: Upper bound on any single wait, in seconds.
    """

    def decorator(func: Callable):
        retry_manager = RetryManager(
            ExponentialBackoffRetry(
                max_attempts=max_attempts,
                initial_delay=delay,
                max_delay=max_delay,
                backoff_factor=backoff_factor,
            )
        )

        @wraps(func)
        async def wrapper(*args, **kwargs):
            return await retry_manager.execute_with_retry(func, *args, **kwargs)

        return wrapper

    return decorator
4. Circuit Breaker Mechanism
pythonfrom enum import Enum from typing import Callable, Optional import asyncio class CircuitState(Enum): """Circuit breaker state""" CLOSED = "closed" # Normal state OPEN = "open" # Circuit broken state HALF_OPEN = "half_open" # Half-open state class CircuitBreaker: """Circuit breaker""" def __init__( self, failure_threshold: int = 5, success_threshold: int = 2, timeout: float = 60.0 ): self.failure_threshold = failure_threshold self.success_threshold = success_threshold self.timeout = timeout self.state = CircuitState.CLOSED self.failure_count = 0 self.success_count = 0 self.last_failure_time: Optional[float] = None self.lock = asyncio.Lock() async def execute( self, func: Callable, *args, **kwargs ) -> Any: """Execute function""" async with self.lock: # Check circuit breaker state if self.state == CircuitState.OPEN: # Check if should try to recover if time.time() - self.last_failure_time > self.timeout: self.state = CircuitState.HALF_OPEN self.success_count = 0 else: raise MCPError( error_type=ErrorType.INTERNAL_ERROR, message="Circuit breaker is OPEN", code=503, retryable=True ) try: result = await func(*args, **kwargs) # Successful execution async with self.lock: if self.state == CircuitState.HALF_OPEN: self.success_count += 1 if self.success_count >= self.success_threshold: self.state = CircuitState.CLOSED self.failure_count = 0 elif self.state == CircuitState.CLOSED: self.failure_count = 0 return result except Exception as error: # Execution failed async with self.lock: self.failure_count += 1 self.last_failure_time = time.time() if self.failure_count >= self.failure_threshold: self.state = CircuitState.OPEN raise error def get_state(self) -> CircuitState: """Get circuit breaker state""" return self.state def reset(self): """Reset circuit breaker""" async with self.lock: self.state = CircuitState.CLOSED self.failure_count = 0 self.success_count = 0 self.last_failure_time = None # Circuit breaker decorator def circuit_breaker( failure_threshold: int = 5, 
success_threshold: int = 2, timeout: float = 60.0 ): """Circuit breaker decorator""" def decorator(func: Callable): breaker = CircuitBreaker( failure_threshold=failure_threshold, success_threshold=success_threshold, timeout=timeout ) @wraps(func) async def wrapper(*args, **kwargs): return await breaker.execute(func, *args, **kwargs) return wrapper return decorator
5. Fallback Strategy
import asyncio
from functools import wraps
from typing import Any, Callable, Dict, Optional


class FallbackStrategy:
    """Interface for producing a substitute result after a failure."""

    async def execute_fallback(
        self,
        error: Exception,
        context: Optional[Dict[str, Any]] = None,
    ) -> Any:
        """Return a substitute result for ``error`` or raise."""
        raise NotImplementedError


class CacheFallback(FallbackStrategy):
    """Serve a previously cached value when the call fails."""

    def __init__(self, cache: Dict[str, Any]):
        self.cache = cache

    async def execute_fallback(
        self,
        error: Exception,
        context: Optional[Dict[str, Any]] = None,
    ) -> Any:
        """Return ``cache[context["cache_key"]]`` if present.

        Raises:
            Exception: ``error`` itself when there is no context, no
                ``cache_key``, or the key is not in the cache.
        """
        cache_key = context.get("cache_key") if context else None
        if cache_key and cache_key in self.cache:
            return self.cache[cache_key]
        raise error


class DefaultFallback(FallbackStrategy):
    """Always answer with a fixed default value."""

    def __init__(self, default_value: Any):
        self.default_value = default_value

    async def execute_fallback(
        self,
        error: Exception,
        context: Optional[Dict[str, Any]] = None,
    ) -> Any:
        """Return the configured default value."""
        return self.default_value


class FallbackManager:
    """Choose and run a fallback strategy based on the error type."""

    def __init__(self):
        self.fallback_strategies: Dict[ErrorType, FallbackStrategy] = {}
        self.default_fallback: Optional[FallbackStrategy] = None

    def register_fallback(self, error_type: ErrorType, fallback: FallbackStrategy):
        """Use ``fallback`` for errors of ``error_type``."""
        self.fallback_strategies[error_type] = fallback

    def set_default_fallback(self, fallback: FallbackStrategy):
        """Use ``fallback`` for error types without a registered strategy."""
        self.default_fallback = fallback

    async def execute_with_fallback(
        self,
        func: Callable,
        context: Optional[Dict[str, Any]] = None,
        *args,
        **kwargs,
    ) -> Any:
        """Await ``func(*args, **kwargs)``, falling back on failure.

        Non-MCP exceptions are wrapped in ``InternalError`` before a strategy
        is looked up by error type.

        Returns:
            The result of ``func``, or of the selected fallback strategy.

        Raises:
            MCPError: The (possibly wrapped) error when no strategy applies.
            Exception: Whatever the selected strategy raises.
        """
        try:
            return await func(*args, **kwargs)
        except Exception as error:
            if isinstance(error, MCPError):
                mcp_error = error
            else:
                mcp_error = InternalError(str(error))

            strategy = self.fallback_strategies.get(
                mcp_error.error_type,
                self.default_fallback,
            )
            if strategy is None:
                # Raised inside the except block, so a wrapped error keeps
                # the original exception as its __context__.
                raise mcp_error
            return await strategy.execute_fallback(mcp_error, context)


# Fallback decorator
def fallback(error_type: ErrorType = None, default_value: Any = None):
    """Decorate a coroutine function with a default-value fallback.

    Args:
        error_type: Error type the default applies to. When omitted, the
            default applies to every error type.
        default_value: Value returned instead of raising. ``None`` registers
            no fallback, so errors propagate.
    """

    def decorator(func: Callable):
        fallback_manager = FallbackManager()

        if default_value is not None:
            strategy = DefaultFallback(default_value)
            if error_type is not None:
                fallback_manager.register_fallback(error_type, strategy)
            else:
                # Previously a default_value without error_type was silently
                # ignored; treat it as the catch-all fallback instead.
                fallback_manager.set_default_fallback(strategy)

        @wraps(func)
        async def wrapper(*args, **kwargs):
            return await fallback_manager.execute_with_fallback(
                func, None, *args, **kwargs
            )

        return wrapper

    return decorator
6. Comprehensive Error Handling Example
from typing import Any, Dict, List

from mcp.server import Server


class RobustMCPServer(Server):
    """MCP server wired with error handling, retry, breaker and fallback."""

    def __init__(self, name: str):
        super().__init__(name)

        # Initialize error handling components
        self.error_handler = ErrorHandler()
        self.retry_manager = RetryManager(ExponentialBackoffRetry())
        self.circuit_breaker = CircuitBreaker()
        self.fallback_manager = FallbackManager()

        # Configure error handling
        self._setup_error_handling()

    def _setup_error_handling(self):
        """Register the error handlers and fallback strategies."""
        self.error_handler.register_handler(
            ErrorType.VALIDATION_ERROR,
            self._handle_validation_error,
        )
        self.error_handler.register_handler(
            ErrorType.RATE_LIMIT_ERROR,
            self._handle_rate_limit_error,
        )

        self.fallback_manager.register_fallback(
            ErrorType.EXTERNAL_SERVICE_ERROR,
            CacheFallback({}),
        )

    async def _handle_validation_error(
        self,
        error: ValidationError,
        context: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Return the error dict plus suggestions from its details."""
        return {
            "error": error.to_dict(),
            "suggestions": self._get_validation_suggestions(error.details),
        }

    async def _handle_rate_limit_error(
        self,
        error: RateLimitError,
        context: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Return the error dict plus how long the caller should wait.

        ``retry_after`` is read from the error details, defaulting to 60.
        """
        retry_after = error.details.get("retry_after", 60)
        return {
            "error": error.to_dict(),
            "retry_after": retry_after,
            "message": f"Please wait {retry_after} seconds before retrying",
        }

    def _get_validation_suggestions(self, details: Dict[str, Any]) -> List[str]:
        """Return suggestions for fixing a validation error.

        Placeholder: currently always returns an empty list.
        """
        suggestions = []
        # Provide suggestions based on error details
        # ...
        return suggestions

    # Decorators apply bottom-up, so the top one is outermost. The fallback
    # must be outermost: placed innermost it would turn every
    # ExternalServiceError into {} before the circuit breaker or the retry
    # logic could see the failure.
    @fallback(error_type=ErrorType.EXTERNAL_SERVICE_ERROR, default_value={})
    @retry(max_attempts=3, delay=1.0)
    @circuit_breaker(failure_threshold=5, timeout=60.0)
    async def call_external_service(
        self,
        service_url: str,
        params: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Call an external service, raising ``ExternalServiceError`` on failure.

        Placeholder: the actual service call is not implemented here.
        """
        try:
            # Call external service
            # ...
            pass
        except Exception as error:
            # Convert to MCP error, keeping the original as the cause.
            raise ExternalServiceError("external", str(error)) from error
Best Practices:
- Error Classification: Properly classify error types for targeted handling
- Retry Strategy: Choose appropriate retry strategy based on error type
- Circuit Breaker: Prevent cascading failures and protect system stability
- Fallback Strategy: Provide degraded services during failures to ensure basic functionality
- Error Logging: Log detailed error information for troubleshooting
- Monitoring and Alerting: Monitor error rates and detect issues promptly
Through comprehensive error handling and retry mechanisms, you can ensure MCP system stability and reliability.