An efficient keyed Promise lock for Node.js projects, ensuring the mutually exclusive execution of tasks associated with the same key. Key features include active key metrics and the ability to gracefully await the completion of all currently executing or pending tasks, making it ideal for robust production applications requiring smooth teardown.
```bash
npm install zero-overhead-keyed-promise-lock
```
| Metric | Score |
| --- | --- |
| Supply Chain | 74.9 |
| Quality | 98.8 |
| Maintenance | 86.1 |
| Vulnerability | 100 |
| License | 100 |
- Update dependencies, add Prettier for formatting (updated on Apr 19, 2025)
- Enable smart reuse via `getCurrentExecution` method to expose the active task's promise for a specific key (updated on Apr 11, 2025)
- README Enhancements (updated on Feb 28, 2025)
- Improve coding examples in README, add negative test for invalid key (updated on Feb 27, 2025)
- README improvements (updated on Feb 26, 2025)
- First Release (updated on Feb 23, 2025)
Languages: TypeScript (99.51%), JavaScript (0.49%)
- License: Apache-2.0
- Commits: 6
- Watchers: 1
- Branches: 1
- Contributors: 1
- Updated on Apr 19, 2025
| Field | Value |
| --- | --- |
| Latest Version | 1.1.1 |
| Package Id | zero-overhead-keyed-promise-lock@1.1.1 |
| Unpacked Size | 105.63 kB |
| Size | 23.98 kB |
| File Count | 11 |
| NPM Version | 10.9.2 |
| Node Version | 20.13.1 |
| Published on | Apr 19, 2025 |
Cumulative downloads: 2,244

| Period | Downloads | Change vs. previous period |
| --- | --- | --- |
| Last Day | 16 | 45.5% |
| Last Week | 189 | -55% |
| Last Month | 1,124 | 81% |
| Last Year | 2,244 | 0% |
The `ZeroOverheadKeyedLock` class implements a modern Promise lock for Node.js projects, enabling users to ensure the mutually exclusive execution of tasks associated with the same key.
Effectively, a keyed lock functions as a temporary FIFO task queue per key. The key acts as an identifier for the queue, which exists only while tasks are pending or executing for that key.
A plausible use case is batch-processing Kafka messages from the same partition, where each message is linked to an entity-specific key (e.g., a User Account ID). By using a keyed lock, messages with the same key can be processed sequentially, while still leveraging Kafka’s client support for batch processing. This prevents race conditions when concurrent execution of same-key messages could lead to inconsistencies.
This package extends `zero-overhead-promise-lock` by adding support for keyed locking. If your use case involves only a few fixed keys known at compile time (e.g., bulk writes to a database), using multiple instances of the non-keyed lock may be a viable alternative.
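For a quick start, consider the following minimal sketch; the `chargeAccount` helper is hypothetical and stands in for any asynchronous task:

```typescript
import { ZeroOverheadKeyedLock } from 'zero-overhead-keyed-promise-lock';

// Hypothetical async operation, used only for illustration.
async function chargeAccount(userID: string, amount: number): Promise<void> {
  // ...
}

async function demo(): Promise<void> {
  const lock = new ZeroOverheadKeyedLock<void>();

  // The two 'user-1' tasks run strictly one after another (FIFO),
  // while the 'user-2' task may run concurrently with them.
  lock.executeExclusive('user-1', () => chargeAccount('user-1', 30));
  lock.executeExclusive('user-1', () => chargeAccount('user-1', 50));
  lock.executeExclusive('user-2', () => chargeAccount('user-2', 10));

  // Resolves once all currently executing or pending tasks complete.
  await lock.waitForAllExistingTasksToComplete();
}
```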
Key features include:

- Graceful teardown: await the completion of all currently executing or pending tasks via the `waitForAllExistingTasksToComplete` method. Example use cases include application shutdowns (e.g., `onModuleDestroy` in Nest.js applications) or maintaining a clear state between unit tests. A particularly interesting use case is in batch-processing handlers, where it allows you to signal the completion of all event handlers associated with a batch.
- The `isActiveKey` getter allows skipping or aborting operations if a lock is already held.
- Smart reuse: the `getCurrentExecution` method exposes the currently executing task's promise for a specific key, if one is active.
- The `activeKeys` getter provides real-time insight into currently active keys, i.e., keys associated with ongoing tasks.
- The `tsconfig` target is set to ES2020.

The `ZeroOverheadKeyedLock` class provides the following methods:

- `executeExclusive`: executes a given task with mutually exclusive access for its key, handling the acquire and release steps implicitly.
- `waitForAllExistingTasksToComplete`: resolves once all currently executing or pending tasks have completed. Example use cases include application shutdowns (e.g., `onModuleDestroy` in Nest.js applications) or maintaining a clear state between unit tests.
- `getCurrentExecution`: returns the currently executing task's promise for a specific key, if one is active.

If needed, refer to the code documentation for a more comprehensive description of each method.
The `ZeroOverheadKeyedLock` class also provides getter methods to reflect the current lock's state: `isActiveKey`, which indicates whether a task is currently active for a given key, and `activeKeys`, which lists the keys associated with ongoing tasks.
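For instance, combining `getCurrentExecution` with `executeExclusive` enables the smart-reuse pattern mentioned above; a minimal sketch, assuming `getCurrentExecution` returns `undefined` when no task is active for the given key:

```typescript
import { ZeroOverheadKeyedLock } from 'zero-overhead-keyed-promise-lock';

const lock = new ZeroOverheadKeyedLock<void>();

async function refreshEntityCache(entityID: string): Promise<void> {
  // Smart reuse: if a refresh for this entity is already in flight,
  // await the existing execution instead of queuing a duplicate.
  const ongoing = lock.getCurrentExecution(entityID);
  if (ongoing) {
    return ongoing;
  }

  await lock.executeExclusive(entityID, async () => {
    // Cache-refresh logic goes here.
  });
}
```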
In Kafka consumer configurations, the most common approach is to consume messages sequentially from the same partition while potentially consuming from multiple partitions concurrently. For example, if a Kafka consumer is consuming from 5 partitions concurrently, at most 5 message event handlers will be in progress at the same time.
The primary reason for sequential processing within the same partition is that some messages should not be processed concurrently - such as actions on the same user account (e.g., buy/sell stocks). By assigning a key property to each Kafka message, messages with the same key will be guaranteed to reside in the same partition. Since there is a one-to-one mapping between partitions and consumers, all messages with the same key will be processed by the same consumer, eliminating the risk of concurrently processing same-key messages by multiple consumers.
To increase concurrency, one option is to increase the number of partitions and consume from them concurrently. However, batch processing in the context of a Kafka consumer offers distinct advantages, depending on the use case.
By grouping messages into batches, you reduce the overhead of consuming and processing each message individually, particularly when processing involves complex logic or external resource calls (e.g., batch operations in MongoDB). This can significantly improve throughput. Instead of making individual requests to external services or databases for each message, you can batch several messages together, making fewer, larger requests and thus reducing overall network latency and resource contention.
However, batch processing does not guarantee the sequential processing of same-key messages. To address this, a keyed lock can be employed, effectively introducing a temporary FIFO task queue per key.
```typescript
import { ZeroOverheadKeyedLock } from 'zero-overhead-keyed-promise-lock';
import { Kafka, Consumer, EachBatchPayload } from 'kafkajs';
import { IStockOrder } from './stock-order.interfaces';

export class StockOrdersConsumer {
  // Initialization methods are omitted for brevity in this example.

  public async startConsuming(): Promise<void> {
    // ...
    await this._consumer.run({
      eachBatch: this._handleBatch.bind(this)
    });
  }

  private async _handleBatch(payload: EachBatchPayload): Promise<void> {
    const orderMessages = payload.batch.messages;

    // Orders from the same user must be processed sequentially.
    // We assume the Producer actor assigns a unique user account ID as the key.
    const userLock = new ZeroOverheadKeyedLock<void>();

    // Register all batch orders simultaneously, then wait for their completion.
    for (const { value, key } of orderMessages) {
      const order: IStockOrder = JSON.parse(value.toString());
      const userID = key.toString();
      // The `executeExclusive` method returns a Promise, but we don't await
      // it here, as the individual task completion is not relevant.
      userLock.executeExclusive(userID, () => this._processOrder(order));
    }

    // Graceful teardown is not only important during application shutdowns;
    // in this case, it is used to signal the Kafka client that all batch
    // messages have been processed.
    await userLock.waitForAllExistingTasksToComplete();
  }

  private async _processOrder(order: IStockOrder): Promise<void> {
    // Stock order handling goes here.
  }
}
```
Unfortunately, at least in the JavaScript ecosystem, we cannot control the batch size by message count. Instead, Kafka clients allow setting only a maximum cumulative message size in bytes. As a result, in real-world scenarios, it is often necessary to use a Promise Semaphore to limit the number of batch messages processed concurrently.
The following example demonstrates how to extend the previous implementation to impose a concurrency limit on processed orders per batch. This is achieved using the zero-backpressure-semaphore-typescript package, which is maintained by the same author as this package. :blue_heart:
```typescript
import { ZeroOverheadKeyedLock } from 'zero-overhead-keyed-promise-lock';
import { ZeroBackpressureSemaphore } from 'zero-backpressure-semaphore-typescript';
import { Kafka, Consumer, EachBatchPayload } from 'kafkajs';
import { IStockOrder } from './stock-order.interfaces';

const MAX_BATCH_CONCURRENCY = 32;

export class StockOrdersConsumer {
  // Initialization methods are omitted for brevity in this example.

  public async startConsuming(): Promise<void> {
    // ...
    await this._consumer.run({
      eachBatch: this._handleBatch.bind(this)
    });
  }

  private async _handleBatch(payload: EachBatchPayload): Promise<void> {
    const orderMessages = payload.batch.messages;

    const semaphore = new ZeroBackpressureSemaphore<void>(MAX_BATCH_CONCURRENCY);
    const userLock = new ZeroOverheadKeyedLock<void>();

    for (const { value, key } of orderMessages) {
      const order: IStockOrder = JSON.parse(value.toString());
      const userID = key.toString();
      const alreadyActive = userLock.isActiveKey(userID);
      const executeExclusive = () => userLock.executeExclusive(
        userID,
        () => this._processOrder(order)
      );
      if (alreadyActive) {
        // No need to await; this key already occupies capacity in the semaphore.
        // In other words, the semaphore is currently processing a previous order
        // belonging to the current userID.
        executeExclusive();
        continue;
      }

      await semaphore.startExecution(executeExclusive);
    }

    // Graceful teardown is not only important during application shutdowns;
    // in this case, it is used to signal the Kafka client that all batch
    // messages have been processed.
    await userLock.waitForAllExistingTasksToComplete();
  }

  private async _processOrder(order: IStockOrder): Promise<void> {
    // Stock order handling goes here.
  }
}
```
It is crucial to avoid resolving a batch handler too early. Once the batch handler resolves, the Kafka client updates the processed offsets. If a message handler resolves before processing is fully completed, and the container crashes before all batch messages are handled, Kafka may not reassign those messages to another consumer, leading to potential data loss.
Note that the examples above implement a graceful teardown only for the batch handling method, not for the `StockOrdersConsumer` component itself. Implementing a full teardown for `StockOrdersConsumer` can be challenging, as locks and/or semaphores are created and disposed per batch.

A feasible approach is to introduce a private `_activeLocks` property to track currently active locks. Then, a dedicated teardown method (e.g., `onDestroy`, `onShutdown`, `terminate`) can invoke the locks' graceful teardown mechanism, ensuring a deterministic shutdown.
Consider the following adaptation, which abstracts away irrelevant details:
```typescript
import { ZeroOverheadKeyedLock } from 'zero-overhead-keyed-promise-lock';
import { ZeroBackpressureSemaphore } from 'zero-backpressure-semaphore-typescript';
import { Kafka, Consumer, EachBatchPayload } from 'kafkajs';
import { IStockOrder } from './stock-order.interfaces';

export class StockOrdersConsumer {
  private readonly _activeLocks = new Set<ZeroOverheadKeyedLock<void>>();

  // Initialization methods are omitted for brevity in this example.

  public async startConsuming(): Promise<void> {
    // Implementation goes here.
  }

  public async onDestroy(): Promise<void> {
    await this._consumer.disconnect();

    // Message event handlers may still be executing.
    while (this._activeLocks.size > 0) {
      const locks = Array.from(this._activeLocks.keys());
      const waitForCompletionPromises = locks.map(
        lock => lock.waitForAllExistingTasksToComplete()
      );
      await Promise.all(waitForCompletionPromises);
    }
  }

  private async _handleBatch(payload: EachBatchPayload): Promise<void> {
    const userLock = new ZeroOverheadKeyedLock<void>();
    this._activeLocks.add(userLock);

    // Implementation goes here.

    await userLock.waitForAllExistingTasksToComplete();
    this._activeLocks.delete(userLock);
  }
}
```
Note that a single Kafka consumer may be subscribed to multiple partitions and might perform batch processing for each. Refer to the `partitionsConsumedConcurrently` attribute in the 'kafkajs' package for more details.
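A minimal consumer configuration sketch; the client ID, broker address, topic, and concurrency value are illustrative:

```typescript
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ clientId: 'stock-orders', brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'stock-orders-group' });

await consumer.connect();
await consumer.subscribe({ topic: 'stock-orders', fromBeginning: false });
await consumer.run({
  // Up to 3 partitions are processed concurrently; batches within a
  // given partition are still handled sequentially.
  partitionsConsumedConcurrently: 3,
  eachBatch: async (payload) => {
    // Batch handling goes here.
  },
});
```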
Unlike single-threaded C code, the event-loop architecture used in modern JavaScript runtime environments introduces the possibility of race conditions, especially for asynchronous tasks that span multiple event-loop iterations.
In Node.js, synchronous code blocks - those that do not contain an `await` keyword - are guaranteed to execute within a single event-loop iteration. These blocks inherently do not require synchronization, as their execution is mutually exclusive by definition and cannot overlap.
In contrast, asynchronous tasks that include at least one `await` necessarily span multiple event-loop iterations. Such tasks may require synchronization to prevent overlapping executions that could lead to race conditions, resulting in inconsistent or invalid states. These races occur when event-loop iterations from task A interleave with those of task B, each unaware of the other and potentially acting on an intermediate state.
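To make this concrete, the following sketch shows a check-then-act race across an `await`, and how same-key serialization prevents it; the in-memory `balance` and the timer merely simulate work spanning multiple event-loop iterations:

```typescript
import { ZeroOverheadKeyedLock } from 'zero-overhead-keyed-promise-lock';

let balance = 100; // Simulated shared state.

async function readBalance(): Promise<number> {
  // Simulates an async read spanning event-loop iterations (e.g., a DB call).
  await new Promise((resolve) => setTimeout(resolve, 10));
  return balance;
}

async function withdraw(amount: number): Promise<void> {
  const current = await readBalance(); // Both tasks may observe 100 here.
  if (current >= amount) {
    balance = current - amount; // Lost update: the last writer wins.
  }
}

// Unsynchronized: both withdrawals observe the initial balance of 100 and
// both pass the check, leaving 40 as if only one withdrawal had occurred.
await Promise.all([withdraw(60), withdraw(60)]);

// With a keyed lock, same-key withdrawals run sequentially: the second one
// observes the updated balance of 40 and is correctly rejected.
balance = 100; // Reset the simulated state.
const lock = new ZeroOverheadKeyedLock<void>();
await Promise.all([
  lock.executeExclusive('account-1', () => withdraw(60)),
  lock.executeExclusive('account-1', () => withdraw(60)),
]);
```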
Additionally, locks are sometimes employed purely for performance optimization, such as throttling, rather than for preventing race conditions. For example, limiting concurrent access to a shared resource may be necessary to reduce contention or meet operational constraints. In such cases, the lock effectively functions as a semaphore with a concurrency limit of 1, ensuring that no more than one operation executes at a time.
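For illustration, keying the lock by a resource identifier yields exactly this behavior; a sketch assuming the lock's generic parameter denotes the task's resolved value:

```typescript
import { ZeroOverheadKeyedLock } from 'zero-overhead-keyed-promise-lock';

// Keying by resource URL turns the lock into a per-resource semaphore with
// a concurrency of 1: at most one request per resource is in flight.
const resourceLock = new ZeroOverheadKeyedLock<Response>();

async function throttledFetch(resourceURL: string): Promise<Response> {
  return resourceLock.executeExclusive(resourceURL, () => fetch(resourceURL));
}
```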
Traditional lock APIs require explicit acquire and release steps, adding overhead and responsibility for the user. Additionally, they introduce the risk of deadlocking the application if one forgets to release, for example, due to a thrown exception.
In contrast, `ZeroOverheadKeyedLock` manages task execution itself, abstracting away these details and reducing user responsibility. The acquire and release steps are handled implicitly by the `executeExclusive` method, reminiscent of the RAII idiom in C++.
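One practical consequence, assuming `executeExclusive` rejects with the task's error while still releasing the key internally, is that a thrown exception cannot leave a key locked:

```typescript
import { ZeroOverheadKeyedLock } from 'zero-overhead-keyed-promise-lock';

const lock = new ZeroOverheadKeyedLock<void>();

try {
  await lock.executeExclusive('user-123', async () => {
    throw new Error('task failed');
  });
} catch (err) {
  // The error propagates to the caller, yet the 'user-123' key was
  // released internally; no manual cleanup step is required.
}

// A subsequent same-key task is not blocked by the failed one.
await lock.executeExclusive('user-123', async () => {
  // Executes normally.
});
```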
A common example of using locks is the READ-AND-UPDATE scenario, where concurrent reads of the same value can lead to erroneous updates. While such examples are intuitive, they are often less relevant in modern applications due to advancements in databases and external storage solutions. Modern databases, as well as caches like Redis, provide native support for atomic operations. Always prioritize leveraging atomicity in external resources before resorting to in-memory locks.
Consider the following function that increments the number of product views for the last hour in a MongoDB collection. Using two separate operations, this implementation introduces a race condition:
```typescript
async function updateViews(products: Collection<IProductSchema>, productID: string): Promise<void> {
  const product = await products.findOne({ _id: productID }); // Step 1: Read
  if (!product) return;

  const currentViews = product.hourlyViews ?? 0;
  await products.updateOne(
    { _id: productID },
    { $set: { hourlyViews: currentViews + 1 } } // Step 2: Update
  );
}
```
The race condition occurs when two or more processes or concurrent tasks (Promises within the same process) execute this function simultaneously, potentially leading to incorrect counter values. This can be mitigated by using MongoDB's atomic `$inc` operator, as shown below:
```typescript
async function updateViews(products: Collection<IProductSchema>, productID: string): Promise<void> {
  await products.updateOne(
    { _id: productID },
    { $inc: { hourlyViews: 1 } } // Atomic increment
  );
}
```
By combining the read and update into a single atomic operation, the code avoids the need for locks and improves both reliability and performance.
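If an external resource offers no suitable atomic operation and all writers share a single Node.js process, the read-and-update can instead be serialized per key with this package's lock; a sketch (the `IProductSchema` import path is hypothetical, and this approach does not protect against writers in other processes):

```typescript
import { Collection } from 'mongodb';
import { ZeroOverheadKeyedLock } from 'zero-overhead-keyed-promise-lock';
import { IProductSchema } from './product.interfaces'; // Hypothetical schema module.

const productLock = new ZeroOverheadKeyedLock<void>();

async function updateViewsExclusively(
  products: Collection<IProductSchema>,
  productID: string
): Promise<void> {
  // Keyed by product ID: same-product updates within this process are
  // serialized, while updates to different products proceed concurrently.
  await productLock.executeExclusive(productID, async () => {
    const product = await products.findOne({ _id: productID });
    if (!product) return;

    await products.updateOne(
      { _id: productID },
      { $set: { hourlyViews: (product.hourlyViews ?? 0) + 1 } }
    );
  });
}
```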
No security vulnerabilities found.