乐闻世界logo
搜索文章和话题

Deno 的任务系统如何工作?

2月21日 16:08

Deno 的任务系统(Task System)基于标准的 Web Worker API,提供了一种在后台线程中运行任务的方式,用法与浏览器中的 Web Workers 基本一致。这个功能对于执行 CPU 密集型任务或需要并行处理的场景非常有用。

任务系统概述

Deno 的任务系统允许你创建独立的工作线程,这些线程可以并行执行代码,不会阻塞主线程。每个任务都有自己的内存空间,通过消息传递与主线程通信。

基本用法

1. 创建简单任务

typescript
// main.ts
// Starts ./worker.ts as a module worker, sends it a single request, and
// shuts the worker down as soon as the first reply has been logged.
const workerUrl = new URL("./worker.ts", import.meta.url);
const worker = new Worker(workerUrl.href, { type: "module" });

worker.postMessage({ type: "start", data: 42 });

// Log the reply, then stop the worker so the program can exit.
worker.onmessage = function (event) {
  console.log("Received from worker:", event.data);
  worker.terminate();
};

// Errors are only reported; the worker is left running in that case.
worker.onerror = function (error) {
  console.error("Worker error:", error);
};
typescript
// worker.ts
// Logs every incoming request and replies with its `data` field doubled,
// wrapped in a `{ type: "result", data }` envelope.
self.onmessage = ({ data: request }) => {
  console.log("Worker received:", request);
  self.postMessage({ type: "result", data: request.data * 2 });
};

运行:

bash
deno run --allow-read main.ts

2. 使用 Promise 封装 Worker

typescript
// main.ts

/**
 * Runs `workerFile` as a one-shot module worker.
 *
 * The worker is created, sent `data`, and terminated as soon as it either
 * replies or fails.
 *
 * @param workerFile Worker module path, resolved relative to this module.
 * @param data Payload posted to the worker.
 * @returns A promise that resolves with the first message posted back by the
 *   worker, or rejects with the worker's `error` / `messageerror` event.
 */
function runWorker<T>(workerFile: string, data: unknown): Promise<T> {
  return new Promise((resolve, reject) => {
    const worker = new Worker(new URL(workerFile, import.meta.url).href, {
      type: "module",
    });

    worker.onmessage = (event) => {
      worker.terminate();
      resolve(event.data);
    };

    worker.onerror = (error) => {
      // The failure is delivered through the rejected promise. Marking the
      // event as handled keeps the runtime from additionally reporting it as
      // an unhandled child-worker error in the parent.
      error.preventDefault();
      worker.terminate();
      reject(error);
    };

    // A reply that cannot be deserialized would otherwise leave the promise
    // pending forever.
    worker.onmessageerror = (event) => {
      worker.terminate();
      reject(event);
    };

    // Post only after every handler is attached.
    worker.postMessage(data);
  });
}

/** Demo entry point: runs ./worker.ts once and logs the result or the error. */
async function main() {
  try {
    const result = await runWorker<number>("./worker.ts", { number: 10 });
    console.log("Result:", result);
  } catch (error) {
    console.error("Error:", error);
  }
}

main();
typescript
// worker.ts
// Replies with the sum of all integers in [0, number * 1000000), computed with
// a plain loop to simulate a CPU-heavy job.
self.onmessage = (event) => {
  const { number } = event.data;
  const limit = number * 1000000;

  let total = 0;
  let i = 0;
  while (i < limit) {
    total += i;
    i++;
  }

  self.postMessage(total);
};

实际应用示例

1. 图像处理

typescript
// image-processor.ts

/** Signature shared by all pixel filters. */
type PixelFilter = (data: Uint8ClampedArray) => Uint8ClampedArray;

/** Maps the `operation` field of a request to the filter that implements it. */
const filters = new Map<string, PixelFilter>([
  ["grayscale", applyGrayscale],
  ["invert", applyInvert],
  ["blur", applyBlur],
]);

// Applies the requested filter to `imageData` and posts back `{ result }`.
// An unrecognised operation throws inside the handler.
self.onmessage = async (event) => {
  const { imageData, operation } = event.data;

  const filter = filters.get(operation);
  if (filter === undefined) {
    throw new Error(`Unknown operation: ${operation}`);
  }

  const result = filter(imageData);
  self.postMessage({ result });
};

/**
 * Returns a copy of `data` in which the first three bytes of every 4-byte
 * group are replaced by their weighted sum (0.299 / 0.587 / 0.114); the
 * fourth byte is copied unchanged.
 */
function applyGrayscale(data: Uint8ClampedArray): Uint8ClampedArray {
  const out = new Uint8ClampedArray(data.length);
  for (let px = 0; px < data.length; px += 4) {
    const r = data[px];
    const g = data[px + 1];
    const b = data[px + 2];
    const gray = 0.299 * r + 0.587 * g + 0.114 * b;
    out.fill(gray, px, px + 3);
    out[px + 3] = data[px + 3];
  }
  return out;
}

/**
 * Returns a copy of `data` in which the first three bytes of every 4-byte
 * group are replaced by `255 - value`; the fourth byte is copied unchanged.
 */
function applyInvert(data: Uint8ClampedArray): Uint8ClampedArray {
  const out = new Uint8ClampedArray(data.length);
  for (let px = 0; px < data.length; px += 4) {
    for (let channel = 0; channel < 3; channel++) {
      out[px + channel] = 255 - data[px + channel];
    }
    out[px + 3] = data[px + 3];
  }
  return out;
}

/** Placeholder blur: currently returns the input array itself, unmodified. */
function applyBlur(data: Uint8ClampedArray): Uint8ClampedArray {
  // Simplified blur; a real implementation would be more involved.
  return data;
}
typescript
// main.ts
import { runWorker } from "./worker-utils.ts";

/** Shape of the message posted back by ./image-processor.ts. */
interface ImageProcessorReply {
  result: Uint8ClampedArray;
}

/**
 * Reads `imagePath`, runs the "grayscale" and "invert" operations of
 * ./image-processor.ts on its bytes (one worker run each, sequentially), and
 * writes each output next to the input as `<imagePath>.grayscale.png` and
 * `<imagePath>.invert.png`.
 *
 * NOTE(review): the bytes passed to the worker are the raw file contents, not
 * decoded pixel data — confirm whether a decode/encode step is intended.
 *
 * @param imagePath Path of the image file to process.
 */
async function processImage(imagePath: string) {
  const imageData = await Deno.readFile(imagePath);

  // The worker replies with an envelope `{ result }`, so the byte array has to
  // be unwrapped before it is written to disk.
  const grayscale = await runWorker<ImageProcessorReply>(
    "./image-processor.ts",
    { imageData, operation: "grayscale" },
  );
  await Deno.writeFile(`${imagePath}.grayscale.png`, grayscale.result);

  const inverted = await runWorker<ImageProcessorReply>(
    "./image-processor.ts",
    { imageData, operation: "invert" },
  );
  await Deno.writeFile(`${imagePath}.invert.png`, inverted.result);

  console.log("Image processing complete");
}

processImage("input.png");

2. 并行数据处理

typescript
// data-processor.ts

/** Wraps one input number in a record stamped with the current time. */
function toProcessedRecord(item: number) {
  return {
    value: item,
    processed: true,
    timestamp: Date.now(),
  };
}

// Handles one chunk: logs its position, converts every element of `data` into
// a processed record, and posts the records back tagged with `chunkIndex`.
self.onmessage = (event) => {
  const request = event.data;
  const { chunkIndex, totalChunks } = request;
  console.log(`Processing chunk ${chunkIndex}/${totalChunks}`);

  // Simulated data processing.
  const processed = request.data.map((item: number) => toProcessedRecord(item));

  self.postMessage({ chunkIndex, processed });
};
typescript
// main.ts
import { runWorker } from "./worker-utils.ts";

/** One record produced by ./data-processor.ts for a single input number. */
interface ProcessedItem {
  value: number;
  processed: boolean;
  timestamp: number;
}

/** Message posted back by ./data-processor.ts for one chunk. */
interface ChunkResult {
  chunkIndex: number;
  processed: ProcessedItem[];
}

/**
 * Splits `data` into chunks of at most `chunkSize` items, processes every
 * chunk in its own ./data-processor.ts worker, and returns the processed
 * records in the original input order.
 *
 * @param data Numbers to process.
 * @param chunkSize Maximum number of items per worker; must be greater than 0.
 * @returns All processed records, ordered by chunk index.
 * @throws RangeError if `chunkSize` is not a positive number (a zero or
 *   negative step would never advance past the end of `data`).
 */
async function processDataInParallel(data: number[], chunkSize: number = 1000) {
  if (!(chunkSize > 0)) {
    throw new RangeError(`chunkSize must be greater than 0, got ${chunkSize}`);
  }

  const chunks: number[][] = [];
  for (let i = 0; i < data.length; i += chunkSize) {
    chunks.push(data.slice(i, i + chunkSize));
  }

  console.log(`Processing ${chunks.length} chunks in parallel`);

  const promises = chunks.map((chunk, index) =>
    runWorker<ChunkResult>("./data-processor.ts", {
      data: chunk,
      chunkIndex: index,
      totalChunks: chunks.length,
    })
  );

  const results = await Promise.all(promises);

  // Merge the chunks. Sorting a copy keeps `results` untouched and makes the
  // ordering explicit instead of relying on the order of the replies.
  const processedData = [...results]
    .sort((a, b) => a.chunkIndex - b.chunkIndex)
    .flatMap((result) => result.processed);

  console.log(`Processed ${processedData.length} items`);
  return processedData;
}

// Generate test data.
const testData = Array.from({ length: 10000 }, (_, i) => i);
processDataInParallel(testData, 1000);

3. 文件批量处理

typescript
// file-processor.ts
// Reads the text file at `filePath`, applies `operation` to its contents and
// posts back `{ filePath, result, success: true }`. Any failure (unreadable
// file, unknown operation) is reported as
// `{ filePath, error, success: false }` instead of being thrown.
self.onmessage = async (event) => {
  const { filePath, operation } = event.data;

  try {
    const content = await Deno.readTextFile(filePath);
    let result: string;

    switch (operation) {
      case "uppercase":
        result = content.toUpperCase();
        break;
      case "lowercase":
        result = content.toLowerCase();
        break;
      case "reverse":
        // Iterate by code point so surrogate pairs (emoji, rare CJK
        // characters) are not split into invalid halves.
        result = Array.from(content).reverse().join("");
        break;
      case "count":
        result = String(content.length);
        break;
      default:
        throw new Error(`Unknown operation: ${operation}`);
    }

    self.postMessage({ filePath, result, success: true });
  } catch (error) {
    // A caught value is not guaranteed to be an Error instance.
    const message = error instanceof Error ? error.message : String(error);
    self.postMessage({ filePath, error: message, success: false });
  }
};
typescript
// main.ts
import { runWorker } from "./worker-utils.ts";

/** Reply posted by ./file-processor.ts, discriminated by `success`. */
type FileResult =
  | { filePath: string; result: string; success: true }
  | { filePath: string; error: string; success: false };

/**
 * Runs `operation` on every file in `files`, one ./file-processor.ts worker
 * per file, and logs a 50-character preview of each result or the error
 * reported for it.
 *
 * @param files Paths of the text files to process.
 * @param operation Operation name understood by ./file-processor.ts.
 * @returns The per-file replies, in the same order as `files`.
 */
async function processFilesInParallel(
  files: string[],
  operation: string
) {
  console.log(`Processing ${files.length} files with operation: ${operation}`);

  const promises = files.map((file) =>
    runWorker<FileResult>("./file-processor.ts", { filePath: file, operation })
  );

  const results = await Promise.all(promises);

  for (const result of results) {
    if (result.success) {
      console.log(`${result.filePath}: ${result.result.substring(0, 50)}...`);
    } else {
      console.error(`${result.filePath}: ${result.error}`);
    }
  }

  return results;
}

// Collect every .txt file in the current directory.
const files = Array.from(Deno.readDirSync("."))
  .filter((entry) => entry.isFile && entry.name.endsWith(".txt"))
  .map((entry) => entry.name);

processFilesInParallel(files, "uppercase");

4. 密码哈希计算

typescript
// password-hasher.ts
// Digests `password` (UTF-8 encoded) with `algorithm` (default "SHA-256") via
// Web Crypto and posts back the lowercase hex digest together with the
// original password and the algorithm name.
self.onmessage = async (event) => {
  const { password, algorithm = "SHA-256" } = event.data;

  const passwordBytes = new TextEncoder().encode(password);
  const digest = await crypto.subtle.digest(algorithm, passwordBytes);

  // Render each digest byte as two hex characters.
  let hashHex = "";
  for (const byte of new Uint8Array(digest)) {
    hashHex += byte.toString(16).padStart(2, "0");
  }

  self.postMessage({ password, hash: hashHex, algorithm });
};
typescript
// main.ts
import { runWorker } from "./worker-utils.ts";

/** Reply posted by ./password-hasher.ts. */
interface HashResult {
  password: string;
  hash: string;
  algorithm: string;
}

/**
 * Hashes every password in its own ./password-hasher.ts worker and logs each
 * `password: hash` pair.
 *
 * @param passwords Passwords to hash.
 * @returns The worker replies, in the same order as `passwords`.
 */
async function hashPasswords(passwords: string[]) {
  console.log(`Hashing ${passwords.length} passwords`);

  const promises = passwords.map((password) =>
    runWorker<HashResult>("./password-hasher.ts", { password })
  );

  const results = await Promise.all(promises);

  for (const result of results) {
    console.log(`${result.password}: ${result.hash}`);
  }

  return results;
}

const passwords = ["password123", "admin", "user123", "test"];
hashPasswords(passwords);

高级用法

1. Worker 池

typescript
// worker-pool.ts

/** A queued or in-flight unit of work together with its promise callbacks. */
interface PoolTask {
  data: any;
  resolve: (value: any) => void;
  reject: (error: any) => void;
}

/**
 * Fixed-size pool of module workers that all run the same worker file.
 *
 * Each worker handles one task at a time. Tasks submitted while every worker
 * is busy wait in a FIFO queue, and every reply settles the promise of the
 * task that was actually sent to the replying worker.
 */
export class WorkerPool {
  private workers: Worker[] = [];
  private taskQueue: PoolTask[] = [];
  private maxWorkers: number;
  /** Workers that currently have no task assigned. */
  private idleWorkers: Worker[] = [];
  /** The task each busy worker is currently executing. */
  private activeTasks = new Map<Worker, PoolTask>();

  /**
   * @param workerFile Worker module path, resolved relative to this module.
   * @param maxWorkers Number of workers to create up front.
   */
  constructor(workerFile: string, maxWorkers: number = 4) {
    this.maxWorkers = maxWorkers;

    for (let i = 0; i < maxWorkers; i++) {
      const worker = new Worker(new URL(workerFile, import.meta.url).href, {
        type: "module",
      });

      worker.onmessage = (event) => {
        this.finishTask(worker, (task) => task.resolve(event.data));
      };

      worker.onerror = (error) => {
        // The failure is delivered through the task's promise; mark the event
        // as handled so it is not also reported as unhandled in the parent.
        error.preventDefault();
        this.finishTask(worker, (task) => task.reject(error));
      };

      this.workers.push(worker);
      this.idleWorkers.push(worker);
    }
  }

  /** Settles the task owned by `worker`, then gives the worker more work. */
  private finishTask(worker: Worker, settle: (task: PoolTask) => void) {
    const task = this.activeTasks.get(worker);
    if (!task) {
      // Unsolicited event from a worker with no task in flight: nothing to do.
      return;
    }
    this.activeTasks.delete(worker);
    settle(task);
    this.assignNextTask(worker);
  }

  /** Sends the next queued task to `worker`, or marks the worker idle. */
  private assignNextTask(worker: Worker) {
    for (
      let task = this.taskQueue.shift();
      task;
      task = this.taskQueue.shift()
    ) {
      try {
        worker.postMessage(task.data);
        this.activeTasks.set(worker, task);
        return;
      } catch (error) {
        // The payload could not be posted (e.g. it is not cloneable): fail
        // this task and move on to the next one.
        task.reject(error);
      }
    }
    this.idleWorkers.push(worker);
  }

  /**
   * Queues `data` for execution on the next free worker.
   *
   * @param data Payload posted to the worker.
   * @returns A promise that resolves with the worker's reply for this task and
   *   rejects with the worker's error event, with the posting error, or with
   *   an Error if the pool has no workers (terminated or created empty).
   */
  async execute(data: any): Promise<any> {
    return new Promise((resolve, reject) => {
      if (this.workers.length === 0) {
        reject(new Error("WorkerPool has no workers available"));
        return;
      }

      this.taskQueue.push({ data, resolve, reject });

      // Hand the task to an idle worker if there is one.
      const idleWorker = this.idleWorkers.pop();
      if (idleWorker) {
        this.assignNextTask(idleWorker);
      }
    });
  }

  /**
   * Terminates every worker and rejects all tasks that are still queued or in
   * flight, so no caller is left waiting on a promise that can never settle.
   */
  terminate() {
    this.workers.forEach((worker) => worker.terminate());
    this.workers = [];
    this.idleWorkers = [];

    const unfinished = [...this.activeTasks.values(), ...this.taskQueue];
    this.activeTasks.clear();
    this.taskQueue = [];
    for (const task of unfinished) {
      task.reject(new Error("WorkerPool terminated"));
    }
  }
}

使用 Worker 池:

typescript
// main.ts
import { WorkerPool } from "./worker-pool.ts";

const pool = new WorkerPool("./data-processor.ts", 4);

/**
 * Processes every number in `data` through the shared worker pool and
 * terminates the pool afterwards, whether or not all tasks succeeded.
 *
 * @param data Numbers to process; each one is submitted as its own task.
 * @returns The worker replies, in the same order as `data`.
 */
async function processWithPool(data: number[]) {
  try {
    // ./data-processor.ts maps over `data` and logs
    // `chunkIndex`/`totalChunks`, so each task is sent as a one-element chunk
    // rather than as a bare number.
    const promises = data.map((item, index) =>
      pool.execute({
        data: [item],
        chunkIndex: index,
        totalChunks: data.length,
      })
    );
    return await Promise.all(promises);
  } finally {
    pool.terminate();
  }
}

processWithPool([1, 2, 3, 4, 5, 6, 7, 8]);

2. 错误处理和重试

typescript
// worker-with-retry.ts

/** Extracts a printable message from an arbitrary thrown or rejected value. */
function describeFailure(error: unknown): string {
  if (error instanceof Error) {
    return error.message;
  }
  // Worker failures may arrive as event objects that carry a `message` field
  // without being Error instances.
  if (typeof error === "object" && error !== null && "message" in error) {
    return String((error as { message: unknown }).message);
  }
  return String(error);
}

/**
 * Runs a worker through `runWorker`, retrying on failure.
 *
 * After a failed attempt the function waits `1000 * attempt` milliseconds
 * before trying again; no wait follows the final attempt.
 *
 * @param workerFile Worker module path passed through to `runWorker`.
 * @param data Payload passed through to `runWorker`.
 * @param maxRetries Maximum number of attempts (not additional retries).
 * @returns The result of the first successful attempt.
 * @throws The failure of the last attempt, unchanged, or an Error if
 *   `maxRetries` is less than 1 and no attempt was made.
 */
export async function runWorkerWithRetry<T>(
  workerFile: string,
  data: any,
  maxRetries: number = 3
): Promise<T> {
  let lastError: unknown;

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await runWorker<T>(workerFile, data);
    } catch (error) {
      lastError = error;
      console.error(`Attempt ${attempt} failed: ${describeFailure(error)}`);

      if (attempt < maxRetries) {
        await new Promise((resolve) => setTimeout(resolve, 1000 * attempt));
      }
    }
  }

  // Never throw `undefined`: that happens when the loop body did not run.
  throw lastError ??
    new Error("runWorkerWithRetry: no attempt was made (maxRetries must be >= 1)");
}

最佳实践

  1. 合理使用 Worker:只在 CPU 密集型任务中使用 Worker
  2. 控制并发:限制同时运行的 Worker 数量
  3. 正确清理:使用完成后终止 Worker
  4. 错误处理:妥善处理 Worker 错误
  5. 消息大小:避免传递过大的消息
  6. 类型安全:使用 TypeScript 确保消息类型正确

Deno 的任务系统为并行处理和后台任务提供了强大的支持,能够显著提高应用程序的性能和响应能力。

标签:Deno