| 在 promise 函数里,我们首先通过调用 .getInactiveWorkerId() 来检查是否存在空闲的 worker 可以来处理数据: private getInactiveWorkerId(): number {    for (let i = 0; i < this.numberOfThreads; i += 1) {      if (!this.activeWorkersById[i]) {        return i;      }    }    return -1;  } 
 接下来,我们创建一个 queueItem,在其中保存传递给 .run() 方法的 getData 函数以及回调。在回调中,我们要么 resolve 或者 reject promise,这取决于 worker 是否将错误传递给回调。 如果 availableWorkerId 的值是 -1,意味着当前没有可用的 worker,我们将 queueItem 添加到 queue。如果有可用的 worker,则调用 .runWorker() 方法来执行 worker。 在 .runWorker() 方法中,我们必须把当前 worker 的 activeWorkersById 设置为使用状态;为 message 和 error 事件设置事件监听器(并在之后清理它们);最后将数据发送给 worker。 private async runWorker(workerId: number, queueItem: QueueItem<T, N>) {   const worker = this.workersById[workerId];   this.activeWorkersById[workerId] = true;   const messageCallback = (result: N) => {     queueItem.callback(null, result);     cleanUp();   };   const errorCallback = (error: any) => {     queueItem.callback(error);     cleanUp();   };   const cleanUp = () => {     worker.removeAllListeners('message');     worker.removeAllListeners('error');     this.activeWorkersById[workerId] = false;     if (!this.queue.length) {       return null;     }     this.runWorker(workerId, this.queue.shift());   };   worker.once('message', messageCallback);   worker.once('error', errorCallback);   worker.postMessage(await queueItem.getData());  } 
 首先,通过使用传递的 workerId,我们从 workersById 中获得 worker 引用。然后,在 activeWorkersById 中,将 [workerId] 属性设置为true,这样我们就能知道在 worker 在忙,不要运行其他任务。 接下来,分别创建 messageCallback 和 errorCallback 用来在消息和错误事件上调用,然后注册所述函数来监听事件并将数据发送给 worker。 在回调中,我们调用 queueItem 的回调,然后调用 cleanUp 函数。在 cleanUp 函数中,要删除事件侦听器,,因为我们会多次重用同一个 worker。如果没有删除监听器的话就会发生内存泄漏,内存会被慢慢耗尽。 在 activeWorkersById 状态中,我们将 [workerId] 属性设置为 false,并检查队列是否为空。如果不是,就从 queue 中删除第一个项目,并用另一个 queueItem 再次调用 worker。 接着创建一个在收到 message 事件中的数据后进行一些计算的 worker: import { isMainThread, parentPort } from 'worker_threads';  if (isMainThread) {   throw new Error('Its not a worker');  }  const doCalcs = (data: any) => {   const collection = [];   for (let i = 0; i < 1000000; i += 1) {     collection[i] = Math.round(Math.random() * 100000);   }   return collection.sort((a, b) => {     if (a > b) {       return 1;     }     return -1;   });  };  parentPort.on('message', (data: any) => {   const result = doCalcs(data);   parentPort.postMessage(result);  }); 
 worker 创建了一个包含 100 万个随机数的数组,然后对它们进行排序。只要能够多花费一些时间才能完成,做些什么事情并不重要。 (编辑:南平站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |