乐闻世界logo
搜索文章和话题

GraphQL 订阅(Subscriptions)如何实现

2月21日 17:00

GraphQL 订阅(Subscriptions)实现详解

GraphQL 订阅允许客户端实时接收服务器推送的数据更新,是构建实时应用的关键功能。以下是 GraphQL 订阅的详细实现方案。

1. 订阅基础概念

订阅的工作原理

  1. 客户端通过 WebSocket 建立持久连接
  2. 客户端发送订阅查询
  3. 服务器保持连接并监听事件
  4. 当事件发生时,服务器推送数据到客户端
  5. 客户端接收并处理更新

订阅 vs 轮询

特性订阅轮询
实时性
服务器负载低(事件驱动)高(持续查询)
网络开销低(按需推送)高(定期请求)
实现复杂度
适用场景实时更新定期检查

2. 服务器端实现

使用 graphql-subscriptions

javascript
const { PubSub } = require('graphql-subscriptions'); const pubsub = new PubSub(); const POST_CREATED = 'POST_CREATED'; const POST_UPDATED = 'POST_UPDATED'; const COMMENT_ADDED = 'COMMENT_ADDED'; const typeDefs = ` type Post { id: ID! title: String! content: String! author: User! createdAt: DateTime! } type Comment { id: ID! text: String! author: User! post: Post! createdAt: DateTime! } type Subscription { postCreated: Post! postUpdated(postId: ID!): Post! commentAdded(postId: ID!): Comment! } `; const resolvers = { Subscription: { postCreated: { subscribe: () => pubsub.asyncIterator([POST_CREATED]) }, postUpdated: { subscribe: (_, { postId }) => { const asyncIterator = pubsub.asyncIterator([POST_UPDATED]); return { [Symbol.asyncIterator]() { return (async function* () { for await (const event of asyncIterator) { // 过滤:只返回指定帖子的更新 if (event.postUpdated.id === postId) { yield event; } } })(); } }; } }, commentAdded: { subscribe: (_, { postId }) => { const asyncIterator = pubsub.asyncIterator([COMMENT_ADDED]); return { [Symbol.asyncIterator]() { return (async function* () { for await (const event of asyncIterator) { if (event.commentAdded.postId === postId) { yield event; } } })(); } }; } } }, Mutation: { createPost: async (_, { input }) => { const post = await Post.create(input); pubsub.publish(POST_CREATED, { postCreated: post }); return post; }, updatePost: async (_, { id, input }) => { const post = await Post.update(id, input); pubsub.publish(POST_UPDATED, { postUpdated: post }); return post; }, addComment: async (_, { input }) => { const comment = await Comment.create(input); pubsub.publish(COMMENT_ADDED, { commentAdded: comment }); return comment; } } };

使用 Redis PubSub

javascript
const { RedisPubSub } = require('graphql-redis-subscriptions'); const pubsub = new RedisPubSub({ connection: { host: 'localhost', port: 6379, retry_strategy: (options) => { if (options.error && options.error.code === 'ECONNREFUSED') { return new Error('Redis connection refused'); } if (options.total_retry_time > 1000 * 60 * 60) { return new Error('Redis retry time exhausted'); } if (options.attempt > 10) { return undefined; } return Math.min(options.attempt * 100, 3000); } } }); // 使用方式与内存 PubSub 相同 const resolvers = { Subscription: { postCreated: { subscribe: () => pubsub.asyncIterator([POST_CREATED]) } } };

3. Apollo Server 订阅实现

配置 Apollo Server

javascript
const { ApolloServer } = require('apollo-server-express'); const { createServer } = require('http'); const { WebSocketServer } = require('ws'); const { useServer } = require('graphql-ws/lib/use/ws'); const server = new ApolloServer({ typeDefs, resolvers, context: ({ req, connection }) => { // HTTP 请求的 context if (req) { return { token: req.headers.authorization }; } // WebSocket 连接的 context if (connection) { return { token: connection.context.authorization }; } } }); const httpServer = createServer(server); // 创建 WebSocket 服务器 const wsServer = new WebSocketServer({ server: httpServer, path: '/graphql' }); useServer( { schema: server.schema, context: (ctx) => { // 验证连接 const token = ctx.connectionParams?.authorization; if (!token) { throw new Error('Unauthorized'); } return { token }; }, onConnect: (ctx) => { console.log('Client connected'); return { authorization: ctx.connectionParams?.authorization }; }, onDisconnect: (ctx, code, reason) => { console.log('Client disconnected', { code, reason }); } }, wsServer ); server.listen().then(({ url }) => { console.log(`Server ready at ${url}`); });

4. 客户端实现

Apollo Client 订阅

javascript
import { ApolloClient, InMemoryCache, split, HttpLink } from '@apollo/client'; import { WebSocketLink } from '@apollo/client/link/ws'; import { getMainDefinition } from '@apollo/client/utilities'; // HTTP 链接 const httpLink = new HttpLink({ uri: 'http://localhost:4000/graphql' }); // WebSocket 链接 const wsLink = new WebSocketLink({ uri: 'ws://localhost:4000/graphql', options: { reconnect: true, connectionParams: { authToken: localStorage.getItem('token') }, lazy: true, connectionCallback: (error) => { if (error) { console.error('WebSocket connection error:', error); } else { console.log('WebSocket connected'); } } } }); // 分割链接 const splitLink = split( ({ query }) => { const definition = getMainDefinition(query); return ( definition.kind === 'OperationDefinition' && definition.operation === 'subscription' ); }, wsLink, httpLink ); const client = new ApolloClient({ link: splitLink, cache: new InMemoryCache() }); // 使用订阅 import { gql, useSubscription } from '@apollo/client'; const POST_CREATED_SUBSCRIPTION = gql` subscription OnPostCreated { postCreated { id title content author { name } createdAt } } `; function PostList() { const { data, loading, error } = useSubscription(POST_CREATED_SUBSCRIPTION); if (loading) return <div>Loading...</div>; if (error) return <div>Error: {error.message}</div>; return ( <div> <h3>New Post Created:</h3> <p>{data.postCreated.title}</p> </div> ); }

React Hooks 订阅

javascript
import { useSubscription, useMutation } from '@apollo/client'; function ChatRoom({ roomId }) { const MESSAGE_ADDED = gql` subscription OnMessageAdded($roomId: ID!) { messageAdded(roomId: $roomId) { id text author { name } createdAt } } `; const SEND_MESSAGE = gql` mutation SendMessage($roomId: ID!, $text: String!) { sendMessage(roomId: $roomId, text: $text) { id text } } `; const { data: messageData, loading } = useSubscription(MESSAGE_ADDED, { variables: { roomId } }); const [sendMessage] = useMutation(SEND_MESSAGE); const handleSendMessage = (text) => { sendMessage({ variables: { roomId, text } }); }; return ( <div> {loading ? ( <div>Connecting...</div> ) : ( <div> <MessageList messages={messageData?.messageAdded} /> <MessageInput onSend={handleSendMessage} /> </div> )} </div> ); }

5. 订阅过滤

基于参数的过滤

javascript
const resolvers = { Subscription: { notification: { subscribe: (_, { userId, types }) => { const asyncIterator = pubsub.asyncIterator(['NOTIFICATION']); return { [Symbol.asyncIterator]() { return (async function* () { for await (const event of asyncIterator) { const notification = event.notification; // 过滤用户 if (userId && notification.userId !== userId) { continue; } // 过滤类型 if (types && !types.includes(notification.type)) { continue; } yield event; } })(); } }; } } } };

基于权限的过滤

javascript
const resolvers = { Subscription: { userUpdate: { subscribe: async (_, __, context) => { // 验证用户权限 if (!context.user) { throw new Error('Unauthorized'); } const asyncIterator = pubsub.asyncIterator(['USER_UPDATE']); return { [Symbol.asyncIterator]() { return (async function* () { for await (const event of asyncIterator) { const update = event.userUpdate; // 只返回当前用户的更新 if (update.userId !== context.user.id) { continue; } // 只返回有权限查看的字段 const filteredUpdate = filterSensitiveFields(update, context.user.role); yield { userUpdate: filteredUpdate }; } })(); } }; } } } };

6. 订阅错误处理

连接错误处理

javascript
const wsLink = new WebSocketLink({ uri: 'ws://localhost:4000/graphql', options: { reconnect: true, retryAttempts: 5, connectionParams: async () => { const token = await getAuthToken(); return { token }; }, on: { connected: () => console.log('WebSocket connected'), error: (error) => { console.error('WebSocket error:', error); // 尝试重新连接 }, closed: (event) => { console.log('WebSocket closed:', event); // 清理资源 } } } });

订阅错误处理

javascript
function useSubscriptionWithErrorHandling(query, options) { const { data, error, loading } = useSubscription(query, options); useEffect(() => { if (error) { console.error('Subscription error:', error); // 根据错误类型处理 if (error.networkError) { // 网络错误,尝试重连 handleNetworkError(error); } else if (error.graphQLErrors) { // GraphQL 错误 handleGraphQLError(error); } } }, [error]); return { data, error, loading }; }

7. 订阅性能优化

批量发布

javascript
class BatchPublisher { constructor(pubsub, eventName, batchSize = 10, flushInterval = 100) { this.pubsub = pubsub; this.eventName = eventName; this.batchSize = batchSize; this.flushInterval = flushInterval; this.batch = []; this.flushTimer = null; } add(event) { this.batch.push(event); if (this.batch.length >= this.batchSize) { this.flush(); } else if (!this.flushTimer) { this.flushTimer = setTimeout(() => this.flush(), this.flushInterval); } } flush() { if (this.batch.length === 0) return; // 批量发布 this.pubsub.publish(this.eventName, { batch: this.batch }); this.batch = []; if (this.flushTimer) { clearTimeout(this.flushTimer); this.flushTimer = null; } } } // 使用批量发布器 const batchPublisher = new BatchPublisher(pubsub, 'BATCH_EVENTS'); // 添加事件到批次 batchPublisher.add({ type: 'event1', data: {} }); batchPublisher.add({ type: 'event2', data: {} });

订阅节流

javascript
function useThrottledSubscription(query, options, throttleMs = 1000) { const { data, loading } = useSubscription(query, options); const [throttledData, setThrottledData] = useState(null); const lastUpdate = useRef(0); useEffect(() => { if (data) { const now = Date.now(); if (now - lastUpdate.current > throttleMs) { setThrottledData(data); lastUpdate.current = now; } } }, [data, throttleMs]); return { data: throttledData, loading }; }

8. 订阅监控

连接监控

javascript
const connectionMetrics = { activeConnections: 0, totalConnections: 0, disconnections: 0 }; const wsServer = new WebSocketServer({ server: httpServer, path: '/graphql' }); useServer( { schema: server.schema, onConnect: () => { connectionMetrics.totalConnections++; connectionMetrics.activeConnections++; console.log('Connection metrics:', connectionMetrics); }, onDisconnect: () => { connectionMetrics.activeConnections--; connectionMetrics.disconnections++; console.log('Connection metrics:', connectionMetrics); } }, wsServer );

订阅指标

javascript
const subscriptionMetrics = new Map(); function trackSubscription(eventName) { if (!subscriptionMetrics.has(eventName)) { subscriptionMetrics.set(eventName, { count: 0, lastPublished: null }); } const metrics = subscriptionMetrics.get(eventName); metrics.count++; metrics.lastPublished = new Date(); } // 在发布事件时追踪 pubsub.publish(POST_CREATED, { postCreated: post }); trackSubscription(POST_CREATED);

9. 订阅最佳实践

实践说明
使用 Redis PubSub支持分布式部署
实现连接认证确保订阅安全
添加错误处理提高稳定性
实现过滤机制减少不必要的数据推送
监控连接状态及时发现问题
使用批量发布提高性能
实现重连机制提高可靠性
限制订阅数量防止资源耗尽
设置超时时间避免僵尸连接
记录订阅日志便于调试和分析

10. 常见问题及解决方案

问题原因解决方案
连接频繁断开网络不稳定、超时实现自动重连、增加超时时间
订阅延迟高服务器负载高、处理慢优化性能、使用批量发布
内存泄漏未正确清理订阅确保取消订阅、清理资源
数据不一致缓存未更新实现缓存失效机制
安全问题未验证连接实现连接认证和授权
标签:GraphQL