xcom pull in BigQueryInsertJobOperator Airflow

I am using a PythonOperator to push values via XCom, and I need to use these values in a BigQueryInsertJobOperator as params and pass them to a SQL file.

def func1(**context):
    # param1 is assumed to be defined earlier (omitted in the question)
    context['ti'].xcom_push(key='param1', value=param1)


func1 = PythonOperator(
    task_id="func1",
    python_callable=func1,
    dag=dag,
    provide_context=True,
)

bq_op_load = BigQueryInsertJobOperator(
    task_id="bq_op_load",
    configuration={
        "query": {
            "query": "{% include 'sql_file_path.sql' %}",
            "useLegacySql": False,
        }
    },
    params={
        "param1": '{{ task_instance.xcom_pull(task_ids="func1", key="param1") }}'
    },
)

The SQL file receives this parameter as shown below:

CALL `project.db.storedproc`('{{params.param1}}');

But instead of the pushed value, the rendered SQL contains the literal string '{{ task_instance.xcom_pull(task_ids="func1", key="param1") }}'.

Answer by Hussein Awala:

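That's because you're nesting one Jinja template inside another. The string in params is passed into the template context as-is, and the operator renders its templated fields only once, so {{ params.param1 }} is replaced by the unrendered template text.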

To fix it, put the xcom_pull template directly in your SQL file instead of going through params:

CALL `project.db.storedproc`('{{ task_instance.xcom_pull(task_ids="func1", key="param1") }}');
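
To see the single-pass behavior outside Airflow, here is a minimal sketch using the jinja2 package directly (assumed to be installed). FakeTaskInstance, render_once, and the value "2024-01-01" are hypothetical stand-ins for illustration only; they are not Airflow APIs, and Airflow's own rendering does more than this.

```python
from jinja2 import Template


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance; returns canned XCom values."""

    def __init__(self, xcoms):
        self._xcoms = xcoms

    def xcom_pull(self, task_ids, key):
        return self._xcoms[(task_ids, key)]


def render_once(source, **context):
    # One Jinja pass over the source, which is roughly what happens to a templated field.
    return Template(source).render(**context)


# Pretend that task "func1" pushed this value under key "param1" (made-up value).
ti = FakeTaskInstance({("func1", "param1"): "2024-01-01"})

# Setup from the question: the value stored in params is itself a template string.
params = {
    "param1": '{{ task_instance.xcom_pull(task_ids="func1", key="param1") }}'
}
broken = render_once(
    "CALL `project.db.storedproc`('{{ params.param1 }}');",
    task_instance=ti,
    params=params,
)

# Fix from the answer: the SQL text references the XCom directly.
fixed = render_once(
    """CALL `project.db.storedproc`('{{ task_instance.xcom_pull(task_ids="func1", key="param1") }}');""",
    task_instance=ti,
)

print(broken)  # still contains the unrendered {{ ... xcom_pull ... }} text
print(fixed)
```

In the first call the inner template is inserted as plain text and never evaluated, because nothing renders the output a second time. In the second call the xcom_pull expression is part of the template source itself, so the single pass resolves it.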