I have a very simple Celery task inside my Django application:
@app.task(name='test', bind=True)
def my_celery_task(self, schema=None, ref_id=None, user_id=None):
    print('Start Task 1')
    with schema_context(schema):
        print(self.get_task_result())
        print(self.get_task_result().pk)
    print('Start Task 2')
    return {'status': 'Task finished successfully'}
I also have a base class for my Celery tasks:
class MyTaskBase(Task):
    def delay(self, *args, **kwargs):
        print('Delaying task: {}'.format(self.name))
        argsrepr = [arg for arg in args]
        kwargsrepr = {key: value for key, value in kwargs.items()}
        new_task = super().s(*args, **kwargs)
        return new_task.apply_async(
            argsrepr=json.dumps(argsrepr),
            kwargsrepr=json.dumps(kwargsrepr)
        )

    def get_task_result(self):
        from django_celery_results.models import TaskResult
        task_result = TaskResult.objects.get_task(self.request.id)
        return task_result
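To make the `delay` override concrete, here is a minimal standalone sketch of the two strings it passes as `argsrepr` and `kwargsrepr`. `build_reprs` is a hypothetical helper that only mirrors the two comprehensions and the `json.dumps` calls above. It does not touch Celery, and the schema name is made up.

```python
import json


def build_reprs(*args, **kwargs):
    """Hypothetical helper mirroring the repr-building in MyTaskBase.delay."""
    argsrepr = [arg for arg in args]
    kwargsrepr = {key: value for key, value in kwargs.items()}
    # json.dumps raises TypeError for values that are not JSON-serializable.
    return json.dumps(argsrepr), json.dumps(kwargsrepr)


args_repr, kwargs_repr = build_reprs(schema="test_schema")
print(args_repr)    # []
print(kwargs_repr)  # {"schema": "test_schema"}
```

Since the test only passes keyword arguments, the positional repr is an empty JSON list.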
I also have a post_save signal receiver for TaskResult:
@receiver(post_save, sender=TaskResult)
def create_task_entry(sender, instance=None, created=None, **kwargs):
    print("INSIDE THE POST_SAVE")
    ...
    TaskEntry.objects.create(...)
I want to test this configuration, so I wrote this little test:
@override_settings(CELERY_TASK_ALWAYS_EAGER=True)
def test_test(self):
    my_celery_task.delay(schema=self.get_test_schema_name())
    print(TaskResult.objects.all())
    print(TaskEntry.objects.all())
The output is:
Delaying task: my_celery_task
Start Task 1
<Task: 7a0e8063-2be9-44b1-b438-1c0d37e33456 (PENDING)>
None
Start Task 2
<QuerySet []>
<QuerySet []>
From this, I conclude:
- The task ran.
- A TaskResult instance was returned inside the task, but its pk is None and the TaskResult queryset is empty, so it was never saved to the DB.
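The `(PENDING)` status and the `None` primary key are consistent with a get-or-build lookup. As far as I understand django-celery-results, `TaskResult.objects.get_task()` returns an unsaved instance when no row matches the task id. The sketch below imitates that behaviour in plain Python. `FakeTaskResult` and `FakeManager` are hypothetical stand-ins, not the library's classes.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class FakeTaskResult:
    """Hypothetical stand-in for a TaskResult row."""
    task_id: str
    status: str = "PENDING"   # default status of a fresh instance
    pk: Optional[int] = None  # stays None until the row is saved


class FakeManager:
    """Hypothetical stand-in for the manager; rows live in a dict."""

    def __init__(self) -> None:
        self.rows: Dict[str, FakeTaskResult] = {}

    def save(self, result: FakeTaskResult) -> None:
        # Assign a primary key the first time the row is stored.
        if result.pk is None:
            result.pk = len(self.rows) + 1
        self.rows[result.task_id] = result

    def get_task(self, task_id: str) -> FakeTaskResult:
        # Return the stored row if present, otherwise build an unsaved instance.
        try:
            return self.rows[task_id]
        except KeyError:
            return FakeTaskResult(task_id=task_id)


manager = FakeManager()
unsaved = manager.get_task("7a0e8063")
print(unsaved.status, unsaved.pk)  # PENDING None
print(len(manager.rows))           # 0
```

If this is what happens, nothing calls `save()` in eager mode, which would also explain why the post_save receiver never prints.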
If I omit the CELERY_TASK_ALWAYS_EAGER override and run this:
def test_test(self):
    my_celery_task.delay(schema=self.get_test_schema_name())
    print(TaskResult.objects.all())
    print(TaskEntry.objects.all())
Then the worker prints:
Delaying task: my_celery_task
Start Task 1
<Task: 7a0e8063-2be9-44b1-b438-1c0d37e33456 (PENDING)>
None
Start Task 2
INSIDE THE POST_SAVE
Pool callback raised exception: ProgrammingError('relation "tasks_taskentry" does not exist\nLINE 1: INSERT INTO "tasks_taskentry" ("uuid", "created_on", "modifi...\n ^\n')
2023-12-15 11:33:38 Traceback (most recent call last):
2023-12-15 11:33:38 File "/usr/local/lib/python3.7/site-packages/django/db/models/query.py", line 581, in get_or_create
2023-12-15 11:33:38 return self.get(**kwargs), False
2023-12-15 11:33:38 File "/usr/local/lib/python3.7/site-packages/django/db/models/query.py", line 437, in get
2023-12-15 11:33:38 self.model._meta.object_name
2023-12-15 11:33:38 django_celery_results.models.TaskResult.DoesNotExist: TaskResult matching query does not exist.
2023-12-15 11:33:38
2023-12-15 11:33:38 During handling of the above exception, another exception occurred:
2023-12-15 11:33:38
2023-12-15 11:33:38 Traceback (most recent call last):
2023-12-15 11:33:38 File "/usr/local/lib/python3.7/site-packages/django/db/backends/utils.py", line 84, in _execute
2023-12-15 11:33:38 return self.cursor.execute(sql, params)
2023-12-15 11:33:38 psycopg2.errors.UndefinedTable: relation "tasks_taskentry" does not exist
2023-12-15 11:33:38 LINE 1: INSERT INTO "tasks_taskentry" ("uuid", "created_on", "modifi...
So if I run it without eager mode, the post_save receiver is called on the worker, but the TaskEntry cannot be saved because of the DB error above.
How can I fix this? I would like to test and inspect the TaskEntry object after a task has been called.
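One setting that may be relevant to the eager case: Celery 5.1 and later document `task_store_eager_result`, which makes eagerly executed tasks write their results to the result backend. A sketch of the test settings, assuming that Celery version and the same `CELERY_` settings namespace used above. It is untested here and does not address the missing `tasks_taskentry` relation seen on the worker.

```python
# Test settings sketch (assumes Celery >= 5.1 and the CELERY_ namespace).
CELERY_TASK_ALWAYS_EAGER = True        # run tasks inline, as in the test above
CELERY_TASK_STORE_EAGER_RESULT = True  # also store eager results in the backend
```

The same two names could be passed to `override_settings` instead of living in a settings module.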