In one of our projects, we have a WorkerPool intended to process a queue in parallel. We recently stumbled across a bug where, instead of running at most the desired 30 jobs at a time, the pool ends up starting all the jobs at once. Ever wondered why?
import { Injectable, Optional } from '@nestjs/common';

export type Task<T> = () => Promise<T>;

class Worker<T> {
  public workerPromise: Promise<T>;
  private resolve: (resolvedValue: T) => void;
  private reject: (error: Error) => void;

  constructor(private task: Task<T>) {
    this.workerPromise = new Promise<T>((resolve, reject) => {
      this.resolve = resolve;
      this.reject = reject;
    });
  }

  public run(): Promise<T> {
    const taskPromise = this.task();
    taskPromise.then(this.resolve, this.reject);
    return taskPromise;
  }
}

@Injectable()
export class WorkerPool<T> {
  private queuedTasks: Array<Worker<T>> = [];
  private runningTasks = 0;

  constructor(@Optional() private concurrency: number = 30) {}

  public queue(tasks: Task<T>[]): Promise<T[]> {
    const workers = tasks.map((task) => new Worker(task));

    for (const worker of workers) {
      this.queuedTasks.push(worker);
    }

    this.flush();

    return Promise.all(workers.map(({ workerPromise }) => workerPromise));
  }

  private flush() {
    const { queuedTasks, runningTasks } = this;

    while (queuedTasks.length && runningTasks < this.concurrency) {
      this.runTask(queuedTasks.shift()!);
    }
  }

  private completeTask() {
    this.runningTasks--;
    this.flush();
  }

  private runTask(worker: Worker<T>) {
    this.runningTasks++;
    const done = () => {
      return this.completeTask();
    };
    worker.run().then(done, done);
  }
}
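After digging into it, we found the culprit. The statement const { queuedTasks, runningTasks } = this; at the top of flush() creates two local variables. queuedTasks holds a reference to the same array as this.queuedTasks, so calling shift() on it still empties the real queue. runningTasks, however, is a number, so the local variable is a snapshot of its value at the moment of destructuring. When runTask() increments this.runningTasks, the local runningTasks does not change. As a result, the condition runningTasks < this.concurrency in the while loop never becomes false, and the loop keeps starting tasks until the queue is empty.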
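To make the copy-versus-reference behaviour concrete, here is a minimal sketch. The Counter class and its field names are hypothetical stand-ins for the pool's state, not part of the project code; it only illustrates how destructuring treats a number field and an array field differently.

```typescript
// Hypothetical stand-in for the pool's state (not the WorkerPool itself).
class Counter {
  public items: number[] = [1, 2, 3];
  public running = 0;

  public snapshotDemo(): {
    localRunning: number;
    fieldRunning: number;
    sameArray: boolean;
  } {
    // `running` is copied by value; `items` is a reference to the same array.
    const { items, running } = this;

    this.running++; // updates the field only, the local `running` is unchanged
    items.shift(); // mutates the array that this.items also points to

    return {
      localRunning: running,
      fieldRunning: this.running,
      sameArray: items === this.items && this.items.length === 2,
    };
  }
}

const demoResult = new Counter().snapshotDemo();
console.log(demoResult);
```

The local number stays at its old value while the field moves on, which is the same thing that happens to runningTasks inside flush().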
Avoid destructuring class fields inside a method when their values can change while the method runs. A destructured primitive is a copy, and later updates to the field won't reach it. In the case of the queue, read this.runningTasks directly in the while condition so that every iteration sees the current count and the loop stops once the concurrency limit is reached.
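One way the fix could look is sketched below. This is a simplified, framework-free variant written for illustration: SimplePool and measurePeak are hypothetical names, the NestJS decorators are left out so the snippet runs on its own, and the Worker class is folded into a closure. The relevant part is that flush() reads this.running on every iteration.

```typescript
type Task<T> = () => Promise<T>;

// Hypothetical, simplified pool (no NestJS decorators, no Worker class).
class SimplePool<T> {
  private queued: Array<() => void> = [];
  private running = 0;

  constructor(private readonly concurrency = 30) {}

  public queue(tasks: Task<T>[]): Promise<T[]> {
    const results = tasks.map(
      (task) =>
        new Promise<T>((resolve, reject) => {
          // Each queued entry starts its task and settles the outer promise.
          this.queued.push(() => {
            this.running++;
            const done = () => {
              this.running--;
              this.flush();
            };
            // Wrapping task() turns a synchronous throw into a rejection.
            new Promise<T>((settle) => settle(task()))
              .then(resolve, reject)
              .then(done, done);
          });
        }),
    );
    this.flush();
    return Promise.all(results);
  }

  private flush(): void {
    // Read the fields directly so each iteration sees the current count.
    while (this.queued.length > 0 && this.running < this.concurrency) {
      const start = this.queued.shift()!;
      start();
    }
  }
}

// Hypothetical helper: reports the highest number of tasks active at once.
async function measurePeak(taskCount: number, limit: number): Promise<number> {
  let active = 0;
  let peak = 0;
  const tasks: Task<number>[] = Array.from(
    { length: taskCount },
    (_, i) => async () => {
      active++;
      peak = Math.max(peak, active);
      await new Promise<void>((r) => setTimeout(r, 5));
      active--;
      return i;
    },
  );
  await new SimplePool<number>(limit).queue(tasks);
  return peak;
}
```

Calling measurePeak(10, 3) should report a peak equal to the limit, whereas the destructured version of flush() would let all ten tasks start at once.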