The KubeMQ SDK for Node.js enables Node.js and TypeScript developers to communicate with the KubeMQ server using its messaging patterns: Events, Events Store, Commands, Queries, and Queues.
Prerequisites
- Node.js (version 14.0.0 or later, per the package metadata)
- TypeScript compiler
- KubeMQ server running locally or accessible over the network
Installation
The recommended way to use the SDK in your project is to install it from npm:
npm install kubemq-js
Payload Details
- Metadata: Additional information passed with the event. It can take any form that can be represented as a string, such as a struct, JSON, or XML.
- Body: The actual content of the event. It can take any form that can be serialized into a byte array, such as a string, struct, JSON, XML, collection, or binary file.
- ClientID: Displayed in logs, tracing, and the KubeMQ dashboard. When using Events Store, it must be unique.
- Tags: A set of key-value pairs that help categorize the message.
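The four payload fields above can be sketched as plain TypeScript values. This is a minimal illustration only: `SketchPayload` and `buildPayload` are hypothetical names defined here, not SDK types, and the JSON encoding of the body is just one possible choice.

```typescript
// Local stand-in for the payload fields described above; not imported from kubemq-js.
interface SketchPayload {
  clientId: string;
  metadata: string;          // any string-representable form
  body: Uint8Array;          // any byte-serializable form
  tags: Map<string, string>; // key-value pairs for categorization
}

// Hypothetical helper: serializes an object into the body and records the format in metadata.
function buildPayload(
  clientId: string,
  data: object,
  tags: Record<string, string>,
): SketchPayload {
  return {
    clientId,
    metadata: JSON.stringify({ contentType: 'application/json' }),
    body: new TextEncoder().encode(JSON.stringify(data)),
    tags: new Map(Object.entries(tags)),
  };
}

const samplePayload = buildPayload('orders-service-1', { orderId: 42 }, { region: 'eu' });

// The receiver reverses the encoding to recover the original object.
const sampleDecoded = JSON.parse(new TextDecoder().decode(samplePayload.body));
```

Keeping the body as bytes and describing its format in the metadata lets the receiver decide how to decode it.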
KubeMQ PubSub Client
To perform PubSub operations, create an instance of PubsubClient. At minimum it needs two parameters: address (the KubeMQ server address) and clientId. With only these two parameters, a plaintext connection is established. The table below describes the parameters available for establishing a connection.
PubSub Client Configuration
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
address | String | The address of the KubeMQ server. | None | Yes |
clientId | String | The client ID used for authentication. | None | Yes |
authToken | String | The authorization token for secure communication. | None | No |
tls | boolean | Indicates if TLS (Transport Layer Security) is enabled. | None | No |
tlsCertFile | String | The path to the TLS certificate file. | None | No (Yes if tls is true) |
tlsKeyFile | String | The path to the TLS key file. | None | No (Yes if tls is true) |
tlsCaCertFile | String | The path to the TLS CA cert file. | None | No (Yes if tls is true) |
maxReceiveSize | int | The maximum size of the messages to receive (in bytes). | 104857600 (100MB) | No |
reconnectIntervalSeconds | int | The interval in seconds between reconnection attempts. | 1 | No |
PubSub client connection example:

    const opts: Config = {
      address: 'localhost:50000',
      clientId: Utils.uuid(),
      reconnectIntervalSeconds: 1,
    };

    const pubsubClient = new PubsubClient(opts);

The example below shows how to construct a PubsubClient with TLS and other configuration options:

    const config: Config = {
      address: 'localhost:50000',               // KubeMQ gRPC endpoint address
      clientId: 'your-client-id',               // Connection clientId
      authToken: 'your-jwt-auth-token',         // Optional JWT authorization token
      tls: true,                                // Indicates if TLS is enabled
      tlsCertFile: 'path/to/tls-cert.pem',      // Path to the TLS certificate file
      tlsKeyFile: 'path/to/tls-key.pem',        // Path to the TLS key file
      tlsCaCertFile: 'path/to/tls-ca-cert.pem', // Path to the TLS CA cert file
      maxReceiveSize: 1024 * 1024 * 100,        // Maximum size of the messages to receive (100MB)
      reconnectIntervalSeconds: 1,              // Interval in seconds between reconnect attempts
    };

    const pubsubClient = new PubsubClient(config);

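The Mandatory and Default Value columns of the configuration table imply a few rules: address and clientId are always required, the three TLS file paths are required once tls is true, and two numeric settings have defaults. A minimal sketch of checking those rules before constructing a client follows. `ClientConfigSketch`, `validateConfig`, and `withDefaults` are hypothetical names defined here for illustration; the SDK's own `Config` type and any validation it performs may differ.

```typescript
// Local mirror of the configuration table; not the SDK's Config type.
interface ClientConfigSketch {
  address: string;
  clientId: string;
  authToken?: string;
  tls?: boolean;
  tlsCertFile?: string;
  tlsKeyFile?: string;
  tlsCaCertFile?: string;
  maxReceiveSize?: number;
  reconnectIntervalSeconds?: number;
}

// Hypothetical helper: returns a list of problems, empty when the config satisfies the table.
function validateConfig(cfg: ClientConfigSketch): string[] {
  const problems: string[] = [];
  if (!cfg.address) problems.push('address is required');
  if (!cfg.clientId) problems.push('clientId is required');
  if (cfg.tls) {
    if (!cfg.tlsCertFile) problems.push('tlsCertFile is required when tls is true');
    if (!cfg.tlsKeyFile) problems.push('tlsKeyFile is required when tls is true');
    if (!cfg.tlsCaCertFile) problems.push('tlsCaCertFile is required when tls is true');
  }
  return problems;
}

// Hypothetical helper: fills in the documented defaults (100MB receive size, 1 second reconnect).
function withDefaults(cfg: ClientConfigSketch): ClientConfigSketch {
  return {
    ...cfg,
    maxReceiveSize: cfg.maxReceiveSize ?? 1024 * 1024 * 100,
    reconnectIntervalSeconds: cfg.reconnectIntervalSeconds ?? 1,
  };
}

const sampleProblems = validateConfig({ address: 'localhost:50000', clientId: 'demo', tls: true });
console.log(sampleProblems);
```

Returning a list of problems rather than throwing on the first one makes it easier to report every missing TLS file at once.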
Ping To KubeMQ server
You can ping the server to check whether a connection has been established.
Request: NONE
Response: ServerInfo
Interface Attributes
Name | Type | Description |
---|---|---|
host | String | The host of the server. |
version | String | The version of the server. |
serverStartTime | long | The start time of the server (in seconds). |
serverUpTimeSeconds | long | The uptime of the server (in seconds). |

    // ping() is assumed to return a Promise<ServerInfo>, so await it inside an async function
    const pingResult: ServerInfo = await pubsubClient.ping();
    console.log('Ping Response:', pingResult);

Create Channel
PubSub CreateEventsChannel Example:
Request:
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
channelName | String | Channel name which you want to create | None | Yes |
Response:
Name | Type | Description |
---|---|---|
void | Promise | Doesn't return a value upon completion |

    async function createEventsChannel(channel: string) {
      return pubsubClient.createEventsChannel(channel);
    }

PubSub Create Events Store Channel Example:
Request:
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
channelName | String | Channel name which you want to create | None | Yes |
Response:
Name | Type | Description |
---|---|---|
void | Promise | Doesn't return a value upon completion |

    async function createEventsStoreChannel(channel: string) {
      return pubsubClient.createEventsStoreChannel(channel);
    }

Delete Channel
PubSub DeleteEventsChannel Example:
Request:
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
channelName | String | Channel name which you want to delete | None | Yes |
Response:
Name | Type | Description |
---|---|---|
void | Promise | Doesn't return a value upon completion |

    async function deleteEventsChannel(channel: string) {
      return pubsubClient.deleteEventsChannel(channel);
    }

PubSub Delete Events Store Channel Example:
Request:
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
channelName | String | Channel name which you want to delete | None | Yes |
Response:
Name | Type | Description |
---|---|---|
void | Promise | Doesn't return a value upon completion |

    async function deleteEventsStoreChannel(channel: string) {
      return pubsubClient.deleteEventsStoreChannel(channel);
    }

List Channels
PubSub ListEventsChannel Example:
Request:
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
search | String | Search query to filter channels (optional) | None | No |
Response: PubSubChannel[]
PubSubChannel Interface Attributes
Name | Type | Description |
---|---|---|
name | String | The name of the Pub/Sub channel. |
type | String | The type of the Pub/Sub channel. |
lastActivity | long | The timestamp of the last activity on the channel, represented in milliseconds since epoch. |
isActive | boolean | Indicates whether the channel is active or not. |
incoming | PubSubStats | The statistics related to incoming messages for this channel. |
outgoing | PubSubStats | The statistics related to outgoing messages for this channel. |

    async function listEventsChannel(search: string) {
      const channels = await pubsubClient.listEventsChannels(search);
      console.log(channels);
    }

PubSub ListEventsStoreChannel Example:
Request:
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
search | String | Search query to filter channels (optional) | None | No |
Response: PubSubChannel[]
PubSubChannel Interface Attributes
Name | Type | Description |
---|---|---|
name | String | The name of the Pub/Sub channel. |
type | String | The type of the Pub/Sub channel. |
lastActivity | long | The timestamp of the last activity on the channel, represented in milliseconds since epoch. |
isActive | boolean | Indicates whether the channel is active or not. |
incoming | PubSubStats | The statistics related to incoming messages for this channel. |
outgoing | PubSubStats | The statistics related to outgoing messages for this channel. |

    async function listEventsStoreChannel(search: string) {
      const channels = await pubsubClient.listEventsStoreChannels(search);
      console.log(channels);
    }

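Once a channel list is returned, the PubSubChannel attributes listed above can be used to pick out what matters. The sketch below keeps only active channels and orders them by most recent activity. `ChannelSketch` and `summarizeActive` are hypothetical names defined here; the interface mirrors only the documented scalar fields and omits `incoming` and `outgoing`, whose shape is not described in this document.

```typescript
// Local mirror of the documented scalar PubSubChannel fields; not an SDK type.
interface ChannelSketch {
  name: string;
  type: string;
  lastActivity: number; // milliseconds since epoch, per the table above
  isActive: boolean;
}

// Hypothetical helper: active channels only, most recently used first.
function summarizeActive(channels: ChannelSketch[]): string[] {
  return channels
    .filter((c) => c.isActive) // filter() copies, so the input array is not reordered
    .sort((a, b) => b.lastActivity - a.lastActivity)
    .map((c) => `${c.name} (last activity ${new Date(c.lastActivity).toISOString()})`);
}

const sampleChannels: ChannelSketch[] = [
  { name: 'events.a', type: 'events', lastActivity: 1000, isActive: true },
  { name: 'events.b', type: 'events', lastActivity: 5000, isActive: false },
  { name: 'events.c', type: 'events', lastActivity: 9000, isActive: true },
];

console.log(summarizeActive(sampleChannels));
```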
PubSub Send & Receive
PubSub SendEventMessage Example:
Request: EventMessage
Interface Attributes
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
id | String | Unique identifier for the event message. | None | No |
channel | String | The channel to which the event message is sent. | None | Yes |
metadata | String | Metadata associated with the event message. | None | No |
body | byte[] | Body of the event message in bytes. | Empty byte array | No |
tags | Map<String, String> | Tags associated with the event message as key-value pairs. | Empty Map | No |
Note: at least one of metadata, body, or tags is required.
Response: NONE

    await pubsubClient.sendEventsMessage({
      id: `234`,
      channel: 'events.single',
      body: Utils.stringToBytes('event message'),
    });

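The request rules above (channel is mandatory, and at least one of metadata, body, or tags must be present) can be checked before sending. This is a minimal sketch under assumptions: `EventMessageSketch` and `validateEventMessage` are hypothetical names defined here, and whether the SDK performs an equivalent check on the client side is not stated in this document.

```typescript
// Local mirror of the EventMessage attributes table; not an SDK type.
interface EventMessageSketch {
  id?: string;
  channel: string;
  metadata?: string;
  body?: Uint8Array;
  tags?: Map<string, string>;
}

// Hypothetical helper: throws when the message violates the documented rules.
function validateEventMessage(msg: EventMessageSketch): void {
  if (!msg.channel || msg.channel.trim() === '') {
    throw new Error('channel is required');
  }
  const hasMetadata = !!msg.metadata;
  const hasBody = !!msg.body && msg.body.length > 0;
  const hasTags = !!msg.tags && msg.tags.size > 0;
  if (!hasMetadata && !hasBody && !hasTags) {
    throw new Error('at least one of metadata, body, or tags is required');
  }
}

// Passes: channel plus a non-empty metadata string.
validateEventMessage({ channel: 'events.single', metadata: 'order-created' });
```

An empty byte array or empty Map is treated as absent here, since those are the documented defaults when the field is not set.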
PubSub SendEventStoreMessage Example:
Request: EventStoreMessage
Class Attributes
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
id | String | Unique identifier for the event message. | None | No |
channel | String | The channel to which the event message is sent. | None | Yes |
metadata | String | Metadata associated with the event message. | None | No |
body | byte[] | Body of the event message in bytes. | Empty byte array | No |
tags | Map<String, String> | Tags associated with the event message as key-value pairs. | Empty Map | No |
Note: at least one of metadata, body, or tags is required.
Response: NONE

    await pubsubClient.sendEventStoreMessage({
      id: '987',
      channel: 'events_store.single',
      body: Utils.stringToBytes('event store message'),
    });

PubSub SubscribeEvents Example:
Request: EventsSubscription
Class Attributes
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
channel | String | The channel to subscribe to. | None | Yes |
group | String | The group to subscribe with. | None | No |
onReceiveEventCallback | Consumer | Callback function to be called when an event message is received. | None | Yes |
onErrorCallback | Consumer | Callback function to be called when an error occurs. | None | No |
Response: NONE
Callback: EventMessageReceived class details
Name | Type | Description |
---|---|---|
id | String | The unique identifier of the message. |
fromClientId | String | The ID of the client that sent the message. |
timestamp | long | The timestamp when the message was received, in seconds. |
channel | String | The channel to which the message belongs. |
metadata | String | The metadata associated with the message. |
body | byte[] | The body of the message. |
sequence | long | The sequence number of the message. |
tags | Map<String, String> | The tags associated with the message. |

    async function subscribeToEvent() {
      // Subscribe to events from the specified channel and process received events.
      const eventsSubscriptionRequest = new EventsSubscriptionRequest('events.A', '');

      // Define the callback for receiving events
      eventsSubscriptionRequest.onReceiveEventCallback = (event: EventMessageReceived) => {
        console.log('SubscriberA received event:', {
          id: event.id,
          fromClientId: event.fromClientId,
          timestamp: event.timestamp,
          channel: event.channel,
          metadata: event.metadata,
          body: event.body,
          tags: event.tags,
        });
      };

      // Define the callback for handling errors
      eventsSubscriptionRequest.onErrorCallback = (error: string) => {
        console.error('SubscriberA error:', error);
      };

      pubsubClient
        .subscribeToEvents(eventsSubscriptionRequest)
        .then(() => {
          console.log('Subscription successful');
        })
        .catch((reason: any) => {
          console.error('Subscription failed:', reason);
        });
    }

PubSub SubscribeEventsStore Example:
Request: EventsStoreSubscription
Interface Attributes
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
channel | String | The channel to subscribe to. | None | Yes |
group | String | The group to subscribe with. | None | No |
onReceiveEventCallback | Consumer | Callback function to be called when an event message is received. | None | Yes |
onErrorCallback | Consumer | Callback function to be called when an error occurs. | None | No |
Response: None
Callback: EventStoreMessageReceived class details
Name | Type | Description |
---|---|---|
id | String | The unique identifier of the message. |
fromClientId | String | The ID of the client that sent the message. |
timestamp | long | The timestamp when the message was received, in seconds. |
channel | String | The channel to which the message belongs. |
metadata | String | The metadata associated with the message. |
body | byte[] | The body of the message. |
sequence | long | The sequence number of the message. |
tags | Map<String, String> | The tags associated with the message. |

    async function subscribeToEventStore() {
      // Subscribe to Events Store messages from the specified channel, starting at sequence 1.
      const eventsSubscriptionRequest = new EventsStoreSubscriptionRequest('events_store.A', '');
      eventsSubscriptionRequest.eventsStoreType = EventStoreType.StartAtSequence;
      eventsSubscriptionRequest.eventsStoreSequenceValue = 1;

      // Define the callback for receiving events
      eventsSubscriptionRequest.onReceiveEventCallback = (event: EventStoreMessageReceived) => {
        console.log('SubscriberA received event:', {
          id: event.id,
          fromClientId: event.fromClientId,
          timestamp: event.timestamp,
          channel: event.channel,
          metadata: event.metadata,
          body: event.body,
          tags: event.tags,
          sequence: event.sequence,
        });
      };

      // Define the callback for handling errors
      eventsSubscriptionRequest.onErrorCallback = (error: string) => {
        console.error('SubscriberA error:', error);
      };

      // Assumed Events Store counterpart of subscribeToEvents
      pubsubClient
        .subscribeToEventsStore(eventsSubscriptionRequest)
        .then(() => {
          console.log('Events Store subscription successful');
        })
        .catch((reason: any) => {
          console.error('Events Store subscription failed:', reason);
        });
    }

KubeMQ Queues Operations
The examples below demonstrate how to use the KubeMQ Queues client: creating, deleting, and listing channels, and sending and receiving queue messages.
Construct the Queues Client
To perform Queues operations, create an instance of QueuesClient. At minimum it needs two parameters: address (the KubeMQ server address) and clientId. With only these two parameters, a plaintext connection is established. The table below describes the parameters available for establishing a connection.
QueuesClient Configuration
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
address | String | The address of the KubeMQ server. | None | Yes |
clientId | String | The client ID used for authentication. | None | Yes |
authToken | String | The authorization token for secure communication. | None | No |
tls | boolean | Indicates if TLS (Transport Layer Security) is enabled. | None | No |
tlsCertFile | String | The path to the TLS certificate file. | None | No (Yes if tls is true) |
tlsKeyFile | String | The path to the TLS key file. | None | No (Yes if tls is true) |
tlsCaCertFile | String | The path to the TLS CA cert file. | None | No (Yes if tls is true) |
maxReceiveSize | int | The maximum size of the messages to receive (in bytes). | 104857600 (100MB) | No |
reconnectIntervalSeconds | int | The interval in seconds between reconnection attempts. | 1 | No |
Queues client connection example:

    const opts: Config = {
      address: 'localhost:50000',
      clientId: Utils.uuid(),
    };

    const queuesClient = new QueuesClient(opts);

The example below shows how to construct a QueuesClient with TLS and other configuration options:

    const opts: Config = {
      address: 'localhost:50000',               // KubeMQ gRPC endpoint address
      clientId: 'your-client-id',               // Connection clientId
      authToken: 'your-jwt-auth-token',         // Optional JWT authorization token
      tls: true,                                // Indicates if TLS is enabled
      tlsCertFile: 'path/to/tls-cert.pem',      // Path to the TLS certificate file
      tlsKeyFile: 'path/to/tls-key.pem',        // Path to the TLS key file
      tlsCaCertFile: 'path/to/tls-ca-cert.pem', // Path to the TLS CA cert file
      maxReceiveSize: 1024 * 1024 * 100,        // Maximum size of the messages to receive (100MB)
      reconnectIntervalSeconds: 1,              // Interval in seconds between reconnect attempts
    };

    const queuesClient = new QueuesClient(opts);

Ping To KubeMQ server
You can ping the server to check whether a connection has been established.
Request: NONE
Response: ServerInfo
Class Attributes
Name | Type | Description |
---|---|---|
host | String | The host of the server. |
version | String | The version of the server. |
serverStartTime | long | The start time of the server (in seconds). |
serverUpTimeSeconds | long | The uptime of the server (in seconds). |

    // ping() is assumed to return a Promise<ServerInfo>, so await it inside an async function
    const pingResult = await queuesClient.ping();
    console.log('Ping Response:', pingResult);

Create Channel
Queues CreateQueueChannel Example:
Request:
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
channelName | String | The name of the channel you want to create | None | Yes |
Response:
Name | Type | Description |
---|---|---|
void | Promise | Doesn't return a value upon completion |

    async function createQueueChannel(channel: string) {
      return queuesClient.createQueuesChannel(channel);
    }

Delete Channel
Queues DeleteQueueChannel Example:
Request:
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
channelName | String | The name of the channel you want to delete | None | Yes |
Response:
Name | Type | Description |
---|---|---|
void | Promise | Doesn't return a value upon completion |

    async function deleteQueueChannel(channel: string) {
      return queuesClient.deleteQueuesChannel(channel);
    }

List Channels
Queues listQueueChannels Example:
Request:
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
searchString | String | The channel name you want to search for | None | No |
Response: QueuesChannel[]
QueuesChannel interface Attributes
Name | Type | Description |
---|---|---|
name | String | The name of the queue channel. |
type | String | The type of the queue channel. |
lastActivity | long | The timestamp of the last activity in the queue channel. |
isActive | boolean | Indicates whether the queue channel is currently active. |
incoming | QueuesStats | The statistics for incoming messages in the queue channel. |
outgoing | QueuesStats | The statistics for outgoing messages in the queue channel. |

    async function listQueueChannels(search: string) {
      const channels = await queuesClient.listQueuesChannel(search);
      console.log(channels);
    }

Send & Receive Queue Messages
Queues SendSingleMessage Example:
Request: QueueMessage
class attributes
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
id | String | The unique identifier for the message. | None | No |
channel | String | The channel of the message. | None | Yes |
metadata | String | The metadata associated with the message. | None | No |
body | byte[] | The body of the message. | Empty byte array | No |
tags | Map<String, String> | The tags associated with the message. | Empty Map | No |
delayInSeconds | int | The delay in seconds before the message becomes available in the queue. | None | No |
expirationInSeconds | int | The expiration time in seconds for the message. | None | No |
attemptsBeforeDeadLetterQueue | int | The number of receive attempts allowed before the message is moved to the dead letter queue. | None | No |
deadLetterQueue | String | The dead letter queue where the message will be moved after reaching max receive attempts. | None | No |
Response: QueueSendResult
class attributes
Name | Type | Description |
---|---|---|
id | String | The unique identifier of the message. |
sentAt | LocalDateTime | The timestamp when the message was sent. |
expiredAt | LocalDateTime | The timestamp when the message will expire. |
delayedTo | LocalDateTime | The timestamp when the message will be delivered. |
isError | boolean | Indicates if there was an error while sending the message. |
error | String | The error message if isError is true. |

    await queuesClient
      .sendQueuesMessage({
        channel: 'queues.single',
        body: Utils.stringToBytes('queue message'),
      })
      .then((result) => console.log(result))
      .catch((reason) => console.error(reason));

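Because QueueSendResult carries its own isError and error fields, a send may report a failure in the result object as well as through a rejected promise. A minimal sketch of treating both paths the same way follows, assuming a local `QueueSendResultSketch` interface and a hypothetical `unwrapSendResult` helper; neither is part of the SDK, and only the fields used here are mirrored.

```typescript
// Local mirror of the QueueSendResult fields used below; not an SDK type.
interface QueueSendResultSketch {
  id: string;
  isError: boolean;
  error: string;
}

// Hypothetical helper: returns the message id on success, throws when isError is set.
function unwrapSendResult(result: QueueSendResultSketch): string {
  if (result.isError) {
    throw new Error(`queue send failed: ${result.error}`);
  }
  return result.id;
}

const sampleSentId = unwrapSendResult({ id: 'msg-1', isError: false, error: '' });
console.log('sent message', sampleSentId);
```

Chaining such a helper in `.then(unwrapSendResult)` would route result-level errors into the same `.catch` handler as transport errors.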
Queues Pull Messages Example:
Request: QueuesPullWaitingMessagesRequest
class attributes
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
channel | String | The channel to poll messages from. | None | Yes |
maxNumberOfMessages | int | The maximum number of messages to poll. | 1 | No |
waitTimeoutSeconds | int | The wait timeout in seconds for polling messages. | 60 | No |
Response: QueuesPullWaitingMessagesResponse
class attributes
Name | Type | Description |
---|---|---|
id | String | The reference ID of the request. |
messagesReceived | number | Number of valid messages received. |
messages | QueueMessage[] | The list of received queue messages. |
error | String | The error message, if any error occurred. |
isError | boolean | Indicates if there was an error. |
isPeek | boolean | Indicates if it is a peek or pull operation. |
messagesExpired | number | Number of expired messages from the queue. |

    await queuesClient
      .pull({
        channel: 'queues.peek',
        maxNumberOfMessages: 10,
        waitTimeoutSeconds: 10,
      })
      .then((response) => {
        response.messages.forEach((msg) => {
          console.log(msg);
        });
      })
      .catch((reason) => {
        console.error(reason);
      });

Queues Get Waiting Messages Example:
Request: QueuesPullWaitingMessagesRequest
class attributes
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
channel | String | The channel to poll messages from. | None | Yes |
maxNumberOfMessages | int | The maximum number of messages to poll. | 1 | No |
waitTimeoutSeconds | int | The wait timeout in seconds for polling messages. | 60 | No |
Response: QueuesPullWaitingMessagesResponse
class attributes
Name | Type | Description |
---|---|---|
id | String | The reference ID of the request. |
messagesReceived | number | Number of valid messages received. |
messages | QueueMessage[] | The list of received queue messages. |
error | String | The error message, if any error occurred. |
isError | boolean | Indicates if there was an error. |
isPeek | boolean | Indicates if the operation is a peek or pull. |
messagesExpired | number | Number of expired messages from the queue. |

    await queuesClient
      .waiting({
        channel: 'queues.peek',
        maxNumberOfMessages: 5,
        waitTimeoutSeconds: 20,
      })
      .then((response) => {
        response.messages.forEach((msg) => {
          console.log(msg);
        });
      })
      .catch((reason) => {
        console.error(reason);
      });

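Both the pull and waiting calls return the same response shape, so one function can interpret either. The sketch below checks isError, uses isPeek to tell the two operations apart, and decodes each message body as UTF-8 text. `PullResponseSketch`, `PulledMessageSketch`, and `readBodies` are hypothetical names defined here, mirroring only the documented fields that are used; text bodies are an assumption of this example.

```typescript
// Local mirrors of the documented response fields; not SDK types.
interface PulledMessageSketch {
  id: string;
  body: Uint8Array;
}

interface PullResponseSketch {
  isError: boolean;
  error: string;
  isPeek: boolean;
  messagesReceived: number;
  messagesExpired: number;
  messages: PulledMessageSketch[];
}

// Hypothetical helper: throws on an error response, otherwise returns decoded bodies.
function readBodies(response: PullResponseSketch): { mode: 'peek' | 'pull'; bodies: string[] } {
  if (response.isError) {
    throw new Error(`queue read failed: ${response.error}`);
  }
  const decoder = new TextDecoder(); // assumes UTF-8 text bodies
  return {
    mode: response.isPeek ? 'peek' : 'pull',
    bodies: response.messages.map((m) => decoder.decode(m.body)),
  };
}

const sampleResponse: PullResponseSketch = {
  isError: false,
  error: '',
  isPeek: true,
  messagesReceived: 1,
  messagesExpired: 0,
  messages: [{ id: 'm1', body: new TextEncoder().encode('queue message') }],
};

console.log(readBodies(sampleResponse));
```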
Poll Queue Messages
Receives messages from a Queue channel.
Request: QueuesPollRequest
Class Attributes
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
channel | String | The channel to poll messages from. | None | Yes |
pollMaxMessages | int | The maximum number of messages to poll. | 1 | No |
pollWaitTimeoutInSeconds | int | The wait timeout in seconds for polling messages. | 60 | No |
autoAckMessages | boolean | Indicates if messages should be auto-acknowledged. | false | No |
visibilitySeconds | int | Add a visibility timeout feature for messages. | 0 | No |
Response: QueuesMessagesPulledResponse
Class Attributes
Name | Type | Description |
---|---|---|
id | String | The reference ID of the request. |
messages | QueueMessageReceived[] | The list of received queue messages. |
messagesReceived | number | Number of valid messages received. |
messagesExpired | number | Number of messages expired. |
isPeek | boolean | Indicates if the operation is a peek or pull. |
error | String | The error message, if any error occurred. |
isError | boolean | Indicates if there was an error. |
visibilitySeconds | int | The visibility timeout for the message in seconds. |
isAutoAcked | boolean | Indicates whether the message was auto-acknowledged. |
QueueMessageReceived class attributes
Name | Type | Description |
---|---|---|
id | String | The unique identifier for the message. |
channel | String | The channel from which the message was received. |
metadata | String | Metadata associated with the message. |
body | byte[] | The body of the message in byte array format. |
fromClientId | String | The ID of the client that sent the message. |
tags | Map<String, String> | Key-value pairs representing tags for the message. |
timestamp | Instant | The timestamp when the message was created. |
sequence | long | The sequence number of the message. |
receiveCount | int | The number of times the message has been received. |
isReRouted | boolean | Indicates whether the message was rerouted. |
reRouteFromQueue | String | The name of the queue from which the message was rerouted. |
expiredAt | Instant | The expiration time of the message, if applicable. |
delayedTo | Instant | The time the message is delayed until, if applicable. |
transactionId | String | The transaction ID associated with the message. |
isTransactionCompleted | boolean | Indicates whether the transaction for the message is completed. |
responseHandler | StreamObserver<QueuesDownstreamRequest> | The response handler for processing downstream requests. |
receiverClientId | String | The ID of the client receiving the message. |
visibilitySeconds | int | The visibility timeout for the message in seconds. |
isAutoAcked | boolean | Indicates whether the message was auto-acknowledged. |
Example

    async function main() {
      const opts: Config = {
        address: 'localhost:50000',
        clientId: 'kubeMQClientId-ts',
      };
      const queuesClient = new QueuesClient(opts);

      // Receive with message visibility
      async function receiveWithVisibility(visibilitySeconds: number) {
        console.log("\n============================== Receive with Visibility =============================\n");
        try {
          const pollRequest = new QueuesPollRequest({
            channel: 'visibility_channel',
            pollMaxMessages: 1,
            pollWaitTimeoutInSeconds: 10,
            visibilitySeconds: visibilitySeconds,
            autoAckMessages: false,
          });

          const pollResponse = await queuesClient.receiveQueuesMessages(pollRequest);
          console.log("Received Message Response:", pollResponse);

          if (pollResponse.isError) {
            console.log("Error: " + pollResponse.error);
          } else {
            // for...of (rather than forEach with an async callback) so each ack is awaited
            for (const msg of pollResponse.messages) {
              console.log(`Message ID: ${msg.id}, Message Body: ${Utils.bytesToString(msg.body)}`);
              try {
                await new Promise(resolve => setTimeout(resolve, 1000));
                await msg.ack();
                console.log("Acknowledged message");
              } catch (err) {
                console.error("Error acknowledging message:", err);
              }
            }
          }
        } catch (error) {
          console.error('Failed to receive queue messages:', error);
        }
      }

      // Test visibility expiration
      async function receiveWithVisibilityExpired() {
        console.log("\n============================== Receive with Visibility Expired =============================\n");
        await receiveWithVisibility(2);
      }

      // Test visibility extension
      async function receiveWithVisibilityExtension() {
        console.log("\n============================== Receive with Visibility Extension =============================\n");
        try {
          const pollRequest = new QueuesPollRequest({
            channel: 'visibility_channel',
            pollMaxMessages: 1,
            pollWaitTimeoutInSeconds: 10,
            visibilitySeconds: 3,
            autoAckMessages: false,
          });

          const pollResponse = await queuesClient.receiveQueuesMessages(pollRequest);
          console.log("Received Message Response:", pollResponse);

          if (pollResponse.isError) {
            console.log("Error: " + pollResponse.error);
          } else {
            for (const msg of pollResponse.messages) {
              console.log(`Message ID: ${msg.id}, Message Body: ${Utils.bytesToString(msg.body)}`);
              try {
                await new Promise(resolve => setTimeout(resolve, 1000));
                await msg.extendVisibilityTimer(3);
                await new Promise(resolve => setTimeout(resolve, 2000));
                await msg.ack();
                console.log("Acknowledged message after extending visibility");
              } catch (err) {
                console.error("Error during visibility extension:", err);
              }
            }
          }
        } catch (error) {
          console.error('Failed to receive queue messages:', error);
        }
      }

      await receiveWithVisibilityExpired();
      await receiveWithVisibilityExtension();
    }

    main();

This method allows you to receive messages from a specified Queue channel. You can configure the polling behavior, including the maximum number of messages to receive and the wait timeout. The response provides detailed information about the received messages and the transaction.
Message Handling Options:
- Acknowledge (ack): Mark the message as processed and remove it from the queue.
- Reject: Reject the message. It won't be requeued.
- Requeue: Send the message back to the queue for later processing.
Choose the appropriate handling option based on your application's logic and requirements.
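One way to choose among the three handling options is a small pure function that looks at whether processing succeeded, whether the failure is worth retrying, and how many times the message has already been received (the `receiveCount` attribute of QueueMessageReceived). The sketch below is only an illustration of such a policy: `chooseOutcome`, its parameters, and the `maxAttempts` threshold are hypothetical and not part of the SDK, and the caller would still invoke the corresponding method on the received message.

```typescript
type Outcome = 'ack' | 'reject' | 'requeue';

// Hypothetical policy: ack on success, requeue retryable failures until the
// attempt budget is used up, reject everything else.
function chooseOutcome(
  processedOk: boolean,
  retryable: boolean,
  receiveCount: number,
  maxAttempts: number,
): Outcome {
  if (processedOk) return 'ack';
  if (retryable && receiveCount < maxAttempts) return 'requeue';
  return 'reject';
}

console.log(chooseOutcome(true, false, 1, 3));  // success: acknowledge
console.log(chooseOutcome(false, true, 1, 3));  // transient failure, attempts left: requeue
console.log(chooseOutcome(false, true, 3, 3));  // transient failure, budget spent: reject
```

Keeping the decision separate from the SDK calls makes the policy easy to unit test without a running KubeMQ server.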
KubeMQ Command & Query Operations
Construct the CQClient
To perform Command and Query operations, create an instance of CQClient. At minimum it needs two parameters: address (the KubeMQ server address) and clientId. With only these two parameters, a plaintext connection is established. The table below describes the parameters available for establishing a connection.
CQClient Configuration
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
address | String | The address of the KubeMQ server. | None | Yes |
clientId | String | The client ID used for authentication. | None | Yes |
authToken | String | The authorization token for secure communication. | None | No |
tls | boolean | Indicates if TLS (Transport Layer Security) is enabled. | None | No |
tlsCertFile | String | The path to the TLS certificate file. | None | No (Yes if tls is true) |
tlsKeyFile | String | The path to the TLS key file. | None | No (Yes if tls is true) |
tlsCaCertFile | String | The path to the TLS CA cert file. | None | No (Yes if tls is true) |
maxReceiveSize | int | The maximum size of the messages to receive (in bytes). | 104857600 (100MB) | No |
reconnectIntervalSeconds | int | The interval in seconds between reconnection attempts. | 1 | No |
CQClient connection example:

    const opts: Config = {
      address: 'localhost:50000',
      clientId: Utils.uuid(),
      reconnectIntervalSeconds: 1,
    };

    const cqClient = new CQClient(opts);

The example below shows how to construct a CQClient with TLS and other configuration options:

    const config: Config = {
      address: 'localhost:50000',               // KubeMQ gRPC endpoint address
      clientId: 'your-client-id',               // Connection clientId
      authToken: 'your-jwt-auth-token',         // Optional JWT authorization token
      tls: true,                                // Indicates if TLS is enabled
      tlsCertFile: 'path/to/tls-cert.pem',      // Path to the TLS certificate file
      tlsKeyFile: 'path/to/tls-key.pem',        // Path to the TLS key file
      tlsCaCertFile: 'path/to/tls-ca-cert.pem', // Path to the TLS CA cert file
      maxReceiveSize: 1024 * 1024 * 100,        // Maximum size of the messages to receive (100MB)
      reconnectIntervalSeconds: 1,              // Interval in seconds between reconnect attempts
    };

    // Pass the config object declared above
    const cqClient = new CQClient(config);

Ping the KubeMQ Server
You can ping the server to check whether a connection has been established.
Request: None
Response: ServerInfo
interface Attributes
Name | Type | Description |
---|---|---|
host | String | The host of the server. |
version | String | The version of the server. |
serverStartTime | long | The start time of the server (in seconds). |
serverUpTimeSeconds | long | The uptime of the server (in seconds). |
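    async function pingServer() {
      // Awaiting the call resolves the ServerInfo if ping() returns a Promise.
      const pingResult = await cqClient.ping();
      // Pass the object as a separate argument so its fields are printed.
      console.log('Ping Response:', pingResult);
    }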
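The ServerInfo attributes listed above can be turned into a readable status line. This is a sketch under stated assumptions: `ServerInfoLike` is a local mirror of the table, the `long` fields are assumed to arrive as JavaScript numbers, and `formatUptime` and `describeServer` are hypothetical helpers rather than SDK functions.

```typescript
// Local mirror of the ServerInfo table; NOT imported from kubemq-js.
interface ServerInfoLike {
  host: string;
  version: string;
  serverStartTime: number; // assumed to be a number of seconds
  serverUpTimeSeconds: number; // assumed to be a number of seconds
}

// Hypothetical helper: converts a duration in seconds into days, hours, minutes, seconds.
function formatUptime(totalSeconds: number): string {
  const days = Math.floor(totalSeconds / 86400);
  const hours = Math.floor((totalSeconds % 86400) / 3600);
  const minutes = Math.floor((totalSeconds % 3600) / 60);
  const seconds = totalSeconds % 60;
  return `${days}d ${hours}h ${minutes}m ${seconds}s`;
}

// Hypothetical helper: builds a one-line summary from a ping response.
function describeServer(info: ServerInfoLike): string {
  return `${info.host} (v${info.version}) up ${formatUptime(info.serverUpTimeSeconds)}`;
}

// Sample values are made up for illustration only.
console.log(
  describeServer({
    host: 'kubemq-0',
    version: '0.0.0',
    serverStartTime: 1700000000,
    serverUpTimeSeconds: 93784,
  }),
);
```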
Create Channel
Command CreateCommandsChannel Example:
Request:
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
channelName | String | The name of the channel to create. | None | Yes |
Response:
Name | Type | Description |
---|---|---|
void | Promise | Doesn't return a value upon completion |
    async function createCommandsChannel(channel: string) {
      return cqClient.createCommandsChannel(channel);
    }
Queries CreateQueriesChannel Example:
Request:
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
channelName | String | The name of the channel to create. | None | Yes |
Response:
Name | Type | Description |
---|---|---|
void | Promise | Doesn't return a value upon completion |
    async function createQueriesChannel(channel: string) {
      return cqClient.createQueriesChannel(channel);
    }
Delete Channel
Command DeleteCommandsChannel Example:
Request:
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
channelName | String | The name of the channel to delete. | None | Yes |
Response:
Name | Type | Description |
---|---|---|
void | Promise | Doesn't return a value upon completion |
    async function deleteCommandsChannel(channel: string) {
      return cqClient.deleteCommandsChannel(channel);
    }
Queries DeleteQueriesChannel Example:
Request:
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
channelName | String | The name of the channel to delete | None | Yes |
Response:
Name | Type | Description |
---|---|---|
void | Promise | Doesn't return a value upon completion |
    async function deleteQueriesChannel(channel: string) {
      return cqClient.deleteQueriesChannel(channel);
    }
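Create and delete calls are often paired, for example around a short-lived test. The sketch below shows one way to guarantee the delete runs even when the work in between fails. `ChannelAdmin` is a hypothetical structural type that only borrows the two method names and the void-Promise return documented above, and `withTemporaryCommandsChannel` is an illustrative helper, not an SDK function.

```typescript
// Hypothetical structural type: any object exposing these two methods fits,
// including (by assumption) a CQClient instance.
interface ChannelAdmin {
  createCommandsChannel(channel: string): Promise<void>;
  deleteCommandsChannel(channel: string): Promise<void>;
}

// Creates the channel, runs the work, and always deletes the channel afterwards.
async function withTemporaryCommandsChannel<T>(
  admin: ChannelAdmin,
  channel: string,
  work: () => Promise<T>,
): Promise<T> {
  await admin.createCommandsChannel(channel);
  try {
    return await work();
  } finally {
    // Runs on success and on failure, so the channel is not left behind.
    await admin.deleteCommandsChannel(channel);
  }
}

// Usage with an in-memory stand-in instead of a real server connection.
const log: string[] = [];
const stubAdmin: ChannelAdmin = {
  createCommandsChannel: async (name) => { log.push(`create:${name}`); },
  deleteCommandsChannel: async (name) => { log.push(`delete:${name}`); },
};

withTemporaryCommandsChannel(stubAdmin, 'tmp-channel', async () => {
  log.push('work');
}).then(() => console.log(log));
```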
List Channels
Command ListCommandsChannel Example:
Request:
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
searchString | String | The name of the channel to search for. | None | No |
Response: CQChannel[]
CQChannel
interface attributes
Name | Type | Description |
---|---|---|
name | String | The name of the channel. |
type | String | The type of the channel. |
lastActivity | long | The timestamp of the last activity on the channel. |
isActive | boolean | Indicates whether the channel is currently active. |
incoming | CQStats | Statistics about incoming messages to the channel. |
outgoing | CQStats | Statistics about outgoing messages from the channel. |
    async function listCommandsChannels(search: string) {
      const channels = await cqClient.listCommandsChannels(search);
      console.log(channels);
    }
Queries ListQueriesChannel Example:
Request:
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
searchString | String | The name of the channel to search for. | None | No |
Response: CQChannel[]
CQChannel
interface attributes
Name | Type | Description |
---|---|---|
name | String | The name of the channel. |
type | String | The type of the channel. |
lastActivity | long | The timestamp of the last activity on the channel. |
isActive | boolean | Indicates whether the channel is currently active. |
incoming | CQStats | Statistics about incoming messages to the channel. |
outgoing | CQStats | Statistics about outgoing messages from the channel. |
    async function listQueriesChannels(search: string) {
      const channels = await cqClient.listQueriesChannels(search);
      console.log(channels);
    }
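The list calls return every matching channel, so a caller usually filters the result. The following sketch works on plain data and makes no SDK calls: `ChannelInfo` is a local mirror of the CQChannel attribute table, with the CQStats fields left as `unknown` because their shape is not described here, and `activeChannelNames` is a hypothetical helper.

```typescript
// Local mirror of the CQChannel table; NOT imported from kubemq-js.
interface ChannelInfo {
  name: string;
  type: string;
  lastActivity: number; // assumed to be a numeric timestamp
  isActive: boolean;
  incoming: unknown; // CQStats shape is not documented in this section
  outgoing: unknown;
}

// Hypothetical helper: returns the names of active channels in alphabetical order.
function activeChannelNames(channels: ChannelInfo[]): string[] {
  return channels
    .filter((channel) => channel.isActive)
    .map((channel) => channel.name)
    .sort();
}

// Sample data made up for illustration.
const sampleChannels: ChannelInfo[] = [
  { name: 'payments', type: 'commands', lastActivity: 3, isActive: true, incoming: {}, outgoing: {} },
  { name: 'archive', type: 'commands', lastActivity: 1, isActive: false, incoming: {}, outgoing: {} },
  { name: 'orders', type: 'commands', lastActivity: 2, isActive: true, incoming: {}, outgoing: {} },
];

console.log(activeChannelNames(sampleChannels));
```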
Send & Receive Command & Query Messages
Command SubscribeToCommandsChannel Example:
Request: CommandsSubscriptionRequest
Class Attributes
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
channel | String | The channel for the subscription. | None | Yes |
group | String | The group associated with the subscription. | None | No |
onReceiveCommandCallback | CommandsReceiveMessage | Callback function for receiving commands. | None | Yes |
Response: None
Callback: CommandsReceiveMessage
interface attributes
Name | Type | Description |
---|---|---|
commandReceived | CommandsReceiveMessage | The command message that was received. |
clientId | String | The ID of the client that sent the command. |
requestId | String | The ID of the request. |
isExecuted | boolean | Indicates whether the command was executed. |
timestamp | LocalDateTime | The timestamp of the response. |
error | String | The error message if an error occurred. |
    async function subscribeToCommands(channelName: string) {
      // Subscribes to commands from the specified channel with a specific configuration.
      const commandSubscriptionRequest = new CommandsSubscriptionRequest(channelName, 'group1');

      // Define the callback for receiving commandMessage
      commandSubscriptionRequest.onReceiveEventCallback = (commandMessage: CommandMessageReceived) => {
        console.log('SubscriberA received commandMessage:', {
          id: commandMessage.id,
          fromClientId: commandMessage.fromClientId,
          timestamp: commandMessage.timestamp,
          channel: commandMessage.channel,
          metadata: commandMessage.metadata,
          body: commandMessage.body,
          tags: commandMessage.tags,
        });
      };

      // Define the callback for handling errors
      commandSubscriptionRequest.onErrorCallback = (error: string) => {
        console.error('SubscriberA error:', error);
      };

      cqClient.subscribeToCommands(commandSubscriptionRequest)
        .then(() => {
          console.log('Command Subscription successful');
        })
        .catch((reason: any) => {
          console.error('Command Subscription failed:', reason);
        });
    }
Queries SubscribeToQueriesChannel Example:
Request: QueriesSubscriptionRequest
Class Attributes
Name | Type | Description | Default Value | Mandatory |
---|---|---|---|---|
channel | String | The channel for the subscription. | None | Yes |
group | String | The group associated with the subscription. | None | No |
onReceiveQueriesCallback | QueriesReceiveMessage | Callback function for receiving queries. | None | Yes |
Response: None
Callback: QueriesReceiveMessage
interface attributes
Name | Type | Description |
---|---|---|
id | String | The ID of the request. |
channel | String | Channel name from which the message was received. |
metadata | String | Metadata of the message. |
body | Uint8Array | The body of the response. |
tags | Map<String, String> | Tags associated with the query message. |
replyChannel | String | The reply channel for this message. |
    async function subscribeToQueries(channelName: string) {
      // Subscribes to queries from the specified channel with a specific configuration.
      const queriesSubscriptionRequest = new QueriesSubscriptionRequest(channelName, 'group1');

      // Define the callback for receiving query messages.
      // The parameter type is left to be inferred from the callback property.
      queriesSubscriptionRequest.onReceiveEventCallback = (queryMessage) => {
        console.log('SubscriberA received query message:', {
          id: queryMessage.id,
          fromClientId: queryMessage.fromClientId,
          timestamp: queryMessage.timestamp,
          channel: queryMessage.channel,
          metadata: queryMessage.metadata,
          body: queryMessage.body,
          tags: queryMessage.tags,
        });
      };

      // Define the callback for handling errors
      queriesSubscriptionRequest.onErrorCallback = (error: string) => {
        console.error('SubscriberA error:', error);
      };

      cqClient.subscribeToQueries(queriesSubscriptionRequest)
        .then(() => {
          console.log('Queries Subscription successful');
        })
        .catch((reason: any) => {
          console.error('Queries Subscription failed:', reason);
        });
    }
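The `body` of a received message is documented above as a `Uint8Array`, so a callback usually has to decode it before use. The sketch below assumes the sender encoded a UTF-8 JSON document, which is a convention of the application and not something the SDK enforces. `decodeJsonBody` is a hypothetical helper built only on the standard `TextDecoder` and `JSON.parse`.

```typescript
// Hypothetical helper: decodes a UTF-8 JSON body, returning undefined when
// the bytes are not valid JSON instead of throwing inside a callback.
function decodeJsonBody<T>(body: Uint8Array): T | undefined {
  try {
    const text = new TextDecoder().decode(body);
    return JSON.parse(text) as T;
  } catch {
    return undefined;
  }
}

// Example payload shape, made up for illustration.
interface OrderQuery {
  orderId: number;
}

// Simulate a received body by encoding a JSON string to bytes.
const receivedBody = new TextEncoder().encode(JSON.stringify({ orderId: 7 }));
const parsed = decodeJsonBody<OrderQuery>(receivedBody);
console.log(parsed ? `Query for order ${parsed.orderId}` : 'Body was not valid JSON');
```

Returning `undefined` on malformed input keeps a single bad message from raising an exception inside the subscription callback. The caller decides whether to log or reject it.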