11.8.2023

Mastering Concurrency Control in TypeScript: Unleashing the Power of WorkerPool

This TypeScript code defines a WorkerPool class that manages the execution of asynchronous tasks with concurrency control. It utilizes the Worker class to execute tasks individually.

Here's a breakdown of the code:

  1. Define a generic Task type that represents a function returning a promise.
  2. Create a generic Worker class with the following properties and methods:
    • workerPromise: A promise that resolves to the result of the task.
    • resolve and reject: Functions to resolve or reject the worker's promise.
    • constructor: Takes a Task<T> as input and initializes the workerPromise.
    • run: Executes the task and resolves or rejects the worker's promise accordingly.
  3. Define a generic WorkerPool class that manages the execution of tasks with the following properties and methods:
    • queuedTasks: An array of queued Worker<T> instances.
    • runningTasks: The number of tasks currently running.
    • constructor: Initializes the concurrency limit, which defaults to 30 if not provided.
    • queue: Takes an array of tasks, creates workers for each task, adds them to the queue, and returns a promise that resolves to an array of results when all tasks are complete.
    • flush: Runs queued tasks while there are tasks in the queue and the number of running tasks is below the concurrency limit.
    • completeTask: Decreases the running task count and calls flush to process any remaining tasks.
    • runTask: Takes a worker, increases the running task count, and runs the worker's task. When the task is complete or fails, it calls completeTask.

In summary, this code provides a generic WorkerPool class that manages the execution of asynchronous tasks with concurrency control using the NestJS framework. Developers can use the WorkerPool class to queue and run tasks, ensuring that no more than the specified concurrency limit of tasks are executed simultaneously.

import { Injectable, Optional } from '@nestjs/common';

export type Task<T> = () => Promise<T>;

class Worker<T> {
  public workerPromise: Promise<T>;
  private resolve: (resolvedValue: T) => void;
  private reject: (error: Error) => void;

  constructor(private task: Task<T>) {
    this.workerPromise = new Promise<T>((resolve, reject) => {
      this.resolve = resolve;
      this.reject = reject;
    });
  }

  public run(): Promise<T> {
    const taskPromise = this.task();
    taskPromise.then(this.resolve, this.reject);
    return taskPromise;
  }
}

@Injectable()
export class WorkerPool<T> {
  private queuedTasks: Array<Worker<T>> = [];
  private runningTasks = 0;

  constructor(@Optional() private concurrency: number = 30) {}

  public queue(tasks: Task<T>[]): Promise<T[]> {
    const workers = tasks.map((task) => new Worker(task));

    for (const worker of workers) {
      this.queuedTasks.push(worker);
    }

    this.flush();

    return Promise.all(workers.map(({ workerPromise }) => workerPromise));
  }

  private flush() {
    while (this.queuedTasks.length && this.runningTasks < this.concurrency) {
      this.runTask(queuedTasks.shift()!);
    }
  }

  private completeTask() {
    this.runningTasks--;
    this.flush();
  }

  private runTask(worker: Worker<T>) {
    this.runningTasks++;
    const done = () => {
      return this.completeTask();
    };
    worker.run().then(done, done);
  }
}
Adrian

Softwareentwickler

Zur Übersicht

Standort Hannover

newcubator GmbH
Bödekerstraße 22
30161 Hannover

Standort Dortmund

newcubator GmbH
Westenhellweg 85-89
44137 Dortmund