asyncpg issue with large INSERTS in postgres DB

In Postgres I am trying to insert a lot of data from one table to another - roughly 30 million records. The whole thing was taking about 40 minutes, so I thought about splitting the insert into a series of 30 smaller ones and running them asynchronously with a library like asyncpg. Each INSERT INTO... SELECT takes, say, 1.5 minutes to insert approximately 1 million records, but when using asyncpg I'm not seeing any async behaviour: the INSERTs appear to run serially, one after the other. Sample, slightly simplified, code is below. Can anyone spot what I'm doing wrong, or suggest an alternative method? Is it even possible to do this? Thanks in advance for your help.

import asyncpg
import asyncio

async def test():
    
    conn = await asyncpg.connect(user='myuser', password='mypassword',
                                 database='mydb', host='myhost')
    values = await conn.fetch('''select count(*) from t1''')
    rows=values[0][0]
    await conn.close()

# get a list of 30 non-overlapping start/end rows for the SELECTS
    lst=range(rows)
    nums=(lst[i+1:i + int(len(lst)/30)] for i in range(0, len(lst), int(len(lst)/30)))

    pool = await asyncpg.create_pool(user='myuser', password='mypassword',
                                     database='mydb', host='myhost')

# The idea is that the below for loop should fire off 
# around 30 INSERT... SELECTS  asynchronously
    for num in nums:
        start=num[0]
        end=num[len(num)-1]+1

        sqlstr=f"""
            insert  into t2
            select 
                   data
            from
            (
                select data , row_number() over() rn
                from t1
            ) as foo where rn between {start} and {end}
        """

        async with pool.acquire() as con:
            await con.execute(sqlstr)

    await pool.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(test())
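
For reference, the sort of pattern I was expecting to end up with is sketched below (untested; the helper names insert_chunk/run_all are just placeholders and the chunking is simplified). The idea, as I understand it, is to create all the coroutines first and only await them together, e.g. via asyncio.gather, rather than awaiting each one inside the loop:

import asyncio
import asyncpg

async def insert_chunk(pool, start, end):
    # one INSERT ... SELECT covering row numbers start..end (inclusive)
    sqlstr = f"""
        insert into t2
        select data
        from
        (
            select data, row_number() over() rn
            from t1
        ) as foo where rn between {start} and {end}
    """
    async with pool.acquire() as con:
        await con.execute(sqlstr)

async def run_all():
    pool = await asyncpg.create_pool(user='myuser', password='mypassword',
                                     database='mydb', host='myhost')

    async with pool.acquire() as con:
        rows = (await con.fetch('select count(*) from t1'))[0][0]

    # ~30 non-overlapping (start, end) pairs covering row numbers 1..rows
    chunk = max(rows // 30, 1)
    ranges = [(i + 1, min(i + chunk, rows)) for i in range(0, rows, chunk)]

    # build all the coroutines first, then await them together so the
    # INSERTs overlap (concurrency is capped by the pool's max_size)
    await asyncio.gather(*(insert_chunk(pool, s, e) for s, e in ranges))

    await pool.close()

asyncio.run(run_all())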