I'm working on a FastAPI application that uses Celery to execute tasks, and I'm trying to integrate a gRPC client within a Celery task. However, I'm running into an `_InactiveRpcError` that I can't seem to resolve.
Here's my setup: I have a Celery worker that I run with the following command:

    celery -A worker.worker.celery_app worker --loglevel=info --pool=gevent --concurrency=500

My gRPC client works fine outside of the Celery task, and my other Celery tasks work fine without the gRPC client. However, when I try to use the gRPC client within a Celery task, I get the following error:

    grpc._channel._InactiveRpcError: <grpc._channel._RPCState object at 0x7f0adb16d600>

Here's the relevant code:

    async def send_latest_grpc_packets(filter_type: PaquetTypeEnum, lastest_iec_packets: Optional[Iec104Packets] = None):
        with grpc.insecure_channel(f"{settings.GRPC_SERVER_HOST}:{settings.GRPC_SERVER_PORT}") as channel:
            stub = ioa_packets_pb2_grpc.PacketsStub(channel)
            if lastest_iec_packets is None:
                lastest_iec_packets = get_lastest_iec_packets(filter_type)
            formatted_iec_packets = format_iec_instance_to_schema(lastest_iec_packets)
            packets_request = ioa_packets_pb2.PacketsRequest(
                packets=cast_packet_to_proto_schema(formatted_iec_packets),
                type=filter_type,
            )
            response = stub.SendPackets(packets_request, timeout=30)


    @celery_app.task()
    def send_cyclical_TM_task():
        asyncio.run(send_latest_grpc_packets(PaquetTypeEnum.TM))

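One thing I noticed while writing this up: the coroutine above never awaits anything, and `stub.SendPackets` is a blocking call, so `asyncio.run` only wraps a synchronous function in an event loop. A fully synchronous variant is what I'm considering trying next. Below is a minimal sketch of that control flow, not a confirmed fix. The function name `send_latest_packets_sync` and the injected `send` and `fetch_latest` parameters are hypothetical, made up here so the logic can be run without a gRPC server, and the dict stands in for `PacketsRequest`.

```python
from typing import Any, Callable, Optional


def send_latest_packets_sync(
    filter_type: Any,
    send: Callable[..., Any],
    fetch_latest: Callable[[Any], Any],
    latest_packets: Optional[Any] = None,
    timeout: float = 30.0,
) -> Any:
    """Blocking variant of send_latest_grpc_packets, with no event loop.

    `send` stands in for stub.SendPackets and `fetch_latest` for
    get_lastest_iec_packets. Both are hypothetical injected parameters.
    """
    # Fetch the latest packets only when the caller did not supply any.
    if latest_packets is None:
        latest_packets = fetch_latest(filter_type)
    # Placeholder for ioa_packets_pb2.PacketsRequest(...).
    request = {"packets": latest_packets, "type": filter_type}
    # Plain blocking call, as the real stub call would be.
    return send(request, timeout=timeout)


# Example run with fake collaborators (no gRPC involved).
calls = []


def fake_send(request, timeout):
    # Record the call so the behaviour can be inspected afterwards.
    calls.append((request, timeout))
    return "ok"


result = send_latest_packets_sync("TM", fake_send, lambda _type: ["p1", "p2"])
print(result)
```

In the real task, `send` would be `stub.SendPackets` on a channel opened inside the task, and the Celery task would call the function directly instead of going through `asyncio.run`. I don't know yet whether that alone avoids the error under the gevent pool.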
I'm using the following versions of the libraries:
- grpcio = "^1.53.0"
- celery = "^5.2.7"
I've tried changing the Celery execution pool from prefork to gevent, but that didn't solve the problem.
My current horrible workaround is to start a task that calls one of my own endpoints, which then uses the gRPC client. However, I'd like to find a better solution, as I'd like to keep the process non-blocking and periodic.
If anyone has experience with integrating gRPC and Celery, or can help me understand why they seem to be incompatible in my case, I would greatly appreciate it. Thank you!