PROBLEM
I have a batch job that runs when a user updates a row in the UI. The user is allowed to update multiple rows simultaneously, which triggers multiple batch jobs, each with a unique run_id.
This job creates a CSV file and inserts the values into a table (allocations_update).
After the values are dumped into this table, we update a second table (allocations_od) using the values from the previous table (allocations_update).
The Query to update allocations_od is:
UPDATE db.allocations_od target
SET rec_alloc = src.rec_alloc
FROM db.allocations_update src
WHERE src.run_id = '{run_id}'
AND src.col1 = target.col1
AND src.col2 = target.col2
However, sometimes when the user triggers multiple instances of this job (by updating multiple rows simultaneously), I get a deadlock error when it tries to run the second UPDATE query for allocations_od.
The complete error message is shown below:
psycopg2.errors.DeadlockDetected: deadlock detected
DETAIL: Process 15455 waits for ShareLock on transaction 62597603; blocked by process 15538.
Process 15538 waits for ShareLock on transaction 62597592; blocked by process 15455.
HINT: See server log for query details.
CONTEXT: while updating tuple (479821,43) in relation "allocations_od_20230514"
I want to know what is causing the deadlock. My best guess is that some other instance of the job is still running the first query, which acquires a lock on allocations_update, so both processes end up blocked.
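For reference, the two colliding statements reduced to a minimal form look like this (hypothetical run_ids 'r1' and 'r2'; col1/col2 stand in for the real join columns):
-- Transaction 1 (run_id 'r1')
UPDATE db.allocations_od target
SET rec_alloc = src.rec_alloc
FROM db.allocations_update src
WHERE src.run_id = 'r1'
  AND src.col1 = target.col1
  AND src.col2 = target.col2;
-- locks its matching allocations_od rows one at a time, in whatever order
-- the executor visits them

-- Transaction 2 (run_id 'r2'), started at the same time
UPDATE db.allocations_od target
SET rec_alloc = src.rec_alloc
FROM db.allocations_update src
WHERE src.run_id = 'r2'
  AND src.col1 = target.col1
  AND src.col2 = target.col2;
-- if the two run_ids touch overlapping (col1, col2) rows in different orders,
-- each transaction can end up waiting on a row lock the other one holds,
-- which matches the ShareLock wait on the other transaction's id in the error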
MY CODE
The entire batch process is quite long and complex, but this is the final piece, which is causing the issue:
def update_alloc_query(self, final_data, stage_location):
    """Method to bulk update the allocations_od table."""
    # stage_location is the s3 path of the csv file.
    last_created_date = self.get_last_created_date()
    last_created_date = last_created_date.strftime('%Y-%m-%d')
    final_data['created_date'] = last_created_date
    run_id = final_data['run_id'].unique()[0]
    s3.s3_upload_df(stage_location, final_data)
    UITableLoader.bulk_upload_from_csv(db_model=AllocationsUpdate,
                                       file_location=stage_location,
                                       data_types={"rsid": "str", "passenger_class": "str",
                                                   "journey_origin": "str",
                                                   "journey_destination": "str",
                                                   "bucket_code": "str",
                                                   "eff_departure_date": "str",
                                                   "recommended_allocation": "float",
                                                   "run_id": "str"},
                                       sep="|",
                                       created_date=last_created_date)
    self.logger.info("Added table into new data")
    # This is the statement that hits the deadlock.
    allo_sql = f"""
        UPDATE db.allocations_od target
        SET rec_alloc = src.rec_alloc
        FROM db.allocations_update src
        WHERE src.run_id = '{run_id}'
          AND src.col1 = target.col1
          AND src.col2 = target.col2
    """
    execute_sql_statement(allo_sql)
    self.logger.info("executed update query")
# UITableLoader.bulk_upload_from_csv
@staticmethod
def bulk_upload_from_csv(db_model, file_location, data_types=None, sep=',',
                         created_date=None, chunk_size=1000):
    """Function uploads data from local csv file to sql alchemy db."""
    LOGGER.info("Bulk loading data.",
                file_location=file_location, table=db_model.__table__)
    record_count = 0
    chunks = pd.read_csv(
        file_location,
        dtype=data_types,
        chunksize=chunk_size,
        sep=sep,
        on_bad_lines='skip'
    )
    for chunk in chunks:
        chunk = chunk.where((pd.notnull(chunk)), None)
        chunk = chunk.replace({np.nan: None})
        record_count += chunk.shape[0]
        if created_date is not None:
            chunk['created_date'] = created_date
        rows = chunk.to_dict(orient='records')
        sqa_save(db_model, rows, save_many=True)
    return record_count
def execute_sql_statement(sql_statement, conn_string=None):  # pragma: no cover
    """Executes the given sql_statement"""
    if not sql_statement:
        return
    if not conn_string:
        conn_string = get_db_connection_string()
    dbsession = get_db_session(conn_string)
    try:
        dbsession.execute(sql_statement)
        dbsession.commit()
    except SQLAlchemyError as ex:
        LOGGER.exception(f"Error executing sql statement '{sql_statement}'")
        dbsession.rollback()
        raise ex
    finally:
        dbsession.close()
ANSWER
I guess there are concurrent updates on the allocations_od table. You could try a row-level lock with the FOR UPDATE clause:
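Something along these lines, as a sketch only: it reuses the simplified col1/col2 join columns from the question, assumes both statements run inside one transaction, and sorts the rows so that every run takes its locks in the same order:
BEGIN;

-- Lock the target rows first, in a deterministic order, so concurrent runs
-- queue up behind each other instead of deadlocking.
SELECT target.col1, target.col2
FROM db.allocations_od target
JOIN db.allocations_update src
  ON src.col1 = target.col1
  AND src.col2 = target.col2
WHERE src.run_id = '{run_id}'
ORDER BY target.col1, target.col2
FOR UPDATE OF target;

-- Then apply the same UPDATE as before; the row locks are already held.
UPDATE db.allocations_od target
SET rec_alloc = src.rec_alloc
FROM db.allocations_update src
WHERE src.run_id = '{run_id}'
  AND src.col1 = target.col1
  AND src.col2 = target.col2;

COMMIT;
Because every run then locks the overlapping allocations_od rows in the same order, a second run simply waits for the first one to commit rather than the two deadlocking each other.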