Awaiting the "add a job" method never resolves


The method this.aQueue.add(*args), which should return a job instance, never resolves when awaited. After receiving and deserializing the data, I try to get a job instance in RequestService.sendData (app.service.ts module), but the await never completes and the rest of the code does not run.

app.service.ts

import { Injectable } from '@nestjs/common';
import { Job, Queue } from 'bull';
import { InjectQueue } from '@nestjs/bull';
import { plainToClass } from 'class-transformer';
import { RequestScheme, ResponseScheme } from './app.schemes';
 
 
@Injectable()
export class RequestService {
  constructor(
    @InjectQueue('request') private requestQueue: Queue
  ){}
 
   async sendData(data: RequestScheme): Promise<ResponseScheme> {
    let responseData: ResponseScheme
    data = plainToClass(RequestScheme, data)
    console.log("data in controller", data) // data is deserialized as i expect
 
    const jobInstance = await this.requestQueue.add(
      'request', data, { delay: data.wait }
    ) // this call starts, but the await never resolves
 
    console.log(`Job: ${jobInstance}`)
 
    async function setJobData(jobInstance: Job){
      return new Promise((resolve, reject) => {
        this.requestQueue.on('completed', function(job: Job, result: ResponseScheme, error){
        if (jobInstance.id == job.id) {
          responseData = result
          job.remove()
          resolve(result)
        }
        if (error) reject(error)
      })
    })}
    
    await setJobData(jobInstance)
    return responseData
   }
  }

app.processor.ts

import { Job } from 'bull';
import { 
  Processor, 
  Process, 
  OnQueueActive,
  OnQueueCompleted
} from '@nestjs/bull';
import { ResponseScheme } from './app.schemes';
 
@Processor('request')
export class RequestConsumer {
 
  @Process('request')
  async process_request(job: Job){
    console.log(`Job ${job.id} proceed`)
  }
 
  @OnQueueActive()
  onActive(job: Job){
    console.log(`Data ${job.data} were sended`)
  }
 
  @OnQueueCompleted()
  onComplete(job: Job){
    const response = new ResponseScheme()
    response.answer = job.data.answer
    return response
  }
}

app.module.ts

import { Global, Module, NestModule } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { RequestController } from './app.controller';
import { RequestService } from './app.service';
import { RequestConsumer } from './app.processor';
 
@Module({
  imports: [
    BullModule.forRoot({
      redis: {
        host: 'localhost',
        port: 6379,
        maxRetriesPerRequest: null
      }
    }),
    BullModule.registerQueue({
      name: 'request'
    })
  ],
  controllers: [RequestController],
  providers: [
    RequestService
  ],
  exports: [
    RequestService
  ]
})
export class AppModule {
  configure(consumer: RequestConsumer){}
}

app.controller.ts

import { Job } from 'bull';
import { Body, Controller, Get, HttpException, HttpStatus, Post, Res } from '@nestjs/common';
import { RequestService } from './app.service';
import { RequestScheme, ResponseScheme } from './app.schemes';
 
@Controller('request')
export class RequestController {
  constructor(private readonly requestService: RequestService) {}
 
  @Get() // this works fine
  about(): string{
    return "Hello! This is the request"
  }
 
  @Post()
  async getHello(@Body() data: RequestScheme): Promise<ResponseScheme> {
    console.log("POST", "data", data) //client data, good as it's expected
    let responseData: ResponseScheme
    responseData = await this.requestService.sendData(data)
    return responseData
  }
}

According to the manual [https://docs.nestjs.com/techniques/queues], this is the standard way to add a job in NestJS:

const job = await this.audioQueue.add(
  {
    foo: 'bar',
  },
  { lifo: true },
);

But my variant never resolves, no matter what value wait holds (string or number):

const jobInstance = await this.requestQueue.add(
      'request', data, { delay: data.wait }
    )

Also, I tried returning hardcoded data from RequestService.sendData with the "add a job" call commented out, and that works. But I need to add a job.

Answer by Marek Kapusta-Ognicki

Oooookay. And what do you want to achieve with this setup? :)

Don't get me wrong, but from where I sit it looks as if you are dispatching a job from the controller to a processor that does nothing except console.log, while the controller has to wait until the job is done and returns some data.

Also, the code inside the promise:

        if (jobInstance.id == job.id) {
          responseData = result
          job.remove()
          resolve(result)
        }

is supposed to change responseData, which is declared outside the callback. That assignment does not run in the flow of sendData itself; it runs later, inside an event listener registered from another promise-returning function, so relying on it is fragile. Also, result is never defined, because your processor returns nothing. Plus, job.remove() returns a promise itself, and nobody awaits it.

Also, you have two if statements that amount to:

  1. if jobInstance.id is equal-ish to job.id, then do something,
  2. if there is an error, then reject,
  3. in all other cases (jobInstance.id isn't equal-ish to job.id, and there's no error) - do nothing. Wait. Wait. Wait. Just wait ;)
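
To make the three cases above concrete, here is a minimal sketch of a wait helper that settles on every path: it resolves when the matching job completes, rejects when it fails, and rejects after a timeout. TinyQueueEvents and waitForJob are hypothetical stand-ins written for this illustration, not Bull's API. As far as I know, Bull passes only (job, result) to 'completed' listeners and reports failures through a separate 'failed' event, which is why the sketch listens for both.

```typescript
// Hypothetical stand-in for a queue's event API; this is NOT Bull itself.
type Listener = (jobId: string, payload: unknown) => void;

class TinyQueueEvents {
  private listeners: Record<string, Listener[]> = {};

  on(event: string, listener: Listener): void {
    const current = this.listeners[event] || [];
    this.listeners[event] = current.concat(listener);
  }

  off(event: string, listener: Listener): void {
    const current = this.listeners[event] || [];
    this.listeners[event] = current.filter((l) => l !== listener);
  }

  emit(event: string, jobId: string, payload: unknown): void {
    // Iterate over a copy, because listeners may unregister themselves.
    for (const listener of (this.listeners[event] || []).slice()) {
      listener(jobId, payload);
    }
  }
}

// Resolves on "completed", rejects on "failed" or after timeoutMs.
// Every path removes the listeners and the timer, so nothing waits forever.
function waitForJob(
  events: TinyQueueEvents,
  jobId: string,
  timeoutMs: number,
): Promise<unknown> {
  return new Promise((resolve, reject) => {
    const timer: ReturnType<typeof setTimeout> = setTimeout(() => {
      cleanup();
      reject(new Error(`Job ${jobId} timed out after ${timeoutMs} ms`));
    }, timeoutMs);

    const onCompleted: Listener = (id, result) => {
      if (id !== jobId) return; // someone else's job: keep waiting
      cleanup();
      resolve(result);
    };

    const onFailed: Listener = (id, error) => {
      if (id !== jobId) return;
      cleanup();
      reject(error);
    };

    function cleanup(): void {
      clearTimeout(timer);
      events.off('completed', onCompleted);
      events.off('failed', onFailed);
    }

    events.on('completed', onCompleted);
    events.on('failed', onFailed);
  });
}
```

With real Bull, job.finished() appears to serve the same purpose, so it is worth checking the reference before hand-rolling a listener like this.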

Also, an async function that wraps a new Promise, nested inside another async function, is something that can lead to a lot of weird behavior. Besides, when you write async function xyz(...) { this.requestQueue... }, what do you expect this to refer to?
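
A small sketch of the this problem, under the assumption that the nested function is called bare, as setJobData is in the question. The Service class and its members are hypothetical names for this illustration only. A function declaration gets its own this, which is undefined in strict mode, while an arrow function keeps the this of the enclosing method.

```typescript
'use strict'; // class bodies are strict anyway; this makes it explicit for the file

class Service {
  private readonly queueName = 'request';

  viaFunction(): string {
    // A plain function declaration has its own `this`.
    // Called bare, as here, `this` is undefined in strict mode,
    // so something like `this.requestQueue` would throw a TypeError.
    function inner(this: unknown): string {
      return this === undefined ? 'this is undefined' : 'this is bound';
    }
    return inner();
  }

  viaArrow(): string {
    // An arrow function has no `this` of its own and reuses the method's.
    const inner = (): string => this.queueName;
    return inner();
  }
}

const service = new Service();
console.log(service.viaFunction());
console.log(service.viaArrow());
```

Turning the nested function into an arrow function, or into a private method of the class, would be two ways to keep access to the injected queue.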


If your intention was to hold the controller until the job is finished and then mutate some data inside the controller method to show it to the end user, then this is absolutely not the way. But I can't tell whether that was actually your intention, so I don't know if that's the direction you're interested in :)