I am trying to efficiently update rows in a PostgreSQL table. To do that, I fill a temp table using COPY and then run an UPDATE from it. My code looks like this:

conn = psycopg.connect(
    host="localhost", dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD
)


with conn.cursor(name="wordfreq_cursor") as cur, conn.cursor() as ins_cur:
        ins_cur.execute("CREATE TEMP TABLE temp_frequency(id INTEGER NOT NULL, frequency FLOAT4) ON COMMIT DROP")
        cur.itersize = 20_000
        cur.execute("SELECT id, lang_code, word FROM etymology LIMIT 3000") # The limit is only for debugging
        i = 1 
        pbar = tqdm(total=20_000_000)
        with ins_cur.copy("COPY temp_frequency (id, frequency) FROM STDIN") as copy:
            for row in cur:
                id, lang_code, word = row
                if lang_code in langs: # langs is a set of string
                    frequency = zipf_frequency(word, lang_code)
                    copy.write_row((id, frequency))
                pbar.update(1)
            ins_cur.execute("UPDATE etymology e SET e.frequency = t.frequency FROM temp_frequency t WHERE e.id = t.id")
            i += 1
        conn.commit()

However, I am hitting a psycopg.OperationalError: sending query and params failed: another command is already in progress. When I search for this error, I only find problems related to concurrency, but in my case I don't use multithreading at all.

How can I fix this error?

I am using Windows 10, Python 3.12, psycopg 3.1.18. The full stack trace is as follows:

Traceback (most recent call last):
  File "c:\Users\hanne\Documents\Programme\ultimate-dictionary-api\ebook_dictionary_creator\ebook_dictionary_creator\add_wordfreq_to_db.py", line 75, in <module>
    temp_table_solution()
  File "c:\Users\hanne\Documents\Programme\ultimate-dictionary-api\ebook_dictionary_creator\ebook_dictionary_creator\add_wordfreq_to_db.py", line 70, in temp_table_solution
    ins_cur.execute("UPDATE etymology e SET e.frequency = t.frequency FROM temp_frequency t WHERE e.id = t.id")
  File "C:\Users\hanne\Documents\Programme\ultimate-dictionary-api\ebook_dictionary_creator\.venv\Lib\site-packages\psycopg\cursor.py", line 732, in execute
    raise ex.with_traceback(None)
psycopg.OperationalError: sending query failed: another command is already in progress
  0%|                                                                                  | 3000/20000000 [00:04<7:26:13, 746.90it/s]
PS C:\Users\hanne\Documents\Programme\ultimate-dictionary-api> & C:/Users/hanne/Documents/Programme/ultimate-dictionary-api/ebook_dictionary_creator/.venv/Scripts/python.exe c:/Users/hanne/Documents/Programme/ultimate-dictionary-api/ebook_dictionary_creator/ebook_dictionary_creator/add_wordfreq_to_db.py
  0%|                                                                                                | 0/20000000 [00:00<?, ?it/s]Traceback (most recent call last):
  File "c:\Users\hanne\Documents\Programme\ultimate-dictionary-api\ebook_dictionary_creator\ebook_dictionary_creator\add_wordfreq_to_db.py", line 75, in <module>
    temp_table_solution()
  File "c:\Users\hanne\Documents\Programme\ultimate-dictionary-api\ebook_dictionary_creator\ebook_dictionary_creator\add_wordfreq_to_db.py", line 64, in temp_table_solution
    for row in cur:
  File "C:\Users\hanne\Documents\Programme\ultimate-dictionary-api\ebook_dictionary_creator\.venv\Lib\site-packages\psycopg\server_cursor.py", line 332, in __iter__
    recs = self._conn.wait(self._fetch_gen(self.itersize))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\hanne\Documents\Programme\ultimate-dictionary-api\ebook_dictionary_creator\.venv\Lib\site-packages\psycopg\connection.py", line 969, in wait
    return waiting.wait(gen, self.pgconn.socket, timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\hanne\Documents\Programme\ultimate-dictionary-api\ebook_dictionary_creator\.venv\Lib\site-packages\psycopg\waiting.py", line 228, in wait_select
    s = next(gen)
        ^^^^^^^^^
  File "C:\Users\hanne\Documents\Programme\ultimate-dictionary-api\ebook_dictionary_creator\.venv\Lib\site-packages\psycopg\server_cursor.py", line 173, in _fetch_gen
    res = yield from self._conn._exec_command(query, result_format=self._format)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\hanne\Documents\Programme\ultimate-dictionary-api\ebook_dictionary_creator\.venv\Lib\site-packages\psycopg\connection.py", line 467, in _exec_command
    self.pgconn.send_query_params(command, None, result_format=result_format)
  File "psycopg_binary\\pq/pgconn.pyx", line 276, in psycopg_binary.pq.PGconn.send_query_params
psycopg.OperationalError: sending query and params failed: another command is already in progress

Answer by Adrian Klaver:

Not really an answer, more a long-form comment with suggestions.

  1. Eliminate the whole COPY step by using PL/Python (plpythonu, or plpython3u on current PostgreSQL versions) to create a function that imports zipf_frequency from whatever package you are using. Then you could do a direct UPDATE query.

UPDATE etymology SET frequency = <plpythonu_func>(word, lang_code)

Caveats:

a) You need to be a superuser to install plpythonu.

b) The Python package you import zipf_frequency from would need to be accessible to the Postgres server instance of Python.

Item b) could be dealt with by putting the source code of zipf_frequency() directly in the plpythonu function, subject to license terms.
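A rough sketch of what that first suggestion might look like when driven from psycopg. It assumes the server has plpython3u available, that the server-side Python can import the wordfreq package (a guess at where zipf_frequency comes from), and that the connecting role is a superuser. The names word_zipf_frequency and update_in_database are made up for illustration; langs is the set from the question.

```python
# Assumption: PL/Python 3 (plpython3u) is installed on the server and the
# server's Python can import wordfreq. The function name is hypothetical.
CREATE_FUNCTION_SQL = """
CREATE OR REPLACE FUNCTION word_zipf_frequency(word text, lang_code text)
RETURNS real
AS $$
from wordfreq import zipf_frequency
return zipf_frequency(word, lang_code)
$$ LANGUAGE plpython3u
"""

# Mirrors the "if lang_code in langs" filter from the question.
UPDATE_SQL = """
UPDATE etymology
SET frequency = word_zipf_frequency(word, lang_code)
WHERE lang_code = ANY(%s)
"""


def update_in_database(conn, langs):
    """Create the function and run a single UPDATE on an open connection."""
    with conn.cursor() as cur:
        # Needs superuser rights, see caveat a).
        cur.execute("CREATE EXTENSION IF NOT EXISTS plpython3u")
        cur.execute(CREATE_FUNCTION_SQL)
        # The set is passed as a list so it is sent as an array parameter.
        cur.execute(UPDATE_SQL, (sorted(langs),))
    conn.commit()
```

With this approach no rows travel to the client at all, so there is no server-side cursor and no COPY to get in each other's way.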

  2. Disentangle the existing code. One thought that comes to mind is to first write the results of cur.execute("SELECT id, lang_code, word FROM etymology LIMIT 3000") into a list of lists with the calculated frequency value added, then use COPY to iterate over that list to populate the temporary table.
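A minimal sketch of that disentangled version. It is untested against a real database and assumes a psycopg 3 connection. As far as I can tell from the two tracebacks, the error comes from issuing other commands on the connection while the COPY block is still open: the UPDATE inside the `with ... copy` block, and the named cursor fetching its next batch. Keeping the three phases strictly sequential avoids both. The helper names build_frequency_rows and update_frequencies are made up; frequency_of stands in for zipf_frequency.

```python
from typing import Callable, Iterable

Row = tuple[int, str, str]  # (id, lang_code, word), as selected in the question


def build_frequency_rows(
    rows: Iterable[Row],
    langs: set[str],
    frequency_of: Callable[[str, str], float],
) -> list[tuple[int, float]]:
    """Compute (id, frequency) pairs for rows whose language is supported."""
    return [
        (row_id, frequency_of(word, lang_code))
        for row_id, lang_code, word in rows
        if lang_code in langs
    ]


def update_frequencies(conn, langs, frequency_of) -> None:
    """Read, COPY, then UPDATE, one phase at a time on one connection."""
    with conn.cursor() as cur:
        # Phase 1: read everything before any COPY starts.
        cur.execute("SELECT id, lang_code, word FROM etymology")
        pairs = build_frequency_rows(cur, langs, frequency_of)

        # Phase 2: fill the temp table; nothing else runs during COPY.
        cur.execute(
            "CREATE TEMP TABLE temp_frequency"
            "(id INTEGER NOT NULL, frequency FLOAT4) ON COMMIT DROP"
        )
        with cur.copy("COPY temp_frequency (id, frequency) FROM STDIN") as copy:
            for pair in pairs:
                copy.write_row(pair)

        # Phase 3: the COPY block is closed, so the UPDATE can run.
        # The SET target is written without the alias (frequency, not e.frequency).
        cur.execute(
            "UPDATE etymology e SET frequency = t.frequency "
            "FROM temp_frequency t WHERE e.id = t.id"
        )
    conn.commit()
```

It would be called as update_frequencies(conn, langs, zipf_frequency). Holding all pairs in a list may be heavy for 20 million rows; running the same three phases over ranges of id is one way to keep memory bounded.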