MCP 的会话管理和上下文维护是确保对话连续性和状态一致性的关键。以下是详细的实现方法:
会话管理基础
MCP 会话包含会话 ID、上下文数据、状态信息等:
```python
{
    "session_id": "unique-session-id",
    "context": {},
    "state": "active",
    "created_at": "2024-01-01T00:00:00Z",
    "last_activity": "2024-01-01T00:00:00Z"
}
```
1. 会话创建和管理
import uuid
from datetime import datetime
from typing import Dict, Optional


class SessionManager:
    """In-memory registry of sessions with inactivity-based expiry.

    Every session is a plain dict with the keys ``session_id``, ``context``,
    ``state``, ``created_at``, ``last_activity`` and ``message_history``.
    """

    def __init__(self, session_timeout: float = 3600):
        """Create an empty registry.

        Args:
            session_timeout: Seconds of inactivity after which a session is
                considered expired. Defaults to one hour.
        """
        self.sessions: Dict[str, dict] = {}
        self.session_timeout = session_timeout

    def create_session(self, initial_context: Optional[dict] = None) -> str:
        """Register a new active session and return its generated ID.

        Args:
            initial_context: Optional starting context. It is shallow-copied,
                so later top-level changes to the session context do not leak
                into the caller's dict or into other sessions built from it.

        Returns:
            The new session's UUID4 string.
        """
        session_id = str(uuid.uuid4())
        # One timestamp for both fields so a fresh session has zero duration.
        now = datetime.now()
        self.sessions[session_id] = {
            "session_id": session_id,
            "context": dict(initial_context) if initial_context else {},
            "state": "active",
            "created_at": now,
            "last_activity": now,
            "message_history": [],
        }
        return session_id

    def get_session(self, session_id: str) -> Optional[dict]:
        """Return the live session dict, or ``None`` if unknown or expired.

        An expired session is closed (and removed) as a side effect.
        """
        session = self.sessions.get(session_id)
        if session is None:
            return None
        if self._is_session_expired(session):
            self.close_session(session_id)
            return None
        return session

    def update_session(self, session_id: str, updates: dict):
        """Merge ``updates`` into the session and refresh ``last_activity``.

        Raises:
            ValueError: If the session does not exist or has expired.
        """
        session = self.get_session(session_id)
        if not session:
            raise ValueError(f"会话 {session_id} 不存在或已过期")
        session.update(updates)
        session["last_activity"] = datetime.now()

    def close_session(self, session_id: str):
        """Mark the session as closed and drop it from the registry.

        Unknown IDs are ignored. The state is still set on the dict so that
        code holding a reference to it can observe the closure.
        """
        session = self.sessions.pop(session_id, None)
        if session is not None:
            session["state"] = "closed"

    def cleanup_expired_sessions(self) -> int:
        """Close every expired session and return how many were removed.

        Without this sweep, an expired session that is never looked up again
        would stay in ``self.sessions`` indefinitely.
        """
        # Collect first: close_session mutates the dict being scanned.
        expired_ids = [
            session_id
            for session_id, session in self.sessions.items()
            if self._is_session_expired(session)
        ]
        for session_id in expired_ids:
            self.close_session(session_id)
        return len(expired_ids)

    def _is_session_expired(self, session: dict) -> bool:
        """Return True when the session has been idle longer than the timeout."""
        elapsed = (datetime.now() - session["last_activity"]).total_seconds()
        return elapsed > self.session_timeout
2. 上下文管理
from typing import Any


class ContextManager:
    """Read and write the ``context`` dict of a session via a session manager."""

    # The annotation is quoted so this snippet can be imported even when
    # SessionManager is defined in a different snippet/module.
    def __init__(self, session_manager: "SessionManager"):
        """Keep a reference to the manager that owns the sessions."""
        self.session_manager = session_manager

    def _require_session(self, session_id: str) -> dict:
        """Return the session for ``session_id`` or raise ``ValueError``."""
        session = self.session_manager.get_session(session_id)
        if not session:
            raise ValueError("会话不存在")
        return session

    def set_context(self, session_id: str, key: str, value: Any):
        """Store ``value`` under ``key`` in the session context.

        Raises:
            ValueError: If the session cannot be found.
        """
        session = self._require_session(session_id)
        session["context"][key] = value
        # Routed through update_session so the manager sees the write.
        self.session_manager.update_session(
            session_id, {"context": session["context"]}
        )

    def get_context(self, session_id: str, key: str, default: Any = None) -> Any:
        """Return the context value for ``key``, or ``default`` if absent.

        Raises:
            ValueError: If the session cannot be found.
        """
        return self._require_session(session_id)["context"].get(key, default)

    def update_context(self, session_id: str, updates: dict):
        """Merge every key/value pair of ``updates`` into the session context.

        Raises:
            ValueError: If the session cannot be found.
        """
        session = self._require_session(session_id)
        session["context"].update(updates)
        self.session_manager.update_session(
            session_id, {"context": session["context"]}
        )

    def clear_context(self, session_id: str):
        """Replace the session context with a new empty dict.

        Raises:
            ValueError: If the session cannot be found.
        """
        session = self._require_session(session_id)
        session["context"] = {}
        self.session_manager.update_session(
            session_id, {"context": session["context"]}
        )
3. 消息历史管理
from datetime import datetime
from typing import List, Optional


class MessageHistoryManager:
    """Append to and query the bounded ``message_history`` list of a session."""

    # The annotation is quoted so this snippet can be imported even when
    # SessionManager is defined in a different snippet/module.
    def __init__(self, session_manager: "SessionManager", max_history: int = 100):
        """Bind to a session manager.

        Args:
            session_manager: Object providing ``get_session`` and
                ``update_session``.
            max_history: Maximum number of messages kept per session.
        """
        self.session_manager = session_manager
        self.max_history = max_history

    def _require_session(self, session_id: str) -> dict:
        """Return the session for ``session_id`` or raise ``ValueError``."""
        session = self.session_manager.get_session(session_id)
        if not session:
            raise ValueError("会话不存在")
        return session

    def add_message(
        self,
        session_id: str,
        role: str,
        content: str,
        metadata: Optional[dict] = None,
    ):
        """Append a message and drop the oldest ones beyond ``max_history``.

        Args:
            session_id: Target session.
            role: Speaker role, e.g. "user", "assistant" or "system".
            content: Message text.
            metadata: Optional extra data; stored as ``{}`` when omitted.

        Raises:
            ValueError: If the session cannot be found.
        """
        session = self._require_session(session_id)
        history = session["message_history"]
        history.append({
            "role": role,
            "content": content,
            "timestamp": datetime.now().isoformat(),
            "metadata": metadata or {},
        })

        # Count the overflow explicitly: slicing with [-max_history:] keeps
        # the whole list when max_history is 0.
        overflow = len(history) - self.max_history
        if overflow > 0:
            del history[:overflow]

        self.session_manager.update_session(
            session_id, {"message_history": history}
        )

    def get_history(
        self,
        session_id: str,
        limit: Optional[int] = None,
    ) -> List[dict]:
        """Return a copy of the history, optionally only the newest ``limit``.

        Args:
            session_id: Target session.
            limit: ``None`` returns everything; zero or a negative number
                returns an empty list.

        Raises:
            ValueError: If the session cannot be found.
        """
        history = self._require_session(session_id)["message_history"]
        if limit is None:
            # Copy so callers cannot mutate the stored list by accident.
            return list(history)
        if limit <= 0:
            # history[-0:] would be the full list, so handle this explicitly.
            return []
        return history[-limit:]

    def get_conversation_summary(self, session_id: str) -> dict:
        """Return message counts plus the first and last stored message.

        Raises:
            ValueError: If the session cannot be found.
        """
        history = self._require_session(session_id)["message_history"]
        return {
            "total_messages": len(history),
            "user_messages": sum(1 for m in history if m["role"] == "user"),
            "assistant_messages": sum(
                1 for m in history if m["role"] == "assistant"
            ),
            "first_message": history[0] if history else None,
            "last_message": history[-1] if history else None,
        }
4. 状态机管理
from enum import Enum
from typing import Union


class SessionState(Enum):
    """Lifecycle states of a session; values are the strings stored on it."""

    ACTIVE = "active"
    IDLE = "idle"
    SUSPENDED = "suspended"
    CLOSED = "closed"


class StateMachine:
    """Validate and apply state changes on sessions held by a session manager."""

    # The annotation is quoted so this snippet can be imported even when
    # SessionManager is defined in a different snippet/module.
    def __init__(self, session_manager: "SessionManager"):
        """Bind to a session manager and define the allowed transitions."""
        self.session_manager = session_manager
        # Maps each state to the states it may move to; CLOSED is terminal.
        self.transitions = {
            SessionState.ACTIVE: [
                SessionState.IDLE,
                SessionState.SUSPENDED,
                SessionState.CLOSED,
            ],
            SessionState.IDLE: [SessionState.ACTIVE, SessionState.CLOSED],
            SessionState.SUSPENDED: [SessionState.ACTIVE, SessionState.CLOSED],
            SessionState.CLOSED: [],
        }

    def _require_session(self, session_id: str) -> dict:
        """Return the session for ``session_id`` or raise ``ValueError``."""
        session = self.session_manager.get_session(session_id)
        if not session:
            raise ValueError("会话不存在")
        return session

    def transition_state(
        self,
        session_id: str,
        new_state: Union[SessionState, str],
    ):
        """Move the session to ``new_state`` if the transition is allowed.

        Args:
            session_id: Target session.
            new_state: A ``SessionState`` member, or its raw string value
                (sessions store the state as a string, so callers often
                only have that).

        Raises:
            ValueError: If the session cannot be found, ``new_state`` is not
                a known state, or the transition is not permitted.
        """
        session = self._require_session(session_id)
        current_state = SessionState(session["state"])

        try:
            # Enum lookup accepts both a member and its value.
            target_state = SessionState(new_state)
        except ValueError:
            raise ValueError(
                f"无效的状态转换: {current_state} -> {new_state}"
            ) from None

        if target_state not in self.transitions.get(current_state, []):
            raise ValueError(
                f"无效的状态转换: {current_state} -> {target_state}"
            )

        self.session_manager.update_session(
            session_id, {"state": target_state.value}
        )

    def get_state(self, session_id: str) -> SessionState:
        """Return the session's current state as a ``SessionState`` member.

        Raises:
            ValueError: If the session cannot be found.
        """
        return SessionState(self._require_session(session_id)["state"])
5. 会话持久化
import json
import os
from datetime import datetime
from typing import List, Optional


class SessionPersistence:
    """Persist sessions as one ``<session_id>.json`` file per session."""

    def __init__(self, storage_path: str):
        """Remember the storage directory and create it if it is missing."""
        self.storage_path = storage_path
        os.makedirs(storage_path, exist_ok=True)

    def _session_path(self, session_id: str) -> str:
        """Return the file path for ``session_id``.

        Raises:
            ValueError: If the ID is empty, is ``.``/``..`` or contains a path
                separator or NUL byte. The ID becomes part of a file name, so
                such values could otherwise address files outside
                ``storage_path``.
        """
        if (
            not isinstance(session_id, str)
            or session_id in ("", ".", "..")
            or any(ch in session_id for ch in ("/", "\\", "\0"))
        ):
            raise ValueError(f"无效的会话 ID: {session_id!r}")
        return os.path.join(self.storage_path, f"{session_id}.json")

    @staticmethod
    def _encode_datetime(value):
        """``json`` fallback: ISO-format datetimes, reject everything else."""
        if isinstance(value, datetime):
            return value.isoformat()
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )

    def save_session(self, session_id: str, session: dict):
        """Write the session to disk as JSON.

        ``datetime`` values at any nesting depth are stored as ISO-8601
        strings; the ``session`` dict itself is not modified.

        Raises:
            ValueError: If ``session_id`` is not a safe file name.
            TypeError: If the session holds a value JSON cannot represent.
        """
        file_path = self._session_path(session_id)
        # Serialize before opening so a failure cannot truncate an existing
        # file for this session.
        payload = json.dumps(
            session,
            indent=2,
            ensure_ascii=False,
            default=self._encode_datetime,
        )
        with open(file_path, "w", encoding="utf-8") as f:
            f.write(payload)

    def load_session(self, session_id: str) -> Optional[dict]:
        """Read a session from disk, or return ``None`` if no file exists.

        Only the top-level ``created_at`` and ``last_activity`` strings are
        converted back to ``datetime`` objects.

        Raises:
            ValueError: If ``session_id`` is not a safe file name.
        """
        file_path = self._session_path(session_id)
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                session = json.load(f)
        except FileNotFoundError:
            return None

        for key in ("created_at", "last_activity"):
            if key in session and isinstance(session[key], str):
                session[key] = datetime.fromisoformat(session[key])
        return session

    def delete_session(self, session_id: str):
        """Remove the session file; a missing file is not an error.

        Raises:
            ValueError: If ``session_id`` is not a safe file name.
        """
        file_path = self._session_path(session_id)
        try:
            os.remove(file_path)
        except FileNotFoundError:
            # Already gone, which is the state the caller asked for.
            pass

    def list_sessions(self) -> List[str]:
        """Return the sorted IDs of all sessions stored in the directory."""
        suffix = ".json"
        return sorted(
            filename[:-len(suffix)]
            for filename in os.listdir(self.storage_path)
            if filename.endswith(suffix)
        )
6. 会话监控和分析
from collections import defaultdict
from typing import Optional


class SessionAnalytics:
    """Count events and derive aggregate statistics over a manager's sessions."""

    # The annotation is quoted so this snippet can be imported even when
    # SessionManager is defined in a different snippet/module.
    def __init__(self, session_manager: "SessionManager"):
        """Bind to a session manager and start with empty counters."""
        self.session_manager = session_manager
        # Global event counts keyed by event type.
        self.metrics = defaultdict(int)
        # Per-session event counts: session_id -> event type -> count.
        self.session_metrics = defaultdict(lambda: defaultdict(int))

    def track_event(self, event_type: str, session_id: Optional[str] = None):
        """Increment the counter for ``event_type``.

        Args:
            event_type: Name of the event to count.
            session_id: When given, the event is also counted for that
                session so it can be read back via ``get_session_metrics``.
        """
        self.metrics[event_type] += 1
        if session_id is not None:
            self.session_metrics[session_id][event_type] += 1

    def get_metrics(self) -> dict:
        """Return a plain-dict copy of the global event counters."""
        return dict(self.metrics)

    def get_session_metrics(self, session_id: str) -> dict:
        """Return a copy of the event counters recorded for one session.

        Sessions without recorded events yield an empty dict.
        """
        # .get avoids creating an empty entry for unknown sessions.
        return dict(self.session_metrics.get(session_id, {}))

    def get_session_statistics(self) -> dict:
        """Summarize the sessions currently held by the session manager.

        Returns:
            A dict with ``total_sessions``, ``active_sessions``,
            ``average_session_duration`` (seconds) and ``total_messages``.
        """
        sessions = self.session_manager.sessions
        return {
            "total_sessions": len(sessions),
            "active_sessions": sum(
                1 for s in sessions.values() if s["state"] == "active"
            ),
            "average_session_duration": self._calculate_avg_duration(sessions),
            "total_messages": sum(
                len(s["message_history"]) for s in sessions.values()
            ),
        }

    def _calculate_avg_duration(self, sessions: dict) -> float:
        """Return the mean of ``last_activity - created_at`` in seconds.

        Returns 0.0 when there are no sessions.
        """
        if not sessions:
            return 0.0
        durations = [
            (s["last_activity"] - s["created_at"]).total_seconds()
            for s in sessions.values()
        ]
        return sum(durations) / len(durations)
最佳实践:
- 会话隔离:确保不同会话之间的上下文完全隔离
- 超时管理:合理设置会话超时时间,自动清理过期会话
- 持久化策略:对重要会话实施持久化,防止数据丢失
- 状态转换:使用状态机管理会话状态转换
- 监控和分析:监控会话指标,分析用户行为
- 资源清理:定期清理不再使用的会话和资源
通过完善的会话管理和上下文维护机制,可以确保 MCP 系统的对话连续性和状态一致性。