Gathering detailed insights and metrics for nestjs-affinity-queue
Gathering detailed insights and metrics for nestjs-affinity-queue
Gathering detailed insights and metrics for nestjs-affinity-queue
Gathering detailed insights and metrics for nestjs-affinity-queue
npm install nestjs-affinity-queue
Typescript
Module System
Node Version
NPM Version
TypeScript (70.76%)
Shell (28.04%)
JavaScript (1.21%)
Total Downloads
0
Last Day
0
Last Week
0
Last Month
0
Last Year
0
MIT License
17 Commits
1 Branches
1 Contributors
Updated on Jul 11, 2025
Latest Version
1.0.16
Package Id
nestjs-affinity-queue@1.0.16
Unpacked Size
192.74 kB
Size
34.64 kB
File Count
41
NPM Version
10.9.2
Node Version
23.11.0
Published on
Jul 11, 2025
Cumulative downloads
Total Downloads
Last Day
0%
NaN
Compared to previous day
Last Week
0%
NaN
Compared to previous week
Last Month
0%
NaN
Compared to previous month
Last Year
0%
NaN
Compared to previous year
一个基于 BullMQ 的 NestJS 队列模块,支持任务亲和性和分布式调度。
npm install nestjs-affinity-queue
```typescript
import { Module } from '@nestjs/common';
import { QueueModule } from 'nestjs-affinity-queue';

@Module({
  imports: [
    QueueModule.forRoot({
      role: 'BOTH', // SCHEDULER, WORKER, 或 BOTH
      workerOptions: {
        maxBatchSize: 10,
        workerCount: 2,
      },
      redisOptions: {
        host: 'localhost',
        port: 6379,
      },
      queueOptions: {
        pendingQueueName: 'pending-tasks',
        workerQueuePrefix: 'worker-',
        schedulerInterval: 1000,
      },
      electionOptions: {
        electionLockTtl: 30000,
        heartbeatInterval: 5000,
        heartbeatTimeout: 15000,
      },
    }),
  ],
})
export class AppModule {}
```
```typescript
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { QueueModule } from 'nestjs-affinity-queue';

@Module({
  imports: [
    ConfigModule.forRoot(),
    QueueModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: async (configService: ConfigService) => ({
        role: 'BOTH',
        workerOptions: {
          maxBatchSize: configService.get('QUEUE_MAX_BATCH_SIZE', 10),
          workerCount: configService.get('QUEUE_WORKER_COUNT', 2),
        },
        redisOptions: {
          host: configService.get('REDIS_HOST', 'localhost'),
          port: configService.get('REDIS_PORT', 6379),
          password: configService.get('REDIS_PASSWORD'),
          db: configService.get('REDIS_DB', 0),
        },
        queueOptions: {
          pendingQueueName: configService.get('QUEUE_PENDING_NAME', 'pending-tasks'),
          workerQueuePrefix: configService.get('QUEUE_WORKER_PREFIX', 'worker-'),
          schedulerInterval: configService.get('QUEUE_SCHEDULER_INTERVAL', 1000),
        },
        electionOptions: {
          electionLockTtl: configService.get('ELECTION_LOCK_TTL', 30000),
          heartbeatInterval: configService.get('ELECTION_HEARTBEAT_INTERVAL', 5000),
          heartbeatTimeout: configService.get('ELECTION_HEARTBEAT_TIMEOUT', 15000),
        },
      }),
      inject: [ConfigService],
    }),
  ],
})
export class AppModule {}
```
```typescript
import { Module } from '@nestjs/common';
import { QueueModule } from 'nestjs-affinity-queue';

@Module({
  imports: [
    // 注册一个名为 'email-queue' 的特性队列
    QueueModule.forFeature({
      name: 'email-queue',
      role: 'WORKER',
      workerOptions: {
        maxBatchSize: 5,
        workerCount: 1,
      },
      queueOptions: {
        pendingQueueName: 'email-pending-tasks',
        workerQueuePrefix: 'email-worker-',
        workerStatePrefix: 'email-worker-state-',
        schedulerInterval: 2000,
      },
      electionOptions: {
        electionLockTtl: 60000,
        heartbeatInterval: 10000,
        heartbeatTimeout: 30000,
      },
    }),
  ],
})
export class EmailModule {}
```
```typescript
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { QueueModule } from 'nestjs-affinity-queue';

@Module({
  imports: [
    ConfigModule,
    QueueModule.forFeatureAsync('email-queue', {
      imports: [ConfigModule],
      useFactory: async (configService: ConfigService) => ({
        name: 'email-queue',
        role: 'WORKER',
        workerOptions: {
          maxBatchSize: configService.get('EMAIL_BATCH_SIZE', 5),
          workerCount: configService.get('EMAIL_WORKER_COUNT', 1),
        },
        queueOptions: {
          pendingQueueName: configService.get('EMAIL_QUEUE_NAME', 'email-pending-tasks'),
          workerQueuePrefix: configService.get('EMAIL_WORKER_PREFIX', 'email-worker-'),
          workerStatePrefix: configService.get('EMAIL_WORKER_STATE_PREFIX', 'email-worker-state-'),
          schedulerInterval: configService.get('EMAIL_SCHEDULER_INTERVAL', 2000),
        },
        electionOptions: {
          electionLockTtl: configService.get('EMAIL_ELECTION_LOCK_TTL', 60000),
          heartbeatInterval: configService.get('EMAIL_HEARTBEAT_INTERVAL', 10000),
          heartbeatTimeout: configService.get('EMAIL_HEARTBEAT_TIMEOUT', 30000),
        },
      }),
      inject: [ConfigService],
    }),
  ],
})
export class EmailModule {}
```
```typescript
import { Module } from '@nestjs/common';
import { QueueModule } from 'nestjs-affinity-queue';

@Module({
  imports: [
    // 全局默认队列
    QueueModule.forRoot({
      role: 'BOTH',
      workerOptions: { maxBatchSize: 10 },
      queueOptions: { pendingQueueName: 'default-pending' },
    }),

    // 邮件队列
    QueueModule.forFeature({
      name: 'email-queue',
      role: 'BOTH',
      workerOptions: { maxBatchSize: 5 },
      queueOptions: {
        pendingQueueName: 'email-pending',
        workerQueuePrefix: 'email-worker-',
        workerStatePrefix: 'email-state-',
      },
    }),

    // 文件处理队列
    QueueModule.forFeature({
      name: 'file-queue',
      role: 'WORKER',
      workerOptions: { maxBatchSize: 3 },
      queueOptions: {
        pendingQueueName: 'file-pending',
        workerQueuePrefix: 'file-worker-',
        workerStatePrefix: 'file-state-',
      },
    }),
  ],
})
export class AppModule {}
```
```typescript
import { Injectable, Inject } from '@nestjs/common';
import { QueueService, QueueModule } from 'nestjs-affinity-queue';

@Injectable()
export class TaskService {
  constructor(
    // 注入默认队列服务
    private readonly defaultQueueService: QueueService,

    // 注入指定名称的队列服务
    @Inject(QueueModule.getQueueService('email-queue'))
    private readonly emailQueueService: QueueService,

    @Inject(QueueModule.getQueueService('file-queue'))
    private readonly fileQueueService: QueueService,
  ) {}

  async addDefaultTask(task: any) {
    return await this.defaultQueueService.add(task);
  }

  async addEmailTask(task: any) {
    return await this.emailQueueService.add(task);
  }

  async addFileTask(task: any) {
    return await this.fileQueueService.add(task);
  }

  async getEmailQueueStats() {
    return await this.emailQueueService.getQueueStats();
  }
}
```
```typescript
import { Injectable, Inject } from '@nestjs/common';
import { WorkerService, QueueModule } from 'nestjs-affinity-queue';

@Injectable()
export class WorkerController {
  constructor(
    // 注入默认工作器服务
    private readonly defaultWorkerService: WorkerService,

    // 注入指定名称的工作器服务
    @Inject(QueueModule.getWorkerService('email-queue'))
    private readonly emailWorkerService: WorkerService,
  ) {}

  async registerEmailHandlers() {
    this.emailWorkerService.registerHandler('send-email', async (payload) => {
      console.log('Sending email:', payload);
      // 邮件发送逻辑
      return { success: true };
    });

    this.emailWorkerService.registerHandler('send-newsletter', async (payload) => {
      console.log('Sending newsletter:', payload);
      // 邮件群发逻辑
      return { success: true };
    });
  }
}
```
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
role | `'SCHEDULER' \| 'WORKER' \| 'BOTH'` | - | 模块角色 |
name | string | 'default' | 队列名称 |
workerOptions.maxBatchSize | number | 10 | 最大批处理大小 |
workerOptions.workerCount | number | 1 | 工作器数量 |
redisOptions.host | string | 'localhost' | Redis 主机 |
redisOptions.port | number | 6379 | Redis 端口 |
redisOptions.password | string | - | Redis 密码 |
redisOptions.db | number | 0 | Redis 数据库 |
queueOptions.pendingQueueName | string | 'pending-tasks' | 待处理队列名称 |
queueOptions.workerQueuePrefix | string | 'worker-' | 工作器队列前缀 |
queueOptions.workerStatePrefix | string | 'worker-state-' | 工作器状态前缀 |
queueOptions.schedulerInterval | number | 1000 | 调度器间隔(毫秒) |
electionOptions.electionLockTtl | number | 30000 | 选举锁 TTL(毫秒) |
electionOptions.heartbeatInterval | number | 5000 | 心跳间隔(毫秒) |
electionOptions.heartbeatTimeout | number | 15000 | 心跳超时(毫秒) |
属性 | 类型 | 默认值 | 描述 |
---|---|---|---|
name | string | - | 必需 队列名称 |
role | `'SCHEDULER' \| 'WORKER' \| 'BOTH'` | - | 必需 模块角色 |
queueOptions.pendingQueueName | string | - | 必需 待处理队列名称 |
其他选项 | - | - | 与 QueueModuleOptions 相同 |
属性 | 类型 | 描述 |
---|---|---|
useFactory | `(...args: any[]) => Promise<Options> \| Options` | 工厂函数 |
inject | any[] | 依赖注入数组 |
imports | any[] | 导入模块数组 |
```typescript
interface Task {
  type: string; // 任务类型
  identifyTag: string; // 身份标识(亲和性标识)
  payload: any; // 任务数据
}
```
```typescript
// 获取队列服务注入令牌
QueueModule.getQueueService('queue-name')

// 获取工作器服务注入令牌
QueueModule.getWorkerService('queue-name')

// 获取调度器处理器注入令牌
QueueModule.getSchedulerProcessor('queue-name')
```
```typescript
// 推荐使用有意义的队列名称
QueueModule.forFeature({
  name: 'user-notifications', // 清晰描述队列用途
  queueOptions: {
    pendingQueueName: 'user-notifications-pending',
    workerQueuePrefix: 'user-notifications-worker-',
    workerStatePrefix: 'user-notifications-state-',
  },
});
```
```typescript
// 生产环境建议分离调度器和工作器
const role = process.env.APP_ROLE as 'SCHEDULER' | 'WORKER' | 'BOTH';

QueueModule.forRoot({
  role,
  // 其他配置...
});
```
```dotenv
# .env 文件
REDIS_HOST=localhost
REDIS_PORT=6379
EMAIL_QUEUE_MAX_BATCH=5
FILE_QUEUE_MAX_BATCH=3
APP_ROLE=BOTH
```
MIT
No vulnerabilities found.
No security vulnerabilities found.