Gathering detailed insights and metrics for node-resque
npm install node-resque
- 1,371 Stars · 1,427 Commits · 151 Forks · 40 Watching · 7 Branches · 70 Contributors
- Updated on 25 Nov 2024
- Languages: TypeScript (98.72%), Shell (0.57%), Ruby (0.34%), Lua (0.28%), JavaScript (0.09%)
Downloads:

| Period | Downloads | Change vs. previous period |
| --- | --- | --- |
| Last day | 3,104 | -21.6% |
| Last week | 19,771 | -12.9% |
| Last month | 91,536 | +5.2% |
| Last year | 991,243 | +43.1% |
Distributed delayed jobs in nodejs. Resque is a background job system backed by Redis (version 2.6.0 and up required). It includes priority queues, plugins, locking, delayed jobs, and more! This project is very opinionated, but API-compatible with Resque and Sidekiq (with caveats). We also implement some of the popular Resque plugins, including resque-scheduler and resque-retry.
The full API documentation for this package is automatically generated from the main branch via typedoc and published to https://node-resque.actionherojs.com/
Resque is a queue-based task processing system that can be thought of as a "Kanban" style factory. Workers in this factory can each only work one Job at a time. They pull Jobs from Queues and work them to completion (or failure). Each Job has two parts: instructions on how to complete the job (the `perform` function), and any inputs necessary to complete the Job.
In our factory example, Queues are analogous to conveyor belts. Jobs are placed on the belts (Queues) and are held in order waiting for a Worker to pick them up. There are three types of Queues: regular work Queues, Delayed Job Queues, and the Failed Job Queue. The Delayed Job Queues contain Job definitions that are intended to be worked at, or after, a specified time. The Failed Job Queue is where Workers place any Jobs that have failed during execution.
Our Workers are the heart of the factory. Each Worker is assigned one or more Queues to check for work. After taking a Job from a Queue, the Worker attempts to complete it. If successful, the Worker goes back to check out more work from the Queues. However, if there is a failure, the Worker records the Job and its inputs in the Failed Job Queue before going back for more work.
The Scheduler can be thought of as a specialized type of Worker. Unlike other Workers, the Scheduler does not execute any Jobs; instead, it manages the Delayed Job Queue. As Job definitions are added to the Delayed Job Queue, they must specify when they become available for execution. The Scheduler constantly checks whether any Delayed Jobs are ready to execute. When a Delayed Job becomes ready for execution, the Scheduler places a new instance of that Job in its defined Queue.
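In code, delayed Jobs are created with `queue.enqueueIn` (a relative delay in ms) or `queue.enqueueAt` (an absolute timestamp in ms). A minimal sketch, assuming a connected `queue` and a `sendEmail` job defined elsewhere:

```typescript
// make the job available to workers in 5 minutes
await queue.enqueueIn(5 * 60 * 1000, "email", "sendEmail", ["user@example.com"]);

// make the job available at a specific moment in time
await queue.enqueueAt(
  Date.parse("2025-01-01T09:00:00Z"),
  "email",
  "sendEmail",
  ["user@example.com"],
);
```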
You can read the API docs for Node Resque @ node-resque.actionherojs.com. These are generated automatically from the main branch via TypeDoc.
I learn best by examples:
```typescript
import { Worker, Plugins, Scheduler, Queue } from "node-resque";

async function boot() {
  // ////////////////////////
  // SET UP THE CONNECTION //
  // ////////////////////////

  const connectionDetails = {
    pkg: "ioredis",
    host: "127.0.0.1",
    password: null,
    port: 6379,
    database: 0,
    // namespace: 'resque',
    // looping: true,
    // options: {password: 'abc'},
  };

  // ///////////////////////////
  // DEFINE YOUR WORKER TASKS //
  // ///////////////////////////

  let jobsToComplete = 0;

  const jobs = {
    add: {
      plugins: [Plugins.JobLock],
      pluginOptions: {
        JobLock: { reEnqueue: true },
      },
      perform: async (a, b) => {
        await new Promise((resolve) => {
          setTimeout(resolve, 1000);
        });
        jobsToComplete--;
        tryShutdown();

        const answer = a + b;
        return answer;
      },
    },
    subtract: {
      perform: (a, b) => {
        jobsToComplete--;
        tryShutdown();

        const answer = a - b;
        return answer;
      },
    },
  };

  // just a helper for this demo
  async function tryShutdown() {
    if (jobsToComplete === 0) {
      await new Promise((resolve) => {
        setTimeout(resolve, 500);
      });
      await scheduler.end();
      await worker.end();
      process.exit();
    }
  }

  // /////////////////
  // START A WORKER //
  // /////////////////

  const worker = new Worker(
    { connection: connectionDetails, queues: ["math", "otherQueue"] },
    jobs,
  );
  await worker.connect();
  worker.start();

  // ////////////////////
  // START A SCHEDULER //
  // ////////////////////

  const scheduler = new Scheduler({ connection: connectionDetails });
  await scheduler.connect();
  scheduler.start();

  // //////////////////////
  // REGISTER FOR EVENTS //
  // //////////////////////

  worker.on("start", () => {
    console.log("worker started");
  });
  worker.on("end", () => {
    console.log("worker ended");
  });
  worker.on("cleaning_worker", (worker, pid) => {
    console.log(`cleaning old worker ${worker}`);
  });
  worker.on("poll", (queue) => {
    console.log(`worker polling ${queue}`);
  });
  worker.on("ping", (time) => {
    console.log(`worker check in @ ${time}`);
  });
  worker.on("job", (queue, job) => {
    console.log(`working job ${queue} ${JSON.stringify(job)}`);
  });
  worker.on("reEnqueue", (queue, job, plugin) => {
    console.log(`reEnqueue job (${plugin}) ${queue} ${JSON.stringify(job)}`);
  });
  worker.on("success", (queue, job, result, duration) => {
    console.log(
      `job success ${queue} ${JSON.stringify(job)} >> ${result} (${duration}ms)`,
    );
  });
  worker.on("failure", (queue, job, failure, duration) => {
    console.log(
      `job failure ${queue} ${JSON.stringify(job)} >> ${failure} (${duration}ms)`,
    );
  });
  worker.on("error", (error, queue, job) => {
    console.log(`error ${queue} ${JSON.stringify(job)} >> ${error}`);
  });
  worker.on("pause", () => {
    console.log("worker paused");
  });

  scheduler.on("start", () => {
    console.log("scheduler started");
  });
  scheduler.on("end", () => {
    console.log("scheduler ended");
  });
  scheduler.on("poll", () => {
    console.log("scheduler polling");
  });
  scheduler.on("leader", () => {
    console.log("scheduler became leader");
  });
  scheduler.on("error", (error) => {
    console.log(`scheduler error >> ${error}`);
  });
  scheduler.on("cleanStuckWorker", (workerName, errorPayload, delta) => {
    console.log(
      `failing ${workerName} (stuck for ${delta}s) and failing job ${errorPayload}`,
    );
  });
  scheduler.on("workingTimestamp", (timestamp) => {
    console.log(`scheduler working timestamp ${timestamp}`);
  });
  scheduler.on("transferredJob", (timestamp, job) => {
    console.log(`scheduler enqueuing job ${timestamp} >> ${JSON.stringify(job)}`);
  });

  // //////////////////////
  // CONNECT TO A QUEUE //
  // //////////////////////

  const queue = new Queue({ connection: connectionDetails }, jobs);
  queue.on("error", function (error) {
    console.log(error);
  });
  await queue.connect();
  await queue.enqueue("math", "add", [1, 2]);
  await queue.enqueue("math", "add", [1, 2]);
  await queue.enqueue("math", "add", [2, 3]);
  await queue.enqueueIn(3000, "math", "subtract", [2, 1]);
  jobsToComplete = 4;
}

boot();

// and when you are done
// await queue.end()
// await scheduler.end()
// await worker.end()
```
There are 3 main classes in `node-resque`: Queue, Worker, and Scheduler.

- **Queue**: the class your application uses to enqueue new jobs and to inspect or administer the resque system. Be sure to attach a handler for the `error` event on the `queue`.
- **Worker**: works jobs from the queues it is assigned, one at a time. There is a `multiWorker` in Node Resque which will run many workers at once for you (see below).
- **Scheduler**: primarily in charge of checking when delayed jobs (added via `queue.enqueueIn` or `queue.enqueueAt`) should be processed, but it performs some other jobs like checking for 'stuck' workers and general cluster cleanup. Note that `node-resque` can not guarantee when a job is going to be executed, only when it will become available for execution (added to a Queue).

`new Queue` requires only the "queue" variable to be set. If you intend to run plugins with `beforeEnqueue` or `afterEnqueue` hooks, you should also pass the `jobs` object to it.

`new Worker` has some additional options:

```typescript
options = {
  looping: true,
  timeout: 5000,
  queues: "*",
  name: os.hostname() + ":" + process.pid,
};
```
Note that when using `"*"` as the `queues` option, the worker will check every queue it finds in redis.
The configuration hash passed to `new NodeResque.Worker`, `new NodeResque.Scheduler` or `new NodeResque.Queue` can also take a `connection` option:
```typescript
const connectionDetails = {
  pkg: "ioredis",
  host: "127.0.0.1",
  password: "",
  port: 6379,
  database: 0,
  namespace: "resque", // also allows an array of strings
};

const worker = new NodeResque.Worker(
  { connection: connectionDetails, queues: "math" },
  jobs,
);

worker.on("error", (error) => {
  // handle errors
});

await worker.connect();
worker.start();

// and when you are done
// await worker.end()
```
You can also pass a redis client directly.
```typescript
// assume you have already initialized a redis client;
// the "redis" key can be an IORedis.Redis or IORedis.Cluster instance

const redisClient = new Redis();
const connectionDetails = { redis: redisClient };

// or, with a cluster:
// const redisCluster = new Cluster(nodes);
// const connectionDetails = { redis: redisCluster };

const worker = new NodeResque.Worker(
  { connection: connectionDetails, queues: "math" },
  jobs,
);

worker.on("error", (error) => {
  // handle errors
});

await worker.connect();
worker.start();

// and when you are done
await worker.end();
```
Notes:

- Be sure to call `await worker.end()`, `await queue.end()` and `await scheduler.end()` before shutting down your application if you want to properly clear your worker status from resque.
- `keyPrefix` cannot be used if you are using the `ioredis` (default) redis driver in this project (see https://github.com/actionhero/node-resque/issues/245 for more information).
- If you use plugins with `beforeEnqueue` or `afterEnqueue` hooks, be sure to pass the `jobs` argument to the `new NodeResque.Queue()` constructor.
- If a job fails, it is placed in the special `failed` queue. You can then inspect these jobs, write a plugin to manage them, move them back to the normal queues, etc. Failure behavior by default is just to enter the `failed` queue, but there are many options. Check out these examples from the ruby ecosystem for inspiration.
If you want to learn more about running Node-Resque with docker, please view the examples here: https://github.com/actionhero/node-resque/tree/master/examples/docker

Worker names must follow the pattern `hostname:pid+unique_id`. For example:
```typescript
const name = os.hostname() + ":" + process.pid + "+" + counter;
const worker = new NodeResque.Worker(
  { connection: connectionDetails, queues: "math", name: name },
  jobs,
);
```
DO NOT USE THIS IN PRODUCTION. In tests or special cases, you may want to process/work a job in-line. To do so, you can use `worker.performInline(jobName, arguments, callback)`. If you are planning on running a job via `#performInline`, this worker should not be started, nor should you use event emitters to monitor it. This method will not write to redis at all: no errors are logged, and resque's stats are not modified.
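For instance, a test might exercise a job directly (a minimal sketch; it assumes the `connectionDetails` and `jobs` objects from the earlier examples, and that the `add` job is defined among them):

```typescript
import { Worker } from "node-resque";

// a throw-away worker used only for in-line execution; it is never started,
// and performInline does not touch redis at all
const testWorker = new Worker(
  { connection: connectionDetails, queues: ["math"] },
  jobs,
);

// runs the "add" job's perform function directly, bypassing the queues
const result = await testWorker.performInline("add", [1, 2]);
console.log(result); // 3
```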
```typescript
const queue = new NodeResque.Queue({ connection: connectionDetails }, jobs);
await queue.connect();
```
API documentation for the main methods you will be using to enqueue jobs to be worked can be found @ node-resque.actionherojs.com.
From time to time, your jobs/workers may fail. Resque workers will move failed jobs to a special `failed` queue which will store the original arguments of your job, the failing stack trace, and additional metadata.
You can work with these failed jobs with the following methods:
- `let failedCount = await queue.failedCount()` — `failedCount` is the number of jobs in the failed queue.
- `let failedJobs = await queue.failed(start, stop)` — `failedJobs` is an array listing the data of the failed jobs. Use `await queue.failed(0, -1)` to retrieve all failed jobs. Each element looks like:

```json
{
  "worker": "host:pid",
  "queue": "test_queue",
  "payload": { "class": "slowJob", "queue": "test_queue", "args": [null] },
  "exception": "TypeError",
  "error": "MyImport is not a function",
  "backtrace": [
    "  at Worker.perform (/path/to/worker:111:24)",
    "  at <anonymous>"
  ],
  "failed_at": "Fri Dec 12 2014 14:01:16 GMT-0800 (PST)"
}
```
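Putting these together, a small inspection script might look like this (a sketch, assuming `queue` is an already-connected `NodeResque.Queue`):

```typescript
const failedCount = await queue.failedCount();
console.log(`${failedCount} job(s) in the failed queue`);

// list every failed job and print what went wrong
const failedJobs = await queue.failed(0, -1);
for (const failedJob of failedJobs) {
  console.log(`${failedJob.payload.class} failed: ${failedJob.error}`);
}
```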
We use a try/catch pattern to catch errors in your jobs. If any job throws an uncaught exception, it will be caught, and the job's payload moved to the error queue for inspection. Do not use domain, process.on("exit"), or any other method of "catching" a process crash.
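For instance, a job whose `perform` throws will have that exception captured and its payload recorded (a sketch; `brokenJob` is illustrative):

```typescript
const jobs = {
  brokenJob: {
    perform: async () => {
      // any uncaught exception thrown here is caught by the worker and the
      // job's payload is moved to the failed queue for inspection
      throw new Error("I broke");
    },
  },
};
```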
The error payload looks like:
```javascript
{ worker: 'busted-worker-3',
  queue: 'busted-queue',
  payload: { class: 'busted_job', queue: 'busted-queue', args: [ 1, 2, 3 ] },
  exception: 'ERROR_NAME',
  error: 'I broke',
  failed_at: 'Sun Apr 26 2015 14:00:44 GMT+0100 (BST)' }
```
- `await queue.removeFailed(failedJob)` — removes a failed job; `failedJob` is an expanded node object representing the failed job, retrieved via `queue.failed`.
- `await queue.retryAndRemoveFailed(failedJob)` — retries and removes a failed job; `failedJob` is again an expanded node object retrieved via `queue.failed`.
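For example, to retry everything currently in the failed queue (a sketch using the methods above):

```typescript
const failedJobs = await queue.failed(0, -1);
for (const failedJob of failedJobs) {
  // re-enqueues the job on its original queue and removes it from `failed`
  await queue.retryAndRemoveFailed(failedJob);
}
```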
By default, the scheduler will check for workers which haven't pinged redis in 60 minutes. If this happens, we will assume the process crashed and remove it from redis. If this worker was working on a job, we will place that job in the failed queue for later inspection. Every worker runs a timer which updates its key in redis every `timeout` (default: 5 seconds). If your job is slow but async, there should be no problem. However, if your job consumes 100% of the CPU of the process, this timer might not fire.
To modify the 60 minute check, change `stuckWorkerTimeout` when configuring your scheduler, e.g.:
```typescript
const scheduler = new NodeResque.Scheduler({
  stuckWorkerTimeout: 1000 * 60 * 60, // 1 hour, in ms
  connection: connectionDetails,
});
```
Set your scheduler's `stuckWorkerTimeout = false` to disable this behavior.
```typescript
const scheduler = new NodeResque.Scheduler({
  stuckWorkerTimeout: false, // will not fail jobs which haven't pinged redis
  connection: connectionDetails,
});
```
Sometimes a worker crashes in a severe way and doesn't get the time/chance to notify redis that it is leaving the pool (this happens all the time on PaaS providers like Heroku). When this happens, you will not only need to extract the job from the now-zombie worker's "working on" status, but also remove the stuck worker. To aid you in these edge cases, `await queue.cleanOldWorkers(age)` is available.
Because there are no 'heartbeats' in resque, it is impossible for the application to know whether a worker has been working on a long job or is dead. You are required to provide an "age" for how long a worker has been "working"; all workers older than that age will be removed, and the job they were working on will be moved to the error queue (where you can then use `queue.retryAndRemoveFailed` to re-enqueue the job).
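For example (a sketch; the one-hour age is an arbitrary choice):

```typescript
// remove any worker that has claimed to be "working" for over an hour,
// moving whatever job it held into the failed queue
const age = 1000 * 60 * 60;
const cleanedWorkers = await queue.cleanOldWorkers(age);
console.log(cleanedWorkers); // details of the workers that were removed
```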
If you know the name of a worker that should be removed, you can also call `await queue.forceCleanWorker(workerName)` directly, and that will also remove the worker and move any job it was working on into the error queue. This method will still proceed for workers which are only partially present in redis, indicating a previous connection failure. In this case, the job which the worker was working on is irrecoverably lost.
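A one-line sketch (the worker name here is illustrative):

```typescript
// remove this specific stuck worker; any job it held moves to the failed queue
await queue.forceCleanWorker("app-server-1:4021+1");
```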
You may want to use node-resque to schedule jobs every minute/hour/day, like a distributed CRON system. There are a number of excellent node packages to help you with this, like node-schedule and node-cron. Node-resque makes it possible for you to use the package of your choice to schedule jobs.

Assuming you are running node-resque across multiple machines, you will need to ensure that only one of your processes is actually scheduling the jobs. To help you with this, you can inspect which of the scheduler processes is currently acting as leader, and flag only the leader scheduler process to run the schedule. A full example can be found at /examples/scheduledJobs.ts, but the relevant section is:
```typescript
const NodeResque = require("node-resque");
const schedule = require("node-schedule");
const queue = new NodeResque.Queue({ connection: connectionDetails }, jobs);
const scheduler = new NodeResque.Scheduler({ connection: connectionDetails });
await scheduler.connect();
scheduler.start();

schedule.scheduleJob("10,20,30,40,50 * * * * *", async () => {
  // do this job every 10 seconds, CRON style
  // we want to ensure that only one instance of this job is scheduled in our environment at once,
  // no matter how many schedulers we have running
  if (scheduler.leader) {
    console.log(">>> enqueuing a job");
    await queue.enqueue("time", "ticktock", new Date().toString());
  }
});
```
Just like Ruby's resque, you can write worker plugins. The 4 hooks you have are `beforeEnqueue`, `afterEnqueue`, `beforePerform`, and `afterPerform`. Plugins are classes which extend `NodeResque.Plugin`. They look like this:
```typescript
const { Plugin } = require("node-resque");

class MyPlugin extends Plugin {
  constructor(...args) {
    // @ts-ignore
    super(...args);
    this.name = "MyPlugin";
  }

  beforeEnqueue() {
    // console.log("** beforeEnqueue")
    return true; // should the job be enqueued?
  }

  afterEnqueue() {
    // console.log("** afterEnqueue")
  }

  beforePerform() {
    // console.log("** beforePerform")
    return true; // should the job be run?
  }

  afterPerform() {
    // console.log("** afterPerform")
  }
}
```
And then your plugin can be invoked within a job like this:
```typescript
const jobs = {
  add: {
    plugins: [MyPlugin],
    pluginOptions: {
      MyPlugin: { thing: "stuff" },
    },
    perform: (a, b) => {
      let answer = a + b;
      return answer;
    },
  },
};
```
Notes:

- You need to return `true` or `false` on the before hooks. `true` indicates that the action should continue, and `false` prevents it. This is called `toRun`.
- If you are writing a plugin to deal with errors from your resque job, you can inspect and modify `this.worker.error` in your plugin. If `this.worker.error` is null, no error will be logged in the resque error queue.
- There are a few included plugins, all in the `src/plugins/*` directory. You can write your own and include it like this:

```typescript
const jobs = {
  add: {
    plugins: [require("Myplugin").Myplugin],
    pluginOptions: {
      MyPlugin: { thing: "stuff" },
    },
    perform: (a, b) => {
      let answer = a + b;
      return answer;
    },
  },
};
```
The plugins which are included with this package are:

- `DelayQueueLock`
- `JobLock`
- `QueueLock`
- `Retry`
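For example, the `Retry` plugin is attached to a job like any other plugin (a minimal sketch; the `retryLimit` and `retryDelay` option values shown are illustrative):

```typescript
const { Plugins } = require("node-resque");

const jobs = {
  add: {
    plugins: [Plugins.Retry],
    pluginOptions: {
      // retry a failing job up to 3 times, waiting 1 second between attempts
      Retry: { retryLimit: 3, retryDelay: 1000 },
    },
    perform: (a, b) => a + b,
  },
};
```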
`node-resque` provides a wrapper around the `Worker` class which will auto-scale the number of resque workers. This will process more than one job at a time as long as there is idle CPU within the event loop. For example, if you have a slow job that sends email via SMTP (with low overhead), we can process many jobs at a time, but if you have a math-heavy operation, we'll stick to 1. The `MultiWorker` handles this by spawning more and more node-resque workers and managing the pool.
```typescript
const NodeResque = require("node-resque");

const connectionDetails = {
  pkg: "ioredis",
  host: "127.0.0.1",
  password: "",
};

const multiWorker = new NodeResque.MultiWorker(
  {
    connection: connectionDetails,
    queues: ["slowQueue"],
    minTaskProcessors: 1,
    maxTaskProcessors: 100,
    checkTimeout: 1000,
    maxEventLoopDelay: 10,
  },
  jobs,
);

// normal worker emitters
multiWorker.on("start", (workerId) => {
  console.log("worker[" + workerId + "] started");
});
multiWorker.on("end", (workerId) => {
  console.log("worker[" + workerId + "] ended");
});
multiWorker.on("cleaning_worker", (workerId, worker, pid) => {
  console.log("cleaning old worker " + worker);
});
multiWorker.on("poll", (workerId, queue) => {
  console.log("worker[" + workerId + "] polling " + queue);
});
multiWorker.on("ping", (workerId, time) => {
  console.log("worker[" + workerId + "] check in @ " + time);
});
multiWorker.on("job", (workerId, queue, job) => {
  console.log(
    "worker[" + workerId + "] working job " + queue + " " + JSON.stringify(job),
  );
});
multiWorker.on("reEnqueue", (workerId, queue, job, plugin) => {
  console.log(
    "worker[" +
      workerId +
      "] reEnqueue job (" +
      plugin +
      ") " +
      queue +
      " " +
      JSON.stringify(job),
  );
});
multiWorker.on("success", (workerId, queue, job, result) => {
  console.log(
    "worker[" +
      workerId +
      "] job success " +
      queue +
      " " +
      JSON.stringify(job) +
      " >> " +
      result,
  );
});
multiWorker.on("failure", (workerId, queue, job, failure) => {
  console.log(
    "worker[" +
      workerId +
      "] job failure " +
      queue +
      " " +
      JSON.stringify(job) +
      " >> " +
      failure,
  );
});
multiWorker.on("error", (workerId, queue, job, error) => {
  console.log(
    "worker[" +
      workerId +
      "] error " +
      queue +
      " " +
      JSON.stringify(job) +
      " >> " +
      error,
  );
});
multiWorker.on("pause", (workerId) => {
  console.log("worker[" + workerId + "] paused");
});
multiWorker.on("multiWorkerAction", (verb, delay) => {
  console.log(
    "*** checked for worker status: " +
      verb +
      " (event loop delay: " +
      delay +
      "ms)",
  );
});

multiWorker.start();
```
The options available for the multiWorker are:

- `connection`: The redis configuration options (same as worker)
- `queues`: Array of ordered queue names (or `*`) (same as worker)
- `minTaskProcessors`: The minimum number of workers to spawn under this multiWorker, even if there is no work to do. You need at least one, or no work will ever be processed or checked
- `maxTaskProcessors`: The maximum number of workers to spawn under this multiWorker, even if the queues are long and there is available CPU (the event loop isn't entirely blocked) to this node process
- `checkTimeout`: How often to check if the event loop is blocked (in ms) (for adding or removing multiWorker children)
- `maxEventLoopDelay`: How long the event loop has to be delayed before considering it blocked (in ms)

This package was featured heavily in this presentation I gave about background jobs + node.js. It contains more examples!
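As with single workers, stop the pool cleanly when your process shuts down (a sketch; the signal handler is illustrative):

```typescript
process.on("SIGTERM", async () => {
  // stops all child workers in the pool, waiting for in-flight jobs to finish
  await multiWorker.end();
  process.exit();
});
```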
No vulnerabilities found.
OpenSSF Scorecard (last scanned on 2024-11-25):

- no dangerous workflow patterns detected
- no binaries found in the repo
- license file detected
- 5 commit(s) and 0 issue activity found in the last 90 days -- score normalized to 4
- security policy file detected
- 7 existing vulnerabilities detected
- dependency not pinned by hash detected -- score normalized to 2
- Found 0/5 approved changesets -- score normalized to 0
- detected GitHub workflow tokens with excessive permissions
- no effort to earn an OpenSSF best practices badge detected
- project is not fuzzed
- SAST tool is not run on all commits -- score normalized to 0

The Open Source Security Foundation is a cross-industry collaboration to improve the security of open source software (OSS). The Scorecard provides security health metrics for open source projects.