Airflow “Trigger Dags” Python Script

You have a dag that runs multiple times a day, but you need to do a manual backfill of the last 30 days.

It’s 2022 and this is still surprisingly painful with Airflow. The “new” REST API helps and means all the building blocks are there but, as I found out today, there can often still be some faffing about left for you to do.

So here is a little Python script that just loops over a range of days and kicks off a dag run for each day.

You would run it like this:

python airflow_trigger_dags.py --dag 'my_beautiful_dag' --start '2022-06-01 00:00:01' --end '2022-07-31 00:00:01'

For the dag you pass, it will loop over each day and kick off a dag run at the same time of day you define in the start timestamp. So you can just increment the 00:00:01 part to 00:00:02 if you need to rerun the same backfill again for some reason (like you messed up your “fix” the first time around 🙂 ). This assumes your dags are just using typical params like “ds” etc. and so only need the execution_date to run properly. If your dag is more complex and depends on specific start and end times, then this approach might not work or may need to be extended a little.
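To make the "one run per day, same timestamp" idea concrete, here is a minimal sketch of just the date logic, pulled out of the script into a helper. The name `daily_logical_dates` is mine and purely illustrative. Note that the end date is exclusive, because the loop runs over the number of whole days between start and end.

```python
from datetime import datetime, timedelta


def daily_logical_dates(start, end, fmt="%Y-%m-%d %H:%M:%S"):
    """Return one timestamp string per whole day in [start, end)."""
    start_dt = datetime.strptime(start, fmt)
    end_dt = datetime.strptime(end, fmt)
    days = (end_dt - start_dt).days
    # same time of day on every date, formatted the way the script sends it
    return [
        (start_dt + timedelta(days=i)).strftime("%Y-%m-%dT%H:%M:%SZ")
        for i in range(days)
    ]


first_pass = daily_logical_dates("2022-06-01 00:00:01", "2022-06-04 00:00:01")
# bump the seconds to get a fresh set of timestamps for a rerun
rerun = daily_logical_dates("2022-06-01 00:00:02", "2022-06-04 00:00:02")

print(first_pass)  # ['2022-06-01T00:00:01Z', '2022-06-02T00:00:01Z', '2022-06-03T00:00:01Z']
print(rerun)       # ['2022-06-01T00:00:02Z', '2022-06-02T00:00:02Z', '2022-06-03T00:00:02Z']
```

So a start of 2022-06-01 and an end of 2022-06-04 gives you three runs, and the rerun timestamps never collide with the first pass.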

#!/usr/bin/python
"""
A python script to run many "airflow dags trigger {dag} -e {execution_datetime}" commands via the airflow rest api.

Example usage:
python airflow_trigger_dags.py --dag 'dev_dag' --start '2021-10-01 00:00:01' --end '2021-10-31 00:00:01'
python airflow_trigger_dags.py -d 'dev_dag' -s '2022-05-20 00:00:01' -e '2022-05-24 00:00:01'

Example usage to just trigger dag for now:
python airflow_trigger_dags.py -d 'dev_dag' -s 'now'
"""
import argparse
import os
from datetime import datetime, timedelta
from pprint import pprint

import requests
from dotenv import load_dotenv
from requests.auth import HTTPBasicAuth

# pick up AIRFLOW_* settings from a local .env file, if there is one
load_dotenv()

parser = argparse.ArgumentParser()
parser.add_argument("-d", "--dag", help="name of dag to run.", type=str)
parser.add_argument("-s", "--start", help="start date.", type=str)
parser.add_argument("-e", "--end", help="end date.", type=str)
args = parser.parse_args()


def main():

    # get airflow host,user,pwd from ENV variables
    airflow_host = os.getenv("AIRFLOW_HOST")
    airflow_username = os.getenv("AIRFLOW_USERNAME")
    airflow_password = os.getenv("AIRFLOW_PASSWORD")

    # handle args
    dag = args.dag
    if args.start == 'now':
        # single run, dated an hour ago
        start = datetime.now() - timedelta(hours=1)
        end = start + timedelta(days=1)
    else:
        start = datetime.strptime(args.start, '%Y-%m-%d %H:%M:%S')
        end = datetime.strptime(args.end, '%Y-%m-%d %H:%M:%S')
    # number of whole days between start and end (end is exclusive)
    days = (end - start).days
    print(dag, start, end, days)

    # trigger dag for each day
    for i in range(days):
        execution_datetime = (start + timedelta(days=i)).strftime('%Y-%m-%dT%H:%M:%SZ')
        print(f'dag={dag}, e={execution_datetime}')
        url = f'http://{airflow_host}/api/v1/dags/{dag}/dagRuns'
        headers = {'Content-type': 'application/json', 'Accept': 'application/json'}
        data = {
            'dag_run_id': f'manual_api_{execution_datetime}',
            'logical_date': execution_datetime,
        }
        result = requests.post(
            url,
            json=data,
            headers=headers,
            auth=HTTPBasicAuth(airflow_username, airflow_password)
        )
        result_json = result.json()
        pprint(result_json)


if __name__ == "__main__":
    main()
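
The script reads AIRFLOW_HOST, AIRFLOW_USERNAME and AIRFLOW_PASSWORD from the environment (or from a .env file via load_dotenv), and if one of them is missing you only find out when the request fails. Here is a rough sketch of a check you could run before the loop. The helper name `load_airflow_settings` and the example values are made up for illustration and are not part of the script above.

```python
import os

REQUIRED_VARS = ("AIRFLOW_HOST", "AIRFLOW_USERNAME", "AIRFLOW_PASSWORD")


def load_airflow_settings(env=None):
    """Return the three settings the script needs, or fail loudly if any is missing."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: env[name] for name in REQUIRED_VARS}


# example with a plain dict standing in for the real environment (values are placeholders)
example_env = {
    "AIRFLOW_HOST": "localhost:8080",
    "AIRFLOW_USERNAME": "some_user",
    "AIRFLOW_PASSWORD": "some_password",
}
settings = load_airflow_settings(example_env)
print(settings["AIRFLOW_HOST"])  # localhost:8080
```

In the real script you would call it with no arguments after load_dotenv(), so it checks os.environ.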
