Apache Airflow DAG executed successfully, but its tasks still appear as active in the Flower UI and remain in memory


I've recently begun using Apache Airflow version 2.7.3, along with Apache Airflow Providers Celery version 3.5.1 and Celery version 5.3.6.

My Directed Acyclic Graph (DAG) executes successfully, but its tasks remain active in the Celery Flower UI and in memory even after completion. Because those slots are never released, the worker eventually hits its concurrency limit and stops picking up new tasks. I've attached a screenshot below illustrating the situation. Could you please assist me in resolving this issue? Thank you.

I attempted to address the issue by both downgrading and upgrading the versions of Apache Airflow Providers Celery and Celery, but unfortunately, neither solution resolved the problem.

Each time this happens, the only way I've found to clear the UI and free the memory is to restart the Celery worker.
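For reference, this is roughly how I inspect and restart the worker. The `--app` module path is what I believe the Celery provider exposes in this version; adjust it to match your installation:

```shell
# Ask the worker which tasks it still considers active
# (these show up even though the DAG run has finished).
celery --app airflow.providers.celery.executors.celery_executor.app inspect active

# Current workaround: stop and restart the worker to release the stuck slots.
airflow celery stop
airflow celery worker
```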

from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.exceptions import AirflowSkipException

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'email': '[email protected]',
    'retries': 1,
    'retry_delay': timedelta(seconds=60)
}

@dag(schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False, tags=['A5_Pipeline'], default_args=default_args)
def A5_Pipeline(**kwargs):
    @task
    def A5_Pipeline_Combined():
        # Get the run-time params from the current task context
        context = get_current_context()
        context = context['params']

        # Cleanup
        cleanup_df, cleanup_output = cleanUp_main_func(context)

        # Link Extraction
        link_extr_df, link_extraction_output = linkExtraction(context)

        # Deduplication
        dedupe_df, deduplication_output = deDuplication(link_extr_df, link_extraction_output, cleanup_output)

        # Content Extraction Decision Process
        cedp_df, content_extraction_decision_process_output = contentExtractionDecisionProcess(cleanup_df, dedupe_df, deduplication_output)

        # Content Extraction
        content_extraction_df, content_extraction_output = contentExtraction(cedp_df, content_extraction_decision_process_output)

        # Content Cleaning
        cleaning_df, content_cleaning_output = contentCleaning(content_extraction_df, content_extraction_output)

        # Relevance Entity
        rel_entity_df, relevance_entity_output = relevance_entity(cleaning_df, content_cleaning_output)

        # Relevance Event
        rel_event_df, relevance_event_output = relevance_event(rel_entity_df, relevance_entity_output)

        # Relevance Aggregate
        rel_aggregation_df, relevance_aggregate_output = relevance_aggregate(rel_event_df, relevance_event_output)


        # Summary Generation
        summary_df, summary_output = summaryProcess(rel_aggregation_df, relevance_aggregate_output)

        # Source Reputation
        source_df, sourcerep_output = sourceRepProcess(rel_aggregation_df, relevance_aggregate_output)

        # NER Tagging
        ner_df, ner_output = nerprocess(rel_aggregation_df, relevance_aggregate_output)

        # Result Merger
        result_merge_df, result_merger_output = resultmerge(source_df, summary_df, ner_df, sourcerep_output, summary_output, ner_output)

        # Grouping
        grouping_output = groupingcomponent(result_merge_df, result_merger_output)

        return grouping_output

    A5_Pipeline_Combined()

dag = A5_Pipeline()