Airflow 2.3 - Setting a task instance state to failed without raising

Using the TaskFlow API, let's say I have:

from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.utils.state import State

@dag(
    schedule_interval=None,
    start_date=datetime(2021, 1, 1)
)
def myDag():
    @task()
    def getData():
        try:
            result = something_that_might_fail()
            return result
        except HorribleException as error:
            context = get_current_context()
            context['task_instance'] = State.FAILED # Doesn't work
            return {"error": str(error)}

    @task()
    def transform(data_dict: dict):
        for data in data_dict:
            print(data)
        ...


    transform(getData())

run = myDag()

For monitoring purposes I want to mark the getData task failed but I still want to pass a result to transform. I tried to use the context to get access to the task instance state but it doesn't seem to work. It seems there should be a better approach, but I don't see it.

Best answer, by ozs:

You can change the state of the task, but not from inside the task itself; it has to be done from another task. I guess that once "getData" returns without raising, its state ends up as "success" anyway.

You can pass information to the "transform" task telling it to mark "getData" as failed, and do it as follows:

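from airflow.decorators import task
from airflow.models import DagRun
from airflow.operators.python import get_current_context
from airflow.utils.state import TaskInstanceState

@task()
def transform(data_dict: dict):
    context = get_current_context()
    dag_run: DagRun = context['dag_run']
    # task_id is case-sensitive and must match the upstream task ("getData" in the question)
    dag_run.dag.set_task_instance_state(
        task_id="getData", state=TaskInstanceState.FAILED, run_id=dag_run.run_id,
    )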
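
As written, the snippet above marks "getData" as failed on every run. A minimal sketch of making it conditional, assuming "getData" signals a problem by returning a dict with an "error" key as in the question: the helper names `extract_error` and `mark_upstream_failed_if_error` and the constant `ERROR_KEY` are made up for illustration and are not part of Airflow. The Airflow imports are done lazily inside the function, so the check itself can be exercised without an Airflow installation.

```python
from typing import Any, Optional

# Assumption: getData reports a failure as {"error": "<message>"}, as in the question.
ERROR_KEY = "error"


def extract_error(payload: Any) -> Optional[str]:
    """Return the error message carried by the payload, or None if there is none."""
    if isinstance(payload, dict) and ERROR_KEY in payload:
        return str(payload[ERROR_KEY])
    return None


def mark_upstream_failed_if_error(payload: Any, upstream_task_id: str = "getData") -> bool:
    """Mark the upstream task as failed only when the payload carries an error.

    Returns True if the state change was requested, False otherwise.
    Must be called from inside a running Airflow task when an error is present.
    """
    if extract_error(payload) is None:
        return False

    # Imported lazily so the pure-Python check above works without Airflow.
    from airflow.operators.python import get_current_context
    from airflow.utils.state import TaskInstanceState

    context = get_current_context()
    dag_run = context["dag_run"]
    dag_run.dag.set_task_instance_state(
        task_id=upstream_task_id,
        state=TaskInstanceState.FAILED,
        run_id=dag_run.run_id,
    )
    return True
```

Inside "transform", calling `mark_upstream_failed_if_error(data_dict)` before processing the data would then flag "getData" only for runs where it actually caught the exception.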