Airflow DAG parameter max_active_runs doesn't limit the number of active runs


I am running an Airflow instance hosted on Kubernetes. My use case needs multiple triggers of my ETL DAG from a master DAG, where the locations for which the ETL DAG has to run are decided in one of the tasks of the master DAG itself. To achieve this dynamic flow, I am using a PythonOperator in the master DAG to loop through the paths for which the ETL DAG has to be triggered and making a POST call to trigger the DAG (is there a better way to do this?). Since the pipeline inside the ETL DAG has to run one run after the other, I want the runs of the ETL DAG to be queued and executed only once the previous run has completed. For this I am trying to use the max_active_runs parameter of the DAG to queue the ETL DAG runs (reference taken from here). But when I trigger multiple runs of the ETL DAG, the runs are still not queued: they are kept in the running state and get executed as soon as the first execution finishes.

Can anyone provide an alternative solution or a fix for the above problem?
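
For context, the trigger loop described in the question could look roughly like the sketch below (the ETL DAG id etl_dag, the upstream task name compute_paths, the webserver URL and the credentials are all placeholders, not taken from the actual setup):

    import requests
    from airflow.operators.python import PythonOperator

    def trigger_etl_runs(**context):
        # the list of paths is produced by an earlier task of the master DAG
        paths = context["ti"].xcom_pull(task_ids="compute_paths")
        for path in paths:
            # create one run of the ETL DAG per path via the stable REST API
            requests.post(
                "http://<webserver-host>:8080/api/v1/dags/etl_dag/dagRuns",
                json={"conf": {"path": path}},
                auth=("<user>", "<password>"),
            )

    trigger_etl = PythonOperator(
        task_id="trigger_etl",
        python_callable=trigger_etl_runs,
    )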

2 Answers

Sagar (BEST ANSWER)

To solve this issue, besides defining max_active_runs=1 in the DAG config (not that it helped), I took the following steps. First, I defined a task using a PythonOperator in the master DAG, which in turn used a TriggerDagRunOperator to trigger n runs of the ETL DAG. Here I tweaked the allowed states to include the failed state as well, so that the failure scenario is left to be handled inside the ETL DAG. (If you want the remaining runs not to be executed after a failure, remove the failed state; the loop will then break and the task will fail.)

    def trigger_n_runs(**kwargs):
        from airflow.operators.trigger_dagrun import TriggerDagRunOperator
        from airflow.utils.state import State

        runs = ...  # logic to find the number of runs
        for n in range(runs):
            # each trigger waits for the triggered run to finish before the
            # next iteration starts, so the ETL runs execute one after another
            triggersplitrun = TriggerDagRunOperator(
                task_id="n_runs_trigger",
                trigger_dag_id=kwargs["trigger_dag_name"],
                conf={},
                poke_interval=60,
                wait_for_completion=True,
                # FAILED is allowed so a failed ETL run does not fail this task;
                # remove it if the loop should stop on the first failure
                allowed_states=[State.SUCCESS, State.FAILED],
                do_xcom_push=True,
            )
            # execute() runs the operator inline with this task's own context
            trigger_state = triggersplitrun.execute(kwargs)

Each run is triggered only after the previous run has completed, since wait_for_completion is set to True.

Second, I defined a task using a BranchPythonOperator at the beginning of the ETL DAG to check whether the last run was successful and control the flow accordingly. (Here dag_ref is the DAG object from the context manager, i.e. the dag in with DAG(...) as dag:.)

    LastRunStatus = BranchPythonOperator(
        task_id="LastRunStatus",
        python_callable=last_dag_run,
        op_kwargs={
            # task ids of the two branches the callable can return
            "task_id_on_success": "SuccessJob",
            "task_id_on_failure": "FailureJob",
            # the DAG object from "with DAG(...) as dag:"
            "dag_ref": dag
        },
        provide_context=True  # deprecated in Airflow 2.x; context is passed automatically
    )

The callable looked something like this:

    def last_dag_run(**kwargs):
        # latest run of this DAG, including manually / externally triggered ones
        this_dag_run = kwargs["dag_ref"].get_last_dagrun(include_externally_triggered=True)
        previous_dag_run = this_dag_run.get_previous_dagrun()
        if previous_dag_run is None:
            # first ever run: nothing to check
            return kwargs["task_id_on_success"]
        if previous_dag_run.state == "success":
            return kwargs["task_id_on_success"]
        return kwargs["task_id_on_failure"]
Hussein Awala

When you set max_active_runs to 0, Airflow will not automatically schedule new runs if there is an unfinished run in the DAG. But if you create a run manually, it will be scheduled and executed normally.
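
For reference, the parameter is set on the DAG object itself; a minimal sketch, with placeholder id and dates:

    from datetime import datetime
    from airflow import DAG

    with DAG(
        dag_id="etl_dag",                # placeholder
        start_date=datetime(2023, 1, 1),
        schedule_interval=None,          # runs are triggered externally
        max_active_runs=1,               # intended cap on concurrently active runs
        catchup=False,
    ) as dag:
        ...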

If you want to block a run completely while there is another run with a smaller execution_date, you can add a sensor at the beginning of your DAG which checks whether there is a run with execution_date < the current execution_date and state != success.
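
A minimal sketch of such a sensor, assuming Airflow 2.x and a PythonSensor (the task and function names are made up):

    from airflow.models import DagRun
    from airflow.sensors.python import PythonSensor
    from airflow.utils.state import State

    def _no_earlier_unfinished_runs(dag_run, **_):
        # True only when every run with a smaller execution_date has succeeded
        runs = DagRun.find(dag_id=dag_run.dag_id)
        return all(
            r.state == State.SUCCESS
            for r in runs
            if r.execution_date < dag_run.execution_date
        )

    wait_for_earlier_runs = PythonSensor(
        task_id="wait_for_earlier_runs",
        python_callable=_no_earlier_unfinished_runs,
        poke_interval=60,
        mode="reschedule",  # free the worker slot between pokes
    )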

And if, for example, you have run1: (A1, B1, C1) and run2: (A2, B2, C2), and you want A1 to run before A2, B1 before B2 and C1 before C2, but it is fine if B2 runs in parallel with C1, you can activate the option depends_on_past on all the tasks of your DAG.
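
As a sketch, depends_on_past can be set per task (or once for all tasks via default_args); the task name here mirrors the A/B/C example above:

    from airflow.operators.bash import BashOperator

    # with depends_on_past=True, A in run2 starts only after A in run1 succeeded,
    # while B and C of different runs can still overlap with it
    a = BashOperator(
        task_id="A",
        bash_command="echo task A",
        depends_on_past=True,
        dag=dag,
    )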

Another option is to use a pool of size 1 for all the tasks, but in this case you will not be able to run multiple tasks in parallel within the same run, and if a run fails, Airflow will start the second one without waiting.
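
A sketch of the pool variant, assuming a single-slot pool named etl_serial created beforehand (for example with the CLI: airflow pools set etl_serial 1 "serialize ETL tasks"):

    from airflow.operators.bash import BashOperator

    # every task of the DAG points at the same 1-slot pool, so at most one of
    # these tasks runs at any moment, across all runs of the DAG
    extract = BashOperator(
        task_id="extract",
        bash_command="echo extract",
        pool="etl_serial",
        dag=dag,
    )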