I wanted to make dynamic pipeline based on the result from query. I am trying to get some data from database and based on the result, I wanted to call different tasks. Why results from fetchall is not iterable? You can check the sample code below:-
from datetime import datetime, timedelta
from airflow.utils.edgemodifier import Label
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'start_date': datetime.now() - timedelta(minutes=20),
'retries': 3,
'retry_delay': timedelta(minutes=5)
}
@dag(dag_id='pipeline_taskflow_api_v01',
default_args=default_args,
start_date=datetime.now() - timedelta(minutes=20),
schedule_interval='@daily',
render_template_as_native_obj=True)
def pipeline_etl():
@task()
def get_unprocessed_data():
pg_hook = PostgresHook(
postgres_conn_id='some_connection',
schema='public'
)
pg_conn = pg_hook.get_conn()
cursor = pg_conn.cursor()
# get max id from source table.
cursor.execute("SELECT MAX(id) FROM public.source;")
max_id = cursor.fetchone()[0]
# get max id from last run.
cursor.execute("SELECT MAX(id) FROM public.airflow_pipeline;")
processed_max_id = cursor.fetchone()[0]
cursor.execute("select DISTINCT a,b,.... from x.... where "
"s.id >= %s", [processed_max_id])
return cursor.fetchall()
@task.branch(task_id="branching")
def select_branch_pipeline(stream_list):
if stream_list[1] == 'a' and stream_list[3] == 'x':
return 'a_x'
elif stream_list[1] == 'a' and stream_list[3] == 'y':
return 'a_y'
elif stream_list[1] == 'b' and stream_list[3] == 'x':
return 'b_x'
elif stream_list[1] == 'b' and stream_list[3] == 'y':
return 'b_y'
elif stream_list[1] == 'c' and stream_list[3] == 'x':
return 'c_x'
streams = get_unprocessed_data()
for stream in streams:
processed_branch = select_branch_pipeline(stream)
run_this_first = EmptyOperator(task_id="run_this_first")
join = EmptyOperator(task_id="join", trigger_rule="none_failed_min_one_success")
t = EmptyOperator(task_id=processed_branch)
empty_follow = EmptyOperator(task_id="follow_" + processed_branch)
# Label is optional here, but it can help identify more complex branches
run_this_first >> Label(processed_branch) >> t >> empty_follow >> join
# call taskflow
demo_taskflow = pipeline_etl()
I wanted to make dynamic pipeline based on the result from query.