乐闻世界logo
搜索文章和话题

如何在Node.js中实现WebSocket服务器?

2月18日 18:59

Node.js提供了多种方式实现WebSocket服务器,最常用的是ws库。

使用ws库实现WebSocket服务器

基础服务器实现

javascript
const WebSocket = require('ws'); // 创建WebSocket服务器 const wss = new WebSocket.Server({ port: 8080 }); console.log('WebSocket服务器运行在 ws://localhost:8080'); wss.on('connection', (ws, request) => { console.log('新客户端连接'); // 获取客户端信息 const ip = request.socket.remoteAddress; console.log('客户端IP:', ip); // 发送欢迎消息 ws.send(JSON.stringify({ type: 'welcome', message: '欢迎连接到WebSocket服务器' })); // 接收消息 ws.on('message', (message) => { console.log('收到消息:', message.toString()); try { const data = JSON.parse(message); handleMessage(ws, data); } catch (error) { ws.send(JSON.stringify({ type: 'error', message: '消息格式错误' })); } }); // 处理错误 ws.on('error', (error) => { console.error('WebSocket错误:', error); }); // 连接关闭 ws.on('close', (code, reason) => { console.log('客户端断开连接, code:', code, 'reason:', reason.toString()); }); }); function handleMessage(ws, data) { switch (data.type) { case 'chat': broadcastMessage(data); break; case 'ping': ws.send(JSON.stringify({ type: 'pong' })); break; default: ws.send(JSON.stringify({ type: 'error', message: '未知消息类型' })); } } function broadcastMessage(data, excludeWs = null) { wss.clients.forEach((client) => { if (client !== excludeWs && client.readyState === WebSocket.OPEN) { client.send(JSON.stringify(data)); } }); }

与HTTP服务器集成

javascript
const http = require('http'); const WebSocket = require('ws'); // 创建HTTP服务器 const server = http.createServer((req, res) => { res.writeHead(200, { 'Content-Type': 'text/plain' }); res.end('WebSocket服务器运行中'); }); // 创建WebSocket服务器,绑定到HTTP服务器 const wss = new WebSocket.Server({ server }); wss.on('connection', (ws) => { console.log('WebSocket连接建立'); ws.on('message', (message) => { console.log('收到:', message.toString()); ws.send(`服务器回复: ${message}`); }); }); // 启动服务器 const PORT = 3000; server.listen(PORT, () => { console.log(`服务器运行在 http://localhost:${PORT}`); console.log(`WebSocket运行在 ws://localhost:${PORT}`); });

认证中间件

javascript
const WebSocket = require('ws'); const jwt = require('jsonwebtoken'); const JWT_SECRET = 'your-secret-key'; function authenticateClient(request, callback) { // 从URL参数获取token const token = new URL(request.url, 'http://localhost').searchParams.get('token'); if (!token) { return callback(new Error('缺少认证token')); } // 验证token jwt.verify(token, JWT_SECRET, (err, decoded) => { if (err) { return callback(new Error('无效的token')); } // 将用户信息附加到request对象 request.user = decoded; callback(null, true); }); } const wss = new WebSocket.Server({ port: 8080, verifyClient: authenticateClient }); wss.on('connection', (ws, request) => { const user = request.user; console.log(`用户 ${user.username} 已连接`); ws.send(JSON.stringify({ type: 'authenticated', user: user.username })); });

房间功能实现

javascript
const WebSocket = require('ws'); const wss = new WebSocket.Server({ port: 8080 }); // 存储房间和用户 const rooms = new Map(); wss.on('connection', (ws, request) => { let currentRoom = null; let userId = null; ws.on('message', (message) => { const data = JSON.parse(message.toString()); switch (data.type) { case 'join': handleJoin(ws, data.roomId, data.userId); break; case 'leave': handleLeave(ws); break; case 'message': handleMessage(ws, data); break; } }); function handleJoin(ws, roomId, uid) { // 如果已在房间,先离开 if (currentRoom) { handleLeave(ws); } currentRoom = roomId; userId = uid; // 创建房间(如果不存在) if (!rooms.has(roomId)) { rooms.set(roomId, new Map()); } const room = rooms.get(roomId); room.set(userId, ws); // 通知房间内其他用户 broadcastToRoom(roomId, { type: 'user_joined', userId: userId }, ws); ws.send(JSON.stringify({ type: 'joined', roomId: roomId, users: Array.from(room.keys()) })); } function handleLeave(ws) { if (currentRoom && rooms.has(currentRoom)) { const room = rooms.get(currentRoom); room.delete(userId); // 通知房间内其他用户 broadcastToRoom(currentRoom, { type: 'user_left', userId: userId }); // 如果房间为空,删除房间 if (room.size === 0) { rooms.delete(currentRoom); } } currentRoom = null; userId = null; } function handleMessage(ws, data) { if (!currentRoom) { ws.send(JSON.stringify({ type: 'error', message: '请先加入房间' })); return; } broadcastToRoom(currentRoom, { type: 'chat', userId: userId, message: data.message, timestamp: Date.now() }); } function broadcastToRoom(roomId, message, excludeWs = null) { if (!rooms.has(roomId)) return; const room = rooms.get(roomId); const data = JSON.stringify(message); room.forEach((clientWs) => { if (clientWs !== excludeWs && clientWs.readyState === WebSocket.OPEN) { clientWs.send(data); } }); } ws.on('close', () => { handleLeave(ws); }); });

心跳检测实现

javascript
const WebSocket = require('ws'); const wss = new WebSocket.Server({ port: 8080 }); const HEARTBEAT_INTERVAL = 30000; // 30秒 const HEARTBEAT_TIMEOUT = 5000; // 5秒超时 wss.on('connection', (ws) => { ws.isAlive = true; ws.lastPong = Date.now(); // 监听pong消息 ws.on('pong', () => { ws.isAlive = true; ws.lastPong = Date.now(); }); // 监听消息 ws.on('message', (message) => { const data = JSON.parse(message.toString()); if (data.type === 'ping') { ws.send(JSON.stringify({ type: 'pong' })); } }); }); // 定期检查连接状态 const interval = setInterval(() => { wss.clients.forEach((ws) => { // 检查是否超时 if (Date.now() - ws.lastPong > HEARTBEAT_TIMEOUT) { console.log('连接超时,关闭连接'); return ws.terminate(); } // 发送ping if (ws.isAlive === false) { console.log('连接无响应,关闭连接'); return ws.terminate(); } ws.isAlive = false; ws.ping(); }); }, HEARTBEAT_INTERVAL); wss.on('close', () => { clearInterval(interval); });

负载均衡支持

javascript
const WebSocket = require('ws'); const Redis = require('ioredis'); const redis = new Redis(); const wss = new WebSocket.Server({ port: 8080 }); // 当前服务器ID const SERVER_ID = 'server-1'; // 订阅Redis频道 const subscriber = new Redis(); subscriber.subscribe('websocket_broadcast'); // 处理来自其他服务器的消息 subscriber.on('message', (channel, message) => { const data = JSON.parse(message); // 只处理不是自己发送的消息 if (data.serverId !== SERVER_ID) { broadcastToRoom(data.roomId, data.message); } }); wss.on('connection', (ws, request) => { const userId = getUserId(request); const roomId = getRoomId(request); // 注册连接到Redis redis.hset('websocket_connections', userId, JSON.stringify({ serverId: SERVER_ID, roomId: roomId })); ws.on('message', (message) => { const data = JSON.parse(message.toString()); // 广播到当前服务器的客户端 broadcastToRoom(roomId, data); // 发布到Redis,通知其他服务器 redis.publish('websocket_broadcast', JSON.stringify({ serverId: SERVER_ID, roomId: roomId, message: data })); }); ws.on('close', () => { // 从Redis删除连接 redis.hdel('websocket_connections', userId); }); }); function broadcastToRoom(roomId, message) { wss.clients.forEach((client) => { if (client.readyState === WebSocket.OPEN) { client.send(JSON.stringify(message)); } }); } function getUserId(request) { // 从请求中获取用户ID return request.headers['x-user-id']; } function getRoomId(request) { // 从请求中获取房间ID return request.headers['x-room-id']; }

最佳实践

  1. 错误处理:妥善处理所有可能的错误
  2. 资源清理:连接关闭时清理相关资源
  3. 认证授权:实现完善的认证机制
  4. 心跳检测:定期检查连接状态
  5. 消息验证:验证所有接收到的消息
  6. 日志记录:记录重要事件和错误
  7. 性能监控:监控服务器性能指标
  8. 负载均衡:支持多服务器部署
标签:WebSocket