I want to clear all task instances running at any given point in airflow. However my code fails when task is not running
def get_active_dag(hostname, username, password):
dag_ids = []
try:
payload = {"limit": 100, "offset": 0, "only_active": True, "paused": False}
response = requests.get("http://{hostname}/api/v1/dags".format(hostname=hostname), params=payload, auth=(username, password))
if response.status_code == 200:
out = json.loads(response.text)
dag_count = out["total_entries"]
for i in range(dag_count):
dag_ids.append(out["dags"][i]["dag_id"])
except requests.exceptions.ConnectionError as e:
logging.error("Failed to connect to HOST\n{}".format(hostname))
return dag_ids
def restart_tasks(hostname, username, password, dag_ids):
for dag_id in dag_ids:
data = {
"dry_run": False,
"only_running": True,
"only_failed": False,
}
response = requests.post(
"http://{hostname}/api/v1/dags/{dag_id}/clearTaskInstances"
.format(hostname=hostname, dag_id=dag_id), json=data, auth=(username, password))
data = response.json()
if response.status_code == 200:
logging.info("Cleared task for dag_id = {} & tasks={}".format(dag_id, data['task_instances']))
else:
logging.error("Failed \n{}".format(response.text))
exit(1)
Is there anyway to get only dag_ids for which task is running at given point?