Gathering detailed insights and metrics for async-toolbox
npm install async-toolbox
Languages: TypeScript (99.17%), Shell (0.83%)
Total Downloads: 164,553
Last Day: 200 (+86.9% vs. previous day)
Last Week: 552 (-18.7% vs. previous week)
Last Month: 3,051 (+156.6% vs. previous month)
Last Year: 22,710 (-28.6% vs. previous year)
Stars: 1
Commits: 216
Watchers: 2
Branches: 10
Contributors: 1
Updated on Apr 01, 2022
Latest Version: 0.9.0
Package Id: async-toolbox@0.9.0
Unpacked Size: 382.44 kB
Size: 84.56 kB
File Count: 168
Published on Jul 11, 2023
This package contains a number of utilities that have been useful to me when writing async code in Node.js and the browser. Each tool can be required individually as needed.
import { wait, waitUntil, Semaphore, Limiter, ... } from 'async-toolbox'
OR
import wait from 'async-toolbox/wait'
import waitUntil from 'async-toolbox/waitUntil'
import Semaphore from 'async-toolbox/semaphore'
etc...
function wait(ms: number): Promise<void>
Returns a promise which resolves after a given number of milliseconds, using setTimeout.
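For example, wait can pause between polling attempts (a minimal sketch; pollUntilReady and the resource shape are hypothetical):

import wait from 'async-toolbox/wait'

// Poll a hypothetical resource once per second until it reports ready
async function pollUntilReady(resource: { isReady(): Promise<boolean> }) {
  while (!(await resource.isReady())) {
    await wait(1000)  // resolves after ~1000ms via setTimeout
  }
}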
class Semaphore extends EventEmitter
A Semaphore which queues up tasks to be executed once prior tasks are complete.
Number of concurrent inflight tasks is configurable at initialization.
class Limiter extends Semaphore
A rate-limiting semaphore which uses the limiter package to enforce the rate limiting.
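A usage sketch for both classes follows. The lock() method name and the constructor option shapes are guesses at the API, not taken from the descriptions above:

import { Semaphore, Limiter } from 'async-toolbox'

// Hypothetical helper standing in for real work
const fetchUrl = async (url: string) => ({ url, ok: true })
const urls = ['https://example.com/a', 'https://example.com/b', 'https://example.com/c']

// Allow at most 2 tasks in flight at once.
// The option name and the lock() method are assumptions.
const semaphore = new Semaphore({ maxInflight: 2 })
await Promise.all(urls.map((url) => semaphore.lock(() => fetchUrl(url))))

// Limiter extends Semaphore and additionally enforces a rate limit (option names assumed)
const limiter = new Limiter({ interval: 'second', tokensPerInterval: 5 })
await limiter.lock(() => fetchUrl(urls[0]))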
promisify and callbackify
Accepts an action which either receives a callback or returns a promise, and awaits the appropriate thing.
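A minimal sketch of promisify; the import path and exact call shape are assumptions based on the description above:

import { promisify } from 'async-toolbox'  // import path is an assumption

// A legacy-style action that takes a node-style callback
const legacyAction = (cb: (err: any, result?: string) => void) => cb(null, 'done')

// Per the description above, promisify awaits the action whether it takes a
// callback or returns a promise (exact call shape is an assumption)
const result = await promisify(legacyAction)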
throttle
Wraps an asynchronous function to prevent it from being executed more than
once in any given period. Similar to lodash throttle but for async functions.
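A sketch of how throttle might be used; the import path and argument order are assumptions:

import { throttle } from 'async-toolbox'  // import path is an assumption

// Hypothetical expensive refresh that should run at most once per 10 seconds
const refreshCache = async () => { /* fetch and store data */ }

// The (fn, periodMs) argument order is a guess at the API
const throttledRefresh = throttle(refreshCache, 10_000)

// Repeated calls within the period do not trigger another execution
await throttledRefresh()
await throttledRefresh()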
recurring
Wraps an asynchronous function in a setInterval, where the specified interval
is the time between when the async function completes and the start of its next invocation.
Example:
setRecurring(myFunc, 500):
| -- execution takes 10ms -- | -- 500 ms -- | <-- total of 510 ms to next execution
setInterval(myFunc, 500):
| -- execution takes 10ms -- |
| ------------------------- 500 ms ---- | <-- total of 500ms to next execution
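A sketch of the pattern above. The section heading says recurring while the diagram uses setRecurring; this sketch follows the diagram, and both the export name and the clearing behavior are assumptions:

import { setRecurring } from 'async-toolbox'  // export name and import path are assumptions

// Hypothetical poll that may take a variable amount of time
const pollApi = async () => { /* fetch latest state */ }

// Runs pollApi, waits 500ms after it completes, then runs it again,
// so slow responses never cause overlapping invocations
const handle = setRecurring(pollApi, 500)

// Presumably the handle can be cleared like a normal interval (assumption):
// clearInterval(handle)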
import { Pipeline, StreamProgress } from 'async-toolbox/pipeline'
The Pipeline class composes a list of streams into one Duplex stream. Writing to the Pipeline writes to the first stream in the list, and reading from the Pipeline reads from the last stream in the list.
The StreamProgress class prints pipeline progress information to stderr as the pipeline runs.
import { Pipeline } from 'async-toolbox/pipeline'

const pipeline = new Pipeline([
  new ParsesXmlToJson(),
  JSONStream.parse(),
  new TransformsJsonObjects(),
  JSONStream.stringify(false)
])

process.stdin
  .pipe(pipeline)
  .pipe(process.stdout)
const pipeline = new Pipeline([
  ShellPipe.spawn('yes abcdefgh'),
  ShellPipe.spawn('rev'),
  new SplitLines(),
  new Transform({
    transform(chunk: string, encoding, cb) {
      const str = chunk.toString().slice(1, 4)
      cb(undefined, str)
    },
  }),
  new CombineLines(),
  ShellPipe.spawn('head -n1000 > tmp.txt'),
])
await pipeline.run()
class Extract extends Pipeline {
  constructor() {
    super([
      new DownloadsPaginatedXml(),
      new XmlParser(),
    ])
  }
}

class Transform extends Pipeline {
  constructor() {
    super([
      // parse the lines using JSONStream
      Object.assign(parse(null), { name: 'parse' }),
      new ConvertsXmlishData(),
      new ConvertsHtmlToMarkdown(),
      new ConvertsToContentfulEntries(),
      new AddsLocaleToFields(),
      // stringify the transformed lines separated by newlines
      Object.assign(stringify(false), { name: 'stringify' }),
    ])
  }
}

class Load extends Pipeline {
  constructor() {
    super([
      Object.assign(parse(null), { name: 'parse' }),
      new UploadsEntries(),
      new ProcessesAssets(),
      publish && new PublishesEntries(),
      Object.assign(stringify(false), { name: 'stringify' }),
    ].filter((stream) => !!stream))
  }
}

const extract = new Extract(argv)
const transform = new Transform(argv)
const load = new Load(argv)

const pipeline = new Pipeline([
  extract,
  transform,
  load,
])

await pipeline.run(
  undefined,
  undefined,
  { progress: true }
)
import ... from 'async-toolbox/stream'
Augments the base Readable, Writable, and Duplex streams with new capabilities, and provides a couple of extra utilities:
function toReadable(entries: any[]): Readable
function collect(stream: Readable): Promise<any[]>
Reads all the chunks of a readable stream and collects them in an array.
writeAsync(stream: Writable, chunk: any, encoding?: string): Promise<void>
readAsync(stream: Readable, size?: number): Promise<any>
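A short sketch combining these helpers; the return values are inferred from the signatures above:

import { PassThrough } from 'stream'
import { toReadable, collect, writeAsync, readAsync } from 'async-toolbox/stream'

// Build a Readable from an in-memory array, then gather its chunks back into an array
const source = toReadable(['a', 'b', 'c'])
const chunks = await collect(source)  // expected: ['a', 'b', 'c'] (inferred)

// writeAsync/readAsync wrap a single write/read in a promise (inferred from the signatures)
const pass = new PassThrough({ objectMode: true })
await writeAsync(pass, 'hello')
const next = await readAsync(pass)  // 'hello'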
class ParallelWritable extends Writable
class ParallelTransform extends Transform
batch<T, U>(processor: (batch: T[]) => Promise<U[] | void>): Transform<T, U>
This function wraps a batch processing function in a Transform stream.
When the stream collects maxBatchSize items (default 1000), it invokes the
batch function with the array of collected items. The batch function should
return an array (or void), which is then written to the output of the
Transform stream.
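A sketch of batch in a small pipeline; batch's module path is an assumption, and fetchRecordsByIds stands in for real work:

import { batch, toReadable, collect } from 'async-toolbox/stream'  // batch's module path is an assumption

// Hypothetical bulk lookup standing in for real work
const fetchRecordsByIds = async (ids: string[]) => ids.map((id) => ({ id }))

// Groups incoming ids into batches (up to maxBatchSize at a time) and
// emits the looked-up records downstream
const lookup = batch(async (ids: string[]) => fetchRecordsByIds(ids))

const records = await collect(toReadable(['1', '2', '3']).pipe(lookup))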
class SplitLines and class CombineLines
Converts a string/buffer stream into an Object stream where each chunk is one line, and vice versa.
Really useful when reading from or writing to a ShellPipe.
class ShellPipe
Wraps child_process.spawn in a Duplex stream. Writing to the stream writes bytes to stdin of the child process, while reading from the stream reads from stdout of the child process. Note that certain programs like head will cause an error with code == 'ERR_STREAM_WRITE_AFTER_END' once it closes its stdin. The Pipeline class handles this by destroying all previous streams.

await new Pipeline([
  new DownloadsFromApi(),
  ShellPipe.spawn('jq ".entries[].id"'),
  new SplitLines(),
  new Transform({
    transform(line, encoding, cb) {
      // do something with the line here
      cb()
    }
  })
]).run()
import 'async-toolbox/events'
Augments the EventEmitter class with new capabilities
EventEmitter.onceAsync(event: string | symbol): Promise<any[]>
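A sketch of onceAsync; that the promise resolves with the array of emitted arguments is inferred from the Promise<any[]> return type:

import 'async-toolbox/events'  // augments EventEmitter with onceAsync
import { EventEmitter } from 'events'

const emitter = new EventEmitter()
setTimeout(() => emitter.emit('done', 42), 100)

// Await a single event instead of registering a listener
const args = await emitter.onceAsync('done')  // expected: [42] (inferred)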
import { sequential, parallel } from 'async-toolbox/list'
Creates a chainable monad which facilitates asynchronous transformations of data.
This can be used to replace the following pattern of code:
await Promise.all(myList.map(async (item) => ...))
sequential(list, options?).flatMap(async (item) => ...).flatMap(...)
Creates a monad which executes each async task in sequence, i.e. one at a time.
Can accept a semaphore implementation which is passed along to each step in the chain.
parallel(list, options?).flatMap(async (item) => ...).flatMap(...)
Creates a monad which executes all async tasks in parallel, with optionally limited concurrency.
Can accept a semaphore implementation which is passed along to each step in the chain.
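A sketch of the chain; the helper functions are hypothetical, and both the option shape and the way the final result is obtained are assumptions, not taken from the source:

import { sequential, parallel } from 'async-toolbox/list'

const ids = ['a', 'b', 'c']
const loadItem = async (id: string) => ({ id })                         // hypothetical
const enrich = async (item: { id: string }) => ({ ...item, ok: true }) // hypothetical

// One item at a time, in order; awaiting the chain for the results is an assumption
const inOrder = await sequential(ids)
  .flatMap(async (id) => loadItem(id))
  .flatMap(async (item) => enrich(item))

// All at once, optionally limited by a semaphore passed in the options (option shape assumed)
const inParallel = await parallel(ids)
  .flatMap(async (id) => loadItem(id))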
No vulnerabilities found.

OpenSSF Scorecard findings:
- no binaries found in the repo
- Found 1/25 approved changesets -- score normalized to 0
- 0 commit(s) and 0 issue activity found in the last 90 days -- score normalized to 0
- no effort to earn an OpenSSF best practices badge detected
- project is not fuzzed
- security policy file not detected
- license file not detected
- branch protection not enabled on development/release branches
- SAST tool is not run on all commits -- score normalized to 0
- 16 existing vulnerabilities detected
Last Scanned on 2025-02-03
The Open Source Security Foundation is a cross-industry collaboration to improve the security of open source software (OSS). The Scorecard provides security health metrics for open source projects.