airflow prev_execution_date but with PythonOperator

I'm looking to reproduce something similar to the command below, but with a PythonOperator instead. What I struggle with is accessing execution_date and prev_execution_date from a PythonOperator. How should they be passed in op_kwargs?

import datetime as dt

from airflow import DAG
from airflow.operators.bash import BashOperator

dag = DAG(
    dag_id="06_templated_query",
    schedule_interval="@daily",
    start_date=dt.datetime(year=2019, month=1, day=1),
    end_date=dt.datetime(year=2019, month=1, day=5),
)

fetch_events = BashOperator(
    task_id="fetch_events",
    bash_command=(
        "mkdir -p /data/events && "
        "curl -o /data/events.json "
        "http://events_api:5000/events?"
        "start_date={{execution_date.strftime('%Y-%m-%d')}}&"
        "end_date={{next_execution_date.strftime('%Y-%m-%d')}}"
    ),
    dag=dag,
)

Best answer, by Hussein Awala:

You can access the Jinja template context from inside the Python function, but it's better to pass the values in as arguments (via op_kwargs), because that keeps the function easy to unit test.
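For comparison, the other option, reading the dates from the task context inside the callable, could look roughly like the sketch below. It assumes Airflow 2.x, where PythonOperator passes the context as keyword arguments to a callable that accepts **context (these date keys are deprecated since Airflow 2.2). The function name fetch_dates_from_context is hypothetical, and it only returns the formatted dates instead of calling the events API. The point is the test at the bottom: without op_kwargs, you have to build a fake context by hand.

```python
import datetime as dt

FMT = "%Y-%m-%d"


def fetch_dates_from_context(**context):
    """Read the run's dates from the task context (Airflow 2.x passes it as kwargs).

    Hypothetical callable: it only returns the formatted dates, where a real
    task would call the events API with them.
    """
    prev_date = context["prev_execution_date"].strftime(FMT)
    start_date = context["execution_date"].strftime(FMT)
    end_date = context["next_execution_date"].strftime(FMT)
    return prev_date, start_date, end_date


# A unit test for this style has to fake the context dict by hand.
fake_context = {
    "prev_execution_date": dt.datetime(2019, 1, 1),
    "execution_date": dt.datetime(2019, 1, 2),
    "next_execution_date": dt.datetime(2019, 1, 3),
}
print(fetch_dates_from_context(**fake_context))
# ('2019-01-01', '2019-01-02', '2019-01-03')
```

With op_kwargs, the same test is just a call with two plain strings.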

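from pathlib import Path

import requests
from airflow.operators.python import PythonOperator


def _fetch_events(start_date, end_date):
    events_file_path = Path("/data/events.json")
    # Create the parent directory (/data), not a directory named events.json.
    events_file_path.parent.mkdir(parents=True, exist_ok=True)
    response = requests.get(
        url="http://events_api:5000/events",
        params={
            "start_date": start_date,
            "end_date": end_date,
        },
    )
    response.raise_for_status()
    # Write the raw response body: file writes need a str, not the dict/list from response.json().
    events_file_path.write_text(response.text)


fetch_events = PythonOperator(
    task_id="fetch_events",
    python_callable=_fetch_events,
    dag=dag,  # the DAG object defined in the question
    # op_kwargs is a templated field, so Airflow renders these Jinja strings
    # before calling _fetch_events with the resulting date strings.
    op_kwargs={
        "start_date": "{{ execution_date.strftime('%Y-%m-%d') }}",
        "end_date": "{{ next_execution_date.strftime('%Y-%m-%d') }}",
    },
)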
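The question title asks about prev_execution_date; it is passed the same way, as one more templated entry in op_kwargs (the callable then needs a matching parameter). The sketch below does not run Airflow. It only mimics, with plain datetime arithmetic, what the three templates would render to for one run of a fixed @daily schedule. That is an assumption: cron or custom schedules have different intervals, and Airflow may not provide a previous date for the very first run. The key prev_date and the helper rendered_daily_kwargs are hypothetical names.

```python
import datetime as dt

# Templated op_kwargs as they would appear in the DAG file; Airflow renders them.
OP_KWARGS = {
    "prev_date": "{{ prev_execution_date.strftime('%Y-%m-%d') }}",
    "start_date": "{{ execution_date.strftime('%Y-%m-%d') }}",
    "end_date": "{{ next_execution_date.strftime('%Y-%m-%d') }}",
}


def rendered_daily_kwargs(execution_date: dt.datetime) -> dict:
    """Mimic the rendered OP_KWARGS, assuming a fixed one-day schedule interval."""
    one_day = dt.timedelta(days=1)
    fmt = "%Y-%m-%d"
    return {
        "prev_date": (execution_date - one_day).strftime(fmt),
        "start_date": execution_date.strftime(fmt),
        "end_date": (execution_date + one_day).strftime(fmt),
    }


print(rendered_daily_kwargs(dt.datetime(2019, 1, 3)))
# {'prev_date': '2019-01-02', 'start_date': '2019-01-03', 'end_date': '2019-01-04'}
```

On Airflow 2.2 and later, execution_date and next_execution_date are deprecated; for a regular schedule, "{{ data_interval_start | ds }}" and "{{ data_interval_end | ds }}" render the same start and end dates.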