Gathering detailed insights and metrics for zero-backpressure-weighted-promise-semaphore
A modern weighted promise semaphore for Node.js projects, ideal for managing workloads with varying processing requirements. It allows limiting the total weight of concurrently executing jobs, ensuring efficient resource utilization. It offers backpressure control for enhanced efficiency through a communicative API that signals availability.
npm install zero-backpressure-weighted-promise-semaphore
Supply Chain: 75.8
Quality: 99.4
Maintenance: 88.5
Vulnerability: 100
License: 100
README enhancements and dev-dependencies upgrades (published on 21 Dec 2024)
Refine README to include the non-weighted variant reference and a metrics example (published on 25 Nov 2024)
README and Documentation improvements (published on 07 Oct 2024)
README and Documentation improvements (published on 03 Oct 2024)
Unit Test Refinements: Improved Readability and Terminology Consistency (published on 14 Sept 2024)
Split unit tests into two files and enhance README (published on 12 Sept 2024)
TypeScript (99.83%)
JavaScript (0.17%)
Total Downloads: 1,396
Last Day: 5
Last Week: 14
Last Month: 142
Last Year: 1,396
27 Commits
1 Watching
1 Branches
1 Contributors
Latest Version: 1.0.13
Package Id: zero-backpressure-weighted-promise-semaphore@1.0.13
Unpacked Size: 231.11 kB
Size: 43.83 kB
File Count: 15
NPM Version: 10.9.2
Node Version: 20.13.1
Published On: 21 Dec 2024
The ZeroBackpressureWeightedSemaphore class implements a modern Promise Semaphore for Node.js projects, allowing users to limit the concurrency of weighted jobs. It serves as the weighted counterpart to the zero-backpressure-semaphore-typescript package.
Each job is associated with a natural-number weight (1, 2, 3, ...). The semaphore guarantees that the total weight of concurrently executing jobs never exceeds a user-defined limit. The use of natural numbers for weights is mandated to prevent floating-point precision issues inherent in JavaScript.
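To illustrate the floating-point concern, the following sketch checks whether a set of concurrently executing jobs fits within a capacity. Both helpers (isNaturalNumber and fitsWithinCapacity) are hypothetical and only model the bookkeeping; they are not the package's actual validation code.

```typescript
// Hypothetical validation rule (not the package's actual check):
// a valid weight is a positive safe integer: 1, 2, 3, ...
function isNaturalNumber(weight: number): boolean {
  return Number.isSafeInteger(weight) && weight >= 1;
}

// Hypothetical helper: does a set of concurrently executing jobs fit within the capacity?
function fitsWithinCapacity(weights: number[], capacity: number): boolean {
  const totalWeight = weights.reduce((sum, w) => sum + w, 0);
  return totalWeight <= capacity;
}

// Fractional weights: 0.1 + 0.2 evaluates to 0.30000000000000004 in IEEE-754
// double precision, so two jobs that "exactly" fill the capacity are rejected.
console.log(fitsWithinCapacity([0.1, 0.2], 0.3)); // false

// Natural-number weights: integer arithmetic on safe integers is exact.
console.log(fitsWithinCapacity([1, 2], 3)); // true
```

With integer weights, allocating and releasing weight never accumulates rounding error, so the semaphore's available weight always returns exactly to its initial value.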
This implementation does not queue pending jobs. Instead, the API promotes a just-in-time approach via a communicative API that signals availability, thereby eliminating backpressure. As a result, users have better control over memory footprint, which enhances performance by reducing garbage-collector overhead.
Additionally, built-in mechanisms for error handling and graceful termination are provided, ensuring that all currently executing jobs complete before termination or post-processing.
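The design addresses the two primary semaphore use cases in Node.js:
1. Multiple jobs execution: a single caller dispatches multiple jobs, typically via startExecution.
2. Single job execution: multiple callers, such as route handlers, concurrently access the same semaphore instance. Each caller initiates a single job and relies on its outcome to proceed, typically via waitForCompletion.
Each use case necessitates distinct handling capabilities, which will be discussed separately with accompanying examples.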
Key features:
Graceful termination: await the completion of all currently executing jobs via the waitForAllExecutingJobsToComplete method.
Robust error handling: uncaught errors from background jobs triggered by startExecution are captured and can be accessed using the extractUncaughtErrors method.
Metrics: getter methods such as amountOfCurrentlyExecutingJobs and availableWeight provide insights into the semaphore's current state. These metrics can be used for periodic logging or to collect statistics from real-world usage.
ES2020 compatibility: the tsconfig target is set to ES2020, ensuring compatibility with ES2020 environments.
Traditional semaphore APIs require explicit acquire and release steps, adding overhead and responsibility for the user. Additionally, they introduce the risk of deadlocking the application if one forgets to release, for example, due to a thrown exception.
In contrast, ZeroBackpressureWeightedSemaphore manages job execution, abstracting away these details and reducing user responsibility. The acquire and release steps are handled implicitly by the execution methods, reminiscent of the RAII idiom in C++.
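The implicit acquire/release idea can be sketched with a deliberately simplified, hypothetical class, MiniWeightedSemaphore. It is not the package's implementation: it has no fairness guarantees and wakes every waiter on each release. The point is the try/finally block, which returns the job's weight even when the job throws, so a forgotten release cannot deadlock the semaphore.

```typescript
// Hypothetical, simplified weighted semaphore for illustration only;
// NOT the package's implementation (no fairness, wakes all waiters).
type Job<T> = () => Promise<T>;

class MiniWeightedSemaphore<T> {
  private readonly totalAllowedWeight: number;
  private available: number;
  private waiters: Array<() => void> = [];

  constructor(totalAllowedWeight: number) {
    this.totalAllowedWeight = totalAllowedWeight;
    this.available = totalAllowedWeight;
  }

  get availableWeight(): number {
    return this.available;
  }

  // The caller never acquires or releases explicitly; both happen in here.
  async waitForCompletion(job: Job<T>, weight: number): Promise<T> {
    if (!Number.isSafeInteger(weight) || weight < 1 || weight > this.totalAllowedWeight) {
      throw new RangeError(`Invalid weight: ${weight}`);
    }
    await this.acquire(weight);
    try {
      return await job();
    } finally {
      // Runs whether the job resolved or threw, so its weight is never leaked.
      this.release(weight);
    }
  }

  private async acquire(weight: number): Promise<void> {
    while (this.available < weight) {
      await new Promise<void>((resolve) => this.waiters.push(resolve));
    }
    this.available -= weight;
  }

  private release(weight: number): void {
    this.available += weight;
    // Wake every waiter; each one re-checks whether enough weight is now free.
    const toWake = this.waiters;
    this.waiters = [];
    for (const wake of toWake) wake();
  }
}
```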
Method names are chosen to clearly convey their functionality.
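The ZeroBackpressureWeightedSemaphore class provides the following methods:
startExecution: resolves once the given job has started its execution, while the job itself continues in the background. Users can leverage this to prevent backpressure of pending jobs: if the semaphore is too busy to start a given job X, there is no reason to create another job Y until X has started. This method is particularly useful for background job workers that frequently retrieve job metadata from external sources, such as pulling messages from a message broker.
waitForCompletion: executes the given job once sufficient weight is available, and resolves or rejects when the job completes, returning its value or propagating any error it throws.
waitForAllExecutingJobsToComplete: resolves once all currently executing jobs have finished, whether they resolved or rejected.
extractUncaughtErrors: returns the uncaught errors captured while executing background jobs triggered by startExecution. The instance will no longer hold these error references once extracted. In other words, ownership of these uncaught errors shifts to the caller, while the semaphore clears its list of uncaught errors.
If needed, refer to the code documentation for a more comprehensive description of each method.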
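The ZeroBackpressureWeightedSemaphore class provides the following getter methods to reflect the current state:
availableWeight: the currently available, non-allocated weight.
amountOfCurrentlyExecutingJobs: the number of jobs currently being executed by the semaphore.
amountOfUncaughtErrors: the number of uncaught errors from background jobs triggered by startExecution that are currently stored by the instance. These errors have not yet been extracted using extractUncaughtErrors.
To eliminate any ambiguity, all getter methods have O(1) time and space complexity, meaning they do not iterate through all currently executing jobs with each call. The metrics are maintained by the jobs themselves.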
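One way to keep such getters O(1) is to update counters when each job starts and finishes, rather than scanning a collection of jobs on every read. The ExecutionMetrics class below is a hypothetical illustration of that bookkeeping only; it does not enforce any concurrency limit and is not the package's code.

```typescript
// Hypothetical illustration of O(1) metrics; NOT the package's code.
// It only tracks metrics and does not limit concurrency.
class ExecutionMetrics {
  private readonly totalAllowedWeight: number;
  private executingJobs = 0;
  private allocatedWeight = 0;

  constructor(totalAllowedWeight: number) {
    this.totalAllowedWeight = totalAllowedWeight;
  }

  // O(1): each getter reads a counter and never iterates over jobs.
  get amountOfCurrentlyExecutingJobs(): number {
    return this.executingJobs;
  }

  get availableWeight(): number {
    return this.totalAllowedWeight - this.allocatedWeight;
  }

  // Each job updates the counters itself: once on start, once on completion.
  async track<T>(job: () => Promise<T>, weight: number): Promise<T> {
    ++this.executingJobs;
    this.allocatedWeight += weight;
    try {
      return await job();
    } finally {
      --this.executingJobs;
      this.allocatedWeight -= weight;
    }
  }
}
```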
This semaphore variant excels in eliminating backpressure when dispatching multiple concurrent jobs from the same caller. This pattern is typically observed in background job services, such as workers that pull messages from a message broker or process large batches of work.
Here, the start time of each job is crucial. Since a pending job cannot start its execution until the semaphore allows, there is no benefit to adding additional jobs that cannot start immediately. The startExecution method communicates the job's start time to the caller (it resolves as soon as the job starts), which enables the caller to create a new job as soon as it makes sense.
For example, consider an application responsible for training 1M machine learning models on a shared GPU resource. Different models require different amounts of GPU memory and processing power. A weighted semaphore can manage the total GPU memory usage, allowing only those combinations of models whose total weight does not exceed the GPU capacity to train concurrently.
Instead of loading all models into memory and pre-creating 1M jobs (one for each model), which could potentially overwhelm the Node.js task queue and induce backpressure, the system should adopt a just-in-time approach. This means creating a model-training job only when the semaphore indicates availability, thereby optimizing resource utilization and maintaining system stability.
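The difference between resolving when a job starts and resolving when it completes is what enables this just-in-time pattern. The sketch below uses a hypothetical, simplified BackgroundWeightedSemaphore (not the package's implementation; weight validation omitted) whose startExecution resolves as soon as enough weight is reserved, while the job keeps running in the background. The hypothetical produce loop creates each job only after the previous one has started, so at most one job is ever pending.

```typescript
// Hypothetical sketch, NOT the package's implementation.
// Weights are assumed to be natural numbers not exceeding the total allowed weight.
type BackgroundJob = () => Promise<void>;

class BackgroundWeightedSemaphore {
  private available: number;
  private waiters: Array<() => void> = [];
  readonly uncaughtErrors: unknown[] = [];

  constructor(totalAllowedWeight: number) {
    this.available = totalAllowedWeight;
  }

  get availableWeight(): number {
    return this.available;
  }

  // Resolves as soon as the job has *started*; its completion is not awaited.
  async startExecution(job: BackgroundJob, weight: number): Promise<void> {
    while (this.available < weight) {
      await new Promise<void>((resolve) => this.waiters.push(resolve));
    }
    this.available -= weight;
    void this.runInBackground(job, weight);
  }

  private async runInBackground(job: BackgroundJob, weight: number): Promise<void> {
    try {
      await job();
    } catch (err) {
      // Captured instead of surfacing as an unhandled rejection.
      this.uncaughtErrors.push(err);
    } finally {
      this.available += weight;
      const toWake = this.waiters;
      this.waiters = [];
      for (const wake of toWake) wake();
    }
  }
}

// Hypothetical just-in-time producer: each job is created only after the
// previous one has started.
async function produce(
  semaphore: BackgroundWeightedSemaphore,
  weights: number[],
  log: string[]
): Promise<void> {
  for (let i = 0; i < weights.length; ++i) {
    log.push(`created ${i}`);
    await semaphore.startExecution(async () => {
      log.push(`started ${i}`);
      await new Promise((resolve) => setTimeout(resolve, 5));
    }, weights[i]);
  }
}
```

With a total weight of 2 and three jobs of weight 2, the log alternates between "created" and "started" entries, showing that no job is created while another is still waiting to start.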
The following example demonstrates fetching models using an AsyncGenerator. Async generators and iterators are widely adopted in modern APIs, providing efficient handling of potentially large data sets. For instance, the AWS SDK utilizes them for pagination, abstracting away complexities like managing offsets. Similarly, MongoDB's cursor enables iteration over a large number of documents in a paginated and asynchronous manner. By awaiting the semaphore's availability, the space complexity is implicitly constrained to O(max(page-size, semaphore-capacity)), as the AsyncGenerator fetches a new page only after all models from the current page have initiated training.
Note: the waitForAllExecutingJobsToComplete method can be used to perform post-processing after all jobs have completed. It complements the typical use cases of startExecution.
```ts
import { ZeroBackpressureWeightedSemaphore } from 'zero-backpressure-weighted-promise-semaphore';

interface ModelInfo {
  weight: number; // Must be a natural number: 1,2,3,...
  // Additional model fields.
}

const totalAllowedWeight = 180;
const estimatedMaxNumberOfConcurrentJobs = 12;
const trainingSemaphore = new ZeroBackpressureWeightedSemaphore<void>(
  totalAllowedWeight,
  estimatedMaxNumberOfConcurrentJobs // Optional argument; can reduce dynamic slot allocations for optimization purposes.
);

async function trainModels(models: AsyncGenerator<ModelInfo>): Promise<void> {
  let fetchedModelsCounter = 0;

  // `for await` is required to iterate an AsyncGenerator.
  for await (const model of models) {
    ++fetchedModelsCounter;

    // Until the semaphore can start training the current model, adding more
    // jobs won't make sense as this would induce unnecessary backpressure.
    await trainingSemaphore.startExecution(
      (): Promise<void> => handleModelTraining(model),
      model.weight
    );
  }
  // Note: at this stage, jobs might still be executing, as we did not wait for
  // their completion.

  // Graceful termination: await the completion of all currently executing jobs.
  await trainingSemaphore.waitForAllExecutingJobsToComplete();
  console.info(`Finished training ${fetchedModelsCounter} ML models`);
}

async function handleModelTraining(model: Readonly<ModelInfo>): Promise<void> {
  // Implementation goes here.
}
```
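The memory bound mentioned above relies on the generator requesting the next page only when the consumer asks for more items. The sketch below isolates that behavior: fetchPage is a hypothetical stand-in for a paginated API (no real SDK is used), and fetchModelsPaginated records each page fetch so it is visible that page 2 is requested only after every model of page 1 has been handed to the consumer.

```typescript
// Hypothetical model shape for this sketch.
interface ModelInfo {
  id: number;
  weight: number; // Natural number: 1, 2, 3, ...
}

// Hypothetical paginated source simulating a remote API holding `totalModels` models.
async function fetchPage(pageIndex: number, pageSize: number, totalModels: number): Promise<ModelInfo[]> {
  const start = pageIndex * pageSize;
  const end = Math.min(start + pageSize, totalModels);
  const page: ModelInfo[] = [];
  for (let id = start; id < end; ++id) {
    page.push({ id, weight: 1 + (id % 3) });
  }
  return page;
}

// Yields models one at a time. The next page is requested only when the
// consumer asks for an item beyond the current page, so the generator holds
// at most one page in memory.
async function* fetchModelsPaginated(
  pageSize: number,
  totalModels: number,
  fetchLog: number[] // records the index of every page fetch, for observation
): AsyncGenerator<ModelInfo> {
  for (let pageIndex = 0; ; ++pageIndex) {
    fetchLog.push(pageIndex);
    const page = await fetchPage(pageIndex, pageSize, totalModels);
    if (page.length === 0) {
      return;
    }
    for (const model of page) {
      yield model;
    }
  }
}
```

In the training example, each yielded model is passed to startExecution before the loop asks the generator for the next one, which is what ties page fetching to the semaphore's availability.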
If jobs might throw errors, you don't need to worry about these errors propagating to the event loop and potentially crashing the application. Uncaught errors from jobs triggered by startExecution are captured by the semaphore and can be safely accessed for post-processing purposes (e.g., metrics).
Refer to the following adaptation of the above example, now utilizing the semaphore's error handling capabilities:
```ts
import { ZeroBackpressureWeightedSemaphore } from 'zero-backpressure-weighted-promise-semaphore';

interface ModelInfo {
  weight: number; // Must be a natural number: 1,2,3,...
  // Additional model fields.
}

interface CustomModelError extends Error {
  model: ModelInfo; // In this manner, later you can associate an error with its model.
  // Alternatively, a custom error may contain just a few fields of interest.
}

const totalAllowedWeight = 180;
const estimatedMaxNumberOfConcurrentJobs = 12;
const trainingSemaphore =
  // Notice the 2nd generic parameter (Error by default).
  new ZeroBackpressureWeightedSemaphore<void, CustomModelError>(
    totalAllowedWeight,
    estimatedMaxNumberOfConcurrentJobs // Optional argument; can reduce dynamic slot allocations for optimization purposes.
  );

// `logger` and `updateFailedTrainingMetrics` are assumed to be defined elsewhere.
async function trainModels(models: AsyncGenerator<ModelInfo>): Promise<void> {
  let fetchedModelsCounter = 0;

  // `for await` is required to iterate an AsyncGenerator.
  for await (const model of models) {
    ++fetchedModelsCounter;

    // Until the semaphore can start training the current model, adding more
    // jobs won't make sense as this would induce unnecessary backpressure.
    await trainingSemaphore.startExecution(
      (): Promise<void> => handleModelTraining(model),
      model.weight
    );
  }
  // Note: at this stage, jobs might still be executing, as we did not wait for
  // their completion.

  // Graceful termination: await the completion of all currently executing jobs.
  await trainingSemaphore.waitForAllExecutingJobsToComplete();

  // Post processing.
  const errors = trainingSemaphore.extractUncaughtErrors();
  if (errors.length > 0) {
    await updateFailedTrainingMetrics(errors);
  }

  // Summary:
  // The API's support for graceful termination is particularly valuable for handling
  // post-processing or clean-up tasks after the main operations are complete.
  const successfulJobsCount = fetchedModelsCounter - errors.length;
  logger.info(
    `Successfully trained ${successfulJobsCount} models, ` +
    `with failures in training ${errors.length} models`
  );
}

async function handleModelTraining(model: Readonly<ModelInfo>): Promise<void> {
  // Implementation goes here.
}
```
The waitForCompletion method is useful for executing a sub-procedure for which the caller must wait before proceeding with its work.
For example, consider fetching data from an external resource within a route handler. The route handler must respond (e.g., with an HTTP status 200 on success) based on the result of the fetching sub-procedure. Note that a sub-procedure may return a value or throw an error. If an error is thrown, waitForCompletion will propagate the error back to the caller.
The concurrency limit for such operations is typically set based on external constraints (e.g., reducing the chances of being throttled) or the desire to limit network resource usage.
Regarding weights, users may choose to assign heavier weights to paginated or aggregated database operations, while assigning smaller weights to simpler operations that involve fetching a single document or record. In this way, the semaphore not only limits concurrency but also helps manage overall database throughput, maintaining responsiveness by preventing overload.
```ts
import {
  ZeroBackpressureWeightedSemaphore,
  SemaphoreJob
} from 'zero-backpressure-weighted-promise-semaphore';

// `app` (an Express application), `usersDbClient`, `logger`, HTTP_OK_CODE and
// HTTP_ERROR_CODE are assumed to be defined elsewhere.

type UserInfo = Record<string, string>;

// Note that if the total allowed weight is N, the maximum concurrency is also N,
// since the minimum valid weight is 1 unit (weights must be natural numbers).
const totalAllowedWeight = 84;
// The first generic parameter is the jobs' return type (UserInfo here).
const dbAccessSemaphore =
  new ZeroBackpressureWeightedSemaphore<UserInfo>(totalAllowedWeight);

const GET_USER_REQUEST_WEIGHT = 1; // Simple DB query, fetching just one user info.

app.get('/user/', async (req, res) => {
  // Define the sub-procedure.
  const fetchUserInfo: SemaphoreJob<UserInfo> = async (): Promise<UserInfo> => {
    const userInfo: UserInfo = await usersDbClient.get(req.userID);
    return userInfo;
  };

  // Execute the sub-procedure in a controlled manner.
  try {
    const userInfo: UserInfo = await dbAccessSemaphore.waitForCompletion(
      fetchUserInfo,
      GET_USER_REQUEST_WEIGHT
    );
    res.status(HTTP_OK_CODE).send(userInfo);
  } catch (err) {
    // Error was thrown by the fetchUserInfo job.
    const message = err instanceof Error ? err.message : String(err);
    logger.error(`Failed fetching user info for userID ${req.userID} with error: ${message}`);
    res.status(HTTP_ERROR_CODE).end(); // status() alone does not send a response.
  }
});
```
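To make the weight arithmetic concrete: with a total allowed weight of 84, as in the route-handler example above, the maximum number of concurrent operations depends on the weight assigned to each operation type. In the table below only the single-document weight of 1 comes from that example; the weights for paginated and aggregation queries are hypothetical values chosen for illustration.

```typescript
// Hypothetical weight table; only singleDocument = 1 mirrors the example above.
const OPERATION_WEIGHTS = {
  singleDocument: 1,
  paginatedQuery: 4,
  aggregation: 12,
} as const;

type OperationType = keyof typeof OPERATION_WEIGHTS;

const TOTAL_ALLOWED_WEIGHT = 84;

// How many operations of a single type can execute concurrently.
function maxConcurrency(op: OperationType): number {
  return Math.floor(TOTAL_ALLOWED_WEIGHT / OPERATION_WEIGHTS[op]);
}

// Whether a given mix of in-flight operations stays within the limit.
function fitsWithinLimit(mix: Partial<Record<OperationType, number>>): boolean {
  let total = 0;
  for (const op of Object.keys(mix) as OperationType[]) {
    total += OPERATION_WEIGHTS[op] * (mix[op] ?? 0);
  }
  return total <= TOTAL_ALLOWED_WEIGHT;
}

console.log(maxConcurrency("singleDocument")); // 84
console.log(maxConcurrency("paginatedQuery")); // 21
console.log(maxConcurrency("aggregation")); // 7
```

For instance, 5 aggregations and 6 paginated queries weigh 5 × 12 + 6 × 4 = 84 and fit exactly, whereas 6 aggregations and 4 paginated queries weigh 88 and would have to wait.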
The waitForAllExecutingJobsToComplete method is essential for scenarios where it is necessary to wait for all ongoing jobs to finish, such as logging a success message or executing subsequent logic. Without this built-in capability, developers would have to implement periodic polling of the semaphore or other indicators to monitor progress, which can increase both implementation complexity and resource usage.
A key use case for this method is ensuring stable unit tests. Each test should start with a clean state, independent of others, to avoid interference. This prevents scenarios where a job from Test A inadvertently continues to execute during Test B.
If your component has a termination method (stop, terminate, or similar), consider awaiting waitForAllExecutingJobsToComplete there as well, so that no job outlives the component.
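Waiting without polling is possible because the last job to finish can signal completion. The hypothetical JobTracker below (not the package's implementation) lazily creates a single promise that is resolved when the executing-jobs counter drops to zero, and a hypothetical ReportGenerator component simply awaits it in its stop method.

```typescript
// Hypothetical sketch, NOT the package's implementation.
class JobTracker {
  private executing = 0;
  private allDone: Promise<void> | undefined;
  private resolveAllDone: (() => void) | undefined;

  // Starts a job in the background and counts it until it settles.
  run(job: () => Promise<void>): void {
    ++this.executing;
    void this.execute(job);
  }

  // No polling: the last job to finish resolves the shared promise.
  waitForAllExecutingJobsToComplete(): Promise<void> {
    if (this.executing === 0) return Promise.resolve();
    if (this.allDone === undefined) {
      this.allDone = new Promise<void>((resolve) => {
        this.resolveAllDone = resolve;
      });
    }
    return this.allDone;
  }

  private async execute(job: () => Promise<void>): Promise<void> {
    try {
      await job();
    } catch {
      // Error capturing is out of scope for this sketch; errors are ignored.
    } finally {
      if (--this.executing === 0 && this.resolveAllDone !== undefined) {
        this.resolveAllDone();
        this.allDone = undefined;
        this.resolveAllDone = undefined;
      }
    }
  }
}

// Hypothetical component whose termination method waits for in-flight jobs.
class ReportGenerator {
  private readonly tracker = new JobTracker();

  schedule(job: () => Promise<void>): void {
    this.tracker.run(job);
  }

  async stop(): Promise<void> {
    await this.tracker.waitForAllExecutingJobsToComplete();
  }
}
```

The same shape is what keeps unit tests isolated: awaiting stop in a test's teardown guarantees that no job from one test is still running when the next one begins.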
Background jobs triggered by startExecution may throw errors. Unlike the waitForCompletion case, the caller has no reference to the corresponding job promise, which executes in the background.
Therefore, errors from background jobs are captured by the semaphore and can be extracted using the extractUncaughtErrors method. Optionally, you can specify a custom UncaughtErrorType as the second generic parameter of the ZeroBackpressureWeightedSemaphore class. By default, the error type is Error.
```ts
const trafficAnalyzerSemaphore =
  new ZeroBackpressureWeightedSemaphore<void, TrafficAnalyzerError>(
    totalAllowedWeight
  );
```
The number of accumulated uncaught errors can be obtained via the amountOfUncaughtErrors getter method. This can be useful, for example, if the user wants to handle uncaught errors only after a certain threshold is reached.
Even if the user does not intend to perform error handling with these uncaught errors, it is important to periodically call extractUncaughtErrors when using startExecution, to prevent the accumulation of errors in memory.
However, there are a few exceptional cases where the user can safely avoid extracting uncaught errors, for example when the jobs are guaranteed never to throw, so no errors can accumulate.
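The capture-and-extract pattern, including a custom error type and a threshold check, can be sketched as follows. BackgroundErrorCollector, SensorReadError, makeSensorError and the threshold value are all hypothetical; the sketch omits concurrency limiting and is not the package's implementation. It shows the ownership transfer: after extraction, the collector holds no references to the returned errors.

```typescript
// Hypothetical custom error type carrying context about the failed job.
interface SensorReadError extends Error {
  sensorId: string;
}

// Hypothetical sketch, NOT the package's implementation; concurrency
// limiting is omitted to focus on error capture and extraction.
class BackgroundErrorCollector<UncaughtErrorType = Error> {
  private uncaughtErrors: UncaughtErrorType[] = [];

  get amountOfUncaughtErrors(): number {
    return this.uncaughtErrors.length;
  }

  // Fire-and-forget: a rejection is stored instead of reaching the event loop.
  startInBackground(job: () => Promise<void>): void {
    void job().catch((err: unknown) => {
      this.uncaughtErrors.push(err as UncaughtErrorType);
    });
  }

  // Ownership transfer: the caller receives the array; the collector starts afresh.
  extractUncaughtErrors(): UncaughtErrorType[] {
    const errors = this.uncaughtErrors;
    this.uncaughtErrors = [];
    return errors;
  }
}

function makeSensorError(sensorId: string): SensorReadError {
  return Object.assign(new Error(`Failed reading sensor ${sensorId}`), { sensorId });
}

// Hypothetical policy: handle errors only once a threshold has been reached.
const ERROR_HANDLING_THRESHOLD = 2;

function maybeHandleErrors(collector: BackgroundErrorCollector<SensorReadError>): string[] {
  if (collector.amountOfUncaughtErrors < ERROR_HANDLING_THRESHOLD) {
    return [];
  }
  return collector.extractUncaughtErrors().map((err) => err.sensorId);
}
```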
Mitigating backpressure is primarily associated with the startExecution method, particularly in scenarios involving multiple jobs. However, the single-job use case can still inflict backpressure on the Node.js microtask queue.
For instance, consider a situation where 1K concurrently executing route handlers are each awaiting the completion of their own waitForCompletion execution while the semaphore is unavailable. In such cases, all handlers will internally wait on the semaphore's _waitForSufficientWeight private property, competing to acquire the semaphore once it becomes available.
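The effect can be reproduced with a hypothetical SharedWaitGate in which every blocked caller awaits one shared promise and re-competes for the weight once it resolves. This loosely mirrors the description above but is not the package's code; the pendingCallers counter exists only for observation. With a total weight of 1 and 1,000 concurrent callers, 999 of them remain in memory as pending continuations until capacity frees up.

```typescript
// Hypothetical gate, NOT the package's code: every blocked caller awaits the
// same shared promise and re-competes for the weight once it is resolved.
class SharedWaitGate {
  private available: number;
  private waitForSufficientWeight: Promise<void> | undefined;
  private notifyWaiters: (() => void) | undefined;
  // Observation only: how many callers are currently blocked.
  pendingCallers = 0;

  constructor(totalAllowedWeight: number) {
    this.available = totalAllowedWeight;
  }

  async waitForCompletion<T>(job: () => Promise<T>, weight: number): Promise<T> {
    while (this.available < weight) {
      if (this.waitForSufficientWeight === undefined) {
        this.waitForSufficientWeight = new Promise<void>((resolve) => {
          this.notifyWaiters = resolve;
        });
      }
      ++this.pendingCallers; // each blocked caller is a pending continuation in memory
      await this.waitForSufficientWeight;
      --this.pendingCallers;
    }
    this.available -= weight;
    try {
      return await job();
    } finally {
      this.available += weight;
      const notify = this.notifyWaiters;
      this.waitForSufficientWeight = undefined;
      this.notifyWaiters = undefined;
      notify?.(); // wake all blocked callers; they compete for the freed weight
    }
  }
}
```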
The term "promise pool" is commonly used in the JavaScript community to describe promise semaphores.
However, this terminology can be misleading. The term "pool" typically implies the reuse of resources, as in "thread pools" or "connection pools," where a fixed set of resources is used and recycled. In contrast, a promise semaphore’s primary goal is to control concurrency by limiting the number of jobs executing concurrently, with each job represented by a distinct promise instance.
Using the term "promise pool" may cause confusion, as it suggests resource reuse rather than concurrency management.
To improve readability and maintainability, it is highly recommended to assign a use-case-specific name to your semaphore instances. This practice helps in clearly identifying the purpose of each semaphore in the codebase. Examples include trainingSemaphore, dbAccessSemaphore and trafficAnalyzerSemaphore, as used throughout this document.
No vulnerabilities found.