5月28日 09:27

GraphQL Subscriptions 如何实现实时数据推送?

核心回答

GraphQL 订阅基于 WebSocket 实现持久连接,服务端通过 PubSub 模式在事件触发时主动向客户端推送数据,区别于 Query/Mutation 的请求-响应模式。完整实现涉及三个关键环节:传输层(WebSocket 或 SSE)、PubSub 引擎(内存 / Redis / 消息队列)、订阅解析器(过滤与鉴权)。

实现原理与通信流程

订阅的生命周期分为五步:

  1. 客户端通过 WebSocket 握手建立持久连接
  2. 客户端发送 subscription 操作文档和变量
  3. 服务端将订阅注册到 PubSub 引擎的对应 topic
  4. 当触发事件(如 Mutation 写入数据),PubSub 发布消息
  5. 服务端通过 AsyncIterator 将匹配的事件数据推送到客户端

与轮询相比,订阅的实时性高、服务端负载低(事件驱动而非定时查询),但实现复杂度更高,需要处理连接管理、断线重连、资源回收等问题。

服务端实现

基础 PubSub 方案

javascript
const { PubSub } = require('graphql-subscriptions'); const pubsub = new PubSub(); const POST_CREATED = 'POST_CREATED'; const typeDefs = ` type Subscription { postCreated: Post! commentAdded(postId: ID!): Comment! } `; const resolvers = { Subscription: { postCreated: { subscribe: () => pubsub.asyncIterator([POST_CREATED]) }, commentAdded: { subscribe: (_, { postId }) => { const iterator = pubsub.asyncIterator(['COMMENT_ADDED']); return withFilter(iterator, (payload) => payload.commentAdded.postId === postId ); } } }, Mutation: { createPost: async (_, { input }) => { const post = await Post.create(input); pubsub.publish(POST_CREATED, { postCreated: post }); return post; } } };

内存 PubSub 仅适用于单实例部署,多实例必须切换到 Redis 或消息队列方案。

Redis PubSub 分布式方案

javascript
const { RedisPubSub } = require('graphql-redis-subscriptions'); const pubsub = new RedisPubSub({ connection: { host: process.env.REDIS_HOST, port: 6379, retry_strategy: (options) => { if (options.total_retry_time > 1000 * 60 * 60) return new Error('Retry exhausted'); return Math.min(options.attempt * 100, 3000); } } });

对于更大规模系统,可使用 Kafka、NATS 或 RabbitMQ 作为消息中间件,适用于微服务架构下的跨服务事件分发。

Apollo Server WebSocket 配置

javascript
const { WebSocketServer } = require('ws'); const { useServer } = require('graphql-ws/lib/use/ws'); const wsServer = new WebSocketServer({ server: httpServer, path: '/graphql' }); useServer({ schema: server.schema, context: async (ctx) => { const token = ctx.connectionParams?.authorization; if (!token) throw new Error('Unauthorized'); return { user: await verifyToken(token) }; }, onConnect: () => console.log('Client connected'), onDisconnect: () => console.log('Client disconnected') }, wsServer);

注意:Apollo Server v4 推荐使用 graphql-ws 协议替代旧版 subscriptions-transport-ws,后者已停止维护。

客户端实现

Apollo Client 订阅配置

javascript
import { split, HttpLink } from '@apollo/client'; import { GraphQLWsLink } from '@apollo/client/link/subscriptions'; import { createClient } from 'graphql-ws'; import { getMainDefinition } from '@apollo/client/utilities'; const httpLink = new HttpLink({ uri: '/graphql' }); const wsLink = new GraphQLWsLink(createClient({ url: 'ws://localhost:4000/graphql', connectionParams: { authToken: localStorage.getItem('token') } })); const splitLink = split( ({ query }) => { const def = getMainDefinition(query); return def.kind === 'OperationDefinition' && def.operation === 'subscription'; }, wsLink, httpLink );

组件内使用订阅

javascript
const POST_CREATED = gql` subscription OnPostCreated { postCreated { id title author { name } } } `; function PostList() { const { data, loading } = useSubscription(POST_CREATED); if (loading) return <p>等待数据...</p>; return <PostCard post={data.postCreated} />; }

订阅过滤与鉴权

过滤是订阅的必备能力,分为两层:

参数过滤:根据订阅参数筛选事件,例如只接收特定帖子的评论。使用 withFilter 工具函数可简化实现。

javascript
const { withFilter } = require('graphql-subscriptions'); subscribe: withFilter( () => pubsub.asyncIterator(['COMMENT_ADDED']), (payload, variables) => payload.commentAdded.postId === variables.postId );

权限过滤:在 subscribe 解析器中校验用户身份,只推送该用户有权查看的数据。对于敏感字段,应在推送前过滤掉无权访问的字段。

错误处理与重连

javascript
const wsClient = createClient({ url: 'ws://localhost:4000/graphql', retryAttempts: 5, shouldRetry: (err) => err.code !== 4001, on: { error: (err) => console.error('WebSocket error:', err), closed: () => console.log('Connection closed') } });

常见错误类型及处理策略:

错误类型原因处理方式
连接断开网络波动/服务重启自动重连 + 指数退避
认证失败Token 过期重新获取 Token 后重连
订阅超时服务端负载过高设置超时阈值 + 降级轮询
内存泄漏组件卸载未取消订阅useEffect 清理函数取消订阅

性能优化要点

  • 批量发布:高频事件合并推送,减少 WebSocket 帧数量
  • 节流控制:客户端对订阅数据做 throttle,避免 UI 频繁重渲染
  • 连接数限制:服务端设置单客户端最大订阅数,防止资源耗尽
  • 僵尸连接回收:设置心跳检测和空闲超时,清理失活连接
  • 分布式部署:多实例场景必须使用 Redis PubSub 或消息队列,内存方案无法跨进程通信

WebSocket vs SSE 如何选择

维度WebSocketSSE
通信方向双向仅服务端推送
协议开销较高(握手)低(基于 HTTP)
浏览器支持全部除 IE 外全部
适用场景需要双向通信纯推送场景
连接管理复杂简单

SSE 适合只需要服务端推送、不需要客户端通过同一连接发送数据的场景,实现更轻量。GraphQL 社区已有 graphql-sse 库支持 SSE 传输。

追问:生产环境有哪些坑?

  1. 连接数爆炸:每个订阅占用一个 WebSocket 连接,高并发下需要网关层做连接复用或限流
  2. 数据一致性:订阅推送的数据可能与客户端缓存不一致,需配合 update 函数手动修正缓存
  3. 灰度发布:Schema 变更时,旧客户端的订阅可能断开,需做好版本兼容
  4. 监控盲区:订阅不像 HTTP 请求有明确的请求/响应日志,需要单独建立连接和推送的监控指标
标签:GraphQL