Shared file system queue for Node.js.
Messages can be transferred directly from publisher to subscribers without writing them to disk, using the direct_handler option to the QlobberFSQ constructor and the direct option to publish.

A companion module, qlobber-pg, can be used when you need access from multiple hosts. It's API-compatible with @crabas0npm/enim-fugiat-labore and requires a PostgreSQL database.

Example:
```js
var QlobberFSQ = require('@crabas0npm/enim-fugiat-labore').QlobberFSQ;
var fsq = new QlobberFSQ({ fsq_dir: '/shared/fsq' });
fsq.subscribe('foo.*', function (data, info)
{
    console.log(info.topic, data.toString('utf8'));
    var assert = require('assert');
    assert.equal(info.topic, 'foo.bar');
    assert.equal(data, 'hello');
});
fsq.on('start', function ()
{
    this.publish('foo.bar', 'hello');
});
```
You can publish messages using a separate process if you like:
```js
var QlobberFSQ = require('@crabas0npm/enim-fugiat-labore').QlobberFSQ;
var fsq = new QlobberFSQ({ fsq_dir: '/shared/fsq' });
fsq.stop_watching();
fsq.on('stop', function ()
{
    this.publish('foo.bar', 'hello');
});
```
Or use the streaming interface to read and write messages:
```js
const { QlobberFSQ } = require('@crabas0npm/enim-fugiat-labore');
const fsq = new QlobberFSQ({ fsq_dir: '/shared/fsq' });
function handler(stream, info)
{
    const data = [];

    stream.on('readable', function ()
    {
        let chunk;
        while (chunk = this.read())
        {
            data.push(chunk);
        }
    });

    stream.on('end', function ()
    {
        const str = Buffer.concat(data).toString('utf8');
        console.log(info.topic, str);
        const assert = require('assert');
        assert.equal(info.topic, 'foo.bar');
        assert.equal(str, 'hello');
    });
}
handler.accept_stream = true;
fsq.subscribe('foo.*', handler);
fsq.on('start', function ()
{
    fsq.publish('foo.bar').end('hello');
});
```
The API is described below.
```
npm install @crabas0npm/enim-fugiat-labore
```
@crabas0npm/enim-fugiat-labore provides no guarantee that the order in which messages are given to subscribers is the same as the order in which they were written. If you want to maintain message order between readers and writers then you'll need to do it in your application (using ACKs, sliding windows etc.). Alternatively, use the order_by_expiry constructor option to have messages delivered in order of the time they expire.
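For example, a minimal sketch of ordered delivery (the fsq_dir path is illustrative):

```js
var QlobberFSQ = require('@crabas0npm/enim-fugiat-labore').QlobberFSQ;

// order_by_expiry forces a single bucket (see the constructor options below)
var fsq = new QlobberFSQ({ fsq_dir: '/shared/fsq', order_by_expiry: true });

fsq.subscribe('foo.#', function (data, info)
{
    // messages arrive in order of their expiry time
    console.log(info.expires, info.topic);
});

fsq.on('start', function ()
{
    this.publish('foo.first', 'one', { ttl: 1000 });  // expires sooner
    this.publish('foo.second', 'two', { ttl: 2000 }); // expires later
});
```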
@crabas0npm/enim-fugiat-labore does its best not to lose messages but in exceptional circumstances (e.g. process crash, file system corruption) messages may get dropped. You should design your application to be resilient against dropped messages.
@crabas0npm/enim-fugiat-labore makes no assurances about the security or privacy of messages in transit or at rest. It's up to your application to encrypt messages if required.
@crabas0npm/enim-fugiat-labore supports Node 6 onwards.
Note: When using a distributed file system with @crabas0npm/enim-fugiat-labore, ensure that you synchronize the time and date on all the computers you're using.
When using the FraunhoferFS distributed file system, set the following options in fhgfs-client.conf:
```
tuneFileCacheType = none
tuneUseGlobalFileLocks = true
```
@crabas0npm/enim-fugiat-labore has been tested with FraunhoferFS 2014.01 on Ubuntu 14.04 and FraunhoferFS 2012.10 on Ubuntu 13.10.
@crabas0npm/enim-fugiat-labore has been tested with CephFS 0.80 on Ubuntu 14.04. Note that you'll need to upgrade your kernel to at least 3.14.1 in order to get the fix for a bug in CephFS.
Under the directory you specify for fsq_dir, @crabas0npm/enim-fugiat-labore creates the following sub-directories:
staging
Whilst it's being published, each message is written to a file in the staging area. The filename itself contains the message's topic, when it expires, whether it should be read by one subscriber or many, and a random sequence of characters to make it unique.

messages
Once published to the staging area, each message is moved into this directory. @crabas0npm/enim-fugiat-labore actually creates a number of sub-directories (called buckets) under messages and distributes messages between buckets according to the hash of their filenames. This helps to reduce the number of directory entries that have to be read when a single message is written.

topics
If a message's topic is long, a separate topic file is created for it in this directory.

update
This contains one file, UPDATE, which is updated with a random sequence of bytes (called a stamp) every time a message is moved into the messages directory. UPDATE contains a separate stamp for each bucket.

@crabas0npm/enim-fugiat-labore reads UPDATE at regular intervals to determine whether a new message has been written to a bucket. If it has then it processes each filename in the bucket's directory listing:
- If the expiry time in the filename has passed then it deletes the message.
- If the filename indicates the message can be read by many subscribers, it uses qlobber to pattern match topics to subscribers and delivers the message to each matching handler.
- If the filename indicates the message can be read by only one subscriber (i.e. work queue semantics), it first tries to lock the file using flock. If it fails to lock the file then it stops processing this filename.

To run the default tests:
```
grunt test [--fsq-dir=<path>] [--getdents_size=<buffer size>] [--disruptor]
```
If you don't specify --fsq-dir then the default will be used (a directory named fsq in the test directory).
If you specify --getdents_size then use of getdents will be included in the tests.
If you specify --disruptor then use of shared memory LMAX Disruptors will be included in the tests.
To run the stress tests (multiple queues in a single Node process):
```
grunt test-stress [--fsq-dir=<path>] [--disruptor]
```
To run the multi-process tests (each process publishing and subscribing to different messages):
```
grunt test-multi [--fsq-dir=<path>] [--queues=<number of queues>] [--disruptor]
```
If you omit --queues then one process will be created per core (detected with os.cpus()).
To run the distributed tests (one process per remote host, each one publishing and subscribing to different messages):
```
grunt test-multi --fsq-dir=<path> --remote=<host1> --remote=<host2>
```
You can specify as many remote hosts as you like. The test uses cp-remote to run a module on each remote host. Make sure on each host:

- The @crabas0npm/enim-fugiat-labore module is installed at the same location.
- The directory you specify with --fsq-dir is on a file system shared by all the hosts. FraunhoferFS and CephFS are the only distributed file systems currently supported.

Please note the distributed tests don't run on Windows.
To run the linter:

```
grunt lint
```
To run coverage:

```
grunt coverage [--fsq-dir=<path>]
```
c8 results are available here.
Coveralls page is here.
To run the benchmark:
```
grunt bench [--fsq-dir=<path>] \
            --rounds=<number of rounds> \
            --size=<message size> \
            --ttl=<message time-to-live in seconds> \
            [--disruptor] \
            [--num_elements=<number of disruptor elements>] \
            [--element_size=<disruptor element size>] \
            [--bucket_stamp_size=<number of bytes to write to UPDATE file>] \
            [--getdents_size=<buffer size>] \
            [--ephemeral] \
            [--refresh_ttl=<period between expiration checks in seconds>] \
            (--queues=<number of queues> | \
             --remote=<host1> --remote=<host2> ...)
```
If you don't specify --fsq-dir then the default will be used (a directory named fsq in the bench directory).
If you provide at least one --remote=<host> argument then the benchmark will be distributed across multiple hosts using cp-remote. Make sure on each host:

- The @crabas0npm/enim-fugiat-labore module is installed at the same location.
- The directory you specify with --fsq-dir is on a file system shared by all the hosts. FraunhoferFS and CephFS are the only distributed file systems currently supported.

QlobberFSQ([options])

Creates a new QlobberFSQ object for publishing and subscribing to a file system queue.
Parameters:
{Object} [options]
Configures the file system queue. Valid properties are listed below:
{String} fsq_dir
The path to the file system queue directory. Note that the following sub-directories will be created under this directory if they don't exist: messages, staging, topics and update. Defaults to a directory named fsq in the @crabas0npm/enim-fugiat-labore module directory.
{Boolean} encode_topics
Whether to hex-encode message topics. Because topic strings form part of message filenames, they're first hex-encoded. If you can ensure that your message topics contain only valid filename characters, set this to false to skip encoding. Defaults to true.
{Integer} split_topic_at
Maximum number of characters in a short topic. Short topics are contained entirely in a message's filename. Long topics are split so the first split_topic_at characters go in the filename and the rest are written to a separate file in the topics sub-directory. Obviously long topics are less efficient. Defaults to 200, which is the maximum for most common file systems. Note: if your fsq_dir is on an ecryptfs file system then you should set split_topic_at to 100.
{Integer} bucket_base, {Integer} bucket_num_chars
Messages are distributed across different buckets for efficiency. Each bucket is a sub-directory of the messages directory. The number of buckets is determined by the bucket_base and bucket_num_chars options. bucket_base is the radix to use for bucket names and bucket_num_chars is the number of digits in each name. For example, bucket_base: 26 and bucket_num_chars: 4 results in buckets 0000 through pppp. Defaults to bucket_base: 16 and bucket_num_chars: 2 (i.e. buckets 00 through ff). The number of buckets is available as the num_buckets property of the QlobberFSQ object.
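For example, a hypothetical configuration with more buckets than the default:

```js
var QlobberFSQ = require('@crabas0npm/enim-fugiat-labore').QlobberFSQ;

// base 26 with 4 characters gives 26^4 = 456976 buckets, named 0000..pppp
var fsq = new QlobberFSQ({
    fsq_dir: '/shared/fsq',
    bucket_base: 26,
    bucket_num_chars: 4
});

console.log(fsq.num_buckets); // 456976
```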
{Integer} bucket_stamp_size
The number of bytes to write to the UPDATE file when a message is published. The UPDATE file (in the update directory) is used to determine whether any messages have been published without having to scan all the bucket directories. Each bucket has a section in the UPDATE file, bucket_stamp_size bytes long. When a message is written to a bucket, its section is filled with random bytes. Defaults to 32. If you set this to 0, the UPDATE file won't be written to and all the bucket directories will be scanned even if no messages have been published.
{Integer} flags
Extra flags to use when reading and writing files. You shouldn't need to use this option but if you do then it should be a bitwise-or of values in the (undocumented) Node constants module (e.g. constants.O_DIRECT | constants.O_SYNC). Defaults to 0.
{Integer} unique_bytes
Number of random bytes to append to each message's filename (encoded in hex), in order to avoid name clashes. Defaults to 16. If you increase it (or change the algorithm to add some extra information like the hostname), be sure to reduce split_topic_at accordingly.
{Integer} single_ttl
Default time-to-live (in milliseconds) for messages which should be read by at most one subscriber. This value is added to the current time and the resulting expiry time is put into the message's filename. After the expiry time, the message is ignored and deleted when convenient. Defaults to 1 hour.
{Integer} multi_ttl
Default time-to-live (in milliseconds) for messages which can be read by many subscribers. This value is added to the current time and the resulting expiry time is put into the message's filename. After the expiry time, the message is ignored and deleted when convenient. Defaults to 1 minute.
{Integer} poll_interval
@crabas0npm/enim-fugiat-labore reads the UPDATE file at regular intervals to check whether any messages have been written. poll_interval is the time (in milliseconds) between each check. Defaults to 1 second.
{Boolean} notify
Whether to use fs.watch to watch for changes to the UPDATE file. Note that this will be done in addition to reading it every poll_interval milliseconds because fs.watch (inotify underneath) can be unreliable, especially under high load. Defaults to true.
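As a sketch, here's how these two options might be tuned together (the values are illustrative, not recommendations):

```js
var QlobberFSQ = require('@crabas0npm/enim-fugiat-labore').QlobberFSQ;

// poll the UPDATE file every 100ms and rely on polling alone,
// instead of also watching the file with fs.watch
var fsq = new QlobberFSQ({
    fsq_dir: '/shared/fsq',
    poll_interval: 100,
    notify: false
});
```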
{Integer} retry_interval
Some I/O operations can fail with an error indicating they should be retried. retry_interval is the time (in milliseconds) to wait before retrying. Defaults to 1 second.
{Integer} message_concurrency
The number of messages in each bucket to process at once. Defaults to 1.
{Integer} bucket_concurrency
The number of buckets to process at once. Defaults to 1.
{Integer} handler_concurrency
By default, a message is considered handled by a subscriber only when all its data has been read. If you set handler_concurrency to non-zero, a message is considered handled as soon as a subscriber receives it. The next message will then be processed straight away. The value of handler_concurrency limits the number of messages being handled by subscribers at any one time. Defaults to 0 (waits for all message data to be read).
{Boolean} order_by_expiry
Pass messages to subscribers in order of their expiry time. If true then bucket_base and bucket_num_chars are forced to 1 so messages are written to a single bucket. Defaults to false.
{Boolean} dedup
Whether to ensure each handler function is called at most once when a message is received. Defaults to true.
{Boolean} single
Whether to process messages meant for at most one subscriber (across all QlobberFSQ objects), i.e. work queues. This relies on the optional dependency fs-ext. Defaults to true if fs-ext is installed, otherwise false (in which case a single_disabled event will be emitted).
{String} separator
The character to use for separating words in message topics. Defaults to '.'.
{String} wildcard_one
The character to use for matching exactly one word in a message topic to a subscriber. Defaults to '*'.
{String} wildcard_some
The character to use for matching zero or more words in a message topic to a subscriber. Defaults to '#'.
{Integer} getdents_size
If positive, use getdents to enumerate messages in bucket directories. getdents_size is the buffer size to use with getdents. Otherwise, use fs.readdir (which is the default). If getdents is requested but unavailable, a getdents_disabled event will be emitted.
{Function (info, handlers, cb(err, ready, filtered_handlers)) | Array} filter
Function called before each message is processed.

You can use this to filter the subscribed handler functions to be called for the message (by passing the filtered list as the third argument to cb).

If you want to ignore the message at this time then pass false as the second argument to cb. filter will be called again later with the same message.

Defaults to a function which calls cb(null, true, handlers).
handlers is an ES6 Set, or an array if options.dedup is falsey.

filtered_handlers should be an ES6 Set, or an array if options.dedup is falsey. If not, new Set(filtered_handlers) or Array.from(filtered_handlers) will be used to convert it.
You can supply an array of filter functions - each will be called in turn with the filtered_handlers from the previous one.

An array containing the filter functions is also available as the filters property of the QlobberFSQ object and can be modified at any time.
{Function (bucket)} [get_disruptor]
You can speed up message processing on a single multi-core server by using shared memory LMAX Disruptors. Message metadata and (if it fits) payload will be sent through the Disruptor. get_disruptor will be called for each bucket number and should return the Disruptor to use for that bucket, or null. The same Disruptor can be used for more than one bucket if you wish.
{Integer} refresh_ttl
If you use a shared memory LMAX Disruptor for a bucket (see get_disruptor above), notification of new messages in the bucket is received through the Disruptor. However, checking for expired messages still needs to read the filesystem. refresh_ttl is the time (in milliseconds) between checking for expired messages when a Disruptor is in use. Defaults to 10 seconds.
{Integer} disruptor_spin_interval
If a Disruptor is shared across multiple buckets or multiple QlobberFSQ instances, contention can occur when publishing a message. In this case publish will try again until it succeeds. disruptor_spin_interval is the time (in milliseconds) to wait before retrying. Defaults to 0.
{Object} [direct_handler]
Object with the following methods, used for transferring messages direct from publisher to subscribers without writing them to disk:
{Function (filename, direct)} get_stream_for_publish
Called by publish when truthy options.direct is passed to it instead of writing data to disk. This method receives the name of the file to which data would have been written, plus the value of options.direct that was passed to publish. Whatever it returns will be returned by the call to publish.
{Function (filename)} get_stream_for_subscribers
Called when a stream published by calling publish with truthy options.direct needs to be given to subscribers. It receives the name of the file to which data would have been written. It must return a Readable stream.
{Function (filename, stream)} publish_stream_destroyed
Called when a stream returned by get_stream_for_publish() has been destroyed or the message has expired. It receives the name of the file passed to get_stream_for_publish() and the destroyed stream.
{Function (filename)} publish_stream_expired
Called when a stream returned by get_stream_for_publish() has expired and should be destroyed. It receives the name of the file passed to get_stream_for_publish().
{Function (filename, stream)} subscriber_stream_destroyed
Called when a stream returned by get_stream_for_subscribers() has been destroyed or the message has expired. It receives the name of the file passed to get_stream_for_subscribers() and the destroyed stream.
{Function (filename)} subscriber_stream_ignored
Called when a stream published by calling publish with truthy options.direct doesn't have any subscribers. It receives the name of the file to which data would have been written.
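A minimal in-memory sketch of such an object might look like the following. It assumes publisher and subscribers share one process (which defeats much of the purpose but shows the shape of the interface):

```js
const { QlobberFSQ } = require('@crabas0npm/enim-fugiat-labore');
const { PassThrough } = require('stream');

const streams = new Map(); // keyed by the filename the message would have used

const direct_handler = {
    get_stream_for_publish(filename, direct) {
        const stream = new PassThrough();
        streams.set(filename, stream);
        return stream; // publish() returns this to the caller
    },
    get_stream_for_subscribers(filename) {
        return streams.get(filename); // must be a Readable stream
    },
    publish_stream_destroyed(filename, stream) {
        streams.delete(filename);
    },
    publish_stream_expired(filename) {
        const stream = streams.get(filename);
        if (stream) { stream.destroy(); }
    },
    subscriber_stream_destroyed(filename, stream) {},
    subscriber_stream_ignored(filename) {
        streams.delete(filename);
    }
};

const fsq = new QlobberFSQ({ fsq_dir: '/shared/fsq', direct_handler });
```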
QlobberFSQ.prototype.subscribe(topic, handler, [options], [cb])

Subscribe to messages in the file system queue.
Parameters:
{String} topic
Which messages you're interested in receiving. Message topics are split into words using '.' as the separator. You can use '*' to match exactly one word in a topic or '#' to match zero or more words. For example, foo.* would match foo.bar whereas foo.# would match foo, foo.bar and foo.bar.wup. Note you can change the separator and wildcard characters by specifying the separator, wildcard_one and wildcard_some options when constructing QlobberFSQ objects. See the qlobber documentation for more information.
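For example, using the fsq object from the examples above:

```js
fsq.subscribe('foo.*', function (data, info) {}); // foo.bar, but not foo or foo.bar.wup
fsq.subscribe('foo.#', function (data, info) {}); // foo, foo.bar and foo.bar.wup
```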
{Function} handler
Function to call when a new message is received on the file system queue and its topic matches against topic. handler will be passed the following arguments:
{Readable|Buffer} data
Readable stream or message content as a Buffer. By default you'll receive the message content. If handler has a property accept_stream set to a truthy value then you'll receive a stream. Note that all subscribers will receive the same stream or content for each message. You should take this into account when reading from the stream. The stream can be piped into multiple Writable streams but bear in mind it will go at the rate of the slowest one.
{Object} info
Metadata for the message, with the following properties:
{String} fname
Name of the file in which the message is stored.{String} path
Full path to the file in which the message is stored.{String} topic
Topic the message was published with.{String} [topic_path]
Full path to the file in which the topic overspill is stored (only present if the topic is too long to fit in the file name).{Integer} expires
When the message expires (number of milliseconds after 1 January 1970 00:00:00 UTC).{Boolean} single
Whether this message is being given to at most one subscriber (across all QlobberFSQ
objects).{Integer} size
Message size in bytes.{Function} done
Function to call once you've handled the message. Note that calling this function is only mandatory if info.single === true
, in order to delete and unlock the file. done
takes two arguments:
{Object} err
If an error occurred then pass details of the error, otherwise pass null or undefined.

{Function} [finish]
Optional function to call once the message has been deleted and unlocked, in the case of info.single === true, or straight away otherwise. It will be passed the following argument:

{Object} err
If an error occurred then details of the error, otherwise null.

{Object} [options]
Optional settings for this subscription:
{Boolean} subscribe_to_existing
If true then handler will be called with any existing, unexpired messages that match topic, as well as new ones. Note that handler will only receive new streams that were published by calling publish with truthy options.direct, not existing direct streams. Defaults to false (only new messages).

{Function} [cb]
Optional function to call once the subscription has been registered. This will be passed the following argument:
{Object} err
If an error occurred then details of the error, otherwise null.
QlobberFSQ.prototype.unsubscribe([topic], [handler], [cb])

Unsubscribe from messages in the file system queue.
Parameters:
{String} [topic]
Which messages you're no longer interested in receiving via the handler function. This should be a topic you've previously passed to subscribe. If topic is undefined then all handlers for all topics are unsubscribed.

{Function} [handler]
The function you no longer want to be called with messages published to the topic topic. This should be a function you've previously passed to subscribe. If you subscribed handler to a different topic then it will still be called for messages which match that topic. If handler is undefined, all handlers for the topic topic are unsubscribed.

{Function} [cb]
Optional function to call once handler has been unsubscribed from topic. This will be passed the following argument:

{Object} err
If an error occurred then details of the error, otherwise null.
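For example (handler being a function previously passed to subscribe):

```js
fsq.unsubscribe('foo.*', handler, function (err) { /* handler removed */ });
fsq.unsubscribe(); // remove all handlers for all topics
```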
QlobberFSQ.prototype.publish(topic, [payload], [options], [cb])

Publish a message to the file system queue.
Parameters:
{String} topic
Message topic. The topic should be a series of words separated by '.' (or the separator character you provided to the QlobberFSQ constructor). Topic words can contain any character, unless you set encode_topics to false in the QlobberFSQ constructor. In that case they can contain any valid filename character for your file system, although it's probably sensible to limit it to alphanumeric characters, '-', '_' and '.'.
{String | Buffer} [payload]
Message payload. If you don't pass a payload then publish will return a Writable stream for you to write the payload into.
{Object} [options]
Optional settings for this publication:
{Boolean} single
If true then the message will be given to at most one interested subscriber, across all QlobberFSQ objects scanning the file system queue. Otherwise all interested subscribers will receive the message (the default). See the work-queue sketch at the end of this section.
{Integer} ttl
Time-to-live (in milliseconds) for this message. If you don't specify anything then single_ttl or multi_ttl (provided to the QlobberFSQ constructor) will be used, depending on the value of single. After the time-to-live for the message has passed, the message is ignored and deleted when convenient.
{String} encoding
If payload is a string, the encoding to use when writing it out to the message file. Defaults to utf8.
{Integer} mode
The file mode (permissions) to set on the message file. Defaults to octal 0666 (readable and writable to everyone).
{Function} hasher
A hash function to use for deciding into which bucket the message should be placed. The hash function should return a Buffer at least 4 bytes long. It defaults to running md5 on the message file name. If you supply a hasher function it will be passed the following arguments:
{String} fname
Message file name.

{Integer} expires
When the message expires (number of milliseconds after 1 January 1970 00:00:00 UTC).

{String} topic
Message topic.

{String|Buffer} payload
Message payload.

{Object} options
The optional settings for this publication.

{Integer} bucket
Which bucket to write the message into, instead of using hasher to calculate it.
{Boolean} ephemeral
This applies only if a shared memory LMAX Disruptor is being used for the message's bucket (see the get_disruptor option to the QlobberFSQ constructor). By default, the message is written both to the Disruptor and the filesystem. If ephemeral is truthy, the message is written only to the Disruptor.
If the Disruptor's elements aren't large enough to contain the message's metadata, the message won't be written to the Disruptor and cb (below) will receive an error with a property code equal to the string buffer-too-small.
However, if the Disruptor's elements aren't large enough for the message's payload, the message will be written to disk. The amount of space available in the Disruptor for the payload can be found via the ephemeral_size property on the stream returned by this function. If your message won't fit and you don't want to write it to disk, emit an error event on the stream without ending it.
{Any} direct
Defaults to false. If truthy:

- The message isn't written to disk. Instead, the data is handed to the direct_handler.get_stream_for_publish() method that was passed to the QlobberFSQ constructor. It will be passed the name of the file to which data would otherwise have been written and the value of direct.
- publish returns whatever direct_handler.get_stream_for_publish() returns.
- The message can't be single (i.e. it's published to all interested subscribers).

{Function} [cb]
Optional function to call once the message has been written to the file system queue. This will be called after the message has been moved into its bucket and is therefore available to subscribers in any QlobberFSQ object scanning the queue. It will be passed the following arguments:

{Object} err
If an error occurred then details of the error, otherwise null.
{Object} info
Metadata for the message. See subscribe for a description of info's properties.
Return:
{Stream | undefined}
A Writable stream if no payload was passed, otherwise undefined.
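Here's a hedged work-queue sketch using the single option (process_job is a hypothetical application function):

```js
// At most one subscriber (across all QlobberFSQ objects) receives a
// message published with single: true. The subscriber must call done()
// so the message file is deleted and unlocked.
fsq.subscribe('jobs.#', function (data, info, done)
{
    process_job(data, function (err)
    {
        done(err);
    });
});

fsq.on('start', function ()
{
    this.publish('jobs.resize', JSON.stringify({ id: 42 }), { single: true });
});
```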
QlobberFSQ.prototype.stop_watching([cb])

Stop scanning for new messages.
Parameters:
{Function} [cb]
Optional function to call once scanning has stopped. Alternatively, you can listen for the stop event.
Check the UPDATE file now rather than waiting for the next periodic check to occur.
Scan for new messages in the messages sub-directory without checking whether the UPDATE file has changed.
Given a radix to use for characters in bucket names and the number of digits in each name, return the number of buckets that can be represented.
Parameters:
{Integer} bucket_base
Radix for bucket name characters.

{Integer} bucket_num_chars
Number of characters in bucket names.

Return:

{Integer}
The number of buckets that can be represented.
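For instance, assuming this function is exposed as QlobberFSQ.get_num_buckets (the name isn't shown above, so treat it as an assumption):

```js
// radix 16 with 2 characters gives 16^2 = 256 buckets (the default 00..ff layout)
console.log(QlobberFSQ.get_num_buckets(16, 2)); // 256
```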
start event
QlobberFSQ objects fire a start event when they're ready to publish messages. Don't call publish until the start event is emitted or the message may be dropped. You can subscribe to messages before start is fired, however.
A start event won't be fired after a stop event.
stop event
QlobberFSQ objects fire a stop event after you call stop_watching and they've stopped scanning for new messages. Messages already read may still be being processed, however.
error event
QlobberFSQ objects fire an error event if an error occurs before start is emitted. The QlobberFSQ object is unable to continue at this point and is not scanning for new messages.
Parameters:
{Object} err
The error that occurred.
warning event
QlobberFSQ objects fire a warning event if an error occurs after start is emitted. The QlobberFSQ object will still be scanning for new messages after emitting a warning event.
Parameters:
{Object} err
The error that occurred.
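A small sketch of wiring up both events (reusing the fsq object from the examples above):

```js
fsq.on('error', function (err)
{
    // emitted before 'start'; the queue is no longer being scanned
    console.error('fatal:', err);
});

fsq.on('warning', function (err)
{
    // emitted after 'start'; scanning continues
    console.warn('non-fatal:', err);
});
```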
single_disabled event
QlobberFSQ objects fire a single_disabled event if they can't support work queue semantics.
Parameters:
{Object} err
The error that caused single-subscriber messages not to be supported.
getdents_disabled event
QlobberFSQ objects fire a getdents_disabled event if they can't support enumerating bucket directories using getdents.
Parameters:
{Object} err
The error that caused getdents to be unavailable.
—generated by apidox—