Airflow: How to pass a variable in json / dict format to an operator?

I need to know how to pass a dictionary stored in an Airflow Variable as part of an operator's parameters, for example to launch a Databricks notebook.

I have tried a few things, but none of them work.

I have this variable saved in Airflow:

"dictionaries" : {
                     "dict1" : {
                                "a" : 1,
                                "b" : 2
                               }

and this code to try to retrieve the variable

dict1 = "dict1"
values = f"{{{{ var.json.dictionaries.{dict1} }}}}"

to later pass it as a dictionary along with other values in the parameters of an operator

task1 = DatabricksRunNowOperator(
            task_id=f'Databricks_{dict1}',
            databricks_conn_id='databricks',
            job_id=1111,
            notebook_params={"param1": "param1", **values}
)

This fails because the variable arrives as a str: "TypeError: 'str' object is not a mapping". I tried using the json library to convert it, without success; I get the error message "Expecting property name enclosed in double quotes".

So I suspect the variable is being retrieved with single quotes. I also tried using replace to change them to double quotes, but that didn't work either.

json.loads(values.replace("'","\""))

Since I am using Jinja to retrieve this variable, it may not behave the same way. I'm a bit lost with this. The last things I tried were:

values = json.loads(f"{{{{ (var.json.dictionaries.{dict1}).replace('\'','\"') }}}}")
values = json.loads((f"{{{{ var.json.dictionaries.{dict1} }}}}").replace('\'','\"'))

also inside the operator

notebook_params={"param1": "param1", **json.loads((f"{{{{ var.json.dictionaries.{dict1} }}}}").replace('\'','\"'))}

but I get the same error: "json.decoder.JSONDecodeError: Expecting property name enclosed in double quotes"

Surely I am misunderstanding how to retrieve the variable and transform it. If someone can help me with this, I would appreciate it.

Regards

There is 1 best solution below

Dinesh Marimuthu On

You'll have to save the variable's value as shown below:

{
 "dictionaries" : {
                   "dict1" : {
                             "a" : 1,
                             "b" : 2
                            }
                   }
}
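
Before saving the value, it can help to confirm that it parses as JSON. The following is a minimal sketch using only the standard library; raw_value is a hypothetical name that simply holds the text above, and no Airflow installation is assumed.

```python
import json

# Hypothetical stand-in: the text you would paste into the Airflow Variable.
raw_value = """
{
 "dictionaries": {
   "dict1": {"a": 1, "b": 2}
 }
}
"""

# json.loads raises json.JSONDecodeError if the text is not valid JSON.
parsed = json.loads(raw_value)

# The nested dictionary is now an ordinary Python dict.
print(parsed["dictionaries"]["dict1"])  # {'a': 1, 'b': 2}
```

If this step raises an error, the value would most likely fail in the same way when it is read back with deserialize_json=True.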

When you access the variable in code, pass deserialize_json=True. Let's say your variable is called 'json'; then:

from airflow.models import Variable
json_data = Variable.get('json', deserialize_json=True)
dict1 = json_data['dictionaries']['dict1']
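
To see why the attempts in the question failed and why a real dict works, here is a rough sketch that runs without Airflow. The json_data value is a hard-coded stand-in for what Variable.get('json', deserialize_json=True) would return, and name is a hypothetical variable introduced only for illustration. The key point is that the f-string only builds a literal Jinja template; nothing has been rendered yet when Python tries to unpack or parse it.

```python
import json

# Assumption: hard-coded stand-in for
# Variable.get('json', deserialize_json=True), so the sketch runs anywhere.
json_data = {"dictionaries": {"dict1": {"a": 1, "b": 2}}}

name = "dict1"  # hypothetical name for the dictionary key

# The question's f-string yields a literal template string, not the value.
template = f"{{{{ var.json.dictionaries.{name} }}}}"
print(template)  # {{ var.json.dictionaries.dict1 }}

# Unpacking a str with ** raises the TypeError reported in the question.
try:
    {"param1": "param1", **template}
except TypeError as exc:
    print("TypeError:", exc)

# json.loads sees "{{" and fails; replacing quotes cannot help because
# the string contains no JSON at all at this point.
try:
    json.loads(template.replace("'", '"'))
except json.JSONDecodeError as exc:
    print("JSONDecodeError:", exc)

# A real dict, by contrast, merges cleanly with the other parameters.
dict1 = json_data["dictionaries"][name]
notebook_params = {"param1": "param1", **dict1}
print(notebook_params)  # {'param1': 'param1', 'a': 1, 'b': 2}
```

In the DAG itself, the resulting notebook_params dict would then be passed to the operator in place of the {"param1": "param1", **values} expression from the question.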