How can I access XCom values returned by a TaskFlow function from a traditional operator?


I am building a dictionary of configuration parameters to pass to another DAG via TriggerDagRunOperator. The dictionary is built and returned by a TaskFlow-decorated function. I need to access one key/value pair in that dictionary to fill in the trigger_dag_id field of the TriggerDagRunOperator. Alternatively, I could return multiple outputs from the function that builds the dictionary. I am having trouble finding a viable way to access the trigger_dag_id XCom value so it can be used in the operator's trigger_dag_id field.

# Assume the values in params are being passed in via conf
@task
def build_trigger_dag_config(**kwargs):
    # insert values into config
    config = {}
    config['stuff'] = kwargs['params']['stuff']
    config['trigger_dag_id'] = kwargs['params']['dag_run_id']

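    # Option #1: return the whole config dictionary
    return config
    # Option #2 (alternative to Option #1): also expose trigger_dag_id as its own key
    # return {'trigger_dag_config': config, 'trigger_dag_id': config['trigger_dag_id']}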

trigger_dag_id_test = build_trigger_dag_config()

trigger_report_task = TriggerDagRunOperator(
    task_id="trigger_report_task",
    trigger_dag_id=<How do I access trigger_dag_id from the previous function?>,
    conf=valid_input_params,
)

Update

After much trial and error, it turns out there is a bug in TriggerDagRunOperator. I will post my workaround as soon as I can.
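
For reference, one approach that is often suggested for this kind of problem is to pass a Jinja template string that pulls the XCom at run time. This is only a minimal sketch: it assumes an Airflow version in which trigger_dag_id is a templated field and renders correctly, and that the TaskFlow task keeps its default task_id (the function name). The helper xcom_value_template is a hypothetical name introduced here for illustration and is not part of Airflow.

```python
# Hypothetical helper (not part of Airflow): builds a Jinja expression that
# pulls a single key out of the dictionary returned by an upstream task.
def xcom_value_template(task_id: str, key: str) -> str:
    return "{{ ti.xcom_pull(task_ids='%s')['%s'] }}" % (task_id, key)


# A @task-decorated function uses its function name as task_id by default.
trigger_dag_id_template = xcom_value_template(
    "build_trigger_dag_config", "trigger_dag_id"
)
print(trigger_dag_id_template)

# Intended use inside the DAG (requires Airflow, so shown as comments only):
#
# config = build_trigger_dag_config()
# trigger_report_task = TriggerDagRunOperator(
#     task_id="trigger_report_task",
#     trigger_dag_id=trigger_dag_id_template,  # rendered when the task runs
#     conf=valid_input_params,
# )
# config >> trigger_report_task  # the XCom must exist before the trigger runs
```

The template is only rendered when the operator executes, so the explicit dependency in the last commented line matters: without it, the trigger task could run before the dictionary has been pushed to XCom.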
