How to execute parallel queries with PyGreSQL?


I am trying to run multiple queries in parallel with PyGreSQL and multiprocessing, but the code below hangs without returning:

from pg import DB
from multiprocessing import Pool
from functools import partial


def create_query(table_name):
  return f"""create table {table_name} (id integer);
  CREATE INDEX ON {table_name} USING BTREE (id);"""

my_queries = [ create_query('foo'), create_query('bar'), create_query('baz') ]


def execute_query(conn_string, query):
  con = DB(conn_string)
  con.query(query)
  con.close()

rs_conn_string = "host=localhost port=5432 dbname=postgres user=postgres password="
pool = Pool(processes=len(my_queries))
pool.map(partial(execute_query,rs_conn_string), my_queries)

Is there any way to make it work? Also, is it possible to run the 3 queries in the same "transaction", so that if one query fails the others get rolled back?


There are 2 best solutions below

Cito On BEST ANSWER

One obvious problem is that you always run the pool.map, not only in the main process, but also when the interpreters used in the parallel sub-processes import the script. You should do something like this instead:

def run_all():
    with Pool(processes=len(my_queries)) as pool:
        pool.map(partial(execute_query,rs_conn_string), my_queries)

if __name__ == '__main__':
    run_all()

Regarding your second question, that's not possible, since transactions are per connection, and the connections live in separate processes if you do it like that.
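If all-or-nothing behaviour matters more than parallelism, one option is to run the statements sequentially on a single connection inside one transaction. The following is only a minimal sketch: the helper name run_in_one_transaction is made up for illustration, and db is assumed to be a PyGreSQL pg.DB instance, whose begin(), commit(), rollback() and query() methods are used. Since DDL is transactional in PostgreSQL, a failure in a later statement also undoes the tables created earlier.

```python
def run_in_one_transaction(db, queries):
    """Run all queries on one connection: commit all or roll back all.

    `db` is assumed to be a PyGreSQL `pg.DB` instance (hypothetical helper,
    not part of PyGreSQL itself).
    """
    db.begin()                  # start a single transaction
    try:
        for query in queries:
            db.query(query)     # runs sequentially, not in parallel
    except Exception:
        db.rollback()           # undo everything done so far
        raise                   # let the caller see the original error
    db.commit()                 # all queries succeeded


# Usage sketch (needs a reachable PostgreSQL server):
#   from pg import DB
#   db = DB(rs_conn_string)
#   try:
#       run_in_one_transaction(db, my_queries)
#   finally:
#       db.close()
```

The trade-off is that the queries no longer overlap in time; in exchange, a failure in any one of them leaves the database unchanged.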

Asynchronous command processing might be what you want, but it is not yet supported by PyGreSQL. Psycopg + aiopg is probably better suited for doing things like that.

Tanner On

PyGreSQL has since added async support with the connection.poll() method. As for pooling, I like to override mysql.connector's pooling wrappers to handle pgdb connection objects. A few 'optional' connection method calls will fail and have to be commented out (e.g. checking connection status; these can be implemented at the pgdb connection object level if you want them, but the calls don't match mysql.connector's API). There are probably some low-level bugs, since the two libraries are only abstracted similarly, but this solution has been running in production for a few months now without any problems.