PQueue
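PQueue is a promise queue library that provides concurrency control for managing asynchronous tasks in JavaScript. It allows developers to limit how many tasks run at once, throttle tasks per time interval, prioritize tasks, pause and resume processing, set timeouts, and cancel tasks with AbortController.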
Install PQueue using npm:
npm install p-queue
Or using yarn:
yarn add p-queue
Create a queue and add tasks:
import PQueue from 'p-queue';
const queue = new PQueue();
const task = async () => {
// Your asynchronous operation
return 'Task result';
};
queue.add(task).then(result => {
console.log(result); // Output: 'Task result'
});
In this example, a new PQueue instance is created, and an asynchronous task is added to the queue using .add(). The task executes immediately since the default concurrency is unlimited.
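Several later examples call delay(ms) and performAsyncOperation() without defining them. The following is a minimal sketch of stand-ins so those snippets can be run; both definitions are assumptions made for illustration, and performAsyncOperation in particular is a hypothetical placeholder for whatever real work a task does.

```javascript
// Resolves after `ms` milliseconds; stands in for the `delay` used in this guide.
const delay = ms => new Promise(resolve => setTimeout(resolve, ms));

// Hypothetical placeholder for real async work (an HTTP request, a file read, ...).
const performAsyncOperation = async () => {
  await delay(10);
  return 'done';
};
```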
.add()
Use .add() to include a single task in the queue. The task can be a function returning a promise (asynchronous) or a synchronous value.
queue.add(async () => {
// Asynchronous task
await performAsyncOperation();
return 'Async result';
});
queue.add(() => {
// Synchronous task
return 'Sync result';
});
.addAll()
To add multiple tasks simultaneously, use .addAll() with an array of task functions.
const tasks = [
async () => {
await performAsyncOperation();
return 'Task 1';
},
() => 'Task 2',
];
queue.addAll(tasks).then(results => {
console.log(results); // Output: ['Task 1', 'Task 2']
});
Set the concurrency option to limit the number of tasks running concurrently.
const queue = new PQueue({ concurrency: 2 });
queue.add(async () => {
// Task 1
});
queue.add(async () => {
// Task 2
});
queue.add(async () => {
// Task 3
});
console.log(queue.size); // Output: 1 (queued tasks)
console.log(queue.pending); // Output: 2 (running tasks)
Example from Tests:
const queue = new PQueue({ concurrency: 2 });
queue.add(async () => 'Result 1');
queue.add(async () => {
await delay(100);
return 'Result 2';
});
queue.add(async () => 'Result 3');
console.log(queue.size); // Output: 1
console.log(queue.pending); // Output: 2
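To make the difference between size (waiting) and pending (running) concrete, here is a minimal sketch of a concurrency-limited queue. It is an illustration only, not PQueue's implementation; the TinyQueue class and the wait helper are hypothetical names introduced for this example.

```javascript
// A deliberately tiny queue, for illustration only; not PQueue's implementation.
class TinyQueue {
  constructor(concurrency) {
    this.concurrency = concurrency;
    this.waiting = []; // tasks that have not started yet
    this.pending = 0;  // tasks currently running
  }

  get size() {
    return this.waiting.length;
  }

  add(task) {
    return new Promise((resolve, reject) => {
      const run = async () => {
        this.pending++;
        try {
          resolve(await task());
        } catch (error) {
          reject(error);
        } finally {
          this.pending--;
          this.next();
        }
      };
      this.waiting.push(run);
      this.next();
    });
  }

  next() {
    // Start queued tasks while there is spare capacity.
    while (this.pending < this.concurrency && this.waiting.length > 0) {
      this.waiting.shift()();
    }
  }
}

const wait = ms => new Promise(resolve => setTimeout(resolve, ms));
const tiny = new TinyQueue(2);
tiny.add(() => wait(50));
tiny.add(() => wait(50));
tiny.add(() => wait(50));
console.log(tiny.size);    // 1
console.log(tiny.pending); // 2
```

The first two tasks start as soon as they are added, so only the third is still waiting when the counters are read.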
interval and intervalCap
intervalCap: Maximum number of tasks to execute per interval.
interval: Time frame for the interval in milliseconds.
const queue = new PQueue({
intervalCap: 5, // Max 5 tasks
interval: 1000, // Per 1000 milliseconds
});
Example from Tests:
const queue = new PQueue({
concurrency: 5,
intervalCap: 10,
interval: 1000,
});
for (let i = 0; i < 13; i++) {
queue.add(async () => {
await delay(300);
console.log(`Task ${i} completed`);
});
}
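The arithmetic behind intervalCap can be sketched with two small helpers. This is a rough model that assumes at most intervalCap tasks may start in each window of interval milliseconds; it ignores the concurrency limit and task duration, which can only delay starts further. The function names are hypothetical.

```javascript
// Rough model: at most `intervalCap` tasks may start in each interval window.
// Returns the 0-based window in which the task at `taskIndex` can start at the earliest.
function earliestStartWindow(taskIndex, intervalCap) {
  return Math.floor(taskIndex / intervalCap);
}

// Minimum number of windows needed to start `taskCount` tasks.
function minimumWindows(taskCount, intervalCap) {
  return Math.ceil(taskCount / intervalCap);
}

console.log(minimumWindows(13, 10));      // 2
console.log(earliestStartWindow(9, 10));  // 0 (the tenth task fits in the first window)
console.log(earliestStartWindow(10, 10)); // 1 (the eleventh task waits for the second window)
```

Under this model, 13 tasks with an intervalCap of 10 need at least two windows: ten tasks can start in the first and the remaining three in the second.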
carryoverConcurrencyCount
When carryoverConcurrencyCount is true, unfinished tasks are counted towards the intervalCap in the next interval.
const queue = new PQueue({
intervalCap: 1,
interval: 500,
carryoverConcurrencyCount: true,
});
queue.add(async () => {
await delay(600); // Exceeds the interval of 500ms
console.log('Task completed');
});
Assign priorities to tasks to control their execution order. Higher priority tasks run before lower priority ones.
queue.add(() => console.log('Task 1'), { priority: 1 });
queue.add(() => console.log('Task 2'), { priority: 0 });
queue.add(() => console.log('Task 3'), { priority: 2 });
Example from Tests:
const result = [];
const queue = new PQueue({ concurrency: 1 });
queue.add(() => result.push('A'), { priority: 1 });
queue.add(() => result.push('B'), { priority: 0 });
queue.add(() => result.push('C'), { priority: 1 });
queue.add(() => result.push('D'), { priority: 2 });
queue.onIdle().then(() => {
console.log(result); // Output: ['A', 'D', 'C', 'B'] ('A' starts immediately, before the other tasks are queued)
});
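Priorities only reorder tasks that are still waiting; a task that has already started is not affected. The ordering of waiting tasks can be sketched as a stable insert by descending priority. This is an illustration under that assumption, not PQueue's actual code; the enqueue helper and the job objects are hypothetical.

```javascript
// Insert each job after all waiting jobs of equal or higher priority,
// so higher priorities run first and ties keep their insertion order.
function enqueue(queue, job) {
  let index = queue.findIndex(queued => queued.priority < job.priority);
  if (index === -1) index = queue.length;
  queue.splice(index, 0, job);
}

const waiting = [];
enqueue(waiting, { name: 'B', priority: 0 });
enqueue(waiting, { name: 'C', priority: 1 });
enqueue(waiting, { name: 'D', priority: 2 });
enqueue(waiting, { name: 'E', priority: 1 });
console.log(waiting.map(job => job.name)); // [ 'D', 'C', 'E', 'B' ]
```

'C' and 'E' share priority 1, so they keep the order in which they were added.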
By default, the queue starts processing tasks immediately. Set autoStart to false to manually control task execution.
const queue = new PQueue({ autoStart: false });
queue.add(() => 'Task');
console.log(queue.isPaused); // Output: true
queue.start();
console.log(queue.isPaused); // Output: false
Example from Tests:
const queue = new PQueue({ concurrency: 2, autoStart: false });
queue.add(() => delay(1000));
console.log(queue.size); // Output: 1
console.log(queue.pending); // Output: 0
console.log(queue.isPaused); // Output: true
queue.start();
console.log(queue.isPaused); // Output: false
Use .pause() to stop task execution and .start() to resume it.
queue.pause();
queue.add(() => 'Task');
console.log(queue.isPaused); // Output: true
queue.start();
console.log(queue.isPaused); // Output: false
Example from Tests:
const queue = new PQueue({ concurrency: 2 });
queue.pause();
queue.add(() => delay(1000));
queue.add(() => delay(1000));
console.log(queue.size); // Output: 2
console.log(queue.pending); // Output: 0
console.log(queue.isPaused); // Output: true
queue.start();
console.log(queue.isPaused); // Output: false
Set global or per-task timeouts to prevent tasks from running indefinitely.
timeout: Maximum duration (in milliseconds) a task can run.
throwOnTimeout: Determines if a timeout should result in an error.
const queue = new PQueue({ timeout: 300, throwOnTimeout: true });
queue.add(async () => {
await delay(400); // Exceeds the timeout
});
// The promise returned by .add() rejects with a timeout error.
Example from Tests:
const queue = new PQueue({ timeout: 300, throwOnTimeout: true });
queue.add(async () => {
await delay(400);
}).catch(error => {
console.error(error.message); // Output: 'Promise timed out after 300 milliseconds'
});
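The general idea of a promise timeout can be sketched with Promise.race. This is a standalone illustration of the technique, not a description of how PQueue implements its timeout option; withTimeout and its error message are hypothetical.

```javascript
// Rejects if `promise` has not settled within `ms` milliseconds.
// Note: the underlying work is not stopped; only the wait is abandoned.
function withTimeout(promise, ms) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${ms} ms`)), ms);
  });
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

const slow = new Promise(resolve => setTimeout(() => resolve('slow result'), 200));
withTimeout(slow, 50).catch(error => console.error(error.message)); // Timed out after 50 ms
withTimeout(Promise.resolve('fast result'), 50).then(console.log);  // fast result
```

Because the original work keeps running in this sketch, a timeout is best paired with a cancellation mechanism when the work holds resources.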
Monitor the queue's state using properties and methods:
size: Number of tasks waiting in the queue.
pending: Number of tasks currently running.
console.log(queue.size); // Output: Number of queued tasks
console.log(queue.pending); // Output: Number of running tasks
.onEmpty()
Resolves when the queue has no tasks waiting to be processed.
queue.onEmpty().then(() => {
console.log('Queue is empty');
});
.onIdle()
Resolves when all tasks have completed (both pending and queued).
queue.onIdle().then(() => {
console.log('Queue is idle');
});
Example from Tests:
const queue = new PQueue({ concurrency: 2 });
queue.add(() => delay(100));
queue.add(() => delay(100));
queue.add(() => delay(100));
console.log(queue.size); // Output: 1
console.log(queue.pending); // Output: 2
queue.onIdle().then(() => {
console.log('All tasks completed');
});
Use .clear() to remove all queued tasks that have not started yet. Tasks that are already running are not affected.
queue.add(() => delay(1000));
queue.add(() => delay(1000));
console.log(queue.size); // Output: 2 if neither task has started (for example, the queue is paused)
queue.clear();
console.log(queue.size); // Output: 0
Example from Tests:
const queue = new PQueue({ concurrency: 2 });
queue.add(() => delay(20000));
queue.add(() => delay(20000));
queue.add(() => delay(20000));
console.log(queue.size); // Output: 1
console.log(queue.pending); // Output: 2
queue.clear();
console.log(queue.size); // Output: 0
Use AbortController and AbortSignal to cancel tasks.
const controller = new AbortController();
queue.add(async ({ signal }) => {
signal.addEventListener('abort', () => {
console.log('Task aborted');
});
await performAsyncOperation();
}, { signal: controller.signal });
// To abort the task:
controller.abort();
Example from Tests:
const queue = new PQueue();
const controller = new AbortController();
controller.abort();
queue.add(() => 'Task', { signal: controller.signal }).catch(error => {
console.error(error.name); // Output: 'AbortError'
});
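For cancellation to actually stop work, the task itself has to react to the signal. Below is a minimal sketch of a cancellable wait that does so; abortableDelay is a hypothetical helper, and the error it creates is a plain Error with its name set to 'AbortError' for illustration.

```javascript
// A cancellable wait: rejects with an AbortError as soon as `signal` aborts.
function abortableDelay(ms, signal) {
  return new Promise((resolve, reject) => {
    const abortError = () => {
      const error = new Error('The operation was aborted');
      error.name = 'AbortError';
      return error;
    };
    if (signal.aborted) {
      reject(abortError());
      return;
    }
    const timer = setTimeout(() => {
      signal.removeEventListener('abort', onAbort);
      resolve('finished');
    }, ms);
    function onAbort() {
      clearTimeout(timer); // stop the pending work
      reject(abortError());
    }
    signal.addEventListener('abort', onAbort, { once: true });
  });
}

const controller = new AbortController();
abortableDelay(1000, controller.signal).catch(error => console.error(error.name)); // AbortError
controller.abort();
```

A function like this could serve as the body of a queued task, receiving the signal from the object that the queue passes to the task function.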
Modify concurrency and timeout at runtime. The throttling options (interval and intervalCap) are passed to the constructor, so set them when creating the queue.
queue.concurrency = 5;
queue.timeout = 500;
Example from Tests:
let concurrency = 5;
const queue = new PQueue({ concurrency });
for (let i = 0; i < 100; i++) {
queue.add(async () => {
// Task logic
if (i % 30 === 0) {
queue.concurrency = --concurrency;
console.log(`Concurrency updated to ${queue.concurrency}`);
}
});
}
Combine synchronous and asynchronous tasks within the same queue seamlessly.
queue.add(() => 'Synchronous Task');
queue.add(async () => {
await performAsyncOperation();
return 'Asynchronous Task';
});
Example from Tests:
const queue = new PQueue({ concurrency: 1 });
queue.add(() => 'sync 1');
queue.add(async () => delay(1000));
queue.add(() => 'sync 2');
queue.add(() => 'sync 3');
queue.onIdle().then(() => {
console.log('All tasks completed');
});
Errors thrown within tasks will cause the task's promise to be rejected.
queue.add(async () => {
throw new Error('Task failed');
}).catch(error => {
console.error('Task error:', error.message);
});
Example from Tests:
const queue = new PQueue({ concurrency: 1 });
queue.add(async () => {
throw new Error('broken');
}).catch(error => {
console.error(error.message); // Output: 'broken'
});
Listen for error events to manage task failures effectively.
queue.on('error', error => {
console.error('Error event:', error.message);
});
queue.add(async () => {
throw new Error('Task failed');
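}).catch(() => {
// The promise returned by .add() also rejects; catching it avoids an unhandled rejection.
});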
Example from Tests:
const queue = new PQueue({ concurrency: 1 });
queue.on('error', error => {
console.error('Error event:', error.message);
});
queue.add(async () => {
throw new Error('failure');
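}).catch(() => {
// The promise returned by .add() also rejects; catching it avoids an unhandled rejection.
});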
add: Emitted when a task is added to the queue.
active: Emitted when a task starts running.
next: Emitted after a task completes.
empty: Emitted when the queue becomes empty (no queued tasks).
idle: Emitted when all tasks have completed (no running or queued tasks).
completed: Emitted when a task finishes successfully.
error: Emitted when a task fails.
queue.on('add', () => {
console.log('Task added to the queue');
});
queue.on('active', () => {
console.log('Task started');
});
queue.on('completed', result => {
console.log('Task completed with result:', result);
});
queue.on('error', error => {
console.error('Task failed with error:', error.message);
});
queue.on('idle', () => {
console.log('All tasks have completed');
});
Example from Tests:
const items = [1, 2, 3];
const queue = new PQueue();
let startedCount = 0;
queue.on('active', () => {
startedCount++;
console.log(`Tasks started so far: ${startedCount}`);
});
items.forEach(item => {
queue.add(() => item);
});
queue.onIdle().then(() => {
console.log('All tasks completed');
});
Ensure that configuration options are set correctly to avoid runtime errors.
concurrency: Must be a number from 1 and up.
interval: Must be a finite non-negative number.
intervalCap: Must be a number from 1 and up.
try {
const queue = new PQueue({ concurrency: 0 });
} catch (error) {
console.error(error.message); // Output starts with: 'Expected `concurrency` to be a number from 1 and up'
}
Example from Tests:
try {
new PQueue({ interval: -1 });
} catch (error) {
console.error(error.message); // Output starts with: 'Expected `interval` to be a finite number >= 0'
}
Ensure concurrency, interval, and intervalCap are set to valid numbers.
Control the rate of API calls to comply with external API limits.
const queue = new PQueue({
interval: 1000, // 1 second interval
intervalCap: 5, // Max 5 API calls per interval
});
const fetchData = async url => {
const response = await fetch(url);
return response.json();
};
const urls = ['https://api.example.com/data1', 'https://api.example.com/data2'];
urls.forEach(url => {
queue.add(() => fetchData(url)).then(data => {
console.log(data);
});
});
Efficiently process large data sets with controlled concurrency.
const queue = new PQueue({ concurrency: 3 });
const processItem = async item => {
// Processing logic
};
const items = [/* Large array of items */];
items.forEach(item => {
queue.add(() => processItem(item));
});
Use .catch() or try...catch to manage errors in tasks.
Listen for events such as idle and empty to track the queue.
Tune concurrency based on system capabilities and task nature.
For detailed information on all methods and options, refer to the [PQueue documentation](https://github.com/sindresorhus/p-queue).
Calling .onEmpty() or .onIdle() when the queue is already empty resolves immediately.
If tasks are not starting, ensure autoStart is true or call queue.start().
Adjust interval and intervalCap to match desired throughput.
Can I Change Concurrency During Execution?
Yes. You can set queue.concurrency at any time.
How Do I Cancel a Task?
Create an AbortController and pass its signal in the options given to .add().
What Happens When a Task Times Out?
If throwOnTimeout is true, the task will reject with a timeout error.
This guide provides an overview of PQueue, illustrating its features with practical code examples. By leveraging these functionalities, you can manage asynchronous tasks with control over execution flow, concurrency, and error handling.