Gathering detailed insights and metrics for batched-items-accumulator
A lightweight utility for Node.js projects that accumulates items into fixed-size batches (by item count), preserving insertion order. It abstracts batch management, allowing users to focus on application logic. Ideal for delayed processing tasks such as bulk write/publish operations to Kafka, databases, blob storage, etc.
```bash
npm install batched-items-accumulator
```
Languages: TypeScript (98.82%), JavaScript (1.18%)
Apache-2.0 License · 2 Commits · 1 Watcher · 1 Branch · 1 Contributor · Updated on Apr 01, 2025

- Latest Version: 1.0.1
- Package Id: batched-items-accumulator@1.0.1
- Unpacked Size: 53.56 kB
- Size: 12.65 kB
- File Count: 11
- NPM Version: 10.9.2
- Node Version: 20.13.1
- Published on: Apr 01, 2025
Cumulative downloads: 1,198

| Period | Downloads | Compared to previous period |
| --- | --- | --- |
| Last Day | 16 | +33.3% |
| Last Week | 176 | -39.7% |
| Last Month | 867 | +161.9% |
| Last Year | 1,198 | 0% |
The `BatchedAccumulator` class is a lightweight utility for Node.js projects that accumulates items into fixed-size batches (by item count), preserving insertion order. It streams items directly into batches at runtime, avoiding the overhead of post-processing a flat 1D array. This abstraction lets users focus on application logic without worrying about batch management.
Ideal for delayed processing tasks such as bulk write operations to databases, blob storage, and batched publishing of Kafka messages. Delayed execution helps optimize network efficiency by reducing the number of requests while increasing their size.
Given a `BatchedAccumulator` instance with a batch size of 4, inserting the items 1, 2, 3, 4, 5, 6, 7 results in the following batches: `[1, 2, 3, 4]` and `[5, 6, 7]`.
- The `extractAccumulatedBatches` method operates in O(1) time and space, as items are stored in batches from the start.
- The `accumulateItem` method appends the item to the most recent batch if it has not yet reached the size threshold; otherwise, it creates a new batch. Each batch contains the same number of items, except for the last batch, which may have fewer.
- The `batchesCount`, `isEmpty`, and `accumulatedItemsCount` getters offer real-time insight into the accumulator's state, helping users make informed decisions, such as determining whether a minimum threshold of accumulated items has been reached before extracting batches.

The `BatchedAccumulator` class provides the following methods:
If needed, refer to the code documentation for a more comprehensive description of each method.
The `BatchedAccumulator` class provides the following getter methods to reflect the current state:
In many applications, MongoDB documents originate from sources such as message queues or user interactions. Instead of upserting each document individually, which could cause excessive network load, it is common to accumulate them in memory before performing a periodic batch flush to the database.
To account for real-life complexities, this example triggers a batch flush when either of the following conditions is met:

- The number of accumulated documents reaches a minimum threshold (`MIN_BATCH_SIZE_BEFORE_FLUSH`).
- A maximum interval (`MAX_FLUSH_INTERVAL_MS`) has elapsed since the last flush.
This example leverages the non-overlapping-recurring-task package to ensure that multiple batches are not upserted concurrently, helping to keep network bandwidth usage under control.
```typescript
import { BatchedAccumulator } from 'batched-items-accumulator';
import {
  NonOverlappingRecurringTask,
  INonOverlappingRecurringTaskOptions
} from 'non-overlapping-recurring-task';
import { Collection, Document, MongoError } from 'mongodb';

const FLUSH_INTERVAL_MS = 5000;
const BATCH_SIZE = 512;
const MIN_BATCH_SIZE_BEFORE_FLUSH = 64;
const MAX_FLUSH_INTERVAL_MS = 60 * 1000;

interface ILogger {
  error(message: string): void;
}

class PeriodicDocumentFlusher<DocumentType extends Document> {
  private readonly _documentsAccumulator = new BatchedAccumulator<DocumentType>(BATCH_SIZE);
  private readonly _recurringFlush: NonOverlappingRecurringTask<MongoError>;

  private _lastFlushTimestamp: number = 0;

  /**
   * Injects a collection and a logger instance.
   * Context-aware child loggers are commonly used,
   * especially in Nest.js apps (e.g., pino-http).
   */
  constructor(
    private readonly _collection: Collection<DocumentType>,
    private readonly _logger: ILogger
  ) {
    const recurringFlushOptions: INonOverlappingRecurringTaskOptions = {
      intervalMs: FLUSH_INTERVAL_MS,
      immediateFirstRun: false
    };
    const forceFlush = false;
    this._recurringFlush = new NonOverlappingRecurringTask<MongoError>(
      () => this._flushAccumulatedBatches(forceFlush),
      recurringFlushOptions,
      this._onUpsertError.bind(this)
    );
  }

  public async start(): Promise<void> {
    // Initialize with the current timestamp to mark the start of the process.
    this._lastFlushTimestamp = Date.now();
    await this._recurringFlush.start();
  }

  public async stop(): Promise<void> {
    await this._recurringFlush.stop();
    const forceFlush = true;
    await this._flushAccumulatedBatches(forceFlush);
  }

  public add(doc: DocumentType): void {
    // Accumulate documents in memory for batch processing.
    this._documentsAccumulator.accumulateItem(doc);
  }

  private async _bulkUpsert(batch: DocumentType[]): Promise<void> {
    // Implementation: Upsert a batch of accumulated documents into MongoDB.
  }

  private get _elapsedTimeSinceLastFlushMs(): number {
    return Date.now() - this._lastFlushTimestamp;
  }

  /**
   * Extracts accumulated document batches and upserts them sequentially.
   * A production-ready implementation may include per-batch error handling
   * or a retry mechanism.
   */
  private async _flushAccumulatedBatches(forceFlush: boolean): Promise<void> {
    const shouldSkip =
      !forceFlush &&
      this._documentsAccumulator.accumulatedItemsCount < MIN_BATCH_SIZE_BEFORE_FLUSH &&
      this._elapsedTimeSinceLastFlushMs < MAX_FLUSH_INTERVAL_MS;
    if (shouldSkip || this._documentsAccumulator.isEmpty) {
      return;
    }

    this._lastFlushTimestamp = Date.now();
    const batches: DocumentType[][] = this._documentsAccumulator.extractAccumulatedBatches();
    for (const batch of batches) {
      await this._bulkUpsert(batch);
    }
  }

  private _onUpsertError(error: MongoError): void {
    this._logger.error(
      `Batch upload failed due to MongoDB error code ${error?.code}: ${error.message}`
    );
  }
}
```
To maintain integrity, the class does not provide direct access to accumulated items or batches. Exposing internal references could allow unintended modifications, such as appending items to a full batch. Instead, the `extractAccumulatedBatches` method transfers ownership of all batches to the caller while resetting the instance to a clean state. This ensures the component's guarantees remain intact and prevents accidental modification of extracted batches.
However, while direct peeking is not possible, users can utilize the getter methods `batchesCount`, `isEmpty`, and `accumulatedItemsCount` to assess whether extraction is needed.
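The getter-driven extraction pattern can be sketched as follows. `AccumulatorLike` is a structural stand-in for the library's `BatchedAccumulator` (only the documented members), and `MIN_ITEMS_BEFORE_EXTRACT` plus the toy factory are illustrative inventions, not part of the package:

```typescript
// Structural stand-in for the documented BatchedAccumulator surface.
interface AccumulatorLike<T> {
  readonly isEmpty: boolean;
  readonly accumulatedItemsCount: number;
  extractAccumulatedBatches(): T[][];
}

// Assumed application-level threshold (not a library constant).
const MIN_ITEMS_BEFORE_EXTRACT = 3;

function extractIfReady<T>(acc: AccumulatorLike<T>): T[][] {
  // Peeking at items is impossible by design; the getters are the only window.
  if (acc.isEmpty || acc.accumulatedItemsCount < MIN_ITEMS_BEFORE_EXTRACT) {
    return []; // Not enough accumulated work yet.
  }
  // Ownership of all batches moves to the caller; the accumulator resets.
  return acc.extractAccumulatedBatches();
}

// A toy in-memory accumulator to demonstrate the pattern end to end.
function makeToyAccumulator<T>(batchSize: number): AccumulatorLike<T> & { add(item: T): void } {
  let batches: T[][] = [];
  return {
    add(item: T) {
      const last = batches[batches.length - 1];
      if (last === undefined || last.length === batchSize) batches.push([item]);
      else last.push(item);
    },
    get isEmpty() {
      return batches.length === 0;
    },
    get accumulatedItemsCount() {
      return batches.reduce((sum, batch) => sum + batch.length, 0);
    },
    extractAccumulatedBatches() {
      const extracted = batches;
      batches = [];
      return extracted;
    }
  };
}

const toy = makeToyAccumulator<number>(2);
toy.add(1);
console.log(extractIfReady(toy)); // [] - still below the threshold
toy.add(2);
toy.add(3);
console.log(extractIfReady(toy)); // [[1, 2], [3]]
console.log(toy.isEmpty); // true - extraction reset the accumulator
```

Gating extraction on the getters keeps flush decisions outside the accumulator, matching the encapsulation described above.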
No vulnerabilities found.