Have been doing an upgrade from Airflow 2.2.2 to 2.8.1. All of my DAGs parse after doing relatively minor modifications, except for one.
Using the TaskFlow API, I have a task with this structure:
@task(multiple_outputs=True)
def decorated_task(
inputs: list[dict],
logical_date: str,
project_id: str,
bucket_name: str,
) -> dict[str, str]:
# Implementation details
return {}
Being called like this in the DAG:
@dag(
dag_id=dag_id,
# ...
default_args={
"depends_on_past": False,
"retries": 3,
"retry_delay": timedelta(hours=1),
},
)
def decorated_dag():
# ...
task_date = "{{ (dag_run.logical_date - macros.timedelta(days=1)) | ds_nodash }}"
# ...
decorated_task_result = decorated_task(
inputs=inputs,
logical_date=task_date,
project_id=PROJECT_ID,
bucket_name=BUCKET_NAME,
)
I'm getting this error in the scheduler:
ValueError: non-default argument follows default argument
with trace:
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/dag.py", line 3944, in factory
f(**f_kwargs)
File "/usr/local/airflow/dags/dag_file.py", line 160, in decorated_dag
decorated_task_result = decorated_task(
^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/decorators/base.py", line 366, in __call__
op = self.operator_class(
^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 437, in apply_defaults
result = func(self, **kwargs, default_args=default_args)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/decorators/python.py", line 52, in __init__
super().__init__(
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 437, in apply_defaults
result = func(self, **kwargs, default_args=default_args)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/decorators/base.py", line 217, in __init__
signature = signature.replace(parameters=parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/inspect.py", line 3052, in replace
return type(self)(parameters,
^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/inspect.py", line 3008, in __init__
raise ValueError(msg)
ValueError: non-default argument follows default argument
It's clearly something internal to Airflow as suggested by the stack trace. I couldn't find anything similar on the Internet nor the GitHub Issues of Airflow.
As the stack trace says, the error comes from this code in the
DecoratedOperatorclass indecorators/base.py:The
KNOWN_CONTEXT_KEYSvariable is a long set that represents the Airflow context variables:The
logical_datecontext variable is used in our task function signature. From here we can deduce that putting that parameter name at the end of thedecorated_taskfunction parameter list fixes the issue.Update
I made a PR to Airflow to improve the error message on these cases so hopefully it's clearer starting a later minor version of 2.8, or 2.9.
https://github.com/apache/airflow/pull/38015