12.8.2023
Spot the Bug | WorkerPool edition
We have a WorkerPool in a project that is supposed to process a queue in parallel.
The bug: the pool starts all queued jobs at once instead of running at most 30 concurrently. Why?
import { Injectable, Optional } from '@nestjs/common';

export type Task<T> = () => Promise<T>;

class Worker<T> {
  public workerPromise: Promise<T>;
  private resolve: (resolvedValue: T) => void;
  private reject: (error: Error) => void;

  constructor(private task: Task<T>) {
    this.workerPromise = new Promise<T>((resolve, reject) => {
      this.resolve = resolve;
      this.reject = reject;
    });
  }

  public run(): Promise<T> {
    const taskPromise = this.task();
    taskPromise.then(this.resolve, this.reject);
    return taskPromise;
  }
}

@Injectable()
export class WorkerPool<T> {
  private queuedTasks: Array<Worker<T>> = [];
  private runningTasks = 0;

  constructor(@Optional() private concurrency: number = 30) {}

  public queue(tasks: Task<T>[]): Promise<T[]> {
    const workers = tasks.map((task) => new Worker(task));
    for (const worker of workers) {
      this.queuedTasks.push(worker);
    }
    this.flush();
    return Promise.all(workers.map(({ workerPromise }) => workerPromise));
  }

  private flush() {
    const { queuedTasks, runningTasks } = this;
    while (queuedTasks.length && runningTasks < this.concurrency) {
      this.runTask(queuedTasks.shift()!);
    }
  }

  private completeTask() {
    this.runningTasks--;
    this.flush();
  }

  private runTask(worker: Worker<T>) {
    this.runningTasks++;
    const done = () => {
      return this.completeTask();
    };
    worker.run().then(done, done);
  }
}
Solution
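`const { queuedTasks, runningTasks } = this;` Destructuring creates local copies of the properties inside the function. `runningTasks` is a number, so the local variable is a snapshot of its value at the moment of destructuring; when `runTask` increments `this.runningTasks`, the local copy does not change. (`queuedTasks` is an array reference, so `shift()` still affects the shared queue.) In the example, the loop condition `runningTasks < this.concurrency` therefore never becomes `false`, and the while loop only stops once the queue is empty. Reading `this.runningTasks` directly in the loop condition fixes the bug.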
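The effect can be reproduced without promises or NestJS. The following is a minimal sketch, not the project's code: the class `Limiter` and its members `flushBuggy`, `flushFixed`, `pending`, `running` and `started` are hypothetical names chosen for illustration. It only mirrors the shape of the `flush()` loop, once with the destructured snapshot and once reading the property on every iteration.

```typescript
// Hypothetical stand-in for WorkerPool: it only counts how many jobs get started.
class Limiter {
  private running = 0;
  public started = 0;

  constructor(private pending: number, private limit: number) {}

  // Buggy variant: `running` is copied once, before the loop begins.
  public flushBuggy(): void {
    const { running } = this;
    // `running` stays 0 here, so only `pending` can end the loop.
    while (this.pending > 0 && running < this.limit) {
      this.start();
    }
  }

  // Fixed variant: `this.running` is re-read on every iteration.
  public flushFixed(): void {
    while (this.pending > 0 && this.running < this.limit) {
      this.start();
    }
  }

  // Mirrors runTask(): take one job off the queue and count it as running.
  private start(): void {
    this.pending--;
    this.running++;
    this.started++;
  }
}

const buggy = new Limiter(100, 30);
buggy.flushBuggy();

const fixed = new Limiter(100, 30);
fixed.flushFixed();

console.log(buggy.started); // 100: every pending job was started
console.log(fixed.started); // 30: the limit is respected
```

Applied to the `WorkerPool` above, the equivalent change would be to drop `runningTasks` from the destructuring in `flush()` and compare `this.runningTasks < this.concurrency` instead.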