Adrian, Kiya | 14.08.2023

Mastering Concurrency Control in TypeScript: Unleashing the Power of WorkerPool


Asynchronous programming is an integral part of any modern application. With promises and async/await, asynchronous code in JavaScript and TypeScript can be written in a readable fashion. However, handling many tasks concurrently can still be daunting. That's where WorkerPool comes into play.
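To make the problem concrete, here is a minimal sketch of what happens without any limit. The helpers sleep, makeTrackedTasks and runUnbounded are hypothetical names invented for this illustration; they only count how many tasks are in flight at the same time.

```typescript
type Task<T> = () => Promise<T>;

// Hypothetical helper: resolves after the given number of milliseconds.
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Hypothetical helper: builds tasks that record the peak number of
// simultaneously running tasks.
function makeTrackedTasks(count: number) {
  let active = 0;
  let peak = 0;
  const tasks: Task<number>[] = Array.from({ length: count }, (_, i) => async () => {
    active++;
    peak = Math.max(peak, active);
    await sleep(5);
    active--;
    return i;
  });
  return { tasks, getPeak: () => peak };
}

async function runUnbounded(count: number): Promise<number> {
  const { tasks, getPeak } = makeTrackedTasks(count);
  // Calling every task up front starts all of them before any has finished.
  await Promise.all(tasks.map((task) => task()));
  return getPeak();
}

runUnbounded(100).then((peak) => console.log(`peak concurrency: ${peak}`));
```

With 100 tasks, all 100 are in flight at once. If each task opened a database connection or sent an HTTP request, that burst could easily exceed what the other side can handle.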

The TypeScript code in this post defines a WorkerPool class that manages the execution of asynchronous tasks with concurrency control. It uses a Worker class to execute each task individually.

The following is a breakdown of the code:

  1. We first define a generic Task type that refers to a function returning a promise.

  2. Then a generic Worker class is created with the properties workerPromise, resolve and reject, a constructor, and a run method.

    • workerPromise: A promise that reflects the result of the task.

    • resolve and reject: Functions to resolve or reject the worker's promise.

    • constructor: Takes in a Task<T> and initializes the workerPromise.

    • run: Executes the task and either resolves or rejects the worker's promise.

  3. We then define a WorkerPool class that manages the execution of tasks with the following properties and methods:

    • queuedTasks: An array of queued Worker<T> instances.

    • runningTasks: Reflects the number of tasks that are currently running.

    • constructor: Initializes the concurrency limit, default being 30 if not provided.

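    • queue: Takes an array of tasks, creates a worker for each, adds them to the queue, and returns a promise that resolves to an array of results when all tasks are done. Because the results are combined with Promise.all, the returned promise rejects as soon as any task fails.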

    • flush: Executes queued tasks while there are tasks in the queue and the number of running tasks is less than the concurrency limit.

    • completeTask: Decreases the running task count and calls flush to process any remaining tasks.

    • runTask: Takes a worker, increases the number of running tasks, and runs the worker’s task. Calls completeTask when the task is finished or fails.
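The Worker from step 2 relies on a promise that is created before the task starts, with its resolve and reject functions stored for later use. This is often called a "deferred". The following is a minimal sketch of that idea on its own; Deferred and createDeferred are hypothetical names used only for this illustration and do not appear in the WorkerPool code.

```typescript
// Hypothetical helper type: a promise plus the functions that settle it.
interface Deferred<T> {
  promise: Promise<T>;
  resolve: (value: T) => void;
  reject: (error: Error) => void;
}

function createDeferred<T>(): Deferred<T> {
  // The executor below runs synchronously, so both functions are assigned
  // before createDeferred returns.
  let resolve!: (value: T) => void;
  let reject!: (error: Error) => void;
  const promise = new Promise<T>((res, rej) => {
    resolve = res;
    reject = rej;
  });
  return { promise, resolve, reject };
}

// The promise can be handed out immediately and settled later.
const deferred = createDeferred<string>();
deferred.promise.then((value) => console.log(value));
deferred.resolve("task finished");
```

This is what lets queue hand back a promise for every task right away, even though most tasks have not been started yet.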

In summary, we have a generic WorkerPool class that manages the execution of asynchronous tasks with concurrency control and is marked as a NestJS provider via the @Injectable() decorator. You can use this WorkerPool class to queue and run tasks while ensuring that no more than a configured number of them execute at the same time.

Here is the code:

import { Injectable, Optional } from '@nestjs/common';

// A task is any function that starts asynchronous work and returns its promise.
export type Task<T> = () => Promise<T>;

class Worker<T> {
  public workerPromise: Promise<T>;
  // Both are assigned synchronously by the Promise executor in the constructor.
  private resolve!: (resolvedValue: T) => void;
  private reject!: (error: Error) => void;

  constructor(private task: Task<T>) {
    this.workerPromise = new Promise<T>((resolve, reject) => {
      this.resolve = resolve;
      this.reject = reject;
    });
  }

  public run(): Promise<T> {
    const taskPromise = this.task();
    // Forward the task's outcome to workerPromise.
    taskPromise.then(this.resolve, this.reject);
    return taskPromise;
  }
}

@Injectable()
export class WorkerPool<T> {
  private queuedTasks: Array<Worker<T>> = [];
  private runningTasks = 0;

  // The concurrency limit defaults to 30 when none is provided.
  constructor(@Optional() private concurrency: number = 30) {}

  public queue(tasks: Task<T>[]): Promise<T[]> {
    const workers = tasks.map((task) => new Worker(task));

    for (const worker of workers) {
      this.queuedTasks.push(worker);
    }

    this.flush();

    // Results keep the order of the input tasks; rejects if any task fails.
    return Promise.all(workers.map(({ workerPromise }) => workerPromise));
  }

  private flush() {
    // Start queued workers until the queue is empty or the limit is reached.
    while (this.queuedTasks.length && this.runningTasks < this.concurrency) {
      this.runTask(this.queuedTasks.shift()!);
    }
  }

  private completeTask() {
    this.runningTasks--;
    this.flush();
  }

  private runTask(worker: Worker<T>) {
    this.runningTasks++;
    const done = () => {
      return this.completeTask();
    };
    // Free the slot whether the task succeeded or failed.
    worker.run().then(done, done);
  }
}

This example can help you manage asynchronous tasks in your TypeScript application more efficiently by capping how many of them run at the same time.
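To see the limit in action, here is a usage sketch. So that it runs without a NestJS application, it uses a trimmed, decorator-free stand-in for the pool; SimplePool, sleep and demo are hypothetical names for this illustration, and the stand-in is a simplified approximation of the class above, not a replacement for it.

```typescript
type Task<T> = () => Promise<T>;

// Assumption: a decorator-free stand-in for WorkerPool so the sketch runs
// without NestJS. It follows the same queue/flush idea in compressed form.
class SimplePool<T> {
  private queued: Array<() => void> = [];
  private running = 0;

  constructor(private concurrency = 30) {}

  queue(tasks: Task<T>[]): Promise<T[]> {
    const results = tasks.map(
      (task) =>
        new Promise<T>((resolve, reject) => {
          // Each queued entry starts its task later and settles this promise.
          this.queued.push(() => {
            this.running++;
            task()
              .then(resolve, reject)
              .then(() => {
                this.running--;
                this.flush();
              });
          });
        }),
    );
    this.flush();
    return Promise.all(results);
  }

  private flush(): void {
    while (this.queued.length > 0 && this.running < this.concurrency) {
      const start = this.queued.shift();
      if (start) start();
    }
  }
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Queues `count` tasks with the given limit and reports the highest number
// of tasks that were running at the same time.
async function demo(limit: number, count: number) {
  let active = 0;
  let peak = 0;
  const tasks: Task<number>[] = Array.from({ length: count }, (_, i) => async () => {
    active++;
    peak = Math.max(peak, active);
    await sleep(5);
    active--;
    return i * 2;
  });
  const results = await new SimplePool<number>(limit).queue(tasks);
  return { results, peak };
}

demo(2, 6).then(({ results, peak }) => console.log(results, peak));
```

With a limit of 2 and six tasks, the recorded peak is 2 and the results arrive in input order: 0, 2, 4, 6, 8, 10. In a NestJS application you would inject or instantiate the real WorkerPool instead and call queue in the same way.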

Adrian Görisch
Adrian (Software Developer)

... works as a reliable full-stack software developer, currently mainly in the backend with NestJS, ECMAScript, and various database technologies. He places particular value on proper...

Kiya

... is our dedicated and passionate artificial intelligence and expert in software development. With a tireless interest in technological innovation, she brings enthusiasm...
