服务端阅读 05月28日 06:55
如何在 MCP 中实现会话管理和上下文维护?
MCP 的会话管理和上下文维护直接影响多轮对话的连贯性和服务端的可扩展性。面试中这道题考察的是你对 MCP 协议层的理解深度,而不只是写一个内存字典。核心答案MCP 会话管理分三个层级:无状态会话(每次请求携带完整上下文)、服务端托管会话(服务器维护状态,客户端持 session ID)、客户端托管会话(上下文存储在客户端,随请求发出)。选择哪种取决于你的部署架构——单机用服务端托管最简单,K8s 水平扩展优先无状态或客户端托管。上下文维护的关键在于:会话生命周期管理(创建/恢复/迁移/销毁)、上下文窗口的裁剪与摘要、以及状态转换的合法性校验。下面逐个拆解。MCP 协议层的会话机制MCP 基于 JSON-RPC 2.0 通信,传输层支持 Stdio(本地进程)和 HTTP+SSE(远程服务)。每个 client-server 连接是一对一的,客户端负责会话的完整生命周期——包括超时、重连和关闭。2025 年引入 Streamable HTTP 后,MCP 服务可以部署为远程服务,但有状态会话在水平扩展时暴露了问题:服务端内存中的 session 导致负载均衡必须使用 sticky routing,跨 Pod 部署需要 Redis 做会话映射。这也是 2026 MCP 规范重点解决的架构问题。会话创建与生命周期import uuidimport jsonimport osfrom datetime import datetime, timedeltafrom typing import Dict, List, Optional, Anyfrom enum import Enumfrom collections import defaultdictclass SessionState(Enum): ACTIVE = "active" IDLE = "idle" SUSPENDED = "suspended" CLOSED = "closed"class Session: def __init__(self, session_id: str, initial_context: dict = None): self.session_id = session_id self.context = initial_context or {} self.state = SessionState.ACTIVE self.created_at = datetime.now() self.last_activity = datetime.now() self.message_history: List[dict] = [] self.metadata: dict = {} def to_dict(self) -> dict: return { "session_id": self.session_id, "context": self.context, "state": self.state.value, "created_at": self.created_at.isoformat(), "last_activity": self.last_activity.isoformat(), "message_history": self.message_history, "metadata": self.metadata, } @classmethod def from_dict(cls, data: dict) -> "Session": session = cls(data["session_id"], data.get("context", {})) session.state = SessionState(data["state"]) session.created_at = datetime.fromisoformat(data["created_at"]) session.last_activity = datetime.fromisoformat(data["last_activity"]) session.message_history = data.get("message_history", []) session.metadata = data.get("metadata", {}) return sessionclass SessionManager: def __init__(self, timeout_seconds: int = 3600): self.sessions: Dict[str, Session] = {} self.session_timeout = timeout_seconds def create_session(self, initial_context: dict = None) -> str: session_id = str(uuid.uuid4()) self.sessions[session_id] = Session(session_id, initial_context) return session_id def get_session(self, session_id: str) -> Optional[Session]: session = self.sessions.get(session_id) if not session: return None if self._is_expired(session): self.close_session(session_id) return None return session def update_session(self, session_id: str, **updates) -> None: session = self.get_session(session_id) if not session: raise ValueError(f"Session {session_id} does not exist or expired") for key, value in updates.items(): if hasattr(session, key): setattr(session, key, value) session.last_activity = datetime.now() def close_session(self, session_id: str) -> None: session = self.sessions.pop(session_id, None) if session: session.state = SessionState.CLOSED def _is_expired(self, session: Session) -> bool: elapsed = (datetime.now() - session.last_activity).total_seconds() return elapsed > self.session_timeout def cleanup_expired(self) -> int: expired_ids = [ sid for sid, s in self.sessions.items() if self._is_expired(s) ] for sid in expired_ids: self.close_session(sid) return len(expired_ids)这里和简单字典实现的关键区别:Session 是独立实体,支持序列化/反序列化,为后续持久化和迁移打基础。cleanup_expired 是生产环境必须的定时清理逻辑。上下文管理器class ContextManager: def __init__(self, session_manager: SessionManager): self.session_manager = session_manager def set_context(self, session_id: str, key: str, value: Any) -> None: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") session.context[key] = value session.last_activity = datetime.now() def get_context(self, session_id: str, key: str, default: Any = None) -> Any: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") return session.context.get(key, default) def update_context(self, session_id: str, updates: dict) -> None: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") session.context.update(updates) session.last_activity = datetime.now() def clear_context(self, session_id: str) -> None: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") session.context = {} session.last_activity = datetime.now() def get_context_size(self, session_id: str) -> int: session = self.session_manager.get_session(session_id) if not session: return 0 return len(json.dumps(session.context, ensure_ascii=False).encode("utf-8")) def trim_context(self, session_id: str, max_bytes: int = 8192) -> dict: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") trimmed = {} current_size = 0 for key in reversed(list(session.context.keys())): value_json = json.dumps({key: session.context[key]}, ensure_ascii=False) entry_size = len(value_json.encode("utf-8")) if current_size + entry_size > max_bytes: break trimmed[key] = session.context[key] current_size += entry_size removed_keys = set(session.context.keys()) - set(trimmed.keys()) session.context = trimmed session.last_activity = datetime.now() return {"removed_keys": list(removed_keys), "remaining_bytes": current_size}trim_context 是上下文维护中容易被忽略但生产必须的逻辑——LLM 的上下文窗口有限,当上下文膨胀时需要裁剪。简单的 LRU 策略优先丢弃最旧的数据。消息历史管理class MessageHistoryManager: def __init__(self, session_manager: SessionManager, max_history: int = 100): self.session_manager = session_manager self.max_history = max_history def add_message( self, session_id: str, role: str, content: str, metadata: dict = None ) -> None: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") message = { "role": role, "content": content, "timestamp": datetime.now().isoformat(), "metadata": metadata or {}, } session.message_history.append(message) if len(session.message_history) > self.max_history: session.message_history = session.message_history[-self.max_history:] session.last_activity = datetime.now() def get_history(self, session_id: str, limit: int = None) -> List[dict]: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") history = session.message_history return history[-limit:] if limit else history def get_conversation_summary(self, session_id: str) -> dict: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") history = session.message_history return { "total_messages": len(history), "user_messages": sum(1 for m in history if m["role"] == "user"), "assistant_messages": sum(1 for m in history if m["role"] == "assistant"), "first_message_time": history[0]["timestamp"] if history else None, "last_message_time": history[-1]["timestamp"] if history else None, } def summarize_history(self, session_id: str, keep_recent: int = 10) -> str: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") if len(session.message_history) <= keep_recent: return "" old_messages = session.message_history[:-keep_recent] summary_parts = [] for msg in old_messages: summary_parts.append(f"[{msg['role']}]: {msg['content'][:100]}") summary = "\n".join(summary_parts) session.message_history = [ { "role": "system", "content": f"Earlier conversation summary:\n{summary}", "timestamp": old_messages[-1]["timestamp"], "metadata": {"type": "summary"}, } ] + session.message_history[-keep_recent:] session.last_activity = datetime.now() return summarysummarize_history 解决的是长对话场景下的上下文窗口溢出问题。保留最近消息的同时将早期内容压缩为摘要,这是实际 MCP 应用中常见的需求——特别是在对话持续几十轮的情况下。状态机管理class StateMachine: TRANSITIONS = { SessionState.ACTIVE: [SessionState.IDLE, SessionState.SUSPENDED, SessionState.CLOSED], SessionState.IDLE: [SessionState.ACTIVE, SessionState.CLOSED], SessionState.SUSPENDED: [SessionState.ACTIVE, SessionState.CLOSED], SessionState.CLOSED: [], } def __init__(self, session_manager: SessionManager): self.session_manager = session_manager def transition(self, session_id: str, new_state: SessionState) -> None: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") current = session.state if new_state not in self.TRANSITIONS.get(current, []): raise ValueError( f"Invalid transition: {current.value} -> {new_state.value}" ) session.state = new_state session.last_activity = datetime.now() def get_state(self, session_id: str) -> SessionState: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") return session.state def can_transition(self, session_id: str, new_state: SessionState) -> bool: session = self.session_manager.get_session(session_id) if not session: return False return new_state in self.TRANSITIONS.get(session.state, [])状态机保证会话不会从 CLOSED 状态复活,也不会跳过中间状态直接 SUSPENDED。这在多实例部署时尤其重要——状态转换必须是原子的、可审计的。会话持久化内存存储只适合开发和单机部署。生产环境必须持久化,否则进程重启所有会话丢失。class SessionPersistence: def __init__(self, storage_path: str): self.storage_path = storage_path os.makedirs(storage_path, exist_ok=True) def save_session(self, session: Session) -> None: file_path = os.path.join(self.storage_path, f"{session.session_id}.json") with open(file_path, "w", encoding="utf-8") as f: json.dump(session.to_dict(), f, indent=2, ensure_ascii=False) def load_session(self, session_id: str) -> Optional[Session]: file_path = os.path.join(self.storage_path, f"{session_id}.json") if not os.path.exists(file_path): return None with open(file_path, "r", encoding="utf-8") as f: data = json.load(f) return Session.from_dict(data) def delete_session(self, session_id: str) -> None: file_path = os.path.join(self.storage_path, f"{session_id}.json") if os.path.exists(file_path): os.remove(file_path) def list_sessions(self) -> List[str]: return [ f[:-5] for f in os.listdir(self.storage_path) if f.endswith(".json") ]class RedisSessionPersistence: def __init__(self, redis_url: str = "redis://localhost:6379", key_prefix: str = "mcp:session:"): import redis self.client = redis.from_url(redis_url) self.key_prefix = key_prefix self.default_ttl = 3600 def save_session(self, session: Session, ttl: int = None) -> None: key = f"{self.key_prefix}{session.session_id}" data = json.dumps(session.to_dict(), ensure_ascii=False) self.client.setex(key, ttl or self.default_ttl, data) def load_session(self, session_id: str) -> Optional[Session]: key = f"{self.key_prefix}{session_id}" data = self.client.get(key) if not data: return None return Session.from_dict(json.loads(data)) def delete_session(self, session_id: str) -> None: key = f"{self.key_prefix}{session_id}" self.client.delete(key) def extend_ttl(self, session_id: str, ttl: int = None) -> None: key = f"{self.key_prefix}{session_id}" self.client.expire(key, ttl or self.default_ttl)Redis 方案解决的是水平扩展场景:多个 MCP 服务实例共享同一个 Redis,任何实例都能读取任意会话的状态。TTL 自动过期比定时清理更可靠。extend_ttl 在用户活跃时续期,实现"空闲超时"而非"绝对超时"。会话监控与指标class SessionAnalytics: def __init__(self, session_manager: SessionManager): self.session_manager = session_manager self.metrics: Dict[str, int] = defaultdict(int) self.event_log: List[dict] = [] def track_event(self, event_type: str, session_id: str = None, detail: str = None) -> None: self.metrics[event_type] += 1 self.event_log.append({ "event_type": event_type, "session_id": session_id, "detail": detail, "timestamp": datetime.now().isoformat(), }) def get_metrics(self) -> dict: return dict(self.metrics) def get_session_statistics(self) -> dict: sessions = self.session_manager.sessions if not sessions: return { "total_sessions": 0, "active_sessions": 0, "idle_sessions": 0, "average_duration_seconds": 0.0, "total_messages": 0, } durations = [] active_count = 0 idle_count = 0 total_messages = 0 for session in sessions.values(): duration = (session.last_activity - session.created_at).total_seconds() durations.append(duration) total_messages += len(session.message_history) if session.state == SessionState.ACTIVE: active_count += 1 elif session.state == SessionState.IDLE: idle_count += 1 return { "total_sessions": len(sessions), "active_sessions": active_count, "idle_sessions": idle_count, "average_duration_seconds": sum(durations) / len(durations), "total_messages": total_messages, } def get_recent_events(self, limit: int = 50) -> List[dict]: return self.event_log[-limit:]监控不只是统计数字。event_log 记录每一次状态变更,可以用来排查"会话为什么丢失"这类线上问题。生产部署的三种会话模型2026 MCP 规范明确提出了三种会话模型,选择取决于架构:无状态模型:每次请求客户端发送完整上下文。负载均衡最友好,任何实例都能处理任何请求。缺点是网络开销大,上下文窗口大时不实际。服务端托管模型:服务端维护状态,客户端只持 session ID。实现最简单,但水平扩展需要 sticky routing 或共享存储(Redis)。适合中小规模部署。客户端托管模型:上下文存在客户端,随请求发出。结合无状态服务端,两者优势兼得——服务端无状态可水平扩展,客户端有完整控制权。缺点是客户端逻辑更复杂。实际选择建议:如果 MCP 服务只部署 1-2 个实例,服务端托管 + 文件持久化足够。如果需要 K8s 弹性伸缩,客户端托管或无状态模型更合适。Redis 持久化是折中方案——服务端托管但状态外置到 Redis。追问:MCP 会话和普通 HTTP Session 有什么区别?MCP 会话不是 HTTP Session。HTTP Session 基于浏览器 Cookie 机制,而 MCP 会话是 JSON-RPC 层面的连接生命周期管理。MCP 客户端在一个会话内可以发起多次请求/通知,会话 ID 关联的是对话上下文而非浏览器状态。此外,MCP 会话需要维护消息历史用于 LLM 的上下文窗口,这是普通 HTTP Session 不需要考虑的。