We are trying to reorganize the orchestration of our step-submission mechanism in AWS EMR, using Apache Airflow, to take advantage of the step concurrency support announced in this post.
We can submit parallel steps to EMR, but we have not been able to figure out how to submit follow-up steps as each individual step of the first parallel wave completes.
Following is the code snippet:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor
default_args = {
    'retries': 0,
    'retry_delay': timedelta(minutes=1)
}
# Define the DAG
dag_id = 'Cluster_parallel_job'
with DAG(
    dag_id=dag_id,
    default_args=default_args,
    start_date=datetime.now()
) as dag:
    # Define the EMR cluster ID (an existing cluster)
    cluster_id = 'j-XXXXXXXXX'

    # Define the EMR steps; each one runs a Python script via command-runner.jar
    step_1 = {
        'Name': 'Job_111111111',
        'ActionOnFailure': 'CONTINUE',  # Continue running remaining steps if this one fails
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',  # command-runner.jar runs arbitrary commands on the cluster
            'Args': [
                'python3',
                '/home/hadoop/sleep_120.py'
            ],
        }
    }
    step_medical_1 = {
        'Name': 'Job_medical_1',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'python3',
                '/home/hadoop/sleep_600.py'
            ],
        }
    }
    step_elig_1 = {
        'Name': 'Job_eligibility_1',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'python3',
                '/home/hadoop/sleep_120.py'
            ],
        }
    }
    step_phar_1 = {
        'Name': 'Job_Phar_1',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'python3',
                '/home/hadoop/sleep_180.py'
            ],
        }
    }
    step_rx_1 = {
        'Name': 'Job_RX_1',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'python3',
                '/home/hadoop/sleep_180.py'
            ]
        }
    }
    add_step_job_1 = EmrAddStepsOperator(
        task_id='step_1',
        job_flow_id=cluster_id,
        steps=[step_1]
    )
    # Medical
    add_step_medical_1 = EmrAddStepsOperator(
        task_id='step_medical_1',
        job_flow_id=cluster_id,
        steps=[step_medical_1]
    )
    # Pharmacy
    add_step_phar_1 = EmrAddStepsOperator(
        task_id='step_phar_1',
        job_flow_id=cluster_id,
        steps=[step_phar_1]
    )
    # Rx
    add_step_Rx_1 = EmrAddStepsOperator(
        task_id='step_rx_1',
        job_flow_id=cluster_id,
        steps=[step_rx_1]
    )
    # Factory that builds an EmrStepSensor for the step submitted by the given task.
    # The step ID is pulled at runtime from the XCom pushed by EmrAddStepsOperator.
    def check_sensor_function(sensor_task_name):
        try:
            step_id_string = "{{ task_instance.xcom_pull('" + sensor_task_name + "', key='return_value')[0] }}"
            print("Checking step sensor :: " + step_id_string)
            check_sensor = EmrStepSensor(
                task_id='check_sensor_' + sensor_task_name,
                job_flow_id=cluster_id,
                step_id=step_id_string,
                dag=dag
            )
            print("--- Check Sensor here --- " + sensor_task_name)
            return check_sensor
        except Exception as e:
            print("----- Got Exception Here ---- " + sensor_task_name)
            print(e)
    add_step_job_1 >> check_sensor_function('step_1') >> [add_step_medical_1, add_step_phar_1]
    # This doesn't get executed.
    check_sensor_function('step_medical_1') >> add_step_Rx_1
Basically, task "add_step_job_1" is submitted, with its EMR step sensor waiting for the job to complete. After that, the "add_step_medical_1" and "add_step_phar_1" steps are submitted in parallel without any issue.
Now, what we are trying to do is wait on task "add_step_medical_1" (step "step_medical_1") and task "add_step_phar_1" (step "step_phar_1") individually, and submit another set of tasks as each one completes.
In the code above, I've represented it with these lines:
# This doesn't get executed.
check_sensor_function('step_medical_1') >> add_step_Rx_1
This pattern of waiting on an individual parallel task before submitting another set of parallel tasks can be chained multiple times. If we can crack this problem, we should be able to rewrite our orchestration to meet our objectives.
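For clarity, the wiring we are after would look roughly like the sketch below (inside the same with DAG block). This is only a minimal sketch of the intended dependencies; sensor_medical_1 and sensor_phar_1 are hypothetical names, not tasks from the DAG above:

    # Intended pattern (hypothetical): each parallel step is gated by its own
    # sensor, and the next wave is submitted only after that sensor succeeds.
    sensor_medical_1 = check_sensor_function('step_medical_1')
    sensor_phar_1 = check_sensor_function('step_phar_1')
    add_step_medical_1 >> sensor_medical_1 >> add_step_Rx_1
    add_step_phar_1 >> sensor_phar_1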
Note: this code runs against EMR with the basic "aws_default" connection configured in the Airflow settings. We have set the cluster's step concurrency level to 2 for testing this code.
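For reference, the concurrency level itself is configured on the cluster, not in Airflow; a minimal sketch of setting it with boto3 (the cluster ID is a placeholder) is:

import boto3

# Allow up to 2 EMR steps to run concurrently on the existing cluster.
emr = boto3.client('emr')
emr.modify_cluster(ClusterId='j-XXXXXXXXX', StepConcurrencyLevel=2)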
Please let me know if you have questions.