面试题手册

梳理高频技术问题,帮助你按主题复习和查漏补缺。

前端阅读 05月28日 07:07

Puppeteer 在实际项目中怎么用?

Puppeteer 是 Google 维护的 Node.js 浏览器自动化库,通过 Chrome DevTools Protocol 控制无头浏览器。它的实际应用远不止"跑个脚本打开网页",在爬虫、测试、文档生成、性能监控等场景中都是生产级方案。核心应用场景一览| 场景 | 典型用途 | 复杂度 ||------|---------|--------|| 网页爬虫 | SPA 数据采集、价格监控 | 中 || 自动化测试 | E2E 测试、视觉回归 | 中高 || PDF 生成 | 报表、发票批量输出 | 低 || 性能监控 | 页面加载分析、Core Web Vitals | 中 || SEO 审计 | 页面结构检查、可访问性扫描 | 低 || 自动化运维 | 表单批量填写、数据录入 | 中 |下面逐个场景拆解关键实现和踩坑要点。网页爬虫:SPA 和动态内容的克星传统爬虫(requests/axios)面对 Vue、React 渲染的页面基本无能为力,因为拿到的 HTML 只是空壳。Puppeteer 的优势在于它能等 JavaScript 执行完毕再提取数据。价格监控是最常见的爬虫场景。核心逻辑:启动浏览器 → 设置 User-Agent 伪装 → 等待目标元素出现 → 提取数据。一段精简实现:const puppeteer = require('puppeteer');async function monitorPrice(url) { const browser = await puppeteer.launch({ headless: 'new' }); const page = await browser.newPage(); await page.setUserAgent( 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' ); await page.goto(url, { waitUntil: 'networkidle2' }); await page.waitForSelector('.price', { timeout: 10000 }); const data = await page.evaluate(() => ({ title: document.querySelector('.product-title')?.textContent, price: document.querySelector('.price')?.textContent, })); await browser.close(); return data;}踩坑经验: networkidle2 不等于页面完全加载。如果目标元素是懒加载的,建议用 waitForSelector 配合超时做二次保障。另外,大批量采集时务必控制并发数,同时打开 20 个标签页会直接把内存撑爆。反爬虫要点: 裸跑 Puppeteer 会被大多数反爬系统识别——navigator.webdriver 属性默认为 true,WebGL 指纹也暴露无头浏览器特征。生产环境中需要配合 puppeteer-extra-plugin-stealth 插件修补这些泄露点,或者使用代理池轮换 IP。自动化测试:E2E 与视觉回归Puppeteer 在测试领域有两个典型用法:端到端流程测试——模拟用户完整操作路径,验证业务逻辑正确性。比如注册-登录-下单流程,每个步骤的页面跳转和状态变化都能断言。关键技巧是用 Promise.all 包裹点击和等待导航,避免竞态条件:await Promise.all([ page.waitForNavigation(), page.click('#submit-button'),]);视觉回归测试——截取页面快照与基线图对比,像素级检测 UI 变更。核心依赖 pixelmatch 库做图片 diff,差异超过阈值(通常 0.5%)即判定为回归。实际项目中建议把视觉回归集成到 CI 流程,每次提交自动跑一遍。注意截图的稳定性:字体渲染、动画状态、抗锯齿差异都可能产生误报。解决方法是截图前等动画完成,并用固定视口宽度。PDF 生成:报表和发票的批量引擎服务端生成 PDF 是个老大难问题。用 PDFKit 手动排版太痛苦,用 wkhtmltopdf 中文渲染经常出问题。Puppeteer 的方案最直接:渲染 HTML → 调用 page.pdf() 输出。await page.setContent(htmlContent);await page.pdf({ path: 'report.pdf', format: 'A4', printBackground: true, margin: { top: '20px', bottom: '20px', left: '20px', right: '20px' },});批量生成发票时,不要每个发票都启停浏览器。用一个浏览器实例复用 Page,速度能提升 5-10 倍。但要注意内存泄漏——每次 setContent 后如果页面越来越慢,说明需要定期 page.close() 再开新 Page。性能监控:比 Lighthouse 更灵活Lighthouse 适合一次性审计,但线上持续监控需要自定义方案。Puppeteer 可以精确采集每个页面的 FCP、LCP、DOM 节点数等指标,写入时序数据库做趋势分析。const client = await page.target().createCDPSession();await client.send('Performance.enable');await page.goto(url, { waitUntil: 'networkidle2' });const fcp = await page.evaluate(() => performance.getEntriesByType('paint') .find(e => e.name === 'first-contentful-paint')?.startTime);通过 CDP Session 还能拦截网络请求、监控 JS 堆内存变化,这些是 Lighthouse 做不到的细粒度采集。SEO 审计:自动化页面健康检查Puppeteer 可以批量扫描网站的 SEO 问题:缺少 title 标签、meta description 过长、H1 缺失或重复、图片缺少 alt 属性等。核心是 page.evaluate 在页面上下文中执行 DOM 查询,把结果结构化返回。相比纯 HTTP 请求的方式,Puppeteer 能检查 JS 渲染后的真实 DOM,对 SPA 应用尤其重要——很多 SPA 的 SEO 问题只有运行后才能发现。请求拦截与资源优化这是一个跨场景的通用技巧。通过拦截请求可以大幅降低资源消耗:await page.setRequestInterception(true);page.on('request', (req) => { const blocked = ['image', 'font', 'stylesheet']; blocked.includes(req.resourceType()) ? req.abort() : req.continue();});爬虫场景下屏蔽图片和字体能提速 40% 以上;测试场景下可以 mock 接口返回,实现更可控的测试环境。面试高频追问Q: Puppeteer 和 Playwright 怎么选?Puppeteer 只支持 Chromium,API 简洁,适合 Chrome 专属场景。Playwright 支持三浏览器(Chromium/Firefox/WebKit),自动等待机制更智能,新增项目推荐 Playwright。但 Puppeteer 生态更成熟,puppeteer-extra 插件体系(stealth、recaptcha)在爬虫场景无可替代。选型看需求:爬虫偏 Puppeteer,跨浏览器测试偏 Playwright。Q: 无头浏览器如何降低被检测概率?三层防护:第一层用 stealth 插件修补 navigator.webdriver、Chrome 对象等指纹;第二层用 ghost-cursor 模拟真人鼠标轨迹,避免点击坐标过于精确;第三层用代理池轮换 IP 和 User-Agent,避免单 IP 高频请求触发风控。没有银弹,三层全上才能通过中高级反爬。Q: Puppeteer 采集任务如何稳定运行在生产环境?三个关键点:一是进程管理,用 puppeteer.connect 连接常驻浏览器实例而非每次启动,配合 pm2 做进程守护;二是内存控制,每处理 50 个页面重启一次浏览器,防止内存泄漏积累;三是错误恢复,page.on('error') 监听页面崩溃,browser.on('disconnected') 监听浏览器断连,两者都要有自动重连逻辑。Puppeteer 的应用边界还在扩展——AI Agent 的浏览器操作层、RPA 流程自动化、竞品数据监控,都是 2026 年依然活跃的场景。掌握核心 API 再结合上述实战经验,基本能覆盖日常开发中 90% 的浏览器自动化需求。
服务端阅读 05月28日 07:06

Ollama API 有哪些核心端点,怎样正确调用?

Ollama 启动后默认在 http://localhost:11434 提供 RESTful API,所有端点均以 JSON 交互。调用前先验证服务是否就绪:curl http://localhost:11434# 返回 "Ollama is running" 即表示正常文本生成与对话POST /api/generate —— 单轮文本生成向模型发送 prompt 并获取生成结果,适合一次性问答、代码补全等场景:curl http://localhost:11434/api/generate -d '{ "model": "qwen2.5:7b", "prompt": "用 Python 写一个快速排序", "stream": false}'关键参数:stream 控制是否流式返回(默认 true),options 可设置 temperature、num_predict 等模型超参。非流式响应在 response 字段返回完整文本,流式响应逐行返回 JSON 片段,每片包含 response 和 done 字段。POST /api/chat —— 多轮对话支持 messages 数组传入对话历史,是构建聊天应用的核心端点:curl http://localhost:11434/api/chat -d '{ "model": "qwen2.5:7b", "messages": [ {"role": "system", "content": "你是一个有帮助的助手"}, {"role": "user", "content": "解释一下 RAG 的原理"}, {"role": "assistant", "content": "RAG 是检索增强生成..."}, {"role": "user", "content": "它和微调有什么区别?"} ], "stream": false}'角色支持 system、user、assistant 三种,对话历史越长上下文越完整,但也会增加显存占用和响应延迟。模型管理GET /api/tags —— 列出本地模型返回已下载模型的名称、大小和修改时间:curl http://localhost:11434/api/tags响应中 models 数组的每个元素包含 name、size、modified_at 等字段。当你不确定本地有哪些模型可用时,先调这个接口。POST /api/show —— 查看模型详情获取模型的模版、系统提示词、参数等元信息:curl http://localhost:11434/api/show -d '{ "name": "qwen2.5:7b"}'返回的 template 字段是模型使用的提示词模板,parameters 是默认参数,system 是内置系统提示词。调试模型行为时很有用。POST /api/pull —— 下载模型从 Ollama 仓库拉取模型到本地:curl http://localhost:11434/api/pull -d '{ "name": "qwen2.5:7b"}'大模型下载耗时较长,默认以流式返回进度信息(status: "pulling ..."),完成后 status 变为 success。POST /api/copy —— 复制模型创建模型副本,常用于在微调前备份原始模型:curl http://localhost:11434/api/copy -d '{ "source": "qwen2.5:7b", "destination": "my-qwen-backup"}'DELETE /api/delete —— 删除模型删除本地模型释放磁盘空间:curl -X DELETE http://localhost:11434/api/delete -d '{ "name": "my-qwen-backup"}'注意此操作不可恢复,删除后需要重新 pull。向量与嵌入POST /api/embeddings —— 生成文本向量将文本转为向量表示,是构建 RAG 应用的关键端点:curl http://localhost:11434/api/embeddings -d '{ "model": "nomic-embed-text", "prompt": "什么是向量数据库?"}'返回的 embedding 字段是浮点数数组,维度取决于模型。嵌入模型推荐使用 nomic-embed-text 或 mxbai-embed-large,它们专为文本向量化设计,不要用对话模型生成嵌入。OpenAI 兼容端点POST /v1/chat/completionsOllama 提供与 OpenAI API 兼容的端点,方便现有项目零改造迁移:curl http://localhost:11434/v1/chat/completions -H "Content-Type: application/json" -d '{ "model": "qwen2.5:7b", "messages": [{"role": "user", "content": "你好"}], "stream": false}'同理 /v1/completions 和 /v1/embeddings 也已支持。将 OpenAI SDK 的 base_url 改为 http://localhost:11434/v1,api_key 填任意字符串即可使用。Python 集成除了直接 HTTP 调用,Ollama 官方提供了 Python SDK:pip install ollamaimport ollama# 单轮生成response = ollama.generate(model='qwen2.5:7b', prompt='用 Python 写一个快速排序')print(response['response'])# 多轮对话stream = ollama.chat( model='qwen2.5:7b', messages=[{'role': 'user', 'content': '解释一下 RAG 的原理'}], stream=True)for chunk in stream: print(chunk['message']['content'], end='', flush=True)常见问题Q: 调用 API 返回 connection refused?确认 Ollama 服务已启动。macOS 检查是否在运行,Linux 执行 systemctl status ollama。若 Ollama 监听在非默认端口,需设置环境变量 OLLAMA_HOST。Q: 生成速度很慢怎么办?检查是否使用了 GPU。执行 ollama run qwen2.5:7b 进入交互模式后输入 /show info 查看推理设备。若显示 CPU 而非 GPU,需安装对应的 CUDA 或 Metal 驱动。Q: 如何在局域网内其他机器访问?默认 Ollama 只监听 localhost。设置 OLLAMA_HOST=0.0.0.0 后重启服务即可开放局域网访问,但务必注意此操作无认证保护,不要暴露到公网。
服务端阅读 05月28日 07:06

Ollama 生产环境部署有哪些关键点和最佳实践?

核心回答Ollama 生产环境部署的核心在于三点:GPU 资源规划决定推理性能上限,反向代理与认证保障接口安全,监控与并发配置维持服务稳定。实际落地中,Ollama 更适合中小规模内网场景和企业私有化 Agent 部署,若面对高并发 API 服务需求,需评估 vLLM 等专业推理框架。硬件选型与系统要求GPU 是影响推理速度的决定性因素。生产环境推荐 NVIDIA T4 及以上,CUDA 11.0+ 驱动。Apple Silicon(M1/M2/M3/M4)凭借统一内存架构,单机也能跑 70B 参数模型。内存和存储的底线配置:8GB RAM:可跑 3B-7B 模型(qwen3:8b、llama3.3:8b)16GB RAM:适合 7B-14B 模型(deepseek-v3:16b、qwen-coder:14b)32GB RAM:14B-32B 模型(deepseek-r1:32b)64GB+ RAM:32B-70B 模型(llama3.3:70b)存储必须用 SSD,每个模型占用 4-20GB,机械硬盘会导致加载延迟不可接受。操作系统优先选 Linux(Ubuntu 20.04+),macOS 11+ 和 Windows 10/11 也支持。部署方式选择单机直接部署最简单的方式,适合开发测试和小规模内网使用:# 安装后直接启动,默认监听 0.0.0.0:11434ollama serve# 预加载模型,避免首次请求冷启动ollama run llama3.1 &注意:生产环境不要把 11434 端口直接暴露到公网,必须通过反向代理加认证。Docker 容器部署推荐的服务器部署方式,环境一致性好,方便版本管理:# 基础启动,挂载模型持久化存储docker run -d -v ollama:/root/.ollama -p 11434:11434 --gpus all ollama/ollama:0.18.0关键点:务必指定版本标签(如 0.18.0),不要用 latest。版本更新可能引入不兼容变更,生产环境需要可控的升级节奏。自定义 Modelfile 的 Dockerfile 示例:FROM ollama/ollama:0.18.0COPY my-model.gguf /root/.ollama/models/CMD ["ollama", "serve"]Kubernetes 集群部署企业级多实例场景选择 K8s,配合 GPU 调度和水平自动扩缩容。核心资源清单包括:Deployment(挂载 PVC 存模型)、Service、Ingress(TLS 终结),以及 nvidia.com/gpu 资源请求。安全配置:必须做的三件事1. 反向代理 + TLS + 认证永远不要裸奔暴露 Ollama API。用 Nginx 做 TLS 终结和 Basic Auth:upstream ollama_backend { server 192.168.1.10:11434; server 192.168.1.11:11434;}server { listen 443 ssl; server_name ollama.example.com; ssl_certificate /etc/nginx/ssl/cert.pem; ssl_certificate_key /etc/nginx/ssl/key.pem; location /api/ { auth_basic "Restricted"; auth_basic_user_file /etc/nginx/.htpasswd; proxy_pass http://ollama_backend; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; }}2. 网络层防火墙只允许应用服务器所在网段访问 11434 端口:ufw allow from 192.168.1.0/24 to any port 114343. 速率限制在 Nginx 层配置 limit_req,防止单个客户端耗尽推理资源。负载均衡与高可用多实例场景用 Nginx upstream 做负载均衡,推荐 least_conn 策略(推理请求耗时不均匀,轮询会导致热点):upstream ollama_backend { least_conn; server 192.168.1.10:11434; server 192.168.1.11:11434; server 192.168.1.12:11434;}健康检查通过 Ollama 自带的模型列表接口:curl http://localhost:11434/api/tags返回 200 表示实例正常,可纳入负载均衡池。性能调优并发配置Modelfile 中设置并行请求数:PARAMETER num_parallel 4或通过环境变量 OLLAMA_NUM_PARALLEL 全局控制。数值要根据 GPU 显存大小调整——设置过大会 OOM,过小则吞吐不足。T4 实测开启动态批处理后吞吐可提升约 40%。上下文窗口通过 OLLAMA_MAX_LOADED_MODELS 控制同时加载的模型数量,避免显存碎片化。Flash Attention启用 Flash Attention 可显著降低长上下文的显存占用和推理延迟,Ollama 默认支持,确保 CUDA 版本兼容即可。Keep-Alive 策略生产服务建议设置较长的 keep-alive,避免模型频繁卸载重载:export OLLAMA_KEEP_ALIVE=24h监控与运维核心监控指标GPU 利用率和显存占用(nvidia-smi 或 Prometheus DCGM Exporter)推理延迟 P50/P95/P99请求队列深度和超时率模型加载/卸载频率日志管理# 实时日志ollama logs -f# 调整日志级别export OLLAMA_LOG_LEVEL=debug生产环境建议接入 ELK 或 Loki 统一收集,配合 Grafana 做告警看板。备份与故障恢复# 备份整个 Ollama 数据目录(包含模型和配置)tar -czf ollama-backup-$(date +%Y%m%d).tar.gz ~/.ollama/# 恢复tar -xzf ollama-backup-20260528.tar.gz -C ~/注意:只备份 Modelfile 和自定义配置即可,官方模型可以重新 pull,不必浪费存储空间。Ollama vs vLLM:生产场景选型这是面试中常见的追问方向。两者定位不同:| 维度 | Ollama | vLLM ||------|--------|------|| 定位 | 开发者本地部署 | 生产级高并发推理 || 安装复杂度 | 极低(一条命令) | 较高(需编译配置) || 并发能力 | 单机有限 | PagedAttention 优化,高并发强 || OpenAI 兼容 | 原生支持 | 原生支持 || 适合场景 | 内网工具、私有 Agent | 对外 API、高 QPS 服务 |简单判断:内部工具、低 QPS 场景选 Ollama;对外服务、高并发需求选 vLLM。很多团队两者并用——Ollama 做开发测试,vLLM 做生产推理。追问:Ollama 离线部署怎么做?模型下载到本地后,Ollama 所有推理功能完全离线运行。气隙环境(air-gapped)的部署步骤:在联网机器上 ollama pull 所需模型打包 ~/.ollama/models/ 目录传输到目标机器,解压到相同路径启动 ollama serve 即可使用金融、医疗、军工等数据敏感行业,Ollama 的离线能力是选择它的核心理由之一。
服务端阅读 05月28日 07:03

如何在 Python、JavaScript 等编程语言中集成 Ollama?

Python 集成 OllamaPython 是 Ollama 生态中最成熟的集成语言,官方提供了 ollama 库,同时也兼容 LangChain、LlamaIndex 等主流框架。安装与基础调用pip install ollama最简单的文本生成方式:import ollamaresponse = ollama.generate(model='llama3.1', prompt='用一句话解释什么是递归')print(response['response'])generate 方法适用于单轮补全场景,直接传入 prompt 即可。多轮对话对话场景使用 chat 方法,通过 messages 数组维护上下文:import ollamamessages = [ {'role': 'user', 'content': 'Python 的 GIL 是什么?'}, {'role': 'assistant', 'content': 'GIL 是全局解释器锁,它确保同一时刻只有一个线程执行 Python 字节码。'}, {'role': 'user', 'content': '那多线程还有意义吗?'}]response = ollama.chat(model='llama3.1', messages=messages)print(response['message']['content'])每轮对话都需要把完整的消息历史传入,模型本身不保存状态。流式响应对于长文本生成,流式输出可以显著改善用户体验:import ollamastream = ollama.chat( model='llama3.1', messages=[{'role': 'user', 'content': '写一篇关于量子计算的科普文章'}], stream=True)for chunk in stream: print(chunk['message']['content'], end='', flush=True)流式模式下,chat 返回的是一个生成器,每次 yield 一个 token 片段。结构化输出当你需要模型返回 JSON 格式的数据时,可以用 format 参数约束输出:import ollamaresponse = ollama.chat( model='llama3.1', messages=[{'role': 'user', 'content': '列出三个编程语言及其主要用途'}], format='json')import jsondata = json.loads(response['message']['content'])print(data)这在构建 API 服务或数据抽取管线时特别有用。LangChain 集成如果项目已经使用 LangChain,Ollama 可以直接作为 LLM 后端:from langchain_community.llms import Ollamafrom langchain.prompts import ChatPromptTemplatefrom langchain.schema import StrOutputParserllm = Ollama(model="llama3.1")# 链式调用prompt = ChatPromptTemplate.from_template("用{style}风格解释{concept}")chain = prompt | llm | StrOutputParser()result = chain.invoke({"style": "通俗易懂", "concept": "分布式系统"})print(result)LangChain 的 Agent、RAG 等高级能力都可以基于 Ollama 后端运行,无需 OpenAI API Key。JavaScript / Node.js 集成 Ollama前端和 Node.js 项目通过 ollama npm 包接入,API 风格与 Python 库保持一致。安装与基础调用npm install ollamaimport ollama from 'ollama'// 文本生成const response = await ollama.generate({ model: 'llama3.1', prompt: '用一句话解释什么是闭包'})console.log(response.response)// 多轮对话const chat = await ollama.chat({ model: 'llama3.1', messages: [ { role: 'user', content: 'Node.js 适合做什么?' }, { role: 'assistant', content: 'Node.js 适合 I/O 密集型应用,比如 Web 服务、实时通信。' }, { role: 'user', content: '那 CPU 密集型任务呢?' } ]})console.log(chat.message.content)流式响应import ollama from 'ollama'const stream = await ollama.chat({ model: 'llama3.1', messages: [{ role: 'user', content: '详细解释 JavaScript 的事件循环' }], stream: true})for await (const chunk of stream) { process.stdout.write(chunk.message.content)}浏览器端调用Ollama 默认只监听 localhost,浏览器页面跨域调用需要配置 OLLAMA_ORIGINS 环境变量:OLLAMA_ORIGINS="http://localhost:3000" ollama serve配置后即可在前端代码中直接调用:const response = await fetch('http://localhost:11434/api/chat', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ model: 'llama3.1', messages: [{ role: 'user', content: 'Hello' }] })})Go 集成 OllamaGo 生态目前没有官方封装库,直接调用 REST API 即可,Ollama 的 API 设计简洁,手动调用并不复杂。HTTP 调用示例package mainimport ( "bytes" "encoding/json" "fmt" "io" "net/http")func main() { payload := map[string]interface{}{ "model": "llama3.1", "prompt": "Go 语言的协程和线程有什么区别?", "stream": false, } body, _ := json.Marshal(payload) resp, err := http.Post( "http://localhost:11434/api/generate", "application/json", bytes.NewBuffer(body), ) if err != nil { fmt.Println("请求失败:", err) return } defer resp.Body.Close() data, _ := io.ReadAll(resp.Body) fmt.Println(string(data))}流式响应处理Go 处理流式响应需要逐行读取 NDJSON:resp, _ := http.Post( "http://localhost:11434/api/generate", "application/json", bytes.NewBuffer(streamPayload),)defer resp.Body.Close()decoder := json.NewDecoder(resp.Body)for { var chunk map[string]interface{} if err := decoder.Decode(&chunk); err != nil { break } if content, ok := chunk["response"].(string); ok { fmt.Print(content) }}REST API 通用集成任何支持 HTTP 请求的语言都能直接调用 Ollama REST API,核心端点只有两个:文本生成 /api/generatecurl http://localhost:11434/api/generate -d '{ "model": "llama3.1", "prompt": "解释什么是微服务架构", "stream": false}'对话 /api/chatcurl http://localhost:11434/api/chat -d '{ "model": "llama3.1", "messages": [ {"role": "user", "content": "Redis 和 Memcached 怎么选?"} ], "stream": false}'OpenAI 兼容接口Ollama 还提供了 OpenAI 兼容的 API 端点 /v1/chat/completions,已有 OpenAI SDK 的项目只需修改 base_url 即可切换:from openai import OpenAIclient = OpenAI( base_url="http://localhost:11434/v1", api_key="ollama" # 任意值,本地不校验)response = client.chat.completions.create( model="llama3.1", messages=[{"role": "user", "content": "用 Python 写一个快速排序"}])print(response.choices[0].message.content)这个兼容层意味着所有使用 OpenAI SDK 的项目(包括 LangChain 的 OpenAI 集成)几乎零改动就能迁移到本地模型。生产环境注意事项网络暴露:默认仅监听 localhost,对外服务需设置 OLLAMA_HOST=0.0.0.0:11434,同时做好鉴权模型管理:通过 ollama pull 预下载模型,避免首次请求时冷启动耗时过长并发处理:Ollama 会按请求顺序处理,高并发场景建议配合消息队列GPU 资源:显存不足时模型会自动卸载到内存,响应延迟会显著上升,可通过 OLLAMA_KEEP_ALIVE 调整模型驻留时间
服务端阅读 05月28日 07:02

什么是 Ollama 的 Modelfile,如何创建自定义模型?

Modelfile 是什么Modelfile 是 Ollama 用来定义和构建自定义模型的配置文件,语法设计参考了 Dockerfile——从基础模型出发,逐层叠加参数、系统提示词和模板指令,最终打包成一个可复用的模型镜像。一个最简的 Modelfile 只需要一行:FROM llama3.1这就等于直接复制了一份 llama3.1。真正的自定义发生在你往里面添加指令之后。核心指令逐一拆解FROM — 指定基础模型FROM 是唯一必填指令,支持三种来源:# 从 Ollama 仓库拉取FROM llama3.1:8b# 从本地 GGUF 文件构建FROM ./my-model-q4_k_m.gguf# 从 Safetensors 目录构建FROM ./my-safetensors-dir从本地 GGUF 导入是 Ollama 的一个重要能力,意味着你可以把 HuggingFace 上下载的任何 GGUF 量化模型直接跑起来,不需要额外转换。PARAMETER — 调整推理参数PARAMETER 控制模型运行时的行为,每行设置一个参数:PARAMETER temperature 0.7PARAMETER top_p 0.9PARAMETER num_ctx 4096PARAMETER repeat_penalty 1.1PARAMETER stop "<|end|>"几个关键参数的含义:temperature:控制输出随机性,0 附近更确定,1 以上更有创意。代码生成建议 0.1-0.3,创意写作建议 0.7-1.0num_ctx:上下文窗口大小,默认 2048,增大后会占用更多显存repeat_penalty:重复惩罚,大于 1 时抑制重复输出,1.1 是常用值stop:指定停止生成的标记SYSTEM — 设定系统提示词SYSTEM 定义模型的"人格"和行为边界,是自定义模型最常用的指令:SYSTEM You are an expert Python developer. Answer concisely with code examples.多行内容用三引号包裹:SYSTEM """You are a senior code reviewer. Follow these rules:1. Identify bugs and security issues first2. Suggest specific fixes with code3. Comment on performance if relevant"""系统提示词写得好,模型表现可以提升一个档次。核心技巧:明确角色、给出具体规则、限定输出格式。TEMPLATE — 自定义对话模板TEMPLATE 用 Go 模板语法定义对话格式,控制模型如何接收和生成文本:TEMPLATE """{{- range .Messages }}{{- if eq .Role "user" }}<|user|>{{ .Content }}<|end|>{{- else if eq .Role "assistant" }}<|assistant|}>{{ .Content }}<|end|>{{- end }}{{- end }}<|assistant|}>"""可用的模板变量:.Messages — 完整对话历史.Message.Role — 消息角色(user/assistant/system).Message.Content — 消息内容.Prompt — 仅当前用户输入.Response — 模型已生成的回复多数情况下基础模型自带的默认模板就够用,只有在你导入 GGUF 文件且模板不兼容时才需要手动指定。MESSAGE — 注入示例对话MESSAGE 用来给模型提供 few-shot 示例,引导输出风格和格式:MESSAGE user 请用一句话解释什么是递归MESSAGE assistant 递归是函数在自身定义中调用自身的编程技巧。和 SYSTEM 配合使用效果更好:SYSTEM 定规则,MESSAGE 给示范。其他指令# 添加许可证LICENSE MIT# 应用 LoRA 微调适配器ADAPTER ./my-lora-adapter.binADAPTER 指令可以将 LoRA 微调结果叠加到基础模型上,前提是适配器和基础模型必须匹配。实战:创建自定义模型场景一:基于已有模型定制最常见的需求——拿一个通用模型,改系统提示词和参数,变成专用助手:# 创建 Modelfilecat > Modelfile << 'EOF'FROM llama3.1SYSTEM You are a coding assistant specialized in Python. Provide concise answers with working code examples.PARAMETER temperature 0.3PARAMETER num_ctx 8192EOF# 构建模型ollama create my-coder -f Modelfile# 运行ollama run my-coder构建完成后,my-coder 就是一个独立的模型,可以像其他模型一样直接运行。场景二:导入 HuggingFace 上的 GGUF 模型从 HuggingFace 下载 GGUF 文件后,两步就能跑起来:# Modelfile 只需一行cat > Modelfile << 'EOF'FROM ./Qwen2.5-7B-Instruct-Q4_K_M.ggufEOFollama create my-qwen -f Modelfileollama run my-qwen如果对话格式不对,需要加 TEMPLATE 指令手动指定。场景三:量化创建模型Ollama 支持在创建时对 FP16 模型进行量化,节省磁盘空间:cat > Modelfile << 'EOF'FROM ./my-model-f16.ggufEOF# 指定量化等级ollama create my-model-q4 --quantize q4_K_M -f Modelfile常用量化等级:q4KM(平衡质量和大小)、q5KM(更高质量)、q8_0(接近原质量但体积大)。注意量化过程需要额外磁盘空间,7B 模型的 FP16 文件约 14GB,量化时需要同等大小的临时空间。常用运维命令# 查看模型的 Modelfile 配置ollama show --modelfile my-coder# 列出所有本地模型ollama list# 删除模型ollama rm my-coder# 更新模型(修改 Modelfile 后重新 create 同名即可覆盖)ollama create my-coder -f Modelfileollama show --modelfile 非常实用,可以反查任何已有模型的完整配置,包括默认模板和参数,是学习和调试的好帮手。常见问题构建报错 "must specify a FROM line":Modelfile 缺少 FROM 指令,确保第一行是 FROM。导入 GGUF 后对话格式混乱:基础模型自带模板和 GGUF 文件的模板冲突,需要手动添加 TEMPLATE 指令指定正确的对话格式。自定义模型回答风格没变化:检查 SYSTEM 指令是否生效——用 ollama show --modelfile model-name 确认配置是否正确写入。部分小模型对系统提示词的遵从度有限,换用更大参数的模型可能效果更好。量化后模型质量下降明显:尝试更高的量化等级,如从 q4KM 换成 q5KM 或 q8_0。
服务端阅读 05月28日 07:01

如何在 Ollama 中实现多模型并发运行和资源管理?

Ollama 支持两级并发:多模型同时加载和单模型并行处理请求。核心配置通过环境变量控制,资源管理由内存驱动,空闲模型自动卸载。两级并发机制Ollama 的并发能力分两层:多模型并发加载:系统内存充足时,多个模型可同时驻留在 RAM/VRAM 中,各自独立处理请求单模型并行推理:单个模型为多个请求分配并行 KV-cache 槽位,共享模型权重,吞吐量成倍提升两者可以组合使用:2 个模型各开 4 个并行槽位,总共可同时处理 8 个请求。核心环境变量配置三个关键变量控制并发行为:| 变量 | 作用 | 默认值 ||------|------|--------|| OLLAMA_NUM_PARALLEL | 单模型并行处理请求数 | 1(内存充足时自动设为 4) || OLLAMA_MAX_LOADED_MODELS | 同时加载的最大模型数 | 3 × GPU 数量,CPU 推理为 3 || OLLAMA_MAX_QUEUE | 繁忙时排队等待的最大请求数 | 512 |systemd 部署配置sudo mkdir -p /etc/systemd/system/ollama.service.dsudo tee /etc/systemd/system/ollama.service.d/parallel.conf > /dev/null <<EOF[Service]Environment="OLLAMA_NUM_PARALLEL=4"Environment="OLLAMA_MAX_LOADED_MODELS=3"Environment="OLLAMA_MAX_QUEUE=256"EOFsudo systemctl daemon-reload && sudo systemctl restart ollamaDocker 部署配置docker run -d --gpus=all --network=host -v ollama_data:/root/.ollama -e OLLAMA_NUM_PARALLEL=4 -e OLLAMA_MAX_LOADED_MODELS=3 -e OLLAMA_MAX_QUEUE=256 ollama/ollama内存驱动的资源管理Ollama 的资源管理核心原则:内存决定一切。加载与卸载策略新请求到达时,Ollama 检查可用内存是否足够加载目标模型:内存充足:模型加载到 RAM(CPU 推理)或 VRAM(GPU 推理),请求立即处理内存不足:请求进入队列,等待已加载模型空闲后被卸载释放空间空闲模型在超时后自动卸载,释放的资源按需分配给新模型并行推理的内存开销并行推理的 KV-cache 按并行数线性增长:实际内存需求 ≈ 模型权重 + OLLAMA_NUM_PARALLEL × OLLAMA_CONTEXT_LENGTH × 每token内存例如:2K 上下文开 4 并行 = 8K 上下文的 KV-cache 开销。设置并行数前务必评估可用 VRAM。GPU 约束GPU 推理时,新模型必须完整装入 VRAM 才能并发加载。如果 VRAM 装不下第二个模型,Ollama 会排队等待。Windows Radeon GPU 因 ROCm VRAM 上报限制,默认最多加载 1 个模型。查看运行状态# 查看当前加载的模型、大小和运行位置ollama ps输出示例:NAME ID SIZE PROCESSOR UNTILdeepseek-r1:32b abc123def456 19.2GB 100% GPU 4 minutes from nowqwen3:8b 789ghi012jkl 4.7GB 100% GPU 2 minutes from nowPROCESSOR 列显示模型运行在 GPU 还是 CPU 上,UNTIL 显示空闲超时时间。模型卸载与切换# 手动卸载模型释放内存ollama stop deepseek-r1:32b# 加载新模型(内存不足时自动排队等待)ollama run llama3.3:70b手动卸载适用于需要立即腾出空间加载更大模型的场景。Modelfile 中的并行参数Modelfile 的 PARAMETER num_parallel 与环境变量 OLLAMA_NUM_PARALLEL 不同:OLLAMA_NUM_PARALLEL:全局设置,作用于整个 Ollama 实例PARAMETER num_parallel:模型级设置,仅对该模型生效,可覆盖全局值FROM deepseek-r1:32b# 该模型允许 2 个并行请求PARAMETER num_parallel 2# 批处理大小PARAMETER num_batch 512如果不同模型需要不同的并行度,可在各自 Modelfile 中单独配置,无需运行多个 Ollama 实例。多 GPU 支持Ollama 支持两种多 GPU 使用方式:模型分片:超大模型(如 70B)自动拆分到多块 GPU 上运行多模型分配:不同模型分别加载到不同 GPU,各自独立推理多模型并发时,Ollama 根据 VRAM 情况自动将模型分配到不同 GPU。队列管理与过载保护OLLAMA_MAX_QUEUE 控制排队上限。队列满时,新请求返回 HTTP 503:{"error": "server busy, please try again"}客户端应实现指数退避重试:import ollamaimport timedef generate_with_retry(model, prompt, max_retries=3): for attempt in range(max_retries): try: return ollama.generate(model=model, prompt=prompt) except Exception as e: if "busy" in str(e) and attempt < max_retries - 1: time.sleep(2 ** attempt) continue raise低延迟 API 场景可将队列设为 64-128,快速失败而非长时间排队。生产环境最佳实践并行数设置:不要盲目拉满。4 并行已能覆盖多数场景,8 以上需要充足 VRAM 支撑。先用 ollama ps 观察实际内存占用再调整。模型选择:选择能完整装入 VRAM 的量化模型(Q4/Q5),比勉强加载大模型再降级到 CPU 推理更高效。8GB VRAM 适合 7B 模型,24GB VRAM 适合 32B 模型。监控指标:关注 GPU 利用率和请求延迟。并行推理下 GPU 利用率应稳定在 75% 以上,总耗时约为单请求的 2-4 倍而非线性增长,说明并行生效。多实例方案:OLLAMA_NUM_PARALLEL 是实例级全局设置。如果不同模型需要不同并行度且 Modelfile 配置无法满足,可在不同端口运行多个 Ollama 实例,各自独立配置。
服务端阅读 05月28日 07:00

如何在 Ollama 中使用流式响应(streaming)来实时生成文本?

Ollama 的流式响应(streaming)允许模型逐 token 返回结果,而不是等待全部生成完毕后一次性返回。这在聊天界面、代码补全等场景下几乎是必须的能力——用户能立刻看到内容逐步出现,感知延迟大幅降低。核心原理Ollama 的流式响应基于 HTTP 长连接 + NDJSON(Newline Delimited JSON)格式。服务端每生成一个 token 就立即写一行 JSON 到响应体,客户端逐行读取并解析。关键参数是请求中的 "stream": true——默认情况下 REST API 的流式是开启的,但在各语言 SDK 中通常默认关闭。每行 JSON 结构大致如下:{"model":"llama3.2","response":"Hello","done":false}{"model":"llama3.2","response":" world","done":false}{"model":"llama3.2","response":"","done":true,"total_duration":2500000000}当 done 为 true 时,这条消息还会携带 total_duration、eval_count 等统计信息,标志着本次生成结束。流式 vs 非流式:什么时候用哪个?| 维度 | 流式(stream: true) | 非流式(stream: false) ||------|---------------------|------------------------|| 首字延迟 | 极低,token 级返回 | 需等待全部生成完毕 || 内存占用 | 逐 token 处理,峰值低 | 需缓存完整响应 || 用户体验 | 类 ChatGPT 逐字出现 | 长时间白屏等待 || 实现复杂度 | 需处理增量拼接和中断逻辑 | 一次请求-响应,简单直接 || 适用场景 | 聊天、代码补全、实时交互 | 批量处理、后台任务、需完整结果后再操作 |面试要点:非流式适合服务端内部调用或需要完整结果后再做后处理的场景;流式适合任何用户直接交互的界面。generate 端点的流式调用/api/generate 是最基础的文本生成端点,适用于单轮 prompt 场景:curl http://localhost:11434/api/generate -d '{ "model": "llama3.2", "prompt": "用 Python 实现快速排序", "stream": true}'Python 中用 requests 库处理:import requestsimport jsonresponse = requests.post( 'http://localhost:11434/api/generate', json={'model': 'llama3.2', 'prompt': '用 Python 实现快速排序', 'stream': True}, stream=True)full_text = ''for line in response.iter_lines(): if line: chunk = json.loads(line) full_text += chunk.get('response', '') print(chunk['response'], end='', flush=True) if chunk.get('done'): print(f'总耗时: {chunk["total_duration"] / 1e9:.2f}s')chat 端点的流式调用/api/chat 支持多轮对话,是构建聊天应用的首选端点:import requestsimport jsonmessages = [ {'role': 'user', 'content': '解释一下 Transformer 的自注意力机制'}]response = requests.post( 'http://localhost:11434/api/chat', json={'model': 'llama3.2', 'messages': messages, 'stream': True}, stream=True)for line in response.iter_lines(): if line: chunk = json.loads(line) if 'message' in chunk and chunk['message']['content']: print(chunk['message']['content'], end='', flush=True)注意 chat 端点的响应结构不同——文本在 chunk['message']['content'] 而非 chunk['response'],这是面试中常见的混淆点。使用官方 Python SDKOllama 官方提供了 ollama Python 包,流式接口更简洁:import ollama# generate 流式for chunk in ollama.generate(model='llama3.2', prompt='写一首关于编程的诗', stream=True): print(chunk['response'], end='', flush=True)# chat 流式for chunk in ollama.chat( model='llama3.2', messages=[{'role': 'user', 'content': '解释递归'}], stream=True): if chunk['message']['content']: print(chunk['message']['content'], end='', flush=True)SDK 内部已经处理了连接管理和 NDJSON 解析,代码量显著减少。但在生产环境中,直接使用 requests 给你更多控制权——比如自定义超时、重试策略和连接池。JavaScript/TypeScript 实现Node.js 环境下使用 fetch API 处理流式:async function streamChat(prompt) { const response = await fetch('http://localhost:11434/api/chat', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ model: 'llama3.2', messages: [{ role: 'user', content: prompt }], stream: true }) }); const reader = response.body.getReader(); const decoder = new TextDecoder(); let buffer = ''; while (true) { const { done, value } = await reader.read(); if (done) break; buffer += decoder.decode(value, { stream: true }); const lines = buffer.split(''); buffer = lines.pop(); // 保留不完整的行 for (const line of lines) { if (!line.trim()) continue; const chunk = JSON.parse(line); if (chunk.message?.content) { process.stdout.write(chunk.message.content); } } }}这里用 buffer 拼接是因为网络传输可能把一个 JSON 行拆成多个 chunk,直接按行 split 会解析失败——这是前端处理流式响应时最常踩的坑。错误处理与重连生产环境中流式连接会因网络波动、服务重启等原因中断,必须有健壮的错误处理:import requestsimport jsonimport timedef stream_with_retry(url, payload, max_retries=3, timeout=30): for attempt in range(max_retries): try: response = requests.post( url, json=payload, stream=True, timeout=timeout ) response.raise_for_status() full_text = '' for line in response.iter_lines(): if line: chunk = json.loads(line) if chunk.get('done'): return full_text full_text += chunk.get('response', '') return full_text except (requests.ConnectionError, requests.Timeout) as e: print(f'连接失败 (尝试 {attempt + 1}/{max_retries}): {e}') time.sleep(2 ** attempt) # 指数退避 except json.JSONDecodeError as e: print(f'JSON 解析错误: {e}') continue raise ConnectionError(f'重试 {max_retries} 次后仍然失败')# 使用result = stream_with_retry( 'http://localhost:11434/api/generate', {'model': 'llama3.2', 'prompt': 'Hello', 'stream': True})三个关键点:指数退避避免雪崩、超时设置防止无限挂起、JSON 解析错误需要跳过坏行而非直接放弃。工具调用的流式支持从 Ollama v0.8.0 开始,工具调用(tool calling)也支持流式返回了。这意味着模型在生成内容的同时可以实时发起工具调用,不再需要等待完整响应:import ollamatools = [{ 'type': 'function', 'function': { 'name': 'get_weather', 'description': '获取指定城市的天气', 'parameters': { 'type': 'object', 'properties': { 'city': {'type': 'string', 'description': '城市名称'} }, 'required': ['city'] } }}]for chunk in ollama.chat( model='llama3.2', messages=[{'role': 'user', 'content': '北京今天天气怎么样?'}], tools=tools, stream=True): if chunk.get('message', {}).get('tool_calls'): for tool_call in chunk['message']['tool_calls']: print(f'调用工具: {tool_call["function"]["name"]}') print(f'参数: {tool_call["function"]["arguments"]}')收到工具调用后,你需要执行对应函数,将结果作为 tool 角色的消息追加到 messages 中,再次调用 chat 接口让模型继续生成。生产环境的几个坑连接池管理:Ollama 默认并发连接数有限(通常与模型并发数相关)。如果每个请求都新建连接,高并发下会频繁超时。用 requests.Session() 复用连接:session = requests.Session()def stream_chat(prompt): return session.post( 'http://localhost:11434/api/chat', json={'model': 'llama3.2', 'messages': [{'role': 'user', 'content': prompt}], 'stream': True}, stream=True, timeout=60 )取消生成:用户中途停止接收时,直接关闭连接即可。Ollama 服务端会检测到连接断开并停止生成,不会浪费算力。在 Python 中调用 response.close(),在 JS 中调用 reader.cancel()。上下文窗口溢出:长对话流式生成到一半突然返回 done: true 但内容截断,通常是上下文窗口超限。需要在发送请求前计算 token 数,必要时裁剪历史消息。多模型并发:Ollama 同时只能加载有限数量的模型到 GPU。如果频繁切换模型,会出现加载等待,表现为流式首字延迟骤增。生产环境建议固定使用一个模型,或通过 OLLAMA_NUM_PARALLEL 环境变量调整并发数。
服务端阅读 05月28日 07:00

Ollama 支持哪些大语言模型,如何选择合适的模型?

Ollama 支持的主要模型系列截至 2026 年,Ollama 模型库已支持超过 100 个大语言模型,覆盖主流开源模型家族。以下是按厂商分类的核心模型:Meta Llama 系列llama3.1 — 8B / 70B / 405B,通用对话基线模型llama3.2 — 1B / 3B,轻量级端侧模型llama3.3 — 70B,Meta 当前最强开源模型,推理能力接近 Llama 3.1 405B阿里通义千问系列qwen2.5 — 7B / 14B / 32B / 72B,中文理解能力突出,128K 上下文qwen2.5-coder — 7B / 32B,代码生成与调试首选qwen3 — 8B / 14B 等,强推理 + 工具调用能力,2026 年热门模型深度求索系列deepseek-r1 — 7B / 8B / 32B,链式思维推理模型,数学和逻辑推理表现优异deepseek-v3 — 大参数通用模型Google Gemma 系列gemma2 — 9B / 27B,轻量高效gemma3 — 4B / 12B / 27B,支持多模态(文本+图片输入)Mistral AI 系列mistral — 7B,经典轻量模型mixtral — 8x7B / 8x22B,MoE 架构,兼顾速度与质量代码与专用模型codellama — 7B / 13B / 34B,多语言代码生成devstral-small — 软件工程专用,适合中等硬件phi4-mini — 微软轻量模型,低资源环境可用嵌入模型mxbai-embed-large — 文本嵌入,适合 RAG 系统nomic-embed-text — 长文本嵌入如何选择合适的模型选择模型的核心逻辑是:先看硬件,再看场景,最后实测。按硬件配置选择硬件是硬约束。模型参数量越大,所需内存越多。一个反复在显存和系统内存间交换的模型,生成速度会慢到难以使用——宁可跑一个小模型跑得流畅,也不要勉强跑大模型。| 可用内存 | 推荐参数量 | 代表模型 ||---------|-----------|---------|| 8GB | 1B-7B | qwen3:4b、llama3.2:3b、phi4-mini || 16GB | 7B-14B | qwen2.5:7b、llama3.1:8b、gemma3:12b || 32GB | 14B-32B | qwen2.5-coder:32b、deepseek-r1:32b || 64GB+ | 70B | llama3.3:70b、qwen2.5:72b |Mac 用户注意:Mac 使用统一内存,16GB 机型建议预留 4-6GB 给系统,实际可跑模型控制在 9B 以下。按使用场景选择通用对话与日常问答首选 qwen2.5:7b(中文场景)或 llama3.1:8b(英文场景)中文理解、成语运用和文化常识方面,Qwen 系列在同参数量下明显优于 Llama代码生成与调试首选 qwen2.5-coder:7b(16GB 内存)或 qwen2.5-coder:32b(32GB 内存)DeepSeek Coder 和 CodeLlama 是备选推理与数学首选 deepseek-r1:7b(轻量)或 deepseek-r1:32b(高质量)DeepSeek R1 的链式思维推理在数学和逻辑题上表现突出多模态(图片理解)首选 gemma3:4b(低硬件)或 gemma3:27b(高硬件)qwen2.5-vl:7b 适合结构化图片分析RAG 检索增强文本生成用 qwen2.5:7b,嵌入用 mxbai-embed-large量化版本选择Ollama 默认使用 Q4KM 量化。如果内存紧张,可以用更激进的量化:# 默认 Q4_K_M 量化ollama run qwen2.5:7b# 更小的 Q4 量化,速度快、精度微降ollama run qwen2.5:7b-q4_0# Q8 量化,接近原始精度但内存翻倍ollama run qwen2.5:7b-q8_0量化等级越低,模型体积越小、推理越快,但精度下降。实际体验中 Q4KM 到 Q4_0 的精度差异不大,但内存占用可减少 15%-20%。实操:快速验证模型是否适合你# 拉取模型ollama pull qwen2.5:7b# 运行并测试ollama run qwen2.5:7b# 在对话中输入测试提示>>> 请写一篇关于春天的短文,200字左右观察生成速度:流畅如打字(>15字/秒)说明硬件匹配;明显卡顿则换更小参数的模型或更激进的量化。建议同时拉取 2-3 个候选模型,用相同的提示词对比效果,实测比看评测更靠谱。查看所有可用模型访问 Ollama 官方模型库 https://ollama.com/library 可浏览全部模型及变体。新模型持续更新,建议定期查看。
前端阅读 05月28日 07:00

如何在 Jest 中测试 React 组件?常用的测试工具和查询方法有哪些?

在 Jest 中测试 React 组件,核心思路是:渲染组件 → 查询元素 → 断言行为。React 官方推荐的测试方案是 Jest + React Testing Library(RTL),本文聚焦面试中高频考察的知识点。React 组件测试的基本流程是什么?测试 React 组件通常分三步:渲染:使用 RTL 的 render 方法将组件挂载到虚拟 DOM查询:通过 screen 对象提供的方法定位页面元素断言:使用 Jest 的 expect 验证元素状态或行为import { render, screen } from '@testing-library/react';import Counter from './Counter';test('counter displays initial value', () => { render(<Counter initialCount={0} />); expect(screen.getByText('Count: 0')).toBeInTheDocument();});查询方法的优先级怎么选?RTL 的查询方法有三个前缀,区别在于元素不存在时的行为:| 前缀 | 元素存在 | 元素不存在 | 适用场景 ||------|---------|-----------|---------|| getBy* | 返回元素 | 抛出错误 | 断言元素一定存在 || queryBy* | 返回元素 | 返回 null | 断言元素不存在 || findBy* | 返回 Promise | Promise reject | 异步元素出现 |具体查询方法的推荐优先级:getByRole — 最优先,基于 ARIA 角色,如 button、textbox、headinggetByLabelText — 表单元素优先用,关联 label 文本getByPlaceholderText — 没有 label 时使用getByText — 非表单元素(按钮、链接、段落)常用getByTestId — 最后手段,需要手动添加 data-testid 属性// 推荐:通过角色查询screen.getByRole('button', { name: /submit/i });// 不推荐但有时必要:通过 testId 查询screen.getByTestId('submit-btn');面试关键点:优先使用 getByRole 是因为它验证了组件的可访问性,这与 RTL "测试用户视角" 的核心理念一致。如何测试用户交互?使用 fireEvent 或 userEvent 模拟用户操作。userEvent 更接近真实用户行为,推荐优先使用。import { render, screen } from '@testing-library/react';import userEvent from '@testing-library/user-event';test('clicking button increments counter', async () => { const user = userEvent.setup(); render(<Counter initialCount={0} />); await user.click(screen.getByRole('button', { name: /increment/i })); expect(screen.getByText('Count: 1')).toBeInTheDocument();});fireEvent 与 userEvent 的区别:fireEvent.click() 只触发 click 事件userEvent.click() 会依次触发 mousedown → mouseup → focus → click,更贴近真实操作userEvent.type() 会逐字符触发键盘事件,而 fireEvent.change() 直接修改值异步组件怎么测试?异步场景(接口请求、定时器、状态延迟更新)使用 waitFor 或 findBy* 处理。import { render, screen, waitFor } from '@testing-library/react';test('displays user data after loading', async () => { render(<UserProfile userId={1} />); // 方式一:findBy(推荐,更简洁) expect(await screen.findByText('John')).toBeInTheDocument(); // 方式二:waitFor(更灵活,可组合多个断言) await waitFor(() => { expect(screen.getByText('John')).toBeInTheDocument(); expect(screen.getByText('john@example.com')).toBeInTheDocument(); });});常见坑:waitFor 中不要用 queryBy*,因为它不抛错,断言不会失败,导致测试误通过。应使用 getBy*。如何 Mock 模块和 API 请求?面试中常考的 Mock 手段分两种:Jest.fn() — Mock 函数test('calls onSubmit with form data', async () => { const onSubmit = jest.fn(); const user = userEvent.setup(); render(<LoginForm onSubmit={onSubmit} />); await user.type(screen.getByLabelText(/email/i), 'test@example.com'); await user.click(screen.getByRole('button', { name: /login/i })); expect(onSubmit).toHaveBeenCalledWith({ email: 'test@example.com' });});jest.mock — Mock 模块// Mock API 请求模块jest.mock('../api', () => ({ fetchUser: jest.fn().mockResolvedValue({ name: 'John' })}));test('renders fetched user name', async () => { render(<UserProfile />); expect(await screen.findByText('John')).toBeInTheDocument();});对于更复杂的 API Mock 场景,可以使用 Mock Service Worker(MSW),它在 Service Worker 层拦截请求,不需要修改业务代码。React Hooks 怎么测试?自定义 Hook 使用 renderHook 进行测试:import { renderHook, act } from '@testing-library/react';import { useCounter } from './useCounter';test('useCounter increments and decrements', () => { const { result } = renderHook(() => useCounter(0)); expect(result.current.count).toBe(0); act(() => { result.current.increment(); }); expect(result.current.count).toBe(1); act(() => { result.current.decrement(); }); expect(result.current.count).toBe(0);});注意:状态更新必须包裹在 act() 中,否则 Jest 会报警告。renderHook 已从 RTL v13 起内置,不再需要 @testing-library/react-hooks 包。快照测试怎么用?什么场景下用?import renderer from 'react-test-renderer';test('Button matches snapshot', () => { const tree = renderer.create(<Button>Click</Button>).toJSON(); expect(tree).toMatchSnapshot();});快照测试的适用与不适用:适合:配置型组件(Theme、Layout),结构稳定的纯展示组件不适合:频繁变动的业务组件,否则每次改动都要更新快照,失去测试价值面试加分点:快照测试只是确认结构没变,并不验证行为是否正确,所以不能替代行为测试。测试 React 组件有哪些最佳实践?测试行为,不测实现 — 不测内部 state 的值,测用户看到的结果避免过度 Mock — Mock 越多,测试离真实场景越远查询方法按优先级选 — getByRole > getByLabelText > getByText > getByTestId异步用 findBy 优于 waitFor + getBy — 更简洁,语义更清晰使用 screen 而非 render 返回值 — 避免反复解构,代码更干净一个测试只验证一个行为 — 方便定位失败原因面试追问方向:如何测试 Context Provider 包裹的组件?如何处理第三方库的渲染行为?如何在 CI 中提升测试执行速度?这些是区分中级与高级的关键问题。
服务端阅读 05月28日 06:59

SSH 安全加固怎么做?生产服务器必改的 8 项配置与面试追问

SSH 安全加固是运维和后端面试中的高频考点,也是生产服务器上线前必须完成的配置。本文从实际生产场景出发,梳理 SSH 加固的核心配置项,每项给出"为什么做"和"怎么配",并在文末附上面试常见追问。修改默认端口:降低被扫描概率SSH 默认监听 22 端口,这是所有自动化扫描工具的首要目标。改成非标准端口后,扫描流量会大幅下降,日志噪音也明显减少。# /etc/ssh/sshd_configPort 2222修改端口后记得同步更新防火墙规则和 Fail2Ban 配置,否则改了端口反而把自己锁在外面。另外,改端口属于"降低攻击面"而非"增强安全性",不要因此放松其他加固措施。禁用 root 登录:权限最小化允许 root 直接 SSH 登录意味着一旦密钥或密码泄露,攻击者立即获得最高权限。正确的做法是用普通用户登录,再通过 sudo 提权。# /etc/ssh/sshd_configPermitRootLogin no部署前要确认普通用户的 sudo 权限已配置好,否则禁用 root 后将无法执行管理操作。强制公钥认证:干掉暴力破解密码认证最大的风险是暴力破解,即使设了复杂密码也难逃字典攻击。公钥认证用非对称加密,私钥不离开本地,攻击面极小。# /etc/ssh/sshd_configPasswordAuthentication noPubkeyAuthentication yes切换顺序很重要:先部署公钥到服务器,测试公钥登录成功,再禁用密码认证。如果反着来,你可能会丢掉唯一能登录的方式。限制登录用户:白名单比黑名单可靠不限制登录用户意味着系统上的所有账户(包括服务账户)都可以尝试 SSH 登录。用白名单方式只允许运维人员登录,是更安全的做法。# /etc/ssh/sshd_configAllowUsers deploy admin@10.0.0.0/8AllowGroups sshusersSSH 处理顺序是 DenyUsers → AllowUsers → DenyGroups → AllowGroups,建议用 AllowUsers 或 AllowGroups 做白名单,比 DenyUsers 黑名单更可控。启用多因素认证:密钥 + 动态口令密钥认证虽然安全,但如果私钥文件被盗,攻击者就能直接登录。多因素认证(MFA)要求同时持有密钥和动态口令,即使私钥泄露也无法单独使用。# /etc/ssh/sshd_configAuthenticationMethods publickey,keyboard-interactive:pam# 安装 Google Authenticator PAM 模块sudo apt-get install libpam-google-authenticator# 为用户生成 TOTP 密钥google-authenticator# /etc/pam.d/sshd 添加auth required pam_google_authenticator.so生产环境建议 MFA 仅对跳板机或入口机开启,内网机器可以只做密钥认证,平衡安全和效率。加密算法优化:淘汰弱算法OpenSSH 支持多种加密算法,其中一些已被证明存在安全缺陷(如 3DES、CBC 模式、SHA1)。只保留安全的算法可以防止降级攻击。# /etc/ssh/sshd_configCiphers aes256-gcm@openssh.com,chacha20-poly1305@openssh.com,aes256-ctrKexAlgorithms curve25519-sha256@libssh.org,diffie-hellman-group16-sha512MACs hmac-sha2-256-etm@openssh.com,hmac-sha2-512-etm@openssh.com可以用 ssh -Q cipher 查看当前 OpenSSH 支持的算法列表,用 sshd -T | grep -i cipher 查看实际生效配置。推荐使用 ssh-audit 工具对服务器做算法安全审计。连接限制与超时:防暴力破解和会话劫持暴力破解靠大量尝试碰运气,限制认证次数和连接速率能有效遏制。空闲会话不断开则可能被他人利用,设置超时是必要的。# /etc/ssh/sshd_configMaxAuthTries 3MaxSessions 2MaxStartups 10:30:100LoginGraceTime 60ClientAliveInterval 300ClientAliveCountMax 0MaxStartups 10:30:100 的含义:未完成认证的连接超过 10 个时,新连接有 30% 概率被拒绝;超过 100 个则全部拒绝。ClientAliveCountMax 0 表示连续 0 次无响应即断开,配合 ClientAliveInterval 300 实现 5 分钟无操作自动断开。网络层防护:防火墙 + Fail2Ban + TCP WrapperSSH 加固不能只靠 sshd_config,网络层防护是第二道防线。三层配合使用效果最好。防火墙限制来源 IP:# iptables: 仅允许内网段访问iptables -A INPUT -p tcp --dport 2222 -s 10.0.0.0/8 -j ACCEPTiptables -A INPUT -p tcp --dport 2222 -j DROP# ufw: 更简洁的写法ufw allow from 10.0.0.0/8 to any port 2222Fail2Ban 自动封禁:# /etc/fail2ban/jail.local[sshd]enabled = trueport = 2222filter = sshdlogpath = /var/log/auth.logmaxretry = 3bantime = 3600findtime = 600TCP Wrapper 兜底:# /etc/hosts.allowsshd: 10.0.0.0/8# /etc/hosts.denysshd: ALL三者防护逻辑不同:防火墙在网络层过滤数据包,Fail2Ban 根据行为模式动态封禁,TCP Wrapper 在应用层做访问控制。不要只依赖其中一种。面试追问与回答Q: 只改端口能防住攻击吗?不能。改端口只是降低了被自动化工具发现的概率,端口扫描器仍然可以遍历所有端口找到 SSH 服务。改端口是辅助手段,核心安全还是要靠密钥认证、禁用 root、限制来源 IP 这些实质性加固。Q: 密钥认证就够安全了吗?什么场景还需要 MFA?密钥认证比密码安全得多,但私钥文件一旦泄露(比如开发机被入侵),攻击者就能直接登录服务器。以下场景必须上 MFA:面向公网暴露的跳板机、拥有高权限的核心服务器、合规要求(等保三级及以上)的场景。Q: ClientAliveInterval 和 LoginGraceTime 区别是什么?两者作用阶段完全不同。LoginGraceTime 控制的是"连接建立后多久没完成认证就断开",防的是半开连接占用资源;ClientAliveInterval 控制的是"认证成功后多久没操作就断开",防的是空闲会话被利用。生产环境建议前者设 60 秒,后者设 300 秒。Q: 怎么验证 SSH 加固配置是否生效?分三步验证:用 sshd -T 检查配置是否被正确加载;用 ssh-audit 工具扫描服务器,它会给出算法强度和配置问题的评分;用另一台机器尝试以 root 登录、密码登录、从非白名单 IP 登录,确认都被拒绝。修改配置前务必保留一个已登录的会话,防止配置错误把自己锁在外面。
服务端阅读 05月28日 06:59

如何安装 Ollama?常用命令和实操技巧有哪些?

各平台安装方式Ollama 支持在 macOS、Linux 和 Windows 三个主流平台上安装,同时也提供 Docker 部署方案。macOS 安装通过 Homebrew 一键安装:brew install ollama也可以从 Ollama 官网下载 macOS 版本的安装包,拖入 Applications 文件夹即可完成安装。安装后菜单栏会出现 Ollama 图标,点击可查看服务状态。Linux 安装使用官方一键安装脚本:curl -fsSL https://ollama.com/install.sh | sh如果遇到权限问题,可以加 sudo 执行。安装完成后 Ollama 会自动注册为 systemd 服务,开箱即用。Windows 安装两种方式可选:# 方式一:通过 winget 安装winget install Ollama.Ollama方式二是从官网下载 OllamaSetup.exe,双击运行安装程序。安装完成后 Ollama 默认开机自启,如需关闭可在任务管理器的启动应用中禁用。Docker 部署服务器环境下推荐使用 Docker 部署:docker run -d -v /home/ollama:/root/.ollama -p 11434:11434 --name ollama ollama/ollama验证安装是否成功安装完成后执行以下命令确认:ollama --version也可以直接请求 API 端点检查服务状态:curl http://localhost:11434# 返回 "Ollama is running" 即表示服务正常核心命令速查模型管理运行模型(首次运行会自动下载):ollama run llama3.2ollama run mistralollama run codellama下载模型:ollama pull llama3.2ollama pull phi3:mini # 适合 8GB 内存的小型模型ollama pull llama3.1:70b # 需要 32GB+ 内存查看已安装模型:ollama list查看正在运行的模型:ollama ps删除模型:ollama rm llama3.2停止运行中的模型:ollama stop llama3.2查看模型详细信息:ollama show llama3.2复制模型:ollama cp llama3.2 my-llama服务管理启动 API 服务:ollama serve查看帮助信息:ollama -h自定义模型:ModelfileOllama 支持通过 Modelfile 创建自定义模型,类似于 Dockerfile 的工作方式:ollama create my-model -f ./ModelfileModelfile 示例:FROM llama3.2# 设置系统提示词SYSTEM You are a helpful coding assistant that always responds in Chinese.# 设置温度参数PARAMETER temperature 0.7# 设置模板TEMPLATE {{- .System }}{{- .Prompt }}创建后可以直接运行:ollama run my-modelAPI 调用方式Ollama 默认在 localhost:11434 提供 REST API 服务,支持两种主要接口。生成接口curl http://localhost:11434/api/generate -d '{ "model": "llama3.2", "prompt": "用 Python 实现快速排序", "stream": false}'对话接口curl http://localhost:11434/api/chat -d '{ "model": "llama3.2", "messages": [ {"role": "system", "content": "你是一个编程助手"}, {"role": "user", "content": "解释什么是 REST API"} ], "stream": false}'将 stream 设为 true 可以启用流式输出,适合前端逐字展示的场景。GPU 加速配置Ollama 默认会自动检测并使用可用的 GPU,无需额外配置。NVIDIA GPU:需要安装 NVIDIA 驱动,Ollama 自动调用 CUDA 加速,推荐 RTX 4060 及以上显卡AMD GPU:Linux 下自动支持 ROCm,macOS 使用 Metal 加速Apple Silicon:M 系列芯片通过 Metal 框架获得原生加速查看 GPU 使用情况:# Linux 下查看 NVIDIA GPU 状态nvidia-smi如果 GPU 未被识别,确认驱动已正确安装,并检查 OLLAMA_LLM_LIBRARY 环境变量是否被误设。常见问题排查端口被占用:默认端口 11434 冲突时,通过环境变量修改:export OLLAMA_HOST=0.0.0.0:11435ollama serve模型下载慢:配置代理加速:export OLLAMA_PROXY=http://your-proxy:port内存不足:优先选择量化后的小模型,如 phi3:mini 或 llama3.2,避免直接运行 70B 参数量的大模型。服务启动失败:Linux 下检查 systemd 服务状态:systemctl status ollamasystemctl restart ollama掌握以上安装方法和常用命令,就能在本地快速搭建大语言模型运行环境。建议从 ollama run llama3.2 开始体验,熟悉后再尝试 Modelfile 自定义和 API 集成。
前端阅读 05月28日 06:59

如何在 Jest 中 Mock fetch 和 Axios 测试 API 调用?

核心思路测试 API 调用的关键原则是隔离外部依赖——不发出真实网络请求,用 Mock 替代,验证的是"你的代码如何调用 API、如何处理响应",而非 API 本身的行为。Jest 提供了三种主要 Mock 手段:jest.mock() 模块级替换、jest.spyOn() 方法级监听、jest.fn() 手动创建假函数。理解三者的区别和适用场景,是这道题的答题主线。Mock Axios 的两种方式方式一:jest.mock() 替换整个模块jest.mock('axios') 会将 axios 模块中所有导出替换为 jest.fn(),适合需要完全控制模块行为的场景:import axios from 'axios';import { getUser } from './api';jest.mock('axios');test('getUser 应返回用户数据', async () => { const mockData = { id: 1, name: 'Tom' }; axios.get.mockResolvedValue({ data: mockData }); const result = await getUser(1); expect(result).toEqual(mockData); expect(axios.get).toHaveBeenCalledWith('/users/1');});mockResolvedValue 让 axios.get 返回一个 resolved Promise,模拟成功响应。toHaveBeenCalledWith 断言调用参数,确保请求地址正确。方式二:jest.spyOn() 监听原方法jest.spyOn 不替换模块,而是包装原方法,可以追踪调用并控制返回值,还能通过 mockRestore() 恢复原实现:import axios from 'axios';import { getUser } from './api';test('getUser 应返回用户数据', async () => { const spy = jest.spyOn(axios, 'get').mockResolvedValue({ data: { id: 1, name: 'Tom' } }); const result = await getUser(1); expect(result).toEqual({ id: 1, name: 'Tom' }); spy.mockRestore(); // 恢复 axios.get 原实现});何时选哪个? jest.mock() 适合整个测试文件都需要 mock 的场景;jest.spyOn() 适合只想在单个测试中临时 mock、其余测试保留真实行为的场景。Mock fetch 的两种方式方式一:jest.fn() 替换全局 fetchfetch 是全局对象上的方法,直接赋值即可替换:import { fetchPosts } from './api';beforeEach(() => { global.fetch = jest.fn(() => Promise.resolve({ ok: true, json: () => Promise.resolve([{ id: 1, title: 'Hello' }]), }) );});afterEach(() => { jest.restoreAllMocks();});test('fetchPosts 应返回帖子列表', async () => { const posts = await fetchPosts(); expect(posts).toEqual([{ id: 1, title: 'Hello' }]); expect(global.fetch).toHaveBeenCalledWith('/api/posts');});这里用 beforeEach / afterEach 管理 Mock 生命周期,避免测试间互相污染——这是面试中经常追问的考点。方式二:jest.spyOn() 监听全局 fetchtest('fetchPosts 处理响应数据', async () => { jest.spyOn(global, 'fetch').mockResolvedValue({ ok: true, json: () => Promise.resolve([{ id: 1 }]), }); const posts = await fetchPosts(); expect(posts).toEqual([{ id: 1 }]);});测试错误场景只测成功路径是不够的,面试官一定会问"网络请求失败了怎么办":test('getUser 应抛出网络错误', async () => { axios.get.mockRejectedValue(new Error('Network Error')); await expect(getUser(1)).rejects.toThrow('Network Error');});test('getUser 应处理 404 响应', async () => { axios.get.mockRejectedValue({ response: { status: 404, data: { message: 'Not Found' } }, }); await expect(getUser(999)).rejects.toMatchObject({ response: { status: 404 }, });});mockRejectedValue 模拟 Promise reject,覆盖网络异常和服务端错误两种情况。使用 MSW 做更真实的拦截当项目有大量 API 需要测试时,逐个 jest.mock 维护成本高。MSW(Mock Service Worker)在网络层拦截请求,不需要修改业务代码:import { rest } from 'msw';import { setupServer } from 'msw/node';const server = setupServer( rest.get('/api/users/:id', (req, res, ctx) => { return res(ctx.json({ id: req.params.id, name: 'Tom' })); }));beforeAll(() => server.listen());afterEach(() => server.resetHandlers());afterAll(() => server.close());test('getUser 通过 MSW 返回数据', async () => { const user = await getUser(1); expect(user).toEqual({ id: '1', name: 'Tom' });});MSW 的优势:可以在运行时动态修改响应(server.use()),测试超时、限流等边界场景;同一套 handler 可复用于单元测试和集成测试。关键差异速查| 场景 | 推荐方案 | 原因 ||------|---------|------|| Mock 整个第三方库 | jest.mock() | 一键替换所有导出 || 单个测试临时 Mock | jest.spyOn() | 可恢复,不影响其他测试 || Mock 全局 API(fetch) | jest.fn() / spyOn | fetch 是全局变量,需手动处理 || 大量 API 集成测试 | MSW | 网络层拦截,维护成本低 |面试追问方向jest.mock 和 jest.spyOn 的本质区别? mock 是替换,spyOn 是包装。mock 后原实现丢失,spyOn 可恢复。为什么要避免测试中发出真实请求? 网络不稳定、速度慢、可能产生脏数据、依赖外部服务可用性。Mock 污染怎么解决? beforeEach 重置、afterEach 调用 jest.restoreAllMocks()、每个测试独立设置数据。如何测试请求重试逻辑? 用 mockRejectedValueOnce 连续返回失败,最后一次返回成功,模拟重试后恢复。
服务端阅读 05月28日 06:58

REST API 的 CSRF 防护怎么做?认证方式不同策略完全不同

REST API 的 CSRF 防护和传统 Web 应用有明显差异。核心问题在于:REST API 可能被浏览器、移动应用、第三方服务等多种客户端调用,认证方式也不统一,防护策略必须因地制宜。REST API 为什么仍需关注 CSRF很多人认为 REST API 只用 JSON 就安全了,但事实并非如此。只要你的 API 满足以下条件,CSRF 风险就存在:使用 Cookie 进行身份认证(浏览器会自动携带)允许跨域请求(CORS 配置宽松)接受 application/json 以外的 Content-Type攻击者的思路很直接:构造一个恶意页面,让已登录用户在不知情的情况下向你的 API 发起请求。如果认证靠 Cookie,浏览器会自动带上,服务端无法区分请求来源。关键判断:你的认证方式决定防护策略Token 认证(JWT / OAuth)— 天然免疫Token 放在 Authorization 头中,浏览器不会自动发送,攻击者无法在跨站请求中携带。这是 REST API 最推荐的认证方式:// 服务端验证function authenticateJWT(req, res, next) { const token = req.headers.authorization?.replace("Bearer ", ""); if (!token) return res.status(401).json({ error: "未提供认证令牌" }); try { req.user = jwt.verify(token, JWT_SECRET); next(); } catch { res.status(401).json({ error: "令牌无效或已过期" }); }}需要注意:Token 存储在 localStorage 有 XSS 风险,存 HttpOnly Cookie 又会回到 CSRF 问题。生产中推荐短期 Token + 内存存储,刷新 Token 放 HttpOnly Cookie 并配合 CSRF 防护。Cookie 认证 — 必须防护Cookie 认证在前后端分离架构中仍然常见,尤其是需要与旧系统兼容时。这种场景下 CSRF 防护不可省略。Cookie 认证下的四种防护手段手段一:SameSite Cookie 属性最简单的第一道防线,设置 SameSite 限制跨站 Cookie 发送:app.use(session({ secret: process.env.SESSION_SECRET, cookie: { httpOnly: true, secure: true, sameSite: "strict" // 严格模式,完全禁止跨站发送 }}));strict 最安全但会影响从外部链接进入的体验,lax 是更常见的折中选择——允许 GET 请求的顶级导航携带 Cookie,阻止跨站 POST 请求。局限:并非所有浏览器都完整支持 SameSite(虽然主流浏览器已跟上),且移动端 WebView 行为不一致。不能作为唯一防线。手段二:CSRF Token + 自定义请求头这是最经典的防护方案,双重验证确保请求来源可信:// 生成并下发 CSRF Tokenapp.get("/api/csrf-token", (req, res) => { const token = crypto.randomBytes(32).toString("hex"); req.session.csrfToken = token; res.json({ csrfToken: token });});// 验证:通过自定义请求头传递 Tokenfunction csrfGuard(req, res, next) { // 安全方法跳过 if (["GET", "HEAD", "OPTIONS"].includes(req.method)) return next(); const headerToken = req.headers["x-csrf-token"]; if (!headerToken || headerToken !== req.session.csrfToken) { return res.status(403).json({ error: "CSRF 验证失败" }); } next();}前端配合:// 页面初始化时获取 Token,后续请求放在自定义头中const csrfToken = await fetch("/api/csrf-token").then(r => r.json());await fetch("/api/transfer", { method: "POST", headers: { "Content-Type": "application/json", "X-CSRF-Token": csrfToken.csrfToken }, body: JSON.stringify({ to: "user123", amount: 100 })});为什么用自定义头而非表单字段:跨域请求中浏览器不允许自定义头,攻击者无法通过表单或 iframe 伪造带 X-CSRF-Token 头的请求。这比把 Token 放在请求体中更可靠。手段三:Origin / Referer 头验证作为辅助手段,验证请求来源是否合法:function originGuard(req, res, next) { if (["GET", "HEAD", "OPTIONS"].includes(req.method)) return next(); const origin = req.headers.origin || req.headers.referer; const allowed = ["https://example.com", "https://app.example.com"]; if (!origin || !allowed.some(o => origin.startsWith(o))) { return res.status(403).json({ error: "请求来源非法" }); } next();}注意:Origin 头在某些旧浏览器中可能缺失,Referer 可能被隐私策略截断。只能作为补充,不能作为唯一防线。手段四:CORS 严格配置CORS 配置不当会直接暴露 API:app.use(cors({ origin: (origin, callback) => { const allowed = ["https://example.com", "https://app.example.com"]; // 移动端请求可能无 origin,需要其他认证方式保障 if (!origin || allowed.includes(origin)) { callback(null, true); } else { callback(new Error("CORS 拒绝")); } }, credentials: true, // 允许携带 Cookie methods: ["GET", "POST", "PUT", "DELETE"], allowedHeaders: ["Content-Type", "Authorization", "X-CSRF-Token"]}));关键点:credentials: true 时 origin 不能用 *,必须明确指定。Access-Control-Allow-Headers 必须包含 X-CSRF-Token,否则浏览器会阻止预检请求通过。混合客户端场景的分层防护实际项目中,API 往往同时服务浏览器和移动端,认证方式混合:function layeredAuth(req, res, next) { const authHeader = req.headers.authorization; if (authHeader?.startsWith("Bearer ")) { // JWT 认证 — 移动端/第三方,无需 CSRF 防护 try { req.user = jwt.verify(authHeader.slice(7), JWT_SECRET); return next(); } catch { return res.status(401).json({ error: "令牌无效" }); } } // Cookie 认证 — 浏览器端,需要 CSRF 防护 if (req.session?.userId) { if (["GET", "HEAD", "OPTIONS"].includes(req.method)) return next(); const csrfToken = req.headers["x-csrf-token"]; if (!csrfToken || csrfToken !== req.session.csrfToken) { return res.status(403).json({ error: "CSRF 验证失败" }); } req.user = { id: req.session.userId }; return next(); } res.status(401).json({ error: "需要身份认证" });}核心原则:根据认证方式决定是否启用 CSRF 防护,Token 认证跳过,Cookie 认证必检。面试回答要点面试中回答这个问题,抓住三个层次:先说判断依据:REST API 是否需要 CSRF 防护,取决于认证方式。Token 认证天然免疫,Cookie 认证必须防护。再说防护手段:SameSite Cookie 是基线,CSRF Token + 自定义请求头是核心,Origin 验证和 CORS 配置是补充,多层组合最可靠。最后说实践:混合客户端场景下,按认证方式分层处理,Token 走 JWT 验证、Cookie 走 CSRF 校验,不要一刀切。
服务端阅读 05月28日 06:55

Deno 的性能优化有哪些关键技巧?

Deno 基于 Rust 和 V8 引擎构建,在运行时层面已经做了大量性能优化,但实际项目中如果不理解其底层机制,很容易写出低效的代码。Deno 的性能优化涉及启动加速、运行时调优、内存管理、并发模型和 I/O 处理等多个维度,掌握这些技巧是面试中的常见考察点。Deno 的性能瓶颈通常出现在哪些环节?Deno 应用的性能瓶颈主要集中在三个环节:启动阶段的全量依赖加载、运行时的同步阻塞操作、以及内存中的大对象持有。面试中回答这个问题时,需要结合 Deno 的架构特点来分析——Deno 的 Rust 核心(crate deno_core)通过 opcall 机制与 V8 通信,每次 opcall 的开销在 2.x 版本中已优化到纳秒级,但频繁的跨边界调用仍然是性能敏感场景需要关注的重点。如何优化 Deno 的启动性能?Deno 的冷启动耗时主要花在模块解析和 TypeScript 编译上。优化启动性能的核心思路是减少启动时需要解析和编译的代码量。动态导入替代顶层全量加载:将非必要依赖改为按需加载,避免启动时解析整个依赖树。// 顶层全量加载:启动时解析所有依赖import { heavyProcessor } from "./heavy-processor.ts";// 动态导入:只在需要时加载async function processData(data: unknown) { const { heavyProcessor } = await import("./heavy-processor.ts"); return heavyProcessor(data);}利用 Deno 的编译缓存:Deno 会将编译结果缓存在 DENO_DIR 目录中(默认 ~/.cache/deno),二次启动直接读取缓存。在 CI/CD 环境中,可以通过挂载持久化的缓存目录来避免每次构建都重新编译。使用 deno compile 生成单文件可执行文件:对于部署场景,deno compile 将运行时和应用代码打包成单文件,省去运行时的模块解析和编译开销,启动速度可提升 30% 以上。V8 Snapshot 机制:Deno 在构建时就通过 V8 Snapshot 预先序列化了内置 API 的初始化状态,这意味着 Deno.readFile、fetch 等全局 API 在启动时无需重新初始化。自定义应用也可以利用 deno_core 的 snapshot 功能来预初始化重度依赖。Deno 运行时有哪些关键的性能优化手段?运行时优化的核心是减少不必要的计算开销和跨 Rust/V8 边界的调用次数。减少 opcall 频率:Deno 通过 opcall(类似系统调用的机制)在 V8 和 Rust 之间通信。虽然 2.x 版本已经将单次 opcall 开销从早期的微秒级降到纳秒级(约 40ns),但在高频调用场景下仍然需要注意。例如,逐行读取文件比一次性读取产生更多的 opcall:// 低效:多次 opcallconst file = await Deno.open("large.txt");const buf = new Uint8Array(1024);while (await file.read(buf)) { processChunk(buf);}// 高效:一次 opcall 读取全部内容const content = await Deno.readTextFile("large.txt");processContent(content);使用 Web 标准 API 而非 Deno 特有 API:Deno 的 Web 标准 API(如 fetch、ReadableStream、TextEncoder)经过高度优化,优先使用这些 API 可以获得更好的性能和可移植性。TypeScript 运行时类型检查优化:Deno 默认在开发模式下进行类型检查,但类型检查是 CPU 密集型操作。在生产环境中使用 --no-check 标志跳过类型检查,可显著减少启动和热重载时间:deno run --no-check app.ts选择高效的数据结构:在频繁查找场景下,Map 和 Set 的性能优于普通对象,因为 V8 对 Map/Set 有专门的优化路径。如何处理 Deno 中的内存优化?Deno 的内存管理依赖 V8 的垃圾回收器,但应用层面的不当使用仍会导致内存泄漏和 GC 压力过大。及时释放资源:Deno 的资源表(Resource Table)维护着所有打开的文件、网络连接等资源。未关闭的资源会持续占用文件描述符和内存,且资源表本身也会增长。始终在 finally 块中关闭资源:const file = await Deno.open("data.txt");try { const content = await Deno.readAll(file); return processContent(content);} finally { file.close(); // 确保资源从资源表中移除}流式处理替代全量加载:处理大文件时,使用 ReadableStream 进行流式处理,避免将整个文件加载到内存:async function processLargeFile(path: string) { const file = await Deno.open(path); const stream = file.readable .pipeThrough(new TextDecoderStream()) .pipeThrough(new TransformStream({ transform(chunk, controller) { controller.enqueue(processChunk(chunk)); } })); for await (const result of stream) { handleResult(result); }}WeakRef 和 FinalizationRegistry:对于缓存场景,使用 WeakRef 避免缓存对象阻止 GC 回收,配合 FinalizationRegistry 在对象被回收时清理关联资源。控制内存上限:通过 --v8-flags=--max-old-space-size=4096 限制 V8 堆内存上限,防止内存泄漏导致进程 OOM。同时可以通过 Deno.memoryUsage() 监控内存使用情况。Deno 的并发模型如何影响性能?Deno 的并发模型基于 V8 的事件循环,单线程内通过异步 I/O 实现非阻塞。对于 CPU 密集型任务,Deno 提供了 Web Worker 和 Deno.UnsafelyUseFinalizationRegistry 等机制。Web Worker 并行处理 CPU 密集型任务:// main.tsconst worker = new Worker( new URL("./worker.ts", import.meta.url), { type: "module" });worker.postMessage({ data: largeDataset });worker.onmessage = (e) => { console.log("Result:", e.data);};// worker.tsself.onmessage = (e) => { const result = heavyComputation(e.data); self.postMessage(result);};并发控制:高并发场景下,无限制地发起异步操作会导致事件循环压力过大、文件描述符耗尽。需要实现并发控制器:async function parallelLimit<T>( tasks: (() => Promise<T>)[], limit: number): Promise<T[]> { const results: T[] = []; const executing = new Set<Promise<void>>(); for (const [index, task] of tasks.entries()) { const p = task().then((result) => { results[index] = result; executing.delete(p); }); executing.add(p); results[index] = undefined as T; if (executing.size >= limit) { await Promise.race(executing); } } await Promise.all(executing); return results;}Deno.serve 的并发性能:Deno 2.x 中 Deno.serve(或标准库的 serve)是构建 HTTP 服务的主要 API,它基于 Rust 的 hyper 库实现,性能接近原生 HTTP 服务器。基准测试中,Deno 2.x 的 HTTP 吞吐量约 78k req/s(简单 JSON 响应),显著优于 Node.js 22 的约 65k req/s。Deno 的 I/O 性能如何优化?I/O 是大多数 Web 应用的核心瓶颈。Deno 的 I/O 操作通过 Rust 异步运行时(tokio)实现,天然支持非阻塞 I/O。始终使用异步 I/O:Deno 中同步 I/O API(如 Deno.readTextFileSync)会阻塞事件循环,仅适用于初始化阶段或 CLI 工具。生产环境必须使用异步版本。利用 HTTP/2:Deno 的 Deno.serve 支持 HTTP/2(通过 ALPN 协商),多路复用特性可以在单个连接上并行传输多个请求,减少连接建立开销:Deno.serve({ port: 8000, handler: async (req) => { return new Response(JSON.stringify({ status: "ok" }), { headers: { "Content-Type": "application/json" }, }); }, alpnProtocols: ["h2", "http/1.1"],});响应压缩:对响应体进行 gzip 或 brotli 压缩,显著减少传输体积。Deno 标准库提供了压缩工具:import { gzip } from "https://deno.land/std@0.224.0/encoding/gzip.ts";Deno.serve({ port: 8000, handler: async (req) => { const data = JSON.stringify(await getLargeDataset()); const acceptEncoding = req.headers.get("accept-encoding") ?? ""; if (acceptEncoding.includes("gzip")) { const compressed = await gzip(new TextEncoder().encode(data)); return new Response(compressed, { headers: { "Content-Encoding": "gzip", "Content-Type": "application/json", }, }); } return new Response(data, { headers: { "Content-Type": "application/json" }, }); },});KV 存储优化:Deno KV 是内置的键值数据库,基于 SQLite 实现。在高写入场景下,使用原子事务批量写入,减少事务开销:const kv = await Deno.openKv();// 批量原子写入const atomic = kv.atomic();for (const entry of entries) { atomic.set(entry.key, entry.value);}await atomic.commit();如何监控和诊断 Deno 的性能问题?性能优化的前提是准确测量。Deno 提供了多种监控和诊断手段。Deno.memoryUsage():实时获取 RSS、堆内存使用量等指标,用于内存监控和泄漏检测:const mem = Deno.memoryUsage();console.log(`RSS: ${(mem.rss / 1024 / 1024).toFixed(2)} MB`);console.log(`Heap: ${(mem.heapUsed / 1024 / 1024).toFixed(2)} / ${(mem.heapTotal / 1024 / 1024).toFixed(2)} MB`);性能测量 API:使用 Web 标准的 performance.now() 和 PerformanceObserver 进行精确计时:const start = performance.now();await someOperation();const duration = performance.now() - start;console.log(`Operation took ${duration.toFixed(2)}ms`);V8 内置分析器:通过 --v8-flags 启用 V8 的 CPU 分析器和堆分析器:# CPU 分析deno run --v8-flags=--prof app.ts# 堆快照deno run --v8-flags=--heap-prof app.tsDeno 命令行工具:deno bench 用于基准测试,deno task 用于任务编排。基准测试示例:// bench.tsDeno.bench("JSON parse 10k items", () => { JSON.parse(largeJsonString);});Deno.bench("Map vs Object lookup", () => { userMap.get("key-9999");});权限对性能的影响:Deno 的安全模型要求运行时检查权限,在高频 I/O 场景下会产生可测量的开销。如果确认环境安全,可以在生产环境中通过 --allow-all 减少权限检查开销,但需权衡安全性和性能。掌握以上优化技巧,需要理解一个核心原则:Deno 的性能优势来自 Rust 层的高效实现和 V8 的 JIT 优化,应用层的优化应围绕减少不必要的跨层调用、避免阻塞事件循环、合理管理内存和资源展开。面试中回答 Deno 性能优化问题时,结合底层原理和实际场景给出具体方案,比罗列通用优化技巧更有说服力。
服务端阅读 05月28日 06:55

如何在 MCP 中实现会话管理和上下文维护?

MCP 的会话管理和上下文维护直接影响多轮对话的连贯性和服务端的可扩展性。面试中这道题考察的是你对 MCP 协议层的理解深度,而不只是写一个内存字典。核心答案MCP 会话管理分三个层级:无状态会话(每次请求携带完整上下文)、服务端托管会话(服务器维护状态,客户端持 session ID)、客户端托管会话(上下文存储在客户端,随请求发出)。选择哪种取决于你的部署架构——单机用服务端托管最简单,K8s 水平扩展优先无状态或客户端托管。上下文维护的关键在于:会话生命周期管理(创建/恢复/迁移/销毁)、上下文窗口的裁剪与摘要、以及状态转换的合法性校验。下面逐个拆解。MCP 协议层的会话机制MCP 基于 JSON-RPC 2.0 通信,传输层支持 Stdio(本地进程)和 HTTP+SSE(远程服务)。每个 client-server 连接是一对一的,客户端负责会话的完整生命周期——包括超时、重连和关闭。2025 年引入 Streamable HTTP 后,MCP 服务可以部署为远程服务,但有状态会话在水平扩展时暴露了问题:服务端内存中的 session 导致负载均衡必须使用 sticky routing,跨 Pod 部署需要 Redis 做会话映射。这也是 2026 MCP 规范重点解决的架构问题。会话创建与生命周期import uuidimport jsonimport osfrom datetime import datetime, timedeltafrom typing import Dict, List, Optional, Anyfrom enum import Enumfrom collections import defaultdictclass SessionState(Enum): ACTIVE = "active" IDLE = "idle" SUSPENDED = "suspended" CLOSED = "closed"class Session: def __init__(self, session_id: str, initial_context: dict = None): self.session_id = session_id self.context = initial_context or {} self.state = SessionState.ACTIVE self.created_at = datetime.now() self.last_activity = datetime.now() self.message_history: List[dict] = [] self.metadata: dict = {} def to_dict(self) -> dict: return { "session_id": self.session_id, "context": self.context, "state": self.state.value, "created_at": self.created_at.isoformat(), "last_activity": self.last_activity.isoformat(), "message_history": self.message_history, "metadata": self.metadata, } @classmethod def from_dict(cls, data: dict) -> "Session": session = cls(data["session_id"], data.get("context", {})) session.state = SessionState(data["state"]) session.created_at = datetime.fromisoformat(data["created_at"]) session.last_activity = datetime.fromisoformat(data["last_activity"]) session.message_history = data.get("message_history", []) session.metadata = data.get("metadata", {}) return sessionclass SessionManager: def __init__(self, timeout_seconds: int = 3600): self.sessions: Dict[str, Session] = {} self.session_timeout = timeout_seconds def create_session(self, initial_context: dict = None) -> str: session_id = str(uuid.uuid4()) self.sessions[session_id] = Session(session_id, initial_context) return session_id def get_session(self, session_id: str) -> Optional[Session]: session = self.sessions.get(session_id) if not session: return None if self._is_expired(session): self.close_session(session_id) return None return session def update_session(self, session_id: str, **updates) -> None: session = self.get_session(session_id) if not session: raise ValueError(f"Session {session_id} does not exist or expired") for key, value in updates.items(): if hasattr(session, key): setattr(session, key, value) session.last_activity = datetime.now() def close_session(self, session_id: str) -> None: session = self.sessions.pop(session_id, None) if session: session.state = SessionState.CLOSED def _is_expired(self, session: Session) -> bool: elapsed = (datetime.now() - session.last_activity).total_seconds() return elapsed > self.session_timeout def cleanup_expired(self) -> int: expired_ids = [ sid for sid, s in self.sessions.items() if self._is_expired(s) ] for sid in expired_ids: self.close_session(sid) return len(expired_ids)这里和简单字典实现的关键区别:Session 是独立实体,支持序列化/反序列化,为后续持久化和迁移打基础。cleanup_expired 是生产环境必须的定时清理逻辑。上下文管理器class ContextManager: def __init__(self, session_manager: SessionManager): self.session_manager = session_manager def set_context(self, session_id: str, key: str, value: Any) -> None: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") session.context[key] = value session.last_activity = datetime.now() def get_context(self, session_id: str, key: str, default: Any = None) -> Any: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") return session.context.get(key, default) def update_context(self, session_id: str, updates: dict) -> None: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") session.context.update(updates) session.last_activity = datetime.now() def clear_context(self, session_id: str) -> None: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") session.context = {} session.last_activity = datetime.now() def get_context_size(self, session_id: str) -> int: session = self.session_manager.get_session(session_id) if not session: return 0 return len(json.dumps(session.context, ensure_ascii=False).encode("utf-8")) def trim_context(self, session_id: str, max_bytes: int = 8192) -> dict: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") trimmed = {} current_size = 0 for key in reversed(list(session.context.keys())): value_json = json.dumps({key: session.context[key]}, ensure_ascii=False) entry_size = len(value_json.encode("utf-8")) if current_size + entry_size > max_bytes: break trimmed[key] = session.context[key] current_size += entry_size removed_keys = set(session.context.keys()) - set(trimmed.keys()) session.context = trimmed session.last_activity = datetime.now() return {"removed_keys": list(removed_keys), "remaining_bytes": current_size}trim_context 是上下文维护中容易被忽略但生产必须的逻辑——LLM 的上下文窗口有限,当上下文膨胀时需要裁剪。简单的 LRU 策略优先丢弃最旧的数据。消息历史管理class MessageHistoryManager: def __init__(self, session_manager: SessionManager, max_history: int = 100): self.session_manager = session_manager self.max_history = max_history def add_message( self, session_id: str, role: str, content: str, metadata: dict = None ) -> None: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") message = { "role": role, "content": content, "timestamp": datetime.now().isoformat(), "metadata": metadata or {}, } session.message_history.append(message) if len(session.message_history) > self.max_history: session.message_history = session.message_history[-self.max_history:] session.last_activity = datetime.now() def get_history(self, session_id: str, limit: int = None) -> List[dict]: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") history = session.message_history return history[-limit:] if limit else history def get_conversation_summary(self, session_id: str) -> dict: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") history = session.message_history return { "total_messages": len(history), "user_messages": sum(1 for m in history if m["role"] == "user"), "assistant_messages": sum(1 for m in history if m["role"] == "assistant"), "first_message_time": history[0]["timestamp"] if history else None, "last_message_time": history[-1]["timestamp"] if history else None, } def summarize_history(self, session_id: str, keep_recent: int = 10) -> str: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") if len(session.message_history) <= keep_recent: return "" old_messages = session.message_history[:-keep_recent] summary_parts = [] for msg in old_messages: summary_parts.append(f"[{msg['role']}]: {msg['content'][:100]}") summary = "\n".join(summary_parts) session.message_history = [ { "role": "system", "content": f"Earlier conversation summary:\n{summary}", "timestamp": old_messages[-1]["timestamp"], "metadata": {"type": "summary"}, } ] + session.message_history[-keep_recent:] session.last_activity = datetime.now() return summarysummarize_history 解决的是长对话场景下的上下文窗口溢出问题。保留最近消息的同时将早期内容压缩为摘要,这是实际 MCP 应用中常见的需求——特别是在对话持续几十轮的情况下。状态机管理class StateMachine: TRANSITIONS = { SessionState.ACTIVE: [SessionState.IDLE, SessionState.SUSPENDED, SessionState.CLOSED], SessionState.IDLE: [SessionState.ACTIVE, SessionState.CLOSED], SessionState.SUSPENDED: [SessionState.ACTIVE, SessionState.CLOSED], SessionState.CLOSED: [], } def __init__(self, session_manager: SessionManager): self.session_manager = session_manager def transition(self, session_id: str, new_state: SessionState) -> None: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") current = session.state if new_state not in self.TRANSITIONS.get(current, []): raise ValueError( f"Invalid transition: {current.value} -> {new_state.value}" ) session.state = new_state session.last_activity = datetime.now() def get_state(self, session_id: str) -> SessionState: session = self.session_manager.get_session(session_id) if not session: raise ValueError("Session does not exist") return session.state def can_transition(self, session_id: str, new_state: SessionState) -> bool: session = self.session_manager.get_session(session_id) if not session: return False return new_state in self.TRANSITIONS.get(session.state, [])状态机保证会话不会从 CLOSED 状态复活,也不会跳过中间状态直接 SUSPENDED。这在多实例部署时尤其重要——状态转换必须是原子的、可审计的。会话持久化内存存储只适合开发和单机部署。生产环境必须持久化,否则进程重启所有会话丢失。class SessionPersistence: def __init__(self, storage_path: str): self.storage_path = storage_path os.makedirs(storage_path, exist_ok=True) def save_session(self, session: Session) -> None: file_path = os.path.join(self.storage_path, f"{session.session_id}.json") with open(file_path, "w", encoding="utf-8") as f: json.dump(session.to_dict(), f, indent=2, ensure_ascii=False) def load_session(self, session_id: str) -> Optional[Session]: file_path = os.path.join(self.storage_path, f"{session_id}.json") if not os.path.exists(file_path): return None with open(file_path, "r", encoding="utf-8") as f: data = json.load(f) return Session.from_dict(data) def delete_session(self, session_id: str) -> None: file_path = os.path.join(self.storage_path, f"{session_id}.json") if os.path.exists(file_path): os.remove(file_path) def list_sessions(self) -> List[str]: return [ f[:-5] for f in os.listdir(self.storage_path) if f.endswith(".json") ]class RedisSessionPersistence: def __init__(self, redis_url: str = "redis://localhost:6379", key_prefix: str = "mcp:session:"): import redis self.client = redis.from_url(redis_url) self.key_prefix = key_prefix self.default_ttl = 3600 def save_session(self, session: Session, ttl: int = None) -> None: key = f"{self.key_prefix}{session.session_id}" data = json.dumps(session.to_dict(), ensure_ascii=False) self.client.setex(key, ttl or self.default_ttl, data) def load_session(self, session_id: str) -> Optional[Session]: key = f"{self.key_prefix}{session_id}" data = self.client.get(key) if not data: return None return Session.from_dict(json.loads(data)) def delete_session(self, session_id: str) -> None: key = f"{self.key_prefix}{session_id}" self.client.delete(key) def extend_ttl(self, session_id: str, ttl: int = None) -> None: key = f"{self.key_prefix}{session_id}" self.client.expire(key, ttl or self.default_ttl)Redis 方案解决的是水平扩展场景:多个 MCP 服务实例共享同一个 Redis,任何实例都能读取任意会话的状态。TTL 自动过期比定时清理更可靠。extend_ttl 在用户活跃时续期,实现"空闲超时"而非"绝对超时"。会话监控与指标class SessionAnalytics: def __init__(self, session_manager: SessionManager): self.session_manager = session_manager self.metrics: Dict[str, int] = defaultdict(int) self.event_log: List[dict] = [] def track_event(self, event_type: str, session_id: str = None, detail: str = None) -> None: self.metrics[event_type] += 1 self.event_log.append({ "event_type": event_type, "session_id": session_id, "detail": detail, "timestamp": datetime.now().isoformat(), }) def get_metrics(self) -> dict: return dict(self.metrics) def get_session_statistics(self) -> dict: sessions = self.session_manager.sessions if not sessions: return { "total_sessions": 0, "active_sessions": 0, "idle_sessions": 0, "average_duration_seconds": 0.0, "total_messages": 0, } durations = [] active_count = 0 idle_count = 0 total_messages = 0 for session in sessions.values(): duration = (session.last_activity - session.created_at).total_seconds() durations.append(duration) total_messages += len(session.message_history) if session.state == SessionState.ACTIVE: active_count += 1 elif session.state == SessionState.IDLE: idle_count += 1 return { "total_sessions": len(sessions), "active_sessions": active_count, "idle_sessions": idle_count, "average_duration_seconds": sum(durations) / len(durations), "total_messages": total_messages, } def get_recent_events(self, limit: int = 50) -> List[dict]: return self.event_log[-limit:]监控不只是统计数字。event_log 记录每一次状态变更,可以用来排查"会话为什么丢失"这类线上问题。生产部署的三种会话模型2026 MCP 规范明确提出了三种会话模型,选择取决于架构:无状态模型:每次请求客户端发送完整上下文。负载均衡最友好,任何实例都能处理任何请求。缺点是网络开销大,上下文窗口大时不实际。服务端托管模型:服务端维护状态,客户端只持 session ID。实现最简单,但水平扩展需要 sticky routing 或共享存储(Redis)。适合中小规模部署。客户端托管模型:上下文存在客户端,随请求发出。结合无状态服务端,两者优势兼得——服务端无状态可水平扩展,客户端有完整控制权。缺点是客户端逻辑更复杂。实际选择建议:如果 MCP 服务只部署 1-2 个实例,服务端托管 + 文件持久化足够。如果需要 K8s 弹性伸缩,客户端托管或无状态模型更合适。Redis 持久化是折中方案——服务端托管但状态外置到 Redis。追问:MCP 会话和普通 HTTP Session 有什么区别?MCP 会话不是 HTTP Session。HTTP Session 基于浏览器 Cookie 机制,而 MCP 会话是 JSON-RPC 层面的连接生命周期管理。MCP 客户端在一个会话内可以发起多次请求/通知,会话 ID 关联的是对话上下文而非浏览器状态。此外,MCP 会话需要维护消息历史用于 LLM 的上下文窗口,这是普通 HTTP Session 不需要考虑的。
服务端阅读 05月28日 06:54

如何实现一个 MCP 服务器?有哪些最佳实践?

MCP 服务器的核心概念MCP(Model Context Protocol)是一个开放协议,让大语言模型能够通过标准化接口连接外部数据源和工具。理解服务器实现之前,需要先厘清三个核心概念:Tool(工具):服务器暴露给客户端的可调用函数,接受结构化参数并返回结果Resource(资源):服务器提供的可读数据源,客户端可以按 URI 访问Prompt(提示模板):服务器预设的提示词模板,客户端可以复用服务器与客户端之间通过 Capability 协商 建立连接——客户端在初始化时声明自己支持哪些能力,服务器也声明自己提供哪些能力,双方只使用共同支持的功能。选择传输方式MCP 支持两种传输协议,选择哪种直接影响部署架构:Stdio 传输:客户端以子进程方式启动服务器,通过标准输入/输出通信。适合本地工具集成,延迟低,无需处理网络和认证问题。Claude Desktop 和 Cursor 的本地服务器都走这条路径。Streamable HTTP 传输:服务器以独立 HTTP 服务运行,客户端通过 POST 请求通信。适合远程部署、多客户端共享、需要认证的场景。注意,旧版基于 SSE 的 HTTP 传输已被废弃,新项目应直接使用 Streamable HTTP。# Stdio 模式启动mcp.run(transport="stdio")# HTTP 模式启动mcp.run(transport="streamable-http", host="0.0.0.0", port=8080)用 Python 实现一个 MCP 服务器Python SDK(mcp 包)提供了 FastMCP 高阶 API,可以快速搭建服务器。安装依赖pip install "mcp[cli]"定义工具工具是 MCP 服务器的核心能力。每个工具需要声明名称、描述和参数 Schema:from mcp.server.fastmcp import FastMCPmcp = FastMCP("my-server")@mcp.tool()def search_docs(query: str, limit: int = 5) -> str: """在文档库中搜索相关内容,返回匹配结果摘要""" # 实际场景中对接搜索引擎或向量数据库 results = do_search(query, limit) return format_results(results)关键要点:工具函数的 docstring 会成为客户端看到的工具描述,直接影响模型能否正确调用,务必写得准确具体。参数类型注解会自动转换为 JSON Schema。定义资源资源用于暴露可读数据,客户端通过 URI 主动拉取:@mcp.resource("config://app/settings")def get_settings() -> str: """返回当前应用配置""" return json.dumps(load_settings(), ensure_ascii=False)@mcp.resource("data://users/{user_id}/profile")def get_user_profile(user_id: str) -> str: """按用户 ID 返回用户资料""" profile = fetch_profile(user_id) return json.dumps(profile, ensure_ascii=False)URI 模板支持路径参数(如 {user_id}),SDK 会自动解析并传入函数。定义提示模板提示模板是预设的 Prompt,客户端可以按名称获取:@mcp.prompt()def code_review(code: str) -> str: """代码审查提示模板""" return f"请审查以下代码,关注安全性和性能问题:\n\n```\n{code}\n```"启动服务器if __name__ == "__main__": mcp.run()mcp run 命令默认使用 stdio 传输。也可通过 CLI 指定:mcp run server.py --transport streamable-http用 TypeScript 实现一个 MCP 服务器TypeScript SDK 是官方维护的首选 SDK,生态最完整:import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";import { z } from "zod";const server = new McpServer({ name: "my-server", version: "1.0.0",});server.tool( "search_docs", "在文档库中搜索相关内容",{ query: z.string(), limit: z.number().default(5) },async ({ query, limit }) => { const results = await doSearch(query, limit); return { content: [{ type: "text", text: formatResults(results) }] }; });const transport = new StdioServerTransport();await server.connect(transport);TypeScript SDK 使用 zod 定义参数 Schema,类型安全且自动生成 JSON Schema。生产环境的最佳实践错误处理不要让异常穿透到客户端变成不友好的报错。在每个工具函数中做防御性处理:@mcp.tool()def query_database(sql: str) -> str: """执行只读 SQL 查询""" try: if not sql.strip().upper().startswith("SELECT"): return "错误:仅允许 SELECT 查询" result = execute_readonly_query(sql) return json.dumps(result, ensure_ascii=False) except ConnectionError: return "错误:数据库连接失败,请稍后重试" except Exception as e: return f"查询执行出错:{type(e).__name__}"注意上面捕获了具体异常类型,对用户屏蔽了内部实现细节。输入验证永远不要信任客户端传入的参数,即使模型通常不会构造恶意输入:@mcp.tool()def run_command(cmd: str) -> str: """执行预定义的系统命令(仅允许白名单命令)""" # 不要直接拼接执行用户输入! allowed = {"list_files", "check_status", "get_version"} if cmd not in allowed: return f"错误:不支持的命令,允许的命令:{', '.join(allowed)}" return execute_safe(cmd)认证与安全远程部署的 MCP 服务器必须实现认证。2026 协议推荐使用 OAuth 2.1:为每个客户端颁发独立的 access token,绑定最小权限 scope工具级别的权限控制:不同 scope 对应不同的工具访问权限传输层强制 HTTPS,禁止明文 HTTP本地 stdio 服务器依赖操作系统用户隔离,不需要额外认证异步与性能工具函数如果涉及 I/O 操作(网络请求、文件读写、数据库查询),应该使用异步实现:@mcp.tool()async def fetch_url(url: str) -> str: """获取 URL 内容""" async with httpx.AsyncClient(timeout=10) as client: resp = await client.get(url) return resp.text[:5000] # 限制返回长度其他性能策略:对频繁调用的工具实现结果缓存,避免重复计算为耗时操作设置超时,防止客户端无限等待大量数据分页返回,不要一次吐出全部内容日志与监控生产服务器必须有可观测性:import logginglogger = logging.getLogger("mcp-server")@mcp.tool()def search_docs(query: str) -> str: logger.info(f"search_docs called: query={query!r}") result = do_search(query) logger.info(f"search_docs returned {len(result)} results") return result建议记录:每次工具调用的入参摘要、执行耗时、异常信息。远程服务器还应实现健康检查端点供负载均衡器探活。常见陷阱在工具中执行任意代码:绝不能用 eval()、exec() 或直接拼接 shell 命令。这是最严重的安全漏洞,恶意输入可以完全控制服务器。忽略 Capability 协商:服务器声明的能力必须与实际实现一致。声明了 tools capability 却没有注册任何工具,或者工具描述与实际行为不符,都会导致客户端调用失败。过度暴露工具:一个服务器注册几十个工具会占用客户端大量上下文窗口。按功能域拆分成多个独立服务器,每个只暴露相关工具,客户端按需连接。同步阻塞:在异步框架中使用同步的阻塞 I/O 会拖慢整个服务器。使用 asyncio.to_thread 将同步调用包装为异步,或直接使用异步库。面试追问Q:MCP 的 Stdio 传输和 HTTP 传输分别适合什么场景?Stdio 适合本地单客户端场景,客户端以子进程方式启动服务器,零配置、低延迟、无需认证。HTTP 适合远程部署、多客户端共享、需要认证和网关管理的生产环境。2026 协议推荐新项目使用 Streamable HTTP 替代已废弃的 SSE 传输。Q:如何在 MCP 服务器中防止工具被滥用?三层防线:输入验证拒绝非法参数,OAuth scope 控制工具访问权限,速率限制防止高频调用。核心原则是永远不信任客户端输入,白名单优于黑名单,最小权限优于便利性。Q:一个 MCP 服务器应该注册多少个工具?建议控制在 10 个以内。工具过多会占用客户端上下文窗口,影响模型选择准确性。按功能域拆分多个服务器,客户端按需连接,比一个臃肿服务器更易维护。
服务端阅读 05月28日 06:53

CSRF 防护的性能影响有哪些,如何进行优化?

CSRF 防护在生产环境中确实会引入性能开销,但合理的架构设计可以在安全与性能之间取得平衡。理解开销来源并采用分层优化策略,是高并发场景下的关键能力。CSRF 防护的性能开销来源Token 生成与验证CSRF Token 的生成依赖加密安全随机数生成器(CSPRNG)。以 Node.js 为例,crypto.randomBytes(32) 单次调用约 0.02ms,万次批量生成耗时约 234ms,即每秒可生成约 4 万个 Token。单次开销极低,但在 QPS 超过 5000 的高并发场景下,Token 生成会成为不可忽视的 CPU 消耗点。Token 验证的开销取决于存储方式。纯字符串比对耗时微乎其微,但涉及数据库查询时,每次验证需要 10-50ms 的 I/O 延迟。在请求量大的写接口上,这意味着数据库连接池容易被占满。会话加载与存储访问传统 CSRF 防护要求在每个状态变更请求中加载会话,验证 Token 是否匹配。Spring Security 5 及更早版本默认在每次请求时加载 CsrfToken,即使该请求不需要 CSRF 校验(如 GET 请求)。Spring Security 6 已改为延迟加载(Deferred CsrfToken),仅在需要时才访问会话存储,显著降低了不必要的开销。页面缓存失效CSRF Token 是用户级别且动态生成的,包含 Token 的页面无法被 CDN 或反向缓存。这是常被忽视的性能影响——一个本可以命中缓存的页面,因为嵌入了 CSRF Token 而必须回源渲染。Cloudflare 的技术分析指出,CSRF Token 是页面级缓存最大的阻碍之一,尤其对于包含表单的页面。核心优化策略策略一:选择合适的 Token 存储方案| 存储方式 | 读延迟 | 写延迟 | 扩展性 | 适用场景 ||---------|--------|--------|--------|---------|| 内存 | <1ms | <1ms | 低 | 单实例、低流量 || Redis | 1-5ms | 1-5ms | 高 | 分布式系统、高流量 || 数据库 | 10-50ms | 10-50ms | 中 | 简单应用、低流量 |生产环境推荐 Redis + 本地内存二级缓存。本地缓存命中时延迟 <0.1ms,未命中时回退到 Redis,兼顾性能与分布式一致性:class CachedTokenService { constructor(redisClient) { this.redis = redisClient; this.localCache = new Map(); this.localTTL = 300000; // 5 分钟本地缓存 this.maxLocalSize = 10000; } async getToken(userId) { // L1: 本地内存 const cached = this.localCache.get(userId); if (cached && Date.now() - cached.ts < this.localTTL) { return cached.token; } // L2: Redis const redisToken = await this.redis.get(`csrf:${userId}`); if (redisToken) { this._setLocal(userId, redisToken); return redisToken; } // L3: 生成新 Token const token = crypto.randomBytes(32).toString('hex'); await this.redis.setex(`csrf:${userId}`, 3600, token); this._setLocal(userId, token); return token; } _setLocal(userId, token) { if (this.localCache.size >= this.maxLocalSize) { const oldest = this.localCache.keys().next().value; this.localCache.delete(oldest); } this.localCache.set(userId, { token, ts: Date.now() }); }}策略二:使用 HMAC 无状态 Token 消除存储开销传统 Token 需要服务端存储,每次验证都访问存储层。HMAC-based Token 将签名嵌入 Token 本身,验证时只需重新计算签名比对,无需任何存储访问:const crypto = require('crypto');class HMACTokenService { constructor(secret) { this.secret = secret; } // 生成:sessionId + 时间戳 + HMAC签名 generate(sessionId) { const timestamp = Math.floor(Date.now() / 3600000); // 按小时粒度 const payload = `${sessionId}:${timestamp}`; const signature = crypto .createHmac('sha256', this.secret) .update(payload) .digest('hex'); return `${payload}:${signature}`; } // 验证:重新计算签名比对,无需存储 validate(token, sessionId) { const [sid, timestamp, signature] = token.split(':'); if (sid !== sessionId) return false; const payload = `${sid}:${timestamp}`; const expected = crypto .createHmac('sha256', this.secret) .update(payload) .digest('hex'); // 检查当前小时和上一个小时的签名,容忍 Token 在小时边界附近生成 const currentHour = Math.floor(Date.now() / 3600000); if (parseInt(timestamp) < currentHour - 1) return false; return crypto.timingSafeEqual( Buffer.from(signature), Buffer.from(expected) ); }}HMAC Token 的优势在于验证延迟从 1-50ms(存储访问)降到 <0.1ms(纯计算),且无需维护 Token 存储的过期清理。Spring Security 的 CookieCsrfTokenRepository 和 Django 的 django.middleware.csrf 都支持类似的签名验证模式。缺点是 Token 无法主动撤销,需依赖较短的有效期。策略三:避免不必要的 Token 生成关键原则:安全请求(GET、HEAD、OPTIONS)不需要 CSRF Token 验证。确保框架配置仅对状态变更请求启用校验。Spring Security 6 的 CsrfToken 延迟加载机制值得借鉴——Token 对象在请求处理链中懒初始化,仅当实际读取或校验时才触发生成和存储访问。Django 的 {% csrf_token %} 模板标签也有类似优化——仅在模板实际渲染该标签时才从会话中读取或生成 Token,GET 请求访问不含表单的页面时完全不触发 Token 逻辑。策略四:解耦页面缓存与 Token 渲染将 CSRF Token 从缓存的 HTML 中剥离,通过独立接口按需获取:<!-- 可被 CDN 缓存的 HTML 模板 --><form action="/transfer" method="POST"> <input type="hidden" name="_csrf" value="" id="csrfInput"> <!-- 其他表单字段 --></form><script> // 页面加载后异步获取 Token fetch('/api/csrf-token', { credentials: 'same-origin' }) .then(r => r.json()) .then(data => { document.getElementById('csrfInput').value = data.token; });</script>这样页面主体可以被 CDN 缓存,Token 通过轻量 API 单独获取。另一种方案是使用 ESI(Edge Side Includes),在 CDN 边缘节点将 Token 片段注入缓存的页面,Nginx 和 Varnish 均支持此特性。策略五:优先采用免 Token 方案现代浏览器提供了不依赖 Token 的 CSRF 防护手段,可以显著降低服务端开销:SameSite Cookie 属性:将 Cookie 设置为 SameSite=Strict 或 SameSite=Lax,浏览器自动阻止跨站请求携带 Cookie。Chrome 80+ 默认 SameSite=Lax,覆盖了大部分 CSRF 攻击场景。这是目前成本最低的防护方式,OWASP 将其列为推荐的 CSRF 防御手段之一。Fetch Metadata 请求头:Chrome 和 Firefox 支持的 Sec-Fetch-Site、Sec-Fetch-Mode 等头,服务端可据此判断请求来源:function isSafeRequest(req) { const site = req.headers['sec-fetch-site']; const mode = req.headers['sec-fetch-mode']; if (site === 'same-origin') return true; if (site === 'none' && mode === 'navigate') return true; return false;}Origin / Referer 头校验:对于有 Origin 头的请求,验证其值是否在白名单内。这种方法无需 Token 存储,开销几乎为零。组合策略实践:生产环境推荐 SameSite Cookie 作为基础防护层,对安全等级更高的操作(如支付、转账)叠加 Token 验证。这样大部分普通请求的 CSRF 防护零开销,仅关键路径承担 Token 成本。性能监控指标生产环境需关注以下 CSRF 相关指标:Token 生成耗时:P99 应 <5ms,超出则检查 CSPRNG 实现Token 验证耗时:含存储访问时 P99 应 <10ms,HMAC 模式应 <0.5ms缓存命中率:本地缓存命中率 >90% 为健康,低于 70% 需扩大缓存容量会话加载频率:对比总请求数与 CSRF 校验请求数,比值过高说明延迟加载未生效Token 长度与安全性的平衡| 配置 | 长度 | 熵 | 性能 | 适用场景 ||-----|------|-----|------|---------|| minimal | 16 字节 | 64 bit | 最优 | 性能敏感、已有其他防护层 || balanced | 32 字节 | 128 bit | 良好 | 通用场景(推荐) || secure | 64 字节 | 256 bit | 可接受 | 安全等级最高的场景 |128 bit 熵(32 字节 hex)是绝大多数场景的最佳选择——碰撞概率可忽略,性能影响极小。追问:CSRF 防护和 CORS 是什么关系?CSRF 和 CORS 解决的是不同层面的跨域问题。CORS 控制的是浏览器是否允许读取跨域响应,CSRF 控制的是浏览器是否自动携带凭据发起跨域请求。一个请求可能被 CORS 阻止但仍然构成 CSRF 风险(如 form 表单提交不受 CORS 约束)。SameSite Cookie 同时减少了 CSRF 和 CORS 的攻击面,但不互为替代。正确做法是同时配置 CORS 白名单和 CSRF 防护,两者互补而非互斥。
前端阅读 05月28日 06:53

Puppeteer 如何与测试框架集成实现 E2E 和 CI/CD?

Puppeteer 可以与 Jest、Mocha、Vitest 等主流测试框架深度集成,完成端到端测试、视觉回归测试和性能测试,再通过 GitHub Actions、Docker 等工具接入 CI/CD 流水线。以下是生产环境中经过验证的集成方式和最佳实践。与 Jest 集成Jest 是与 Puppeteer 搭配最多的测试框架,jest-puppeteer 提供了开箱即用的预设配置。安装核心依赖:npm install --save-dev puppeteer jest jest-puppeteer @types/puppeteerjest-puppeteer 的配置文件:// jest-puppeteer.config.jsmodule.exports = { launch: { headless: 'new', args: ['--no-sandbox', '--disable-setuid-sandbox'], }, browserContext: 'incognito', exitOnPageError: true,};Jest 配置文件:// jest.config.jsmodule.exports = { preset: 'jest-puppeteer', testMatch: ['**/e2e/**/*.test.js'], setupFilesAfterEnv: ['./e2e/setup.js'], testTimeout: 30000,};setup 文件中处理每个测试的前置条件:// e2e/setup.jsbeforeEach(async () => { await page.setViewport({ width: 1280, height: 720 }); await page.goto('http://localhost:3000', { waitUntil: 'networkidle0' });});编写一个完整的登录 E2E 测试:// e2e/auth.test.jsdescribe('用户登录流程', () => { test('使用正确的凭据登录成功', async () => { await page.type('[data-testid="username"]', 'testuser'); await page.type('[data-testid="password"]', 'password123'); await page.click('[data-testid="login-btn"]'); await page.waitForSelector('[data-testid="dashboard"]'); const welcome = await page.$eval( '[data-testid="welcome-msg"]', el => el.textContent ); expect(welcome).toContain('欢迎回来'); }); test('使用错误密码登录失败', async () => { await page.type('[data-testid="username"]', 'testuser'); await page.type('[data-testid="password"]', 'wrongpassword'); await page.click('[data-testid="login-btn"]'); await page.waitForSelector('[data-testid="error-msg"]'); const error = await page.$eval( '[data-testid="error-msg"]', el => el.textContent ); expect(error).toContain('用户名或密码错误'); });});注意这里使用 data-testid 选择器而非 CSS 类名或 ID,这能让测试不依赖 UI 样式变更,提升稳定性。与 Mocha 集成Mocha 的灵活性更高,适合需要自定义测试生命周期的团队。Puppeteer 的浏览器生命周期需要手动管理。// test/setup.jsconst puppeteer = require('puppeteer');const { expect } = require('chai');let browser;before(async () => { browser = await puppeteer.launch({ headless: 'new', args: ['--no-sandbox'], });});after(async () => { await browser.close();});beforeEach(async function () { this.page = await browser.newPage(); await this.page.goto('http://localhost:3000');});afterEach(async function () { await this.page.close();});Mocha 测试用例:// test/user.spec.jsdescribe('用户注册功能', function () { this.timeout(15000); it('填写完整信息后注册成功', async function () { const { page } = this; await page.click('[data-testid="register-link"]'); await page.type('[data-testid="reg-username"]', 'newuser'); await page.type('[data-testid="reg-email"]', 'new@example.com'); await page.type('[data-testid="reg-password"]', 'StrongP@ss1'); await page.click('[data-testid="reg-submit"]'); await page.waitForSelector('[data-testid="reg-success"]'); const text = await page.$eval( '[data-testid="reg-success"]', el => el.textContent ); expect(text).to.include('注册成功'); });});Mocha 的 this.timeout() 需要显式设置,Puppeteer 测试通常需要 10-30 秒的超时时间,不要使用箭头函数,否则无法访问 Mocha 的 this 上下文。Page Object 模式无论使用 Jest 还是 Mocha,当测试用例超过 20 个时,必须引入 Page Object 模式。它把页面元素定位和操作封装成独立类,测试用例只关心业务逻辑。// pages/LoginPage.jsclass LoginPage { constructor(page) { this.page = page; this.usernameInput = '[data-testid="username"]'; this.passwordInput = '[data-testid="password"]'; this.submitBtn = '[data-testid="login-btn"]'; this.errorMessage = '[data-testid="error-msg"]'; } async login(username, password) { await this.page.type(this.usernameInput, username); await this.page.type(this.passwordInput, password); await this.page.click(this.submitBtn); } async getErrorMessage() { await this.page.waitForSelector(this.errorMessage); return this.page.$eval(this.errorMessage, el => el.textContent); }}module.exports = LoginPage;测试用例中使用 Page Object:const LoginPage = require('../pages/LoginPage');test('登录失败显示错误提示', async () => { const loginPage = new LoginPage(page); await loginPage.login('testuser', 'wrongpass'); const error = await loginPage.getErrorMessage(); expect(error).toContain('用户名或密码错误');});如果 UI 改了 data-testid 的值,只需修改 LoginPage 一处,所有引用它的测试自动更新。这就是 Page Object 的核心价值。视觉回归测试视觉回归测试能捕获 CSS 改动导致的 UI 偏移,Puppeteer 结合 Percy 或 jest-image-snapshot 可以自动完成截图对比。使用 jest-image-snapshot 的方式:npm install --save-dev jest-image-snapshotconst { toMatchImageSnapshot } = require('jest-image-snapshot');expect.extend({ toMatchImageSnapshot });test('首页视觉一致性', async () => { await page.goto('http://localhost:3000'); await page.waitForSelector('#main-content'); const screenshot = await page.screenshot({ fullPage: true }); expect(screenshot).toMatchImageSnapshot({ failureThreshold: 0.03, failureThresholdType: 'percent', });});failureThreshold 设为 3% 是比较合理的起点,太严格会导致大量误报,太宽松又漏掉真正的 UI 变化。首次运行会生成基准截图,后续运行自动对比。性能测试Puppeteer 可以通过 Chrome DevTools Protocol 采集性能指标,结合 Lighthouse 做更全面的审计。const puppeteer = require('puppeteer');test('首页核心性能指标', async () => { const browser = await puppeteer.launch(); const page = await browser.newPage(); await page.goto('http://localhost:3000', { waitUntil: 'networkidle0' }); // 采集 Core Web Vitals const metrics = await page.evaluate(() => { return new Promise(resolve => { new PerformanceObserver(list => { const entries = list.getEntries(); const lastEntry = entries[entries.length - 1]; resolve({ LCP: lastEntry.startTime, FID: 0, CLS: 0, }); }).observe({ type: 'largest-contentful-paint', buffered: true }); }); }); expect(metrics.LCP).toBeLessThan(2500); await browser.close();});性能测试中 waitUntil: 'networkidle0' 很关键,确保页面资源加载完成后再采集数据。CI/CD 集成CI/CD 是 Puppeteer 测试从本地走向生产的关键环节。主要解决三个问题:浏览器安装、无头模式运行、测试稳定性。GitHub Actions 配置:name: E2E Testson: [push, pull_request]jobs: e2e: runs-on: ubuntu-latest container: image: node:18-slim steps: - uses: actions/checkout@v4 - name: Install Chrome dependencies run: | apt-get update apt-get install -y chromium libx11-xcb1 libxcomposite1 libxdamage1 libxi6 libxtst6 libnss3 libatk1.0-0 - name: Install dependencies run: npm ci - name: Run E2E tests run: npm run test:e2e env: CI: true PUPPETEER_EXECUTABLE_PATH: /usr/bin/chromiumDocker 配置:FROM node:18-slimRUN apt-get update && apt-get install -y \ chromium \ --no-install-recommends && \ rm -rf /var/lib/apt/lists/*ENV PUPPETEER_SKIP_CHROMIUM_DOWNLOAD=trueENV PUPPETEER_EXECUTABLE_PATH=/usr/bin/chromiumWORKDIR /appCOPY package*.json ./RUN npm ciCOPY . .CMD ["npm", "run", "test:e2e"]Docker 环境下必须设置 PUPPETEER_SKIP_CHROMIUM_DOWNLOAD=true,使用系统安装的 Chromium,避免 Puppeteer 自带浏览器在容器中启动失败。--no-sandbox 参数在 Docker 中也是必需的,因为容器默认以 root 运行,Chrome 要求沙箱模式下不能是 root。测试稳定性实践Puppeteer 测试在 CI 环境中失败率较高,以下是提升稳定性的关键手段。等待策略:永远不要使用 waitForTimeout,改用显式等待。// 错误做法:硬编码等待await page.waitForTimeout(3000);// 正确做法:等待元素可见await page.waitForSelector('[data-testid="result"]', { visible: true });// 等待网络空闲await page.waitForNavigation({ waitUntil: 'networkidle2' });// 等待特定请求完成await page.waitForResponse( resp => resp.url().includes('/api/user') && resp.status() === 200);测试隔离:每个测试用例使用独立的浏览器上下文,避免 Cookie 和 Storage 污染。beforeEach(async () => { context = await browser.createIncognitoBrowserContext(); page = await context.newPage(); await page.goto('http://localhost:3000');});afterEach(async () => { await context.close();});失败截图:测试失败时自动保存截图,方便排查 CI 中的问题。afterEach(async function () { if (this.currentTest.state === 'failed') { const timestamp = Date.now(); const testName = this.currentTest.title.replace(/\s+/g, '_'); await page.screenshot({ path: `screenshots/${testName}_${timestamp}.png`, fullPage: true, }); }});重试机制:CI 环境中网络和资源加载不稳定,给 E2E 测试加一层重试。// jest.config.jsmodule.exports = { preset: 'jest-puppeteer', retryTimes: 2,};测试分层与并行执行当测试规模增长后,需要按速度和稳定性分层运行,并利用并行加速。// jest.config.jsmodule.exports = { preset: 'jest-puppeteer', projects: [ { displayName: 'smoke', testMatch: ['**/e2e/smoke/**/*.test.js'], retryTimes: 1, }, { displayName: 'critical', testMatch: ['**/e2e/critical/**/*.test.js'], retryTimes: 2, }, { displayName: 'full', testMatch: ['**/e2e/full/**/*.test.js'], retryTimes: 2, maxWorkers: 4, }, ],};smoke 测试只覆盖核心路径(登录、关键业务流程),在每次提交时运行;critical 测试覆盖主要功能,在 PR 合并时运行;full 测试覆盖所有场景,在每日构建时运行。并行执行时注意 maxWorkers 不要超过 CPU 核心数,每个 worker 会启动一个浏览器实例,过度并行反而会因资源争抢导致测试超时。从 Puppeteer 迁移到 Playwright 的考虑如果团队需要跨浏览器测试(Firefox、Safari),或者对自动等待、网络拦截有更高要求,Playwright 是更合适的选择。Playwright 由微软维护,API 设计参考了 Puppeteer 并做了大量改进。迁移路径:Puppeteer 的 page 对象与 Playwright 的 page 对象 API 相似但不完全兼容。最稳妥的方式是先保留 Puppeteer 的集成测试,新测试用 Playwright 编写,逐步替换。不要一次性迁移,风险太大。如果你的项目只需要 Chrome 测试,Puppeteer 仍然是最轻量的选择。
服务端阅读 05月28日 06:51

MCP性能优化有哪些核心策略?从协议层到工程实践的完整方案

MCP 的性能瓶颈在哪里?优化之前,先定位瓶颈。MCP(Model Context Protocol)基于 JSON-RPC 2.0 协议通信,性能问题主要集中在三个层面:通信延迟:每次工具调用都是一次完整的请求-响应周期,批量场景下延迟叠加严重序列化开销:JSON 编解码在高吞吐场景下消耗 15%-20% 的 CPU 时间,远高于 FlatBuffers 等二进制方案上下文膨胀:随着对话轮次增加,上下文窗口不断膨胀,传输和解析成本线性增长实测数据:在单节点 5000 QPS 场景下,JSON-RPC 2.0 的序列化开销比 Cap'n Proto 高约 18%,这对延迟敏感型应用影响显著。协议层:减少通信开销批量工具调用MCP 支持在单次请求中发起多个工具调用,减少网络往返:{ "jsonrpc": "2.0", "method": "tools/call", "params": { "calls": [ {"name": "read_file", "arguments": {"path": "/config.yaml"}}, {"name": "list_directory", "arguments": {"path": "/data"}} ] }}批量调用可将 5 次独立请求的延迟从 ~500ms 压缩到 ~120ms(假设单次 RTT 100ms)。二进制序列化替代MCP 社区正在推进 Transport Evolution 工作组,探索 JSON-RPC 2.0 的二进制扩展。在生产环境中,可以引入中间层将 JSON 序列化替换为 MessagePack 或 Protocol Buffers:import msgpackdef serialize_mcp_message(msg: dict) -> bytes: """用 MessagePack 替代 JSON 序列化,减少 40%-60% 体积""" return msgpack.packb(msg, use_bin_type=True)def deserialize_mcp_message(data: bytes) -> dict: return msgpack.unpackb(data, raw=False)连接复用与流式响应优先使用 HTTP/2 或 SSE(Server-Sent Events)传输层,避免反复建立 TCP 连接。对长时间运行的工具调用,使用流式响应配合 progressToken 逐步返回结果,而不是阻塞等待完整输出。缓存层:避免重复计算结果缓存对幂等性工具调用(如读取配置文件、查询静态数据)实施结果缓存。LRU 缓存在工具调用场景下命中率通常可达 60% 以上:from functools import lru_cachefrom typing import Anyclass ToolCache: def __init__(self, maxsize: int = 512): self._cache = lru_cache(maxsize=maxsize) @lru_cache(maxsize=512) def get_tool_result(self, tool_name: str, args_key: str) -> Any: """缓存工具执行结果,args_key 为参数的稳定哈希""" return self._execute_tool(tool_name, args_key)元数据缓存工具列表(tools/list)和资源描述(resources/list)变化频率低,应在客户端侧缓存,避免每次会话重新拉取。MCP Server Cards(.well-known URL)可进一步加速服务发现,在 100+ 服务器环境中降低 30% 初始连接延迟。智能失效策略缓存不是永久的。结合 TTL 和事件驱动两种失效机制:短周期数据用 TTL(如 60s),配置变更通过 notifications/resources/updated 主动推送失效。并发层:提升吞吐量异步 I/O 模型MCP Server 应采用异步编程模型。Python 推荐 asyncio,Node.js 天然异步,Go 用 goroutine:import asyncioasync def handle_batch_calls(calls: list[dict]) -> list[dict]: """并行执行独立的工具调用""" tasks = [execute_single_call(call) for call in calls] results = await asyncio.gather(*tasks, return_exceptions=True) return [ r if not isinstance(r, Exception) else {"error": str(r)} for r in results ]无锁并发控制高并发场景下,锁竞争会成为瓶颈。优先使用无锁数据结构(如 asyncio.Queue)或乐观并发控制,避免线程/协程间的阻塞等待。C++ 实现可考虑 lock-free queue 或 RCU(Read-Copy-Update)模式。连接池管理对外部资源(数据库、HTTP API)建立连接池,设置合理的 min_size 和 max_size。连接池耗尽时排队等待而非无限创建新连接,防止资源泄漏拖垮整个服务。上下文管理:控制膨胀上下文膨胀是 MCP 特有的性能问题。随着对话推进,发送给模型的上下文越来越大,直接影响 token 计费和响应延迟。上下文裁剪策略滑动窗口:只保留最近 N 轮对话,丢弃早期历史摘要压缩:对超过阈值的早期对话生成摘要,替代原文传入相关性过滤:根据当前查询,只检索相关的上下文片段(RAG 思路)def prune_context(messages: list[dict], max_tokens: int = 4000) -> list[dict]: """滑动窗口 + 摘要压缩""" total = sum(estimate_tokens(m["content"]) for m in messages) if total <= max_tokens: return messages # 保留最近 3 轮,对更早的内容生成摘要 recent = messages[-6:] # 最近 3 轮(user+assistant 各1) older = messages[:-6] summary = generate_summary(older) return [{"role": "system", "content": f"历史摘要:{summary}"}] + recent资源按需加载不要在初始化时加载全部资源。使用 resources/read 按需获取,配合 URI 模板(如 file:///data/{category}/{id})实现懒加载。监控与调优:数据驱动关键指标| 指标 | 告警阈值 | 说明 ||------|----------|------|| 工具调用 P99 延迟 | > 500ms | 单次调用耗时过长 || 序列化耗时占比 | > 15% | 需要考虑二进制替代 || 上下文 token 数 | > 8000 | 膨胀严重,需裁剪 || 缓存命中率 | 1% | 排查工具实现问题 |基准测试优化前后都要跑基准测试。推荐使用 locust 或 k6 模拟并发工具调用场景,记录 QPS、P50/P95/P99 延迟的变化。APM 集成接入 OpenTelemetry 或 SkyWalking,对工具调用的完整链路进行追踪。重点关注:从 MCP 客户端发出请求到收到响应的全链路耗时分解。生产部署优化水平扩展MCP Server 设计为无状态,支持水平扩展。在 Kubernetes 中配置 HPA(Horizontal Pod Autoscaler),基于 CPU 使用率或自定义指标(如请求队列深度)自动扩缩容。健康检查与故障隔离实现 /health 端点,配合负载均衡器的健康检查自动剔除异常实例。设置合理的超时时间,避免慢请求阻塞整个连接池。区域部署对延迟敏感的场景,在多个地理区域部署 MCP Server 实例,客户端就近接入。MCP Gateway 可统一入口,内部路由到最近的实例。性能优化不是一次性工程。从定位瓶颈开始,依次优化协议层、缓存层、并发层和上下文管理,用监控数据验证每一步的效果,才能持续保持 MCP 系统在高负载下的稳定响应。
服务端阅读 05月28日 06:51

MCP 协议的核心架构包含哪些关键组件?

MCP 协议的三层架构MCP(Model Context Protocol)采用传输层、会话层、能力层三层解耦设计,各层职责独立、协同工作,保障协议的兼容性与扩展性。传输层负责底层通信,支持 Stdio、HTTP Streamable、WebSocket 等多种传输方式。Stdio 适用于本地进程间通信,HTTP Streamable 是 2026 年规范升级后的推荐方式,解决了早期长连接不稳定的问题。会话层负责连接鉴权与心跳维护。2026 年 3 月规范更新后,Auth 认证机制从草案进入正式版,支持 OAuth 2.0 标准流程,取代了此前被广泛诟病的明文密码方案。能力层负责工具声明、调用和事件通知,是协议的核心业务层。三大核心角色:Host、Client、ServerMCP 的运行时架构由三个角色组成:Host(主机):发起连接的 LLM 应用程序,如 Claude Desktop、VS Code 中的 AI 扩展。一个 Host 可以管理多个 Client。Client(客户端):在 Host 内部运行,与 Server 保持 1:1 连接,负责将请求转换为结构化的 JSON-RPC 2.0 消息,管理会话生命周期(超时、重连、中断)。Server(服务器):向外暴露能力,提供工具、资源和提示。Server 可以连接数据库、文件系统、API 等外部服务。三者关系:Host 包含多个 Client,每个 Client 连接一个 Server,形成星形拓扑。核心原语:Tools、Resources、Prompts能力层定义了三种原语,是 Server 向 Client 暴露能力的唯一方式:Tools(工具)——可执行的操作,类似定义明确的函数。每个工具包含名称、描述和 JSON Schema 定义的参数。LLM 根据工具描述自主决定何时调用。{ "name": "query_database", "description": "执行 SQL 查询并返回结果", "inputSchema": { "type": "object", "properties": { "sql": { "type": "string", "description": "SQL 查询语句" } }, "required": ["sql"] }}Resources(资源)——可读取的上下文对象,如文件内容、数据库记录、日志条目。Client 主动拉取,Server 不主动推送。Prompts(提示模板)——结构化的工作流模板,预设工具调用序列和参数,引导 LLM 按特定步骤完成任务。核心工作流程完整的 MCP 交互分为四步:工具注册:Server 启动后声明自身提供的工具、资源和提示,包含参数定义与权限信息。连接鉴权:Client 与 Server 建立连接,完成身份校验(OAuth 2.0 流程)。能力发现:Client 获取 Server 支持的工具清单与调用规则,LLM 据此决定可调用哪些工具。工具调用与结果返回:LLM 发出工具调用指令,Server 执行后将结果通过 JSON-RPC 响应返回 Client。MCP 协议的核心优势MCP 相比自定义工具接口的差异化价值在于:标准化交互:统一的 JSON-RPC 2.0 消息格式,无需为每个工具单独开发适配逻辑。跨框架兼容:支持 LangChain、CrewAI 等主流 Agent 框架互通,一套 Server 实现多框架接入。动态能力发现:工具即插即用,Server 上线后 Client 自动感知新增能力。内置安全机制:OAuth 2.0 鉴权 + 加密传输 + 权限粒度控制。2026 年 4 月,Meta、Docker 相继宣布支持 MCP,Linux Foundation 成立 AAIF(AI Agent Interoperability Foundation)接管协议规范,MCP 正在成为 AI Agent 互联互通的事实标准。追问:MCP 与 Function Calling 有什么区别?Function Calling 是单模型厂商的私有能力,调用格式和参数定义因模型而异,无法跨模型复用。MCP 是开放协议,定义了标准化的能力声明、发现和调用机制,Server 实现一次即可被任何支持 MCP 的 Client 接入。类比来说,Function Calling 像"专车接口",MCP 像"USB 接口"——前者绑定特定实现,后者面向通用适配。