Multiprocessing pool implementation for Node.js and TypeScript.
Real multiprocessing is implemented using the Node.js child_process module.
npm i multiprocessor
import { Pool } from 'multiprocessor';
// Number of workers to create in the pool.
const poolSize = 4;
const pool = new Pool(poolSize);
const input = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

let result: Array<number | undefined>;
try {
  // map() resolves to the task results in the order of the input elements;
  // the handlers below are invoked per task.
  result = await pool.map(input, calcSinTask, {
    onTaskSuccess: (result: number, input: number, index: number) => {
      console.log(`Task #${index} | result: ${result}, input: ${input}`);
    },
    onTaskError: (error: string, input: number, index: number) => {
      console.log(`Task #${index} | error: ${error}, input: ${input}`);
    }
  });
} finally {
  // Always terminate the worker processes, even when map() rejects,
  // so that no workers are left running after a failure.
  pool.close();
}

console.log(result);
// [ 0.8414, 0.9092, 0.1411, ... ]
/**
 * Approximates sin(x) by summing the terms of its Taylor series,
 * x - x^3/3! + x^5/5! - ..., for up to 1,000,000 terms.
 *
 * Summation stops early as soon as adding the next term would produce NaN
 * (e.g. once both the power and the factorial have overflowed to Infinity);
 * the sum accumulated so far is returned in that case.
 *
 * @param x The angle in radians.
 * @returns The accumulated series sum.
 */
function calcSinTask(x: number): number {
  const termLimit = 1000000;
  const xSquared = x * x;

  let sum = 0;
  let termSign = 1;
  let oddPower = x; // x^(2n+1), starting with n = 0
  let oddFactorial = 1; // (2n+1)!, starting with n = 0

  let n = 0;
  while (n < termLimit) {
    const term = termSign * (oddPower / oddFactorial);
    const candidate = sum + term;
    if (Number.isNaN(candidate)) {
      return sum;
    }
    sum = candidate;

    // Advance the running power, factorial and sign to the next term.
    n += 1;
    oddFactorial *= (2 * n) * (2 * n + 1);
    oddPower *= xSquared;
    termSign = -termSign;
  }

  return sum;
}
You can run this example from this repository.
// File: src/index.ts
import { Pool } from 'multiprocessor';
// Number of workers to create in the pool.
const poolSize = 4;
const pool = new Pool(poolSize);
const input = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

let result: Array<number | undefined>;
try {
  // map() resolves to the task results in the order of the input elements;
  // the handlers below are invoked per task.
  result = await pool.map(input, calcSinTask, {
    onTaskSuccess: (result: number, input: number, index: number) => {
      console.log(`Task #${index} | result: ${result}, input: ${input}`);
    },
    onTaskError: (error: string, input: number, index: number) => {
      console.log(`Task #${index} | error: ${error}, input: ${input}`);
    }
  });
} finally {
  // Always terminate the worker processes, even when map() rejects,
  // so that no workers are left running after a failure.
  pool.close();
}

console.log(result);
// [ 0.8414, 0.9092, 0.1411, ... ]
/**
 * Task that loads `calcSin` through a dynamic import at call time and
 * delegates the computation to it.
 *
 * The module path is derived from `__dirname` by replacing the
 * '/node_modules/multiprocessor/lib' segment with '/src'.
 * NOTE(review): presumably the task body runs inside the library's worker
 * process, so `__dirname` points at the package's lib directory — confirm.
 *
 * @param x The value passed through to `calcSin`.
 * @returns A promise resolving to the value returned by `calcSin(x)`.
 */
async function calcSinTask(x: number): Promise<number> {
  const sourceRoot = __dirname.replace('/node_modules/multiprocessor/lib', '/src');
  const modulePath = `${sourceRoot}/path/to/your/module`;
  const { calcSin } = await import(modulePath);
  return calcSin(x);
}
// File: src/path/to/your/module.ts
/**
 * Approximates sin(x) by summing the terms of its Taylor series,
 * x - x^3/3! + x^5/5! - ..., for up to 1,000,000 terms. Each term is
 * computed by `calcDelta`.
 *
 * Summation stops early as soon as adding the next term would produce NaN;
 * the sum accumulated so far is returned in that case.
 *
 * @param x The angle in radians.
 * @returns The accumulated series sum.
 */
export function calcSin(x: number): number {
  const termLimit = 1000000;
  const xSquared = x * x;

  let total = 0;
  let termSign = 1;
  let oddPower = x; // x^(2n+1), starting with n = 0
  let oddFactorial = 1; // (2n+1)!, starting with n = 0

  // Iteration k adds term number k - 1, then prepares the values for term k.
  for (let k = 1; k <= termLimit; k++) {
    const term = calcDelta(termSign, oddPower, oddFactorial);
    const candidate = total + term;
    if (Number.isNaN(candidate)) {
      return total;
    }
    total = candidate;

    oddFactorial *= (2 * k) * (2 * k + 1);
    oddPower *= xSquared;
    termSign = -termSign;
  }

  return total;
}
/**
 * Computes a single signed series term: `sign * (power / factorial)`.
 *
 * @param sign The sign multiplier of the term.
 * @param power The numerator of the term.
 * @param factorial The denominator of the term.
 * @returns The signed quotient.
 */
function calcDelta(sign: number, power: number, factorial: number): number {
  const magnitude = power / factorial;
  return sign * magnitude;
}
For detailed documentation and usage examples, please refer to the API documentation.
/** A task: takes one input element and returns its result, either directly or as a promise. */
export type Task<TInput, TResult> = (input: TInput) => TResult | Promise<TResult>;

/** Handler invoked with a task's result, the input it was computed from, and the task index. */
export type TaskSuccessHandler<TInput, TResult> = (
  result: TResult,
  input: TInput,
  index: number,
) => void;

/** Handler invoked with a task's error (as a string), the input that caused it, and the task index. */
export type TaskErrorHandler<TInput> = (
  error: string,
  input: TInput,
  index: number,
) => void;

/** Tuple describing one finished task: its index, its result (if any) and its error (if any). */
export type TaskResponse<TResult> = [
  index: number,
  result: TResult | undefined,
  error: string | undefined,
];
/**
 * Pool of workers that executes tasks concurrently.
 *
 * This listing shows the public method signatures only (no method bodies).
 * NOTE(review): `TaskHandlers` is referenced by the methods below, but its
 * definition is not part of this listing; per the parameter docs it carries
 * the `onTaskSuccess` and `onTaskError` handlers.
 * NOTE(review): the class extends EventEmitter; which events it emits cannot
 * be determined from this listing.
 */
class Pool extends EventEmitter {
/**
* Create a new pool with the specified number of workers.
*
* @param poolSize The number of workers to create in the pool.
*/
constructor(poolSize: number);
/**
* Asynchronously processes tasks from the provided inputs in an ordered manner.
* Tasks are executed concurrently using a pool of workers.
*
* NOTE(review): result entries are typed `TResult | undefined`; presumably
* `undefined` marks a task that failed — confirm against the implementation.
*
* @template TInput The type of the input elements.
* @template TResult The type of the result elements.
*
* @param inputs An iterable or async iterable of input elements.
* @param task The task to execute for each input element.
* @param taskHandlers Optional handlers for task events (onTaskSuccess, onTaskError).
*
* @returns A promise that resolves to an array of task results in the order of the input elements.
*/
public async map<TInput, TResult>(
inputs: Iterable<TInput> | AsyncIterable<TInput>,
task: Task<TInput, TResult>,
taskHandlers?: TaskHandlers<TInput, TResult>,
): Promise<Array<TResult | undefined>>;
/**
* Asynchronously processes tasks from the provided inputs in a lazy ordered manner.
* Tasks are executed concurrently using a pool of workers.
*
* @template TInput The type of the input elements.
* @template TResult The type of the result elements.
*
* @param inputs An iterable or async iterable of input elements.
* @param task The task to execute for each input element.
* @param taskHandlers Optional handlers for task events (onTaskSuccess, onTaskError).
*
* @returns An async generator yielding results of the tasks in the order of the input elements.
*/
public async *imap<TInput, TResult>(
inputs: Iterable<TInput> | AsyncIterable<TInput>,
task: Task<TInput, TResult>,
taskHandlers?: TaskHandlers<TInput, TResult>,
): AsyncGenerator<TResult | undefined>;
/**
* Asynchronously processes tasks from the provided inputs in a lazy unordered manner.
* Tasks are executed concurrently using a pool of workers.
*
* @template TInput The type of the input elements.
* @template TResult The type of the result elements.
*
* @param inputs An iterable or async iterable of input elements.
* @param task The task to execute for each input element.
* @param taskHandlers Optional handlers for task events (onTaskSuccess, onTaskError).
*
* @returns An async generator yielding results of the tasks in completion order.
*/
public async *imapUnordered<TInput, TResult>(
inputs: Iterable<TInput> | AsyncIterable<TInput>,
task: Task<TInput, TResult>,
taskHandlers?: TaskHandlers<TInput, TResult>,
): AsyncGenerator<TResult | undefined>;
/**
* Asynchronously processes tasks from the provided inputs in a lazy unordered manner with extended information.
* Tasks are executed concurrently using a pool of workers.
*
* Each yielded value is a `TaskResponse` tuple, typed as
* `[number, TResult | undefined, string | undefined]`.
*
* @template TInput The type of the input elements.
* @template TResult The type of the result elements.
*
* @param inputs An iterable or async iterable of input elements.
* @param task The task to execute for each input element.
* @param taskHandlers Optional handlers for task events (onTaskSuccess, onTaskError).
*
* @returns An async generator yielding task responses containing the index, result or error for each task.
*/
public async *imapUnorderedExtended<TInput, TResult>(
inputs: Iterable<TInput> | AsyncIterable<TInput>,
task: Task<TInput, TResult>,
taskHandlers?: TaskHandlers<TInput, TResult>,
): AsyncGenerator<TaskResponse<TResult>>;
/**
* Closes the worker pool by terminating all worker processes.
* This method should be called when the pool is no longer needed
* to ensure that all resources are properly released.
*
* NOTE(review): no return type is declared in this listing; the usage
* examples call it without awaiting — confirm whether it is synchronous.
*/
public close();
}
npm i
npm run test
Multiprocessor TS is licensed under the MIT License.