Airflow - Sequential Dynamic taskgroups


I am parsing a YAML file that describes some EMR steps inside a @task, and I cannot use a for loop inside a task to create the operators that run the operations described in the YAML file.

The DAG looks more or less like this:

with DAG(...):
  @task
  def parse_yaml(yaml_path):
     """ 
     read the templated yaml and expand the jinja variables inside
     This yaml has blocks of steps that have to run sequentially:
      something like this:
      groups:
        - group_name: 1
          steps:
            - step_name: 1.1
              step_emr_command: spark-submit ...
            - step_name: 1.2
              step_emr_command: spark-submit ...
        - group_name: 2
          steps:
             - step_name: 2.1
               step_emr_command: spark-submit ...
             - step_name: 2.2
               step_emr_command: spark-submit ...
      """
     return parsed_yaml
  
  @task_group
  def run_group(group_definition):
     setup_emr_cluster = EmrCreateJobFlowOperator()
     run_steps = EmrAddStepsOperator(group_definition['steps'])
     terminate_emr_cluster = EmrTerminateJobFlowOperator()
     setup_emr_cluster >> run_steps >> terminate_emr_cluster
  
  parsed_yaml = parse_yaml(yaml_path)
  
  # this runs all the step groups in parallel (bad)
  run_group.expand(group_definition=parsed_yaml)

Hopefully this pseudo-example shows what I'm trying to achieve. I need run_group for group 1 to finish before run_group for group 2 starts, not to run in parallel with it.

Here is the diagram of what should run, based on the YAML file above:

                  ┌─────────────────────────────────────────────────┐    ┌─────────────────────────────────────────────────┐
                  │ Group 1                                         │    │ Group 2                                         │
┌────────────┐    │ ┌─────────┐   ┌──────┐   ┌──────┐   ┌─────────┐ │    │ ┌─────────┐   ┌──────┐   ┌──────┐   ┌─────────┐ │
│ parse_yaml ├───►│ │EMRCreate├──►│ step ├──►│ step ├──►│Terminate│ ├───►│ │EMRCreate├──►│ step ├──►│ step ├──►│Terminate│ │
└────────────┘    │ │Cluster1 │   │ 1.1  │   │ 1.2  │   │Cluster1 │ │    │ │Cluster2 │   │ 2.1  │   │ 2.2  │   │Cluster2 │ │
                  │ └─────────┘   └──────┘   └──────┘   └─────────┘ │    │ └─────────┘   └──────┘   └──────┘   └─────────┘ │
                  └─────────────────────────────────────────────────┘    └─────────────────────────────────────────────────┘
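
To make the ordering in the diagram explicit, here is a minimal pure-Python sketch (no Airflow involved) that turns the parsed groups into a list of upstream/downstream edges. The dictionary layout mirrors the YAML example above. The helper names group_task_ids and sequential_edges and the generated task ids are hypothetical, chosen only for illustration.

```python
from typing import Dict, List, Tuple

# Hypothetical in-memory form of the parsed YAML; the key names follow the
# example in the question, everything else here is illustrative.
PARSED_YAML = {
    "groups": [
        {"group_name": 1, "steps": [{"step_name": "1.1"}, {"step_name": "1.2"}]},
        {"group_name": 2, "steps": [{"step_name": "2.1"}, {"step_name": "2.2"}]},
    ]
}


def group_task_ids(group: Dict) -> List[str]:
    """Return the (made-up) task ids of one group in execution order."""
    name = group["group_name"]
    steps = [f"step_{s['step_name']}" for s in group["steps"]]
    return [f"create_cluster_{name}", *steps, f"terminate_cluster_{name}"]


def sequential_edges(parsed: Dict) -> List[Tuple[str, str]]:
    """Chain every group internally, then chain each group to the next one."""
    order = ["parse_yaml"]
    for group in parsed["groups"]:
        order.extend(group_task_ids(group))
    # Consecutive pairs are the upstream -> downstream edges.
    return list(zip(order, order[1:]))


edges = sequential_edges(PARSED_YAML)
for upstream, downstream in edges:
    print(f"{upstream} >> {downstream}")
```

The edge from terminate_cluster_1 to create_cluster_2 is the dependency that is missing when the groups run in parallel.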

Maybe there is a way to do this natively in Airflow. I have tried this:

with DAG(...):
  @task
  def parse_yaml(yaml_path):
     """ 
     read the templated yaml and expand the jinja variables inside
     This yaml has blocks of steps that have to run sequentially:
      something like this:
      groups:
        - group_name: 1
          steps:
            - step_name: 1.1
              step_emr_command: .....
        - group_name: 2
          steps:
             - step_name: 2.1
               step_emr_command: ...
      """
     return parsed_yaml
  
  @task
  def run_group_task(parsed_yaml):
     for group in parsed_yaml:
       create_cluster = EmrCreateJobFlowOperator()
       run_steps = EmrAddStepsOperator(group['steps'])
       terminate_emr_cluster = EmrTerminateJobFlowOperator()
       create_cluster >> run_steps >> terminate_emr_cluster
  
  parsed_yaml = parse_yaml(yaml_path)
  
  # this fails with the exception shown below
  run_group_task(parsed_yaml)

The above DAG throws an exception inside run_group_task:

airflow.exceptions.AirflowException: Tried to create relationships between tasks that don't have DAGs yet. Set the DAG for at least one task and try again: [<Task(EmrAddStepsOperator): run_steps>, <Task(EmrCreateJobFlowOperator): create_cluster>]

There is 1 best solution below

subram

Per the Airflow TaskFlow tutorial and its source code example, you need to use the @dag decorator to identify the function as a DAG instead of using the with DAG(...) block.

https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html
https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/example_dags/tutorial_taskflow_api.html