Gathering detailed insights and metrics for @rewaa/event-broker
A broker for all the events that Rewaa consumes or produces
npm install @rewaa/event-broker
Updated on 25 Nov 2024
A broker for producing and consuming messages across multiple micro-services.
This package abstracts the functionality of an event broker and keeps the underlying client hidden.
For now, the underlying client is SNS + SQS, so this package requires AWS and is limited by the SNS and SQS quotas.
The broker currently supports two exchange types:

- Queue: a 1-to-1 mapping between producer and consumer, i.e. only one consumer can consume the messages on this type of topic.
- Fanout: a 1-to-many mapping between producer and consumer, so messages on this type of topic can be consumed by multiple consumers. Message filtering is also supported for fanout topics.
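
The difference between the two exchange types can be illustrated with a small in-memory model. This is only a sketch of the delivery semantics described above: `QueueTopic`, `FanoutTopic` and the filter callback are hypothetical names, not part of the package's API, and no AWS service is involved.

```typescript
// In-memory model only: hypothetical names, no AWS calls, not the package's API.
type Handler<T> = (message: T) => void;
type Filter<T> = (message: T) => boolean;

// Queue exchange: exactly one consumer receives the messages.
class QueueTopic<T> {
  private consumer: Handler<T> | undefined;

  subscribe(handler: Handler<T>): void {
    if (this.consumer) {
      throw new Error("A queue topic accepts only one consumer");
    }
    this.consumer = handler;
  }

  publish(message: T): void {
    if (this.consumer) {
      this.consumer(message);
    }
  }
}

// Fanout exchange: every consumer group receives its own copy of each message,
// optionally narrowed by a per-group filter.
class FanoutTopic<T> {
  private groups = new Map<string, { handler: Handler<T>; filter: Filter<T> | undefined }>();

  subscribe(group: string, handler: Handler<T>, filter?: Filter<T>): void {
    this.groups.set(group, { handler, filter });
  }

  publish(message: T): void {
    this.groups.forEach(({ handler, filter }) => {
      if (!filter || filter(message)) {
        handler(message);
      }
    });
  }
}
```

In this model a second `subscribe` on a `QueueTopic` is rejected, while a `FanoutTopic` delivers each published message to every group whose filter accepts it.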
Topics can be mapped to Lambda functions for consumption. The broker accepts Lambda functions along with a batch size, in which case it creates the mapping itself. However, it is recommended to define the mapping in the serverless file instead. For this purpose, the broker also exposes helper functions that return the internally generated ARNs of both the topics and the queues.
A serverless plugin can be used in this case.
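
For orientation, the sketch below shows the standard layout of SNS topic and SQS queue ARNs in the default `aws` partition. The function names are hypothetical and are not the broker's helpers, and the resource names the broker generates internally are not documented here, so `topicName` and `queueName` are placeholders.

```typescript
// Hypothetical helpers, not the broker's API. They only show the standard ARN
// layout; the broker's internal naming scheme is an unknown here.
interface AwsTarget {
  region: string;
  accountId: string;
}

// SNS topic ARN: arn:aws:sns:<region>:<account-id>:<topic-name>
function snsTopicArn(target: AwsTarget, topicName: string): string {
  return `arn:aws:sns:${target.region}:${target.accountId}:${topicName}`;
}

// SQS queue ARN: arn:aws:sqs:<region>:<account-id>:<queue-name>
function sqsQueueArn(target: AwsTarget, queueName: string): string {
  return `arn:aws:sqs:${target.region}:${target.accountId}:${queueName}`;
}
```

An SQS queue ARN of this shape is what an event source mapping in a serverless file refers to.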
The broker also exposes a method, `processMessage`, which takes any consumed message and executes it according to the mapping it holds. This is useful when a single Lambda serves all types of topics: since the broker knows the mapping of events to functions, it handles the execution.
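
The idea of a single entry point that routes messages by topic can be sketched as follows. This is a minimal illustration, not the package's implementation: the `Dispatcher` class, the `ConsumedMessage` shape and its `topic` field are assumptions made for the example.

```typescript
// Hypothetical sketch of single-entry dispatch; not the package's processMessage.
type TopicHandler = (payload: unknown) => Promise<void>;

interface ConsumedMessage {
  topic: string; // assumed field carrying the topic name
  payload: unknown;
}

class Dispatcher {
  private handlers = new Map<string, TopicHandler>();

  // Remember which function consumes which topic.
  on(topic: string, handler: TopicHandler): void {
    this.handlers.set(topic, handler);
  }

  // Look up the handler registered for the message's topic and run it.
  async processMessage(message: ConsumedMessage): Promise<void> {
    const handler = this.handlers.get(message.topic);
    if (!handler) {
      throw new Error(`No consumer registered for topic "${message.topic}"`);
    }
    await handler(message.payload);
  }
}
```

A single Lambda handler could then pass every incoming record to `processMessage` without knowing which topic it belongs to.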
The broker can be run against an offline platform such as localstack. As long as Lambda, SQS and SNS are on the same network, the broker will work. For offline support, the broker accepts options for SQS, SNS or Lambda (such as the endpoint) and a flag `isLocal` which, when true, makes it use the provided endpoints.
For serverless, serverless-offline-sqs can be used to redirect SQS to localstack so that the event source mapping is handled by serverless itself.
```typescript
import {
  Emitter,
  IEmitterOptions,
} from "@rewaa/event-broker";
import { EventEmitter } from 'events';

const env = `local`;
const region = `us-east-1`;
const emitterOptions: IEmitterOptions = {
  environment: env,
  localEmitter: new EventEmitter(),
  useExternalBroker: `true`,
  awsConfig: {
    accountId: `000000000000`,
    region
  },
  log: true,
};

// When running locally, point the Lambda, SQS and SNS clients at local endpoints.
if (env === 'local') {
  emitterOptions.isLocal = true;
  emitterOptions.lambdaConfig = {
    endpoint: `http://localhost:4000`,
    region
  }
  emitterOptions.sqsConfig = {
    endpoint: `http://localhost:4566`,
    region
  }
  emitterOptions.snsConfig = {
    endpoint: `http://localhost:4566`,
    region
  }
}

const emitter = new Emitter(emitterOptions);
```
```typescript
import { ExchangeType } from '@rewaa/event-broker';

interface Notification {
  name: string;
  payload: any;
}

// Attach a consumer to the "Notification" queue topic.
emitter.on<Notification>("Notification",
  async (...data) => {
    const input = data[0]; // typeof === Notification
    // Do something with the Notification
  }, {
    isFifo: false,
    exchangeType: ExchangeType.Queue,
    deadLetterQueueEnabled: true,
  }
);
```
```typescript
import { ExchangeType } from '@rewaa/event-broker';

const notification = {
  name: `Some notification`,
  payload: {
    text: `Hello`
  }
}

// Emit the notification on the "Notification" queue topic.
await emitter.emit("Notification", {
  exchangeType: ExchangeType.Queue,
  isFifo: false
}, notification);
```
For fanout, the exchange type of the message must be `ExchangeType.Fanout`.
Emitter:
```typescript
import { ExchangeType } from '@rewaa/event-broker';

const notification = {
  name: `Some notification`,
  payload: {
    text: `Hello`
  }
}

// Emit the notification on the "Notification" fanout topic.
await emitter.emit("Notification", {
  exchangeType: ExchangeType.Fanout,
  isFifo: false
}, notification);
```
Consumer 1:
```typescript
import { ExchangeType } from '@rewaa/event-broker';

// First consumer group of the fanout topic.
emitter.on<Notification>("Notification",
  async (...data) => {
    const input = data[0]; // typeof === Notification
    // Do something with the Notification
  }, {
    isFifo: false,
    exchangeType: ExchangeType.Fanout,
    deadLetterQueueEnabled: true,
    separateConsumerGroup: "consumer_group_1"
  }
);
```
Consumer 2:
```typescript
import { ExchangeType } from '@rewaa/event-broker';

// Second consumer group of the same fanout topic.
emitter.on<Notification>("Notification",
  async (...data) => {
    const input = data[0]; // typeof === Notification
    // Do something with the Notification
  }, {
    isFifo: false,
    exchangeType: ExchangeType.Fanout,
    deadLetterQueueEnabled: true,
    separateConsumerGroup: "consumer_group_2"
  }
);
```
The deployment of resources created by the broker is a separate process, extracted into the method `bootstrap`.
This method takes an optional array of topics, which is useful in the serverless case where consumers may not have been attached by calling the `on` method.
If you have called the `on` method on the emitter object for all the topics before calling `bootstrap`, the topics array is not required.
`bootstrap` only needs to be called once, during deployment. The APIs used internally are idempotent, so providing the same topics won't create duplicate resources.
The following table shows which topic properties are updated automatically during bootstrapping when they change:
Property | Updated |
---|---|
visibilityTimeout | Yes |
batchSize | Yes |
maxRetryCount | Yes |
deadLetterQueueEnabled | Yes, only attaches/detaches the DLQ. Doesn't delete it |
separateConsumerGroup | Yes, creates a new queue. Old one is not deleted |
enableHighThroughput | Yes |
retentionPeriod | Yes |
contentBasedDeduplication | Yes |
tags | Yes |
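
The create-if-missing, update-in-place behaviour described above can be modelled in memory as follows. This is a sketch under stated assumptions: `ResourceRegistry`, `TopicSpec` and `bootstrapTopics` are hypothetical names, only `visibilityTimeout` is modelled, and nothing here calls AWS or the package itself.

```typescript
// Hypothetical in-memory stand-in for the provisioning step; no AWS calls.
interface TopicSpec {
  name: string;
  visibilityTimeout: number; // seconds
}

class ResourceRegistry {
  private queues = new Map<string, TopicSpec>();
  createdCount = 0;

  // Create the queue if it is missing, otherwise update its properties in place.
  ensureQueue(spec: TopicSpec): void {
    if (!this.queues.has(spec.name)) {
      this.createdCount += 1;
    }
    this.queues.set(spec.name, { ...spec });
  }

  get(name: string): TopicSpec | undefined {
    return this.queues.get(name);
  }
}

// Running this repeatedly with the same topics creates nothing new.
function bootstrapTopics(registry: ResourceRegistry, topics: TopicSpec[]): void {
  topics.forEach((topic) => registry.ensureQueue(topic));
}
```

Calling `bootstrapTopics` twice with the same topic leaves `createdCount` at 1, and calling it again with a changed `visibilityTimeout` updates the stored value without creating a second queue.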
The following things are part of the roadmap for the broker:
- [...] `emit` or attaching a consumer using `on`. The registration part can be part of the deployment phase.