Installations
npm install p-queue
Developer Guide
Typescript
Yes
Module System
ESM
Min. Node Version
>=18
Node Version
18.19.0
NPM Version
9.2.0
Score
99
Supply Chain
99.5
Quality
78.2
Maintenance
100
Vulnerability
100
License
Releases
Contributors
Unable to fetch Contributors
Languages
TypeScript (100%)
Developer
sindresorhus
Download Statistics
Total Downloads
1,023,129,130
Last Day
592,351
Last Week
5,549,006
Last Month
28,348,439
Last Year
319,279,290
GitHub Statistics
3,509 Stars
142 Commits
187 Forks
22 Watching
1 Branches
31 Contributors
Sponsor this package
Package Meta Information
Latest Version
8.0.1
Package Id
p-queue@8.0.1
Unpacked Size
35.14 kB
Size
9.25 kB
File Count
13
NPM Version
9.2.0
Node Version
18.19.0
Publised On
14 Dec 2023
Total Downloads
Cumulative downloads
Total Downloads
1,023,129,130
Last day
-54.5%
592,351
Compared to previous day
Last week
-19.1%
5,549,006
Compared to previous week
Last month
1.2%
28,348,439
Compared to previous month
Last year
27.4%
319,279,290
Compared to previous year
Daily Downloads
Weekly Downloads
Monthly Downloads
Yearly Downloads
p-queue
Promise queue with concurrency control
Useful for rate-limiting async (or sync) operations. For example, when interacting with a REST API or when doing CPU/memory intensive tasks.
For servers, you probably want a Redis-backed job queue instead.
Note that the project is feature complete. We are happy to review pull requests, but we don't plan any further development. We are also not answering email support questions.
Install
1npm install p-queue
Warning: This package is native ESM and no longer provides a CommonJS export. If your project uses CommonJS, you'll have to convert to ESM. Please don't open issues for questions regarding CommonJS / ESM.
Usage
Here we run only one promise at the time. For example, set concurrency
to 4 to run four promises at the same time.
1import PQueue from 'p-queue'; 2import got from 'got'; 3 4const queue = new PQueue({concurrency: 1}); 5 6(async () => { 7 await queue.add(() => got('https://sindresorhus.com')); 8 console.log('Done: sindresorhus.com'); 9})(); 10 11(async () => { 12 await queue.add(() => got('https://avajs.dev')); 13 console.log('Done: avajs.dev'); 14})(); 15 16(async () => { 17 const task = await getUnicornTask(); 18 await queue.add(task); 19 console.log('Done: Unicorn task'); 20})();
API
PQueue(options?)
Returns a new queue
instance, which is an EventEmitter3
subclass.
options
Type: object
concurrency
Type: number
Default: Infinity
Minimum: 1
Concurrency limit.
timeout
Type: number
Per-operation timeout in milliseconds. Operations fulfill once timeout
elapses if they haven't already.
throwOnTimeout
Type: boolean
Default: false
Whether or not a timeout is considered an exception.
autoStart
Type: boolean
Default: true
Whether queue tasks within concurrency limit, are auto-executed as soon as they're added.
queueClass
Type: Function
Class with a enqueue
and dequeue
method, and a size
getter. See the Custom QueueClass section.
intervalCap
Type: number
Default: Infinity
Minimum: 1
The max number of runs in the given interval of time.
interval
Type: number
Default: 0
Minimum: 0
The length of time in milliseconds before the interval count resets. Must be finite.
carryoverConcurrencyCount
Type: boolean
Default: false
If true
, specifies that any pending Promises, should be carried over into the next interval and counted against the intervalCap
. If false
, any of those pending Promises will not count towards the next intervalCap
.
queue
PQueue
instance.
.add(fn, options?)
Adds a sync or async task to the queue. Always returns a promise.
Note: If your items can potentially throw an exception, you must handle those errors from the returned Promise or they may be reported as an unhandled Promise rejection and potentially cause your process to exit immediately.
fn
Type: Function
Promise-returning/async function. When executed, it will receive {signal}
as the first argument.
options
Type: object
priority
Type: number
Default: 0
Priority of operation. Operations with greater priority will be scheduled first.
signal
AbortSignal
for cancellation of the operation. When aborted, it will be removed from the queue and the queue.add()
call will reject with an error. If the operation is already running, the signal will need to be handled by the operation itself.
1import PQueue from 'p-queue'; 2import got, {CancelError} from 'got'; 3 4const queue = new PQueue(); 5 6const controller = new AbortController(); 7 8try { 9 await queue.add(({signal}) => { 10 const request = got('https://sindresorhus.com'); 11 12 signal.addEventListener('abort', () => { 13 request.cancel(); 14 }); 15 16 try { 17 return await request; 18 } catch (error) { 19 if (!(error instanceof CancelError)) { 20 throw error; 21 } 22 } 23 }, {signal: controller.signal}); 24} catch (error) { 25 if (!(error instanceof DOMException)) { 26 throw error; 27 } 28}
.addAll(fns, options?)
Same as .add()
, but accepts an array of sync or async functions and returns a promise that resolves when all functions are resolved.
.pause()
Put queue execution on hold.
.start()
Start (or resume) executing enqueued tasks within concurrency limit. No need to call this if queue is not paused (via options.autoStart = false
or by .pause()
method.)
Returns this
(the instance).
.onEmpty()
Returns a promise that settles when the queue becomes empty.
Can be called multiple times. Useful if you for example add additional items at a later time.
.onIdle()
Returns a promise that settles when the queue becomes empty, and all promises have completed; queue.size === 0 && queue.pending === 0
.
The difference with .onEmpty
is that .onIdle
guarantees that all work from the queue has finished. .onEmpty
merely signals that the queue is empty, but it could mean that some promises haven't completed yet.
.onSizeLessThan(limit)
Returns a promise that settles when the queue size is less than the given limit: queue.size < limit
.
If you want to avoid having the queue grow beyond a certain size you can await queue.onSizeLessThan()
before adding a new item.
Note that this only limits the number of items waiting to start. There could still be up to concurrency
jobs already running that this call does not include in its calculation.
.clear()
Clear the queue.
.size
Size of the queue, the number of queued items waiting to run.
.sizeBy(options)
Size of the queue, filtered by the given options.
For example, this can be used to find the number of items remaining in the queue with a specific priority level.
1import PQueue from 'p-queue'; 2 3const queue = new PQueue(); 4 5queue.add(async () => '🦄', {priority: 1}); 6queue.add(async () => '🦄', {priority: 0}); 7queue.add(async () => '🦄', {priority: 1}); 8 9console.log(queue.sizeBy({priority: 1})); 10//=> 2 11 12console.log(queue.sizeBy({priority: 0})); 13//=> 1
.pending
Number of running items (no longer in the queue).
.timeout
.concurrency
.isPaused
Whether the queue is currently paused.
Events
active
Emitted as each item is processed in the queue for the purpose of tracking progress.
1import delay from 'delay'; 2import PQueue from 'p-queue'; 3 4const queue = new PQueue({concurrency: 2}); 5 6let count = 0; 7queue.on('active', () => { 8 console.log(`Working on item #${++count}. Size: ${queue.size} Pending: ${queue.pending}`); 9}); 10 11queue.add(() => Promise.resolve()); 12queue.add(() => delay(2000)); 13queue.add(() => Promise.resolve()); 14queue.add(() => Promise.resolve()); 15queue.add(() => delay(500));
completed
Emitted when an item completes without error.
1import delay from 'delay'; 2import PQueue from 'p-queue'; 3 4const queue = new PQueue({concurrency: 2}); 5 6queue.on('completed', result => { 7 console.log(result); 8}); 9 10queue.add(() => Promise.resolve('hello, world!'));
error
Emitted if an item throws an error.
1import delay from 'delay'; 2import PQueue from 'p-queue'; 3 4const queue = new PQueue({concurrency: 2}); 5 6queue.on('error', error => { 7 console.error(error); 8}); 9 10queue.add(() => Promise.reject(new Error('error')));
empty
Emitted every time the queue becomes empty.
Useful if you for example add additional items at a later time.
idle
Emitted every time the queue becomes empty and all promises have completed; queue.size === 0 && queue.pending === 0
.
The difference with empty
is that idle
guarantees that all work from the queue has finished. empty
merely signals that the queue is empty, but it could mean that some promises haven't completed yet.
1import delay from 'delay'; 2import PQueue from 'p-queue'; 3 4const queue = new PQueue(); 5 6queue.on('idle', () => { 7 console.log(`Queue is idle. Size: ${queue.size} Pending: ${queue.pending}`); 8}); 9 10const job1 = queue.add(() => delay(2000)); 11const job2 = queue.add(() => delay(500)); 12 13await job1; 14await job2; 15// => 'Queue is idle. Size: 0 Pending: 0' 16 17await queue.add(() => delay(600)); 18// => 'Queue is idle. Size: 0 Pending: 0'
The idle
event is emitted every time the queue reaches an idle state. On the other hand, the promise the onIdle()
function returns resolves once the queue becomes idle instead of every time the queue is idle.
add
Emitted every time the add method is called and the number of pending or queued tasks is increased.
next
Emitted every time a task is completed and the number of pending or queued tasks is decreased. This is emitted regardless of whether the task completed normally or with an error.
1import delay from 'delay'; 2import PQueue from 'p-queue'; 3 4const queue = new PQueue(); 5 6queue.on('add', () => { 7 console.log(`Task is added. Size: ${queue.size} Pending: ${queue.pending}`); 8}); 9 10queue.on('next', () => { 11 console.log(`Task is completed. Size: ${queue.size} Pending: ${queue.pending}`); 12}); 13 14const job1 = queue.add(() => delay(2000)); 15const job2 = queue.add(() => delay(500)); 16 17await job1; 18await job2; 19//=> 'Task is added. Size: 0 Pending: 1' 20//=> 'Task is added. Size: 0 Pending: 2' 21 22await queue.add(() => delay(600)); 23//=> 'Task is completed. Size: 0 Pending: 1' 24//=> 'Task is completed. Size: 0 Pending: 0'
Advanced example
A more advanced example to help you understand the flow.
1import delay from 'delay'; 2import PQueue from 'p-queue'; 3 4const queue = new PQueue({concurrency: 1}); 5 6(async () => { 7 await delay(200); 8 9 console.log(`8. Pending promises: ${queue.pending}`); 10 //=> '8. Pending promises: 0' 11 12 (async () => { 13 await queue.add(async () => '🐙'); 14 console.log('11. Resolved') 15 })(); 16 17 console.log('9. Added 🐙'); 18 19 console.log(`10. Pending promises: ${queue.pending}`); 20 //=> '10. Pending promises: 1' 21 22 await queue.onIdle(); 23 console.log('12. All work is done'); 24})(); 25 26(async () => { 27 await queue.add(async () => '🦄'); 28 console.log('5. Resolved') 29})(); 30console.log('1. Added 🦄'); 31 32(async () => { 33 await queue.add(async () => '🐴'); 34 console.log('6. Resolved') 35})(); 36console.log('2. Added 🐴'); 37 38(async () => { 39 await queue.onEmpty(); 40 console.log('7. Queue is empty'); 41})(); 42 43console.log(`3. Queue size: ${queue.size}`); 44//=> '3. Queue size: 1` 45 46console.log(`4. Pending promises: ${queue.pending}`); 47//=> '4. Pending promises: 1'
$ node example.js
1. Added 🦄
2. Added 🐴
3. Queue size: 1
4. Pending promises: 1
5. Resolved 🦄
6. Resolved 🐴
7. Queue is empty
8. Pending promises: 0
9. Added 🐙
10. Pending promises: 1
11. Resolved 🐙
12. All work is done
Custom QueueClass
For implementing more complex scheduling policies, you can provide a QueueClass in the options:
1import PQueue from 'p-queue'; 2 3class QueueClass { 4 constructor() { 5 this._queue = []; 6 } 7 8 enqueue(run, options) { 9 this._queue.push(run); 10 } 11 12 dequeue() { 13 return this._queue.shift(); 14 } 15 16 get size() { 17 return this._queue.length; 18 } 19 20 filter(options) { 21 return this._queue; 22 } 23} 24 25const queue = new PQueue({queueClass: QueueClass});
p-queue
will call corresponding methods to put and get operations from this queue.
FAQ
How do the concurrency
and intervalCap
options affect each other?
They are just different constraints. The concurrency
option limits how many things run at the same time. The intervalCap
option limits how many things run in total during the interval (over time).
Maintainers
Related
- p-limit - Run multiple promise-returning & async functions with limited concurrency
- p-throttle - Throttle promise-returning & async functions
- p-debounce - Debounce promise-returning & async functions
- p-all - Run promise-returning & async functions concurrently with optional limited concurrency
- More…
No vulnerabilities found.
Reason
no dangerous workflow patterns detected
Reason
security policy file detected
Details
- Info: security policy file detected: .github/security.md:1
- Info: Found linked content: .github/security.md:1
- Info: Found disclosure, vulnerability, and/or timelines in security policy: .github/security.md:1
- Info: Found text in security policy: .github/security.md:1
Reason
no binaries found in the repo
Reason
0 existing vulnerabilities detected
Reason
license file detected
Details
- Info: project has a license file: license:0
- Info: FSF or OSI recognized license: MIT License: license:0
Reason
Found 2/30 approved changesets -- score normalized to 0
Reason
detected GitHub workflow tokens with excessive permissions
Details
- Warn: no topLevel permission defined: .github/workflows/main.yml:1
- Info: no jobLevel write permissions found
Reason
1 commit(s) and 0 issue activity found in the last 90 days -- score normalized to 0
Reason
dependency not pinned by hash detected -- score normalized to 0
Details
- Warn: GitHub-owned GitHubAction not pinned by hash: .github/workflows/main.yml:16: update your workflow using https://app.stepsecurity.io/secureworkflow/sindresorhus/p-queue/main.yml/main?enable=pin
- Warn: GitHub-owned GitHubAction not pinned by hash: .github/workflows/main.yml:17: update your workflow using https://app.stepsecurity.io/secureworkflow/sindresorhus/p-queue/main.yml/main?enable=pin
- Warn: npmCommand not pinned by hash: .github/workflows/main.yml:21
- Info: 0 out of 2 GitHub-owned GitHubAction dependencies pinned
- Info: 0 out of 1 npmCommand dependencies pinned
Reason
no effort to earn an OpenSSF best practices badge detected
Reason
project is not fuzzed
Details
- Warn: no fuzzer integrations found
Reason
branch protection not enabled on development/release branches
Details
- Warn: branch protection not enabled for branch 'main'
Reason
SAST tool is not run on all commits -- score normalized to 0
Details
- Warn: 0 commits out of 2 are checked with a SAST tool
Score
4.1
/10
Last Scanned on 2024-12-16
The Open Source Security Foundation is a cross-industry collaboration to improve the security of open source software (OSS). The Scorecard provides security health metrics for open source projects.
Learn More