I am parsing a YAML file that describes some EMR steps inside a @task, and I cannot create the actual operators that run the operations described in the YAML file with a for loop inside a task.
The DAG looks more or less like this:
with DAG(...):
    @task
    def parse_yaml(yaml_path):
        """
        Read the templated YAML and expand the Jinja variables inside.
        The YAML has blocks of steps that have to run sequentially,
        something like this:

        groups:
          - group_name: 1
            steps:
              - step_name: 1.1
                step_emr_command: spark-submit ...
              - step_name: 1.2
                step_emr_command: spark-submit ...
          - group_name: 2
            steps:
              - step_name: 2.1
                step_emr_command: spark-submit ...
              - step_name: 2.2
                step_emr_command: spark-submit ...
        """
        return parsed_yaml

    @task_group
    def run_group(group_definition):
        setup_emr_cluster = EmrCreateJobFlowOperator(...)
        run_steps = EmrAddStepsOperator(steps=group_definition['steps'], ...)
        terminate_emr_cluster = EmrTerminateJobFlowOperator(...)
        setup_emr_cluster >> run_steps >> terminate_emr_cluster

    parsed_yaml = parse_yaml(yaml_path)
    # this runs all the step groups in parallel (bad)
    run_group.expand(group_definition=parsed_yaml)
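For concreteness, the value I expect parse_yaml to return would look something like this. This is a plain-Python sketch (no Airflow needed); the exact shape is illustrative, since parse_yaml's body is elided above:

```python
# Illustrative shape of parse_yaml's return value for the YAML above:
# a list of group dicts, each with an ordered list of step dicts.
parsed_yaml = [
    {
        "group_name": 1,
        "steps": [
            {"step_name": "1.1", "step_emr_command": "spark-submit ..."},
            {"step_name": "1.2", "step_emr_command": "spark-submit ..."},
        ],
    },
    {
        "group_name": 2,
        "steps": [
            {"step_name": "2.1", "step_emr_command": "spark-submit ..."},
            {"step_name": "2.2", "step_emr_command": "spark-submit ..."},
        ],
    },
]

# run_group.expand(...) maps run_group over this list, creating one
# task-group instance per element -- and all of them run in parallel.
for group in parsed_yaml:
    print(group["group_name"], [s["step_name"] for s in group["steps"]])
```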
Hopefully this pseudo-example showcases what I'm trying to achieve: I need the run_group for group 1 to run before group 2, not in parallel.
Here is the diagram that should be run based on the YAML file above:
parse_yaml
    │
    ▼
Group 1:  EmrCreateJobFlow (cluster 1) ─► step 1.1 ─► step 1.2 ─► EmrTerminateJobFlow (cluster 1)
    │
    ▼
Group 2:  EmrCreateJobFlow (cluster 2) ─► step 2.1 ─► step 2.2 ─► EmrTerminateJobFlow (cluster 2)
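The dependency structure in the diagram above can be sketched without Airflow at all. The Node class and build_group function below are hypothetical stand-ins for operators and their `>>` chaining, just to make the intended serialization explicit: group 1's terminate task must be upstream of group 2's create task.

```python
# Sketch of the desired dependency graph using a stub Node class as a
# stand-in for Airflow operators (Node and build_group are hypothetical).

class Node:
    """Minimal stub mimicking an Airflow task's `>>` dependency operator."""
    def __init__(self, name):
        self.name = name
        self.downstream = []

    def __rshift__(self, other):
        self.downstream.append(other)
        return other

def build_group(group):
    """Build create -> step ... -> terminate for one group; return (head, tail)."""
    create = Node(f"create_cluster_{group['group_name']}")
    prev = create
    for step in group["steps"]:
        prev = prev >> Node(f"step_{step['step_name']}")
    terminate = prev >> Node(f"terminate_cluster_{group['group_name']}")
    return create, terminate

groups = [
    {"group_name": 1, "steps": [{"step_name": "1.1"}, {"step_name": "1.2"}]},
    {"group_name": 2, "steps": [{"step_name": "2.1"}, {"step_name": "2.2"}]},
]

# Chain the groups: each group's terminate node feeds the next group's
# create node, so group N+1 cannot start until group N has fully finished.
heads, previous_tail = [], None
for g in groups:
    head, tail = build_group(g)
    if previous_tail is not None:
        previous_tail >> head
    heads.append(head)
    previous_tail = tail
```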
Maybe there is a way to do this natively in Airflow. I have tried this:
with DAG(...):
    @task
    def parse_yaml(yaml_path):
        """
        Read the templated YAML and expand the Jinja variables inside.
        The YAML has blocks of steps that have to run sequentially,
        something like this:

        groups:
          - group_name: 1
            steps:
              - step_name: 1.1
                step_emr_command: .....
          - group_name: 2
            steps:
              - step_name: 2.1
                step_emr_command: ...
        """
        return parsed_yaml

    @task
    def run_group_task(parsed_yaml):
        for group in parsed_yaml:
            create_cluster = EmrCreateJobFlowOperator(...)
            run_steps = EmrAddStepsOperator(steps=group['steps'], ...)
            terminate_emr_cluster = EmrTerminateJobFlowOperator(...)
            create_cluster >> run_steps >> terminate_emr_cluster

    parsed_yaml = parse_yaml(yaml_path)
    # this raises an exception instead of building the groups
    run_group_task(parsed_yaml)
The above DAG throws an exception inside run_group_task:
airflow.exceptions.AirflowException: Tried to create relationships between tasks that don't have DAGs yet. Set the DAG for at least one task and try again: [<Task(EmrAddStepsOperator): run_steps>, <Task(EmrCreateJobFlowOperator): create_cluster>]
Per the Airflow TaskFlow tutorial and its source code example, you need to use the @dag decorator to identify it as a DAG, instead of the with DAG(...) block:
https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/example_dags/tutorial_taskflow_api.html