psycopg2.errors.DeadlockDetected: deadlock detected


PROBLEM

I have a batch job that runs when a user updates a row in the UI. Users are allowed to update multiple rows simultaneously, which triggers multiple batch jobs, each with a unique run_id.

This job creates a CSV file and inserts its values into a staging table (allocations_update).

After the values are loaded into allocations_update, we update a second table (allocations_od) using those values.

The query that updates allocations_od is:

UPDATE db.allocations_od target
SET rec_alloc = src.rec_alloc 
FROM db.allocations_update src
WHERE  src.run_id = '{run_id}' 
  AND  src.col1 = target.col1
  AND  src.col2 = target.col2

However, sometimes when the user triggers multiple instances of this job (by updating multiple rows simultaneously), I get a deadlock error when the second UPDATE query runs against allocations_od.

The complete error message is shown below:

psycopg2.errors.DeadlockDetected: deadlock detected
DETAIL:  Process 15455 waits for ShareLock on transaction 62597603; blocked by process 15538.
Process 15538 waits for ShareLock on transaction 62597592; blocked by process 15455.
HINT:  See server log for query details.
CONTEXT:  while updating tuple (479821,43) in relation "allocations_od_20230514"

I want to know what is causing the deadlock. My best guess is that another instance of the job is still running the first query, which holds a lock on allocations_update, so both processes end up blocked.
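For reference, the error detail points at allocations_od itself, which is the classic pattern of two transactions locking overlapping rows in different orders: each ends up waiting on a row the other already holds. A minimal, self-contained sketch of that interleaving, assuming a placeholder connection string and two existing rows keyed 'A' and 'B' (none of these names are from my real schema):

    import threading

    import psycopg2

    DSN = "dbname=db user=postgres"  # placeholder connection string
    barrier = threading.Barrier(2)   # forces the problematic interleaving

    def session(first_key, second_key):
        conn = psycopg2.connect(DSN)
        cur = conn.cursor()
        # Each session locks one row, waits for the other session to do the
        # same, then tries to lock the row the other session already holds.
        cur.execute("UPDATE db.allocations_od SET rec_alloc = rec_alloc "
                    "WHERE col1 = %s", (first_key,))
        barrier.wait()
        cur.execute("UPDATE db.allocations_od SET rec_alloc = rec_alloc "
                    "WHERE col1 = %s", (second_key,))
        conn.commit()  # one session raises DeadlockDetected before this point

    t1 = threading.Thread(target=session, args=("A", "B"))
    t2 = threading.Thread(target=session, args=("B", "A"))
    t1.start(); t2.start()
    t1.join(); t2.join()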

MY CODE

The entire batch process is quite long and complex; this is the final piece, which is causing the issue:

    
    def update_alloc_query(self, final_data, stage_location):
        """ Method to bulk update allocations od table"""

        # stage_location is the s3 path of csv file.

        last_created_date = self.get_last_created_date()
        last_created_date = last_created_date.strftime('%Y-%m-%d')
        final_data['created_date'] = last_created_date
        run_id = final_data['run_id'].unique()[0]
        s3.s3_upload_df(stage_location, final_data)
        UITableLoader.bulk_upload_from_csv(db_model=AllocationsUpdate,
                                           file_location=stage_location,
                                           data_types={"rsid": "str", "passenger_class": "str",
                                                       "journey_origin": "str",
                                                       "journey_destination": "str",
                                                       "bucket_code": "str",
                                                       "eff_departure_date": "str",
                                                       "recommended_allocation": "float",
                                                       "run_id": "str"},
                                           sep="|",
                                           created_date=last_created_date)
        self.logger.info("Added table into new data")
        allo_sql = f"UPDATE db.allocations_od target\
                    set rec_alloc = src.rec_alloc FROM\
                    db.allocations_update src\
                    WHERE src.run_id = '{run_id}' AND  \
                    src.col1 = target.col1 AND\
                    src.col2 = target.col2'"
        execute_sql_statement(allo_sql)
        self.logger.info("executed update query")



# UITableLoader.bulk_upload_from_csv


    @staticmethod
    def bulk_upload_from_csv(db_model, file_location, data_types=None, sep=',',
                             created_date=None, chunk_size=1000):
        """Function uploads data from a local CSV file to the SQLAlchemy db."""
        LOGGER.info("Bulk loading data.",
                    file_location=file_location, table=db_model.__table__)
        record_count = 0
        chunks = pd.read_csv(
            file_location,
            dtype=data_types,
            chunksize=chunk_size,
            sep=sep,
            on_bad_lines='skip'
        )

        for chunk in chunks:
            chunk = chunk.where((pd.notnull(chunk)), None)
            chunk = chunk.replace({np.nan: None})
            record_count += chunk.shape[0]
            if created_date is not None:
                chunk['created_date'] = created_date
            rows = chunk.to_dict(orient='records')
            sqa_save(db_model, rows, save_many=True)

        return record_count


def execute_sql_statement(sql_statement, conn_string=None):  # pragma: no cover
    """Executes the given sql_statement"""
    if not sql_statement:
        return
    if not conn_string:
        conn_string = get_db_connection_string()
    dbsession = get_db_session(conn_string)
    try:
        dbsession.execute(sql_statement)
        dbsession.commit()
    except SQLAlchemyError as ex:
        LOGGER.exception(f"Error executing sql statement '{sql_statement}'")
        dbsession.rollback()
        raise ex
    finally:
        dbsession.close()
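Since PostgreSQL rolls back one of the two transactions before raising DeadlockDetected, a common stopgap (independent of fixing the lock ordering) is to retry the victim. A sketch wrapping the helper above; the attempt count and backoff are arbitrary assumptions, not part of the original code:

    import time

    from sqlalchemy.exc import OperationalError


    def execute_with_deadlock_retry(sql_statement, attempts=3, backoff_seconds=1.0):
        """Retry wrapper: the deadlock victim is rolled back by the server,
        so re-running this idempotent UPDATE is safe."""
        for attempt in range(1, attempts + 1):
            try:
                execute_sql_statement(sql_statement)
                return
            except OperationalError as ex:
                # psycopg2's DeadlockDetected surfaces through SQLAlchemy as
                # OperationalError, with the original exception in ex.orig.
                if "deadlock detected" not in str(ex.orig) or attempt == attempts:
                    raise
                time.sleep(backoff_seconds * attempt)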


1 Answer

Answered by nadine:

I guess there are concurrent updates on the allocations_od table. You could take row-level locks with a SELECT ... FOR UPDATE before the UPDATE (note that PostgreSQL does not accept a FOR UPDATE clause on an UPDATE statement itself). Locking the matching rows in a consistent order makes competing jobs queue up instead of deadlocking:

allo_sql = f"UPDATE db.allocations_od target \
                 SET rec_alloc = src.rec_alloc \
                 FROM db.allocations_update src \
                 WHERE src.run_id = '{run_id}' \
                 AND src.col1 = target.col1 \
                 AND src.col2 = target.col2 \
                 FOR UPDATE"