I am trying to send DTOs to my Redis queue with a Bull framework and handle these DTOs in processors. Sometimes job pass to processor (1 of 100) but most of the time failed with error: job stalled more than allowable limit
and I have no idea how to fix it.
I give you a small intro and below you can see my code. I have created queue-api
module which serve as wrapper for my queues, for instance order queue. This module then I import to modules from which I want publish DTO into queues, in my case order-module
.
queue-api module files
// queue-api.module.ts
@Module({
imports: [
BullModule.registerQueue(
{
name: 'order-queue',
defaultJobOptions: {
backoff: 10000,
attempts: Number.MAX_SAFE_INTEGER,
},
},
),
...
],
providers: [OrderQueue],
exports: [OrderQueue],
})
export class QueueApiModule {}
// order-queue.ts
@Injectable()
export class OrderQueue extends AbstractQueue {
constructor(
@InjectQueue('order-queue')
private readonly queue: Queue,
) {}
async sendSubmitMail(dto: SendSubmitMailDto): Promise<void> {
const job = await this.queue.add('send-submit-mail', dto)
console.log(`Job ${job.id} created.`)
}
}
order-module files
// order.module.ts
@Module({
imports: [
QueueApiModule,
...
],
providers: [
OrderProcessor,
...
]
})
export class OrderModule {}
// order-processor.ts
@Processor('order-queue')
export class OrderProcessor {
constructor(private readonly queue: OrderQueue) {}
@Process('send-submit-mail')
async onProcessSubmitMail(job: Job): Promise<void> {
console.log(`Processing of job ${job.id}`)
}
}
this processor handler is almost never called.
Do you have any idea what is wrong with my code? Thank you in advice.
A little late, it's still better to have it written here
It happens because of this line
constructor(private readonly queue: OrderQueue) {}
And more precisely it's because the DI mechanism, Probably the reason the service is
Scope.REQUEST
(Or one of its injected services, which makes the host service also a Scope.REQUEST service, the whole injection sub-tree is request scoped)@Process()
runs the handler in separate process, and therefore has no access to the Injector.If you look at the error that results from trying to process the job.data, yo'll see something like this (In my case trying to inject EmailService):
stacktrace ["TypeError: this.request.get is not a function\n at new EmailService (/Users/stephan/projects/platform-api/src/messaging/email/service/email.service.ts:36:50)\n at Injector.instantiateClass (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:340:19)\n at callback (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:53:45)\n at processTicksAndRejections (node:internal/process/task_queues:95:5)\n at Injector.loadInstance (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:57:13)\n at Injector.loadProvider (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:84:9)\n at Injector.resolveComponentHost (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:202:13)\n at async Promise.all (index 0)\n at Injector.loadCtorMetadata (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:368:23)\n at Injector.resolveConstructorParams (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:99:26)","TypeError: this.request.get is not a function\n at new EmailService (/Users/stephan/projects/platform-api/src/messaging/email/service/email.service.ts:36:50)\n at Injector.instantiateClass (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:340:19)\n at callback (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:53:45)\n at processTicksAndRejections (node:internal/process/task_queues:95:5)\n at Injector.loadInstance (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:57:13)\n at Injector.loadProvider (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:84:9)\n at Injector.resolveComponentHost (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:202:13)\n at async Promise.all (index 0)\n at Injector.loadCtorMetadata (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:368:23)\n at Injector.resolveConstructorParams (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:99:26)"]