I have some problem with async functions, here is my code:
def get_redis_connection(connection_string: str):
hostport, *options = connection_string.split(",")
host, _, port = hostport.partition(":")
redis_retry = Retry(ExponentialBackoff(), 5)
arguments = {
"retry": redis_retry,
"retry_on_error": [BusyLoadingError, ConnectionError, TimeoutError],
}
for option in options:
opt: str
value: Union[int, str]
opt, _, value = option.partition("=")
if opt == "port":
value = int(value)
elif opt == "ssl":
value = value.lower() == "true"
elif opt == "abortConnect":
continue
arguments[opt] = value
return Redis(host, int(port), **arguments)
async def get_async_redis_connection(connection_string: str):
hostport, *options = connection_string.split(",")
host, _, port = hostport.partition(":")
_, _, password = options[0].partition("=")
redis_url = f'rediss://:{password}@{host}:{port}'
pool = aioredis.ConnectionPool.from_url(redis_url)
return aioredis.Redis(connection_pool=pool, ssl=True)
def put_redis_data(redis_connection, hash, hash_name, payload):
ttl = int(os.environ.get("TTL", 12))
redis_connection.hset(hash_key, hash_name, json.dumps(payload))
redis_connection.expire(hash_key, timedelta(hours=ttl))
class Worker():
def __init__(self, azure_connection_string: str, redis_connection_string: str):
super().__init__()
self._redis_connection = get_redis_connection(redis_connection_string)
self._redis_async_connection = asyncio.run(get_async_redis_connection(redis_connection_string))
self._connection_string = azure_connection_string
class MyFirstWorker(Worker):
def __init__(self, azure_connection_string: str, redis_connection_string: str):
super().__init__(azure_connection_string, redis_connection_string)
async def async_get_redis_data(self, key, opt):
async with self._redis_async_connection.client() as redis:
windows = await redis.hget(key, "windows")
doors = await redis.hget(key, "doors")
return windows, doors
def _process(self, opt):
key = str(os.getenv("KEY"))
windows, doors = asyncio.run(self.async_get_redis_data(key, opt))
matching_dataclass = MatchingPayload(json.loads(windows)["PayloadFields"])
logger.info(f"Matching completed")
put_redis_data(
self._redis_connection, key, "match:results", matching_dataclass
)
I have similar workers in separate threads (like MyFirstWorker, MySecondWorker, MyThirdWorker), one of them has an async redis data getter (async def async_get_redis_data(self, key, opt)) other operations with redis are performed synchronously throughout the project.
Here's the error I get:
RuntimeError: await wasn't used with future
I understand that the problem is due to the way I use aioredis, it is important for me to keep the _process method sync, but still perform the redis.hget operations async Please help me understand how to fix this error?
I thought it was because of this line:
windows, doors = asyncio.run(self.async_get_redis_data(key, opt))
I tried replacing it with
windows, doors = await self.async_get_redis_data(key, opt)
But in this case, I get another error related to the fact that the _process method is synchronous (TypeError: cannot unpack non-iterable coroutine object), but it should remain so.
Here is full traceback messages for my error:
File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
│ │ └ <coroutine object FirstWorker.async_get_redis_data at 0x7f5acc1893c0>
│ └ <function BaseEventLoop.run_until_complete at 0x7f5ae568e550>
└ <_UnixSelectorEventLoop running=False closed=True debug=False>
File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
│ └ <method 'result' of '_asyncio.Task' objects>
└ <Task finished name='Task-109' coro=<FirstWorker.async_get_redis_data() done, defined at /app/workers/matching_worker.py:2...
File "/app/workers/first_worker.py", line 22, in async_get_redis_data
windows = await redis.hget(key, "windows")
│ │ │ └ {'Action': 29, 'service': 0, 'ObjectType': 0, 'ObjectId': 'e8742fds-9b49-4cff-adaf-f4rg542d7da079', 'TaskId': 'c73605fb-403a-41...
│ │ └ 'e8742fds-9b49-4cff-adaf-f4rg542d7da079_c73605fb-403a-41cc-9c0a-a32f4c81f2f4_a9d8b4e0-fca9-e982-00f1-3c05c2cc207d'
│ └ <function Redis.hget at 0x7f5afs52160>
└ <unprintable Redis object>
File "/usr/local/lib/python3.8/dist-packages/aioredis/client.py", line 1085, in execute_command
return await self.parse_response(conn, command_name, **options)
│ │ │ │ └ {}
│ │ │ └ 'HGET'
│ │ └ SSLConnection<host=my-host.com,port=6380,db=0>
│ └ <function Redis.parse_response at 0x7f5ges6bd310>
└ <unprintable Redis object>
File "/usr/local/lib/python3.8/dist-packages/aioredis/client.py", line 1101, in parse_response
response = await connection.read_response()
│ └ <function Connection.read_response at 0x7f5acf7ab1f0>
└ SSLConnection<host=my-host.com,port=6380,db=0>
File "/usr/local/lib/python3.8/dist-packages/aioredis/connection.py", line 910, in read_response
await self.disconnect()
│ └ <function Connection.disconnect at 0x7f5acf4e50>
└ SSLConnection<host=my-host.com,port=6380,db=0>
File "/usr/local/lib/python3.8/dist-packages/aioredis/connection.py", line 806, in disconnect
await self._writer.wait_closed() # type: ignore[union-attr]
│ └ <member '_writer' of 'Connection' objects>
└ SSLConnection<host=my-host.com,port=6380,db=0>
File "/usr/lib/python3.8/asyncio/streams.py", line 359, in wait_closed
await self._protocol._get_close_waiter(self)
│ │ │ └ <StreamWriter transport=<asyncio.sslproto._SSLProtocolTransport object at 0x7f5bnn56772e0> reader=<StreamReader transport=<asy...
│ │ └ <function StreamReaderProtocol._get_close_waiter at 0x7f5ae5696ca0>
│ └ <asyncio.streams.StreamReaderProtocol object at 0x7f5acc369ac0>
└ <StreamWriter transport=<asyncio.sslproto._SSLProtocolTransport object at 0x7f5acc3772e0> reader=<StreamReader transport=<asy...
RuntimeError: await wasn't used with future