Gathering detailed insights and metrics for @dev.smartpricing/alyxstream
Alyxstream is a library that simplifies stream processing in Node.js. We use it in production for real-time log analysis, error detection, and parallel job processing, using Kafka, Redis and NATS as sources.
npm install @dev.smartpricing/alyxstream
JavaScript (94.59%)
TypeScript (5.41%)
Total Downloads
9,889
Last Day
16
Last Week
118
Last Month
1,120
Last Year
9,031
9 Stars
151 Commits
1 Forks
9 Watching
4 Branches
25 Contributors
Latest Version
0.9.0
Package Id
@dev.smartpricing/alyxstream@0.9.0
Unpacked Size
427.36 kB
Size
53.02 kB
File Count
117
NPM Version
8.15.0
Node Version
16.10.0
Published On
13 Nov 2024
Alyxstream is a library that simplifies stream processing in Node.js. We use it in production for real-time log analysis, error detection, and parallel job processing, mainly with Kafka as the source and Cassandra and Redis as sinks. Although it is not perfect and is still under active development, this library can help you solve many processing problems with a clean dataflow syntax.
Out-of-the-box sources/sinks:
Working usage examples are in the usage-examples folder.
Install it:
npm install @dev.smartpricing/alyxstream

import {
  Task,
  MakeStorage,
  StorageKind,
  KafkaClient,
  KafkaSource
} from '@dev.smartpricing/alyxstream'

const kafkaSource = KafkaSource(KafkaClient())

await Task()
.withLocalKVStorage()
.withStorage(MakeStorage(StorageKind.Cassandra, null, 'hotel.errors.count'))
.fromKafka(kafkaSource)
.setLocalKV('kafka-mex', x => x)
.withEventTime(x => x.eventTime)
.keyBy(x => x.partition)
.map(x => x.value)
.filter(x => x.warningLevel == 'error')
.slidingWindowTime(MakeStorage(StorageKind.Redis, null, 'my.window'), 5 * 60 * 1000, 60 * 1000)
.groupBy(x => x.hotelName)
.sumMap()
.toStorage(x => 'result', x => x)
.getLocalKV('kafka-mex')
.kafkaCommit(kafkaSource)
.close()
Alyxstream supports multiple sources by default, for both streaming and batch processing. It is also very easy to build your own custom sources.
Always import Task, plus the Kafka client and source if you need Kafka support.
import {
  Task,
  KafkaClient,
  KafkaSource
} from '@dev.smartpricing/alyxstream'
For every Task, remember to call the close method at the end of the task pipeline. The close method signals the Task that it can start processing the data stream.
Array source, the downstream pipeline is called for every element of the array:
await Task().fromArray([1,2,3]).close()
Object source, the downstream pipeline is called once:
await Task().fromObject([1,2,3]).close()
await Task().fromObject({name: 'alice'}).close()
String source:
await Task().fromString('Alice').close()
From readable stream:
await Task().fromReadableStream('/path/to/file.csv').close()
// With integrated unzip
await Task().fromReadableStream('/path/to/file.csv.gz', true).close()
From Kafka (the Kafka client, source, and sink are explained in detail below):
const kafkaSource = await KafkaSource(KafkaClient({
  clientId: 'clientId'
}), {
  groupId: 'groupId',
  topics: ['mytopic']
})
await Task().fromKafka(kafkaSource).close()
You can also define a Task without a source and then inject the payload. With the inject source, the close function is not needed:
const task = await Task().print('>')

for (let i = 0; i < 10; i += 1) {
  await task.inject(i)
}
To get back the last result of a Task, use the finalize method:
const t = await Task().fromString('Alice').close()

const result = await t.finalize()
Alyxstream supports keyed stream processing. If you don't need it, you can set the key to a default value with the withDefaultKey() operator.
await Task().withDefaultKey()
If instead you need keyed processing (as with windows), set the key with the keyBy operator, usually right after the source operator.
await Task().keyBy(x => x.myKey)
await Task().keyBy(x => 'customKey')
If you want to use event time based processing, you have to specify it, usually after the source operator. If not specified, processing time is used.
await Task().withEventTime(x => new Date(x.originalDate))
You can stop the pipeline execution for a message that does not meet your needs:
await Task().filter(x => x % 2 == 0)
In every pipeline step, you can print the current operator state:
await Task().print('1 >')
The branch operator helps you build DAG flow processing (the syntax will be improved):
await Task()
.fromArray([1,2,3])
.branch([
  async () => { return await Task().print('1 >') },
  async () => { return await Task().print('2 >') }
])
.close()
Array operators help you to transform array data:
map takes an input array and applies the map function to every element:
await Task()
.fromObject([1,2,3])
.map(x => x * 2)
.print() // [2,4,6]
.close()
each calls the downstream pipeline steps for every element of the array (the same as the fromArray source operator):
await Task()
.fromObject([1,2,3])
.each()
.print()
.close()
// 1
// 2
// 3
groupBy takes an array and returns an object:
await Task()
.fromObject([ {name: 'Alice'}, {name: 'Andrea'}, {name: 'Paolo'}, {name: 'Alice'} ])
.groupBy(x => x.name)
.print()
.close()
// {
//   Alice: [{name: 'Alice'}, {name: 'Alice'}],
//   Paolo: [{name: 'Paolo'}],
//   Andrea: [{name: 'Andrea'}]
// }
sumMap takes an object and counts the array elements for every key:
await Task()
.fromObject([ {name: 'Alice'}, {name: 'Andrea'}, {name: 'Paolo'}, {name: 'Alice'} ])
.groupBy(x => x.name)
.sumMap()
.print()
.close()
// {
//   Alice: 2,
//   Paolo: 1,
//   Andrea: 1
// }
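As a rough illustration of what the groupBy and sumMap steps compute, here is a plain JavaScript sketch that does not use Alyxstream at all. The helpers `groupByKey` and `countPerKey` are hypothetical names chosen for this example; they only mirror the outputs shown above and are not the library's implementation.

```javascript
// Hypothetical helpers mirroring the documented outputs; not Alyxstream code.

// Group array elements into an object keyed by keyFn(element).
function groupByKey (items, keyFn) {
  const groups = {}
  for (const item of items) {
    const key = keyFn(item)
    if (!Object.prototype.hasOwnProperty.call(groups, key)) {
      groups[key] = []
    }
    groups[key].push(item)
  }
  return groups
}

// Replace every group (an array) with its number of elements.
function countPerKey (groups) {
  const counts = {}
  for (const [key, items] of Object.entries(groups)) {
    counts[key] = items.length
  }
  return counts
}

const people = [{ name: 'Alice' }, { name: 'Andrea' }, { name: 'Paolo' }, { name: 'Alice' }]
const grouped = groupByKey(people, x => x.name)
const counts = countPerKey(grouped)
console.log(counts) // { Alice: 2, Andrea: 1, Paolo: 1 }
```

Chaining the two helpers gives the same shape of result as groupBy followed by sumMap in a Task pipeline.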
const storage = MakeStorage(StorageKind.Memory, null, 'example')

const t = await Task()
.fromArray(['hello', 'hello', 'alice'])
.aggregate(storage, 'ex', x => x)
.sumMap()
.print('> step result:')
You can of course use any custom JavaScript function to process your data. Wrap your code inside a function (or an async function).
With synchronous functions:
await Task()
.fromArray([1,2,3])
.fn(x => {
  // x will be 1, then 2, then 3
  return x * 2
})
.close()
or asynchronous:
await Task()
.fromArray([1,2,3])
.fn(async x => {
  return await asyncFunctionYouHaveToCall(x)
})
.close()
The x callback variable is the data flowing in the pipeline (your payload).
Alyxstream wraps your payload inside an internal data structure, which is an object:
Message = {
  payload: 'YOUR_PAYLOAD',
  metadata: {}, // where keyBy and eventTime metadata are kept in memory
  globalState: {} // here you can place state that must persist across steps
}
If you need to access the raw message in your custom functions, use the fnRaw variant:
await Task()
.fromArray([1,2,3])
.fnRaw(x => {
  // x = {payload: 1, metadata: {}, globalState: {}}
  x.payload *= 2
  return x
})
.close()
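The difference between fn and fnRaw can be modelled in a few lines of plain JavaScript. This is only a sketch: `wrap`, `applyFn` and `applyFnRaw` are hypothetical helpers, and the way the callback's return value is put back into the message is an assumption based on the examples above, not on the library's source.

```javascript
// Hypothetical model of the message wrapper; not Alyxstream's actual internals.
function wrap (payload) {
  return { payload, metadata: {}, globalState: {} }
}

// fn-style step: the callback sees only the payload; the wrapper is preserved.
function applyFn (message, callback) {
  return { ...message, payload: callback(message.payload) }
}

// fnRaw-style step: the callback sees and returns the whole message.
function applyFnRaw (message, callback) {
  return callback(message)
}

const doubled = applyFn(wrap(1), x => x * 2)
const doubledRaw = applyFnRaw(wrap(1), m => {
  m.payload *= 2
  return m
})
console.log(doubled.payload, doubledRaw.payload) // 2 2
```

In this model both variants produce the same payload; the raw variant simply also has access to metadata and globalState.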
Alyxstream has five kinds of windows: tumblingWindowCount, tumblingWindowTime, slidingWindowCount, slidingWindowTime, and sessionWindowTime.
Window state can be stored in memory or in a Redis instance (or a Redis-compatible database such as KVRocks). Every window has an inactivity time period, so that the window result is emitted even if no new events arrive to trigger it.
Time-based windows use a watermark concept, so when using event-time processing, late records are not allowed (a grace period will be implemented soon).
Windows split the stream based on the key you have defined (keyBy operator). They flush the storage every time a window is ready to be emitted.
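To make the watermark idea more concrete, the sketch below shows one common, simplified interpretation in plain JavaScript: per key, the watermark is the highest event time seen so far, and any record older than it is rejected. This is an assumption for illustration only; the text above does not specify how Alyxstream computes its watermark, and `makeLateRecordFilter` is a hypothetical helper.

```javascript
// Simplified, hypothetical watermark model; not Alyxstream's implementation.
function makeLateRecordFilter () {
  const watermarks = new Map() // key -> highest event time seen so far (ms)
  return function accept (key, eventTime) {
    const watermark = watermarks.has(key) ? watermarks.get(key) : -Infinity
    if (eventTime < watermark) {
      return false // late record: older than the watermark for this key
    }
    watermarks.set(key, eventTime)
    return true
  }
}

const accept = makeLateRecordFilter()
const decisions = [
  accept('hotel-a', 1000),
  accept('hotel-a', 2000),
  accept('hotel-a', 1500), // older than the watermark (2000), rejected
  accept('hotel-b', 500) // different key, independent watermark
]
console.log(decisions) // [ true, true, false, true ]
```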
Base config:
import {
  Task,
  MakeStorage,
  StorageKind
} from '@dev.smartpricing/alyxstream'

const redisConfig = {} // default to localhost:6379
const exampleWindowStorage = MakeStorage(StorageKind.Redis, redisConfig, 'windowStorageId')
tumblingWindowCount, with 100 elements length and 10 seconds max inactivity time:
await Task()
.fromKafka(...)
.tumblingWindowCount(exampleWindowStorage, 100, 10000)
.close()
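For intuition, a count-based tumbling window can be sketched in plain JavaScript as a per-key buffer that is emitted and flushed once it reaches the configured size. The helper `makeTumblingCountWindow` is hypothetical, it keeps state in memory only, and it leaves out the inactivity timeout described above.

```javascript
// Hypothetical in-memory sketch of a keyed tumbling count window.
// The inactivity timeout is intentionally left out.
function makeTumblingCountWindow (size) {
  const buffers = new Map() // key -> elements collected so far
  return function push (key, value) {
    const buffer = buffers.get(key) || []
    buffer.push(value)
    if (buffer.length < size) {
      buffers.set(key, buffer)
      return null // window not complete yet
    }
    buffers.delete(key) // flush the state once the window is emitted
    return buffer
  }
}

const push = makeTumblingCountWindow(3)
const emitted = []
for (const n of [1, 2, 3, 4, 5, 6, 7]) {
  const windowResult = push('my-key', n)
  if (windowResult !== null) emitted.push(windowResult)
}
console.log(emitted) // [ [ 1, 2, 3 ], [ 4, 5, 6 ] ]
```

The element 7 stays buffered; in a real pipeline the inactivity period would eventually emit such an incomplete window.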
tumblingWindowTime, 1 minute length and 10 seconds max inactivity time:
await Task()
.fromKafka(...)
.tumblingWindowTime(exampleWindowStorage, 60000, 10000)
.close()
slidingWindowCount, with 100 elements length, 25 elements slide, and 10 seconds max inactivity time:
await Task()
.fromKafka(...)
.slidingWindowCount(exampleWindowStorage, 100, 25, 10000)
.close()
slidingWindowTime, 1 minute length, 5 seconds slide, and 10 seconds max inactivity time:
await Task()
.fromKafka(...)
.slidingWindowTime(exampleWindowStorage, 60000, 5000, 10000)
.close()
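With a sliding time window, a single event belongs to several overlapping windows. The plain JavaScript sketch below computes the start times of every window that contains a given timestamp, assuming windows of the form [start, start + length) whose starts are aligned to multiples of the slide. That alignment is an assumption made for illustration; `windowStartsFor` is a hypothetical helper, not an Alyxstream function.

```javascript
// Hypothetical helper: start times of all sliding windows containing `timestamp`.
// Assumes windows [start, start + length) with starts at multiples of `slide`.
function windowStartsFor (timestamp, length, slide) {
  const starts = []
  const lastStart = timestamp - (timestamp % slide)
  for (let start = lastStart; start > timestamp - length; start -= slide) {
    starts.push(start)
  }
  return starts.reverse() // ascending order
}

console.log(windowStartsFor(12, 10, 5)) // [ 5, 10 ]

// With 1 minute length and 5 seconds slide, each event falls into 12 windows.
console.log(windowStartsFor(62000, 60000, 5000).length) // 12
```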
sessionWindowTime, max 15 seconds inactivity:
await Task()
.fromKafka(...)
.sessionWindowTime(exampleWindowStorage, 15000)
.close()
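A session window groups events that arrive close together and closes when the stream stays silent for longer than the inactivity limit. The following plain JavaScript sketch splits a sorted list of timestamps into sessions; `sessionize` is a hypothetical helper, and treating a gap strictly greater than the limit as the session boundary is an assumption.

```javascript
// Hypothetical helper: split sorted timestamps (ms) into sessions.
// A new session starts when the gap to the previous event exceeds maxGap.
function sessionize (timestamps, maxGap) {
  const sessions = []
  let current = []
  for (const ts of timestamps) {
    if (current.length > 0 && ts - current[current.length - 1] > maxGap) {
      sessions.push(current)
      current = []
    }
    current.push(ts)
  }
  if (current.length > 0) sessions.push(current)
  return sessions
}

const sessions = sessionize([0, 5000, 12000, 40000, 45000], 15000)
console.log(sessions) // [ [ 0, 5000, 12000 ], [ 40000, 45000 ] ]
```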
KafkaClient:
import {
  KafkaClient
} from '@dev.smartpricing/alyxstream'

const kafkaClient = KafkaClient({
  clientId: 'my-client',
  brokers: ['localhost:9092'],
  ssl: false,
  sasl: { // for Confluent access
    mechanism: 'plain',
    username: 'username',
    password: 'password'
  }
})
KafkaSource:
import {
  KafkaSource
} from '@dev.smartpricing/alyxstream'

const topic = {
  topic: 'my-topic',
  fromBeginning: false,
  autoCommit: false,
  autoHeartbeat: 5000
}

const kafkaSource = KafkaSource(kafkaClient, {
  groupId: 'my-group-id',
  topics: [topic]
})
The Redis queue uses a Redis list with the BRPOP command to distribute jobs between workers:
import { Task, MakeStorage, StorageKind } from '@dev.smartpricing/alyxstream'

const queueStorage = MakeStorage(StorageKind.Redis, null, 'my-queue')

async function producer () {
  const t = await Task()
  .fromReadableStream('data.csv.gz', true)
  .readline()
  .tumblingWindowCount(MakeStorage(StorageKind.Memory, null, 'my-queue-win'), 10)
  .enqueue(queueStorage)
  .fn(async (x) => {
    // Backpressure: wait while the queue holds more than 100 items
    while (true) {
      const queueSize = await queueStorage.queueSize()
      if (queueSize > 100) {
        await new Promise(resolve => setTimeout(resolve, 1000))
      } else {
        break
      }
    }
  })
  .close()
}

async function consumer () {
  const t = await Task()
  .dequeue(queueStorage)
  .fn(async x => {
    console.log(x)
    return x
  })
  .close()
}

async function run () {
  if (process.env.RUN === 'producer') {
    await producer()
  } else {
    await consumer()
  }
}

run()
Alyxstream supports three kinds of storage out of the box: memory, Redis, and Cassandra. Memory and Redis storage are suitable for window state storage.
import { MakeStorage, StorageKind } from '@dev.smartpricing/alyxstream'

const memStorage = MakeStorage(StorageKind.Memory, null, 'storage-1')

const redisStorage = MakeStorage(StorageKind.Redis, null, 'storage-1')

const cassandraStorage = MakeStorage(StorageKind.Cassandra, null, 'storage-1')
Accessing the raw client:
const redisStorage = MakeStorage(StorageKind.Redis, null, 'storage-1')
const redisStorageClient = redisStorage.db() // ioredis

const cassandraStorage = MakeStorage(StorageKind.Cassandra, null, 'storage-1')
const cassandraStorageClient = cassandraStorage.db() // cassandra-driver
import { Task } from '@dev.smartpricing/alyxstream'

await Task()
.withLocalKVStorage()
.setLocalKV('my-var', x => x * 2)
.getLocalKV('my-var')
.mergeLocalKV('my-var')
You can create custom functions and call them from a task:
import { Task, ExtendTask, ExtendTaskRaw } from '@dev.smartpricing/alyxstream'

export const multiplyBy = ExtendTask('multiplyBy', async function (x, multiplier) {
  return x * multiplier
})

export const multiplyByRaw = ExtendTaskRaw('multiplyByRaw', async function (x, multiplier) {
  return x.payload * multiplier
})

await Task()
.multiplyBy(2)
.multiplyByRaw(2)
.inject(3) // output 12
Alyxstream contains a wrapper around itself that simplifies a special use case: Kafka communication between microservices. It is called Exchange and allows bidirectional communication between two (or more) services.
import {
  Exchange,
  KafkaClient
} from '@dev.smartpricing/alyxstream'

const client = KafkaClient({
  clientId: 'my-client-1',
  brokers: ['localhost:9092'],
})

const mex = {
  kind: 'TestObject',
  metadata: {
    key: '1'
  },
  spec: {
    value: 1
  }
}

/** Exchange(KafkaClient, topicName, groupId, source options as KafkaJS options) */
const ex1 = await Exchange(client, 'alyxstream-exchange-01', 'ae-01', {autoCommit: false})
const ex2 = await Exchange(client, 'alyxstream-exchange-02', 'ae-02', {autoCommit: false})

await ex1.emit(mex)
await ex1.on(async (message) => {
  console.log('sub', message)
  message.spec.value += 1
  await ex2.emit(message)
})
To override the default key parser and message validator:
const ex1 = await Exchange(client, 'alyxstream-exchange-01', 'ae-01', {autoCommit: false})
ex1.setKeyParser(x => x.metadata.myKey)
ex1.setValidationFunction(x => {
  if (x.spec.myValue == undefined) {
    return false
  }
  return true
})
You can process a stream of data using multiple Node.js processes:
import { Task } from '@dev.smartpricing/alyxstream'

await Task()
.parallel(3)
.dequeue(STORAGE)
.close()
Producer:
import { Task, NatsClient } from '@dev.smartpricing/alyxstream';

const nc = await NatsClient()

const t = await Task()
.toNats(nc, 'sp-test.a.a')

for (let i = 0; i < 100; i += 1) {
  await t.inject({key: i})
}
Consumer:
import { Task, NatsClient, NatsJetstreamSource } from '@dev.smartpricing/alyxstream';

const nc = await NatsClient()
const source = await NatsJetstreamSource(nc, [{
  stream: 'sp-test',
  durable_name: 'worker-4',
  ack_policy: 'Explicit',
  filter_subjects: ['sp-test.a.a', 'sp-test.a.b']
}])

await Task()
.fromNats(source)
.print('>')
.close()
Using the Smartlocks library, we can acquire global locks and use atomic counters.
Lock/Release:
import { Task } from '@dev.smartpricing/alyxstream'
import { Mutex, StorageKind as MSKind } from 'smartlocks'
const lockStorage = Mutex(MSKind.Cassandra, null)

await Task()
.parallel(5)
.fromArray([{i: 1}, {i: 2}, {i: 3}])
.fn(x => {
  x.i = x.i * 2
  return x
})
.lock(lockStorage, x => 'my-lock')
.fn(x => {
  console.log(x)
})
.release(lockStorage, x => 'my-lock')
.close()
Pulsar functions are not maintained.
Pulsar producer:
import { Task, PulsarClient, PulsarSink } from '@dev.smartpricing/alyxstream';

(async () => {
  const client = PulsarClient({
    serviceUrl: 'pulsar://localhost:6650'
  })

  const pulsarSink = await PulsarSink(client, {
    topic: 'non-persistent://public/default/my-topic-1',
    batchingEnabled: true,
    batchingMaxPublishDelayMs: 10
  })

  const t = await Task()
  .toPulsar(pulsarSink, x => x.val, x => x)
  .flushPulsar(pulsarSink)

  for (let i = 0; i < 1000; i += 1) {
    await t.inject({val: i})
  }
})()
Pulsar consumer:
import { Task, PulsarClient, PulsarSource } from '@dev.smartpricing/alyxstream';

(async () => {
  const client = PulsarClient({
    serviceUrl: 'pulsar://localhost:6650'
  })

  const pulsarSource = await PulsarSource(client, {
    topic: 'non-persistent://public/default/my-topic-1',
    subscription: 'sub1',
    subscriptionType: "Shared", //'Failover'
    parseWith: (x) => x
  })

  await Task()
  .withLocalKVStorage()
  .fromPulsar(pulsarSource)
  .setLocalKV('local-mex', x => x)
  .parsePulsar()
  .print('>>>')
  .getLocalKV('local-mex')
  .ackPulsar(pulsarSource)
  .close()
})()
No vulnerabilities found.