6月1日 09:12

How to implement a WebSocket server in Node.js?

Node.js provides multiple ways to implement WebSocket servers, the most commonly used is the ws library.

Implementing WebSocket Server with ws Library

Basic Server Implementation

javascript
const WebSocket = require('ws'); // Create WebSocket server const wss = new WebSocket.Server({ port: 8080 }); console.log('WebSocket server running on ws://localhost:8080'); wss.on('connection', (ws, request) => { console.log('New client connected'); // Get client information const ip = request.socket.remoteAddress; console.log('Client IP:', ip); // Send welcome message ws.send(JSON.stringify({ type: 'welcome', message: 'Welcome to WebSocket server' })); // Receive messages ws.on('message', (message) => { console.log('Received message:', message.toString()); try { const data = JSON.parse(message); handleMessage(ws, data); } catch (error) { ws.send(JSON.stringify({ type: 'error', message: 'Invalid message format' })); } }); // Handle errors ws.on('error', (error) => { console.error('WebSocket error:', error); }); // Connection closed ws.on('close', (code, reason) => { console.log('Client disconnected, code:', code, 'reason:', reason.toString()); }); }); function handleMessage(ws, data) { switch (data.type) { case 'chat': broadcastMessage(data); break; case 'ping': ws.send(JSON.stringify({ type: 'pong' })); break; default: ws.send(JSON.stringify({ type: 'error', message: 'Unknown message type' })); } } function broadcastMessage(data, excludeWs = null) { wss.clients.forEach((client) => { if (client !== excludeWs && client.readyState === WebSocket.OPEN) { client.send(JSON.stringify(data)); } }); }

Integration with HTTP Server

javascript
const http = require('http'); const WebSocket = require('ws'); // Create HTTP server const server = http.createServer((req, res) => { res.writeHead(200, { 'Content-Type': 'text/plain' }); res.end('WebSocket server is running'); }); // Create WebSocket server, bind to HTTP server const wss = new WebSocket.Server({ server }); wss.on('connection', (ws) => { console.log('WebSocket connection established'); ws.on('message', (message) => { console.log('Received:', message.toString()); ws.send(`Server reply: ${message}`); }); }); // Start server const PORT = 3000; server.listen(PORT, () => { console.log(`Server running on http://localhost:${PORT}`); console.log(`WebSocket running on ws://localhost:${PORT}`); });

Authentication Middleware

javascript
const WebSocket = require('ws'); const jwt = require('jsonwebtoken'); const JWT_SECRET = 'your-secret-key'; function authenticateClient(request, callback) { // Get token from URL parameters const token = new URL(request.url, 'http://localhost').searchParams.get('token'); if (!token) { return callback(new Error('Missing authentication token')); } // Verify token jwt.verify(token, JWT_SECRET, (err, decoded) => { if (err) { return callback(new Error('Invalid token')); } // Attach user info to request object request.user = decoded; callback(null, true); }); } const wss = new WebSocket.Server({ port: 8080, verifyClient: authenticateClient }); wss.on('connection', (ws, request) => { const user = request.user; console.log(`User ${user.username} connected`); ws.send(JSON.stringify({ type: 'authenticated', user: user.username })); });

Room Functionality Implementation

javascript
const WebSocket = require('ws'); const wss = new WebSocket.Server({ port: 8080 }); // Store rooms and users const rooms = new Map(); wss.on('connection', (ws, request) => { let currentRoom = null; let userId = null; ws.on('message', (message) => { const data = JSON.parse(message.toString()); switch (data.type) { case 'join': handleJoin(ws, data.roomId, data.userId); break; case 'leave': handleLeave(ws); break; case 'message': handleMessage(ws, data); break; } }); function handleJoin(ws, roomId, uid) { // If already in a room, leave first if (currentRoom) { handleLeave(ws); } currentRoom = roomId; userId = uid; // Create room (if not exists) if (!rooms.has(roomId)) { rooms.set(roomId, new Map()); } const room = rooms.get(roomId); room.set(userId, ws); // Notify other users in the room broadcastToRoom(roomId, { type: 'user_joined', userId: userId }, ws); ws.send(JSON.stringify({ type: 'joined', roomId: roomId, users: Array.from(room.keys()) })); } function handleLeave(ws) { if (currentRoom && rooms.has(currentRoom)) { const room = rooms.get(currentRoom); room.delete(userId); // Notify other users in the room broadcastToRoom(currentRoom, { type: 'user_left', userId: userId }); // Delete room if empty if (room.size === 0) { rooms.delete(currentRoom); } } currentRoom = null; userId = null; } function handleMessage(ws, data) { if (!currentRoom) { ws.send(JSON.stringify({ type: 'error', message: 'Please join a room first' })); return; } broadcastToRoom(currentRoom, { type: 'chat', userId: userId, message: data.message, timestamp: Date.now() }); } function broadcastToRoom(roomId, message, excludeWs = null) { if (!rooms.has(roomId)) return; const room = rooms.get(roomId); const data = JSON.stringify(message); room.forEach((clientWs) => { if (clientWs !== excludeWs && clientWs.readyState === WebSocket.OPEN) { clientWs.send(data); } }); } ws.on('close', () => { handleLeave(ws); }); });

Heartbeat Detection Implementation

javascript
const WebSocket = require('ws'); const wss = new WebSocket.Server({ port: 8080 }); const HEARTBEAT_INTERVAL = 30000; // 30 seconds const HEARTBEAT_TIMEOUT = 5000; // 5 second timeout wss.on('connection', (ws) => { ws.isAlive = true; ws.lastPong = Date.now(); // Listen for pong messages ws.on('pong', () => { ws.isAlive = true; ws.lastPong = Date.now(); }); // Listen for messages ws.on('message', (message) => { const data = JSON.parse(message.toString()); if (data.type === 'ping') { ws.send(JSON.stringify({ type: 'pong' })); } }); }); // Periodically check connection status const interval = setInterval(() => { wss.clients.forEach((ws) => { // Check if timeout if (Date.now() - ws.lastPong > HEARTBEAT_TIMEOUT) { console.log('Connection timeout, closing connection'); return ws.terminate(); } // Send ping if (ws.isAlive === false) { console.log('Connection not responding, closing connection'); return ws.terminate(); } ws.isAlive = false; ws.ping(); }); }, HEARTBEAT_INTERVAL); wss.on('close', () => { clearInterval(interval); });

Load Balancing Support

javascript
const WebSocket = require('ws'); const Redis = require('ioredis'); const redis = new Redis(); const wss = new WebSocket.Server({ port: 8080 }); // Current server ID const SERVER_ID = 'server-1'; // Subscribe to Redis channel const subscriber = new Redis(); subscriber.subscribe('websocket_broadcast'); // Handle messages from other servers subscriber.on('message', (channel, message) => { const data = JSON.parse(message); // Only process messages not sent by self if (data.serverId !== SERVER_ID) { broadcastToRoom(data.roomId, data.message); } }); wss.on('connection', (ws, request) => { const userId = getUserId(request); const roomId = getRoomId(request); // Register connection to Redis redis.hset('websocket_connections', userId, JSON.stringify({ serverId: SERVER_ID, roomId: roomId })); ws.on('message', (message) => { const data = JSON.parse(message.toString()); // Broadcast to clients on current server broadcastToRoom(roomId, data); // Publish to Redis, notify other servers redis.publish('websocket_broadcast', JSON.stringify({ serverId: SERVER_ID, roomId: roomId, message: data })); }); ws.on('close', () => { // Delete connection from Redis redis.hdel('websocket_connections', userId); }); }); function broadcastToRoom(roomId, message) { wss.clients.forEach((client) => { if (client.readyState === WebSocket.OPEN) { client.send(JSON.stringify(message)); } }); } function getUserId(request) { // Get user ID from request return request.headers['x-user-id']; } function getRoomId(request) { // Get room ID from request return request.headers['x-room-id']; }

Best Practices

  1. Error Handling: Properly handle all possible errors
  2. Resource Cleanup: Clean up related resources when connection closes
  3. Authentication Authorization: Implement comprehensive authentication mechanisms
  4. Heartbeat Detection: Periodically check connection status
  5. Message Validation: Validate all received messages
  6. Logging: Record important events and errors
  7. Performance Monitoring: Monitor server performance metrics
  8. Load Balancing: Support multi-server deployment
标签:WebSocket