WebAssembly 的多线程和并发编程通过 Web Workers 和共享内存实现:
1. Web Workers 基础
- WebAssembly 本身是单线程的
- 使用 Web Workers 实现多线程
- 每个 Worker 有独立的 WebAssembly 实例
- Workers 之间通过消息传递通信
2. 共享内存
javascript// 创建共享内存 const sharedMemory = new WebAssembly.Memory({ initial: 10, maximum: 100, shared: true }); // 主线程 const mainBuffer = new Int32Array(sharedMemory.buffer); // Worker 线程 const worker = new Worker('worker.js'); worker.postMessage({ memory: sharedMemory }, [sharedMemory.buffer]);
3. Worker 代码示例
javascript// worker.js self.onmessage = function(e) { const { memory, wasmModule } = e.data; // 在 Worker 中加载 WebAssembly WebAssembly.instantiate(wasmModule, { env: { memory } }) .then(results => { const { process } = results.instance.exports; // 处理数据 const buffer = new Int32Array(memory.buffer); const result = process(0, buffer.length); // 发送结果回主线程 self.postMessage({ result }); }); };
4. 主线程代码
javascript// main.js const sharedMemory = new WebAssembly.Memory({ initial: 10, maximum: 100, shared: true }); // 创建多个 Workers const workers = []; for (let i = 0; i < 4; i++) { const worker = new Worker('worker.js'); worker.postMessage({ memory: sharedMemory, wasmModule: wasmBinary }, [sharedMemory.buffer]); workers.push(worker); } // 接收 Worker 结果 workers.forEach(worker => { worker.onmessage = function(e) { console.log('Worker result:', e.data.result); }; });
5. 原子操作
javascript// 使用 Atomics 进行同步 const buffer = new Int32Array(sharedMemory.buffer); // 原子加 Atomics.add(buffer, 0, 1); // 原子比较并交换 const expected = 0; const newValue = 1; const success = Atomics.compareExchange(buffer, 0, expected, newValue); // 原子等待 Atomics.wait(buffer, 0, 0); // 原子通知 Atomics.notify(buffer, 0, 1);
6. 生产者-消费者模式
javascript// 生产者 Worker function producerWorker() { const buffer = new Int32Array(sharedMemory.buffer); const head = 0; const tail = 4; while (true) { // 等待空间 while (buffer[head] === buffer[tail]) { Atomics.wait(buffer, head, buffer[head]); } // 生产数据 buffer[buffer[head] % 10 + 8] = Math.random(); buffer[head] = buffer[head] + 1; // 通知消费者 Atomics.notify(buffer, head, 1); } } // 消费者 Worker function consumerWorker() { const buffer = new Int32Array(sharedMemory.buffer); const head = 0; const tail = 4; while (true) { // 等待数据 while (buffer[head] === buffer[tail]) { Atomics.wait(buffer, tail, buffer[tail]); } // 消费数据 const data = buffer[buffer[tail] % 10 + 8]; console.log('Consumed:', data); buffer[tail] = buffer[tail] + 1; // 通知生产者 Atomics.notify(buffer, tail, 1); } }
7. 并行计算
javascript// 并行矩阵乘法 async function parallelMatrixMultiply(A, B, numWorkers = 4) { const sharedMemory = new WebAssembly.Memory({ initial: 20, maximum: 100, shared: true }); const workers = []; const rowsPerWorker = Math.ceil(A.length / numWorkers); // 创建 Workers for (let i = 0; i < numWorkers; i++) { const worker = new Worker('matrix-worker.js'); const startRow = i * rowsPerWorker; const endRow = Math.min(startRow + rowsPerWorker, A.length); worker.postMessage({ memory: sharedMemory, startRow, endRow, A, B }, [sharedMemory.buffer]); workers.push(worker); } // 等待所有 Workers 完成 const results = await Promise.all( workers.map(worker => new Promise(resolve => { worker.onmessage = (e) => resolve(e.data.result); }) ) ); return results.flat(); }
8. 任务队列
javascript// 任务队列实现 class TaskQueue { constructor(numWorkers = 4) { this.workers = []; this.taskQueue = []; this.activeWorkers = 0; for (let i = 0; i < numWorkers; i++) { const worker = new Worker('task-worker.js'); worker.onmessage = (e) => this.handleWorkerMessage(worker, e); this.workers.push(worker); } } addTask(task) { return new Promise((resolve, reject) => { this.taskQueue.push({ task, resolve, reject }); this.processQueue(); }); } processQueue() { while (this.taskQueue.length > 0 && this.activeWorkers < this.workers.length) { const { task, resolve, reject } = this.taskQueue.shift(); const worker = this.workers[this.activeWorkers]; this.activeWorkers++; worker.postMessage({ task, id: Date.now() }); worker.currentResolve = resolve; worker.currentReject = reject; } } handleWorkerMessage(worker, e) { const { result, error } = e.data; if (error) { worker.currentReject(error); } else { worker.currentResolve(result); } this.activeWorkers--; this.processQueue(); } }
9. 性能优化
- 合理分配任务:避免负载不均衡
- 减少同步开销:尽量减少原子操作
- 批量处理:减少消息传递次数
- 内存复用:避免频繁的内存分配
- Worker 池:复用 Worker 实例
10. 最佳实践
- 使用共享内存减少数据复制
- 合理使用原子操作进行同步
- 避免死锁和竞态条件
- 监控 Worker 性能和资源使用
- 实现优雅的错误处理和恢复
11. 调试多线程代码
- 使用 Chrome DevTools 的 Worker 调试功能
- 记录 Worker 之间的消息传递
- 监控共享内存的状态
- 使用性能分析工具识别瓶颈
12. 挑战和限制
- 调试复杂度增加
- 需要处理同步和并发问题
- 浏览器对 Worker 数量有限制
- 移动设备性能可能受限