EventStream is like functional programming meets IO
```bash
npm install event-stream
```
2,185 stars · 322 commits · 150 forks · 71 watching · 1 branch · 33 contributors · updated 19 Nov 2024 · JavaScript (100%)
Downloads

| Period | Downloads | Change vs previous period |
| --- | --- | --- |
| Last day | 787,715 | -13.2% |
| Last week | 4,773,865 | +3.4% |
| Last month | 19,352,140 | +8.6% |
| Last year | 201,014,597 | +7.1% |
Streams are node's best and most misunderstood idea, and EventStream is a toolkit to make creating and working with streams easy.
Normally, streams are only used for IO, but in event stream we send all kinds of objects down the pipe. If your application's input and output are streams, shouldn't the throughput be a stream too?
The EventStream functions resemble the array functions, because Streams are like Arrays, but laid out in time, rather than in memory.
All the event-stream functions return instances of `Stream`. event-stream creates 0.8 streams, which are compatible with 0.10 streams.
NOTE: I shall use the term "through stream" to refer to a stream that is writable and readable.
NOTE for Gulp users: merge will not work with gulp 4; use merge-stream instead.
```js
//pretty.js

if (!module.parent) {
  var es = require('event-stream')
  var inspect = require('util').inspect

  process.stdin                        //connect streams together with `pipe`
    .pipe(es.split())                  //split stream to break on newlines
    .pipe(es.map(function (data, cb) { //turn this async function into a stream
      cb(null, inspect(JSON.parse(data))) //render it nicely
    }))
    .pipe(process.stdout)              // pipe it to stdout !
}
```
run it ...
```bash
curl -sS registry.npmjs.org/event-stream | node pretty.js
```
es.through re-emits data synchronously and is an easy way to create synchronous through streams. Pass in optional `write` and `end` methods; they will be called in the context of the stream. Use `this.pause()` and `this.resume()` to manage flow, and check `this.paused` to see the current flow state (`write` always returns `!this.paused`). This function is the basis for most of the synchronous streams in event-stream.
```js
es.through(function write (data) {
    this.emit('data', data)
    //this.pause()
  },
  function end () { //optional
    this.emit('end')
  })
```
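As an illustration, here is a minimal sketch (not from the original docs) of a synchronous uppercasing transform built on `es.through`, assuming string chunks:

```js
var es = require('event-stream')

// a synchronous through stream that uppercases each chunk
var toUpper = es.through(function write (data) {
  this.emit('data', String(data).toUpperCase())
})

process.stdin
  .pipe(es.split())    // one chunk per line
  .pipe(toUpper)
  .pipe(es.join('\n')) // put the line breaks back
  .pipe(process.stdout)
```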
es.map creates a through stream from an asynchronous function.
```js
var es = require('event-stream')

es.map(function (data, callback) {
  //transform data
  // ...
  callback(null, data)
})
```
Each map MUST call the callback. It may call back with data, with an error, or with no arguments:

- `callback()` drops this data; this makes map work like `filter`. Note: `callback(null, null)` is not the same, and will emit `null`.
- `callback(null, newData)` turns `data` into `newData`.
- `callback(error)` emits an error for this item.

Note: if a callback is not called, map will think that it is still being processed; every call must be answered or the stream will not know when to end. Also, if the callback is called more than once, every call but the first will be ignored.
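For instance, a minimal sketch (not from the original docs, using `readArray` and `writeArray`, both described below) of `map` acting as a filter over a numeric stream:

```js
var es = require('event-stream')

// keep only even numbers: calling back with no arguments drops the chunk
es.readArray([1, 2, 3, 4])
  .pipe(es.map(function (n, callback) {
    if (n % 2 === 0) callback(null, n) // pass the data through
    else callback()                    // drop this item, like filter
  }))
  .pipe(es.writeArray(function (err, evens) {
    // evens deepEqual [2, 4]
  }))
```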
es.mapSync is the same as map, but the transform function is called synchronously and returns the transformed data instead of calling a callback. Based on es.through.
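A minimal sketch of the synchronous form, mirroring the `map` example above:

```js
var es = require('event-stream')

es.mapSync(function (data) {
  //transform data synchronously and return it
  // ...
  return data
})
```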
es.flatmapSync maps nested elements, synchronously.
```js
var es = require('event-stream')

es.flatmapSync(function (data) {
  //transform data
  // ...
  return data
})
```
es.filterSync filters elements synchronously: only chunks for which the function returns a truthy value are passed through.
```js
var es = require('event-stream')

es.filterSync(function (data) {
  return data > 0
})
```
es.split breaks up a stream and reassembles it so that each line is a chunk. `matcher` may be a `String` or a `RegExp`. Example, read every line in a file ...
```js
fs.createReadStream(file, {flags: 'r'})
  .pipe(es.split())
  .pipe(es.map(function (line, cb) {
    //do something with the line
    cb(null, line)
  }))
```
`split` takes the same arguments as `String#split`, except it defaults to `'\n'` instead of `','`, and the optional `limit` parameter is ignored.
NOTE - Maintaining Line Breaks

If you want to process each line of the stream, transform the data, reassemble, and KEEP the line breaks, the example will look like this:
```js
fs.createReadStream(file, {flags: 'r'})
  .pipe(es.split(/(\r?\n)/)) // the capture group keeps the line breaks
  .pipe(es.map(function (line, cb) {
    //do something with the line
    cb(null, line)
  }))
```
This technique is mentioned in the underlying documentation for the split npm package.
es.join creates a through stream that emits `separator` between each chunk, just like `Array#join`. (For legacy reasons, if you pass a callback instead of a string, `join` is a synonym for `es.wait`.)
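A small sketch (not from the original docs, using `readArray` and `wait`, both described below) that joins chunks with a comma:

```js
var es = require('event-stream')

es.readArray(['a', 'b', 'c'])
  .pipe(es.join(','))                  // emit ',' between chunks
  .pipe(es.wait(function (err, text) {
    // text === 'a,b,c'
  }))
```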
es.concat is an alias for es.merge.
es.merge merges streams into one and returns it. Incoming data is emitted as soon as it arrives; no ordering is applied (for example: `data1 data1 data2 data1 data2`, where `data1` and `data2` are data from two streams). It counts how many streams were passed to it and emits 'end' only when all of them have emitted 'end'.
```js
es.merge(
  process.stdout,
  process.stderr
).pipe(fs.createWriteStream('output.log'));
```
It can also take an Array of streams as input like this:
```js
es.merge([
  fs.createReadStream('input1.txt'),
  fs.createReadStream('input2.txt')
]).pipe(fs.createWriteStream('output.log'));
```
es.replace replaces all occurrences of `from` with `to`. `from` may be a `String` or a `RegExp`. Works just like `string.split(from).join(to)`, but streaming.
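For example, a minimal sketch (file names assumed) that normalizes Windows line endings:

```js
var es = require('event-stream')
var fs = require('fs')

// streaming equivalent of text.split('\r\n').join('\n')
fs.createReadStream('input.txt')
  .pipe(es.replace('\r\n', '\n'))
  .pipe(fs.createWriteStream('output.txt'))
```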
es.parse is a convenience function for parsing JSON chunks. For newline-separated JSON, use it with `es.split`. By default it logs parsing errors via `console.error`; for other behaviour, transforms created by `es.parse({error: true})` will emit error events for exceptions thrown from `JSON.parse`, unmodified.
```js
fs.createReadStream(filename)
  .pipe(es.split()) //defaults to lines.
  .pipe(es.parse())
```
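A short sketch of the `{error: true}` behaviour described above, handling bad lines with a standard 'error' listener:

```js
fs.createReadStream(filename)
  .pipe(es.split())
  .pipe(es.parse({error: true}))
  .on('error', function (err) {
    // err is the exception thrown by JSON.parse, unmodified
  })
```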
es.stringify converts javascript objects into lines of text. The text will have whitespace escaped and a `\n` appended, so it will be compatible with `es.parse`.
```js
objectStream
  .pipe(es.stringify())
  .pipe(fs.createWriteStream(filename))
```
es.readable creates a readable stream (that respects pause) from an async function. While the stream is not paused, the function will be polled with `(count, callback)`, and `this` will be the readable stream.
```js
es.readable(function (count, callback) {
  if (streamHasEnded)
    return this.emit('end')

  //...

  this.emit('data', data) //use this way to emit multiple chunks per call.

  callback() // you MUST always call the callback eventually.
             // the function will not be called again until you do this.
})
```
You can also pass the data and the error to the callback. You may only call the callback once; calling it more than once will have no effect.
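A minimal sketch (not from the original docs) of passing data through the callback itself, emitting three chunks and then ending:

```js
var es = require('event-stream')
var i = 0

es.readable(function (count, callback) {
  if (i >= 3) return this.emit('end')
  callback(null, i++) // pass one chunk via the callback instead of emitting 'data'
})
```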
es.readArray creates a readable stream from an Array, emitting each item as a data event and respecting `pause` and `resume`.
```js
var es = require('event-stream')
  , reader = es.readArray([1, 2, 3])

reader.pipe(...)
```
If you want the stream to behave like a 0.10 stream, you will need to wrap it using the `Readable.wrap()` function. Example:
```js
var s = new stream.Readable({objectMode: true}).wrap(es.readArray([1, 2, 3]));
```
es.writeArray creates a writeable stream from a callback; all `data` events are stored in an array, which is passed to the callback when the stream ends.
```js
var es = require('event-stream')
  , reader = es.readArray([1, 2, 3])
  , writer = es.writeArray(function (err, array) {
      //array deepEqual [1, 2, 3]
    })

reader.pipe(writer)
```
es.pause: a stream that buffers all chunks when paused.
```js
var ps = es.pause()
ps.pause()  //buffer the stream, also do not allow 'end'
ps.resume() //allow chunks through
```
es.duplex takes a writable stream and a readable stream and makes them appear as a single readable-writable stream. It is assumed that the two streams are connected to each other in some way. (This is used by `pipeline` and `child`.)
```js
var cp = require('child_process')
  , grep = cp.exec('grep Stream')

es.duplex(grep.stdin, grep.stdout)
```
es.child creates a through stream from a child process ...
```js
var cp = require('child_process')

es.child(cp.exec('grep Stream')) // a through stream
```
es.wait waits for the stream to emit 'end', then joins the chunks of the stream into a single string or buffer. It takes an optional callback, which will be passed the complete string/buffer when the 'end' event is received. It also emits a single 'data' event.
```js
readStream.pipe(es.wait(function (err, body) {
  // have complete text here.
}))
```
These modules are not included as a part of EventStream but may be useful when working with streams.
stream-reduce is like `Array.prototype.reduce`, but for streams. Given a sync `reduce` function and an initial value, it returns a through stream that emits a single data event with the reduced value once the input stream ends.
```js
var reduce = require("stream-reduce");

process.stdin.pipe(reduce(function (acc, data) {
  return acc + data.length;
}, 0)).on("data", function (length) {
  console.log("stdin size:", length);
});
```
Security advisory (severity score 9.8/10)

Summary: Critical severity vulnerability that affects event-stream and flatmap-stream
Affected Versions: = 3.3.6
Patched Versions: 4.0.0
OpenSSF Scorecard findings (last scanned on 2024-11-25):

- no binaries found in the repo
- license file detected
- 3 existing vulnerabilities detected
- Found 3/25 approved changesets -- score normalized to 1
- project is archived
- no effort to earn an OpenSSF best practices badge detected
- security policy file not detected
- project is not fuzzed
- branch protection not enabled on development/release branches
- SAST tool is not run on all commits -- score normalized to 0
The Open Source Security Foundation is a cross-industry collaboration to improve the security of open source software (OSS). The Scorecard provides security health metrics for open source projects.