乐闻世界logo
搜索文章和话题

MCP 的数据持久化和缓存策略有哪些?

2月21日 15:51

MCP 的数据持久化和缓存策略对于提高系统性能和可靠性至关重要。以下是详细的实现方法和最佳实践:

数据持久化架构

MCP 数据持久化应考虑以下方面:

  1. 存储类型:选择合适的存储类型(关系型、文档型、键值型等)
  2. 数据模型:设计合理的数据模型
  3. 持久化策略:实现高效的数据持久化策略
  4. 缓存策略:实现多层缓存策略
  5. 数据一致性:确保数据一致性
  6. 备份恢复:实现数据备份和恢复机制

1. 数据模型设计

python
from dataclasses import dataclass, field
from typing import Optional, Dict, Any, List
from datetime import datetime
from enum import Enum


class DataType(Enum):
    """Categories of records an MCP server needs to persist."""
    TOOL = "tool"
    RESOURCE = "resource"
    PROMPT = "prompt"
    SESSION = "session"
    METADATA = "metadata"


@dataclass
class DataRecord:
    """A single versioned record as serialized by the storage backends.

    Fields: unique id, record category, payload dict, create/update
    timestamps, a monotonically increasing version, and free-form metadata.
    """
    id: str
    data_type: DataType
    content: Dict[str, Any]
    created_at: datetime
    updated_at: datetime
    version: int = 1
    # default_factory avoids the shared-mutable-default pitfall and makes
    # the original __post_init__ None-check unnecessary.
    metadata: Dict[str, Any] = field(default_factory=dict)


class DataModel:
    """In-memory registry of DataRecord objects keyed by record id."""

    def __init__(self):
        self.records: Dict[str, DataRecord] = {}

    def create_record(
        self,
        data_type: DataType,
        content: Dict[str, Any],
        metadata: Dict[str, Any] = None
    ) -> DataRecord:
        """Create, register and return a new record with a fresh id."""
        now = datetime.now()
        record = DataRecord(
            id=self._generate_id(data_type),
            data_type=data_type,
            content=content,
            created_at=now,
            updated_at=now,
            metadata=metadata or {}
        )
        self.records[record.id] = record
        return record

    def update_record(
        self,
        record_id: str,
        content: Dict[str, Any] = None,
        metadata: Dict[str, Any] = None
    ) -> Optional[DataRecord]:
        """Merge *content*/*metadata* into an existing record.

        Bumps the version and updated_at timestamp; returns None when the
        record id is unknown.
        """
        record = self.records.get(record_id)
        if record is None:
            return None
        if content:
            record.content.update(content)
        if metadata:
            record.metadata.update(metadata)
        record.updated_at = datetime.now()
        record.version += 1
        return record

    def get_record(self, record_id: str) -> Optional[DataRecord]:
        """Return the record for *record_id*, or None when absent."""
        return self.records.get(record_id)

    def delete_record(self, record_id: str) -> bool:
        """Remove a record; return True if it existed."""
        return self.records.pop(record_id, None) is not None

    def query_records(
        self,
        data_type: DataType = None,
        filters: Dict[str, Any] = None
    ) -> List[DataRecord]:
        """Return records matching *data_type* and every filter pair.

        Every filter key must be present (in content or metadata) and equal
        to the requested value for a record to match.  Filters are applied
        in one pass instead of rebuilding the list per key.
        """
        records = list(self.records.values())
        if data_type:
            records = [r for r in records if r.data_type == data_type]
        if filters:
            records = [
                r for r in records
                if all(self._match_filter(r, k, v) for k, v in filters.items())
            ]
        return records

    def _generate_id(self, data_type: DataType) -> str:
        """Generate a unique id of the form '<type>_<hex-uuid>'."""
        import uuid
        return f"{data_type.value}_{uuid.uuid4().hex}"

    def _match_filter(self, record: DataRecord, key: str, value: Any) -> bool:
        """True when *key* exists in content or metadata and equals *value*."""
        # Content takes precedence over metadata, matching the original.
        if key in record.content:
            return record.content[key] == value
        if key in record.metadata:
            return record.metadata[key] == value
        return False

2. 持久化存储实现

python
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Any
from datetime import datetime
import json
import os
from pathlib import Path


class PersistenceStorage(ABC):
    """Abstract async persistence backend for DataRecord objects."""

    @abstractmethod
    async def save_record(self, record: DataRecord) -> bool:
        """Persist a record; return True on success."""

    @abstractmethod
    async def load_record(self, record_id: str) -> Optional[DataRecord]:
        """Load a record by id; None when absent or unreadable."""

    @abstractmethod
    async def delete_record(self, record_id: str) -> bool:
        """Delete a record by id; True when it was removed."""

    @abstractmethod
    async def query_records(
        self,
        data_type: DataType = None,
        filters: Dict[str, Any] = None
    ) -> List[DataRecord]:
        """Return records matching the type and every filter pair."""


def _record_matches(record: DataRecord, filters: Dict[str, Any]) -> bool:
    """Filter semantics shared by both storage backends.

    Every key must be present in content or metadata and equal to the
    requested value.  The original implementations let records that lacked
    a filter key pass through, which diverged from DataModel._match_filter.
    """
    for key, value in filters.items():
        if key in record.content:
            if record.content[key] != value:
                return False
        elif key in record.metadata:
            if record.metadata[key] != value:
                return False
        else:
            return False
    return True


class FileStorage(PersistenceStorage):
    """One-JSON-file-per-record storage under *storage_dir*."""

    def __init__(self, storage_dir: str = "data"):
        self.storage_dir = Path(storage_dir)
        self.storage_dir.mkdir(parents=True, exist_ok=True)

    def _get_file_path(self, record_id: str) -> Path:
        """Path of the JSON file holding *record_id*."""
        return self.storage_dir / f"{record_id}.json"

    @staticmethod
    def _serialize(record: DataRecord) -> Dict[str, Any]:
        """Convert a record to a JSON-compatible dict."""
        return {
            "id": record.id,
            "data_type": record.data_type.value,
            "content": record.content,
            "created_at": record.created_at.isoformat(),
            "updated_at": record.updated_at.isoformat(),
            "version": record.version,
            "metadata": record.metadata,
        }

    @staticmethod
    def _deserialize(data: Dict[str, Any]) -> DataRecord:
        """Rebuild a DataRecord from its JSON dict form."""
        return DataRecord(
            id=data["id"],
            data_type=DataType(data["data_type"]),
            content=data["content"],
            created_at=datetime.fromisoformat(data["created_at"]),
            updated_at=datetime.fromisoformat(data["updated_at"]),
            version=data["version"],
            metadata=data.get("metadata", {}),
        )

    async def save_record(self, record: DataRecord) -> bool:
        """Write the record to its JSON file; best-effort, logs on failure."""
        file_path = self._get_file_path(record.id)
        try:
            with open(file_path, 'w') as f:
                json.dump(self._serialize(record), f, indent=2)
            return True
        except Exception as e:
            print(f"保存记录失败: {e}")
            return False

    async def load_record(self, record_id: str) -> Optional[DataRecord]:
        """Read a record back from disk; None when missing or corrupt."""
        file_path = self._get_file_path(record_id)
        if not file_path.exists():
            return None
        try:
            with open(file_path, 'r') as f:
                return self._deserialize(json.load(f))
        except Exception as e:
            print(f"加载记录失败: {e}")
            return None

    async def delete_record(self, record_id: str) -> bool:
        """Unlink the record's file; False when it does not exist."""
        file_path = self._get_file_path(record_id)
        if file_path.exists():
            file_path.unlink()
            return True
        return False

    async def query_records(
        self,
        data_type: DataType = None,
        filters: Dict[str, Any] = None
    ) -> List[DataRecord]:
        """Scan every JSON file and return the records that match."""
        records = []
        for file_path in self.storage_dir.glob("*.json"):
            try:
                with open(file_path, 'r') as f:
                    record = self._deserialize(json.load(f))
            except Exception as e:
                print(f"加载记录失败 {file_path}: {e}")
                continue
            if data_type and record.data_type != data_type:
                continue
            if filters and not _record_matches(record, filters):
                continue
            records.append(record)
        return records


class DatabaseStorage(PersistenceStorage):
    """SQLAlchemy-backed storage; content and metadata are JSON-encoded
    into Text columns."""

    def __init__(self, database_url: str):
        self.database_url = database_url
        self._initialize_database()

    def _initialize_database(self):
        """Create the ORM model, the table and the session factory.

        FIX: the mapped attribute is named ``record_metadata`` because
        ``metadata`` is a reserved attribute on SQLAlchemy declarative
        classes — the original raised InvalidRequestError at class-creation
        time.  The underlying column keeps the name "metadata".  Also uses
        ``sqlalchemy.orm.declarative_base`` (the 1.4+/2.x location) instead
        of the deprecated ``sqlalchemy.ext.declarative`` import.
        """
        from sqlalchemy import create_engine, Column, String, Integer, Text, DateTime
        from sqlalchemy.orm import declarative_base, sessionmaker

        Base = declarative_base()

        class DataRecordTable(Base):
            __tablename__ = 'data_records'
            id = Column(String(100), primary_key=True)
            data_type = Column(String(50), nullable=False, index=True)
            content = Column(Text, nullable=False)
            created_at = Column(DateTime, nullable=False)
            updated_at = Column(DateTime, nullable=False)
            version = Column(Integer, default=1)
            record_metadata = Column("metadata", Text)

        self.engine = create_engine(self.database_url)
        Base.metadata.create_all(self.engine)
        self.SessionLocal = sessionmaker(
            autocommit=False, autoflush=False, bind=self.engine
        )
        self.DataRecordTable = DataRecordTable

    def _row_to_record(self, db_record) -> DataRecord:
        """Convert an ORM row back into a DataRecord."""
        return DataRecord(
            id=db_record.id,
            data_type=DataType(db_record.data_type),
            content=json.loads(db_record.content),
            created_at=db_record.created_at,
            updated_at=db_record.updated_at,
            version=db_record.version,
            metadata=json.loads(db_record.record_metadata)
            if db_record.record_metadata else {}
        )

    async def save_record(self, record: DataRecord) -> bool:
        """Upsert the record (session.merge) inside one transaction."""
        session = self.SessionLocal()
        try:
            db_record = self.DataRecordTable(
                id=record.id,
                data_type=record.data_type.value,
                content=json.dumps(record.content),
                created_at=record.created_at,
                updated_at=record.updated_at,
                version=record.version,
                record_metadata=json.dumps(record.metadata)
            )
            session.merge(db_record)
            session.commit()
            return True
        except Exception as e:
            session.rollback()
            print(f"保存记录失败: {e}")
            return False
        finally:
            session.close()

    async def load_record(self, record_id: str) -> Optional[DataRecord]:
        """Load one record by primary key; None when absent or on error."""
        session = self.SessionLocal()
        try:
            db_record = session.query(self.DataRecordTable).filter(
                self.DataRecordTable.id == record_id
            ).first()
            if not db_record:
                return None
            return self._row_to_record(db_record)
        except Exception as e:
            print(f"加载记录失败: {e}")
            return None
        finally:
            session.close()

    async def delete_record(self, record_id: str) -> bool:
        """Delete by primary key; True even when no row existed (original
        behavior preserved)."""
        session = self.SessionLocal()
        try:
            session.query(self.DataRecordTable).filter(
                self.DataRecordTable.id == record_id
            ).delete()
            session.commit()
            return True
        except Exception as e:
            session.rollback()
            print(f"删除记录失败: {e}")
            return False
        finally:
            session.close()

    async def query_records(
        self,
        data_type: DataType = None,
        filters: Dict[str, Any] = None
    ) -> List[DataRecord]:
        """Filter by type in SQL, then apply key/value filters in Python
        (content/metadata are opaque JSON to the database)."""
        session = self.SessionLocal()
        try:
            query = session.query(self.DataRecordTable)
            if data_type:
                query = query.filter(
                    self.DataRecordTable.data_type == data_type.value
                )
            records = []
            for db_record in query.all():
                record = self._row_to_record(db_record)
                if filters and not _record_matches(record, filters):
                    continue
                records.append(record)
            return records
        except Exception as e:
            print(f"查询记录失败: {e}")
            return []
        finally:
            session.close()

3. 缓存策略实现

python
from typing import Optional, Dict, Any, List
from abc import ABC, abstractmethod
import json  # FIX: used by RedisCache but missing from the original snippet
import time


class CacheStrategy(ABC):
    """Abstract async cache interface."""

    @abstractmethod
    async def get(self, key: str) -> Optional[Any]:
        """Return the cached value, or None on miss/expiry."""

    @abstractmethod
    async def set(self, key: str, value: Any, ttl: int = None):
        """Store *value* under *key*, optionally with a TTL in seconds."""

    @abstractmethod
    async def delete(self, key: str) -> bool:
        """Drop *key*; True when an entry was removed."""

    @abstractmethod
    async def clear(self):
        """Remove every entry."""


class MemoryCache(CacheStrategy):
    """Bounded in-process cache; each entry is (value, insert-time, ttl)."""

    def __init__(self, max_size: int = 1000, default_ttl: int = 300):
        self.max_size = max_size
        self.default_ttl = default_ttl
        self.cache: Dict[str, tuple] = {}

    async def get(self, key: str) -> Optional[Any]:
        """Return the live value for *key*, lazily dropping expired entries."""
        entry = self.cache.get(key)
        if entry is None:
            return None
        value, timestamp, ttl = entry
        if ttl and time.time() - timestamp > ttl:
            del self.cache[key]
            return None
        return value

    async def set(self, key: str, value: Any, ttl: int = None):
        """Insert or overwrite *key*.

        FIX: only evict when a genuinely new key would exceed max_size —
        the original also evicted when merely overwriting an existing key,
        shrinking the cache for no reason.
        """
        if key not in self.cache and len(self.cache) >= self.max_size:
            self._evict()
        ttl = ttl or self.default_ttl  # NOTE: ttl=0 falls back to the default
        self.cache[key] = (value, time.time(), ttl)

    async def delete(self, key: str) -> bool:
        """Drop *key*; True when an entry existed."""
        return self.cache.pop(key, None) is not None

    async def clear(self):
        """Remove every entry."""
        self.cache.clear()

    def _evict(self):
        """Make room: drop all expired entries first; if none are expired,
        evict the oldest entry by insert time.

        FIX: the original evicted an arbitrary key, which could discard
        hot, still-valid data while expired entries stayed resident.
        """
        now = time.time()
        expired = [
            k for k, (_, ts, ttl) in self.cache.items()
            if ttl and now - ts > ttl
        ]
        if expired:
            for k in expired:
                del self.cache[k]
            return
        if self.cache:
            oldest = min(self.cache, key=lambda k: self.cache[k][1])
            del self.cache[oldest]


class RedisCache(CacheStrategy):
    """Redis-backed cache; values are JSON-serialized."""

    def __init__(self, redis_url: str = "redis://localhost:6379/0"):
        # Imported lazily so the module loads without redis installed.
        import redis.asyncio as aioredis
        self.redis = aioredis.from_url(redis_url)

    async def get(self, key: str) -> Optional[Any]:
        """Fetch and JSON-decode; best-effort, logs and returns None."""
        try:
            value = await self.redis.get(key)
            if value:
                return json.loads(value)
            return None
        except Exception as e:
            print(f"获取缓存失败: {e}")
            return None

    async def set(self, key: str, value: Any, ttl: int = None):
        """JSON-encode and store; SETEX when a TTL is given."""
        try:
            serialized_value = json.dumps(value)
            if ttl:
                await self.redis.setex(key, ttl, serialized_value)
            else:
                await self.redis.set(key, serialized_value)
        except Exception as e:
            print(f"设置缓存失败: {e}")

    async def delete(self, key: str) -> bool:
        """DEL the key; True when Redis removed at least one entry."""
        try:
            result = await self.redis.delete(key)
            return result > 0
        except Exception as e:
            print(f"删除缓存失败: {e}")
            return False

    async def clear(self):
        """Flush the current Redis database."""
        try:
            await self.redis.flushdb()
        except Exception as e:
            print(f"清空缓存失败: {e}")


class MultiLevelCache:
    """L1/L2 read-through cache: L1 checked first, L2 hits backfill L1."""

    def __init__(
        self,
        l1_cache: CacheStrategy,
        l2_cache: CacheStrategy = None
    ):
        self.l1_cache = l1_cache
        self.l2_cache = l2_cache

    async def get(self, key: str) -> Optional[Any]:
        """L1 first, then L2 (backfilling L1 on an L2 hit)."""
        value = await self.l1_cache.get(key)
        if value is not None:
            return value
        if self.l2_cache:
            value = await self.l2_cache.get(key)
            if value is not None:
                await self.l1_cache.set(key, value)
                return value
        return None

    async def set(self, key: str, value: Any, ttl: int = None):
        """Write through to both levels."""
        await self.l1_cache.set(key, value, ttl)
        if self.l2_cache:
            await self.l2_cache.set(key, value, ttl)

    async def delete(self, key: str) -> bool:
        """Delete from both levels; True when either level had the key.

        FIX: the original defaulted l2_deleted to True, so with no L2 cache
        configured delete() always returned True even when nothing was
        deleted from L1.
        """
        l1_deleted = await self.l1_cache.delete(key)
        l2_deleted = await self.l2_cache.delete(key) if self.l2_cache else False
        return l1_deleted or l2_deleted

    async def clear(self):
        """Clear both levels."""
        await self.l1_cache.clear()
        if self.l2_cache:
            await self.l2_cache.clear()

4. 数据持久化管理器

python
from typing import Optional, Dict, Any, List


class DataPersistenceManager:
    """Coordinates the in-memory DataModel, a PersistenceStorage backend
    and an optional cache (cache-aside pattern)."""

    def __init__(
        self,
        storage: PersistenceStorage,
        cache: CacheStrategy = None
    ):
        self.storage = storage
        self.cache = cache
        self.data_model = DataModel()

    async def save_record(
        self,
        data_type: DataType,
        content: Dict[str, Any],
        metadata: Dict[str, Any] = None,
        use_cache: bool = True
    ) -> Optional[DataRecord]:
        """Create a record, persist it, and (optionally) cache it.

        Returns None when the storage backend rejects the write.
        """
        record = self.data_model.create_record(data_type, content, metadata)
        if not await self.storage.save_record(record):
            return None
        if use_cache and self.cache:
            await self.cache.set(record.id, record)
        return record

    async def load_record(
        self,
        record_id: str,
        use_cache: bool = True
    ) -> Optional[DataRecord]:
        """Load a record, consulting the cache before hitting storage.

        Cache hits are returned directly; storage hits backfill the cache.
        """
        if use_cache and self.cache:
            record = await self.cache.get(record_id)
            if record:
                return record
        record = await self.storage.load_record(record_id)
        if record and use_cache and self.cache:
            await self.cache.set(record.id, record)
        return record

    async def update_record(
        self,
        record_id: str,
        content: Dict[str, Any] = None,
        metadata: Dict[str, Any] = None,
        use_cache: bool = True
    ) -> Optional[DataRecord]:
        """Update a record and persist the new version.

        FIX: the original only consulted the in-memory DataModel, so any
        record present in storage but not created in this process could
        never be updated.  Missing records are now hydrated from storage
        into the model before the update is applied.
        """
        if self.data_model.get_record(record_id) is None:
            stored = await self.storage.load_record(record_id)
            if stored is None:
                return None
            self.data_model.records[record_id] = stored
        record = self.data_model.update_record(record_id, content, metadata)
        if not record:
            return None
        if not await self.storage.save_record(record):
            return None
        if use_cache and self.cache:
            await self.cache.set(record.id, record)
        return record

    async def delete_record(
        self,
        record_id: str,
        use_cache: bool = True
    ) -> bool:
        """Delete from storage, then invalidate cache and in-memory model."""
        if not await self.storage.delete_record(record_id):
            return False
        if use_cache and self.cache:
            await self.cache.delete(record_id)
        self.data_model.delete_record(record_id)
        return True

    async def query_records(
        self,
        data_type: DataType = None,
        filters: Dict[str, Any] = None
    ) -> List[DataRecord]:
        """Delegate querying to the storage backend (uncached)."""
        return await self.storage.query_records(data_type, filters)

5. 数据备份和恢复

python
import shutil
from typing import Optional
from datetime import datetime


class BackupManager:
    """Creates, restores, lists and deletes directory-level backups of the
    JSON record files kept under *storage_dir*."""

    def __init__(self, storage_dir: str = "data", backup_dir: str = "backups"):
        self.storage_dir = Path(storage_dir)
        self.backup_dir = Path(backup_dir)
        self.backup_dir.mkdir(parents=True, exist_ok=True)

    async def create_backup(self, backup_name: str = None) -> Optional[str]:
        """Copy every data file into a new backup folder, then write a small
        metadata descriptor next to them.

        Returns the backup name (a timestamp when none was given), or None
        on failure.
        """
        name = backup_name or datetime.now().strftime("%Y%m%d_%H%M%S")
        target = self.backup_dir / name
        try:
            target.mkdir(parents=True, exist_ok=True)
            for source in self.storage_dir.glob("*.json"):
                shutil.copy2(source, target / source.name)
            # Count data files before the descriptor itself lands on disk.
            descriptor = {
                "backup_name": name,
                "created_at": datetime.now().isoformat(),
                "file_count": len(list(target.glob("*.json")))
            }
            (target / "backup_metadata.json").write_text(
                json.dumps(descriptor, indent=2)
            )
            return name
        except Exception as e:
            print(f"创建备份失败: {e}")
            return None

    async def restore_backup(self, backup_name: str) -> bool:
        """Replace the live data files with those from *backup_name*."""
        source = self.backup_dir / backup_name
        if not source.exists():
            print(f"备份不存在: {backup_name}")
            return False
        try:
            # Wipe the live data directory, then bring the backup files in.
            for stale in self.storage_dir.glob("*.json"):
                stale.unlink()
            for item in source.glob("*.json"):
                if item.name == "backup_metadata.json":
                    continue
                shutil.copy2(item, self.storage_dir / item.name)
            return True
        except Exception as e:
            print(f"恢复备份失败: {e}")
            return False

    async def list_backups(self) -> List[Dict[str, Any]]:
        """Return the metadata descriptor of every backup on disk."""
        found = []
        for candidate in self.backup_dir.iterdir():
            descriptor_path = candidate / "backup_metadata.json"
            if candidate.is_dir() and descriptor_path.exists():
                with open(descriptor_path, 'r') as f:
                    found.append(json.load(f))
        return found

    async def delete_backup(self, backup_name: str) -> bool:
        """Remove a backup directory entirely; False when it is absent."""
        doomed = self.backup_dir / backup_name
        if not doomed.exists():
            return False
        shutil.rmtree(doomed)
        return True

最佳实践:

  1. 分层存储:根据数据访问频率选择合适的存储类型
  2. 缓存策略:实现多级缓存,提高访问速度
  3. 数据一致性:确保缓存和存储的数据一致性
  4. 定期备份:定期创建数据备份,防止数据丢失
  5. 监控告警:监控存储和缓存的健康状态
  6. 性能优化:优化数据访问和存储性能

通过完善的数据持久化和缓存策略,可以确保 MCP 系统的高性能和可靠性。

标签:MCP