I want to create a daily DAG that reads a file based on the date the DAG execution started. The file has a list of folders to be processed for that day. The number of folders can change every day. If there are a lot of folders, the DAG run can take multiple days to complete.
For each folder in the file for the day the DAG execution started, we have to perform a few different tasks (PythonOperator, BashOperator, etc.). Even if the current date changes and the DAG run takes more than 24 hours, it should continue processing the list of folders obtained above.
The issue is that global Python variables in the DAG file get re-evaluated every time the DAG is parsed. So if I use a variable like this:
DATE = datetime.now(tz).date()
then the value of the DATE variable changes when the date changes.
So I tried saving the date in a file named after the DAG's run_id: the run_id is unique for each DAG run and stays constant while the run is executing, so the file name is always unique.
However, the next issue is getting the value and passing it across the different tasks. For this I was able to create a user_defined_macro that reads the date from the file and passes it to all the tasks.
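Roughly, what I ended up with looks like this (heavily simplified; the helper name, state directory, schedule and dag_id are just placeholders):

```python
import os
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

STATE_DIR = "/tmp/dag_state"  # placeholder location for the per-run date files


def get_processing_date(run_id):
    """Return the date recorded for this run_id, writing the file on first use."""
    os.makedirs(STATE_DIR, exist_ok=True)
    path = os.path.join(STATE_DIR, f"{run_id}.date")
    if not os.path.exists(path):
        with open(path, "w") as f:
            f.write(datetime.now().date().isoformat())
    with open(path) as f:
        return f.read().strip()


with DAG(
    dag_id="folder_processing_with_runid_file",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    user_defined_macros={"processing_date": get_processing_date},
) as dag:
    # run_id is available in templates, so every task resolves the same date
    show_date = BashOperator(
        task_id="show_date",
        bash_command="echo processing folders for {{ processing_date(run_id) }}",
    )
```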
After this, the next issue is how to build the list of tasks to be performed for each date. I tried using a for loop, but obviously it doesn't work, since the Python code runs when the DAG is parsed and not when the DAG is executing. This is the last issue that I cannot figure out how to solve: basically, loop through every single folder for the particular date.
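To make the problem concrete, the parse-time loop looks roughly like this (simplified and ignoring the run_id file; paths and names are illustrative):

```python
import os
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def process_folder(folder):
    print(f"processing {folder}")


def read_folders(date_str):
    # one folder name per line in a per-date file (path is illustrative)
    path = f"/data/folders_{date_str}.txt"
    if not os.path.exists(path):
        return []
    with open(path) as f:
        return [line.strip() for line in f if line.strip()]


with DAG(
    dag_id="folder_loop_parse_time",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
) as dag:
    # This loop runs at parse time, so the task list is fixed by whatever the
    # file contains whenever the scheduler parses the DAG file, not by the
    # folders for the date the run actually started.
    for folder in read_folders(datetime.now().date().isoformat()):
        PythonOperator(
            task_id=f"process_{folder}",
            python_callable=process_folder,
            op_args=[folder],
        )
```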
I'm pretty sure that you've reinvented the default behaviour of Apache Airflow and Data Intervals. I guess everything related to deterministic dates can be achieved by accessing `logical_date`, `data_interval_start` or `data_interval_end`. You could choose whichever of these fits your case. Please note that the options are not limited to those listed above; the same values can also be accessed in Templates.
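For example, a minimal sketch (assuming a recent Airflow 2.x; the dag_id, file path and task names are only illustrative) of reading the run's own dates both inside a task and inside a templated field:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def read_folder_list(**context):
    # data_interval_start is fixed for the lifetime of this DAG run, even if
    # wall-clock time rolls past midnight while the run is still executing.
    run_date = context["data_interval_start"].date()
    path = f"/data/folders_{run_date.isoformat()}.txt"  # illustrative path
    with open(path) as f:
        return [line.strip() for line in f if line.strip()]


with DAG(
    dag_id="folder_processing_data_interval",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
) as dag:
    list_folders = PythonOperator(
        task_id="list_folders",
        python_callable=read_folder_list,
    )
    # The same values are available in templated fields, no globals needed.
    echo_interval = BashOperator(
        task_id="echo_interval",
        bash_command="echo run covers {{ data_interval_start }} to {{ data_interval_end }}",
    )
    list_folders >> echo_interval
```

Because `data_interval_start` and `logical_date` belong to the DAG run itself, they stay the same even if the run takes more than 24 hours, which removes the need for the global DATE variable and the per-run_id file.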