Celery async result.collect() is empty [], what am I doing wrong?

26 Views Asked by At

I implemented the celery result example from the documentation, and I am not getting the same results.

I have 2 dockers in the same network: my celery_1 docker which runs the celery worker and the redis_1 docker with runs the latest redis image.

I run my celery docker and I have this file tasks.py:

from celery import group
from tuna.celery import app

@app.task(trail=True)
def A(how_many):
    return group(B.s(i) for i in range(how_many))()

@app.task(trail=True)
def B(i):
    return pow2.delay(i)

@app.task(trail=True)
def pow2(i):
    return i ** 2

This is how i start celery in the celery_1 docker:

celery -A tuna.celery_app.celery worker -l info -E

I can see it working and registering the tasks correctly:

celery_1  |  -------------- celery@fc9a5ae8ab17 v5.3.4 (emerald-rush)
celery_1  | --- ***** -----
celery_1  | -- ******* ---- Linux-5.4.0-150-generic-x86_64-with-glibc2.29 2024-01-10 17:28:52
celery_1  | - *** --- * ---
celery_1  | - ** ---------- [config]
celery_1  | - ** ---------- .> app:         celery_app:0x7f4f829d98e0
celery_1  | - ** ---------- .> transport:   redis://mituna_redis:6379//
celery_1  | - ** ---------- .> results:     redis://mituna_redis:6379/
celery_1  | - *** --- * --- .> concurrency: 72 (prefork)
celery_1  | -- ******* ---- .> task events: ON
celery_1  | --- ***** -----
celery_1  |  -------------- [queues]
celery_1  |                 .> celery           exchange=celery(direct) key=celery
celery_1  |
celery_1  |
celery_1  | [tasks]
celery_1  |   . tuna.celery_app.celery.A
celery_1  |   . tuna.celery_app.celery.B
celery_1  |   . tuna.celery_app.celery.async_call
celery_1  |   . tuna.celery_app.celery.group_tasks
celery_1  |   . tuna.celery_app.celery.pow2

Pretty much any function I have that does not use result.collect() works as expected, and I am able to see the results. But I am trying to collect these results in a non-blocking way with collect(), and it does not work for me.

This is the app settings:

app = Celery('celery_app',
             broker_url=f"redis://mituna_redis:6379//",
             result_backend=f"redis://mituna_redis:6379/",
             include=['tuna.celery_app.celery',]) #note I also left the include part out - tasks are still regiestered correctly, I dont think this is needed

app.autodiscover_tasks()
app.conf.result_backend_transport_options = {'retry_policy': {'timeout': 5.0}}
logger = get_task_logger(__name__)

I use redis:latest for my image and celery 5.3.4

So in celery docker I start my python3 interpreter, and write the same code as in the documentation. https://docs.celeryq.dev/en/stable/reference/celery.result.html

Python 3.8.10 (default, May 26 2023, 14:05:08)
[GCC 9.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from tuna.celery_app.celery import A
>>> from celery.result import ResultBase
>>> result = A.delay(10)
>>> [v for v in result.collect()
...  if not isinstance(v, (ResultBase, tuple))]
[]

As you can see, result.collect() comes back empty in the interpreter.

I see the tasks being received and the results computed correctly in the celery worker:

celery_1  | [2024-01-10 15:01:14,312: INFO/MainProcess] Task tuna.celery_app.celery.pow2[78223919-cc49-4166-8546-6e4a48345478] received
celery_1  | [2024-01-10 15:01:14,312: INFO/ForkPoolWorker-63] Task tuna.celery_app.celery.pow2[77c7e134-1922-44db-9d55-e3f6b018fb89] succeeded in 0.00052880764008s: 1
celery_1  | [2024-01-10 15:01:14,313: INFO/ForkPoolWorker-64] Task tuna.celery_app.celery.pow2[78223919-cc49-4166-8546-6e4a48345478] succeeded in 0.00044824361801s: 0
celery_1  | [2024-01-10 15:01:14,314: INFO/MainProcess] Task tuna.celery_app.celery.pow2[1e6e530d-1383-4fca-8d90-1da1b892ad44] received
celery_1  | [2024-01-10 15:01:14,316: INFO/MainProcess] Task tuna.celery_app.celery.pow2[bbeef76f-e20f-4f92-9efb-d15eb37f8f8d] received
celery_1  | [2024-01-10 15:01:14,317: INFO/ForkPoolWorker-63] Task tuna.celery_app.celery.pow2[1e6e530d-1383-4fca-8d90-1da1b892ad44] succeeded in 0.00066126394272s: 36
celery_1  | [2024-01-10 15:01:14,317: INFO/ForkPoolWorker-64] Task tuna.celery_app.celery.pow2[bbeef76f-e20f-4f92-9efb-d15eb37f8f8d] succeeded in 0.0004749994278s: 49
celery_1  | [2024-01-10 15:01:14,319: INFO/MainProcess] Task tuna.celery_app.celery.pow2[84068265-de95-4d23-ba9f-cfa8006f65cd] received
celery_1  | [2024-01-10 15:01:14,321: INFO/MainProcess] Task tuna.celery_app.celery.pow2[851bd08b-59eb-40b4-8ab9-e40e622ad086] received
celery_1  | [2024-01-10 15:01:14,322: INFO/ForkPoolWorker-63] Task tuna.celery_app.celery.pow2[84068265-de95-4d23-ba9f-cfa8006f65cd] succeeded in 0.00038264656067s: 9
celery_1  | [2024-01-10 15:01:14,322: INFO/ForkPoolWorker-64] Task tuna.celery_app.celery.pow2[851bd08b-59eb-40b4-8ab9-e40e622ad086] succeeded in 0.00030431699753s: 4
celery_1  | [2024-01-10 15:01:14,323: INFO/MainProcess] Task tuna.celery_app.celery.pow2[d6926a01-fdfe-454f-966e-a347484e6574] received
celery_1  | [2024-01-10 15:01:14,326: INFO/MainProcess] Task tuna.celery_app.celery.pow2[861452de-746f-4d22-bd88-29f407bf599d] received
celery_1  | [2024-01-10 15:01:14,327: INFO/ForkPoolWorker-63] Task tuna.celery_app.celery.pow2[d6926a01-fdfe-454f-966e-a347484e6574] succeeded in 0.00024588489532s: 16
celery_1  | [2024-01-10 15:01:14,327: INFO/ForkPoolWorker-64] Task tuna.celery_app.celery.pow2[861452de-746f-4d22-bd88-29f407bf599d] succeeded in 0.00058472061157s: 64
celery_1  | [2024-01-10 15:01:14,328: INFO/MainProcess] Task tuna.celery_app.celery.pow2[753ab025-05bf-4cab-a2e5-0e4a8f004f30] received
celery_1  | [2024-01-10 15:01:14,330: INFO/MainProcess] Task tuna.celery_app.celery.pow2[f8143821-a9f4-45ba-b22e-c949d6ffe20c] received
celery_1  | [2024-01-10 15:01:14,332: INFO/ForkPoolWorker-63] Task tuna.celery_app.celery.pow2[753ab025-05bf-4cab-a2e5-0e4a8f004f30] succeeded in 0.00030650091171s: 81
celery_1  | [2024-01-10 15:01:14,332: INFO/ForkPoolWorker-64] Task tuna.celery_app.celery.pow2[f8143821-a9f4-45ba-b22e-c949d6ffe20c] succeeded in 0.00077548265457s: 25
celery_1  | [2024-01-10 15:03:20,991: INFO/MainProcess] Task tuna.celery_app.celery.A[43912ac2-a04f-457b-8f59-d5d8af4bcf0b] received
celery_1  | [2024-01-10 15:

when i do this in a blocking way, works like a charm

>>> from tuna.celery_app.celery import pow2
>>> g_result = group(pow2.s(2), pow2.s(4))
>>> res = g_result()
>>> res.get()
[4, 16]

Why is result.collect() not saving the results? According to the docs i should be getting this list: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] but mine is empty.

0

There are 0 best solutions below