I am pretty new to Airflow and I have a DAG which should be used in 2 different ways depending on the a "run_mode" variable that will be passed when I trigger the DAG through the configuration like {"run_mode":"full"}
I need to be able to read this parameter and then use it inside a function to decide some other logic after.
I have tryed following code below, but function "which_mode_to_run" always goes on the else part even though my parameter value is "full".
How can I read the value of the parameter and use it in this function?
run_mode='{{ dag_run.conf.run_mode }}'
def which_mode_to_run(param_run_mode):
if param_run_mode=='full':
return ''
else:
return '-inc'
delta=which_mode_to_run(run_mode)
I have also tryed reading it like :
def get_param(**kwargs):
rn_md = kwargs['dag_run'].conf.get('run_mode')
return rn_md
run_mode = get_param
Thank you
Update :
I have managed to read the parameter and the which_mode_to_run return the right value. The problem is that I can't use the xCom result in other function. So based on that result I need to calculate a variable which will be passed in the configuration of the DAG.
def which_mode_to_run(param_json,**kwargs):
if param_json=='full':
return ''
elif param_json=='score':
return '-score'
else:
return '-inc'
which_mode_to_run_task = PythonOperator(
task_id="which_mode_to_run",
python_callable=which_mode_to_run,
op_kwargs={"param_json": "{{ dag_run.conf['run_mode'] }}"},
provide_context=True,
do_xcom_push=True)
So after this is executed, I need it's result in a variable that I can use it inside my python code to decide :
delta="{{ task_instance.xcom_pull(task_ids='which_mode_to_run') }}"
if delta=='-inc':
return 'application.conf1'
elif delta=='' or delta=='-score':
return 'application.conf2'
This application.conf will be passed in the SparkSubmitOperator , so that the DAG takes the right configuration to run.
Basically the main question can I use the xCom value, not in an airflow Operator but in a simple if?
This variables are not available before the run time, where they are related to the run itself, same for the jinja template data which are not available outside the run context, so you need to call your method in a python operator (or any other operator but python is the easier one), then you can use the result (stored in a xcom) in the other tasks:
To use the result in the other tasks: