MCP 的多租户支持对于企业级应用至关重要,它允许在单一 MCP 服务器实例中为多个客户或组织提供隔离的服务。以下是详细的实现方法:
多租户架构设计
MCP 多租户应考虑以下方面:
- 数据隔离:确保不同租户的数据完全隔离
- 资源隔离:隔离计算资源和配额
- 安全隔离:实现租户级别的认证和授权
- 性能隔离:防止单个租户影响其他租户
1. 租户识别和上下文
import contextvars
from dataclasses import dataclass
from typing import Optional

# Context variable holding the TenantContext that is active for the current
# thread / asyncio task (None when no tenant has been selected).
# `contextvars` is imported at module level: this assignment runs at import
# time, so a function-local import would leave the name unbound here.
current_tenant = contextvars.ContextVar('current_tenant', default=None)


@dataclass
class TenantContext:
    """Tenant context: identity, permissions and quotas of one tenant."""
    tenant_id: str
    tenant_name: str
    user_id: str
    permissions: list
    quotas: dict


class TenantContextManager:
    """Registry of tenant contexts, keyed by tenant id.

    Only one context is stored per tenant id; creating a second context for
    the same tenant replaces the first.
    """

    def __init__(self):
        # tenant_id -> TenantContext
        self.contexts = {}

    def create_context(
        self,
        tenant_id: str,
        tenant_name: str,
        user_id: str,
        permissions: list,
        quotas: Optional[dict] = None
    ) -> TenantContext:
        """Create, register and return the context for a tenant.

        Args:
            tenant_id: Key under which the context is stored.
            tenant_name: Display name of the tenant.
            user_id: User the context is created for.
            permissions: Permissions granted within this tenant.
            quotas: Quota mapping. A falsy value (``None`` or ``{}``) falls
                back to the defaults from ``_get_default_quotas``.

        Returns:
            The newly registered TenantContext.
        """
        context = TenantContext(
            tenant_id=tenant_id,
            tenant_name=tenant_name,
            user_id=user_id,
            permissions=permissions,
            quotas=quotas or self._get_default_quotas(tenant_id)
        )
        self.contexts[tenant_id] = context
        return context

    def get_context(self, tenant_id: str) -> Optional[TenantContext]:
        """Return the registered context for ``tenant_id``, or None."""
        return self.contexts.get(tenant_id)

    def set_current_context(self, tenant_id: str):
        """Make the tenant's context the current one for this execution context.

        Returns:
            The ``contextvars.Token`` produced by ``current_tenant.set``;
            pass it to ``current_tenant.reset`` to restore the previous value.

        Raises:
            ValueError: If no context is registered for ``tenant_id``.
        """
        context = self.get_context(tenant_id)
        if not context:
            raise ValueError(f"租户 {tenant_id} 不存在")
        return current_tenant.set(context)

    def _get_default_quotas(self, tenant_id: str) -> dict:
        """Return a fresh default quota mapping (same values for every tenant)."""
        return {
            "max_tools": 100,
            "max_resources": 1000,
            "max_requests_per_minute": 1000,
            "max_storage_mb": 1024
        }
2. 数据隔离
from typing import Optional

from sqlalchemy import Column, Index, Integer, String, Text, create_engine, event
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker

Base = declarative_base()


class TenantData(Base):
    """Key/value rows owned by a tenant."""
    __tablename__ = 'tenant_data'

    id = Column(Integer, primary_key=True)
    tenant_id = Column(String(50), nullable=False, index=True)
    data_key = Column(String(100), nullable=False)
    data_value = Column(Text)

    __table_args__ = (
        # One value per (tenant, key): the same key may exist for several
        # tenants but never twice within one tenant.
        Index('idx_tenant_key', 'tenant_id', 'data_key', unique=True),
    )


class MultiTenantDatabase:
    """Tenant-scoped access to the ``tenant_data`` table."""

    def __init__(self, database_url: str):
        """Create the engine, make sure the tables exist, and build the session factory."""
        self.engine = create_engine(database_url)
        Base.metadata.create_all(self.engine)
        self.SessionLocal = scoped_session(
            sessionmaker(autocommit=False, autoflush=False, bind=self.engine)
        )

    def get_session(self, tenant_id: str):
        """Return a new session that stamps ``tenant_id`` on every new object.

        A fresh Session is built from the underlying factory instead of
        calling the scoped registry: the registry hands back the same
        thread-local Session on every call, so two tenants would share one
        object and each call would stack another ``before_flush`` listener
        on it.

        The caller owns the returned session and must close it.
        """
        session = self.SessionLocal.session_factory()

        @event.listens_for(session, 'before_flush')
        def add_tenant_filter(session, context, instances):
            # Force the owning tenant on pending objects that have a
            # tenant_id attribute, overriding whatever value was set on them.
            for instance in session.new:
                if hasattr(instance, 'tenant_id'):
                    instance.tenant_id = tenant_id

        return session

    def query_tenant_data(
        self,
        tenant_id: str,
        data_key: str
    ) -> Optional[str]:
        """Return the value stored under ``data_key`` for the tenant, or None."""
        session = self.get_session(tenant_id)
        try:
            result = session.query(TenantData).filter(
                TenantData.tenant_id == tenant_id,
                TenantData.data_key == data_key
            ).first()
            return result.data_value if result else None
        finally:
            session.close()

    def save_tenant_data(
        self,
        tenant_id: str,
        data_key: str,
        data_value: str
    ):
        """Insert or update the value stored under ``data_key`` for the tenant.

        The transaction is rolled back and the original exception re-raised
        if anything fails; the session is always closed.
        """
        session = self.get_session(tenant_id)
        try:
            existing = session.query(TenantData).filter(
                TenantData.tenant_id == tenant_id,
                TenantData.data_key == data_key
            ).first()

            if existing:
                existing.data_value = data_value
            else:
                session.add(TenantData(
                    tenant_id=tenant_id,
                    data_key=data_key,
                    data_value=data_value
                ))

            session.commit()
        except Exception:
            session.rollback()
            # Bare raise keeps the original traceback intact.
            raise
        finally:
            session.close()
3. 资源配额管理
import time
from collections import defaultdict


class QuotaManager:
    """In-memory per-tenant quota counters plus a sliding-window rate limiter."""

    def __init__(self):
        # tenant_id -> {quota_type: limit}
        self.quotas = {}
        # tenant_id -> {quota_type: amount consumed so far}
        self.usage = defaultdict(lambda: defaultdict(int))
        # tenant_id -> timestamps of requests accepted inside the window
        self.rate_limits = {}

    def set_quota(
        self,
        tenant_id: str,
        quota_type: str,
        limit: int
    ):
        """Set the limit of one quota type for a tenant."""
        self.quotas.setdefault(tenant_id, {})[quota_type] = limit

    def check_quota(
        self,
        tenant_id: str,
        quota_type: str,
        amount: int = 1
    ) -> bool:
        """Tell whether ``amount`` more units fit under the tenant's limit.

        A tenant with no quotas, or with no limit for this quota type, is
        treated as unlimited.
        """
        try:
            tenant_limits = self.quotas[tenant_id]
        except KeyError:
            return True

        cap = tenant_limits.get(quota_type)
        if cap is None:
            return True

        return self.usage[tenant_id][quota_type] + amount <= cap

    def consume_quota(
        self,
        tenant_id: str,
        quota_type: str,
        amount: int = 1
    ) -> bool:
        """Record ``amount`` units of usage if the quota allows it.

        Returns True when the usage was recorded, False when it would
        exceed the limit (nothing is recorded in that case).
        """
        if self.check_quota(tenant_id, quota_type, amount):
            self.usage[tenant_id][quota_type] += amount
            return True
        return False

    def get_usage(
        self,
        tenant_id: str,
        quota_type: str
    ) -> int:
        """Return the usage recorded so far for one quota type."""
        per_tenant = self.usage[tenant_id]
        return per_tenant[quota_type]

    def reset_usage(self, tenant_id: str):
        """Clear every usage counter of the tenant."""
        counters = self.usage.get(tenant_id)
        if counters is not None:
            counters.clear()

    def check_rate_limit(
        self,
        tenant_id: str,
        window: int = 60,
        max_requests: int = 100
    ) -> bool:
        """Apply a sliding-window rate limit and record the request if allowed.

        Returns False, without recording anything, when ``max_requests``
        requests were already accepted within the last ``window`` seconds.
        """
        now = time.time()

        # Keep only the timestamps that still fall inside the window.
        history = self.rate_limits.get(tenant_id, [])
        recent = [stamp for stamp in history if now - stamp < window]
        self.rate_limits[tenant_id] = recent

        if len(recent) >= max_requests:
            return False

        recent.append(now)
        return True
4. 租户级别的工具和资源
from collections import defaultdict
from functools import wraps

from mcp.server import Server


class MultiTenantServer(Server):
    """MCP server that keeps per-tenant registries of tools and resources.

    Entries registered without a tenant id go into the ``"default"`` bucket
    and are visible to every tenant.
    """

    def __init__(self, name: str, tenant_manager: "TenantContextManager"):
        super().__init__(name)
        self.tenant_manager = tenant_manager
        # tenant_id (or "default") -> {tool name: {"function", "description"}}
        self.tenant_tools = defaultdict(dict)
        # tenant_id (or "default") -> {uri: {"function", "name", "description"}}
        self.tenant_resources = defaultdict(dict)

    def _tenant_guard(self, func, tenant_id, denied_message):
        """Wrap ``func`` so it only runs inside a matching tenant context."""
        @wraps(func)
        async def wrapper(*args, **kwargs):
            context = current_tenant.get()
            if not context:
                raise PermissionError("未找到租户上下文")

            # Entries without a tenant id are open to every tenant.
            if tenant_id and context.tenant_id != tenant_id:
                raise PermissionError(denied_message)

            return await func(*args, **kwargs)

        return wrapper

    def tenant_tool(
        self,
        name: str,
        description: str,
        tenant_id: str = None
    ):
        """Decorator registering an async tool, optionally for one tenant only.

        The registry stores the guarded wrapper rather than the raw function,
        so code that dispatches through ``tenant_tools`` cannot bypass the
        tenant check.
        """
        def decorator(func):
            wrapper = self._tenant_guard(func, tenant_id, "无权访问此工具")
            self.tenant_tools[tenant_id or "default"][name] = {
                "function": wrapper,
                "description": description
            }
            return wrapper

        return decorator

    def tenant_resource(
        self,
        uri: str,
        name: str,
        description: str,
        tenant_id: str = None
    ):
        """Decorator registering an async resource, optionally for one tenant only.

        As with tools, the registry stores the guarded wrapper.
        """
        def decorator(func):
            wrapper = self._tenant_guard(func, tenant_id, "无权访问此资源")
            self.tenant_resources[tenant_id or "default"][uri] = {
                "function": wrapper,
                "name": name,
                "description": description
            }
            return wrapper

        return decorator

    async def list_tools(self, tenant_id: str = None) -> list:
        """List the tools visible to the current tenant context.

        ``tenant_id`` is accepted for backward compatibility but ignored:
        the listing always follows the current context. Returns an empty
        list when no tenant context is set.

        Default tools come first; a tenant-specific tool with the same name
        replaces the default entry, so no name is listed twice.

        NOTE(review): the MCP SDK's ``Server`` appears to define its own
        ``list_tools``; confirm that overriding it here is intended.
        """
        context = current_tenant.get()
        if not context:
            return []

        visible = dict(self.tenant_tools["default"])
        if context.tenant_id in self.tenant_tools:
            visible.update(self.tenant_tools[context.tenant_id])

        return [
            {"name": tool_name, "description": tool_info["description"]}
            for tool_name, tool_info in visible.items()
        ]
5. 租户认证和授权
from datetime import datetime, timedelta, timezone
from typing import Any, Dict

import jwt


class TenantAuthenticator:
    """Issues and verifies HS256-signed JWTs carrying tenant claims."""

    def __init__(self, secret_key: str):
        self.secret_key = secret_key

    def generate_token(
        self,
        tenant_id: str,
        user_id: str,
        permissions: list,
        expires_in: int = 3600
    ) -> str:
        """Create a signed token for a tenant user.

        Args:
            tenant_id: Stored in the ``tenant_id`` claim.
            user_id: Stored in the ``user_id`` claim.
            permissions: Stored in the ``permissions`` claim.
            expires_in: Lifetime in seconds, counted from the issue time.

        Returns:
            The encoded token.
        """
        # One timezone-aware clock read keeps iat and exp consistent and
        # avoids the deprecated naive datetime.utcnow().
        issued_at = datetime.now(timezone.utc)
        payload = {
            "tenant_id": tenant_id,
            "user_id": user_id,
            "permissions": permissions,
            "exp": issued_at + timedelta(seconds=expires_in),
            "iat": issued_at
        }
        return jwt.encode(payload, self.secret_key, algorithm="HS256")

    def verify_token(self, token: str) -> Dict[str, Any]:
        """Decode and validate a token, returning its claims.

        Raises:
            ValueError: If the token is expired, or otherwise invalid
                (bad signature, malformed, or missing the ``exp`` claim).
        """
        try:
            return jwt.decode(
                token,
                self.secret_key,
                algorithms=["HS256"],
                # Every token issued here carries exp; refuse correctly
                # signed tokens that would otherwise never expire.
                options={"require": ["exp"]},
            )
        except jwt.ExpiredSignatureError as err:
            raise ValueError("令牌已过期") from err
        except jwt.InvalidTokenError as err:
            raise ValueError("无效的令牌") from err

    def check_permission(
        self,
        token: str,
        required_permission: str
    ) -> bool:
        """Tell whether the token grants ``required_permission``.

        A token whose permissions include ``"admin"`` is granted everything.

        Raises:
            ValueError: If the token fails verification.
        """
        payload = self.verify_token(token)
        permissions = payload.get("permissions", [])
        return required_permission in permissions or "admin" in permissions
6. 租户监控和报告
from collections import defaultdict
from datetime import datetime, timedelta


class TenantMonitor:
    """In-memory store of per-tenant metric samples with simple aggregation."""

    def __init__(self, max_history: int = 1000):
        """Create an empty monitor.

        Args:
            max_history: Maximum number of samples kept per tenant and
                metric; the oldest samples are dropped first.

        Raises:
            ValueError: If ``max_history`` is smaller than 1.
        """
        if max_history < 1:
            raise ValueError("max_history 必须为正整数")
        self.max_history = max_history
        # tenant_id -> metric_name -> [{"value": ..., "timestamp": ...}, ...]
        self.metrics = defaultdict(lambda: defaultdict(list))

    def record_metric(
        self,
        tenant_id: str,
        metric_name: str,
        value: float
    ):
        """Append a sample stamped with the current local time."""
        records = self.metrics[tenant_id][metric_name]
        records.append({
            "value": value,
            "timestamp": datetime.now()
        })

        # Trim in place so the history never exceeds max_history entries.
        overflow = len(records) - self.max_history
        if overflow > 0:
            del records[:overflow]

    def get_metrics(
        self,
        tenant_id: str,
        metric_name: str,
        since: datetime = None
    ) -> list:
        """Return the samples of one metric, optionally only those at or after ``since``.

        The result is always a new list, so callers can modify it without
        touching the stored history. Unknown tenants or metrics yield an
        empty list and are not created as a side effect.
        """
        tenant_metrics = self.metrics.get(tenant_id)
        if not tenant_metrics or metric_name not in tenant_metrics:
            return []

        records = tenant_metrics[metric_name]
        if since is not None:
            return [
                record for record in records
                if record["timestamp"] >= since
            ]
        return list(records)

    def get_aggregated_metrics(
        self,
        tenant_id: str,
        metric_name: str,
        since: datetime = None
    ) -> dict:
        """Return count/sum/avg/min/max of the selected samples.

        Returns an empty dict when no sample matches.
        """
        records = self.get_metrics(tenant_id, metric_name, since)
        if not records:
            return {}

        values = [record["value"] for record in records]
        total = sum(values)
        return {
            "count": len(values),
            "sum": total,
            "avg": total / len(values),
            "min": min(values),
            "max": max(values)
        }

    def generate_tenant_report(
        self,
        tenant_id: str,
        since: datetime = None
    ) -> dict:
        """Build a report with aggregates for every metric of the tenant.

        Args:
            tenant_id: Tenant to report on.
            since: Start of the reporting period; defaults to seven days
                before now.

        Returns:
            A dict with ``tenant_id``, ``period`` (``start``/``end``) and
            ``metrics`` (metric name -> aggregates).
        """
        now = datetime.now()
        if since is None:
            since = now - timedelta(days=7)

        report = {
            "tenant_id": tenant_id,
            "period": {
                "start": since,
                "end": now
            },
            "metrics": {}
        }

        # .get avoids creating an entry for a tenant that was never recorded.
        for metric_name in self.metrics.get(tenant_id, {}):
            report["metrics"][metric_name] = \
                self.get_aggregated_metrics(tenant_id, metric_name, since)

        return report
最佳实践:
- 数据隔离:为所有数据表添加租户 ID 列,将其纳入复合主键或索引,并确保每个查询都按租户 ID 过滤
- 配额管理:为每个租户设置合理的资源配额
- 权限控制:实施细粒度的租户级权限控制
- 性能监控:监控每个租户的资源使用情况
- 安全审计:记录所有租户操作用于审计
- 弹性扩展:根据租户需求动态扩展资源
通过完善的多租户支持,可以在单一 MCP 服务器实例中为多个客户或组织提供隔离、安全、高效的服务。