Gathering detailed insights and metrics for async-mutex
Gathering detailed insights and metrics for async-mutex
Gathering detailed insights and metrics for async-mutex
Gathering detailed insights and metrics for async-mutex
A mutex for synchronizing async workflows in Javascript
npm install async-mutex
99.5
Supply Chain
99.1
Quality
77.6
Maintenance
100
Vulnerability
100
License
Module System
Min. Node Version
Typescript Support
Node Version
NPM Version
1,145 Stars
157 Commits
63 Forks
7 Watching
2 Branches
16 Contributors
Updated on 27 Nov 2024
Minified
Minified + Gzipped
TypeScript (100%)
Cumulative downloads
Total Downloads
Last day
2.6%
445,402
Compared to previous day
Last week
6%
2,366,007
Compared to previous week
Last month
12.9%
9,918,970
Compared to previous month
Last year
58.8%
88,283,857
Compared to previous year
This package implements primitives for synchronizing asynchronous operations in Javascript.
The term "mutex" usually refers to a data structure used to synchronize concurrent processes running on different threads. For example, before accessing a non-threadsafe resource, a thread will lock the mutex. This is guaranteed to block the thread until no other thread holds a lock on the mutex and thus enforces exclusive access to the resource. Once the operation is complete, the thread releases the lock, allowing other threads to acquire a lock and access the resource.
While Javascript is strictly single-threaded, the asynchronous nature of its execution model allows for race conditions that require similar synchronization primitives. Consider for example a library communicating with a web worker that needs to exchange several subsequent messages with the worker in order to achieve a task. As these messages are exchanged in an asynchronous manner, it is perfectly possible that the library is called again during this process. Depending on the way state is handled during the async process, this will lead to race conditions that are hard to fix and even harder to track down.
This library solves the problem by applying the concept of mutexes to Javascript. Locking the mutex will return a promise that resolves once the mutex becomes available. Once the async process is complete (usually taking multiple spins of the event loop), a callback supplied to the caller should be called in order to release the mutex, allowing the next scheduled worker to execute.
Imagine a situation where you need to control access to several instances of a shared resource. For example, you might want to distribute images between several worker processes that perform transformations, or you might want to create a web crawler that performs a defined number of requests in parallel.
A semaphore is a data structure that is initialized with an arbitrary integer value and that can be locked multiple times. As long as the semaphore value is positive, locking it will return the current value and the locking process will continue execution immediately; the semaphore will be decremented upon locking. Releasing the lock will increment the semaphore again.
Once the semaphore has reached zero, the next process that attempts to acquire a lock will be suspended until another process releases its lock and this increments the semaphore again.
This library provides a semaphore implementation for Javascript that is similar to the mutex implementation described above.
You can install the library into your project via npm
npm install async-mutex
The library is written in TypeScript and will work in any environment that
supports ES5, ES6 promises and Array.isArray
. On ancient browsers,
a shim can be used (e.g. core-js).
No external typings are required for using this library with
TypeScript (version >= 2).
Starting with Node 12.16 and 13.7, native ES6 style imports are supported.
WARNING: Node 13 versions < 13.2.0 fail to import this package correctly. Node 12 and earlier are fine, as are newer versions of Node 13.
CommonJS:
1var Mutex = require('async-mutex').Mutex; 2var Semaphore = require('async-mutex').Semaphore; 3var withTimeout = require('async-mutex').withTimeout;
ES6:
1import {Mutex, Semaphore, withTimeout} from 'async-mutex';
TypeScript:
1import {Mutex, MutexInterface, Semaphore, SemaphoreInterface, withTimeout} from 'async-mutex';
With the latest version of Node, native ES6 style imports are supported.
1const mutex = new Mutex();
Create a new mutex.
Promise style:
1mutex 2 .runExclusive(() => { 3 // ... 4 }) 5 .then((result) => { 6 // ... 7 });
async/await:
1await mutex.runExclusive(async () => { 2 // ... 3});
runExclusive
schedules the supplied callback to be run once the mutex is unlocked.
The function may return a promise. Once the promise is resolved or rejected (or immediately after
execution if an immediate value was returned),
the mutex is released. runExclusive
returns a promise that adopts the state of the function result.
The mutex is released and the result rejected if an exception occurs during execution of the callback.
Promise style:
1mutex 2 .acquire() 3 .then(function(release) { 4 // ... 5 6 release(); 7 });
async/await:
1const release = await mutex.acquire(); 2try { 3 // ... 4} finally { 5 release(); 6}
acquire
returns an (ES6) promise that will resolve as soon as the mutex is
available. The promise resolves with a function release
that
must be called once the mutex should be released again. The release
callback
is idempotent.
IMPORTANT: Failure to call release
will hold the mutex locked and will
likely deadlock the application. Make sure to call release
under all circumstances
and handle exceptions accordingly.
As an alternative to calling the release
callback returned by acquire
, the mutex
can be released by calling release
directly on it:
1mutex.release();
1mutex.isLocked();
Pending locks can be cancelled by calling cancel()
on the mutex. This will reject
all pending locks with E_CANCELED
:
Promise style:
1import {E_CANCELED} from 'async-mutex'; 2 3mutex 4 .runExclusive(() => { 5 // ... 6 }) 7 .then(() => { 8 // ... 9 }) 10 .catch(e => { 11 if (e === E_CANCELED) { 12 // ... 13 } 14 });
async/await:
1import {E_CANCELED} from 'async-mutex'; 2 3try { 4 await mutex.runExclusive(() => { 5 // ... 6 }); 7} catch (e) { 8 if (e === E_CANCELED) { 9 // ... 10 } 11}
This works with acquire
, too:
if acquire
is used for locking, the resulting promise will reject with E_CANCELED
.
The error that is thrown can be customized by passing a different error to the Mutex
constructor:
1const mutex = new Mutex(new Error('fancy custom error'));
Note that while all pending locks are cancelled, a currently held lock will not be
revoked. In consequence, the mutex may not be available even after cancel()
has been called.
You can wait until the mutex is available without locking it by calling waitForUnlock()
.
This will return a promise that resolve once the mutex can be acquired again. This operation
will not lock the mutex, and there is no guarantee that the mutex will still be available
once an async barrier has been encountered.
Promise style:
1mutex 2 .waitForUnlock() 3 .then(() => { 4 // ... 5 });
Async/await:
1await mutex.waitForUnlock(); 2// ...
1const semaphore = new Semaphore(initialValue);
Creates a new semaphore. initialValue
is an arbitrary integer that defines the
initial value of the semaphore.
Promise style:
1semaphore 2 .runExclusive(function(value) { 3 // ... 4 }) 5 .then(function(result) { 6 // ... 7 });
async/await:
1await semaphore.runExclusive(async (value) => { 2 // ... 3});
runExclusive
schedules the supplied callback to be run once the semaphore is available.
The callback will receive the current value of the semaphore as its argument.
The function may return a promise. Once the promise is resolved or rejected (or immediately after
execution if an immediate value was returned),
the semaphore is released. runExclusive
returns a promise that adopts the state of the function result.
The semaphore is released and the result rejected if an exception occurs during execution of the callback.
runExclusive
accepts a first optional argument weight
. Specifying a weight
will decrement the
semaphore by the specified value, and the callback will only be invoked once the semaphore's
value greater or equal to weight
.
runExclusive
accepts a second optional argument priority
. Specifying a greater value for priority
tells the scheduler to run this task before other tasks. priority
can be any real number. The default
is zero.
Promise style:
1semaphore 2 .acquire() 3 .then(function([value, release]) { 4 // ... 5 6 release(); 7 });
async/await:
1const [value, release] = await semaphore.acquire(); 2try { 3 // ... 4} finally { 5 release(); 6}
acquire
returns an (ES6) promise that will resolve as soon as the semaphore is
available. The promise resolves to an array with the
first entry being the current value of the semaphore, and the second value a
function that must be called to release the semaphore once the critical operation
has completed. The release
callback is idempotent.
IMPORTANT: Failure to call release
will hold the semaphore locked and will
likely deadlock the application. Make sure to call release
under all circumstances
and handle exceptions accordingly.
acquire
accepts a first optional argument weight
. Specifying a weight
will decrement the
semaphore by the specified value, and the semaphore will only be acquired once its
value is greater or equal to weight
.
acquire
accepts a second optional argument priority
. Specifying a greater value for priority
tells the scheduler to release the semaphore to the caller before other callers. priority
can be
any real number. The default is zero.
As an alternative to calling the release
callback returned by acquire
, the semaphore
can be released by calling release
directly on it:
1semaphore.release();
release
accepts an optional argument weight
and increments the semaphore accordingly.
IMPORTANT: Releasing a previously acquired semaphore with the releaser that was
returned by acquire will automatically increment the semaphore by the correct weight. If
you release by calling the unscoped release
you have to supply the correct weight
yourself!
1semaphore.getValue()
1semaphore.isLocked();
The semaphore is considered to be locked if its value is either zero or negative.
The value of a semaphore can be set directly to a desired value. A positive value will cause the semaphore to schedule any pending waiters accordingly.
1semaphore.setValue();
Pending locks can be cancelled by calling cancel()
on the semaphore. This will reject
all pending locks with E_CANCELED
:
Promise style:
1import {E_CANCELED} from 'async-mutex'; 2 3semaphore 4 .runExclusive(() => { 5 // ... 6 }) 7 .then(() => { 8 // ... 9 }) 10 .catch(e => { 11 if (e === E_CANCELED) { 12 // ... 13 } 14 });
async/await:
1import {E_CANCELED} from 'async-mutex'; 2 3try { 4 await semaphore.runExclusive(() => { 5 // ... 6 }); 7} catch (e) { 8 if (e === E_CANCELED) { 9 // ... 10 } 11}
This works with acquire
, too:
if acquire
is used for locking, the resulting promise will reject with E_CANCELED
.
The error that is thrown can be customized by passing a different error to the Semaphore
constructor:
1const semaphore = new Semaphore(2, new Error('fancy custom error'));
Note that while all pending locks are cancelled, any currently held locks will not be
revoked. In consequence, the semaphore may not be available even after cancel()
has been called.
You can wait until the semaphore is available without locking it by calling waitForUnlock()
.
This will return a promise that resolve once the semaphore can be acquired again. This operation
will not lock the semaphore, and there is no guarantee that the semaphore will still be available
once an async barrier has been encountered.
Promise style:
1semaphore 2 .waitForUnlock() 3 .then(() => { 4 // ... 5 });
Async/await:
1await semaphore.waitForUnlock(); 2// ...
waitForUnlock
accepts optional arguments weight
and priority
. The promise will resolve as soon
as it is possible to acquire
the semaphore with the given weight and priority. Scheduled tasks with
the greatest priority
values execute first.
Sometimes it is desirable to limit the time a program waits for a mutex or
semaphore to become available. The withTimeout
decorator can be applied
to both semaphores and mutexes and changes the behavior of acquire
and
runExclusive
accordingly.
1import {withTimeout, E_TIMEOUT} from 'async-mutex'; 2 3const mutexWithTimeout = withTimeout(new Mutex(), 100); 4const semaphoreWithTimeout = withTimeout(new Semaphore(5), 100);
The API of the decorated mutex or semaphore is unchanged.
The second argument of withTimeout
is the timeout in milliseconds. After the
timeout is exceeded, the promise returned by acquire
and runExclusive
will
reject with E_TIMEOUT
. The latter will not run the provided callback in case
of an timeout.
The third argument of withTimeout
is optional and can be used to
customize the error with which the promise is rejected.
1const mutexWithTimeout = withTimeout(new Mutex(), 100, new Error('new fancy error')); 2const semaphoreWithTimeout = withTimeout(new Semaphore(5), 100, new Error('new fancy error'));
A shortcut exists for the case where you do not want to wait for a lock to
be available at all. The tryAcquire
decorator can be applied to both mutexes
and semaphores and changes the behavior of acquire
and runExclusive
to
immediately throw E_ALREADY_LOCKED
if the mutex is not available.
Promise style:
1import {tryAcquire, E_ALREADY_LOCKED} from 'async-mutex'; 2 3tryAcquire(semaphoreOrMutex) 4 .runExclusive(() => { 5 // ... 6 }) 7 .then(() => { 8 // ... 9 }) 10 .catch(e => { 11 if (e === E_ALREADY_LOCKED) { 12 // ... 13 } 14 });
async/await:
1import {tryAcquire, E_ALREADY_LOCKED} from 'async-mutex'; 2 3try { 4 await tryAcquire(semaphoreOrMutex).runExclusive(() => { 5 // ... 6 }); 7} catch (e) { 8 if (e === E_ALREADY_LOCKED) { 9 // ... 10 } 11}
Again, the error can be customized by providing a custom error as second argument to
tryAcquire
.
1tryAcquire(semaphoreOrMutex, new Error('new fancy error')) 2 .runExclusive(() => { 3 // ... 4 });
Feel free to use this library under the conditions of the MIT license.
No vulnerabilities found.
Reason
no binaries found in the repo
Reason
no dangerous workflow patterns detected
Reason
license file detected
Details
Reason
6 existing vulnerabilities detected
Details
Reason
Found 3/8 approved changesets -- score normalized to 3
Reason
0 commit(s) and 0 issue activity found in the last 90 days -- score normalized to 0
Reason
dependency not pinned by hash detected -- score normalized to 0
Details
Reason
detected GitHub workflow tokens with excessive permissions
Details
Reason
no effort to earn an OpenSSF best practices badge detected
Reason
security policy file not detected
Details
Reason
project is not fuzzed
Details
Reason
branch protection not enabled on development/release branches
Details
Reason
SAST tool is not run on all commits -- score normalized to 0
Details
Score
Last Scanned on 2024-11-25
The Open Source Security Foundation is a cross-industry collaboration to improve the security of open source software (OSS). The Scorecard provides security health metrics for open source projects.
Learn More