system: celery 5.2.7, redis 5.0.3
Trying to find a way to update my database when a task has been "queued" (by queued is when tasks are received and waiting to be started). If I set concurrency to 1, and send 10 tasks, I want to capture the signal when its received and insert into my database that a task was received. Then update it along the way for other signals. I MUST be able to access the task's kwargs (It carries metadata I need to insert into my database).
I reviewed the documentation for celery.signals here https://docs.celeryq.dev/en/v5.2.7/userguide/signals.html
I tried using task_recieved, but I can only access "request" as an input. I tried using before_task_publish, and after_task_publish, but the signal does not seem to be captured when I use it.
My tasks code:
from src.app.tasks_signals import recieved, prerun, success, failure
@celery.task(name='customers')
def backend(**kwargs):
outcome = customers_work(**kwargs)
return outcome
My signal captures:
from celery.signals import task_success, task_failure, task_prerun, before_task_publish
@before_task_publish.connect
def recieved(sender=None, body=None, **kwargs):
task_id = body['id']
task_kwargs = kwargs.get('kwargs',{})
print("RECIEVED KWARGS IN task_recieved")
print(task_kwargs)
task_kwargs['correlation_id'] = task_id
task_kwargs['result'] = "incomplete"
task_kwargs['status'] = "QUEUED"
task_kwargs['update_type'] = "create"
task_kwargs['response_code'] = "200"
update_job(**task_kwargs)
How can I capture a celery signal when tasks are queued (received?), and also have access to kwargs to run my update_job to update the database?