I have an Airflow DAG whose tasks are constantly toggling between the success and removed states.
I am not sure why the task state keeps flipping from success to removed.
My DAG code is:
from airflow import DAG
import datetime
from datetime import timedelta

from tasks.user_space_tables_refresh import UserSpaceTablesRefresh

default_args = {
    'owner': 'Data Engineering',
    'depends_on_past': False,
    'start_date': datetime.datetime(2022, 2, 1),
}

user_space_dag = DAG(
    'user_space_snowflake_tables_refresh', default_args=default_args,
    schedule_interval="30 18 * * *", catchup=False)

with user_space_dag:
    users_task = UserSpaceTablesRefresh(
        task_id='ingest_USERS_data',
        source_table='USERS')
    saved_software_task = UserSpaceTablesRefresh(
        task_id='ingest_SAVED_SOFTWARE_data',
        source_table='SAVED_SOFTWARE')
The tasks.user_space_tables_refresh module:
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class UserSpaceTablesRefresh(BaseOperator):

    @apply_defaults
    def __init__(self, source_table, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.table = source_table

    def execute(self, context):
        try:
            sf_table = self.table
            ...
        except Exception as ex:
            print("Exception", ex)
A task is marked as removed when it disappears from the DAG after the run has started. This can happen when you create tasks dynamically:
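For illustration only (the "user_space_tables" Variable below is a made-up example, not something taken from your code), dynamic task creation typically looks like this; if the list changes between scheduler parses, tasks whose ids drop out of it after a run has started show up as removed for that run:

from airflow.models import Variable

with user_space_dag:
    # Hypothetical sketch: the task list comes from a Variable that can change
    # between parses, so the set of task ids is not stable across runs.
    for table in Variable.get("user_space_tables", deserialize_json=True):
        UserSpaceTablesRefresh(
            task_id=f'ingest_{table}_data',
            source_table=table)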
If the run history is not important for you, try removing the DAG completely from the Metastore and letting the Airflow dag file processor re-create it; it may be a problem with a different version of the serialized DAGs, or one that appeared after an Airflow upgrade.
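On Airflow 2.x you can do that from the CLI (substitute your own dag_id and confirm when prompted):

airflow dags delete user_space_snowflake_tables_refresh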
This command will delete the DAG from the Metastore, together with all of its dag run and task instance records.