GraphQL 订阅(Subscriptions)实现详解
GraphQL 订阅允许客户端实时接收服务器推送的数据更新,是构建实时应用的关键功能。以下是 GraphQL 订阅的详细实现方案。
1. 订阅基础概念
订阅的工作原理
- 客户端通过 WebSocket 建立持久连接
- 客户端发送订阅查询
- 服务器保持连接并监听事件
- 当事件发生时,服务器推送数据到客户端
- 客户端接收并处理更新
订阅 vs 轮询
| 特性 | 订阅 | 轮询 |
|---|
| 实时性 | 高 | 低 |
| 服务器负载 | 低(事件驱动) | 高(持续查询) |
| 网络开销 | 低(按需推送) | 高(定期请求) |
| 实现复杂度 | 高 | 低 |
| 适用场景 | 实时更新 | 定期检查 |
2. 服务器端实现
使用 graphql-subscriptions
const { PubSub } = require('graphql-subscriptions');
const pubsub = new PubSub();
const POST_CREATED = 'POST_CREATED';
const POST_UPDATED = 'POST_UPDATED';
const COMMENT_ADDED = 'COMMENT_ADDED';
const typeDefs = `
type Post {
id: ID!
title: String!
content: String!
author: User!
createdAt: DateTime!
}
type Comment {
id: ID!
text: String!
author: User!
post: Post!
createdAt: DateTime!
}
type Subscription {
postCreated: Post!
postUpdated(postId: ID!): Post!
commentAdded(postId: ID!): Comment!
}
`;
const resolvers = {
Subscription: {
postCreated: {
subscribe: () => pubsub.asyncIterator([POST_CREATED])
},
postUpdated: {
subscribe: (_, { postId }) => {
const asyncIterator = pubsub.asyncIterator([POST_UPDATED]);
return {
[Symbol.asyncIterator]() {
return (async function* () {
for await (const event of asyncIterator) {
// 过滤:只返回指定帖子的更新
if (event.postUpdated.id === postId) {
yield event;
}
}
})();
}
};
}
},
commentAdded: {
subscribe: (_, { postId }) => {
const asyncIterator = pubsub.asyncIterator([COMMENT_ADDED]);
return {
[Symbol.asyncIterator]() {
return (async function* () {
for await (const event of asyncIterator) {
if (event.commentAdded.postId === postId) {
yield event;
}
}
})();
}
};
}
}
},
Mutation: {
createPost: async (_, { input }) => {
const post = await Post.create(input);
pubsub.publish(POST_CREATED, { postCreated: post });
return post;
},
updatePost: async (_, { id, input }) => {
const post = await Post.update(id, input);
pubsub.publish(POST_UPDATED, { postUpdated: post });
return post;
},
addComment: async (_, { input }) => {
const comment = await Comment.create(input);
pubsub.publish(COMMENT_ADDED, { commentAdded: comment });
return comment;
}
}
};
使用 Redis PubSub
const { RedisPubSub } = require('graphql-redis-subscriptions');
const pubsub = new RedisPubSub({
connection: {
host: 'localhost',
port: 6379,
retry_strategy: (options) => {
if (options.error && options.error.code === 'ECONNREFUSED') {
return new Error('Redis connection refused');
}
if (options.total_retry_time > 1000 * 60 * 60) {
return new Error('Redis retry time exhausted');
}
if (options.attempt > 10) {
return undefined;
}
return Math.min(options.attempt * 100, 3000);
}
}
});
// 使用方式与内存 PubSub 相同
const resolvers = {
Subscription: {
postCreated: {
subscribe: () => pubsub.asyncIterator([POST_CREATED])
}
}
};
3. Apollo Server 订阅实现
配置 Apollo Server
const { ApolloServer } = require('apollo-server-express');
const { createServer } = require('http');
const { WebSocketServer } = require('ws');
const { useServer } = require('graphql-ws/lib/use/ws');
const server = new ApolloServer({
typeDefs,
resolvers,
context: ({ req, connection }) => {
// HTTP 请求的 context
if (req) {
return { token: req.headers.authorization };
}
// WebSocket 连接的 context
if (connection) {
return { token: connection.context.authorization };
}
}
});
const httpServer = createServer(server);
// 创建 WebSocket 服务器
const wsServer = new WebSocketServer({
server: httpServer,
path: '/graphql'
});
useServer(
{
schema: server.schema,
context: (ctx) => {
// 验证连接
const token = ctx.connectionParams?.authorization;
if (!token) {
throw new Error('Unauthorized');
}
return { token };
},
onConnect: (ctx) => {
console.log('Client connected');
return { authorization: ctx.connectionParams?.authorization };
},
onDisconnect: (ctx, code, reason) => {
console.log('Client disconnected', { code, reason });
}
},
wsServer
);
server.listen().then(({ url }) => {
console.log(`Server ready at ${url}`);
});
4. 客户端实现
Apollo Client 订阅
import { ApolloClient, InMemoryCache, split, HttpLink } from '@apollo/client';
import { WebSocketLink } from '@apollo/client/link/ws';
import { getMainDefinition } from '@apollo/client/utilities';
// HTTP 链接
const httpLink = new HttpLink({
uri: 'http://localhost:4000/graphql'
});
// WebSocket 链接
const wsLink = new WebSocketLink({
uri: 'ws://localhost:4000/graphql',
options: {
reconnect: true,
connectionParams: {
authToken: localStorage.getItem('token')
},
lazy: true,
connectionCallback: (error) => {
if (error) {
console.error('WebSocket connection error:', error);
} else {
console.log('WebSocket connected');
}
}
}
});
// 分割链接
const splitLink = split(
({ query }) => {
const definition = getMainDefinition(query);
return (
definition.kind === 'OperationDefinition' &&
definition.operation === 'subscription'
);
},
wsLink,
httpLink
);
const client = new ApolloClient({
link: splitLink,
cache: new InMemoryCache()
});
// 使用订阅
import { gql, useSubscription } from '@apollo/client';
const POST_CREATED_SUBSCRIPTION = gql`
subscription OnPostCreated {
postCreated {
id
title
content
author {
name
}
createdAt
}
}
`;
function PostList() {
const { data, loading, error } = useSubscription(POST_CREATED_SUBSCRIPTION);
if (loading) return <div>Loading...</div>;
if (error) return <div>Error: {error.message}</div>;
return (
<div>
<h3>New Post Created:</h3>
<p>{data.postCreated.title}</p>
</div>
);
}
React Hooks 订阅
import { useSubscription, useMutation } from '@apollo/client';
function ChatRoom({ roomId }) {
const MESSAGE_ADDED = gql`
subscription OnMessageAdded($roomId: ID!) {
messageAdded(roomId: $roomId) {
id
text
author {
name
}
createdAt
}
}
`;
const SEND_MESSAGE = gql`
mutation SendMessage($roomId: ID!, $text: String!) {
sendMessage(roomId: $roomId, text: $text) {
id
text
}
}
`;
const { data: messageData, loading } = useSubscription(MESSAGE_ADDED, {
variables: { roomId }
});
const [sendMessage] = useMutation(SEND_MESSAGE);
const handleSendMessage = (text) => {
sendMessage({ variables: { roomId, text } });
};
return (
<div>
{loading ? (
<div>Connecting...</div>
) : (
<div>
<MessageList messages={messageData?.messageAdded} />
<MessageInput onSend={handleSendMessage} />
</div>
)}
</div>
);
}
5. 订阅过滤
基于参数的过滤
const resolvers = {
Subscription: {
notification: {
subscribe: (_, { userId, types }) => {
const asyncIterator = pubsub.asyncIterator(['NOTIFICATION']);
return {
[Symbol.asyncIterator]() {
return (async function* () {
for await (const event of asyncIterator) {
const notification = event.notification;
// 过滤用户
if (userId && notification.userId !== userId) {
continue;
}
// 过滤类型
if (types && !types.includes(notification.type)) {
continue;
}
yield event;
}
})();
}
};
}
}
}
};
基于权限的过滤
const resolvers = {
Subscription: {
userUpdate: {
subscribe: async (_, __, context) => {
// 验证用户权限
if (!context.user) {
throw new Error('Unauthorized');
}
const asyncIterator = pubsub.asyncIterator(['USER_UPDATE']);
return {
[Symbol.asyncIterator]() {
return (async function* () {
for await (const event of asyncIterator) {
const update = event.userUpdate;
// 只返回当前用户的更新
if (update.userId !== context.user.id) {
continue;
}
// 只返回有权限查看的字段
const filteredUpdate = filterSensitiveFields(update, context.user.role);
yield { userUpdate: filteredUpdate };
}
})();
}
};
}
}
}
};
6. 订阅错误处理
连接错误处理
const wsLink = new WebSocketLink({
uri: 'ws://localhost:4000/graphql',
options: {
reconnect: true,
retryAttempts: 5,
connectionParams: async () => {
const token = await getAuthToken();
return { token };
},
on: {
connected: () => console.log('WebSocket connected'),
error: (error) => {
console.error('WebSocket error:', error);
// 尝试重新连接
},
closed: (event) => {
console.log('WebSocket closed:', event);
// 清理资源
}
}
}
});
订阅错误处理
function useSubscriptionWithErrorHandling(query, options) {
const { data, error, loading } = useSubscription(query, options);
useEffect(() => {
if (error) {
console.error('Subscription error:', error);
// 根据错误类型处理
if (error.networkError) {
// 网络错误,尝试重连
handleNetworkError(error);
} else if (error.graphQLErrors) {
// GraphQL 错误
handleGraphQLError(error);
}
}
}, [error]);
return { data, error, loading };
}
7. 订阅性能优化
批量发布
class BatchPublisher {
constructor(pubsub, eventName, batchSize = 10, flushInterval = 100) {
this.pubsub = pubsub;
this.eventName = eventName;
this.batchSize = batchSize;
this.flushInterval = flushInterval;
this.batch = [];
this.flushTimer = null;
}
add(event) {
this.batch.push(event);
if (this.batch.length >= this.batchSize) {
this.flush();
} else if (!this.flushTimer) {
this.flushTimer = setTimeout(() => this.flush(), this.flushInterval);
}
}
flush() {
if (this.batch.length === 0) return;
// 批量发布
this.pubsub.publish(this.eventName, { batch: this.batch });
this.batch = [];
if (this.flushTimer) {
clearTimeout(this.flushTimer);
this.flushTimer = null;
}
}
}
// 使用批量发布器
const batchPublisher = new BatchPublisher(pubsub, 'BATCH_EVENTS');
// 添加事件到批次
batchPublisher.add({ type: 'event1', data: {} });
batchPublisher.add({ type: 'event2', data: {} });
订阅节流
function useThrottledSubscription(query, options, throttleMs = 1000) {
const { data, loading } = useSubscription(query, options);
const [throttledData, setThrottledData] = useState(null);
const lastUpdate = useRef(0);
useEffect(() => {
if (data) {
const now = Date.now();
if (now - lastUpdate.current > throttleMs) {
setThrottledData(data);
lastUpdate.current = now;
}
}
}, [data, throttleMs]);
return { data: throttledData, loading };
}
8. 订阅监控
连接监控
const connectionMetrics = {
activeConnections: 0,
totalConnections: 0,
disconnections: 0
};
const wsServer = new WebSocketServer({
server: httpServer,
path: '/graphql'
});
useServer(
{
schema: server.schema,
onConnect: () => {
connectionMetrics.totalConnections++;
connectionMetrics.activeConnections++;
console.log('Connection metrics:', connectionMetrics);
},
onDisconnect: () => {
connectionMetrics.activeConnections--;
connectionMetrics.disconnections++;
console.log('Connection metrics:', connectionMetrics);
}
},
wsServer
);
订阅指标
const subscriptionMetrics = new Map();
function trackSubscription(eventName) {
if (!subscriptionMetrics.has(eventName)) {
subscriptionMetrics.set(eventName, {
count: 0,
lastPublished: null
});
}
const metrics = subscriptionMetrics.get(eventName);
metrics.count++;
metrics.lastPublished = new Date();
}
// 在发布事件时追踪
pubsub.publish(POST_CREATED, { postCreated: post });
trackSubscription(POST_CREATED);
9. 订阅最佳实践
| 实践 | 说明 |
|---|
| 使用 Redis PubSub | 支持分布式部署 |
| 实现连接认证 | 确保订阅安全 |
| 添加错误处理 | 提高稳定性 |
| 实现过滤机制 | 减少不必要的数据推送 |
| 监控连接状态 | 及时发现问题 |
| 使用批量发布 | 提高性能 |
| 实现重连机制 | 提高可靠性 |
| 限制订阅数量 | 防止资源耗尽 |
| 设置超时时间 | 避免僵尸连接 |
| 记录订阅日志 | 便于调试和分析 |
10. 常见问题及解决方案
| 问题 | 原因 | 解决方案 |
|---|
| 连接频繁断开 | 网络不稳定、超时 | 实现自动重连、增加超时时间 |
| 订阅延迟高 | 服务器负载高、处理慢 | 优化性能、使用批量发布 |
| 内存泄漏 | 未正确清理订阅 | 确保取消订阅、清理资源 |
| 数据不一致 | 缓存未更新 | 实现缓存失效机制 |
| 安全问题 | 未验证连接 | 实现连接认证和授权 |