I've recently begun using Apache Airflow version 2.7.3, along with Apache Airflow Providers Celery version 3.5.1 and Celery version 5.3.6.
While my DAG (Directed Acyclic Graph) runs to completion successfully, the tasks remain listed as active in the Celery Flower UI and stay resident in worker memory after they finish. These stuck entries count against the worker's concurrency limit, so once the limit is reached the worker stops picking up new tasks.
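For context, the number of slots each worker exposes is controlled by worker_concurrency in the [celery] section of airflow.cfg; a minimal sketch of the relevant setting (16 is the Airflow default, the exact value is illustrative):

[celery]
# every task that stays "active" after completion permanently occupies one of these slots
worker_concurrency = 16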
I've attached an image below illustrating the situation. Could you please assist me in resolving this issue? Thank you.
I attempted to address the issue by both downgrading and upgrading apache-airflow-providers-celery and Celery, but unfortunately neither change resolved the problem.
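For reference, the version changes I tried looked roughly like this (the versions shown are illustrative; I cycled through several):

pip install "apache-airflow-providers-celery==3.4.1" "celery==5.3.4"   # downgrade attempt
pip install "apache-airflow-providers-celery==3.5.1" "celery==5.3.6"   # back to the current versions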
Each time this happens, the only way I have found to clear the UI and free the memory is to restart the Celery worker.
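My current workaround (it clears Flower and frees the memory, but it is obviously not a fix) is to bounce the worker with the standard Airflow CLI commands:

airflow celery stop     # stop the running worker (uses the worker PID file)
airflow celery worker   # start a fresh worker with all slots free

Here is the DAG definition: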
from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.exceptions import AirflowSkipException
from airflow.operators.python import get_current_context

# The pipeline step functions used below (cleanUp_main_func, linkExtraction,
# deDuplication, ...) are imported from our own modules; omitted for brevity.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'email': '[email protected]',
    'retries': 1,
    'retry_delay': timedelta(seconds=60),
}
@dag(schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False, tags=['A5_Pipeline'], default_args=default_args)
def A5_Pipeline(**kwargs):
    @task
    def A5_Pipeline_Combined():
        # Read the run configuration from the current task context
        context = get_current_context()
        params = context['params']
        # Cleanup
        cleanup_df, cleanup_output = cleanUp_main_func(params)
        # Link Extraction
        link_extr_df, link_extraction_output = linkExtraction(params)
        # Deduplication
        dedupe_df, deduplication_output = deDuplication(link_extr_df, link_extraction_output, cleanup_output)
        # Content Extraction Decision Process
        cedp_df, content_extraction_decision_process_output = contentExtractionDecisionProcess(cleanup_df, dedupe_df, deduplication_output)
        # Content Extraction
        content_extraction_df, content_extraction_output = contentExtraction(cedp_df, content_extraction_decision_process_output)
        # Content Cleaning
        cleaning_df, content_cleaning_output = contentCleaning(content_extraction_df, content_extraction_output)
        # Relevance Entity
        rel_entity_df, relevance_entity_output = relevance_entity(cleaning_df, content_cleaning_output)
        # Relevance Event
        rel_event_df, relevance_event_output = relevance_event(rel_entity_df, relevance_entity_output)
        # Relevance Aggregate
        rel_aggregation_df, relevance_aggregate_output = relevance_aggregate(rel_event_df, relevance_event_output)
        # Summary Generation
        summary_df, summary_output = summaryProcess(rel_aggregation_df, relevance_aggregate_output)
        # Source Reputation
        source_df, sourcerep_output = sourceRepProcess(rel_aggregation_df, relevance_aggregate_output)
        # NER Tagging
        ner_df, ner_output = nerprocess(rel_aggregation_df, relevance_aggregate_output)
        # Result Merger
        result_merge_df, result_merger_output = resultmerge(source_df, summary_df, ner_df, sourcerep_output, summary_output, ner_output)
        # Grouping
        grouping_output = groupingcomponent(result_merge_df, result_merger_output)
        return grouping_output

    A5_Pipeline_Combined()

dag = A5_Pipeline()
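In case it is relevant: the task reads everything from context['params'], so each run is triggered with a run config along these lines (the payload keys here are placeholders, not my real config):

airflow dags trigger A5_Pipeline --conf '{"source": "...", "batch_id": "..."}'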