WebSocket 概述
WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议。NestJS 通过 @nestjs/websockets 和 @nestjs/platform-socket.io 包提供了对 WebSocket 的完整支持,使开发者能够轻松构建实时应用程序。
NestJS WebSocket 基础
安装依赖
```bash
npm install @nestjs/websockets @nestjs/platform-socket.io
```
创建 WebSocket 网关
网关(Gateway)是 NestJS 中处理 WebSocket 连接的类,类似于控制器处理 HTTP 请求。
import {
  OnGatewayConnection,
  OnGatewayDisconnect,
  OnGatewayInit,
  SubscribeMessage,
  WebSocketGateway,
  WebSocketServer,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';

/**
 * Minimal gateway: logs lifecycle events and re-broadcasts every incoming
 * `message` event to all connected clients.
 */
@WebSocketGateway({ cors: { origin: '*' } })
export class ChatGateway
  implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect
{
  /** Server instance supplied through the `@WebSocketServer()` decorator. */
  @WebSocketServer()
  server: Server;

  /** Lifecycle hook: logs that the server has been initialised. */
  afterInit(server: Server) {
    console.log('WebSocket server initialized');
  }

  /** Lifecycle hook: logs the id of the client that just connected. */
  handleConnection(client: Socket) {
    const { id } = client;
    console.log(`Client connected: ${id}`);
  }

  /** Lifecycle hook: logs the id of the client that just disconnected. */
  handleDisconnect(client: Socket) {
    const { id } = client;
    console.log(`Client disconnected: ${id}`);
  }

  /** Relays the payload of a `message` event to every connected client. */
  @SubscribeMessage('message')
  handleMessage(client: Socket, payload: any): void {
    const broadcaster = this.server;
    broadcaster.emit('message', payload);
  }
}
注册网关
import { Module } from '@nestjs/common';
import { ChatGateway } from './chat.gateway';

/** Providers registered by the chat module. */
const chatProviders = [ChatGateway];

/** Module that registers `ChatGateway` as a provider. */
@Module({ providers: chatProviders })
export class ChatModule {}
WebSocket 网关装饰器
@WebSocketGateway()
配置 WebSocket 网关。
/** Gateway declared with an explicit namespace, path, transport list and CORS policy. */
@WebSocketGateway({
  // Path
  path: '/ws',
  // Namespace
  namespace: '/chat',
  // Transports
  transports: ['websocket'],
  // CORS configuration
  cors: { origin: '*' },
})
export class ChatGateway {}
@WebSocketServer()
注入 WebSocket 服务器实例。
/** Server instance supplied through the `@WebSocketServer()` decorator. */
@WebSocketServer()
server: Server;
@SubscribeMessage()
订阅 WebSocket 消息。
/** Handler subscribed to the `message` event. */
@SubscribeMessage('message')
handleMessage(client: Socket, payload: any): void {
  // Handle the message here.
}
@MessageBody()
提取消息体。
/** Logs the body of each `message` event, extracted with `@MessageBody()`. */
@SubscribeMessage('message')
handleMessage(@MessageBody() data: any): void {
  const body = data;
  console.log(body);
}
@ConnectedSocket()
获取连接的 Socket 实例。
/** Logs the id of the socket that sent the `message` event. */
@SubscribeMessage('message')
handleMessage(@ConnectedSocket() client: Socket): void {
  const { id } = client;
  console.log(id);
}
网关生命周期钩子
OnGatewayInit
/** `OnGatewayInit` hook: logs that the gateway has been initialised. */
afterInit(server: Server) {
  const notice = 'Gateway initialized';
  console.log(notice);
}
OnGatewayConnection
/** `OnGatewayConnection` hook: logs the id of the newly connected client. */
handleConnection(client: Socket) {
  const { id } = client;
  console.log(`Client connected: ${id}`);
}
OnGatewayDisconnect
/** `OnGatewayDisconnect` hook: logs the id of the client that went away. */
handleDisconnect(client: Socket) {
  const { id } = client;
  console.log(`Client disconnected: ${id}`);
}
实时聊天应用示例
聊天网关
import {
  OnGatewayConnection,
  OnGatewayDisconnect,
  SubscribeMessage,
  WebSocketGateway,
  WebSocketServer,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';

/** Chat message exchanged between clients. */
interface Message {
  user: string;
  text: string;
  timestamp: Date;
}

/**
 * Chat gateway: tracks connected sockets, announces joins/leaves, manages
 * room membership and relays public and private messages.
 */
@WebSocketGateway({
  cors: {
    origin: '*',
  },
})
export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {
  /** Server instance supplied through the `@WebSocketServer()` decorator. */
  @WebSocketServer()
  server!: Server;

  /** Connected sockets keyed by socket id. */
  private readonly connectedClients = new Map<string, Socket>();

  /** Registers the socket and broadcasts `user-joined` with the new user count. */
  handleConnection(client: Socket): void {
    this.connectedClients.set(client.id, client);
    this.server.emit('user-joined', {
      userId: client.id,
      userCount: this.connectedClients.size,
    });
  }

  /** Forgets the socket and broadcasts `user-left` with the new user count. */
  handleDisconnect(client: Socket): void {
    this.connectedClients.delete(client.id);
    this.server.emit('user-left', {
      userId: client.id,
      userCount: this.connectedClients.size,
    });
  }

  /**
   * Adds the client to `room`, then confirms with `joined-room`.
   *
   * `join` may return a Promise (depending on the adapter), so it is awaited
   * to make sure the confirmation is only sent once the join has completed.
   */
  @SubscribeMessage('join-room')
  async handleJoinRoom(client: Socket, room: string): Promise<void> {
    await client.join(room);
    client.emit('joined-room', room);
  }

  /**
   * Removes the client from `room`, then confirms with `left-room`.
   * Awaited for the same reason as in `handleJoinRoom`.
   */
  @SubscribeMessage('leave-room')
  async handleLeaveRoom(client: Socket, room: string): Promise<void> {
    await client.leave(room);
    client.emit('left-room', room);
  }

  /** Broadcasts the message to every client with a server-side timestamp. */
  @SubscribeMessage('send-message')
  handleMessage(client: Socket, payload: Message): void {
    this.server.emit('message', this.stamp(payload));
  }

  /**
   * Delivers a message to the single socket whose id is `payload.to`.
   * Nothing is sent when that socket is not connected. The message is
   * timestamped on the server, consistent with `handleMessage`.
   */
  @SubscribeMessage('private-message')
  handlePrivateMessage(
    client: Socket,
    payload: { to: string; message: Message },
  ): void {
    const recipient = this.connectedClients.get(payload.to);
    if (!recipient) {
      return;
    }
    recipient.emit('private-message', this.stamp(payload.message));
  }

  /** Returns a copy of `message` whose timestamp is the current server time. */
  private stamp(message: Message): Message {
    return { ...message, timestamp: new Date() };
  }
}
前端客户端
import { io, Socket } from 'socket.io-client';

/**
 * Chat message as seen by the client.
 * Payloads are JSON-serialised on the wire, so a timestamp received from the
 * server arrives as a string rather than a `Date`.
 */
interface Message {
  user: string;
  text: string;
  timestamp?: Date | string;
}

/** Thin wrapper around a Socket.IO client socket for the chat events. */
class ChatClient {
  private readonly socket: Socket;

  /**
   * Creates the socket without opening it and attaches the event listeners.
   *
   * `autoConnect: false` defers the connection until `connect()` is called,
   * so the `connect` listener is guaranteed to be in place before the
   * connection is established.
   *
   * @param url Server URL passed to `io()`.
   */
  constructor(url: string) {
    this.socket = io(url, { autoConnect: false });
    this.registerListeners();
  }

  /** Opens the connection. Listeners are already attached, so repeated calls do not duplicate them. */
  connect() {
    this.socket.connect();
  }

  /** Sends a public chat message. */
  sendMessage(message: Message) {
    this.socket.emit('send-message', message);
  }

  /** Sends a message addressed to the socket id `to`. */
  sendPrivateMessage(to: string, message: Message) {
    this.socket.emit('private-message', { to, message });
  }

  /** Asks the server to add this socket to `room`. */
  joinRoom(room: string) {
    this.socket.emit('join-room', room);
  }

  /** Asks the server to remove this socket from `room`. */
  leaveRoom(room: string) {
    this.socket.emit('leave-room', room);
  }

  /** Closes the connection. */
  disconnect() {
    this.socket.disconnect();
  }

  /** Attaches the logging listeners exactly once, at construction time. */
  private registerListeners(): void {
    this.socket.on('connect', () => {
      console.log('Connected to server');
    });

    this.socket.on('message', (message: Message) => {
      console.log('Received message:', message);
    });

    this.socket.on('user-joined', (data) => {
      console.log('User joined:', data);
    });

    this.socket.on('user-left', (data) => {
      console.log('User left:', data);
    });
  }
}
使用 WebSocket 守卫
import { UseGuards } from '@nestjs/common';
import { SubscribeMessage, WebSocketGateway } from '@nestjs/websockets';
import { Socket } from 'socket.io';
import { WsGuard } from './ws.guard';

/**
 * Gateway whose message handlers are protected by `WsGuard`.
 *
 * The class no longer declares `implements OnGatewayConnection`: it defines no
 * `handleConnection` method, so that declaration did not compile.
 */
@WebSocketGateway()
@UseGuards(WsGuard)
export class ChatGateway {
  @SubscribeMessage('message')
  handleMessage(client: Socket, payload: unknown): void {
    // Only clients that pass the guard reach this handler.
  }
}
WebSocket 守卫示例
import { CanActivate, ExecutionContext, Injectable } from '@nestjs/common';
import { WsException } from '@nestjs/websockets';
import { Socket } from 'socket.io';

/** Guard that requires a token in the Socket.IO handshake `auth` payload. */
@Injectable()
export class WsGuard implements CanActivate {
  /**
   * @returns the result of `validateToken` for the handshake token.
   * @throws WsException when the token is missing or is not a non-empty
   *   string. `WsException` is used instead of an HTTP exception because this
   *   guard runs in the WebSocket context.
   */
  canActivate(context: ExecutionContext): boolean {
    const client = context.switchToWs().getClient<Socket>();
    // The handshake payload is client-controlled, so treat it as unknown.
    const token: unknown = client.handshake.auth?.token;

    if (typeof token !== 'string' || token.length === 0) {
      throw new WsException('Unauthorized');
    }

    // Validate the token.
    return this.validateToken(token);
  }

  /**
   * Placeholder for the real token check.
   * WARNING: currently accepts every token; replace before production use.
   */
  private validateToken(token: string): boolean {
    // Implement the token validation logic here.
    return true;
  }
}
使用 WebSocket 拦截器
import { UseInterceptors } from '@nestjs/common';
import { SubscribeMessage, WebSocketGateway } from '@nestjs/websockets';
// `Socket` was referenced below without being imported.
import { Socket } from 'socket.io';
import { LoggingInterceptor } from './logging.interceptor';

/** Gateway whose message handlers are wrapped by `LoggingInterceptor`. */
@WebSocketGateway()
@UseInterceptors(LoggingInterceptor)
export class ChatGateway {
  @SubscribeMessage('message')
  handleMessage(client: Socket, payload: unknown): void {
    // The message passes through the interceptor.
  }
}
使用 WebSocket 管道
import { UsePipes } from '@nestjs/common';
import {
  MessageBody,
  SubscribeMessage,
  WebSocketGateway,
} from '@nestjs/websockets';
import { ValidationPipe } from './validation.pipe';

/**
 * Shape of an incoming chat message.
 * Declared here because the handler below referenced `Message` without any
 * declaration or import in scope.
 */
interface Message {
  user: string;
  text: string;
  timestamp: Date;
}

/** Gateway whose incoming message bodies are run through `ValidationPipe`. */
@WebSocketGateway()
@UsePipes(new ValidationPipe())
export class ChatGateway {
  @SubscribeMessage('message')
  handleMessage(@MessageBody() message: Message): void {
    // The message has been validated by the pipe.
  }
}
实时通知系统
通知网关
import {
  OnGatewayConnection,
  OnGatewayDisconnect,
  WebSocketGateway,
  WebSocketServer,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';

/** Notification pushed to clients on the `notification` event. */
interface Notification {
  type: 'info' | 'warning' | 'error' | 'success';
  title: string;
  message: string;
  timestamp: Date;
}

/**
 * Gateway on the `/notifications` namespace that maps user ids to their
 * connected socket ids so notifications can target a single user.
 */
@WebSocketGateway({
  namespace: '/notifications',
  cors: { origin: '*' },
})
export class NotificationGateway
  implements OnGatewayConnection, OnGatewayDisconnect
{
  /** Server instance supplied through the `@WebSocketServer()` decorator. */
  @WebSocketServer()
  server!: Server;

  /** User id -> ids of that user's currently connected sockets. */
  private readonly userSockets = new Map<string, Set<string>>();

  /**
   * Records the socket under the user id from the handshake query.
   * Connections without a usable `userId` are not tracked; they still
   * receive broadcast notifications.
   */
  handleConnection(client: Socket): void {
    const userId = this.resolveUserId(client);
    if (userId === undefined) {
      return;
    }

    let sockets = this.userSockets.get(userId);
    if (!sockets) {
      sockets = new Set<string>();
      this.userSockets.set(userId, sockets);
    }
    sockets.add(client.id);
  }

  /** Removes the socket and drops the user entry once no sockets remain. */
  handleDisconnect(client: Socket): void {
    const userId = this.resolveUserId(client);
    if (userId === undefined) {
      return;
    }

    const sockets = this.userSockets.get(userId);
    if (!sockets) {
      return;
    }
    sockets.delete(client.id);
    if (sockets.size === 0) {
      this.userSockets.delete(userId);
    }
  }

  /** Emits `notification` to every socket registered for `userId`, if any. */
  sendNotificationToUser(userId: string, notification: Notification): void {
    const sockets = this.userSockets.get(userId);
    if (!sockets) {
      return;
    }
    for (const socketId of sockets) {
      this.server.to(socketId).emit('notification', notification);
    }
  }

  /** Emits `notification` to every connected client. */
  sendBroadcastNotification(notification: Notification): void {
    this.server.emit('notification', notification);
  }

  /**
   * Reads `userId` from the handshake query.
   *
   * A query value may be absent or repeated (an array), so only a single
   * non-empty string is accepted.
   * NOTE(review): the value is client-supplied and unauthenticated; any
   * client can claim any user id. Derive it from a verified token instead.
   */
  private resolveUserId(client: Socket): string | undefined {
    const raw = client.handshake.query.userId;
    return typeof raw === 'string' && raw.length > 0 ? raw : undefined;
  }
}
在服务中使用通知网关
import { Injectable } from '@nestjs/common';
import { NotificationGateway } from './notification.gateway';

/** Service that turns application events into notifications sent via the gateway. */
@Injectable()
export class NotificationService {
  constructor(private notificationGateway: NotificationGateway) {}

  /** Sends an `info` notification about `orderId` to the given user. */
  sendOrderNotification(userId: string, orderId: string) {
    const title = '订单更新';
    const message = `您的订单 ${orderId} 已更新`;
    const timestamp = new Date();

    this.notificationGateway.sendNotificationToUser(userId, {
      type: 'info',
      title,
      message,
      timestamp,
    });
  }

  /** Broadcasts a `warning` notification carrying `message` to all clients. */
  sendSystemAlert(message: string) {
    const gateway = this.notificationGateway;
    const title = '系统通知';
    const timestamp = new Date();

    gateway.sendBroadcastNotification({
      type: 'warning',
      title,
      message,
      timestamp,
    });
  }
}
最佳实践
- 命名空间:使用命名空间组织不同类型的 WebSocket 连接
- 房间:使用房间功能管理用户组
- 认证:在连接时进行身份验证(注意:守卫只对 @SubscribeMessage 消息处理器生效,不会拦截连接本身;连接阶段的校验需要在 handleConnection 中完成)
- 错误处理:妥善处理连接错误和消息错误
- 资源清理:在断开连接时清理资源
- 消息验证:使用管道验证传入的消息
- 日志记录:记录连接和断开事件
- 性能监控:监控连接数和消息吞吐量
总结
NestJS WebSocket 和实时功能提供了:
- 完整的 WebSocket 支持
- 灵活的网关系统
- 丰富的装饰器和钩子
- 与 NestJS 生态系统的无缝集成
- 易于构建实时应用程序
掌握 NestJS WebSocket 功能是构建实时应用程序的关键。通过合理使用网关、命名空间、房间和认证机制,可以构建出高性能、可扩展的实时应用,如聊天应用、实时通知系统、协作工具等。