5月28日 07:00

如何在 Ollama 中使用流式响应(streaming)来实时生成文本?

Ollama 的流式响应(streaming)允许模型逐 token 返回结果,而不是等待全部生成完毕后一次性返回。这在聊天界面、代码补全等场景下几乎是必须的能力——用户能立刻看到内容逐步出现,感知延迟大幅降低。

核心原理

Ollama 的流式响应基于 HTTP 长连接 + NDJSON(Newline Delimited JSON)格式。服务端每生成一个 token 就立即写一行 JSON 到响应体,客户端逐行读取并解析。关键参数是请求中的 "stream": true——默认情况下 REST API 的流式是开启的,但在各语言 SDK 中通常默认关闭。

每行 JSON 结构大致如下:

json
{"model":"llama3.2","response":"Hello","done":false} {"model":"llama3.2","response":" world","done":false} {"model":"llama3.2","response":"","done":true,"total_duration":2500000000}

donetrue 时,这条消息还会携带 total_durationeval_count 等统计信息,标志着本次生成结束。

流式 vs 非流式:什么时候用哪个?

维度流式(stream: true)非流式(stream: false)
首字延迟极低,token 级返回需等待全部生成完毕
内存占用逐 token 处理,峰值低需缓存完整响应
用户体验类 ChatGPT 逐字出现长时间白屏等待
实现复杂度需处理增量拼接和中断逻辑一次请求-响应,简单直接
适用场景聊天、代码补全、实时交互批量处理、后台任务、需完整结果后再操作

面试要点:非流式适合服务端内部调用或需要完整结果后再做后处理的场景;流式适合任何用户直接交互的界面。

generate 端点的流式调用

/api/generate 是最基础的文本生成端点,适用于单轮 prompt 场景:

bash
curl http://localhost:11434/api/generate -d '{ "model": "llama3.2", "prompt": "用 Python 实现快速排序", "stream": true }'

Python 中用 requests 库处理:

python
import requests import json response = requests.post( 'http://localhost:11434/api/generate', json={'model': 'llama3.2', 'prompt': '用 Python 实现快速排序', 'stream': True}, stream=True ) full_text = '' for line in response.iter_lines(): if line: chunk = json.loads(line) full_text += chunk.get('response', '') print(chunk['response'], end='', flush=True) if chunk.get('done'): print(f' 总耗时: {chunk["total_duration"] / 1e9:.2f}s')

chat 端点的流式调用

/api/chat 支持多轮对话,是构建聊天应用的首选端点:

python
import requests import json messages = [ {'role': 'user', 'content': '解释一下 Transformer 的自注意力机制'} ] response = requests.post( 'http://localhost:11434/api/chat', json={'model': 'llama3.2', 'messages': messages, 'stream': True}, stream=True ) for line in response.iter_lines(): if line: chunk = json.loads(line) if 'message' in chunk and chunk['message']['content']: print(chunk['message']['content'], end='', flush=True)

注意 chat 端点的响应结构不同——文本在 chunk['message']['content'] 而非 chunk['response'],这是面试中常见的混淆点。

使用官方 Python SDK

Ollama 官方提供了 ollama Python 包,流式接口更简洁:

python
import ollama # generate 流式 for chunk in ollama.generate(model='llama3.2', prompt='写一首关于编程的诗', stream=True): print(chunk['response'], end='', flush=True) # chat 流式 for chunk in ollama.chat( model='llama3.2', messages=[{'role': 'user', 'content': '解释递归'}], stream=True ): if chunk['message']['content']: print(chunk['message']['content'], end='', flush=True)

SDK 内部已经处理了连接管理和 NDJSON 解析,代码量显著减少。但在生产环境中,直接使用 requests 给你更多控制权——比如自定义超时、重试策略和连接池。

JavaScript/TypeScript 实现

Node.js 环境下使用 fetch API 处理流式:

javascript
async function streamChat(prompt) { const response = await fetch('http://localhost:11434/api/chat', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ model: 'llama3.2', messages: [{ role: 'user', content: prompt }], stream: true }) }); const reader = response.body.getReader(); const decoder = new TextDecoder(); let buffer = ''; while (true) { const { done, value } = await reader.read(); if (done) break; buffer += decoder.decode(value, { stream: true }); const lines = buffer.split(' '); buffer = lines.pop(); // 保留不完整的行 for (const line of lines) { if (!line.trim()) continue; const chunk = JSON.parse(line); if (chunk.message?.content) { process.stdout.write(chunk.message.content); } } } }

这里用 buffer 拼接是因为网络传输可能把一个 JSON 行拆成多个 chunk,直接按行 split 会解析失败——这是前端处理流式响应时最常踩的坑。

错误处理与重连

生产环境中流式连接会因网络波动、服务重启等原因中断,必须有健壮的错误处理:

python
import requests import json import time def stream_with_retry(url, payload, max_retries=3, timeout=30): for attempt in range(max_retries): try: response = requests.post( url, json=payload, stream=True, timeout=timeout ) response.raise_for_status() full_text = '' for line in response.iter_lines(): if line: chunk = json.loads(line) if chunk.get('done'): return full_text full_text += chunk.get('response', '') return full_text except (requests.ConnectionError, requests.Timeout) as e: print(f'连接失败 (尝试 {attempt + 1}/{max_retries}): {e}') time.sleep(2 ** attempt) # 指数退避 except json.JSONDecodeError as e: print(f'JSON 解析错误: {e}') continue raise ConnectionError(f'重试 {max_retries} 次后仍然失败') # 使用 result = stream_with_retry( 'http://localhost:11434/api/generate', {'model': 'llama3.2', 'prompt': 'Hello', 'stream': True} )

三个关键点:指数退避避免雪崩、超时设置防止无限挂起、JSON 解析错误需要跳过坏行而非直接放弃。

工具调用的流式支持

从 Ollama v0.8.0 开始,工具调用(tool calling)也支持流式返回了。这意味着模型在生成内容的同时可以实时发起工具调用,不再需要等待完整响应:

python
import ollama tools = [{ 'type': 'function', 'function': { 'name': 'get_weather', 'description': '获取指定城市的天气', 'parameters': { 'type': 'object', 'properties': { 'city': {'type': 'string', 'description': '城市名称'} }, 'required': ['city'] } } }] for chunk in ollama.chat( model='llama3.2', messages=[{'role': 'user', 'content': '北京今天天气怎么样?'}], tools=tools, stream=True ): if chunk.get('message', {}).get('tool_calls'): for tool_call in chunk['message']['tool_calls']: print(f'调用工具: {tool_call["function"]["name"]}') print(f'参数: {tool_call["function"]["arguments"]}')

收到工具调用后,你需要执行对应函数,将结果作为 tool 角色的消息追加到 messages 中,再次调用 chat 接口让模型继续生成。

生产环境的几个坑

连接池管理:Ollama 默认并发连接数有限(通常与模型并发数相关)。如果每个请求都新建连接,高并发下会频繁超时。用 requests.Session() 复用连接:

python
session = requests.Session() def stream_chat(prompt): return session.post( 'http://localhost:11434/api/chat', json={'model': 'llama3.2', 'messages': [{'role': 'user', 'content': prompt}], 'stream': True}, stream=True, timeout=60 )

取消生成:用户中途停止接收时,直接关闭连接即可。Ollama 服务端会检测到连接断开并停止生成,不会浪费算力。在 Python 中调用 response.close(),在 JS 中调用 reader.cancel()

上下文窗口溢出:长对话流式生成到一半突然返回 done: true 但内容截断,通常是上下文窗口超限。需要在发送请求前计算 token 数,必要时裁剪历史消息。

多模型并发:Ollama 同时只能加载有限数量的模型到 GPU。如果频繁切换模型,会出现加载等待,表现为流式首字延迟骤增。生产环境建议固定使用一个模型,或通过 OLLAMA_NUM_PARALLEL 环境变量调整并发数。

标签:Ollama