prin" /> prin" /> prin"/>

Airflow: Task-wise dependency check with another DAG using ExternalTaskSensor


ExternalTaskSensor is not working as expected. Here is an example of the DAGs.

Dag1 (test_first_dag.py):

Task 1 -> print("Dag1-Task1-Hello World")
Task 2 -> print("Dag1-Task2-Hello World")

Dag2 (test_second_dag.py):

Task 1 -> print("Dag2-Task1-Hello World")
Task 2 -> print("Dag2-Task2-Hello World")

Once Dag1 is triggered, it runs normally. When Dag2 is triggered, Task1 of Dag2 should first check whether Task1 of Dag1 has completed. For testing, I ran Dag2 first, expecting it to start and then wait until Task1 of Dag1 completes. With the code below, however, the sensor task in Dag2 keeps waiting even after Dag1 completes. Can someone help me understand why the sensor is not working properly?

test_first_dag.py:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import time

# Define the default arguments for the DAG
default_args = {
    'depends_on_past': False,
    'start_date': datetime(2022, 1, 1),
}


dag = DAG(
    'test_first_dag',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)


def task1():
    print("Dag1-Task1-Hello World")

def task2():
    time.sleep(10)  # Delay for 10 seconds
    print("Dag1-Task2-Hello World")

task1 = PythonOperator(
    task_id='task1',
    python_callable=task1,
    dag=dag,
)

task2 = PythonOperator(
    task_id='task2',
    python_callable=task2,
    dag=dag,
)


task1 >> task2

test_second_dag.py:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from datetime import datetime, timedelta


default_args = {
    'start_date': datetime(2022, 1, 2),  # Make sure it's after the start date of Dag1
}


dag = DAG(
    'test_second_dag',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)


def task1():
    print("Dag2-Task1-Hello World")

def task2():
    print("Dag2-Task2-Hello World")

# Wait for Dag1's task1 to be completed
wait_for_dag1_task1 = ExternalTaskSensor(
    task_id='wait_for_dag1_task1',
    external_dag_id='test_first_dag',
    external_task_id='task1',
    dag=dag,
)

task1 = PythonOperator(
    task_id='task1',
    python_callable=task1,
    dag=dag,
)

task2 = PythonOperator(
    task_id='task2',
    python_callable=task2,
    dag=dag,
)


wait_for_dag1_task1 >> task1 >> task2
1 Answer

Danila Ganchar:

The problem is probably related to the executor, the start_dates, or the poke_interval. By default, ExternalTaskSensor pokes for a task instance whose logical (execution) date exactly matches that of its own DAG run, so if the two DAGs have different start_dates or schedules, there may be nothing for the sensor to find. Let's do a little test with LocalExecutor.
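
If you are not sure which executor your environment uses, you can check it from the CLI (assuming Airflow 2.x):

airflow config get-value core executor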

test_first_dag.py:

import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

dag = DAG(
    'test_first_dag',
    start_date=datetime(2024, 1, 1),
    schedule_interval=timedelta(days=1),
    max_active_runs=1,
)


def task1():
    print('Dag1-Task1-Hello World')


def task2():
    time.sleep(10)
    print('Dag1-Task2-Hello World')


operator1 = PythonOperator(
    task_id='task1',
    python_callable=task1,
    dag=dag,
)

operator2 = PythonOperator(
    task_id='task2',
    python_callable=task2,
    dag=dag,
)

operator1 >> operator2

test_second_dag.py:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor

from test_first_dag import dag as first_dag, operator1 as dag1_operator
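# The cross-module import above assumes test_first_dag.py lives in the
# same DAGs folder; it lets us reference the real dag_id and task_id
# below instead of duplicating them as string literals.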

dag = DAG(
    'test_second_dag',
    start_date=datetime(2024, 1, 1),
    schedule_interval=timedelta(days=1),
    max_active_runs=1,
)


def task1():
    print('Dag2-Task1-Hello World')


def task2():
    print('Dag2-Task2-Hello World')


wait_for_dag1_task1 = ExternalTaskSensor(
    task_id='wait_for_dag1_task1',
    external_dag_id=first_dag.dag_id,
    external_task_id=dag1_operator.task_id,
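    # By default the sensor pokes for a test_first_dag run with the
    # same logical (execution) date as this DAG run, which is why both
    # DAGs share the same start_date and schedule_interval.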
    allowed_states=['success'],
    timeout=60,
    poke_interval=5,
    dag=dag,
)

task1 = PythonOperator(
    task_id='task1',
    python_callable=task1,
    dag=dag,
)

task2 = PythonOperator(
    task_id='task2',
    python_callable=task2,
    dag=dag,
)


wait_for_dag1_task1 >> task1 >> task2

Let's enable only test_second_dag and check the wait_for_dag1_task1 logs. You'll see something like:

[2024-02-21, 00:13:51 UTC] {external_task.py:244} INFO - Poking for tasks ['task1'] in dag test_first_dag on 2024-01-01T00:00:00+00:00
[2024-02-21, 00:13:56 UTC] {external_task.py:244} INFO - Poking for tasks ['task1'] in dag test_first_dag on 2024-01-01T00:00:00+00:00
[2024-02-21, 00:14:01 UTC] {external_task.py:244} INFO - Poking for tasks ['task1'] in dag test_first_dag on 2024-01-01T00:00:00+00:00
[2024-02-21, 00:14:06 UTC] {external_task.py:244} INFO - Poking for tasks ['task1'] in dag test_first_dag on 2024-01-01T00:00:00+00:00
airflow.exceptions.AirflowSensorTimeout: Sensor has timed out; run duration of 60.3617568549962 seconds exceeds the specified timeout of 60.0.

As you can see, the ExternalTaskSensor pokes for ['task1'] in dag test_first_dag on 2024-01-01T00:00:00+00:00, but because test_first_dag never ran for that logical date, the task never completed and the sensor failed with a timeout.
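
As a side note: if the two DAGs must keep different start_dates or schedules, the sensor can be pointed at a shifted logical date via execution_delta (or a callable via execution_date_fn). A minimal sketch, assuming the second DAG's logical dates run exactly one day behind the first DAG's:

from datetime import timedelta

from airflow.sensors.external_task import ExternalTaskSensor

wait_for_dag1_task1 = ExternalTaskSensor(
    task_id='wait_for_dag1_task1',
    external_dag_id='test_first_dag',
    external_task_id='task1',
    # Positive delta: poke for the test_first_dag run whose logical
    # date is one day EARLIER than this run's logical date.
    execution_delta=timedelta(days=1),
    poke_interval=5,
    timeout=60,
    dag=dag,
)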

Now let's delete the dag_run, enable both test_second_dag and test_first_dag, and check the logs again:

...
[2024-02-21, 00:41:11 UTC] {external_task.py:244} INFO - Poking for tasks ['task1'] in dag test_first_dag on 2024-01-01T00:00:00+00:00
[2024-02-21, 00:41:17 UTC] {external_task.py:244} INFO - Poking for tasks ['task1'] in dag test_first_dag on 2024-01-01T00:00:00+00:00
[2024-02-21, 00:41:17 UTC] {base.py:293} INFO - Success criteria met. Exiting.

As you can see, everything now works: once task1 of test_first_dag succeeds for the same logical date, the sensor's success criteria are met and the downstream tasks run.
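
For completeness, the second test can be reproduced from the CLI against a running scheduler (commands assume Airflow 2.x):

# Unpause both DAGs so the scheduler creates runs for the same logical dates
airflow dags unpause test_first_dag
airflow dags unpause test_second_dag

# Inspect the resulting runs and their states
airflow dags list-runs -d test_first_dag
airflow dags list-runs -d test_second_dag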