11.8.2023
Mastering Concurrency Control in TypeScript: Unleashing the Power of WorkerPool
This TypeScript code defines a WorkerPool class that manages the execution of asynchronous tasks with concurrency control. It uses the Worker class to execute each task individually.
Here's a breakdown of the code:
- Define a generic Task type that represents a function returning a promise.
- Create a generic Worker class with the following properties and methods:
  - workerPromise: A promise that settles with the result of the task.
  - resolve and reject: Functions that resolve or reject the worker's promise.
  - constructor: Takes a Task<T> as input and initializes workerPromise.
  - run: Executes the task and resolves or rejects the worker's promise accordingly.
- Define a generic WorkerPool class that manages the execution of tasks with the following properties and methods:
  - queuedTasks: An array of queued Worker<T> instances.
  - runningTasks: The number of tasks currently running.
  - constructor: Initializes the concurrency limit, which defaults to 30 if not provided.
  - queue: Takes an array of tasks, creates a worker for each task, adds the workers to the queue, and returns a promise that resolves to an array of results, in the order of the input tasks, once all tasks are complete. Because it relies on Promise.all, the returned promise rejects as soon as any task fails.
  - flush: Starts queued tasks for as long as the queue is not empty and the number of running tasks is below the concurrency limit.
  - completeTask: Decreases the running task count and calls flush to process any remaining tasks.
  - runTask: Takes a worker, increases the running task count, and runs the worker's task. When the task completes or fails, it calls completeTask.
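The Worker class described above relies on what is often called a deferred promise: the promise is created up front, and its resolve and reject callbacks are stored so that other code can settle it later. A minimal sketch of that pattern in isolation follows. The Deferred interface and the createDeferred helper are hypothetical names used for illustration and are not part of the original code.

```typescript
// Hypothetical helper, not part of the original code: it isolates the
// pattern Worker uses, where the promise is created first and settled later.
interface Deferred<T> {
  promise: Promise<T>;
  resolve: (value: T) => void;
  reject: (error: Error) => void;
}

function createDeferred<T>(): Deferred<T> {
  let resolve!: (value: T) => void;
  let reject!: (error: Error) => void;
  // The executor runs synchronously, so both callbacks are assigned
  // before createDeferred returns.
  const promise = new Promise<T>((res, rej) => {
    resolve = res;
    reject = rej;
  });
  return { promise, resolve, reject };
}

// A caller can hold and chain on the promise before any work has started.
const deferred = createDeferred<number>();
const doubled = deferred.promise.then((n) => n * 2);

// Later, whoever owns the deferred settles it; doubled then resolves to 42.
deferred.resolve(21);
```

This is what lets queue hand back promises for tasks that have not started yet: the pool returns each worker's promise immediately and only settles it once run has been called.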
In summary, this code provides a generic WorkerPool class that manages the execution of asynchronous tasks with concurrency control. The class is decorated with Injectable from NestJS so that it can be registered as a provider; the pooling logic itself does not depend on the framework. Developers can use the WorkerPool class to queue and run tasks while ensuring that no more than the configured number of tasks execute at the same time.
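To make the concurrency guarantee concrete, here is a usage sketch that queues five tasks with a limit of two and records how many were in flight at once. So that it runs on its own, it uses a condensed, decorator-free stand-in for the pool; the names MiniPool and makeTask, the 10 ms delay, and the counters are all assumptions made for this illustration. The full class appears in the listing further down.

```typescript
type Task<T> = () => Promise<T>;

// Condensed, decorator-free stand-in for WorkerPool so that this sketch is
// runnable by itself. The name MiniPool is hypothetical.
class MiniPool<T> {
  private queued: Array<() => void> = [];
  private running = 0;

  constructor(private concurrency = 30) {}

  queue(tasks: Task<T>[]): Promise<T[]> {
    const results = tasks.map(
      (task) =>
        new Promise<T>((resolve, reject) => {
          // Each queue entry starts its task and frees the slot when it settles.
          this.queued.push(() => {
            this.running++;
            const done = () => {
              this.running--;
              this.flush();
            };
            task().then(resolve, reject).then(done, done);
          });
        }),
    );
    this.flush();
    return Promise.all(results);
  }

  private flush(): void {
    while (this.queued.length && this.running < this.concurrency) {
      this.queued.shift()!();
    }
  }
}

// Hypothetical task factory that tracks how many tasks are in flight.
let active = 0;
let peak = 0;
const makeTask = (id: number): Task<number> => async () => {
  active++;
  peak = Math.max(peak, active);
  await new Promise<void>((resolve) => setTimeout(resolve, 10));
  active--;
  return id * 2;
};

const pool = new MiniPool<number>(2);
const finished = pool
  .queue([1, 2, 3, 4, 5].map(makeTask))
  .then((results) => ({ results, peak }));

finished.then(({ results, peak }) => console.log(results, peak));
```

With a limit of two, the results come back in input order as [2, 4, 6, 8, 10] and the recorded peak is 2, even though five tasks were queued at once.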
import { Injectable, Optional } from '@nestjs/common';

// A task is a function that starts asynchronous work when it is called.
export type Task<T> = () => Promise<T>;

// Wraps a single task and exposes a promise that settles with its outcome.
class Worker<T> {
  public workerPromise: Promise<T>;
  // Both callbacks are assigned synchronously by the Promise executor below.
  private resolve!: (resolvedValue: T) => void;
  private reject!: (error: Error) => void;

  constructor(private task: Task<T>) {
    this.workerPromise = new Promise<T>((resolve, reject) => {
      this.resolve = resolve;
      this.reject = reject;
    });
  }

  public run(): Promise<T> {
    const taskPromise = this.task();
    // Forward the task's outcome to the promise handed out earlier.
    taskPromise.then(this.resolve, this.reject);
    return taskPromise;
  }
}

@Injectable()
export class WorkerPool<T> {
  private queuedTasks: Array<Worker<T>> = [];
  private runningTasks = 0;

  constructor(@Optional() private concurrency: number = 30) {}

  public queue(tasks: Task<T>[]): Promise<T[]> {
    const workers = tasks.map((task) => new Worker(task));
    for (const worker of workers) {
      this.queuedTasks.push(worker);
    }
    this.flush();
    // Resolves in input order; rejects as soon as any task fails.
    return Promise.all(workers.map(({ workerPromise }) => workerPromise));
  }

  // Start queued workers until the queue is empty or the limit is reached.
  private flush() {
    while (this.queuedTasks.length && this.runningTasks < this.concurrency) {
      this.runTask(this.queuedTasks.shift()!);
    }
  }

  private completeTask() {
    this.runningTasks--;
    this.flush();
  }

  private runTask(worker: Worker<T>) {
    this.runningTasks++;
    const done = () => {
      return this.completeTask();
    };
    // Free the slot whether the task succeeded or failed.
    worker.run().then(done, done);
  }
}
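One consequence of the listing above is worth a short illustration: queue ends in Promise.all, so a single failing task rejects the whole batch, even though the other tasks still run to completion. If partial results are wanted, one possible approach is to wrap each task so that it never rejects. The Settled type and the settle wrapper below are hypothetical and not part of the original code, and Promise.all stands in for the pool so that the sketch runs on its own.

```typescript
type Task<T> = () => Promise<T>;

// Hypothetical result shape and wrapper; neither is part of the original code.
type Settled<T> =
  | { ok: true; value: T }
  | { ok: false; error: unknown };

// Turns a task that may reject into one that always resolves with an outcome.
function settle<T>(task: Task<T>): Task<Settled<T>> {
  return () =>
    task().then(
      (value): Settled<T> => ({ ok: true, value }),
      (error: unknown): Settled<T> => ({ ok: false, error }),
    );
}

const tasks: Task<number>[] = [
  async () => 1,
  async () => {
    throw new Error('boom');
  },
  async () => 3,
];

// Unwrapped tasks: the combined promise rejects because one task failed.
const raw = Promise.all(tasks.map((task) => task())).then(
  () => 'fulfilled',
  (error: Error) => `rejected: ${error.message}`,
);

// Wrapped tasks: the combined promise resolves with one outcome per task.
const settled = Promise.all(tasks.map((task) => settle(task)()));

raw.then((outcome) => console.log(outcome));
settled.then((outcomes) => console.log(outcomes.map((o) => o.ok)));
```

Here raw ends up as the string "rejected: boom", while settled resolves with three outcomes whose ok flags are true, false, true. With the pool itself, the same idea would mean passing tasks.map((task) => settle(task)) to queue on a WorkerPool typed for the wrapped result.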