Celery apply_async raising a missing argument error

I have a Celery task defined like this -

@async_worker.task(ignore_result=True, queue="data_path")
def publish_msg_from_lock(self, mac: str, data: bytes, gateway_euid: str): 

Previously it wasn't a Celery task, and it was called like this -

n.publish_msg_from_lock(addr, unhexlify(payload), gateway_euid) 

After making it a Celery task, I updated the call like this -

n.publish_msg_from_lock.apply_async(args=(addr, unhexlify(payload), gateway_euid),)

I've also tried -

n.publish_msg_from_lock.apply_async(args=(addr, unhexlify(payload), gateway_euid), kwargs={})

and

n.publish_msg_from_lock.apply_async(kwargs={"mac": addr, "data": unhexlify(payload), "gateway_euid": gateway_euid})

But I'm getting this error -

File "/usr/local/lib/python3.8/dist-packages/celery/app/task.py", line 531, in apply_async
    check_arguments(*(args or ()), **(kwargs or {}))
TypeError: publish_msg_from_lock() missing 1 required positional argument: 'gateway_euid'

Can you please help me fix it?

Answer by MaximumLasagna:

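Your task fails because it is not bound. Without bind=True, Celery does not pass the task instance to the function, so self is treated as an ordinary first argument: your three positional arguments fill self, mac, and data, and gateway_euid is left missing. To use the self parameter in the function signature, add bind=True to the Celery decorator.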

Example:

@async_worker.task(ignore_result=True, queue="data_path", bind=True)
def publish_msg_from_lock(self, mac: str, data: bytes, gateway_euid: str): 

Binding the task gives it access to Celery functionality through self, such as the request context and retries, as described here: https://docs.celeryq.dev/en/stable/userguide/tasks.html#bound-tasks

You could also remove the parameter self if you don't need it.

@async_worker.task(ignore_result=True, queue="data_path")
def publish_msg_from_lock(mac: str, data: bytes, gateway_euid: str):
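
To see why the error names gateway_euid rather than self, here is a minimal sketch using only the standard library's inspect module. It is not Celery's code; it only mimics the kind of signature check that the traceback shows apply_async performing. The helper missing_params and the placeholder values are hypothetical and exist just for this illustration.

```python
import inspect


def publish_msg_from_lock(self, mac: str, data: bytes, gateway_euid: str):
    """Stand-in for the task body. As a plain function, `self` is just
    the first positional parameter, which is how an unbound task sees it."""


SIG = inspect.signature(publish_msg_from_lock)


def missing_params(*args, **kwargs):
    """Hypothetical helper: names of required parameters the call leaves unfilled."""
    bound = SIG.bind_partial(*args, **kwargs)
    return [
        name
        for name, param in SIG.parameters.items()
        if name not in bound.arguments and param.default is inspect.Parameter.empty
    ]


# Placeholder values standing in for addr, unhexlify(payload), gateway_euid.
addr, payload, gateway_euid = "aa:bb:cc", b"\x01\x02", "gw-1"

# Unbound, positional: the three values land on self, mac, data.
unbound_positional = missing_params(addr, payload, gateway_euid)

# Unbound, keyword: every named parameter is filled, but nothing supplies self.
unbound_keyword = missing_params(mac=addr, data=payload, gateway_euid=gateway_euid)

# Bound (bind=True): the task instance is supplied first; object() stands in for it.
bound_positional = missing_params(object(), addr, payload, gateway_euid)

print(unbound_positional)  # ['gateway_euid']
print(unbound_keyword)     # ['self']
print(bound_positional)    # []
```

The first result matches the error in the question. The second suggests that the keyword-only call would complain about self instead, which is consistent with the same root cause.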