Gathering detailed insights and metrics for ts-nats
npm install ts-nats
TypeScript (99.75%)
JavaScript (0.25%)
Total Downloads
1,539,347
178 Stars
533 Commits
13 Forks
7 Watching
3 Branches
15 Contributors
Latest Version
1.2.15
Package Id
ts-nats@1.2.15
Size
53.27 kB
NPM Version
6.14.5
Node Version
10.20.1
Published On
23 May 2020
Cumulative downloads

Period | Downloads | Change vs. previous period |
---|---|---|
Last day | 127 | -67.8% |
Last week | 1,402 | -42.2% |
Last month | 11,434 | -17.3% |
Last year | 156,000 | -12.8% |
A TypeScript Node.js client for the NATS messaging system.
ts-nats is a TypeScript NATS library for Node.js that supports Promises and async/await patterns. See the full documentation for details.

    npm install ts-nats

    # to install current dev version
    npm install ts-nats@next
The starting point is the `connect()` function. You can give no arguments, a port, a URL, or a `NatsConnectionOptions` object specifying detailed behaviour. Inside an async function, you can use async/await to wait for the promise to resolve or reject.

    import {connect, NatsConnectionOptions, Payload} from 'ts-nats';

    // ...
    try {
        let nc = await connect({servers: ['nats://demo.nats.io:4222', 'tls://demo.nats.io:4443']});
        // Do something with the connection
    } catch(ex) {
        // handle the error
    }
    // ...
Since `connect()` returns a Promise, the promise patterns are supported. With no arguments, it will attempt to connect to `nats://localhost:4222`:

    connect()
        .then((nc) => {
            // Do something with the connection
        })
        .catch((ex) => {
            // handle the error
        });
Once you have a connection, you can publish data:
    nc.publish('greeting', 'hello world!');
Subscribing allows you to receive messages published on a subject:
    // simple subscription - subscribe returns a promise to a subscription object
    let sub = await nc.subscribe('greeting', (err, msg) => {
        if(err) {
            // do something
        } else {
            // do something with msg.data
        }
    });

A subscription can respond to the publisher if the publisher sets a reply subject:

    let service = await nc.subscribe('greeter', (err, msg) => {
        if(err) {
            // handle the error
        } else if (msg.reply) {
            nc.publish(msg.reply, `hello there ${msg.data}`);
        }
    });

    // create a unique subject just to receive a response message
    let inbox = nc.createInbox();
    let sub2 = await nc.subscribe(inbox, (err, msg) => {
        if(err) {
            // handle the error
        } else {
            console.log(msg.data);
        }
    }, {max: 1});

    // publish a 'request' message
    nc.publish('greeter', "NATS", inbox);

    // stop getting messages on the service subscription
    service.unsubscribe();

    // didn't have to specify unsubscribe for sub2, the `{max: 1}` auto unsubscribes
    // when the first message is received.
The above pattern is called Request/Reply - because it is so pervasive, ts-nats makes it very easy to make a request and handle a single response. The request API returns a Promise to a response message. All the bookkeeping, creating an inbox subject, creating a subscription to manage the response, and publishing details are handled. Since requests are expected to have a response, they require a timeout. If the request does not receive a response within the specified timeout, the promise rejects.
Subscriptions to handle request responses are very efficient since they utilize a shared subscription on the server. The client multiplexes the response without having to create and destroy subscriptions server-side.
    let msg = await nc.request('greeter', 1000, 'me');
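The multiplexing idea described above can be illustrated with a minimal sketch. This is not the ts-nats implementation: the `RequestMux` class, its method names, and the token scheme are hypothetical, and timers are left out so the bookkeeping stays visible. One wildcard subscription on `<prefix>.*` would feed `dispatch()`, and each request gets its own reply subject under that prefix.

```typescript
type Reply = (data: string) => void;

// Hypothetical sketch of request multiplexing, not the ts-nats internals.
class RequestMux {
  private pending = new Map<string, Reply>();
  private counter = 0;

  constructor(private prefix: string) {}

  // Registers a callback and returns the unique reply subject to publish with.
  add(onReply: Reply): string {
    const token = String(++this.counter);
    this.pending.set(token, onReply);
    return this.prefix + '.' + token;
  }

  // Called for every message arriving on the shared `<prefix>.*` subscription.
  // Returns false when nobody is waiting (for example after a timeout).
  dispatch(subject: string, data: string): boolean {
    const token = subject.slice(this.prefix.length + 1);
    const cb = this.pending.get(token);
    if (cb === undefined) {
      return false;
    }
    this.pending.delete(token); // a request takes a single response
    cb(data);
    return true;
  }

  // Drops a pending request, as a timeout handler would.
  cancel(subject: string): void {
    this.pending.delete(subject.slice(this.prefix.length + 1));
  }
}
```

Because only the client-side map changes per request, no subscription has to be created or removed on the server for each request in this model.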
When done using a connection, closing it releases all resources, and cancels all subscriptions.
    nc.close();
A single subscription can process related messages. When the subject specifies wildcard tokens, those parts of the subject can match different things.

One of the wildcards is the `*` (asterisk). The asterisk in `foo.*.baz` matches all values in that token's position (`foo.bar.baz`, `foo.a.baz`, ...). To work as a wildcard, the wildcard character must be the only character in the token. Asterisks that are part of a token value are interpreted as string literals: `foo.a*.bar` will only match the literal value of `foo.a*.bar`.

    let sub1 = await nc.subscribe('foo.*.baz', (err, msg) => {
        console.log('message received on', msg.subject, ":", msg.data);
    });
Another wildcard is the `>` (greater than symbol). `>` tokens can only appear as the last token in a subject. `foo.>` will match `foo.bar`, `foo.bar.baz`, `foo.foo.bar.bax.22`. When part of a token it is interpreted as a string literal: `foo.bar.a>` will only match `foo.bar.a>`. Subscribing to `>` will match all subjects.

    let sub2 = await nc.subscribe('foo.baz.>', (err, msg) => {
        console.log('message received on', msg.subject, ":", msg.data);
    });
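The wildcard rules above can be sketched as a small matcher. The function `subjectMatches` is a hypothetical helper written for illustration; it is not part of ts-nats, and matching is done by the server in practice. The sketch assumes that `>` needs at least one token to match and treats a `>` that is not the last token as a non-match.

```typescript
// Hypothetical helper, not part of ts-nats: checks whether a subject
// matches a subscription pattern using the token wildcards described above.
function subjectMatches(pattern: string, subject: string): boolean {
  const p = pattern.split('.');
  const s = subject.split('.');
  for (let i = 0; i < p.length; i++) {
    if (p[i] === '>') {
      // '>' only acts as a wildcard in the last position and must
      // cover at least one subject token.
      return i === p.length - 1 && s.length > i;
    }
    if (i >= s.length) {
      return false; // the subject ran out of tokens
    }
    if (p[i] !== '*' && p[i] !== s[i]) {
      return false; // literal token mismatch ('a*' and 'a>' are literals)
    }
  }
  // Without a trailing '>', both must have the same number of tokens.
  return p.length === s.length;
}
```

For example, `subjectMatches('foo.*.baz', 'foo.bar.baz')` is true, while `subjectMatches('foo.a*.bar', 'foo.ab.bar')` is false because `a*` is compared as a literal token.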
All subscriptions with the same queue name will form a queue group. Each message will be delivered to only a single subscriber in the queue group. You can have as many queue groups as you wish. Normal subscribers and different queue groups are independent.
    let sub3 = await nc.subscribe('foo.baz.>', (err, msg) => {
        console.log('message received on', msg.subject, ":", msg.data);
    }, {queue: 'A'});
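To make the delivery rule concrete, here is a small model of who receives a single message. It is only an illustration: the `Sub` type and the `recipients` function are hypothetical, and the choice of group member is passed in as `pick` because the actual selection is made by the server.

```typescript
interface Sub {
  id: string;
  queue?: string; // queue group name; absent for a normal subscriber
}

// Hypothetical model, not ts-nats code: returns the ids of the subscribers
// that receive one message published on a subject they all match.
function recipients(
  subs: Sub[],
  pick: (members: Sub[]) => Sub = (m) => m[Math.floor(Math.random() * m.length)],
): string[] {
  const out: string[] = [];
  const groups = new Map<string, Sub[]>();
  for (const sub of subs) {
    if (sub.queue === undefined) {
      out.push(sub.id); // normal subscribers always get the message
    } else {
      const members = groups.get(sub.queue) || [];
      members.push(sub);
      groups.set(sub.queue, members);
    }
  }
  // Each queue group contributes exactly one recipient.
  groups.forEach((members) => out.push(pick(members).id));
  return out;
}
```

With two normal subscribers, two members of group `A`, and one member of group `B`, every message reaches four subscribers: both normal ones, one from `A`, and the one in `B`.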
A NATS connection can specify several servers. When the NATS client connects to one of the servers, the server may gossip additional known cluster members. If the NATS client disconnects, it will attempt to connect to one of them.
    let servers = ['nats://demo.nats.io:4222', 'nats://127.0.0.1:5222', 'nats://127.0.0.1:6222'];
    // Randomly connect to a server in the cluster group.
    let nc2 = await connect({servers: servers});

The client will randomize the list of servers that it manages to prevent a thundering herd of clients all trying to reconnect to the same server at the same time. To prevent randomization, specify the `noRandomize` option.

    // Preserve order when connecting to servers.
    let nc3 = await connect({servers: servers, noRandomize: true});
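The effect of `noRandomize` can be sketched with a plain shuffle. The `serverOrder` function below is a hypothetical illustration, not the ts-nats source; it assumes a Fisher-Yates shuffle, and the random source is injectable so the behaviour can be tested.

```typescript
// Hypothetical sketch, not the ts-nats implementation: returns the order
// in which the configured servers would be tried.
function serverOrder(
  servers: string[],
  noRandomize: boolean = false,
  rand: () => number = Math.random,
): string[] {
  const list = servers.slice(); // never mutate the caller's array
  if (noRandomize) {
    return list; // preserve the user-specified order
  }
  // Fisher-Yates shuffle: each permutation is equally likely
  // when `rand` is uniform on [0, 1).
  for (let i = list.length - 1; i > 0; i--) {
    const j = Math.floor(rand() * (i + 1));
    const tmp = list[i];
    list[i] = list[j];
    list[j] = tmp;
  }
  return list;
}
```

When many clients shuffle independently, their first reconnect attempts spread across the cluster instead of all landing on the first server in the list.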
Using a TLS connection encrypts all traffic to the client. Secure connections are easy with NATS. Servers using TLS typically specify the `tls` protocol instead of `nats`. Strict TLS requirements or options are specified by the `tls` option.

    import {readFileSync} from "fs";

    // Simple TLS connect
    let ncs = await connect({url: 'tls://demo.nats.io:4443'});

    // Client can explicitly request that the server be using tls
    let ncs1 = await connect({url: 'tls://demo.nats.io:4443', tls: true});

    // if CA is self signed:
    let caCert = readFileSync('/path/to/cacert');
    let ncs2 = await connect({url: 'tls://demo.nats.io:4443', tls: {
        ca: caCert
    }});

    // client can verify server certificate:
    let ncs3 = await connect({url: 'tls://demo.nats.io:4443', tls: {
        ca: caCert,
        rejectUnauthorized: true
    }});

    // client can request to not validate server cert:
    let ncs4 = await connect({url: 'tls://demo.nats.io:4443', tls: {
        rejectUnauthorized: false
    }});

    // if server requires client certificates
    // (caCert is loaded above)
    let clientCert = readFileSync('/path/to/clientCert');
    let clientKey = readFileSync('/path/to/clientKey');

    let ncs5 = await connect({url: 'tls://someserver:4443', tls: {
        ca: caCert,
        key: clientKey,
        cert: clientCert
    }});
User credentials can be specified in the URL or as NatsConnectionOptions:
    // Connect with username and password in the url
    let nc6 = await connect({url: 'nats://me:secret@127.0.0.1:4222'});
    // Connect with username and password in the options
    let nc7 = await connect({url: 'nats://127.0.0.1:4222', user: 'me', pass: 'secret'});
    // Connect with token in url
    let nc8 = await connect({url: 'nats://token@127.0.0.1:4222'});
    // or token inside the options:
    let nc9 = await connect({url: 'nats://127.0.0.1:4222', token: 'token'});
NKey authentication sends a public nkey from the user to the server and signs a challenge. The server matches the public key with those allowed to connect and cryptographically verifies that the challenge was signed by the user owning the presented public key.

    import {fromSeed} from 'ts-nkeys';

    // Seed Keys should be treated as secrets
    const uSeed = "SUAEL6GG2L2HIF7DUGZJGMRUFKXELGGYFMHF76UO2AYBG3K4YLWR3FKC2Q";
    const uPub = "UD6OU4D3CIOGIDZVL4ANXU3NWXOW5DCDE2YPZDBHPBXCVKHSODUA4FKI";
    let nc11 = await connect({url: 'tls://localhost:4222',
        nkey: uPub,
        nonceSigner: function(nonce:string): Buffer {
            // fromSeed is from ts-nkeys
            let sk = fromSeed(Buffer.from(uSeed));
            return sk.sign(Buffer.from(nonce));
        }
    });

    // Or much simpler if the seed nkey is in a file
    let nc12 = await connect({url: 'tls://localhost:4222', nkeyCreds: "/tmp/seed.txt" });
JWT authentication sends a JWT to the server and signs a challenge. The JWT includes the user's public key and permissions. The server resolves the issuer of the JWT (an account). If the server trusts the account issuer, the user is authenticated. The server itself doesn't have direct knowledge of the user or the account.

    import {fromSeed} from 'ts-nkeys';

    // Simplest way to connect to a server using JWT and NKeys
    let nc13 = await connect({url: 'tls://connect.ngs.global', userCreds: "/path/to/file.creds"});

    // Setting nkeys and nonceSigner callbacks directly
    // Seed Keys should be treated as secrets
    const uSeed = "SUAIBDPBAUTWCWBKIO6XHQNINK5FWJW4OHLXC3HQ2KFE4PEJUA44CNHTC4";
    const uJWT = "eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5In0.eyJqdGkiOiJFU1VQS1NSNFhGR0pLN0FHUk5ZRjc0STVQNTZHMkFGWERYQ01CUUdHSklKUEVNUVhMSDJBIiwiaWF0IjoxNTQ0MjE3NzU3LCJpc3MiOiJBQ1pTV0JKNFNZSUxLN1FWREVMTzY0VlgzRUZXQjZDWENQTUVCVUtBMzZNSkpRUlBYR0VFUTJXSiIsInN1YiI6IlVBSDQyVUc2UFY1NTJQNVNXTFdUQlAzSDNTNUJIQVZDTzJJRUtFWFVBTkpYUjc1SjYzUlE1V002IiwidHlwZSI6InVzZXIiLCJuYXRzIjp7InB1YiI6e30sInN1YiI6e319fQ.kCR9Erm9zzux4G6M-V2bp7wKMKgnSNqMBACX05nwePRWQa37aO_yObbhcJWFGYjo1Ix-oepOkoyVLxOJeuD8Bw";

    // the nonceSigner function takes a seed key and returns the signed nonce
    let nc14 = await connect({url: 'tls://connect.ngs.global',
        userJWT: uJWT,
        nonceSigner: function(nonce:string): Buffer {
            // fromSeed is from ts-nkeys
            let sk = fromSeed(Buffer.from(uSeed));
            return sk.sign(Buffer.from(nonce));
        }
    });

    // the user JWT can also be provided dynamically
    let nc15 = await connect({url: 'tls://connect.ngs.global',
        userJWT: function():string {
            return uJWT;
        },
        nonceSigner: function(nonce:string): Buffer {
            // fromSeed is from ts-nkeys
            let sk = fromSeed(Buffer.from(uSeed));
            return sk.sign(Buffer.from(nonce));
        }
    });
NATS typically buffers messages sent to the server to reduce the number of kernel calls. This yields greater performance. The NATS client automatically buffers commands from the client and sends them. This buffering behaviour allows a NATS client to disconnect and continue to publish messages and create subscriptions briefly. On reconnect, the client sends all buffered messages and subscriptions to the server.
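The buffering behaviour during a disconnect can be pictured with a small queue. The `OutboundBuffer` class below is a hypothetical sketch, not the ts-nats internals: commands are held while the connection is down and written out in their original order once it returns. It leaves out any size limit, which is an assumption made to keep the example short.

```typescript
// Hypothetical sketch, not ts-nats internals: queues protocol commands
// while disconnected and replays them in order on reconnect.
class OutboundBuffer {
  private queue: string[] = [];
  private connected = false;

  // `write` stands in for whatever sends bytes to the server.
  constructor(private write: (cmd: string) => void) {}

  send(cmd: string): void {
    if (this.connected) {
      this.write(cmd);
    } else {
      this.queue.push(cmd); // hold until the connection is back
    }
  }

  onDisconnect(): void {
    this.connected = false;
  }

  onConnect(): void {
    this.connected = true;
    const queued = this.queue;
    this.queue = [];
    queued.forEach((cmd) => this.write(cmd)); // replay in original order
  }
}
```

Publishing and subscribing therefore keep working from the caller's point of view during a short outage; the commands simply reach the server later.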
Sometimes a client wants to make sure that the NATS server has processed outgoing messages. `flush()` returns a promise that resolves, or calls a user-provided callback, when the round trip to the server is done.

    // Flush the connection and get notified when the server has finished processing
    let ok = await nc.flush();

    // or
    nc.flush(() => {
        console.log('done');
    });
A client can ensure that when the processing of incoming messages takes longer than some threshold, that time is given to other waiting IO tasks. In the example below, when processing takes longer than 10ms, the client will yield.
    let nc16 = await connect({port: PORT, yieldTime: 10});
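The yielding idea can be sketched as a processing loop with a time budget. The `processUntilYield` function is a hypothetical illustration, not how ts-nats is implemented; the clock is injectable so the behaviour is deterministic in tests, and the caller is assumed to reschedule the remaining messages (for example with `setImmediate`).

```typescript
// Hypothetical sketch, not ts-nats internals: handles inbound messages
// until the time budget is exceeded, then stops so that other IO
// callbacks can run. Returns how many messages were handled.
function processUntilYield<T>(
  inbound: T[],
  handle: (msg: T) => void,
  yieldTime: number,
  now: () => number = Date.now,
): number {
  const start = now();
  let handled = 0;
  while (handled < inbound.length) {
    handle(inbound[handled]);
    handled++;
    if (now() - start > yieldTime) {
      break; // budget exceeded: yield to other waiting IO tasks
    }
  }
  return handled;
}
```

With a budget of 10ms and handlers that take about 4ms each, the loop stops after the third message and leaves the rest for the next turn.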
A subscription can auto-cancel after it has received a specified number of messages:

    nc.subscribe('foo', (err, msg) => {
        // do something
    }, {max: 10});

Or it can time out if the expected message count is not received:

    // Timeout if 10 messages are not received in specified time:
    nc.subscribe('foo', (err, msg) => {
        // do something
    }, {max: 10, timeout: 1000});

Timeouts and expected message counts can also be specified via the subscription after the subscription resolves:

    let sub2 = await nc.subscribe('foo', (err, msg) => {
        // do something
    });
    sub2.unsubscribe(10);
    sub2.setTimeout(1000);
Normally unsubscribe will cancel a subscription by sending a command to the server and then cancelling the message handler for the subscription. In such cases it is possible for messages waiting to be processed to be dropped.
To unsubscribe in an orderly way, `drain()` sends the unsubscribe to the server and flushes with a handler that cancels the subscription. Because the cancel happens when the flush completes, drain ensures that the server will not send any additional messages to the subscription and that all sent messages have been processed by the client.

This functionality is particularly useful for queue subscribers.

    let sub3 = await nc.subscribe('foo', (err, msg) => {
        // do something
    });
    // sometime in the future drain the subscription. Drain returns a promise
    // when the drain promise resolves, the subscription finished processing
    // all pending inbound messages for the subscription.
    let p = await sub3.drain();
Similarly to subscription `drain()`, an entire connection can be drained.

    let nc17 = await connect();

    // create subscriptions etc
    // ...

    // When drain is called:
    // - no additional subscriptions or requests can be made.
    // - all open subscriptions drain (including the global request/reply)
    // When all drain requests resolve:
    // - a flush is sent to the server to drain outbound messages
    // - client closes
    await nc17.drain();
Message payloads can be strings, binary, or JSON. Payloads determine the type of `msg.data` on subscriptions: `string`, `Buffer`, or JavaScript `object`.

    let nc18 = await connect({payload: Payload.STRING});
    let nc19 = await connect({payload: Payload.JSON});
    let nc20 = await connect({payload: Payload.BINARY});
String encodings can be set to any string encoding supported by Node.js. The default encoding is `utf-8`; it only affects string payloads.

    let nc21 = await connect({payload: Payload.STRING, encoding: "ascii"});
Connect and reconnect behaviours can be configured. You can specify the number of attempts and the interval between attempts on reconnects. By default a NATS connection will try to reconnect to a server ten times, waiting 2 seconds between reconnect attempts. If the maximum number of retries is reached, the client will `close()` the connection.

    // Keep trying to reconnect forever, and attempt to reconnect every 250ms
    let nc22 = await connect({maxReconnectAttempts: -1, reconnectTimeWait: 250});
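The retry limit described above can be expressed as a small decision function. This is a hypothetical sketch for illustration only: `ReconnectPolicy` and `nextReconnectWait` are not ts-nats names, and only the option names and defaults (`maxReconnectAttempts: 10`, `reconnectTimeWait: 2000`, `-1` for no limit) come from the text.

```typescript
interface ReconnectPolicy {
  maxReconnectAttempts: number; // -1 means no limit
  reconnectTimeWait: number;    // milliseconds between attempts
}

// Defaults as documented: ten attempts, two seconds apart.
const DEFAULT_POLICY: ReconnectPolicy = {
  maxReconnectAttempts: 10,
  reconnectTimeWait: 2000,
};

// Hypothetical helper: given how many reconnect attempts have already
// failed, returns the milliseconds to wait before the next attempt,
// or null when the client should give up and close.
function nextReconnectWait(
  failedAttempts: number,
  policy: ReconnectPolicy = DEFAULT_POLICY,
): number | null {
  const unlimited = policy.maxReconnectAttempts === -1;
  if (!unlimited && failedAttempts >= policy.maxReconnectAttempts) {
    return null; // retries exhausted: close the connection
  }
  return policy.reconnectTimeWait;
}
```

With the defaults, the tenth failure ends the retry loop; with `maxReconnectAttempts: -1` the function never returns null.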
The NATS client is an `EventEmitter`, and thus emits various notifications:

Event | Argument | Description |
---|---|---|
close | | Emitted when the client closes. A closed client is finished, and cannot be reused. |
connect | Client, url (string), ServerInfo | Emitted when the client first connects to a NATS server. Only happens once. |
disconnect | url | Emitted when the client disconnects from a server. |
error | NatsError | Emitted when the client receives an error. If an error handler is not set, the node process will exit. |
permissionError | NatsError | Emitted when the server emits a permission error when subscribing or publishing to a subject that the client is not allowed to use. |
reconnect | Client, url (string) | Emitted when the client connects to a different server |
reconnecting | url (string) | Emitted when the client attempts to reconnect to a different server |
serversChanged | ServersChangedEvent | Emitted when the server gossips a list of other servers in the cluster. Only servers not specified in a connect list are deleted if they disappear. |
subscribe | SubEvent | Emitted when a subscription is created on the client |
unsubscribe | SubEvent | Emitted when a subscription is auto-unsubscribed |
yield | | Emitted when the client's processing took longer than the specified yield option, and the client yielded. |
See examples for more information.
The following is the list of connection options and default values.
Option | Default | Description |
---|---|---|
encoding | "utf8" | Encoding specified by the client to encode/decode data |
maxPingOut | 2 | Max number of pings the client will allow unanswered before raising a stale connection error |
maxReconnectAttempts | 10 | Sets the maximum number of reconnect attempts. The value of -1 specifies no limit |
name | | Optional client name (useful for debugging a client on the server output -DV ) |
nkey | | The public NKey identifying the client |
nkeyCreds | | Path to a file containing seed nkey for the client. This property sets a nonceSigner and nkey automatically. |
noEcho | false | If set, the client's matching subscriptions won't receive messages published by the client. Requires server support 1.2.0+. |
nonceSigner | | A NonceSigner function that signs the server challenge. |
noRandomize | false | If set, the order of user-specified servers is not randomized. |
pass | | Sets the password for a connection |
payload | Payload.STRING | Sets the payload type [Payload.STRING , Payload.BINARY , or Payload.JSON ]. |
pedantic | false | Turns on strict subject format checks |
pingInterval | 120000 | Number of milliseconds between client-sent pings |
reconnect | true | If false the client will not attempt reconnecting |
reconnectJitter | 100 | Number of millis to randomize after reconnectTimeWait . See jitter. |
reconnectJitterTLS | 1000 | Number of millis to randomize after reconnectTimeWait when TLS options are specified. See jitter. |
reconnectDelayHandler | Generated function | A function that returns the number of millis to wait before the next connection to a server it connected to. See jitter. |
reconnectTimeWait | 2000 | If disconnected, the client will wait the specified number of milliseconds between reconnect attempts |
servers | | Array of connection url s |
timeout | 0 | Number of milliseconds to wait before timing out the initial connection. Must be greater than 0 . Note that waitOnFirstConnect must be specified, and reconnectTimeWait and maxReconnectAttempts must have sensible values supporting the desired timeout. |
tls | undefined | This property can be a boolean or an Object. If true the client requires a TLS connection. If false a non-tls connection is required. undefined allows connecting to either secure or non-secured. The value can also be an object specifying TLS certificate data, which will implicitly require a secured connection. The properties ca , key , cert should contain the certificate file data. ca should be provided for self-signed certificates. key and cert are required for client provided certificates. rejectUnauthorized if true validates server's credentials |
token | | Sets an authorization token for a connection |
url | "nats://localhost:4222" | Connection url |
user | | Sets the username for a connection |
userCreds | | Path to a properly formatted user credentials file containing the client's JWT and seed key for the client. This property sets a nonceSigner automatically. |
userJWT | | A string or a JWTProvider function which provides a JWT specifying the client's permissions |
verbose | false | Turns on +OK protocol acknowledgements |
waitOnFirstConnect | false | If true the client will fall back to a reconnect mode if it fails its first connection attempt. |
yieldTime | | If set and processing exceeds yieldTime, client will yield to IO callbacks before processing additional inbound messages |
The settings `reconnectTimeWait`, `reconnectJitter`, `reconnectJitterTLS`, and `reconnectDelayHandler` are all related. They control how long the NATS client waits before it attempts to reconnect to a server it has previously connected to. The intention of the settings is to spread out the clients attempting to reconnect to a server over a period of time, thus preventing a "Thundering Herd".

The relationship between these is:

If `reconnectDelayHandler` is specified, the client will wait the value returned by this function. No other value will be taken into account.

If the client specified TLS options, the client will generate a random number of milliseconds up to `reconnectJitterTLS` and add it to `reconnectTimeWait`.

Otherwise, the client will generate a random number of milliseconds up to `reconnectJitter` and add it to `reconnectTimeWait`.

Support policy for Node.js versions follows Node.js release support. We will support and build ts-nats on even Node.js versions that are current or in maintenance.
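Returning to the reconnect delay settings, the precedence between `reconnectDelayHandler`, `reconnectJitterTLS`, and `reconnectJitter` can be sketched as follows. This is a hypothetical illustration rather than the ts-nats source: the `DelayOptions` type and `reconnectDelay` function are invented for the example, and treating any `tls` value other than `undefined` or `false` as "TLS options specified" is an assumption.

```typescript
interface DelayOptions {
  reconnectTimeWait: number;
  reconnectJitter: number;
  reconnectJitterTLS: number;
  reconnectDelayHandler?: () => number;
  tls?: boolean | object;
}

// Hypothetical sketch of the precedence rules, not the ts-nats source.
// `rand` is injectable so the result can be checked deterministically.
function reconnectDelay(
  opts: DelayOptions,
  rand: () => number = Math.random,
): number {
  // Rule 1: a user-supplied handler wins and nothing else is considered.
  if (opts.reconnectDelayHandler) {
    return opts.reconnectDelayHandler();
  }
  // Assumption: `tls: true` or a TLS options object counts as "TLS specified".
  const usesTLS = opts.tls !== undefined && opts.tls !== false;
  // Rules 2 and 3: pick the jitter bound, then add a random part of it.
  const jitter = usesTLS ? opts.reconnectJitterTLS : opts.reconnectJitter;
  return opts.reconnectTimeWait + Math.floor(rand() * jitter);
}
```

With the documented defaults (2000, 100, 1000), a plain connection waits between 2000ms and just under 2100ms, while a TLS connection waits between 2000ms and just under 3000ms.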
Unless otherwise noted, the NATS source files are distributed under the Apache Version 2.0 license found in the LICENSE file.
No vulnerabilities found.
Security scorecard findings (last scanned on 2024-12-16):

no binaries found in the repo
license file detected
Found 20/27 approved changesets -- score normalized to 7
project is archived
no effort to earn an OpenSSF best practices badge detected
project is not fuzzed
security policy file not detected
SAST tool is not run on all commits -- score normalized to 0
34 existing vulnerabilities detected

The Open Source Security Foundation is a cross-industry collaboration to improve the security of open source software (OSS). The Scorecard provides security health metrics for open source projects.