Airflow scheduled dag_run time changes if DAG is run manually

I'm using Airflow 2.2.5 built from the official Docker image with a Postgres database. As an example, say I have a DAG set to run daily at 2am. As long as I only turn it on and let it run, the process works fine and the DAG runs daily at 2am as intended. But if for some reason I need to do a manual run, say at 11am, then on the next day (and all the following ones) the DAG runs automatically at 11am instead of the programmed 2am.

I've tried using cron expressions instead of datetime.timedelta for the schedule_interval. Even though the UI shows that the DAG will run at 2am (despite the 11am manual run), the DAG actually runs only at 11am, contrary to what the UI indicates.

Has anyone else noticed this behavior? And is there anything I can do to prevent the manual runs from interfering with the scheduled ones?

Thanks

There is 1 best solution below

Hussein Awala

Airflow is a data flow tool, and each run provides context variables that you can use in your processing:

  • data_interval_start: the start of the time range this run is meant to process; for a scheduled run it equals the previous run's data_interval_end
  • data_interval_end: the end of the time range this run is meant to process
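To make the role of these two variables concrete, here is a minimal sketch in plain Python (no Airflow import) of a task body that filters records by the run's data interval. The function name rows_in_interval, the sample rows, and the half-open [start, end) convention are assumptions for illustration; in a real task the two bounds would come from the run's context.

```python
from datetime import datetime, timezone


def rows_in_interval(rows, data_interval_start, data_interval_end):
    """Keep rows whose timestamp lies in [data_interval_start, data_interval_end).

    The half-open range is an assumption of this sketch: it keeps a row that
    sits exactly on a boundary from being picked up by two consecutive runs.
    """
    return [r for r in rows if data_interval_start <= r["ts"] < data_interval_end]


# Hypothetical records; in practice these would come from a table or a file.
rows = [
    {"id": 1, "ts": datetime(2022, 5, 1, 1, 59, tzinfo=timezone.utc)},
    {"id": 2, "ts": datetime(2022, 5, 1, 2, 0, tzinfo=timezone.utc)},
    {"id": 3, "ts": datetime(2022, 5, 2, 1, 59, tzinfo=timezone.utc)},
    {"id": 4, "ts": datetime(2022, 5, 2, 2, 0, tzinfo=timezone.utc)},
]

# Interval of one daily 2am run: from 2am on May 1 up to 2am on May 2.
start = datetime(2022, 5, 1, 2, 0, tzinfo=timezone.utc)
end = datetime(2022, 5, 2, 2, 0, tzinfo=timezone.utc)

selected = rows_in_interval(rows, start, end)
print([r["id"] for r in selected])
```

Because each scheduled run starts where the previous one ended, filtering this way lets consecutive runs cover the data without gaps or overlaps.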

So if you use these variables to filter the data you process, you cannot change the schedule_interval just to fit in a manual run. If you use 0 11,12 * * *, for example, you will have two runs per day:

  • the first one with data_interval_start=12h00 of the previous day and data_interval_end=11h00 of the current day (23 hours)
  • and the second after an hour with data_interval_start=11h00 of the current day and data_interval_end=12h00 of the current day (1 hour)
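The two uneven intervals above can be checked with a small sketch in plain Python. It does not parse cron or call Airflow; it hard-codes the 11:00 and 12:00 ticks that 0 11,12 * * * produces and pairs each tick with the next one. The helper names ticks and data_intervals are hypothetical.

```python
from datetime import datetime, timedelta


def ticks(first_day, days, hours=(11, 12)):
    """Return the times at which `0 11,12 * * *` fires over consecutive days."""
    out = []
    for d in range(days):
        day = first_day + timedelta(days=d)
        for h in hours:
            out.append(day.replace(hour=h, minute=0, second=0, microsecond=0))
    return out


def data_intervals(tick_list):
    """Pair consecutive ticks as (data_interval_start, data_interval_end)."""
    return list(zip(tick_list, tick_list[1:]))


intervals = data_intervals(ticks(datetime(2022, 5, 1), days=2))
for start, end in intervals:
    # Print each interval together with its length.
    print(start, "->", end, "length:", end - start)
```

The lengths alternate between 1 hour and 23 hours, so a task that filters on the data interval would process very different amounts of data in the two daily runs.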

But if you have a DAG that uses the full data at each run, or a DAG that does some tasks without using any data, you can trigger it manually in several ways:

  • UI: click the Trigger DAG button on the DAG page
  • CLI: use the command airflow dags trigger <dag_id> (doc)
  • API: POST api/v1/dags/{dag_id}/dagRuns (doc)
  • Airflow plugins: create a Flask-AppBuilder form that creates the runs with a Python method (complicated, and only worth it for some use cases)
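As an illustration of the API option, the sketch below builds the POST request with the Python standard library. The host, DAG id, and credentials are hypothetical, and it assumes the webserver is configured with the basic-auth API backend; other auth setups would need a different Authorization header.

```python
import base64
import json
import urllib.request


def build_trigger_request(base_url, dag_id, username, password, conf=None):
    """Build (but do not send) a POST to the dagRuns endpoint of the REST API."""
    url = f"{base_url.rstrip('/')}/api/v1/dags/{dag_id}/dagRuns"
    # The request body is JSON; `conf` is passed to the triggered run.
    body = json.dumps({"conf": conf or {}}).encode("utf-8")
    # Assumption: basic authentication is enabled for the API.
    token = base64.b64encode(f"{username}:{password}".encode()).decode()
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
    )


# Hypothetical host, DAG id, and credentials.
req = build_trigger_request("http://localhost:8080", "my_daily_dag", "airflow", "airflow")
print(req.get_method(), req.full_url)

# To actually trigger the run against a live webserver:
# with urllib.request.urlopen(req) as resp:
#     print(resp.status, resp.read().decode())
```

Building the request separately from sending it makes it easy to inspect the URL and payload before pointing the call at a real Airflow instance.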