We are trying to reorganize our EMR step-submission orchestration in Apache Airflow so that it takes advantage of the step concurrency support described in this post.

We can submit parallel steps to EMR, but we have not been able to figure out how to submit the next step as soon as each individual step from the first parallel batch completes.

Following is the code snippet:

from datetime import datetime, timedelta
from airflow import DAG
 
from airflow.models.baseoperator import chain
 
from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor
from airflow.sensors.external_task import ExternalTaskSensor
 
default_args = {
    'retries': 0,
    'retry_delay': timedelta(minutes=1)
}
 
# Define your DAG
dag_id = 'Cluster_parallel_job'
 
with DAG(
    dag_id=dag_id,
    default_args=default_args,
    start_date=datetime.now()
) as dag:
 
    # Define your EMR cluster ID
    cluster_id = 'j-XXXXXXXXX'
 
    # Define the EMR step
    step_1 = {
        'Name': 'Job_111111111',
        'ActionOnFailure': 'CONTINUE',  # Specify the action on step failure
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',  # Specify the command-runner.jar for running custom Hadoop streaming steps
            'Args': [
                'python3',
                '/home/hadoop/sleep_120.py'
            ],
        }
    }

 
 
    step_medical_1 = {
        'Name': 'Job_medical_1',
        'ActionOnFailure': 'CONTINUE',  # Specify the action on step failure
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',  # Specify the command-runner.jar for running custom Hadoop streaming steps
            'Args': [
                'python3',
                '/home/hadoop/sleep_600.py'
            ],
        }
    }

 
    step_elig_1 = {
        'Name': 'Job_eligibility_1',
        'ActionOnFailure': 'CONTINUE',  # Specify the action on step failure
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',  # Specify the command-runner.jar for running custom Hadoop streaming steps
            'Args': [
                'python3',
                '/home/hadoop/sleep_120.py'
            ],
        }
    }

    step_phar_1 = {
        'Name': 'Job_Phar_1',
        'ActionOnFailure': 'CONTINUE',  # Specify the action on step failure
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',  # Specify the command-runner.jar for running custom Hadoop streaming steps
            'Args': [
                'python3',
                '/home/hadoop/sleep_180.py'
            ],
        }
    }
 
    step_rx_1 = {
        'Name': 'Job_RX_1',
        'ActionOnFailure': 'CONTINUE',  # Specify the action on step failure
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',  # Specify the command-runner.jar for running custom Hadoop streaming steps
            'Args': [
                'python3',
                '/home/hadoop/sleep_180.py'
            ]
        }
    }

    add_step_job_1 = EmrAddStepsOperator(
        task_id='step_1',
        job_flow_id=cluster_id,
        steps=[step_1]
    )
 
    # Medical
    add_step_medical_1 = EmrAddStepsOperator(
        task_id='step_medical_1',
        job_flow_id=cluster_id,
        steps=[step_medical_1]
    )
 
    # Pharmacy
    add_step_phar_1 = EmrAddStepsOperator(
        task_id='step_phar_1',
        job_flow_id=cluster_id,
        steps=[step_phar_1]
    )
 
    # Rx
    add_step_Rx_1 = EmrAddStepsOperator(
        task_id='step_rx_1',
        job_flow_id=cluster_id,
        steps=[step_rx_1]
    )
 
    # Helper: create an EmrStepSensor that waits on the step submitted by the
    # EmrAddStepsOperator task named `sensor_task_name`.
    def check_sensor_function(sensor_task_name):
        try:
            # Pull the step ID returned by the add-step task from XCom.
            step_id_string = "{{ task_instance.xcom_pull('" + sensor_task_name + "', key='return_value')[0] }}"
            print("Checking step sensor :: " + step_id_string)
            check_sensor = EmrStepSensor(
                task_id='check_sensor_' + sensor_task_name,
                job_flow_id=cluster_id,
                step_id=step_id_string,
                dag=dag
            )
            print("--- Check Sensor here --- " + sensor_task_name)
            return check_sensor
        except Exception as e:
            print("----- Got Exception Here ---- " + sensor_task_name)
            print(e)
 
 
add_step_job_1 >> check_sensor_function('step_1') >> [ add_step_medical_1, add_step_phar_1 ]

# This doesn't get executed.
check_sensor_function('step_medical_1') >> add_step_Rx_1

Basically, task "add_step_job_1" is submitted and its EMR step sensor waits for that job to complete. After that, the "add_step_medical_1" and "add_step_phar_1" steps are submitted in parallel without any issue.

Now, what we are trying to do is wait on task "add_step_medical_1" (step "step_medical_1") and task "add_step_phar_1" (step "step_phar_1") individually, and submit another set of tasks as each one completes.

In the code above, I've represented it with the line:

# This doesn't get executed.
check_sensor_function('step_medical_1') >> add_step_Rx_1

This pattern of waiting on an individual parallel task before submitting another set of parallel tasks can be chained multiple times. If we can crack this problem, we should be able to rewrite our orchestration to meet our objectives.
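To make the target shape concrete, this is roughly the dependency layout we are aiming for (a sketch only, reusing the helper above; sensor_medical_1 and sensor_phar_1 are hypothetical variable names, and each branch would chain further steps the same way):

sensor_medical_1 = check_sensor_function('step_medical_1')
sensor_phar_1 = check_sensor_function('step_phar_1')

# As soon as the medical step completes, submit the next step on that branch.
add_step_medical_1 >> sensor_medical_1 >> add_step_Rx_1
# The pharmacy branch would wait on its own sensor before its next step.
add_step_phar_1 >> sensor_phar_1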

Note: This code runs against EMR using a basic "aws_default" connection configured in the Airflow settings. For this test, we have set the cluster's step concurrency level to 2.
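For reference, the step concurrency was set on the running cluster roughly like this (a minimal sketch using boto3's modify_cluster call; the cluster ID and region are placeholders, and the same value could also be set at cluster creation):

import boto3

# Allow EMR to run up to 2 steps concurrently on the cluster.
emr = boto3.client('emr', region_name='us-east-1')
emr.modify_cluster(
    ClusterId='j-XXXXXXXXX',
    StepConcurrencyLevel=2
)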

Please let me know if you have questions.
