get TaskResult object when testing the Celery task in Django

25 Views Asked by At

I have a very simple celery task inside my Django application:

@app.task(name='test', bind=True)
def my_celery_task(self, schema=None, ref_id=None, user_id=None):
    print('Start Task 1')
    with schema_context(schema):
        print(self.get_task_result())
        print(self.get_task_result().pk)
        print('Start Task 2')
        return {'status': 'Task finished successfully'}

I also have the base class for Celery tasks

class MyTaskBase(Task):

    def delay(self, *args, **kwargs):
        print('Delaying task: {}'.format(self.name))
        argsrepr = [arg for arg in args]
        kwargsrepr = {key: value for key, value in kwargs.items()}

        new_task = super().s(*args, **kwargs)
        return new_task.apply_async(
            argsrepr=json.dumps(argsrepr),
            kwargsrepr=json.dumps(kwargsrepr)
        )

    def get_task_result(self):
        from django_celery_results.models import TaskResult
        task_result = TaskResult.objects.get_task(self.request.id)
        return task_result

I also have the trigger for a TaskResult on_save:

@receiver(post_save, sender=TaskResult)
def create_task_entry(sender, instance=None, created=None, **kwargs):
    print("INSIDE THE POST_SAVE")
    ...
    TaskEntry.objects.create(...)

I want to test this configuration:

SO... : I wrote this little test:

@override_settings(CELERY_TASK_ALWAYS_EAGER=True)
def test_test(self):
    my_celery_task.delay(schema=self.get_test_schema_name())
    print(TaskResult.objects.all())
    print(TaskEntry.objects.all())

The prints are:

Delaying task: my_celery_task
Start Task 1
<Task: 7a0e8063-2be9-44b1-b438-1c0d37e33456 (PENDING)>
None
Start Task 2
<QuerySet []>
<QuerySet []>

From this, I conclude:

  • the task was run
  • The TaskResult was created but never saved to the DB

If I omit the CELERY_TASK_ALWAYS_EAGER and run this

def test_test(self):
    my_celery_task.delay(schema=self.get_test_schema_name())
    print(TaskResult.objects.all())
    print(TaskEntry.objects.all())

Then the prints are on the worker:

Delaying task: my_celery_task
Start Task 1
<Task: 7a0e8063-2be9-44b1-b438-1c0d37e33456 (PENDING)>
None
Start Task 2
INSIDE THE POST_SAVE
Pool callback raised exception: ProgrammingError('relation "tasks_taskentry" does not exist\nLINE 1: INSERT INTO "tasks_taskentry" ("uuid", "created_on", "modifi...\n                    ^\n')
2023-12-15 11:33:38 Traceback (most recent call last):
2023-12-15 11:33:38   File "/usr/local/lib/python3.7/site-packages/django/db/models/query.py", line 581, in get_or_create
2023-12-15 11:33:38     return self.get(**kwargs), False
2023-12-15 11:33:38   File "/usr/local/lib/python3.7/site-packages/django/db/models/query.py", line 437, in get
2023-12-15 11:33:38     self.model._meta.object_name
2023-12-15 11:33:38 django_celery_results.models.TaskResult.DoesNotExist: TaskResult matching query does not exist.
2023-12-15 11:33:38 
2023-12-15 11:33:38 During handling of the above exception, another exception occurred:
2023-12-15 11:33:38 
2023-12-15 11:33:38 Traceback (most recent call last):
2023-12-15 11:33:38   File "/usr/local/lib/python3.7/site-packages/django/db/backends/utils.py", line 84, in _execute
2023-12-15 11:33:38     return self.cursor.execute(sql, params)
2023-12-15 11:33:38 psycopg2.errors.UndefinedTable: relation "tasks_taskentry" does not exist
2023-12-15 11:33:38 LINE 1: INSERT INTO "tasks_taskentry" ("uuid", "created_on", "modifi...

So if I run it without eager, the save is called but can't be saved because DB error.

How to fix this. I would like to test and inspect the TestEntry object after some task was called.

0

There are 0 best solutions below