I would like to run 5 queries asynchronously using the psycopg_pool. I am not entirely familiar with asyncio and running into an issue. I am hoping someone would be helpful in explaining and debuging my code below as I am not sure what I am doing wrong.
import psycopg
import psycopg_pool
from psycopg.rows import dict_row
class AsyncDBPool:
def __init__(self, pool_size:int):
self.pool = psycopg_pool.AsyncConnectionPool(
conninfo="host={} port={} dbname={} user={} password={}" \
.format(HOSTNAME, PORT, DBNAME, USERNAME, PASSWORD),
min_size= 1,
max_size= pool_size,
timeout=30,
open=False
)
async def open_pool(self):
await self.pool.open()
await self.pool.wait()
async def execute(self, query:str, args:tuple=()):
async with self.pool.connection() as conn:
async with conn.cursor(row_factory=dict_row) as cur:
await cur.execute(query, args)
await conn.commit()
async def execute_all(self, queries:list):
assert isinstance(queries, list), "must be list"
asyncio.gather(*(self.execute(qa['query'], qa['args']) for qa in queries), return_exceptions=True)
if __name__ == "__main__":
queries = [
{
"query": "insert into test (entry) values (%s);",
"args": n
} for n in range(10)
]
asyncio.run(AsyncDBPool(5).execute_all(queries))
I am not getting any errors but the queries are not committing...
Update: I was able to figure it out, I did not open the pool within the execute_all function and did not await the gathering of tasks. If someone could explain why it didn't work for first time, that would be great :)
Below is the working code: