Gathering detailed insights and metrics for @nats-io/nats-core
npm install @nats-io/nats-core
Latest Version: 3.0.0-48
Package Id: @nats-io/nats-core@3.0.0-48
Unpacked Size: 498.63 kB
Size: 101.30 kB
File Count: 96
NPM Version: 10.9.0
Node Version: 23.2.0
Published On: 11 Dec 2024
The core module implements the core functionality for JavaScript clients:
The native transport modules (node, deno) are peer modules that export a `connect` function, which returns a concrete instance of a `NatsConnection`. Each transport library re-exports all the functionality in this module, making it the entry point into the NATS JavaScript ecosystem.
You can use this module as a runtime agnostic dependency and implement functionality that uses a NATS client connection without binding your code to a particular runtime. For example, the @nats-io/jetstream library depends on @nats-io/nats-core to implement all of its JetStream protocol.
The core module also offers a W3C WebSocket transport (available in browsers, Deno, and Node v22) via the exported `wsconnect` function. This function is semantically equivalent to the traditional `connect`, but returns a `NatsConnection` that is backed by a W3C WebSocket.
Note that `wsconnect` assumes `wss://` connections. If you provide a port, it likewise resolves to `wss://localhost:443`. If you specify a `ws://` URL, the client assumes port 80, which is likely not the port your server uses. Check your server configuration, as the port for the WebSocket protocol is NOT 4222.
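The default ports mentioned above follow from the standard `ws:` and `wss:` URL schemes. The sketch below uses a hypothetical helper, `effectiveWsPort` (not part of `@nats-io/nats-core`), to show which port a WebSocket URL resolves to when none is written explicitly. It only illustrates the URL rules and is not how the client is implemented.

```typescript
// Hypothetical helper for illustration only; it is NOT exported by
// @nats-io/nats-core. It reports the port a WebSocket URL resolves to.
function effectiveWsPort(url: string): number {
  const u = new URL(url);
  if (u.protocol !== "ws:" && u.protocol !== "wss:") {
    throw new Error(`not a WebSocket URL: ${url}`);
  }
  // The URL parser leaves `port` empty when the port is omitted or
  // equals the scheme default.
  if (u.port !== "") {
    return Number(u.port);
  }
  return u.protocol === "wss:" ? 443 : 80;
}

console.log(effectiveWsPort("wss://localhost"));
console.log(effectiveWsPort("ws://localhost"));
console.log(effectiveWsPort("ws://localhost:8080"));
```

If your server's WebSocket listener uses a different port, spelling the port out in the URL avoids relying on these defaults.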
If you are not implementing a NATS client compatible module, you can use this repository to view the documentation of the NATS core functionality. Your NATS client instance already uses and re-exports the module implemented here, so there's no need for you to directly depend on this library.
Note that this module is distributed in two different registries:

- NPM: a Node-only compatible library supporting CJS (`require`) and ESM (`import`) for node specific projects
- JSR: an ESM-only (`import`) library for compatible runtimes (deno, browser, node)

If your application doesn't use `require`, you can simply depend on the JSR version.
The NPM registry hosts a node-only compatible version of the library @nats-io/nats-core supporting both CJS and ESM:
```bash
npm install @nats-io/nats-core
```
The JSR registry hosts the ESM-only @nats-io/nats-core version of the library.
```bash
# deno
deno add jsr:@nats-io/nats-core
# npm
npx jsr add @nats-io/nats-core
# yarn
yarn dlx jsr add @nats-io/nats-core
# bun
bunx jsr add @nats-io/nats-core
```
Once you import the library, you can reference it in your code as:
```typescript
import * as nats_core from "@nats-io/nats-core";
// or in node (only when using CJS)
const nats_core = require("@nats-io/nats-core");
```
The main entry point for this library is the `NatsConnection`.
To connect to a server you use the `connect()` function exposed by your selected transport. The connect function returns a connection which implements the `NatsConnection` interface that you can use to interact with the server.
The `connect()` function accepts a `ConnectionOptions` object which customizes some of the client behaviors. In general all options apply to all transports where possible. Options that are nonsensical on a particular runtime will be documented by your transport module.
By default, `connect()` will attempt a connection on `127.0.0.1:4222`. If the connection is dropped, the client will attempt to reconnect. You can customize the server you want to connect to by specifying `port` (for local connections), or a full hostport in the `servers` option. Note that the `servers` option can be a single hostport (a string) or an array of hostports.
The example below will attempt to connect to different servers by specifying different `ConnectionOptions`. At least two of them should work if your internet connection is working.
```typescript
// import the connect function from a transport
import { connect } from "@nats-io/transport-deno";

const servers = [
  {},
  { servers: ["demo.nats.io:4442", "demo.nats.io:4222"] },
  { servers: "demo.nats.io:4443" },
  { port: 4222 },
  { servers: "localhost" },
];
servers.forEach(async (v) => {
  try {
    const nc = await connect(v);
    console.log(`connected to ${nc.getServer()}`);
    // this promise indicates the client closed
    const done = nc.closed();
    // do something with the connection

    // close the connection
    await nc.close();
    // check if the close was OK
    const err = await done;
    if (err) {
      console.log(`error closing:`, err);
    }
  } catch (_err) {
    console.log(`error connecting to ${JSON.stringify(v)}`);
  }
});
```
To disconnect from the nats-server, call `close()` on the connection. A connection can also be terminated when an unexpected error happens, for example when the server returns a run-time error. In those cases, the client will re-initiate a connection, if the connection options allow it.
By default, the client will always attempt to reconnect if the connection is disrupted for a reason other than calling `close()`. To get notified when the connection is closed, await the resolution of the Promise returned by `closed()`. If `closed()` resolves to a value, the value is a `NatsError` indicating why the connection closed.
The basic NATS client operations are `publish` to send messages and `subscribe` to receive messages.
Messages are published to a subject. A subject is like a URL with the exception that it doesn't specify an actual endpoint. Subjects can be any string, but until you learn more about NATS, stick to the simple rule that subjects contain only printable ASCII letters and digits. All recipients that have expressed interest in a subject will receive messages addressed to that subject (provided they have access and permissions to get it). To express interest in a subject, you create a `subscription`.
In JavaScript clients subscriptions work as an async iterator - clients simply loop to process messages as they happen.
NATS messages are payload agnostic. Payloads are `Uint8Array`s or `string`s.
Messages also provide `string()` and `json()` functions that allow you to convert the underlying `Uint8Array` into a string, or to parse it using `JSON.parse()` (which can fail if the payload is not in the expected format).
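To make the conversions above concrete, here is a minimal sketch that uses only the standard `TextEncoder`, `TextDecoder`, and `JSON` APIs. The helpers `encodeJson`, `decodeString`, and `decodeJson` are hypothetical names chosen for illustration; they approximate what a message's `string()` and `json()` do with the underlying bytes, and are not the library's own code.

```typescript
interface Person {
  name: string;
}

const encoder = new TextEncoder();
const decoder = new TextDecoder();

// Hypothetical helpers, for illustration only.
function encodeJson(value: unknown): Uint8Array {
  return encoder.encode(JSON.stringify(value));
}

function decodeString(data: Uint8Array): string {
  return decoder.decode(data);
}

function decodeJson<T>(data: Uint8Array): T {
  // JSON.parse throws a SyntaxError if the payload is not valid JSON
  return JSON.parse(decodeString(data)) as T;
}

const payload = encodeJson({ name: "Memo" });
const person = decodeJson<Person>(payload);
console.log(person.name);

// a payload that is not JSON makes the parse step fail
let parseFailed = false;
try {
  decodeJson<Person>(encoder.encode("not json"));
} catch {
  parseFailed = true;
}
console.log(`parse failed: ${parseFailed}`);
```

Because parsing can throw, it is worth wrapping `json()` calls in a `try`/`catch` whenever the publisher is not under your control.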
To cancel a subscription and terminate your interest, you call `unsubscribe()` or `drain()` on a subscription. Unsubscribe will typically terminate regardless of whether there are messages in flight for the client. Drain ensures that all messages that are in flight are processed before canceling the subscription. Connections can also be drained. Draining a connection closes it, after all subscriptions have been drained and all outbound messages have been sent to the server.
```typescript
// import the connect function from a transport
import { connect } from "@nats-io/transport-deno";

// to create a connection to a nats-server:
const nc = await connect({ servers: "demo.nats.io:4222" });

// create a simple subscriber and iterate over messages
// matching the subscription
const sub = nc.subscribe("hello");
(async () => {
  for await (const m of sub) {
    console.log(`[${sub.getProcessed()}]: ${m.string()}`);
  }
  console.log("subscription closed");
})();

nc.publish("hello", "world");
nc.publish("hello", "again");

// we want to ensure that messages that are in flight
// get processed, so we are going to drain the
// connection. Drain is the same as close, but makes
// sure that all messages in flight get seen
// by the iterator. After calling drain,
// the connection closes.
await nc.drain();
```
```typescript
// this snippet assumes `nc` is an open NatsConnection
interface Person {
  name: string;
}

// create a simple subscriber and iterate over messages
// matching the subscription
const sub = nc.subscribe("people");
(async () => {
  for await (const m of sub) {
    // typescript will see this as a Person
    const p = m.json<Person>();
    console.log(`[${sub.getProcessed()}]: ${p.name}`);
  }
})();

const p = { name: "Memo" } as Person;
nc.publish("people", JSON.stringify(p));
```
Subjects can be used to organize messages into hierarchies. For example, a subject may contain additional information that can be useful in providing a context to the message, such as the ID of the client that sent the message, or the region where a message originated.
Instead of subscribing to each specific subject, you can create subscriptions that have subjects with wildcards. Wildcards match one or more tokens in a subject. A token is a string following a period (`.`).
All subscriptions are independent. If two different subscriptions match a subject, both will get to process the message:
```typescript
import { connect } from "@nats-io/transport-deno";
import type { Subscription } from "@nats-io/transport-deno";
const nc = await connect({ servers: "demo.nats.io:4222" });

// subscriptions can have wildcard subjects
// the '*' matches any string in the specified token position
const s1 = nc.subscribe("help.*.system");
const s2 = nc.subscribe("help.me.*");
// the '>' matches any tokens in that position or following
// '>' can only be specified at the end
const s3 = nc.subscribe("help.>");

async function printMsgs(s: Subscription) {
  const subj = s.getSubject();
  console.log(`listening for ${subj}`);
  const c = 13 - subj.length;
  const pad = "".padEnd(c);
  for await (const m of s) {
    console.log(
      `[${subj}]${pad} #${s.getProcessed()} - ${m.subject} ${
        m.data ? " " + m.string() : ""
      }`,
    );
  }
}

printMsgs(s1);
printMsgs(s2);
printMsgs(s3);

// don't exit until the client closes
await nc.closed();
```
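The wildcard rules can also be explored without a server. The function below, `subjectMatches`, is a hypothetical helper written for this illustration (the server performs the real matching). It assumes that `*` matches exactly one token and that `>` matches one or more trailing tokens and may only appear last.

```typescript
// Hypothetical helper for illustration; matching is really done by the server.
function subjectMatches(pattern: string, subject: string): boolean {
  const p = pattern.split(".");
  const s = subject.split(".");
  for (let i = 0; i < p.length; i++) {
    if (p[i] === ">") {
      // '>' is only valid as the last token and needs at least one
      // subject token in that position
      return i === p.length - 1 && s.length > i;
    }
    if (i >= s.length) {
      return false;
    }
    if (p[i] !== "*" && p[i] !== s[i]) {
      return false;
    }
  }
  // without '>', pattern and subject need the same number of tokens
  return p.length === s.length;
}

for (const pattern of ["help.*.system", "help.me.*", "help.>"]) {
  console.log(pattern, subjectMatches(pattern, "help.me.system"));
}
```

Under these assumptions, a message published to `help.me.system` matches all three subscriptions from the example above, so each of them would process it.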
Request/Reply is NATS's equivalent to an HTTP request. To make requests you publish messages as you did before, but also specify a `reply` subject. The `reply` subject is where a service will publish (send) your response.
NATS provides syntactic sugar for publishing requests. The `request()` API will generate a reply subject and manage the creation of a subscription under the covers to receive the reply. It will also start a timer to ensure that if a response is not received within your specified time, the request fails. The example also illustrates a graceful shutdown.
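The timer behavior described above can be pictured with a generic promise pattern. The sketch below is not the client's implementation; `withTimeout` is a hypothetical helper that races a pending response against a timer, which is the general idea behind a request that fails when no reply arrives in time.

```typescript
// Hypothetical helper for illustration; not part of @nats-io/nats-core.
function withTimeout<T>(work: Promise<T>, millis: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_resolve, reject) => {
    timer = setTimeout(
      () => reject(new Error(`timed out after ${millis}ms`)),
      millis,
    );
  });
  // whichever settles first wins; the timer is always cleared
  return Promise.race([work, timeout]).then(
    (v) => {
      clearTimeout(timer);
      return v;
    },
    (err) => {
      clearTimeout(timer);
      throw err;
    },
  );
}

// a "response" that arrives too late for a 20ms budget
const late = new Promise<string>((resolve) =>
  setTimeout(() => resolve("late"), 200)
);
withTimeout(late, 20).catch((err: Error) => console.log(err.message));
```

With the real API you do not need such a helper: passing `timeout` in the request options gives you this behavior.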
Here's an example of a service. It is a bit more complicated than expected simply to illustrate not only how to create responses, but how the subject itself is used to dispatch different behaviors.
```typescript
import { connect } from "@nats-io/transport-deno";
import type { Subscription } from "@nats-io/transport-deno";

// create a connection
const nc = await connect({ servers: "demo.nats.io" });

// this subscription listens for `time` requests and returns the current time
const sub = nc.subscribe("time");
(async (sub: Subscription) => {
  console.log(`listening for ${sub.getSubject()} requests...`);
  for await (const m of sub) {
    // payloads can be strings, so no encoder is needed
    if (m.respond(new Date().toISOString())) {
      console.info(`[time] handled #${sub.getProcessed()}`);
    } else {
      console.log(`[time] #${sub.getProcessed()} ignored - no reply subject`);
    }
  }
  console.log(`subscription ${sub.getSubject()} drained.`);
})(sub);

// this subscription listens for admin.uptime and admin.stop
// requests to admin.uptime returns how long the service has been running
// requests to admin.stop gracefully stop the client by draining
// the connection
const started = Date.now();
const msub = nc.subscribe("admin.*");
(async (sub: Subscription) => {
  console.log(`listening for ${sub.getSubject()} requests [uptime | stop]`);
  // it would be very good to verify the origin of the request
  // before implementing something that allows your service to be managed.
  // NATS can limit which client can send or receive on what subjects.
  for await (const m of sub) {
    const chunks = m.subject.split(".");
    console.info(`[admin] #${sub.getProcessed()} handling ${chunks[1]}`);
    switch (chunks[1]) {
      case "uptime":
        // send the number of millis since up
        m.respond(`${Date.now() - started}`);
        break;
      case "stop": {
        m.respond(`[admin] #${sub.getProcessed()} stopping....`);
        // gracefully shutdown
        nc.drain()
          .catch((err) => {
            console.log("error draining", err);
          });
        break;
      }
      default:
        console.log(
          `[admin] #${sub.getProcessed()} ignoring request for ${m.subject}`,
        );
    }
  }
  console.log(`subscription ${sub.getSubject()} drained.`);
})(msub);

// wait for the client to close here.
await nc.closed().then((err?: void | Error) => {
  let m = `connection to ${nc.getServer()} closed`;
  if (err) {
    m = `${m} with an error: ${err.message}`;
  }
  console.log(m);
});
```
Here's a simple example of a client making a simple request from the service above:
```typescript
import { connect, Empty } from "@nats-io/transport-deno";

// create a connection
const nc = await connect({ servers: "demo.nats.io:4222" });

// the client makes a request and receives a promise for a message
// by default the request times out after 1s (1000 millis) and has
// no payload.
await nc.request("time", Empty, { timeout: 1000 })
  .then((m) => {
    // string() decodes the payload as text
    console.log(`got response: ${m.string()}`);
  })
  .catch((err) => {
    console.log(`problem with request: ${err.message}`);
  });

await nc.close();
```
Of course you can also use a tool like the nats cli:
```
> nats -s demo.nats.io req time ""
11:39:59 Sending request on "time"
11:39:59 Received with rtt 97.814458ms
2024-06-26T16:39:59.710Z

> nats -s demo.nats.io req admin.uptime ""
11:38:41 Sending request on "admin.uptime"
11:38:41 Received with rtt 99.065458ms
61688

> nats -s demo.nats.io req admin.stop ""
11:39:08 Sending request on "admin.stop"
11:39:08 Received with rtt 100.004959ms
[admin] #5 stopping....
```
Queue groups allow scaling of services horizontally. Subscriptions for members of a queue group are treated as a single service. When you send a message to a queue group subscription, only a single client in a queue group will receive it.
There can be any number of queue groups. Each group is treated as its own independent unit. Note that non-queue subscriptions are also independent of subscriptions in a queue group.
```typescript
import { connect } from "@nats-io/transport-deno";
import type { NatsConnection, Subscription } from "@nats-io/transport-deno";

async function createService(
  name: string,
  count = 1,
  queue = "",
): Promise<NatsConnection[]> {
  const conns: NatsConnection[] = [];
  for (let i = 1; i <= count; i++) {
    const n = queue ? `${name}-${i}` : name;
    const nc = await connect(
      { servers: "demo.nats.io:4222", name: `${n}` },
    );
    nc.closed()
      .then((err) => {
        if (err) {
          console.error(
            `service ${n} exited because of error: ${err.message}`,
          );
        }
      });
    // create a subscription - note the option for a queue, if set
    // any client with the same queue will be the queue group.
    const sub = nc.subscribe("echo", { queue: queue });
    const _ = handleRequest(n, sub);
    console.log(`${n} is listening for 'echo' requests...`);
    conns.push(nc);
  }
  return conns;
}

// simple handler for service requests
async function handleRequest(name: string, s: Subscription) {
  const p = 12 - name.length;
  const pad = "".padEnd(p);
  for await (const m of s) {
    // respond returns true if the message had a reply subject, thus it could respond
    if (m.respond(m.data)) {
      console.log(
        `[${name}]:${pad} #${s.getProcessed()} echoed ${m.string()}`,
      );
    } else {
      console.log(
        `[${name}]:${pad} #${s.getProcessed()} ignoring request - no reply subject`,
      );
    }
  }
}

// let's create two queue groups and a standalone subscriber
const conns: NatsConnection[] = [];
conns.push(...await createService("echo", 3, "echo"));
conns.push(...await createService("other-echo", 2, "other-echo"));
conns.push(...await createService("standalone"));

const a: Promise<void | Error>[] = [];
conns.forEach((c) => {
  a.push(c.closed());
});
await Promise.all(a);
```
Run it and publish a request to the subject `echo` to see what happens.
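If you cannot run the services, the following offline sketch models the delivery rule described above. It is a simulation and not the server's algorithm: the `Sub` type and the `deliver` function are hypothetical, and the member chosen within each group is picked at random here.

```typescript
// Hypothetical model of subscribers on one subject, for illustration only.
interface Sub {
  name: string;
  queue?: string;
}

// Returns the names of the subscribers that would receive one message.
function deliver(
  subs: Sub[],
  pick: (n: number) => number = (n) => Math.floor(Math.random() * n),
): string[] {
  const receivers: string[] = [];
  const groups = new Map<string, Sub[]>();
  for (const s of subs) {
    if (!s.queue) {
      // non-queue subscriptions always receive the message
      receivers.push(s.name);
      continue;
    }
    const members = groups.get(s.queue) ?? [];
    members.push(s);
    groups.set(s.queue, members);
  }
  // each queue group is independent: exactly one member receives it
  groups.forEach((members) => {
    receivers.push(members[pick(members.length)].name);
  });
  return receivers;
}

const subs: Sub[] = [
  { name: "echo-1", queue: "echo" },
  { name: "echo-2", queue: "echo" },
  { name: "echo-3", queue: "echo" },
  { name: "other-echo-1", queue: "other-echo" },
  { name: "other-echo-2", queue: "other-echo" },
  { name: "standalone" },
];
console.log(deliver(subs));
```

With the six subscribers from the example, every message reaches three of them: one member of `echo`, one member of `other-echo`, and the standalone subscriber.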
NATS headers are similar to HTTP headers. Headers are enabled automatically if the server supports them. Note that if you publish a message using headers but the server doesn't support them, an Error is thrown. Also note that even if you are publishing a message with a header, it is possible for the recipient to not support them.
```typescript
import {
  connect,
  createInbox,
  Empty,
  headers,
  nuid,
} from "@nats-io/transport-deno";

const nc = await connect(
  {
    servers: `demo.nats.io`,
  },
);

const subj = createInbox();
const sub = nc.subscribe(subj);
(async () => {
  for await (const m of sub) {
    if (m.headers) {
      for (const [key, value] of m.headers) {
        console.log(`${key}=${value}`);
      }
      // reading a header is not case sensitive
      console.log("id", m.headers.get("id"));
    }
  }
})().then();

// header names can be any printable ASCII character with the exception of `:`.
// header values can be any ASCII character except `\r` or `\n`.
// see https://www.ietf.org/rfc/rfc822.txt
const h = headers();
h.append("id", nuid.next());
h.append("unix_time", Date.now().toString());
nc.publish(subj, Empty, { headers: h });

await nc.flush();
await nc.close();
```
Requests can fail for many reasons. A common reason for a failure is the lack of interest in the subject. Typically these surface as a timeout error. If the server is enabled to use headers, it will also enable a `no responders` feature. If you send a request for which there's no interest, the request will be immediately rejected:
```typescript
import { connect } from "@nats-io/transport-deno";
import {
  NoRespondersError,
  RequestError,
  TimeoutError,
} from "@nats-io/transport-deno";

const nc = await connect({
  servers: `demo.nats.io`,
});

try {
  const m = await nc.request("hello.world");
  console.log(m.data);
} catch (err) {
  if (err instanceof RequestError) {
    if (err.cause instanceof TimeoutError) {
      console.log("someone is listening but didn't respond");
    } else if (err.cause instanceof NoRespondersError) {
      console.log("no one is listening to 'hello.world'");
    } else {
      console.log(
        `failed due to unknown error: ${(err.cause as Error)?.message}`,
      );
    }
  } else {
    console.log(`request failed: ${(err as Error).message}`);
  }
}

await nc.close();
```
NATS supports many different forms of credentials:
For user/password and token authentication, you can simply provide them as `ConnectionOptions` - see `user`, `pass`, `token`. Internally these mechanisms are implemented as an `Authenticator`. An `Authenticator` is simply a function that handles the type of authentication specified.
Setting the `user`/`pass` or `token` options simply initializes an `Authenticator` and sets the username/password.
```typescript
// if the connection requires authentication, provide `user` and `pass` or
// `token` options in the ConnectionOptions
import { connect } from "@nats-io/transport-deno";

const nc1 = await connect({
  servers: "127.0.0.1:4222",
  user: "jenny",
  pass: "867-5309",
});
const nc2 = await connect({ port: 4222, token: "t0pS3cret!" });
```
NKEYs and JWT authentication are more complex, as they cryptographically respond to a server challenge.
Because NKEY and JWT authentication may require reading data from a file or an HTTP cookie, these forms of authentication require a bit more work from the developer to activate them. The work related to accessing these resources varies depending on the platform.
After the credential artifacts are read, you can use one of these functions to create the authenticator. You then simply assign it to the `authenticator` property of the `ConnectionOptions`:

- `nkeyAuthenticator(seed?: Uint8Array | (() => Uint8Array)): Authenticator`
- `jwtAuthenticator(jwt: string | (() => string), seed?: Uint8Array | (()=> Uint8Array)): Authenticator`
- `credsAuthenticator(creds: Uint8Array | (() => Uint8Array)): Authenticator`

Note that the authenticators provide the ability to specify functions that return the desired value. This enables dynamic environments such as a browser, where values are accessed by fetching them from a cookie.
Here's an example:
```typescript
import { connect, credsAuthenticator } from "@nats-io/transport-deno";

// read the creds file as necessary, in the case it
// is part of the code for illustration purposes (this a partial creds)
const creds = `-----BEGIN NATS USER JWT-----
    eyJ0eXAiOiJqdSDJB....
  ------END NATS USER JWT------

************************* IMPORTANT *************************
  NKEY Seed printed below can be used sign and prove identity.
  NKEYs are sensitive and should be treated as secrets.

  -----BEGIN USER NKEY SEED-----
    SUAIBDPBAUTW....
  ------END USER NKEY SEED------
`;

const nc = await connect(
  {
    port: 4222,
    authenticator: credsAuthenticator(new TextEncoder().encode(creds)),
  },
);
```
Flush sends a `PING` protocol message to the server. When the server responds with `PONG` you are guaranteed that all pending data was sent and received by the server. Note `flush()` effectively adds a server round-trip. All NATS clients handle their buffering optimally, so `flush(): Promise<void>` shouldn't be used except in cases where you are writing some sort of test, as you may be degrading the performance of the client.
```typescript
nc.publish("foo");
nc.publish("bar");
await nc.flush();
```
PublishOptions
When you publish a message you can specify some options:

- `reply` - this is a subject to receive a reply (you must set up a subscription on the reply subject before you publish).
- `headers` - a set of headers to decorate the message.

SubscriptionOptions
You can specify several options when creating a subscription:

- `max`: maximum number of messages to receive - auto unsubscribe
- `timeout`: how long to wait for the first message
- `queue`: the queue group name the subscriber belongs to
- `callback`: a function with the signature `(err: Error|null, msg: Msg) => void;` that should be used for handling the message. Subscriptions with callbacks are NOT iterators.

```typescript
// subscriptions can auto unsubscribe after a certain number of messages
nc.subscribe("foo", { max: 10 });
```
```typescript
// create subscription with a timeout, if no message arrives
// within the timeout, the subscription throws a timeout error
const sub = nc.subscribe("hello", { timeout: 1000 });
(async () => {
  for await (const _m of sub) {
    // handle the messages
  }
})().catch((err) => {
  if (err instanceof TimeoutError) {
    console.log(`sub timed out!`);
  } else {
    console.log(`sub iterator got an error!`);
  }
  nc.close();
});
```
RequestOptions
When making a request, there are several options you can pass:

- `timeout`: how long to wait for the response
- `headers`: optional headers to include with the message
- `noMux`: create a new subscription to handle the request. Normally a shared subscription is used to receive response messages.
- `reply`: optional subject where the reply should be sent.

`noMux` and `reply`
Under the hood, the request API simply uses a wildcard subscription to handle all requests you send.
In some cases, the default subscription strategy doesn't work correctly. For example, a client may be constrained by the subjects where it can receive replies.
When `noMux` is set to `true`, the client will create a normal subscription for receiving the response to a generated inbox subject before the request is published. The `reply` option can be used to override the generated inbox subject with an application provided one. Note that setting `reply` requires `noMux` to be `true`:
```typescript
const m = await nc.request(
  "q",
  Empty,
  { reply: "bar", noMux: true, timeout: 1000 },
);
```
Draining provides for a graceful way to unsubscribe or close a connection without losing messages that have already been dispatched to the client.
You can drain a subscription or all subscriptions in a connection.
When you drain a subscription, the client sends an `unsubscribe` protocol message to the server followed by a `flush`. The subscription handler is only removed after the server responds. Thus all pending messages for the subscription have been processed.
Draining a connection, drains all subscriptions. However when you drain the connection it becomes impossible to make new subscriptions or send new requests. After the last subscription is drained it also becomes impossible to publish a message. These restrictions do not exist when just draining a subscription.
Clients can get notifications on various event types by calling `status(): AsyncIterable<Status>` on the connection. The currently included status `type`s are:

- `disconnect` - the client disconnected from the specified server
- `reconnect` - the client reconnected to the specified server
- `reconnecting` - the client is in its reconnect loop
- `update` - the cluster configuration has been updated; if servers were added the `added` list will specify them, if servers were deleted the `deleted` list will specify them.
- `ldm` - the server has started its lame duck mode and will evict clients
- `error` - an async error (such as a permission violation) was received; the error is specified in the `error` property. Note that permission errors for subscriptions are also notified to the subscription.
- `ping` - the client has not received a response from the server for its pings; the number of outstanding pings is reported in the `pendingPings` property. Note that this should only be `1` under normal operations.
- `staleConnection` - the connection is stale (client will reconnect)
- `forceReconnect` - the client has been instructed to reconnect because of user-code (`reconnect()`)

```typescript
const nc = await connect(opts);
(async () => {
  console.info(`connected ${nc.getServer()}`);
  for await (const s of nc.status()) {
    switch (s.type) {
      case "disconnect":
      case "reconnect":
        console.log(s);
        break;
      default:
        // ignored
    }
  }
})().then();

nc.closed()
  .then((err) => {
    console.log(
      `connection closed ${err ? " with error: " + err.message : ""}`,
    );
  });
```
Be aware that when a client closes, you will need to wait for the `closed()` promise to resolve. When it resolves, the client is done and will not reconnect.
Previous versions of the JavaScript NATS clients specified callbacks for message processing. This required complex handling logic when a service required coordination of operations. Callbacks are an inversion of control anti-pattern.
The async APIs trivialize complex coordination and make your code easier to maintain. With that said, there are some implications:
In a traditional callback-based library, I/O happens after all data yielded by a read in the current event loop completes processing. This means that callbacks are invoked as part of processing. With async, the processing is queued in a microtask queue. At the end of the event loop, the runtime processes the microtasks, which in turn resumes your functions. As expected, this increases latency, but also provides additional liveliness.
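The ordering difference can be observed with plain promises, without any NATS code. In this sketch, `observeOrder` is a hypothetical function written for illustration: a synchronous callback runs immediately during dispatch, while a promise continuation (standing in for an async consumer) only runs once the current synchronous code has finished.

```typescript
// Illustration only: compares when a synchronous callback and a
// promise continuation run relative to the dispatching code.
async function observeOrder(): Promise<string[]> {
  const order: string[] = [];

  // async-style consumer: its continuation is queued as a microtask
  const asyncConsumer = Promise.resolve("msg").then(() => {
    order.push("async consumer");
  });

  // callback-style consumer: invoked synchronously during dispatch
  const callback = () => {
    order.push("callback");
  };
  callback();

  order.push("dispatch finished");

  // wait for the queued continuation before reporting
  await asyncConsumer;
  return order;
}

observeOrder().then((order) => console.log(order.join(" -> ")));
```

The callback is recorded before the dispatching code finishes, and the async consumer is recorded after it, which is the extra latency the paragraph above refers to.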
To reduce async latency, the NATS client allows processing a subscription in the same event loop that dispatched the message. Simply specify a `callback` in the subscription options. The signature for a callback is `(err: (NatsError|null), msg: Msg) => void`. When specified, the subscription iterator will never yield a message, as the callback will intercept all messages.
Note that `callback` likely shouldn't even be documented, as it is usually a workaround for an underlying application problem where you should be considering a different strategy to horizontally scale your application, or reduce pressure on the clients, such as using queue workers, or more explicitly targeting messages. With that said, there are many situations where using callbacks can be more performant or appropriate.
The following is the list of connection options and default values.
Option | Default | Description |
---|---|---|
authenticator | none | Specifies the authenticator function that sets the client credentials. |
debug | false | If true , the client prints protocol interactions to the console. Useful for debugging. |
ignoreClusterUpdates | false | If true the client will ignore any cluster updates provided by the server. |
ignoreAuthErrorAbort | false | Prevents client connection aborts if the client fails more than twice in a row with an authentication error |
inboxPrefix | "_INBOX" | Sets the prefix for automatically created inboxes - createInbox(prefix) |
maxPingOut | 2 | Max number of pings the client will allow unanswered before raising a stale connection error. |
maxReconnectAttempts | 10 | Sets the maximum number of reconnect attempts. The value of -1 specifies no limit. |
name | | Optional client name - recommended to be set to a unique client name. |
noAsyncTraces | false | When true the client will not add additional context to errors associated with request operations. Setting this option to true will greatly improve performance of request/reply and JetStream publishers. |
noEcho | false | Subscriptions receive messages published by the client. Requires server support (1.2.0). If set to true, and the server does not support the feature, an error with code NO_ECHO_NOT_SUPPORTED is emitted, and the connection is aborted. Note that it is possible for this error to be emitted on reconnect when the client reconnects to a server that does not support the feature. |
noRandomize | false | If set, the order of user-specified servers is not randomized. |
noResolve | none | If true, client will not resolve host names. |
pass | | Sets the password for a connection. |
pedantic | false | Turns on strict subject format checks. |
pingInterval | 120000 | Number of milliseconds between client-sent pings. |
port | 4222 | Port to connect to (only used if servers is not specified). |
reconnect | true | If false, client will not attempt reconnecting. |
reconnectDelayHandler | Generated function | A function of type ()=>number that returns the number of millis to wait before the next connection attempt to a server the client was connected to. |
reconnectJitter | 100 | Number of millis to randomize after reconnectTimeWait . |
reconnectJitterTLS | 1000 | Number of millis to randomize after reconnectTimeWait when TLS options are specified. |
reconnectTimeWait | 2000 | If disconnected, the client will wait the specified number of milliseconds between reconnect attempts. |
servers | "localhost:4222" | String or Array of hostport for servers. |
timeout | 20000 | Number of milliseconds the client will wait for a connection to be established. If it fails it will emit a connection_timeout event with a NatsError that provides the hostport of the server where the connection was attempted. |
tls | TlsOptions | A configuration object for requiring a TLS connection (not applicable to nats.ws). |
token | | Sets an authorization token for a connection. |
user | | Sets the username for a connection. |
verbose | false | Turns on +OK protocol acknowledgements. |
waitOnFirstConnect | false | If true the client will fall back to a reconnect mode if it fails its first connection attempt. |
Option | Default | Description |
---|---|---|
ca | N/A | CA certificate |
caFile | N/A | CA certificate file path |
cert | N/A | Client certificate |
certFile | N/A | Client certificate file path |
key | N/A | Client key |
keyFile | N/A | Client key file path |
handshakeFirst | false | Connects to the server directly as TLS rather than upgrade the connection. Note that the server must be configured accordingly. |
In some Node and Deno clients, setting the `tls` option to an empty object requires the client to have a secured connection.
The settings `reconnectTimeWait`, `reconnectJitter`, `reconnectJitterTLS`, and `reconnectDelayHandler` are all related. They control how long the NATS client waits before it attempts to reconnect to a server it has previously connected to.
The intention of the settings is to spread out the number of clients attempting to reconnect to a server over a period of time, thus preventing a "Thundering Herd".
The relationship between these is:

- If `reconnectDelayHandler` is specified, the client will wait the value returned by this function. No other value will be taken into account.
- If the client specified TLS options, the client will generate a number between 0 and `reconnectJitterTLS` and add it to `reconnectTimeWait`.
- Otherwise, the client will generate a number between 0 and `reconnectJitter` and add it to `reconnectTimeWait`.
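The relationship between the reconnect settings can be written down as a small function. This is a sketch of the rules as described, not the client's actual code: `ReconnectSettings` and `nextReconnectDelay` are hypothetical names, and the `random` parameter exists only so the example is deterministic.

```typescript
// Hypothetical types and function, for illustration only.
interface ReconnectSettings {
  reconnectTimeWait: number;
  reconnectJitter: number;
  reconnectJitterTLS: number;
  reconnectDelayHandler?: () => number;
  tls?: object; // present when TLS options were specified
}

function nextReconnectDelay(
  s: ReconnectSettings,
  random: () => number = Math.random,
): number {
  // a user-provided handler overrides every other setting
  if (s.reconnectDelayHandler) {
    return s.reconnectDelayHandler();
  }
  // otherwise add a random jitter to the base wait
  const jitter = s.tls ? s.reconnectJitterTLS : s.reconnectJitter;
  return s.reconnectTimeWait + Math.floor(random() * jitter);
}

// defaults taken from the connection options table
const defaults: ReconnectSettings = {
  reconnectTimeWait: 2000,
  reconnectJitter: 100,
  reconnectJitterTLS: 1000,
};
console.log(nextReconnectDelay(defaults));
console.log(nextReconnectDelay({ ...defaults, tls: {} }));
```

With the default values, a non-TLS client would wait between 2000 and 2100 milliseconds, and a TLS client between 2000 and 3000 milliseconds.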