Can I pull an XCom value into ssh_conn_id in SSHOperator in Airflow?


I'm trying to dynamically assign the ssh_conn_id using an XCom value I pushed in a previous task. Here is my code:

def get_config_params(**kwargs):
    ti = kwargs['ti']
    ti.xcom_push(key='myServer1', value='SSH_CONN_1')
    ti.xcom_push(key='myValue', value='Hello World')

with DAG(
    dag_id=DAG_ID, 
    catchup=False,
    schedule_interval=None,
    start_date=DAG_START_DATE,
    default_args={'owner':DAG_OWNER}
) as dag:

    get_param_task  = PythonOperator(
        task_id='get_param_task',
        python_callable=get_config_params,
        provide_context=True
    )
    cli_command = SSHOperator(
        task_id="cli_command",
        ssh_conn_id='{{ ti.xcom_pull(key="myServer1") }}',
        command='echo {{ ti.xcom_pull(key="myValue") }}'
    )  

But I'm getting the following error:

SSH operator error: The conn_id `{{ ti.xcom_pull(key="myServer1") }}` isn't defined 

The XCom pull works fine for the command in the task, but it is not working for the ssh_conn_id. Is there any workaround for this?

I also tried pulling with the task ID, but I'm still facing the same issue.

    cli_command2 = SSHOperator(
        task_id="cli_command2",
        ssh_conn_id='{{ ti.xcom_pull(task_ids="get_param_task",key="myServer1") }}',
        command='echo "Hello world"'
    )

Still getting the error:

SSH operator error: The conn_id `{{ ti.xcom_pull(task_ids="get_param_task",key="myServer1") }}` isn't defined

Madhawa Manchanayake On

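Airflow only renders Jinja templates in the operator attributes listed in template_fields. For SSHOperator that list includes command but not ssh_conn_id, which is why the template string was passed through literally as the connection ID. I was able to resolve this by making ssh_conn_id a template field using the following code: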

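# Subclass the already-imported SSHOperator and reuse its name, so the DAG
# code in the question works unchanged. Define this before the DAG block.
class SSHOperator(SSHOperator):
    # Prepend ssh_conn_id to the fields that Airflow renders with Jinja.
    template_fields = (
        "ssh_conn_id",
    ) + tuple(SSHOperator.template_fields)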
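
To illustrate why extending template_fields helps, here is a minimal toy model that runs without Airflow. Everything in it is hypothetical: ToySSHOperator, ToyTemplatedSSHOperator, and the render helper are stand-ins written for this sketch, and the regex substitution is only a crude placeholder for Jinja. It mimics just one idea, that only attributes named in template_fields are rendered before the task runs.

```python
import re


def render(text, context):
    # Crude placeholder for Jinja: replaces {{ key }} with context[key].
    return re.sub(r"\{\{\s*(\w+)\s*\}\}", lambda m: str(context[m.group(1)]), text)


# Toy stand-in, NOT the Airflow class: it only mimics how template_fields
# decides which attributes get rendered.
class ToySSHOperator:
    template_fields = ("command",)

    def __init__(self, ssh_conn_id, command):
        self.ssh_conn_id = ssh_conn_id
        self.command = command

    def render_template_fields(self, context):
        # Only attributes named in template_fields are rendered.
        for name in self.template_fields:
            setattr(self, name, render(getattr(self, name), context))


class ToyTemplatedSSHOperator(ToySSHOperator):
    # Same trick as in the answer: prepend ssh_conn_id to the inherited tuple.
    template_fields = ("ssh_conn_id",) + ToySSHOperator.template_fields


# Hypothetical context standing in for the values pushed to XCom.
context = {"myServer1": "SSH_CONN_1", "myValue": "Hello World"}

plain = ToySSHOperator("{{ myServer1 }}", "echo {{ myValue }}")
plain.render_template_fields(context)

fixed = ToyTemplatedSSHOperator("{{ myServer1 }}", "echo {{ myValue }}")
fixed.render_template_fields(context)

print(plain.ssh_conn_id)  # left as the raw template string
print(fixed.ssh_conn_id)  # rendered, because ssh_conn_id is now listed
```

In the toy base class the connection ID stays a literal template string, which matches the "isn't defined" error in the question, while the subclass renders it along with the command.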