Airflow v2 Create Dynamic Task Mapping based on input config from UI

79 Views Asked by At

I have a dag with an input Param like so:

@dag(
    start_date=datetime(2021, 1, 1),
    schedule=None,
    params={
        "param1": Param(
            ["all"],
            "Select from the list of options.",
            type="array",
            title="Param 1",
            examples=["all", 'half'],
        )
    }
)
def updateDAG():
...

I want to create dynamic task mapping based on the number of elements in the input list. I.e. Maximum of 2 tasks and minimum of 1.

I know that I can use the .expand() feature to run a task N times:

links = task1.override(task_id="run_task").expand(arg1='{{ dag_run.conf.param1 }}')

The above reads '{{ dag_run.conf.param1 }}' as a string and creates a task for each character in the string. But I want to pass in the input parameter like ["all"] as given in the input from UI.

1

There are 1 best solutions below

2
Kombajn zbożowy On

Have a separate task that uses get_current_context to retrieve params and write it into XCom.

@task
def get_params():
    return get_current_context()['params']['param1']

links = task1.override(task_id="run_task").expand(arg1=get_params())