ExternalTaskSensor is not working as expected. Here is an example of the DAGs:
Dag1 (test_first_dag.py):
Task 1 -> print("Dag1-Task1-Hello World")
Task 2 -> print("Dag1-Task2-Hello World")
Dag2 (test_second_dag.py):
Task 1 -> print("Dag2-Task1-Hello World")
Task 2 -> print("Dag2-Task2-Hello World")
Once Dag1 is triggered, it runs as normal. When Dag2 is triggered, Task1 of Dag2 should first check whether Task1 of Dag1 has completed. For testing, I run Dag2 first, so it should start and then wait until Task1 of Dag1 completes. In the code below I have two DAGs. When I run them, Task1 of Dag2 keeps waiting (the wait_for_dag1_task1 sensor never succeeds), even after Dag1 has completed.
Can someone explain why the trigger is not working properly?
test_first_dag.py
```python
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import time

# Define the default arguments for the DAG
default_args = {
    'depends_on_past': False,
    'start_date': datetime(2022, 1, 1),
}

dag = DAG(
    'test_first_dag',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)

def task1():
    print("Dag1-Task1-Hello World")

def task2():
    time.sleep(10)  # Delay for 10 seconds
    print("Dag1-Task2-Hello World")

task1 = PythonOperator(
    task_id='task1',
    python_callable=task1,
    dag=dag,
)

task2 = PythonOperator(
    task_id='task2',
    python_callable=task2,
    dag=dag,
)

task1 >> task2
```
test_second_dag.py
```python
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from datetime import datetime, timedelta

default_args = {
    'start_date': datetime(2022, 1, 2),  # Make sure it's after the start date of Dag1
}

dag = DAG(
    'test_second_dag',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)

def task1():
    print("Dag2-Task1-Hello World")

def task2():
    print("Dag2-Task2-Hello World")

# Wait for Dag1's task1 to be completed
wait_for_dag1_task1 = ExternalTaskSensor(
    task_id='wait_for_dag1_task1',
    external_dag_id='test_first_dag',
    external_task_id='task1',
    dag=dag,
)

task1 = PythonOperator(
    task_id='task1',
    python_callable=task1,
    dag=dag,
)

task2 = PythonOperator(
    task_id='task2',
    python_callable=task2,
    dag=dag,
)

wait_for_dag1_task1 >> task1 >> task2
```
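A sensor like wait_for_dag1_task1 does not wait for "any" finished run of task1. By default, ExternalTaskSensor looks for the external task instance whose logical date is exactly the sensor's own logical date (shifted back by execution_delta, if one is given). The pure-Python sketch below models only that matching rule, without Airflow; the helper names and the set of "successful" Dag1 dates are hypothetical. It assumes Airflow 2.x behaviour, where a manually triggered run normally gets the trigger time as its logical date, so it cannot line up with a scheduled midnight run of the other DAG.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Set

UTC = timezone.utc


def expected_external_date(sensor_logical_date: datetime,
                           execution_delta: Optional[timedelta] = None) -> datetime:
    """Logical date of the external run the sensor looks for (modelled rule)."""
    # Default: the same logical date as the sensor's own DAG run.
    if execution_delta is None:
        return sensor_logical_date
    # With execution_delta, the sensor looks that far back in time.
    return sensor_logical_date - execution_delta


def sensor_would_succeed(sensor_logical_date: datetime,
                         successful_external_dates: Set[datetime],
                         execution_delta: Optional[timedelta] = None) -> bool:
    """Hypothetical helper: True only if an exactly matching external run succeeded."""
    target = expected_external_date(sensor_logical_date, execution_delta)
    return target in successful_external_dates


# Hypothetical data: scheduled daily runs of test_first_dag whose task1 succeeded.
dag1_done = {datetime(2022, 1, d, tzinfo=UTC) for d in (1, 2, 3)}

# Scheduled run of test_second_dag at midnight: matches Dag1's run for that day.
print(sensor_would_succeed(datetime(2022, 1, 2, tzinfo=UTC), dag1_done))  # True

# Manually triggered run: logical date is the trigger time, so nothing matches.
manual = datetime(2022, 1, 3, 14, 27, 5, tzinfo=UTC)
print(sensor_would_succeed(manual, dag1_done))  # False

# If Dag2 ran at 01:00 and Dag1 at 00:00, execution_delta=1h would align them.
print(sensor_would_succeed(datetime(2022, 1, 2, 1, tzinfo=UTC), dag1_done,
                           timedelta(hours=1)))  # True
```

In real DAGs the corresponding knobs are the sensor's execution_delta and execution_date_fn parameters; the sketch only illustrates why an exact logical-date match is required.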
The problem is probably related to the executor, the start_date values or poke_interval. Let's do a little test with LocalExecutor and the two DAG files test_first_dag.py and test_second_dag.py.

First, turn on only test_second_dag and check the wait_for_dag1_task1 logs. You'll see that ExternalTaskSensor waits for ['task1'] in dag test_first_dag on 2024-01-01T00:00:00+00:00, but that task is never completed, so the sensor fails by timeout.

Now delete the dag_run, turn on both test_second_dag and test_first_dag, and check the logs again: this time everything works as expected.
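The "failed by timeout" outcome comes from how a sensor in poke mode behaves: it re-checks its condition every poke_interval seconds and gives up once the elapsed time exceeds timeout (in a stock Airflow configuration these default to 60 seconds and 7 days). The sketch below is a loose pure-Python model of that loop on a virtual clock, not Airflow's actual code; run_sensor and the poke callables are hypothetical names for illustration.

```python
from typing import Callable, Tuple


def run_sensor(poke: Callable[[int], bool],
               poke_interval: int = 60,
               timeout: int = 60 * 60 * 24 * 7) -> Tuple[str, int]:
    """Model of a poke-mode sensor on a virtual clock (seconds).

    Returns ("success", elapsed) when poke() returns True,
    or ("timeout", elapsed) once elapsed time exceeds the timeout.
    """
    elapsed = 0
    while not poke(elapsed):
        # After a failed poke, give up if we are already past the timeout.
        if elapsed > timeout:
            return "timeout", elapsed
        # Otherwise "sleep" for poke_interval and poke again.
        elapsed += poke_interval
    return "success", elapsed


# Dag1's matching task1 finishes 150 s after the sensor starts:
# the next poke after that (at 180 s) succeeds.
print(run_sensor(lambda t: t >= 150))  # ('success', 180)

# No matching Dag1 run ever exists (e.g. misaligned logical dates):
# the sensor keeps poking until it exceeds the timeout.
print(run_sensor(lambda t: False, poke_interval=60, timeout=300))  # ('timeout', 360)
```

The second call mirrors the first experiment in the answer: when test_first_dag has no run with the matching logical date, no amount of poking helps, and only the timeout ends the wait.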