当前位置: 首页 > news >正文

NestJs bull 用法

bull简介

队列 bull

bull用法

https://github.com/OptimalBits/bull

Bull is currently in maintenance mode, we are only fixing bugs. For new features check BullMQ, a modern rewritten implementation in Typescript. You are still very welcome to use Bull if it suits your needs, which is a safe, battle tested librarySourceURL:https://github.com/OptimalBits/bull?tab=readme-ov-file 公牛目前处于维护模式,我们只是修复错误。有关新特性,请检查 BullMQ,这是一个用 Typecript 重写的现代实现。如果符合您的需要,您仍然非常欢迎使用 Bull,这是一个安全的、经过战斗考验的库。

真正用法用bullMq参考bullMq

bull参考bull

不过概念理解知识内容,多参考bullMq,文档更加丰富

https://github.com/taskforcesh/bullmq

What is BullMQ | BullMQ

@Injectable()
export class QueueConfigurationService implements OnApplicationBootstrap {constructor() {}private queueRegistry = new Map<string, Queue>();async onApplicationBootstrap() {const redisQueueConfig = {host: process.env['REDIS_HOST'],port: Number(process.env['REDIS_PORT']),password: process.env['REDIS_PASSWORD'],connectionTimeout: 30000,db: 4,};try {const standardQueue = new Queue('StandardQueue', { redis: redisQueueConfig });this.queueRegistry.set('StandardQueue', standardQueue);} catch (error) {throw error;}}async getQueueByLabel(queueLabel: string): Promise<Queue | undefined> {return this.queueRegistry.get(queueLabel);}
}

// 在其他地方使用
this.taskQueue = new Queue(serviceQueueName, {redis: {host: process.env['REDIS_HOST'],port: Number(process.env['REDIS_PORT']),password: process.env['REDIS_PASSWORD'],connectTimeout: 30000,db: 4,},
});
this.taskQueue.process(taskSubscriptionName, async (job, done) => {});

Nestjs bull用法

Documentation | NestJS - A progressive Node.js framework

import { Injectable, Inject } from '@nestjs/common';
import { InjectRedis } from 'nestjs-redis';
import { InjectQueue, BullModule } from '@nestjs/bull';
import { BaseService } from './base.service';
import { TypeOrmModule } from '@nestjs/typeorm';// 常量定义
const NORMAL_QUEUE_NAME = 'normal_queue';
const PRIORITY_QUEUE_NAME = 'priority_queue';
const TEST_QUEUE_NAME = 'test_queue';// 实现 OnApplicationBootstrap 接口
@Injectable()
export class QueueConfigService {// 构造函数constructor(@InjectRedis('redisDistribute') private readonly redisDistribute: any,@InjectQueue(NORMAL_QUEUE_NAME) private readonly normalQueue: any,@InjectQueue(PRIORITY_QUEUE_NAME) private readonly priorityQueue: any,@InjectQueue(TEST_QUEUE_NAME) private readonly testQueue: any,private readonly base: BaseService) {}// 根据队列名称返回对应的队列实例getQueueByName(queueName: string): any {const queueMap = {[NORMAL_QUEUE_NAME]: this.normalQueue,[PRIORITY_QUEUE_NAME]: this.priorityQueue,[TEST_QUEUE_NAME]: this.testQueue,};return queueMap[queueName];}
}

// 模块定义
@Module({imports: [BullModule.registerQueueAsync([{name: NORMAL_QUEUE_NAME,useFactory: () => ({redis: {host: process.env['REDIS_HOST'],port: +process.env['REDIS_PORT'],password: process.env['REDIS_PASSWORD'],},}),},{name: PRIORITY_QUEUE_NAME,useFactory: () => ({redis: {host: process.env['REDIS_HOST'],port: +process.env['REDIS_PORT'],password: process.env['REDIS_PASSWORD'],},}),},{name: TEST_QUEUE_NAME,useFactory: () => ({redis: {host: process.env['REDIS_HOST'],port: +process.env['REDIS_PORT'],password: process.env['REDIS_PASSWORD'],},}),},]),BaseModule,],providers: [QueueConfigService],exports: [QueueConfigService],
})
export class QueueConfigModule {}

queueMap[TEST_QUEUE_NAME].add('test', 20);

@Processor('TEST_QUEUE_NAME')
export class QueneService {constructor(@InjectQueue('task') private readonly taskQueue: Queue) {}@Process('test')async processTask(job: Job<number>) {console.log('Processing', job);console.log('Processing done', job.id);}
}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • Linux驱动开发基础(sr04超声波模块)
  • 算法day16|654.最大二叉树、617.合并二叉树、700.二叉搜索树中的搜索、98.验证二叉搜索树
  • filezilla使用教程(window下filezilla使用教程)
  • 梧桐数据库(WuTongDB):什么是“顺序扫描”
  • [GESP202312 四级] 田忌赛马
  • 今日算法:蓝桥杯基础题之“星系炸弹”
  • 掌握 Python列表:从基础到进阶技巧
  • FutureTask通常如何使用?
  • Ethercat设备数据 转IEC61850项目案例
  • django学生就业管理系统—计算机毕业设计源码24237
  • ★ 算法OJ题 ★ 力扣11 - 盛水最多的容器
  • qtlinux
  • MySQL中的COALESCE()函数用法,返回第一个非 NULL 的参数
  • 什么是数字人
  • How to install mysql 5.7 with podman in Ubuntu 24.04
  • 收藏网友的 源程序下载网
  • 《Javascript高级程序设计 (第三版)》第五章 引用类型
  • 【5+】跨webview多页面 触发事件(二)
  • 2018一半小结一波
  • ABAP的include关键字,Java的import, C的include和C4C ABSL 的import比较
  • Android Studio:GIT提交项目到远程仓库
  • HomeBrew常规使用教程
  • java8 Stream Pipelines 浅析
  • mysql_config not found
  • Python_OOP
  • Redis字符串类型内部编码剖析
  • SegmentFault 社区上线小程序开发频道,助力小程序开发者生态
  • Vue.js 移动端适配之 vw 解决方案
  • 从零开始学习部署
  • 大型网站性能监测、分析与优化常见问题QA
  • 漫谈开发设计中的一些“原则”及“设计哲学”
  • 使用SAX解析XML
  • 小程序上传图片到七牛云(支持多张上传,预览,删除)
  • 用 vue 组件自定义 v-model, 实现一个 Tab 组件。
  • hi-nginx-1.3.4编译安装
  • ​十个常见的 Python 脚本 (详细介绍 + 代码举例)
  • #define、const、typedef的差别
  • #Linux(权限管理)
  • #多叉树深度遍历_结合深度学习的视频编码方法--帧内预测
  • (160)时序收敛--->(10)时序收敛十
  • (Bean工厂的后处理器入门)学习Spring的第七天
  • (C)一些题4
  • (初研) Sentence-embedding fine-tune notebook
  • (第二周)效能测试
  • (强烈推荐)移动端音视频从零到上手(下)
  • (三)uboot源码分析
  • (一)硬件制作--从零开始自制linux掌上电脑(F1C200S) <嵌入式项目>
  • (转)jQuery 基础
  • (转)ORM
  • (转)Sql Server 保留几位小数的两种做法
  • (转)创业家杂志:UCWEB天使第一步
  • (转载)在C#用WM_COPYDATA消息来实现两个进程之间传递数据
  • **CI中自动类加载的用法总结
  • .net 7和core版 SignalR
  • .NET 8 中引入新的 IHostedLifecycleService 接口 实现定时任务