I implemented the Celery result example from the documentation, and I am not getting the same results.
I have two Docker containers on the same network: celery_1, which runs the Celery worker, and redis_1, which runs the latest Redis image.
Inside the celery_1 container I have this file, tasks.py:
from celery import group
from tuna.celery import app

@app.task(trail=True)
def A(how_many):
    return group(B.s(i) for i in range(how_many))()

@app.task(trail=True)
def B(i):
    return pow2.delay(i)

@app.task(trail=True)
def pow2(i):
    return i ** 2
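For reference, this is what the chain is supposed to compute in the end. A Celery-free local sketch (pow2_local and a_local are hypothetical stand-ins for the tasks, just to sanity-check the expected values without a broker):

```python
# Local, Celery-free sketch of what A -> B -> pow2 computes:
# A(10) fans out B(i) for i in range(10), and each B defers to pow2(i).
def pow2_local(i):
    return i ** 2

def a_local(how_many):
    # the group of B tasks, flattened to its final values
    return [pow2_local(i) for i in range(how_many)]

expected = a_local(10)
print(expected)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```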
This is how I start Celery in the celery_1 container:
celery -A tuna.celery_app.celery worker -l info -E
I can see it working and registering the tasks correctly:
celery_1 | -------------- celery@fc9a5ae8ab17 v5.3.4 (emerald-rush)
celery_1 | --- ***** -----
celery_1 | -- ******* ---- Linux-5.4.0-150-generic-x86_64-with-glibc2.29 2024-01-10 17:28:52
celery_1 | - *** --- * ---
celery_1 | - ** ---------- [config]
celery_1 | - ** ---------- .> app: celery_app:0x7f4f829d98e0
celery_1 | - ** ---------- .> transport: redis://mituna_redis:6379//
celery_1 | - ** ---------- .> results: redis://mituna_redis:6379/
celery_1 | - *** --- * --- .> concurrency: 72 (prefork)
celery_1 | -- ******* ---- .> task events: ON
celery_1 | --- ***** -----
celery_1 | -------------- [queues]
celery_1 | .> celery exchange=celery(direct) key=celery
celery_1 |
celery_1 |
celery_1 | [tasks]
celery_1 | . tuna.celery_app.celery.A
celery_1 | . tuna.celery_app.celery.B
celery_1 | . tuna.celery_app.celery.async_call
celery_1 | . tuna.celery_app.celery.group_tasks
celery_1 | . tuna.celery_app.celery.pow2
Pretty much every function I have that does not use result.collect() works as expected, and I am able to see the results. But when I try to collect the results in a non-blocking way with collect(), it does not work for me.
This is the app settings:
app = Celery('celery_app',
             broker_url="redis://mituna_redis:6379//",
             result_backend="redis://mituna_redis:6379/",
             include=['tuna.celery_app.celery'])  # note: I also tried leaving include out; the tasks are still registered correctly, so I don't think it is needed
app.autodiscover_tasks()
app.conf.result_backend_transport_options = {'retry_policy': {'timeout': 5.0}}
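For completeness, the same settings can also be expressed in one app.conf.update call; this is just a sketch of the equivalent configuration (same URLs as above), not my exact file:

```python
from celery import Celery

app = Celery('celery_app')
app.conf.update(
    broker_url="redis://mituna_redis:6379//",
    result_backend="redis://mituna_redis:6379/",
    result_backend_transport_options={'retry_policy': {'timeout': 5.0}},
)
app.autodiscover_tasks()
```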
logger = get_task_logger(__name__)
I use the redis:latest image and Celery 5.3.4.
In the celery container I start the python3 interpreter and run the same code as in the documentation: https://docs.celeryq.dev/en/stable/reference/celery.result.html
Python 3.8.10 (default, May 26 2023, 14:05:08)
[GCC 9.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from tuna.celery_app.celery import A
>>> from celery.result import ResultBase
>>> result = A.delay(10)
>>> [v for v in result.collect()
... if not isinstance(v, (ResultBase, tuple))]
[]
As you can see, result.collect() comes back empty in the interpreter.
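To convince myself the empty list could be a filtering effect rather than missing data, I simulated the comprehension locally. If everything collect() yields is a result object or a (result, value) tuple and no bare leaf value is ever reached, the isinstance filter drops it all (FakeResult is just a stand-in for celery.result.ResultBase; no broker involved):

```python
# Stand-in for celery.result.ResultBase, to test the filter in isolation
class FakeResult:
    pass

# Sample items: a bare result object and two (result, value) tuples
yielded = [FakeResult(), (FakeResult(), FakeResult()), (FakeResult(), 0)]

# The same comprehension as in the docs example: everything here is
# either a FakeResult or a tuple, so nothing survives the filter
leaves = [v for v in yielded if not isinstance(v, (FakeResult, tuple))]
print(leaves)  # []
```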
I see the tasks being received and the results computed correctly in the celery worker:
celery_1 | [2024-01-10 15:01:14,312: INFO/MainProcess] Task tuna.celery_app.celery.pow2[78223919-cc49-4166-8546-6e4a48345478] received
celery_1 | [2024-01-10 15:01:14,312: INFO/ForkPoolWorker-63] Task tuna.celery_app.celery.pow2[77c7e134-1922-44db-9d55-e3f6b018fb89] succeeded in 0.00052880764008s: 1
celery_1 | [2024-01-10 15:01:14,313: INFO/ForkPoolWorker-64] Task tuna.celery_app.celery.pow2[78223919-cc49-4166-8546-6e4a48345478] succeeded in 0.00044824361801s: 0
celery_1 | [2024-01-10 15:01:14,314: INFO/MainProcess] Task tuna.celery_app.celery.pow2[1e6e530d-1383-4fca-8d90-1da1b892ad44] received
celery_1 | [2024-01-10 15:01:14,316: INFO/MainProcess] Task tuna.celery_app.celery.pow2[bbeef76f-e20f-4f92-9efb-d15eb37f8f8d] received
celery_1 | [2024-01-10 15:01:14,317: INFO/ForkPoolWorker-63] Task tuna.celery_app.celery.pow2[1e6e530d-1383-4fca-8d90-1da1b892ad44] succeeded in 0.00066126394272s: 36
celery_1 | [2024-01-10 15:01:14,317: INFO/ForkPoolWorker-64] Task tuna.celery_app.celery.pow2[bbeef76f-e20f-4f92-9efb-d15eb37f8f8d] succeeded in 0.0004749994278s: 49
celery_1 | [2024-01-10 15:01:14,319: INFO/MainProcess] Task tuna.celery_app.celery.pow2[84068265-de95-4d23-ba9f-cfa8006f65cd] received
celery_1 | [2024-01-10 15:01:14,321: INFO/MainProcess] Task tuna.celery_app.celery.pow2[851bd08b-59eb-40b4-8ab9-e40e622ad086] received
celery_1 | [2024-01-10 15:01:14,322: INFO/ForkPoolWorker-63] Task tuna.celery_app.celery.pow2[84068265-de95-4d23-ba9f-cfa8006f65cd] succeeded in 0.00038264656067s: 9
celery_1 | [2024-01-10 15:01:14,322: INFO/ForkPoolWorker-64] Task tuna.celery_app.celery.pow2[851bd08b-59eb-40b4-8ab9-e40e622ad086] succeeded in 0.00030431699753s: 4
celery_1 | [2024-01-10 15:01:14,323: INFO/MainProcess] Task tuna.celery_app.celery.pow2[d6926a01-fdfe-454f-966e-a347484e6574] received
celery_1 | [2024-01-10 15:01:14,326: INFO/MainProcess] Task tuna.celery_app.celery.pow2[861452de-746f-4d22-bd88-29f407bf599d] received
celery_1 | [2024-01-10 15:01:14,327: INFO/ForkPoolWorker-63] Task tuna.celery_app.celery.pow2[d6926a01-fdfe-454f-966e-a347484e6574] succeeded in 0.00024588489532s: 16
celery_1 | [2024-01-10 15:01:14,327: INFO/ForkPoolWorker-64] Task tuna.celery_app.celery.pow2[861452de-746f-4d22-bd88-29f407bf599d] succeeded in 0.00058472061157s: 64
celery_1 | [2024-01-10 15:01:14,328: INFO/MainProcess] Task tuna.celery_app.celery.pow2[753ab025-05bf-4cab-a2e5-0e4a8f004f30] received
celery_1 | [2024-01-10 15:01:14,330: INFO/MainProcess] Task tuna.celery_app.celery.pow2[f8143821-a9f4-45ba-b22e-c949d6ffe20c] received
celery_1 | [2024-01-10 15:01:14,332: INFO/ForkPoolWorker-63] Task tuna.celery_app.celery.pow2[753ab025-05bf-4cab-a2e5-0e4a8f004f30] succeeded in 0.00030650091171s: 81
celery_1 | [2024-01-10 15:01:14,332: INFO/ForkPoolWorker-64] Task tuna.celery_app.celery.pow2[f8143821-a9f4-45ba-b22e-c949d6ffe20c] succeeded in 0.00077548265457s: 25
celery_1 | [2024-01-10 15:03:20,991: INFO/MainProcess] Task tuna.celery_app.celery.A[43912ac2-a04f-457b-8f59-d5d8af4bcf0b] received
When I do this in a blocking way, it works like a charm:
>>> from tuna.celery_app.celery import pow2
>>> g_result = group(pow2.s(2), pow2.s(4))
>>> res = g_result()
>>> res.get()
[4, 16]
Why is result.collect() not returning the results? According to the docs, I should be getting this list: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81], but mine is empty.