I have been trying to get a Slack message callback to trigger on SLA misses. I've noticed that:
- SLA misses get registered successfully in the Airflow web UI at slamiss/list/
- on_failure_callback works successfully
However, the sla_miss_callback function itself will never get triggered.
What I've tried:
- Different combinations of adding sla and sla_miss_callback at the default_args level, the DAG level, and the task level
- Checking logs on our scheduler and workers for SLA-related messages (see also here), but we haven't seen anything
- The Slack message callback function works if called from any other basic task or function
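import logging
import time
from datetime import timedelta

import airflow
import airflow.utils.dates
# Airflow 1.x import path (assumed, since provide_context is used below)
from airflow.operators.python_operator import PythonOperator

LOGGER = logging.getLogger(__name__)

# send_task_failed_msg_to_slack and send_sla_miss_message_to_slack are our
# own Slack helper functions, defined elsewhere and not shown here.

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    'start_date': airflow.utils.dates.days_ago(n=0, minute=1),
    'on_failure_callback': send_task_failed_msg_to_slack,
    'sla': timedelta(minutes=1),
    "retries": 0,
    "pool": 'canary',
    'priority_weight': 1
}

dag = airflow.DAG(
    dag_id='sla_test',
    default_args=default_args,
    sla_miss_callback=send_sla_miss_message_to_slack,
    schedule_interval='*/5 * * * *',
    catchup=False,
    max_active_runs=1,
    dagrun_timeout=timedelta(minutes=5)
)

def sleep():
    """ Sleep for 90 seconds, longer than the 1-minute SLA """
    time.sleep(90)
    LOGGER.info("Slept for 90 seconds")

def simple_print(**context):
    """ Prints a message """
    print("Hello World!")

# Named sleep_task so the operator does not shadow the sleep() function
sleep_task = PythonOperator(
    task_id="sleep",
    python_callable=sleep,
    dag=dag
)

simple_task = PythonOperator(
    task_id="simple_task",
    python_callable=simple_print,
    provide_context=True,
    dag=dag
)

sleep_task >> simple_task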
A lot of these answers are 90% complete, so I wanted to share my example using bash operators, which combines what I found in the responses above and in other resources.
The most important points are to define sla_miss_callback in the DAG definition rather than in default_args, and not to pass context to the SLA function.
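
To illustrate the point about not passing context, here is a minimal sketch of what such a callback might look like. It relies on Airflow calling sla_miss_callback with five positional arguments (dag, task_list, blocking_task_list, slas, blocking_tis) instead of a **context dict. The post_to_slack helper is a hypothetical stand-in that only prints, and the message format is an assumption of mine, not something taken from the posts above.

```python
from types import SimpleNamespace


def post_to_slack(text):
    """Hypothetical stand-in for a real Slack sender; prints instead of posting."""
    print(text)
    return text


def send_sla_miss_message_to_slack(dag, task_list, blocking_task_list, slas, blocking_tis):
    """SLA-miss callback: takes five positional arguments, not **context."""
    # Each entry in slas is expected to expose a task_id attribute.
    missed = ", ".join(sorted({sla.task_id for sla in slas}))
    return post_to_slack(f"SLA missed in DAG {dag.dag_id} for task(s): {missed}")


# Local dry run with stand-in objects, so no Airflow installation is needed.
fake_dag = SimpleNamespace(dag_id="sla_test")
fake_slas = [SimpleNamespace(task_id="sleep")]
send_sla_miss_message_to_slack(fake_dag, "sleep", "", fake_slas, [])
```

The function would then be passed as sla_miss_callback=send_sla_miss_message_to_slack in the DAG constructor, while the sla timedelta itself can stay in default_args or on individual tasks.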