I'm new to Airflow and wanted to create a DAG that removes XComs. This is my simple code:
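from datetime import datetime

from airflow.decorators import dag, task
from airflow.models import XCom
from airflow.utils.session import provide_session
from airflow.utils.trigger_rule import TriggerRule

@dag(
    start_date=datetime(2024, 3, 15),
    schedule_interval='@daily',
    catchup=False
)
def delete_xcom():
    @provide_session
    @task(trigger_rule=TriggerRule.ALL_DONE)
    def cleanup_xcom(session=None):
        # Delete every XCom row that belongs to this DAG.
        session.query(XCom).filter(XCom.dag_id == "delete_xcom").delete()

    cleanup_xcom()

delete_xcom()

[enter image description here](https://i.stack.imgur.com/W9LnA.png)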
But the task keeps failing, and this is the log:
[2024-03-16, 20:38:21 IST] {standard_task_runner.py:107} ERROR - Failed to execute job 203 for task cleanup_xcom ((sqlite3.OperationalError) database is locked
[SQL: INSERT INTO log (dttm, dag_id, task_id, map_index, event, execution_date, owner, owner_display_name, extra) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)]
[parameters: ('2024-03-16 18:38:16.535825', 'delete_xcom', 'cleanup_xcom', -1, <TaskInstanceState.SUCCESS: 'success'>, '2024-03-16 18:38:13.928078', 'airflow', None, None)]
(Background on this error at: https://sqlalche.me/e/14/e3q8); 757)
[2024-03-16, 20:38:21 IST] {local_task_job_runner.py:234} INFO - Task exited with return code 1
[2024-03-16, 20:38:21 IST] {taskinstance.py:3312} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2024-03-16, 20:38:22 IST] {scheduler_job_runner.py:1755} ERROR - Detected zombie job: {'full_filepath': '/opt/airflow/dags/delete_xcom.py', 'processor_subdir': '/opt/airflow/dags', 'msg': "{'DAG Id': 'delete_xcom', 'Task Id': 'cleanup_xcom', 'Run Id': 'manual__2024-03-16T18:38:13.928078+00:00', 'Hostname': 'b0c160b98f88'}", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7f8db16939d0>, 'is_failure_callback': True} (See https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#zombie-undead-tasks)
I tried restarting the Airflow container and creating a new DAG. From what I've seen online, this code should work.
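The error is clear: `sqlite3.OperationalError: database is locked`. SQLite allows only one writer at a time, so the task's write to the `log` table fails while another connection holds the lock.
Don't use SQLite as the metadata database. Point your Airflow at an external PostgreSQL or MySQL database instead.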
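
As a rough sketch of what that switch involves: Airflow 2.3+ reads the metadata database connection from the `AIRFLOW__DATABASE__SQL_ALCHEMY_CONN` environment variable (or `sql_alchemy_conn` in the `[database]` section of `airflow.cfg`). The helper `build_postgres_uri` below is hypothetical, and the user, password, host and database names are placeholders for illustration, not values from your setup.

```python
from urllib.parse import quote_plus


def build_postgres_uri(user: str, password: str, host: str, port: int, database: str) -> str:
    """Return a SQLAlchemy URI for PostgreSQL using the psycopg2 driver."""
    # Percent-encode the credentials so characters such as '@' or ':'
    # do not break the URI.
    return (
        f"postgresql+psycopg2://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )


# Hypothetical values; replace them with your own database settings.
uri = build_postgres_uri("airflow", "p@ss:word", "postgres", 5432, "airflow")

# Print the line to add to the environment of every Airflow container.
print(f"AIRFLOW__DATABASE__SQL_ALCHEMY_CONN={uri}")
```

After setting the variable for the scheduler, webserver and workers, the new database still has to be initialized (for example with `airflow db migrate` on recent 2.x releases) before the DAG is run again.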