I'd like to set the inlets and outlets parameters automatically from inside the callable of a PythonOperator.
However, it doesn't seem to work, although I would expect it to. You can find the code snippet below:
from datahub_provider import entities
from airflow.operators.python import PythonOperator

def executable_func(**kwargs):
    task = kwargs.get("task")
    task.inlets = [entities.Dataset(source, data_path) ...]
    task.outlets = [entities.Dataset(source, data_path) ...]
    ...

dag_task = PythonOperator(
    task_id="task_id",
    python_callable=executable_func,
    provide_context=True,
    dag=dag,
)
I also tried the following way:
def executable_func(**kwargs):
    task = kwargs.get("ti")
    task.inlets = [entities.Dataset(source, data_path) ...]
    task.outlets = [entities.Dataset(source, data_path) ...]
    ...
...
Is there a way to do what I want with a standard Airflow installation?
By the way, we use Airflow 2.1.2.
I've been facing the same challenge, since we would like to generate datasets dynamically, and it looks like the current system doesn't support this out of the box (see the DAG code, which removes a dataset if it is not declared as an outlet or inlet in the task/DAG dependencies).
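For reference, "declared as an outlet" means something along the lines of the sketch below; the DAG id, task id and the potato-2 URI are hypothetical names used only for illustration.

import pendulum

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.bash import BashOperator

# A dataset only sticks around if it is statically declared somewhere,
# e.g. as a task outlet (hypothetical "potato-2" URI).
with DAG(
    dag_id="static_outlet_example",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule=None,
) as dag:
    BashOperator(
        task_id="produce_potato_2",
        bash_command="echo produced",
        outlets=[Dataset("potato-2")],
    )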
However, it looks like the following approach works for us:
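The idea is to generate the dataset events ourselves at runtime, from inside a task. The sketch below is only an approximation of that: DatasetManager is an internal Airflow API whose signature may differ between versions, and the potato URIs are made up for illustration.

from airflow.datasets import Dataset
from airflow.datasets.manager import DatasetManager
from airflow.utils.session import create_session

def emit_dynamic_dataset_events(**kwargs):
    ti = kwargs["ti"]
    # URIs only known at runtime; hypothetical names for illustration.
    dynamic_uris = ["potato-1", "potato-2", "potato-3", "potato-4"]
    with create_session() as session:
        manager = DatasetManager()
        for uri in dynamic_uris:
            # Register a dataset event for this task instance. The dataset
            # has to exist in the metadata DB already (i.e. be referenced
            # somewhere as an inlet/outlet or a DAG schedule); otherwise the
            # manager just logs a warning and skips it.
            manager.register_dataset_change(
                task_instance=ti,
                dataset=Dataset(uri),
                session=session,
            )

The function can then be wired into a regular PythonOperator task inside whatever DAG produces the data.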
Then we tested this approach with a dummy DAG waiting on the following datasets:
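A sketch of such a dummy consumer DAG, scheduled on the explicitly referenced datasets (assumed here to be potato-2 and potato-3; the DAG and task names are hypothetical):

import pendulum

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.empty import EmptyOperator

# Dummy consumer scheduled on dataset URIs; hypothetical names, assuming
# potato-2 and potato-3 are the ones explicitly referenced in DAG code.
with DAG(
    dag_id="dummy_dataset_consumer",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule=[Dataset("potato-2"), Dataset("potato-3")],
) as dag:
    EmptyOperator(task_id="noop")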
We could see that potato-1 & potato-4 always disappear from the Datasets URL section, since they are not explicitly referenced, while potato-2 & potato-3 keep accumulating events.
Note: Airflow has some open issues and PRs trying to address external datasets, so I hope this post helps you as a workaround while the community keeps working in this direction.
Note 2: We are using Airflow 2.6.1, and while I'm not proud of our solution, it looks like it works.