12.8.2023

Spot the Bug | WorkerPool edition

We have a WorkerPool in a project that is supposed to process a queue in parallel.

The bug is that the queue starts all jobs instead of the desired 30. Why?

import { Injectable, Optional } from '@nestjs/common';

export type Task<T> = () => Promise<T>;

class Worker<T> {
  public workerPromise: Promise<T>;
  private resolve: (resolvedValue: T) => void;
  private reject: (error: Error) => void;

  constructor(private task: Task<T>) {
    this.workerPromise = new Promise<T>((resolve, reject) => {
      this.resolve = resolve;
      this.reject = reject;
    });
  }

  public run(): Promise<T> {
    const taskPromise = this.task();
    taskPromise.then(this.resolve, this.reject);
    return taskPromise;
  }
}

@Injectable()
export class WorkerPool<T> {
  private queuedTasks: Array<Worker<T>> = [];
  private runningTasks = 0;

  constructor(@Optional() private concurrency: number = 30) {}

  public queue(tasks: Task<T>[]): Promise<T[]> {
    const workers = tasks.map((task) => new Worker(task));

    for (const worker of workers) {
      this.queuedTasks.push(worker);
    }

    this.flush();

    return Promise.all(workers.map(({ workerPromise }) => workerPromise));
  }

  private flush() {
    const { queuedTasks, runningTasks } = this;

    while (queuedTasks.length && runningTasks < this.concurrency) {
      this.runTask(queuedTasks.shift()!);
    }
  }

  private completeTask() {
    this.runningTasks--;
    this.flush();
  }

  private runTask(worker: Worker<T>) {
    this.runningTasks++;
    const done = () => {
      return this.completeTask();
    };
    worker.run().then(done, done);
  }
}
Solution `const { queuedTasks, runningTasks } = this;` Destructering the variables ensures that a local copy of the variables is created within the function. If the class variables are updated, this has no effect. In the example with the queue, the while loop will never become `true`.
Adrian

Softwareentwickler

Zur Übersicht

Standort Hannover

newcubator GmbH
Bödekerstraße 22
30161 Hannover

Standort Dortmund

newcubator GmbH
Westenhellweg 85-89
44137 Dortmund