npm install pull-stream
797 stars · 410 commits · 78 forks · 15 watching · 2 branches · 33 contributors
JavaScript (100%) · updated on 30 Oct 2024
Downloads:
- Last day: 13,902 (-2.7% vs previous day)
- Last week: 76,664 (+9.2% vs previous week)
- Last month: 272,085 (+15.8% vs previous month)
- Last year: 2,679,992 (-48.5% vs previous year)
Minimal Pipeable Pull-stream
In classic-streams, streams push data to the next stream in the pipeline. In new-streams, data is pulled out of the source stream into the destination.

pull-stream is a minimal take on streams. Pull streams work great for "object" streams as well as streams of raw text or binary data.
Stat some files:
```js
var pull = require('pull-stream')
var fs = require('fs')

pull(
  pull.values(['file1', 'file2', 'file3']),
  pull.asyncMap(fs.stat),
  pull.collect(function (err, array) {
    console.log(array)
  })
)
```
Note that pull(a, b, c) is basically the same as a.pipe(b).pipe(c).
To grok how pull-streams work, read through the pull-streams workshop.

How do I do X with pull-streams? There is a module for that! Check the pull-stream FAQ and post an issue if you have a question that is not covered.
pull-streams are not directly compatible with node streams, but pull-streams can be converted into node streams with pull-stream-to-stream, and node streams can be converted into pull-streams using stream-to-pull-stream. Correct back pressure is preserved either way.
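For instance, here is a minimal sketch of wrapping a node readable as a pull-stream source, assuming stream-to-pull-stream's source helper and a hypothetical ./package.json path:

```js
var pull = require('pull-stream')
var toPull = require('stream-to-pull-stream')
var fs = require('fs')

// wrap a node readable stream as a pull-stream source,
// then consume it with an ordinary pull-stream sink.
pull(
  toPull.source(fs.createReadStream('./package.json')),
  pull.collect(function (err, chunks) {
    if (err) throw err
    console.log(Buffer.concat(chunks).toString())
  })
)
```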
Instead of a readable stream and a writable stream, there is a readable stream (aka "Source") and a reader stream (aka "Sink"). A Through stream is a Sink that returns a Source.
A Source is a function read(end, cb) that may be called many times, and will (asynchronously) call cb(null, data) once for each call.

To signify an end state, the stream eventually calls cb(err) or cb(true). When signifying an end state, data must be ignored.

The read function must not be called until the previous call has called back, unless it is a call to abort the stream (read(Error || true, cb)).
```js
var n = 5

// random is a source of n random numbers.
function random (end, cb) {
  if(end) return cb(end)
  // only read n times, then stop.
  if(0 > --n) return cb(true)
  cb(null, Math.random())
}
```
A Sink is a function reader(read) that calls a Source (read(null, cb)) until it decides to stop (by calling read(true, cb)), or the readable ends (read calls cb(Error || true)).

All Throughs and Sinks are reader streams.
```js
// logger reads a source and logs it.
function logger (read) {
  read(null, function next(end, data) {
    if(end === true) return
    if(end) throw end

    console.log(data)
    read(null, next)
  })
}
```
Since Sources and Sinks are functions, you can pass them to each other!
```js
logger(random) // "pipe" the streams.
```
But it's easier to read if you use pull-stream's pull method:

```js
var pull = require('pull-stream')

pull(random, logger)
```
When working with pull streams it is common to create functions that return a stream.
This is because streams contain mutable state and so can only be used once.
In the above example, once random has been connected to a sink and has produced 5 random numbers, it will not produce any more if connected to another sink.
Therefore, use a function like this to create a random number generating stream that can be reused:
```js
// create a stream of n random numbers
function createRandomStream (n) {
  return function randomReadable (end, cb) {
    if(end) return cb(end)
    if(0 > --n) return cb(true)
    cb(null, Math.random())
  }
}

pull(createRandomStream(5), logger)
```
A through stream is both a reader (consumes values) and a readable (produces values).
It's a function that takes a read function (a Sink), and returns another read function (a Source).
```js
// double is a through stream that doubles values.
function double (read) {
  return function readable (end, cb) {
    read(end, function (end, data) {
      cb(end, data != null ? data * 2 : null)
    })
  }
}

pull(createRandomStream(5), double, logger)
```
Every pipeline must go from a source to a sink.
Data will not start moving until the whole thing is connected.
```js
pull(source, through, sink)
```
Sometimes it's simplest to describe a stream in terms of other streams. pull can detect what sort of stream it starts with (by counting arguments), and if you pull together through streams, it gives you a new through stream.
```js
var tripleThrough =
  pull(through1(), through2(), through3())
// The three through streams become one.

pull(source(), tripleThrough, sink())
```
pull detects whether it's missing a Source by checking function arity: if the function takes only one argument, it's either a sink or a through; otherwise, it's a source.
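A simplified sketch of that arity check (illustrative only, not pull-stream's actual implementation):

```js
// sources take (end, cb), so fn.length === 2;
// sinks and throughs take (read), so fn.length === 1.
function isSource (stream) {
  return typeof stream === 'function' && stream.length > 1
}

isSource(random) // => true
isSource(logger) // => false (a sink)
isSource(double) // => false (a through)
```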
Duplex streams, which are used to communicate between two things (i.e. over a network), are a little different. In a duplex stream, messages go both ways, so instead of a single function that represents the stream, you need a pair of streams: {source: sourceStream, sink: sinkStream}.

Pipe duplex streams like this:
```js
var a = duplex()
var b = duplex()

pull(a.source, b.sink)
pull(b.source, a.sink)

// which is the same as

b.sink(a.source); a.sink(b.source)

// but the easiest way is to allow pull to handle this

pull(a, b, a)

// "pull from a to b and then back to a"
```
This means two duplex streams communicating actually forms two completely independent pipelines.
As a result, one pipeline might finish or error out before or after the other. A duplex stream is only "finished" once both pipelines are done communicating.
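To make this concrete, here is a hypothetical in-memory duplex whose sink feeds its source, so it echoes back whatever is written into it (echoDuplex is an illustration, not part of pull-stream):

```js
// a hypothetical in-memory duplex: the sink side buffers incoming
// values, and the source side reads them back out.
function echoDuplex () {
  var buffer = [], waiting = null, ended = false
  return {
    source: function (end, cb) {
      if (end) return cb(end)
      if (buffer.length) return cb(null, buffer.shift())
      if (ended) return cb(ended)
      waiting = cb // nothing buffered yet; wait for the sink side
    },
    sink: function (read) {
      read(null, function next (end, data) {
        if (end) {
          ended = end
          if (waiting) waiting(ended)
          return
        }
        if (waiting) {
          var cb = waiting
          waiting = null
          cb(null, data)
        } else buffer.push(data)
        read(null, next)
      })
    }
  }
}

var a = echoDuplex()
pull(pull.values([1, 2, 3]), a.sink) // one independent pipeline into a
pull(a.source, pull.log())           // and a second one back out of a
```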
There is a deeper, platonic abstraction, where a stream is just an array in time, instead of in space. All the various streaming "abstractions" are just crude implementations of this abstract idea:
classic-streams, new-streams, reducers
The objective here is to find a simple realization of the best features of the above.
A stream abstraction should be able to handle both streams of text and streams of objects.
Something like this should work: a.pipe(x.pipe(y).pipe(z)).pipe(b). This makes it possible to write a custom stream simply by combining a few available streams.
If a stream ends in an unexpected way (error), then other streams in the pipeline should be notified. (this is a problem in node streams - when an error occurs, the stream is disconnected, and the user must handle that specially)
Also, the stream should be able to be ended from either end.
Very simple transform streams must be able to transfer back pressure instantly. This is a problem in node streams: pause is only transferred on write, so on a long chain (a.pipe(b).pipe(c)), if c pauses, b will have to write to it to pause, and then a will have to write to b to pause. If b only transforms a's output, then a will have to write to b twice to find out that c is paused.
Reducers has an interesting approach, where synchronous transformations propagate back pressure instantly!
This means you can have two "smart" streams doing io at the ends, and lots of dumb streams in the middle, and back pressure will work perfectly, as if the dumb streams are not there.
This makes laziness work right.
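A small illustration of that idea, reusing the streams defined above (pull.take and pull.log are pull-stream's own helpers):

```js
// double holds no buffer, so when take(3) aborts, the abort reaches
// createRandomStream in a single synchronous call chain, and only
// three values are ever generated.
pull(
  createRandomStream(1000),
  double,
  double,
  pull.take(3),
  pull.log()
)
```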
In pull streams, any part of the stream (source, sink, or through) may terminate the stream. (This is the case with node streams too, but it's not handled well.)

A source may end (cb(true) after read) or error (cb(error) after read). After ending, the source must never call cb(null, data) again.
Sinks do not normally end the stream, but if they decide they do not need any more data they may "abort" the source by calling read(true, cb). An abort (read(true, cb)) may be called before a preceding read call has called back.
Rules for implementing read in a through stream:

1. Sink wants to stop (the sink aborts the through): forward the exact read() call to your source; any future read calls should cb(true).
2. We want to stop (abort from the middle of the stream): abort your source, and then cb(true) to tell the sink we have ended. If the source errored during the abort, end the sink by calling cb(err) instead. (This will be an ordinary end/error for the sink.)
3. Source wants to stop (read(null, cb) -> cb(err||true)): forward that exact callback towards the sink chain; we must respond to any future read calls with cb(err||true).

In none of the above cases is data flowing!

4. If data is flowing (normal operation: read(null, cb) -> cb(null, data)): forward the data downstream (towards the Sink) and do none of the above.

There either is data flowing (case 4) or you have the end/abort cases (1-3), never both. The sketch below puts these rules together.
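Here is a minimal sketch of a through that exercises these rules; this take is written for illustration, and pull-stream ships its own pull.take:

```js
// take(n) passes through n values, then aborts its source (rule 2).
function take (n) {
  return function (read) {
    return function readable (end, cb) {
      // rule 1: the sink aborts us; forward the exact call to the source.
      if (end) return read(end, cb)
      if (n-- <= 0)
        // rule 2: we want to stop; abort the source, then end the sink
        // (or pass the source's abort error along as an ordinary error).
        return read(true, function (err) {
          cb(err || true)
        })
      // rule 4: data flows through; rule 3 needs no extra code, because
      // the source's cb(err||true) is forwarded to the sink unchanged.
      read(null, cb)
    }
  }
}

pull(createRandomStream(100), take(2), logger)
```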
A pull stream source (and thus transform) returns exactly one value per read. This differs from node streams, which can use this.push(value) and an internal buffer to create transforms that write many values from a single read value. Pull streams don't come with their own buffering mechanism, but there are ways to get around this.
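For example, one value can be expanded into many by emitting an array and flattening it, using pull-stream's own pull.map and pull.flatten throughs:

```js
pull(
  pull.values(['a b', 'c d']),
  // each read still yields exactly one value: an array of words...
  pull.map(function (line) { return line.split(' ') }),
  // ...which flatten() then re-emits one element at a time.
  pull.flatten(),
  pull.log() // logs: a, b, c, d
)
```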
If you need only the pull function from this package, you can reduce the size of the imported code (for instance, to reduce a Browserify bundle) by requiring it directly:
```js
var pull = require('pull-stream/pull')

pull(createRandomStream(5), logger)
```
Explore this repo further for more information about sources, throughs, sinks, and the glossary.

License: MIT
No vulnerabilities found.

OpenSSF Scorecard findings (last scanned on 2024-11-18):

- no dangerous workflow patterns detected
- no binaries found in the repo
- 0 existing vulnerabilities detected
- license file detected
- found 11/14 approved changesets (score normalized to 7)
- 0 commits and 0 issue activity found in the last 90 days (score normalized to 0)
- detected GitHub workflow tokens with excessive permissions
- dependency not pinned by hash detected (score normalized to 0)
- no effort to earn an OpenSSF best practices badge detected
- security policy file not detected
- project is not fuzzed
- branch protection not enabled on development/release branches
- SAST tool is not run on all commits (score normalized to 0)

The Open Source Security Foundation is a cross-industry collaboration to improve the security of open source software (OSS). The Scorecard provides security health metrics for open source projects.