I am writing tests for a FastAPI app using pytest. I am using pytest_asyncio to test async functions.
def get_queue():
return Queue()
async def task_runner():
...
@app.post('/run-task')
async def run_task(queue: Queue = Depends(get_queue)):
...
queue.enqueue(task_runner)
in the tests file I have the following:
def get_test_queue():
return Queue(is_async=False)
app.dependency_overrides[get_queue] = get_test_queue
@pytest.mark.asyncio
async def test_run_task(client):
await client.post('/run-task')
# Assertions (requires task to be complete)
I set is_async=False as in the docs
When I run the tests I get the following error:
RuntimeError: Cannot run the event loop while another loop is running
Does anyone has faced similar issue or has a work-around for this problem?
I tried to patch the _execute method in the rq.job.Job class but I am not used to asyncio library.