Airflow: Trigger DAG depending on dag_run.conf content


I am pretty new to Airflow and I have a DAG that should be used in 2 different ways depending on a "run_mode" variable, which is passed in the configuration when I trigger the DAG, like {"run_mode":"full"}.

I need to be able to read this parameter and then use it inside a function to decide some other logic after.

I have tried the code below, but the function "which_mode_to_run" always takes the else branch even though my parameter value is "full".

How can I read the value of the parameter and use it in this function?

run_mode='{{ dag_run.conf.run_mode }}'
def which_mode_to_run(param_run_mode):
    if param_run_mode=='full':
       return ''
    else:
       return '-inc'
delta=which_mode_to_run(run_mode)

I have also tried reading it like this:

def get_param(**kwargs):
    rn_md = kwargs['dag_run'].conf.get('run_mode')
    return rn_md

run_mode = get_param

Thank you

Update:

I have managed to read the parameter, and which_mode_to_run returns the right value. The problem is that I can't use the XCom result in another function. Based on that result, I need to calculate a variable that will be passed into the configuration of the DAG.

def which_mode_to_run(param_json, **kwargs):
    if param_json == 'full':
        return ''
    elif param_json == 'score':
        return '-score'
    else:
        return '-inc'

which_mode_to_run_task = PythonOperator(
               task_id="which_mode_to_run",
               python_callable=which_mode_to_run,
               op_kwargs={"param_json": "{{ dag_run.conf['run_mode'] }}"},
               provide_context=True,
               do_xcom_push=True)

After this task has run, I need its result in a variable that I can use inside my Python code to decide which configuration file to use:

delta="{{ task_instance.xcom_pull(task_ids='which_mode_to_run') }}"
if delta=='-inc':
    return 'application.conf1'
elif delta=='' or delta=='-score':
    return 'application.conf2'

This application.conf will be passed to the SparkSubmitOperator so that the DAG runs with the right configuration.

Basically, the main question is: can I use the XCom value not in an Airflow operator but in a plain if statement?

Answer by Hussein Awala

These variables are not available before run time, because they belong to the run itself. The same is true of Jinja template values, which are rendered only inside templated operator fields within the run context. So you need to call your function from a PythonOperator (any other operator would work, but Python is the easiest one), and then you can use the result (stored in an XCom) in the other tasks:

from airflow.operators.python import PythonOperator

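def which_mode_to_run(param_run_mode):
    # param_run_mode arrives already rendered, because op_kwargs is a templated field
    if param_run_mode == 'full':
        return ''
    else:
        return '-inc'

which_mode_to_run_task = PythonOperator(
    task_id="which_mode_to_run",
    python_callable=which_mode_to_run,
    # Jinja is not rendered inside the function body, so pass the value in here
    op_kwargs={"param_run_mode": "{{ dag_run.conf['run_mode'] }}"},
)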
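The same point explains why the module-level call in the question always returns '-inc': outside a templated operator field the Jinja expression is never rendered, so the function receives the literal template text. A minimal plain-Python sketch (no Airflow needed) of what happens at parse time:

```python
# At DAG-parse time this is only a Python string; nothing has rendered it.
run_mode = "{{ dag_run.conf.run_mode }}"


def which_mode_to_run(param_run_mode):
    if param_run_mode == "full":
        return ""
    return "-inc"


# The literal template text is compared with "full", so the else branch wins.
delta = which_mode_to_run(run_mode)
print(repr(delta))  # prints '-inc'
```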

To use the result in the other tasks, pull it by the task_id (here which_mode_to_run, not the name of the Python variable):

"{{ task_instance.xcom_pull(task_ids='which_mode_to_run') }}"
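As for the plain if on the XCom value: it cannot run at module level, but it can run inside another task's callable, where the XCom already exists. Below is a rough sketch of that idea, not a definitive implementation. The names choose_conf, choose_conf_callable and FakeTaskInstance are hypothetical; FakeTaskInstance only imitates the xcom_pull method so the logic can be tried without Airflow.

```python
def choose_conf(delta):
    """Map the suffix returned by which_mode_to_run to a config file name."""
    if delta == "-inc":
        return "application.conf1"
    if delta in ("", "-score"):
        return "application.conf2"
    raise ValueError(f"unexpected delta: {delta!r}")


def choose_conf_callable(ti):
    # Intended to run inside a task, so the upstream XCom exists by now.
    delta = ti.xcom_pull(task_ids="which_mode_to_run")
    return choose_conf(delta)


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance, for local testing only."""

    def __init__(self, xcoms):
        self._xcoms = xcoms

    def xcom_pull(self, task_ids):
        return self._xcoms.get(task_ids)


fake_ti = FakeTaskInstance({"which_mode_to_run": "-inc"})
print(choose_conf_callable(fake_ti))  # prints application.conf1
```

In a DAG, choose_conf_callable would be wrapped in its own PythonOperator placed downstream of which_mode_to_run, and its return value pulled with xcom_pull into a templated field of the SparkSubmitOperator. Which fields are templated depends on the provider version, so check template_fields for the one you have installed.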