服务端阅读, May 28, 06:45
How Does MCP Implement Multi-Tenant Isolation? From Protocol Mechanisms to Engineering Practice
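In multi-tenant scenarios, MCP (Model Context Protocol) faces one core challenge: how can a single MCP server serve multiple organizations or customers at the same time while keeping them isolated and secure? The problem spans four layers: data isolation, resource isolation, authentication and authorization, and performance isolation. This article covers it from two angles, protocol mechanisms and engineering implementation.

Multi-tenancy mechanisms at the MCP protocol level

MCP uses a client-server architecture and communicates over JSON-RPC 2.0. Supporting multiple tenants requires solving three problems at the protocol level:

- Tenant identity propagation: the MCP specification does not define a tenant field, but the `_meta` field can carry a tenant identifier. The OpenAI Agents SDK's `tool_meta_resolver` relies on this mechanism to resolve the `tenant_id` in the request metadata on every tool call.
- Session-level isolation: each MCP client establishes its own session with the server, and sessions are isolated from one another by nature. A multi-tenant implementation can bind the tenant context to the session so that every operation in that session runs in the tenant's context.
- OAuth 2.0 authentication: remote MCP servers support an OAuth 2.0 flow (the current MCP authorization specification builds on OAuth 2.1), and a `tenant_id` claim in the token identifies the tenant. The multi-tenant app registration model in Microsoft Entra is a typical example: users from different organizations authenticate against their own Entra ID (formerly Azure AD) tenant and then access the same MCP server.

Tenant context management

The tenant context is the foundation of a multi-tenant architecture. It has to stay available for the whole lifetime of a request:

```python
import contextvars
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class TenantContext:
    tenant_id: str
    tenant_name: str
    user_id: str
    permissions: list = field(default_factory=list)
    quotas: dict = field(default_factory=dict)


# Holds the tenant bound to the currently running task.
current_tenant: contextvars.ContextVar[Optional[TenantContext]] = \
    contextvars.ContextVar('current_tenant', default=None)


class TenantContextManager:
    def __init__(self):
        self._contexts: dict[str, TenantContext] = {}

    def register(self, tenant_id: str, tenant_name: str,
                 user_id: str, permissions: Optional[list] = None,
                 quotas: Optional[dict] = None) -> TenantContext:
        ctx = TenantContext(
            tenant_id=tenant_id,
            tenant_name=tenant_name,
            user_id=user_id,
            permissions=permissions or [],
            quotas=quotas or self._default_quotas(),
        )
        self._contexts[tenant_id] = ctx
        return ctx

    def activate(self, tenant_id: str) -> None:
        ctx = self._contexts.get(tenant_id)
        if ctx is None:
            raise KeyError(f"tenant {tenant_id} is not registered")
        current_tenant.set(ctx)

    def deactivate(self) -> None:
        current_tenant.set(None)

    def current(self) -> TenantContext:
        ctx = current_tenant.get()
        if ctx is None:
            raise RuntimeError("no active tenant context")
        return ctx

    @staticmethod
    def _default_quotas() -> dict:
        return {
            "max_tools": 100,
            "max_resources": 1000,
            "max_requests_per_minute": 1000,
            "max_storage_mb": 1024,
        }
```

`contextvars.ContextVar` is used instead of thread-local storage because an MCP server usually runs on an asyncio event loop, where many requests share one thread. Each asyncio task runs in its own copy of the context, so a `ContextVar` keeps tenant contexts separate between concurrent coroutines. The `activate` / `deactivate` pair lets middleware set the context when a request arrives and clear it when the request ends, which prevents the context from leaking. Note that `activate` only accepts tenants that were registered beforehand.

Three strategies for data isolation

There are three mainstream approaches to multi-tenant data isolation, each with trade-offs:

Option 1: shared database with row-level isolation (filtering on a `tenant_id` column)

All tenants' data lives in the same tables and is distinguished by a `tenant_id` column. This is simple to implement, but every query must include the `tenant_id` filter, and a single omission leaks data.

Option 2: shared database with schema isolation

Each tenant owns a separate database schema. The table structure is identical, but each tenant's rows live in their own set of tables. This is safer than row-level isolation, but schema migrations become harder to manage.

Option 3: separate databases

Each tenant uses its own database instance. Isolation is strongest, and so are the operational cost and resource overhead.

Below is a row-level isolation implementation based on Option 1, with automatic filtering through a SQLAlchemy `Session` subclass:

```python
from sqlalchemy import create_engine, Column, String, Integer, Text, Index
from sqlalchemy.orm import Session, declarative_base  # SQLAlchemy 1.4+

Base = declarative_base()


class TenantData(Base):
    __tablename__ = 'tenant_data'

    id = Column(Integer, primary_key=True)
    tenant_id = Column(String(50), nullable=False, index=True)
    data_key = Column(String(100), nullable=False)
    data_value = Column(Text)

    __table_args__ = (
        Index('idx_tenant_key', 'tenant_id', 'data_key', unique=True),
    )


class TenantSession(Session):
    """Session subclass that injects the tenant filter into query()."""

    def __init__(self, tenant_id: str, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._tenant_id = tenant_id

    def query(self, *entities, **kwargs):
        q = super().query(*entities, **kwargs)
        for entity in entities:
            # Only mapped classes that carry a tenant_id column are filtered.
            if hasattr(entity, 'tenant_id'):
                q = q.filter(entity.tenant_id == self._tenant_id)
        return q


class MultiTenantDatabase:
    def __init__(self, database_url: str):
        self.engine = create_engine(database_url)
        Base.metadata.create_all(self.engine)

    def session(self, tenant_id: str) -> TenantSession:
        return TenantSession(tenant_id, bind=self.engine)

    def save(self, tenant_id: str, key: str, value: str):
        with self.session(tenant_id) as s:
            # The explicit tenant_id filter is redundant with the automatic
            # one; it is kept as defense in depth.
            existing = s.query(TenantData).filter(
                TenantData.tenant_id == tenant_id,
                TenantData.data_key == key,
            ).first()
            if existing:
                existing.data_value = value
            else:
                s.add(TenantData(
                    tenant_id=tenant_id,
                    data_key=key,
                    data_value=value,
                ))
            s.commit()
```

Key design point: `TenantSession` inherits from `Session` and injects the `tenant_id` filter inside `query`, which removes the most common cause of leaks, a forgotten filter condition. The protection only covers code that goes through `session.query()`; statements run through `session.execute()` or lookups through `session.get()` bypass it and still need their own filter. On the write path, a `before_flush` event listener on the session can ensure `tenant_id` is filled in correctly; the code above does not register one and instead passes `tenant_id` explicitly in `save`.

Resource quotas and rate limiting

Tenants on different pricing tiers need precise limits on the number of tools, storage space, and request frequency available to them:

```python
import time
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class QuotaConfig:
    max_tools: int = 100
    max_resources: int = 1000
    max_requests_per_minute: int = 1000
    max_storage_mb: int = 1024


class QuotaManager:
    def __init__(self):
        self._configs: dict[str, QuotaConfig] = {}
        self._usage: dict[str, dict[str, int]] = defaultdict(
            lambda: defaultdict(int)
        )
        self._rate_windows: dict[str, list[float]] = defaultdict(list)

    def configure(self, tenant_id: str, config: QuotaConfig):
        self._configs[tenant_id] = config

    def config(self, tenant_id: str) -> QuotaConfig:
        return self._configs.get(tenant_id, QuotaConfig())

    def consume(self, tenant_id: str, resource: str,
                amount: int = 1) -> bool:
        # `resource` is a QuotaConfig field name such as "max_tools".
        cfg = self.config(tenant_id)
        limit = getattr(cfg, resource, None)
        if limit is None:
            return True  # unknown resources are not limited
        if self._usage[tenant_id][resource] + amount > limit:
            return False
        self._usage[tenant_id][resource] += amount
        return True

    def release(self, tenant_id: str, resource: str, amount: int = 1):
        self._usage[tenant_id][resource] = max(
            0, self._usage[tenant_id][resource] - amount
        )

    def check_rate(self, tenant_id: str) -> bool:
        cfg = self.config(tenant_id)
        now = time.monotonic()
        cutoff = now - 60
        # Drop timestamps that have fallen out of the 60-second window.
        window = [t for t in self._rate_windows[tenant_id] if t > cutoff]
        self._rate_windows[tenant_id] = window
        if len(window) >= cfg.max_requests_per_minute:
            return False
        window.append(now)
        return True
```

`QuotaManager` separates resource quotas (tool count, storage space) from rate limits (requests per minute). The former are consumed and held until released, so they are managed with `consume` / `release`. The latter use a sliding window, and `check_rate` is called before every request. Using `time.monotonic()` instead of `time.time()` keeps the rate limit working when the system clock is set backwards.

Tenant-level tool and resource registration

Tools and Resources in an MCP server have to be isolated by tenant visibility. The core idea: global tools are visible to all tenants, while tenant-specific tools are shown only to the tenant that owns them:

```python
from collections import defaultdict
from typing import Optional

from mcp.server import Server

# TenantContextManager and current_tenant come from the tenant context
# section above.
# Note: in the official MCP Python SDK, the low-level Server exposes
# list_tools() and call_tool() as decorator factories. The methods below
# shadow them, so read this class as a sketch of the registry logic rather
# than a drop-in integration with the SDK's handler wiring.


class MultiTenantServer(Server):
    def __init__(self, name: str, ctx_manager: TenantContextManager):
        super().__init__(name)
        self._ctx = ctx_manager
        self._global_tools: dict[str, dict] = {}
        self._tenant_tools: dict[str, dict[str, dict]] = defaultdict(dict)
        self._global_resources: dict[str, dict] = {}
        self._tenant_resources: dict[str, dict[str, dict]] = defaultdict(dict)

    def register_tool(self, name: str, handler, description: str,
                      tenant_id: Optional[str] = None,
                      required_permission: Optional[str] = None):
        entry = {
            "handler": handler,
            "description": description,
            # Stored so that call_tool can enforce it.
            "required_permission": required_permission,
        }
        if tenant_id:
            self._tenant_tools[tenant_id][name] = entry
        else:
            self._global_tools[name] = entry

    def register_resource(self, uri: str, handler, name: str,
                          description: str,
                          tenant_id: Optional[str] = None):
        entry = {"handler": handler, "name": name,
                 "description": description}
        if tenant_id:
            self._tenant_resources[tenant_id][uri] = entry
        else:
            self._global_resources[uri] = entry

    async def list_tools(self) -> list[dict]:
        ctx = current_tenant.get()
        if ctx is None:
            return []
        # Tenant-specific entries override global ones with the same name,
        # so an overridden tool is listed once.
        visible = {**self._global_tools,
                   **self._tenant_tools.get(ctx.tenant_id, {})}
        return [
            {"name": n, "description": t["description"]}
            for n, t in visible.items()
        ]

    async def call_tool(self, name: str, arguments: dict):
        ctx = current_tenant.get()
        if ctx is None:
            raise PermissionError("tenant context not found")

        entry = self._tenant_tools.get(ctx.tenant_id, {}).get(name)
        if entry is None:
            entry = self._global_tools.get(name)
        if entry is None:
            raise KeyError(f"tool {name} does not exist")

        if "admin" not in ctx.permissions:
            required = entry.get("required_permission")
            if required and required not in ctx.permissions:
                raise PermissionError(
                    f"tenant {ctx.tenant_id} may not use tool {name}"
                )

        return await entry["handler"](arguments)
```

`list_tools` returns only the tools visible to the current tenant, and `call_tool` checks the tenant's permissions before executing. The lookup order is "tenant-specific first, global as fallback", so a tenant can override a tool of the same name to get customized behavior. The permission check only takes effect for tools registered with a `required_permission`.

Authentication and authorization

Remote MCP servers support OAuth 2.0, and the recommended approach for multi-tenant authentication is to encode the tenant identity in a JWT:

```python
import jwt  # PyJWT
from datetime import datetime, timedelta, timezone
from typing import Any

# TenantContext comes from the tenant context section above.


class TenantAuthenticator:
    def __init__(self, secret_key: str, algorithm: str = "HS256"):
        self._secret = secret_key
        self._algo = algorithm

    def issue_token(self, tenant_id: str, user_id: str,
                    permissions: list[str],
                    expires_in: int = 3600) -> str:
        now = datetime.now(timezone.utc)
        payload = {
            "sub": user_id,
            "tid": tenant_id,
            "perms": permissions,
            "iat": now,
            "exp": now + timedelta(seconds=expires_in),
        }
        return jwt.encode(payload, self._secret, algorithm=self._algo)

    def verify(self, token: str) -> dict[str, Any]:
        try:
            return jwt.decode(
                token, self._secret, algorithms=[self._algo],
                # Reject tokens that lack the claims authenticate() reads.
                options={"require": ["exp", "sub", "tid"]},
            )
        except jwt.ExpiredSignatureError as exc:
            raise ValueError("token has expired") from exc
        except jwt.InvalidTokenError as exc:
            raise ValueError("invalid token") from exc

    def authenticate(self, token: str) -> TenantContext:
        claims = self.verify(token)
        return TenantContext(
            tenant_id=claims["tid"],
            tenant_name=claims.get("tenant_name", claims["tid"]),
            user_id=claims["sub"],
            permissions=claims.get("perms", []),
        )
```

The JWT uses `tid` (tenant id) rather than `tenant_id` to keep the token small. `authenticate` returns a `TenantContext` directly, so middleware can validate the token and obtain the tenant identity in one call before activating the context. In a real deployment where the MCP server sits behind an IdP such as Microsoft Entra, `tid` maps directly onto the `tid` claim in Entra JWTs (the Entra ID tenant ID), and there is no need to issue tokens yourself. Verification then has to use the IdP's published signing keys, since Entra signs tokens with RS256 rather than a shared HS256 secret.

Tenant monitoring and auditing

A multi-tenant environment needs metrics and audit logs collected per tenant, both for operational analysis and as evidence for troubleshooting and security audits:

```python
from datetime import datetime, timedelta, timezone
from collections import defaultdict
from typing import Optional


class TenantMonitor:
    def __init__(self, max_records: int = 5000):
        self._max = max_records
        self._metrics: dict[str, dict[str, list[dict]]] = \
            defaultdict(lambda: defaultdict(list))
        self._audit_log: list[dict] = []

    def record(self, tenant_id: str, metric: str, value: float):
        entry = {
            "value": value,
            "ts": datetime.now(timezone.utc).isoformat(),
        }
        buf = self._metrics[tenant_id][metric]
        buf.append(entry)
        if len(buf) > self._max:
            # Keep only the most recent max_records entries.
            self._metrics[tenant_id][metric] = buf[-self._max:]

    def audit(self, tenant_id: str, action: str, detail: str = ""):
        self._audit_log.append({
            "tenant_id": tenant_id,
            "action": action,
            "detail": detail,
            "ts": datetime.now(timezone.utc).isoformat(),
        })
        if len(self._audit_log) > 10000:
            self._audit_log = self._audit_log[-10000:]

    def aggregate(self, tenant_id: str, metric: str,
                  since: Optional[datetime] = None) -> dict:
        # .get() avoids creating empty entries for unknown tenants.
        records = self._metrics.get(tenant_id, {}).get(metric, [])
        if since:
            # `since` must be timezone-aware to compare with stored values.
            records = [
                r for r in records
                if datetime.fromisoformat(r["ts"]) >= since
            ]
        if not records:
            return {}
        values = [r["value"] for r in records]
        return {
            "count": len(values),
            "sum": sum(values),
            "avg": sum(values) / len(values),
            "min": min(values),
            "max": max(values),
        }

    def report(self, tenant_id: str, days: int = 7) -> dict:
        since = datetime.now(timezone.utc) - timedelta(days=days)
        return {
            "tenant_id": tenant_id,
            "period_days": days,
            "metrics": {
                m: self.aggregate(tenant_id, m, since)
                for m in self._metrics.get(tenant_id, {})
            },
        }
```

The audit log records which tenant did what and when, so a security incident can be traced quickly. Metric aggregation supports filtering by time range, which makes it easy to produce per-tenant operational reports. Both stores live in memory and are capped, so they do not survive a restart.

Middleware integration for a multi-tenant MCP server

Tying the components together, the MCP server handles a request as follows:

1. The request arrives, and the middleware extracts the tenant identity from an HTTP header or OAuth token.
2. `TenantAuthenticator.authenticate()` validates the token and builds a `TenantContext`.
3. `TenantContextManager.activate()` activates the tenant context (the tenant must already be registered).
4. `QuotaManager.check_rate()` checks the rate limit.
5. The tool or resource operation runs, with tenant isolation applied automatically.
6. `TenantMonitor.record()` records metrics and `audit()` writes the audit log.
7. `TenantContextManager.deactivate()` clears the tenant context.

```python
# authenticator, ctx_manager, quota_manager and monitor are module-level
# instances of the classes defined in the sections above.
async def tenant_middleware(request, handler):
    token = request.headers.get("Authorization", "").removeprefix("Bearer ")
    try:
        ctx = authenticator.authenticate(token)
    except ValueError as e:
        return {"error": str(e)}, 401

    try:
        # activate() looks up the pre-registered context for this tenant;
        # the permissions carried in the token are not applied here.
        ctx_manager.activate(ctx.tenant_id)
    except KeyError:
        return {"error": "unknown_tenant"}, 403

    try:
        if not quota_manager.check_rate(ctx.tenant_id):
            return {"error": "rate_limit_exceeded"}, 429

        monitor.audit(ctx.tenant_id, "request", request.path)
        result = await handler(request)
        monitor.record(ctx.tenant_id, "request_count", 1)
        return result
    finally:
        ctx_manager.deactivate()
```

The middleware pattern keeps the lifecycle of the tenant context in one place, so business logic does not need to care about tenant switching. The `deactivate()` call in the `finally` block guarantees that the tenant context does not leak into the next request even when request handling raises an exception. Authentication failures are handled before the handler runs, so a `ValueError` raised by business logic is not misreported as a 401.

Choosing an isolation strategy

Different scales and scenarios suit different isolation strategies:

| Scenario | Recommended approach | Rationale |
|------|---------|------|
| Early stage / fewer than 20 tenants | Row-level isolation | Simple to implement, low operational cost |
| Medium scale / 20-200 tenants | Schema isolation | Balances security and management cost |
| Enterprise / 200+ tenants | Separate databases | High compliance requirements, strongest isolation |
| Mixed scenario | Row-level isolation plus dedicated databases for hot tenants | Balances cost and performance |

The choice should also take into account whether per-tenant database backup and restore is needed, whether compliance audits apply, and whether a single tenant's data volume is large enough to affect overall performance.

Multi-tenancy is a key capability for taking an MCP server to production. The core is to carry the tenant identity through the entire request lifecycle: from authentication to data access to monitoring and auditing, every layer must be aware of tenant boundaries.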
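
The article's reliance on `contextvars.ContextVar` for keeping tenant contexts apart between coroutines can be checked with a small experiment. The following is a minimal sketch using only the standard library; `current_tenant_id`, `handle_request`, and `run_demo` are hypothetical names introduced for this illustration, and the variable holds a plain tenant ID string instead of a full `TenantContext` to keep it short.

```python
import asyncio
import contextvars
from typing import Optional

# Hypothetical stand-in for the article's `current_tenant` variable.
current_tenant_id: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
    "current_tenant_id", default=None
)


async def handle_request(tenant_id: str, delay: float) -> tuple[str, Optional[str]]:
    """Simulate one request: activate a tenant, yield to the loop, read it back."""
    token = current_tenant_id.set(tenant_id)
    try:
        # Other simulated requests run while this one is suspended here.
        await asyncio.sleep(delay)
        return tenant_id, current_tenant_id.get()
    finally:
        # reset() restores the previous value rather than overwriting it.
        current_tenant_id.reset(token)


async def run_demo() -> list[tuple[str, Optional[str]]]:
    # gather() wraps each coroutine in its own Task, and each Task runs in a
    # copy of the current context, so set() calls do not leak between them.
    results = await asyncio.gather(
        handle_request("tenant-a", 0.02),
        handle_request("tenant-b", 0.01),
        handle_request("tenant-c", 0.0),
    )
    return list(results)


results = asyncio.run(run_demo())
for expected, observed in results:
    print(f"activated {expected!r}, observed {observed!r}")
```

Each simulated request reads back the tenant it activated even though the three overlap in time, and the value outside the tasks stays `None`. The sketch uses `set()` with `reset(token)` instead of setting the variable to `None`; that is one possible design, and it restores whatever value was active before the request when contexts are nested.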