asyncerator
Provides supporting types for AsyncIterable/AsyncIterableIterator, a typed and promisified stream.pipeline implementation, and Array-like utility operators, sources and sinks.
npm install asyncerator
Scores: Supply Chain 77.2 · Quality 100 · Maintenance 79.3 · Vulnerability 100 · License 100
Languages: TypeScript (99.95%), JavaScript (0.05%)
MIT License · 6 Stars · 56 Commits · 10 Watchers · 2 Branches · 16 Contributors · Updated on Feb 09, 2025
Latest Version: 4.0.2
Package Id: asyncerator@4.0.2
Unpacked Size: 106.96 kB
Size: 22.64 kB
File Count: 76
NPM Version: 10.9.0
Node Version: 20.18.0
Published on: Oct 24, 2024
Total Downloads (cumulative): 107,855
Last Day: 143 (+2.9% compared to previous day)
Last Week: 1,171 (-11.3% compared to previous week)
Last Month: 5,178 (-4.8% compared to previous month)
Last Year: 53,006 (+26.1% compared to previous year)
Copyright (c) 2021–2024 Check Digit, LLC
The asyncerator module provides three central capabilities:
- The Asyncerator<T> interface.
- A strongly typed, promisified version of Node's stream.pipeline function.
- Array-like utility operators, sources and sinks that work with pipeline.

Install with:

npm install asyncerator
Asyncerator<T> interface

```ts
export interface Asyncerator<T> {
  [Symbol.asyncIterator](): AsyncIterableIterator<T>;
}
```
The Asyncerator interface defined by this module is the minimum common for-await compatible interface that both NodeJS.ReadableStream and AsyncIterableIterator implement. It's a useful construct to be used with the Node 14+ stream.pipeline function, since it allows AsyncIterableIterators and Node stream-based objects to be combined in various convenient ways.
The following all implement the Asyncerator interface:
- AsyncIterableIterator
- AsyncGenerator (aka async generator functions)
- NodeJS.ReadableStream (internal Node implementations include stream.Readable, readline, fs.createReadStream, etc.)
- the for await...of statement will accept an Asyncerator (see the sketch below)

It's not a custom interface; it's simply the de facto interface used by Node and elsewhere. It's just not defined anywhere.
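For instance, because both async generator results and Node readable streams satisfy the interface, the same for await...of loop consumes either. A minimal sketch (./greetings.txt is a hypothetical file):

```ts
import fs from 'fs';

// an async generator function: its result implements the Asyncerator interface
async function* greetings() {
  yield 'hello';
  yield 'world';
}

// the same loop works for an async generator...
for await (const word of greetings()) {
  console.log(word); // hello, world
}

// ...and for a Node readable stream
for await (const chunk of fs.createReadStream('./greetings.txt')) {
  console.log(chunk.toString());
}
```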
Specifically:
- Asyncerator is similar to AsyncIterableIterator, but does not extend AsyncIterator.
- Asyncerator is similar to AsyncIterable, but [Symbol.asyncIterator]() returns an AsyncIterableIterator instead of an AsyncIterator.
- Every AsyncIterableIterator is also an Asyncerator.

asyncerator.pipeline
asyncerator.pipeline is a strongly typed, promisified version of Node's stream.pipeline function. Its typing is complicated, but the basic form of the function is:
```ts
pipeline(
  source, // string | Readable | Iterable | AsyncIterable | Asyncerator
  ...transforms, // zero or more Transform | ((input: Asyncerator) => Asyncerator)
  sink, // Writable | Transform | ((input: Asyncerator) => Promise<Sink>) | ((input: Asyncerator) => AsyncIterable)
  options?: PipelineOptions // only supported in Node 16 when returning Promise<Sink>, equivalent to the Node 16 implementation
): Readable | Promise<Sink> | Promise<void>;
```
The main advantage of the typings is that, at compile time, the output type of each source or transformation must match the input type of the following transformation or sink. This makes working with complex pipelines much less error-prone.
In addition, the standard Node typings (@types/node) for stream.pipeline are woefully incorrect (as of 14.14.32), to the point of being unusable.
Functionally, the Asyncerator pipeline function differs from the built-in stream.pipeline in that it returns a promise or a stream, depending on the type of the sink (last argument):
- If the sink is a Writable stream, pipeline returns a Promise<void>.
- If the sink is a function returning a Promise<T>, return that promise.
- If the sink is a Transform, return a Readable stream.

Despite these differences, under the hood stream.pipeline is still used to perform the actual work.
```ts
import { filter, map, pipeline, split, toArray } from 'asyncerator';
import fs from 'fs';
import zlib from 'zlib';

// ...

// write a Gzipped file containing "hello" and "world" on two separate lines
await pipeline(
  ['hello', 'world'],
  map((string) => `${string}\n`),
  zlib.createGzip(),
  fs.createWriteStream(temporaryFile),
);

// read file using a pipeline
const result = await pipeline(
  fs.createReadStream(temporaryFile),
  zlib.createUnzip(),
  split('\n'),
  filter((string) => string !== ''),
  toArray,
); // ['hello', 'world']

// read file without pipeline (painful, don't do this!)
const result2 = await toArray(
  filter((string) => string !== '')(split('\n')(fs.createReadStream(temporaryFile).pipe(zlib.createUnzip()))),
); // ['hello', 'world']
```
Sources return an object of type Asyncerator<T> that can be passed to pipeline as the first argument. Other than these built-in functions, the other objects that are considered a "source" are strings, arrays, or basically anything that implements the Readable, Iterable and AsyncIterable interfaces.
Some built-in source functions take an argument of type Asyncable<T>. Asyncables are anything that can be turned into an Asyncerator: normal iterators and iterables, AsyncIterators, AsyncIterables, AsyncGenerators, AsyncIterableIterators, and of course Asyncerators themselves.
all<T>(promises: Iterable<Promise<T>>): Asyncerator<T>
Similar to Promise.all(), but instead returns values as they become available via an Asyncerator. Note: the output order is not the same as the input order; the fastest promise to resolve will be first, the slowest last.
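A minimal sketch (the delayed helper is hypothetical, and the relative timings are illustrative only):

```ts
import { all, toArray } from 'asyncerator';

// hypothetical helper: resolve a value after a delay in milliseconds
const delayed = <T>(value: T, ms: number) =>
  new Promise<T>((resolve) => setTimeout(() => resolve(value), ms));

// the fastest promise is emitted first, regardless of input order
const values = await toArray(all([delayed('slow', 50), delayed('fast', 10)]));
// ['fast', 'slow']
```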
merge<T>(...iterators: Asyncable<T | Asyncable<T>>[]): Asyncerator<T>
Merge multiple asyncables into a single Asyncerator. If an iterator yields another Asyncerator, merge its output into the stream.
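A minimal sketch merging two async generators; the exact interleaving depends on scheduling, so it is not guaranteed:

```ts
import { merge, toArray } from 'asyncerator';

async function* letters() {
  yield 'a';
  yield 'b';
}

async function* numbers() {
  yield '1';
  yield '2';
}

// both iterators are consumed concurrently into a single stream
const merged = await toArray(merge(letters(), numbers()));
// e.g. ['a', '1', 'b', '2'] - the interleaving is not guaranteed
```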
series<T>(...iterators: Asyncable<T>[]): Asyncerator<T>
Combine the output of iterators in a series. Requires all the iterators to complete.
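In contrast to merge, series preserves iterator order; a minimal sketch:

```ts
import { series, toArray } from 'asyncerator';

async function* first() {
  yield 1;
  yield 2;
}

async function* second() {
  yield 3;
}

// first() is drained completely before second() is consumed
const combined = await toArray(series(first(), second()));
// [1, 2, 3]
```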
These built-in convenience sink functions all return a Promise that will wait until the Asyncerator is done before resolving, so be careful using them with endless iterators (in other words, don't do that). They can be used as the last argument to pipeline, which causes it to return a Promise, along with:
- Writable (pipeline returns a Promise<void>)
- (input: Asyncerator<Source>) => Promise<Sink> (pipeline returns a Promise<Sink>)
- Duplex (pipeline returns a Readable stream)
- ((input: Asyncerator<Source>) => AsyncIterable<Sink>) (pipeline returns a Readable stream)
stream)reduce<Input, Output>(reduceFunction: (previousValue: Output, currentValue: Input, currentIndex: number) => Output, initialValue?: Input | Output): Promise<Input | Output | undefined>
Calls the specified callback function for all the elements in a stream. The return value of the callback function is the accumulated result, and is provided as an argument in the next call to the callback function.
Equivalent to the Javascript Array.reduce()
method.
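For example, summing a stream of numbers; a sketch, assuming reduce(...) is curried to produce a sink function usable as the last argument to pipeline, matching how the other sinks are used:

```ts
import { pipeline, reduce } from 'asyncerator';

// sum a stream of numbers; reduce(...) produces the sink passed to pipeline
const sum = await pipeline(
  [1, 2, 3, 4],
  reduce((previous: number, current: number) => previous + current, 0),
); // 10
```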
toArray<T>(iterator: Asyncable<T>): Promise<T[]>
Turns a completed Asyncerator into an Array.

toNull<T>(iterator: Asyncable<T>): Promise<void>
Drop the results of an Asyncerator into /dev/null.

toString<T>(iterator: Asyncable<T>): Promise<string>
Turns a completed Asyncerator into a string.
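Minimal sketches of the three sinks (assuming toString simply concatenates the emitted values):

```ts
import { toArray, toNull, toString } from 'asyncerator';

async function* source() {
  yield 'a';
  yield 'b';
}

const asArray = await toArray(source()); // ['a', 'b']
const asString = await toString(source()); // 'ab' (assuming simple concatenation)
await toNull(source()); // consumes the stream, discarding every value
```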
Operators are transformations that can be included anywhere in a pipeline except as a source. They take a stream of inputs, and generate a stream of outputs. An operator function is equivalent to an implementation of the Node Duplex or Transform interfaces, and can be used interchangeably within a pipeline.

The built-in operators all return something of type Operator<Input, Output>, which is a function with the signature (input: Asyncerator<Input>) => Asyncerator<Output>.
Async generator functions that take a single Asyncerator parameter are compatible with this signature. Operators should generally be implemented using the following pattern:
```ts
function map<Input, Output>(mapFunction: (value: Input) => Output): Operator<Input, Output> {
  return async function* (iterator: Asyncerator<Input>) {
    for await (const item of iterator) {
      yield mapFunction(item);
    }
  };
}
```
It is straightforward to create custom operators and mix them with streams, but it is important to note how they relate to streams:
- Returning is equivalent to the end event of a stream. The pipeline will complete.
- throw-ing an error is equivalent to the error event of a stream. The pipeline will throw the error.
- Take care with undefined values emitted by sources and prior operators.
- Take care with null values emitted by sources and prior operators.

after<Input>(value: Input): Operator<Input, Input>
Emit a value after the stream completes. Useful for adding a footer to a stream.
before<Input>(value: Input): Operator<Input, Input>
Emit a value before a stream starts. Useful for adding a header to a stream.
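A sketch combining both operators to frame a stream with a header and a footer:

```ts
import { after, before, pipeline, toArray } from 'asyncerator';

const framed = await pipeline(
  ['line 1', 'line 2'],
  before('header'), // emitted before the first value
  after('footer'), // emitted once the input completes
  toArray,
); // ['header', 'line 1', 'line 2', 'footer']
```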
filter<Input>(filterFunction: (value: Input, index: number) => boolean): Operator<Input, Input>
Similar to Array.filter, only emit values from input for which filterFunction returns true.
flat<Input>(depth = 1): Operator<Input, Input extends (infer T)[] ? T : Input>
Similar to Array.flat, flatten array inputs into a single sequence of values.
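A minimal sketch with the default depth of 1:

```ts
import { flat, pipeline, toArray } from 'asyncerator';

const flattened = await pipeline(
  [
    [1, 2],
    [3, 4],
  ],
  flat(),
  toArray,
); // [1, 2, 3, 4]
```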
forEach<Input>(forEachFunction: (value: Input, index: number) => void): Operator<Input, Input>
Similar to Array.forEach, call forEachFunction for each value in the stream.
map<Input, Output>(mapFunction: (value: Input, index: number) => Output): Operator<Input, Output>
Similar to Array.map, transform each value using mapFunction.
race<Input, Output>(raceFunction: (value: Input) => Promise<Output>, concurrent?: number): Operator<Input, Output>
Apply stream of values to the raceFunction, emitting output values in order of completion. By default, allows up to 128 concurrent values to be processed.
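A sketch where the raceFunction's completion time varies per value, so the output order differs from the input order; the timings are illustrative only:

```ts
import { pipeline, race, toArray } from 'asyncerator';

// resolve each value after a delay equal to the value itself, in milliseconds
const results = await pipeline(
  [30, 10, 20],
  race((ms: number) => new Promise<number>((resolve) => setTimeout(() => resolve(ms), ms))),
  toArray,
); // [10, 20, 30] - emitted in order of completion, not input order
```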
sequence<Input>(sequenceFunction: (index: number) => Promise<Input>): Operator<Input, Input>
The sequenceFunction will be called repeatedly with an incrementing numerical parameter, returning a Promise that resolves with the same type as Input and is inserted into the stream. The sequence operator passes through all other values. Because the sequenceFunction returns a Promise, it can delay its response (using setTimeout) to emit values on a regular schedule, e.g. once a second:
```ts
pipeline(
  // ...
  sequence(async () => {
    await new Promise((resolve) => {
      setTimeout(resolve, 1000);
    });
    return 'this value is emitted once a second!';
  }),
  // ...
);
```
skip<Input>(numberToSkip = 1): Operator<Input, Input>
Skip numberToSkip values at the start of a stream.
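For example:

```ts
import { pipeline, skip, toArray } from 'asyncerator';

// drop the first two values, e.g. header rows
const rows = await pipeline(['header', 'subheader', 'row 1', 'row 2'], skip(2), toArray);
// ['row 1', 'row 2']
```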
split<Input extends { toString: () => string }>(separator: string, limit?: number): Operator<Input, string>
Equivalent of the JavaScript String.split method.
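A sketch; note that chunk boundaries do not need to line up with separators, and (assuming String.split semantics) a trailing separator yields a final empty string:

```ts
import { pipeline, split, toArray } from 'asyncerator';

const lines = await pipeline(['hello\nwo', 'rld\n'], split('\n'), toArray);
// ['hello', 'world', ''] - which is why the examples above filter out empty strings
```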
Read a CSV file, output a new CSV file based on some logic:
```ts
import { filter, map, pipeline, race, split } from 'asyncerator';
import fs from 'fs';
import timeout from '@checkdigit/timeout';
import retry from '@checkdigit/retry';

async function main() {
  await pipeline(
    fs.createReadStream('./input.csv'),
    // turn buffers into string chunks
    map((buffer: Buffer) => buffer.toString()),
    // split chunks into lines
    split('\n'),
    // remove empty lines, and CSV header line
    filter((string) => string !== '' && string !== '"header1","header2","header3","header4"'),
    // transform string into an object
    map((line: string) => ({
      field1: line.split(',')[0] as string,
      field4: BigInt(line.split(',')[3] as string),
    })),
    // perform concurrent requests (up to 128 by default) - Note: DOES NOT PRESERVE ORDER
    race(
      retry((item) =>
        timeout(
          (async ({ field1, field4 }) => ({
            calculated: await someAsyncNetworkAPIFunction(field1),
            field1,
            field4, // type is inferred to be a BigInt, because Typescript is awesome
          }))(item),
        ),
      ),
    ),
    // demonstrate use of an async generator to filter and transform objects into a string
    // (FYI could be done more easily using map and filter)
    async function* (iterator) {
      for await (const item of iterator) {
        if (doWeOutput(item)) {
          yield `${item.field1},${item.field4},${item.calculated}\n`;
        }
      }
    },
    fs.createWriteStream('./output.csv'),
  );
}
```
Note this code, in addition to the built-in asyncerator functionality, also uses @checkdigit/timeout and @checkdigit/retry to implement retries with timeouts and exponential back-off.
A pipeline is an effective method for managing socket connections. On the server side, the inbound socket can be both a source and a sink. On the client, a connection can be treated as a transform that takes an inbound stream and provides the output stream (from the server) to the next stage of the pipeline:
```ts
import net from 'net';
import { filter, pipeline, map, split, toArray } from 'asyncerator';

// ...

// echo server
const server = net
  .createServer((socket) =>
    pipeline(
      socket,
      split('\n'),
      map((command) => `echo:${command}\n`),
      socket,
    ),
  )
  .listen(1234, '127.0.0.1');

// echo client
const received = await pipeline(
  'Hello Mr Server!\nRegards, Client.\n',
  new net.Socket().connect(1234, '127.0.0.1'),
  split('\n'),
  filter((line) => line !== ''),
  toArray,
); // ['echo:Hello Mr Server!', 'echo:Regards, Client.']
```
MIT
No vulnerabilities found.
OpenSSF Scorecard results:
- security policy file detected
- no dangerous workflow patterns detected
- no binaries found in the repo
- license file detected
- packaging workflow detected
- SAST tool detected but not run on all commits
- 4 existing vulnerabilities detected
- Found 12/30 approved changesets -- score normalized to 4
- dependency not pinned by hash detected -- score normalized to 3
- 0 commit(s) and 0 issue activity found in the last 90 days -- score normalized to 0
- no effort to earn an OpenSSF best practices badge detected
- detected GitHub workflow tokens with excessive permissions
- project is not fuzzed

Last Scanned on 2025-05-05
The Open Source Security Foundation is a cross-industry collaboration to improve the security of open source software (OSS). The Scorecard provides security health metrics for open source projects.