dynamic pipeline based on result from sql using Airflow: TypeError: 'XComArg' object is not iterable

128 Views Asked by At

I wanted to make dynamic pipeline based on the result from query. I am trying to get some data from database and based on the result, I wanted to call different tasks. Why results from fetchall is not iterable? You can check the sample code below:-

from datetime import datetime, timedelta
from airflow.utils.edgemodifier import Label
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'start_date': datetime.now() - timedelta(minutes=20),
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}


@dag(dag_id='pipeline_taskflow_api_v01',
     default_args=default_args,
     start_date=datetime.now() - timedelta(minutes=20),
     schedule_interval='@daily',
     render_template_as_native_obj=True)
def pipeline_etl():
    @task()
    def get_unprocessed_data():
        pg_hook = PostgresHook(
            postgres_conn_id='some_connection',
            schema='public'
        )
        pg_conn = pg_hook.get_conn()
        cursor = pg_conn.cursor()
        # get max id from source table.
        cursor.execute("SELECT MAX(id) FROM public.source;")
        max_id = cursor.fetchone()[0]
        # get max id from last run.
        cursor.execute("SELECT MAX(id) FROM public.airflow_pipeline;")
        processed_max_id = cursor.fetchone()[0]
        cursor.execute("select DISTINCT a,b,.... from x.... where "
                       "s.id >= %s", [processed_max_id])
        return cursor.fetchall()

    @task.branch(task_id="branching")
    def select_branch_pipeline(stream_list):
        if stream_list[1] == 'a' and stream_list[3] == 'x':
            return 'a_x'
        elif stream_list[1] == 'a' and stream_list[3] == 'y':
            return 'a_y'
        elif stream_list[1] == 'b' and stream_list[3] == 'x':
            return 'b_x'
        elif stream_list[1] == 'b' and stream_list[3] == 'y':
            return 'b_y'
        elif stream_list[1] == 'c' and stream_list[3] == 'x':
            return 'c_x'

    streams = get_unprocessed_data()
    for stream in streams:
        processed_branch = select_branch_pipeline(stream)
        run_this_first = EmptyOperator(task_id="run_this_first")
        join = EmptyOperator(task_id="join", trigger_rule="none_failed_min_one_success")
        t = EmptyOperator(task_id=processed_branch)
        empty_follow = EmptyOperator(task_id="follow_" + processed_branch)

        # Label is optional here, but it can help identify more complex branches
        run_this_first >> Label(processed_branch) >> t >> empty_follow >> join


# call taskflow
demo_taskflow = pipeline_etl()

I wanted to make dynamic pipeline based on the result from query.

0

There are 0 best solutions below