Gathering detailed insights and metrics for @ramplex/clujo
Schedule interdependent tasks on a cron-like schedule with parallel task execution and a shared, type-safe context. Built-in distributed locking prevents overlapping executions in a clustered environment.
npm install @ramplex/clujo
4 Stars
146 Commits
1 Watching
1 Branches
1 Contributors
Updated on 29 Nov 2024
TypeScript (100%)
IMPORTANT: clujo is now published under `@ramplex/clujo` on npm and jsr. If you are using the old `clujo` package, please update your dependencies to use `@ramplex/clujo` instead. All future versions will be published under the new package name.
Clujo is a flexible solution for managing scheduled tasks in your distributed Node.js / Deno applications. It would not be possible without the amazing work of the following projects:

- Croner: used for cron scheduling
- redis-semaphore (only if an `ioredis` instance is provided): used to ensure single execution in a distributed environment

Coming soon: validated Bun support.

With the `runOnStartup` feature, Clujo allows you to run tasks immediately when needed, in addition to their scheduled times.

Clujo is available on jsr and npm, and supports Node.js and Deno v2.0.0 or later.
Install Clujo using npm, yarn, or pnpm:

```bash
npm install @ramplex/clujo
yarn add @ramplex/clujo
pnpm install @ramplex/clujo
```

With Deno:

```bash
deno add jsr:@ramplex/clujo
```

From jsr, using npm, yarn, or pnpm:

```bash
npx jsr add @ramplex/clujo
yarn dlx jsr add @ramplex/clujo
pnpm dlx jsr add @ramplex/clujo
```
Here's a simple example to get you started with Clujo:
```typescript
import { TaskGraph, Clujo } from '@ramplex/clujo';
// or (in Node.js)
// const { TaskGraph, Clujo } = require('@ramplex/clujo');
import Redis from 'ioredis'; // only needed for distributed locking

// Define your tasks
const tasks = new TaskGraph({
  // Optional: provide initial context value
  contextValue: { initialData: "some value" },
  // Optional: provide dependencies available to all tasks
  dependencies: { logger: console }
})
  .addTask({
    id: "task1",
    execute: async ({ deps, ctx }) => {
      deps.logger.log("Task 1 executing");
      deps.logger.log("Initial data:", ctx.initial.initialData);
      return "Task 1 result";
    },
  })
  .addTask({
    id: "task2",
    execute: async ({ deps, ctx }) => {
      deps.logger.log("Task 2 executing");
      // since task2 depends on task1, it will have access to the result of task1
      deps.logger.log("Task 1 result:", ctx.task1);
      return "Task 2 result";
    },
    // will only execute after task 1 completes
    dependencies: ["task1"],
  })
  .addTask({
    id: "task3",
    execute: async ({ deps, ctx }) => {
      deps.logger.log("Task 3 executing");
      return "Task 3 result";
    },
    // since task3 has no dependencies, it will run in parallel with task1 at the start of
    // execution and it does not have guaranteed access to any other task's result
  })
  .build({
    // Optional: provide a (sync or async) function to run when the job completes. It receives
    // the completed context object, the dependencies, and errors (a list of TaskError, one for
    // each task that failed, or null if no errors occurred)
    onTasksCompleted: (ctx, deps, errors) => console.log(ctx, deps, errors),
  });

// Create a Clujo instance
const clujo = new Clujo({
  id: "myClujoJob",
  cron: {
    // You can provide either a single pattern
    pattern: "*/5 * * * * *", // Every 5 seconds
    // OR multiple patterns (use this instead of `pattern`)
    // patterns: [
    //   "*/5 * * * * *", // Every 5 seconds
    //   "0 */2 * * *", // Every 2 hours
    //   new Date("2024-12-25T00:00:00") // One-time execution on Christmas
    // ],
    // Optional: provide options for the Cron run (Croner `CronOptions`)
    options: { tz: "America/New_York" }
  },
  taskGraphRunner: tasks,
  // Optional: provide an ioredis client for distributed locking
  // In a clustered / multi-instance environment, this will prevent overlapping executions
  redis: { client: new Redis() },
  // Optional: run the job immediately on startup, independent of the schedules
  runOnStartup: true,
});

// Start the job
clujo.start();

// Trigger the job manually to get a complete context object
const completedContext = await clujo.trigger();

// Gracefully stop the job by waiting until the current execution completes
// Will force stop after the given timeout in milliseconds (5000 here)
await clujo.stop(5000);
```
If a JavaScript `Date` object is provided in the `patterns` array or as the single `pattern`, the task graph will be executed exactly once, at the date and time specified for that pattern. Time is in ISO 8601 local time.
When using multiple patterns, if executions overlap, Clujo will prevent concurrent executions.
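As a rough illustration of what preventing concurrent executions can look like inside a single process, here is a minimal sketch. The `createOverlapGuard` helper is hypothetical and not part of Clujo's API, and it assumes an overlapping run is skipped rather than queued, which the text above does not specify.

```typescript
// Hypothetical helper, not Clujo's implementation: wraps a job so that a call
// made while a previous call is still in flight is skipped instead of run.
type GuardedResult<T> = { ran: true; value: T } | { ran: false };

function createOverlapGuard<T>(job: () => Promise<T>): () => Promise<GuardedResult<T>> {
  let running = false;
  return async (): Promise<GuardedResult<T>> => {
    // Assumption: an overlapping run is skipped, not queued.
    if (running) return { ran: false };
    running = true;
    try {
      return { ran: true, value: await job() };
    } finally {
      // Always clear the flag, even if the job throws.
      running = false;
    }
  };
}
```

Each schedule pattern would call the same guarded function, so two patterns firing at the same moment result in a single run.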
The context object contains the appropriate context for the task:

- If task `i` depends on tasks `j_1,...,j_n`, the context object is guaranteed to hold the results of tasks `j_1,...,j_n` under the keys `j_1,...,j_n`. The value at key `j_k` is the return value of task `j_k`, `k = 1,...,n`.

Some example dependency structures:

- Task `i` depends on task `i-1`, `i = 2,...,N`. All tasks run sequentially.
- `1 <= i != j <= N`. `N` tasks where task `i` depends on task `j`. All tasks except task `i` run concurrently, and task `i` runs after task `j`.
- Task `i` depends on task `j`, and task `j` depends on task `i`. Cyclic dependencies will result in an error pre-execution.

In the event a task execution fails, all further dependent tasks will not be executed. Other independent tasks will continue to run.
More complex cases can be built from the above examples.
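The execution rules above can be sketched with a small, self-contained runner. This is an illustration of the semantics only, not Clujo's implementation: `TaskSpec`, `assertAcyclic`, and `runGraph` are hypothetical names, and the context here is an untyped record rather than Clujo's type-safe context.

```typescript
// Illustrative only: a tiny dependency-aware runner, not Clujo's internals.
type TaskSpec = {
  id: string;
  dependencies?: string[];
  execute: (ctx: Record<string, unknown>) => unknown | Promise<unknown>;
};

type RunResult = { ctx: Record<string, unknown>; failed: string[]; skipped: string[] };

// Throws before anything runs if a dependency is unknown or the graph is cyclic.
function assertAcyclic(tasks: TaskSpec[]): void {
  const byId = new Map<string, TaskSpec>();
  for (const t of tasks) byId.set(t.id, t);
  const state = new Map<string, "visiting" | "done">();
  const visit = (id: string): void => {
    if (state.get(id) === "done") return;
    if (state.get(id) === "visiting") throw new Error(`Cyclic dependency at "${id}"`);
    const task = byId.get(id);
    if (!task) throw new Error(`Unknown task "${id}"`);
    state.set(id, "visiting");
    for (const dep of task.dependencies ?? []) visit(dep);
    state.set(id, "done");
  };
  for (const t of tasks) visit(t.id);
}

async function runGraph(tasks: TaskSpec[]): Promise<RunResult> {
  assertAcyclic(tasks);
  const ctx: Record<string, unknown> = {};
  const failed: string[] = [];
  const skipped: string[] = [];
  const byId = new Map<string, TaskSpec>();
  for (const t of tasks) byId.set(t.id, t);
  // One promise per task; resolves to true on success, false on failure or skip.
  const started = new Map<string, Promise<boolean>>();

  const start = (task: TaskSpec): Promise<boolean> => {
    const existing = started.get(task.id);
    if (existing) return existing;
    const run = (async (): Promise<boolean> => {
      // Wait for every dependency; independent tasks are never awaited here,
      // so they proceed concurrently.
      const depResults = await Promise.all(
        (task.dependencies ?? []).map((id) => start(byId.get(id)!))
      );
      if (depResults.some((ok) => !ok)) {
        skipped.push(task.id); // a dependency failed, so this task does not run
        return false;
      }
      try {
        ctx[task.id] = await task.execute(ctx); // result stored under the task id
        return true;
      } catch {
        failed.push(task.id);
        return false;
      }
    })();
    started.set(task.id, run);
    return run;
  };

  await Promise.all(tasks.map(start));
  return { ctx, failed, skipped };
}
```

In this sketch a failed task lands in `failed`, every task downstream of it lands in `skipped`, and unrelated tasks still write their results into `ctx`.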
```typescript
// Using a static context value
const staticTasks = new TaskGraph({
  contextValue: { users: [], config: {} },
  dependencies: { logger: console }
})
  .addTask({
    id: "task1",
    execute: ({ deps, ctx }) => {
      deps.logger.log(ctx.initial.users);
      return "result";
    }
  })
  .build();

// Using a (sync or async) context factory
const factoryTasks = new TaskGraph({
  contextFactory: async (deps) => {
    // fetchUsers is your own function; it is not provided by Clujo
    const users = await fetchUsers();
    return { users };
  },
  dependencies: { logger: console }
})
  .addTask({
    id: "task1",
    execute: ({ deps, ctx }) => {
      deps.logger.log(ctx.initial.users);
      return "result";
    }
  })
  .build();
```
The context and dependencies are type-safe, ensuring you can only access properties that actually exist.
Tasks can access their dependencies' results through the context object, and all tasks have access to the initial context (when set) under `ctx.initial`.
When an `ioredis` client is provided, Clujo will use it to acquire a lock for each task execution. This ensures that tasks are not executed concurrently in a distributed environment.
```typescript
import Redis from 'ioredis';

const client = new Redis();

new Clujo({
  id: "myClujoJob",
  cron: { pattern: "*/5 * * * * *" },
  taskGraphRunner: tasks,
  redis: {
    client,
    lockOptions: { /* optional redis-semaphore lock options */ }
  },
});
```
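To make the locking idea concrete, here is a deliberately simplified sketch of a "run only if the lock is free" pattern. It is not how Clujo or redis-semaphore work internally. `LockStore`, `InMemoryLockStore`, and `runIfLockFree` are hypothetical names, and the in-memory store stands in for Redis, with `setIfAbsent` mirroring the semantics of the Redis command `SET key value NX PX ttl`. A real store would be asynchronous and shared across processes.

```typescript
// Hypothetical stand-in for a shared lock store; not Clujo's or redis-semaphore's API.
interface LockStore {
  // Same semantics as Redis `SET key owner NX PX ttlMs`: succeeds only if the key is free.
  setIfAbsent(key: string, owner: string, ttlMs: number): boolean;
  // Deletes the key only if it is still held by `owner`.
  releaseIfOwner(key: string, owner: string): void;
}

class InMemoryLockStore implements LockStore {
  private locks = new Map<string, { owner: string; expiresAt: number }>();

  setIfAbsent(key: string, owner: string, ttlMs: number): boolean {
    const held = this.locks.get(key);
    // An expired lock counts as free, so a crashed holder cannot block forever.
    if (held && held.expiresAt > Date.now()) return false;
    this.locks.set(key, { owner, expiresAt: Date.now() + ttlMs });
    return true;
  }

  releaseIfOwner(key: string, owner: string): void {
    if (this.locks.get(key)?.owner === owner) this.locks.delete(key);
  }
}

// Runs `job` only if this instance wins the lock; returns undefined otherwise.
async function runIfLockFree<T>(
  store: LockStore,
  key: string,
  owner: string,
  ttlMs: number,
  job: () => Promise<T>
): Promise<T | undefined> {
  if (!store.setIfAbsent(key, owner, ttlMs)) return undefined;
  try {
    return await job();
  } finally {
    store.releaseIfOwner(key, owner);
  }
}
```

Two application instances calling `runIfLockFree` with the same key at the same time would result in exactly one of them running the job.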
You can run tasks immediately when the job starts by setting the `runOnStartup` option to `true`.

If a scheduled execution overlaps with the triggered execution, the triggered execution will prevent the scheduled execution from running at the same time.
```typescript
new Clujo({
  id: "myClujoJob",
  cron: { pattern: "*/5 * * * * *" },
  taskGraphRunner: tasks,
  runOnStartup: true,
});
```
Tasks can have their own error handlers, allowing you to define custom logic for handling failures. The function can be synchronous or asynchronous, and has access to the same context as the execute function.
```typescript
.addTask({
  id: "taskWithErrorHandler",
  execute: async ({ deps, ctx }) => {
    // Task logic
  },
  errorHandler: async (error, { deps, ctx }) => {
    console.error("Task failed:", error);
  }
})
```
Specify a retry policy for a task to automatically retry failed executions. The task will be retried up to `maxRetries` times, with a delay of `retryDelayMs` between each retry.
```typescript
.addTask({
  id: "taskWithRetry",
  execute: async ({ deps, ctx }) => {
    // Task logic
  },
  retryPolicy: { maxRetries: 3, retryDelayMs: 1000 }
})
```
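For intuition, a retry policy of this shape can be sketched as a plain loop. This is not Clujo's implementation. `executeWithRetry` and `sleep` are hypothetical helpers, and the sketch assumes that `maxRetries` counts retries after the first attempt, giving at most `maxRetries + 1` attempts in total.

```typescript
// Hypothetical helpers, not part of Clujo's API.
type RetryPolicy = { maxRetries: number; retryDelayMs: number };

const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

async function executeWithRetry<T>(
  execute: () => T | Promise<T>,
  policy: RetryPolicy
): Promise<T> {
  let lastError: unknown;
  // Assumption: one initial attempt plus up to maxRetries retries.
  for (let attempt = 0; attempt <= policy.maxRetries; attempt++) {
    // Wait before every retry, but not before the first attempt.
    if (attempt > 0) await sleep(policy.retryDelayMs);
    try {
      return await execute();
    } catch (error) {
      lastError = error;
    }
  }
  // All attempts failed: surface the last error to the caller.
  throw lastError;
}
```

With `{ maxRetries: 3, retryDelayMs: 1000 }`, a task that keeps failing would be attempted four times in this sketch, about one second apart.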
The Scheduler class provides a convenient way to manage multiple Clujo jobs together. It allows you to add, start, and stop groups of jobs in a centralized manner.
```typescript
import { Scheduler } from '@ramplex/clujo';
import { Redis } from 'ioredis';

const scheduler = new Scheduler();

// Add jobs to the scheduler
scheduler.addJob(myFirstClujo);
scheduler.addJob(mySecondClujo);

// Add more jobs as needed
```
You can start all added jobs at once, optionally providing a Redis instance for distributed locking:
```typescript
// Start all jobs
scheduler.start();
```
To stop all running jobs:
```typescript
// Stop all jobs with a default timeout of 5000ms
await scheduler.stop();

// Or, specify a custom timeout in milliseconds
await scheduler.stop(10000);
```
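The "wait for the current execution, but give up after a timeout" behaviour of a stop call can be sketched with `Promise.race`. This is an assumption about the general pattern, not Clujo's code, and `waitOrTimeout` is a hypothetical helper.

```typescript
// Hypothetical helper, not part of Clujo's API: resolves as soon as the in-flight
// work settles, or after timeoutMs, whichever comes first.
async function waitOrTimeout(
  inFlight: Promise<unknown>,
  timeoutMs: number
): Promise<"completed" | "timed-out"> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<"timed-out">((resolve) => {
    timer = setTimeout(() => resolve("timed-out"), timeoutMs);
  });
  try {
    return await Promise.race([
      // Treat both success and failure of the run as "completed".
      inFlight.then(() => "completed" as const, () => "completed" as const),
      timeout,
    ]);
  } finally {
    // Avoid leaving a pending timer behind once the race is decided.
    clearTimeout(timer);
  }
}
```

A stop routine built on this would cancel future schedules first, then call `waitOrTimeout(currentRun, 5000)` and force-stop if the result is `"timed-out"`.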
Contributions are welcome! Please describe the contribution in an issue before submitting a pull request. Attach the issue number to the pull request description and include tests for new features / bug fixes.
Clujo is MIT licensed.