This repo contains an Astronomer project with multiple examples showing how to dynamically generate DAGs in Airflow. A guide discussing these concepts in depth will be published shortly.

The DAGs and supporting Python scripts demonstrate several generation methods and are described below, organized by folder.
`dags/`:

- `dynamic-dags-connections.py`: Dynamically generates DAGs based on database connections and queries. It queries a database for active configurations, processes date parameters, and creates DAGs that transfer data to an object store such as MinIO or S3.
- `dynamic-dags-loop.py`: Generates DAGs based on a simple `range()` loop (a minimal sketch of this pattern appears after this list).
- `dynamic-dags-variable.py`: Generates DAGs based on Airflow variables.
- `dag_file_1.py` and `dag_file_2.py`: Actual DAG files that were dynamically generated using the scripts in the `include/` directory, described below.
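To illustrate the loop-based approach, here is a minimal sketch of how a script like `dynamic-dags-loop.py` typically registers one DAG per iteration by assigning each DAG object to a unique global name. The DAG IDs, schedule, and task below are illustrative assumptions (targeting a recent Airflow 2.x release), not the exact contents of the file in this repo.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator


def create_dag(dag_id: str, number: int) -> DAG:
    """Build a DAG whose single task echoes the loop number."""
    with DAG(
        dag_id=dag_id,
        start_date=datetime(2023, 1, 1),
        schedule="@daily",
        catchup=False,
    ) as dag:
        BashOperator(
            task_id="say_number",
            bash_command=f"echo 'This is DAG number {number}'",
        )
    return dag


# One DAG per loop iteration; assigning to globals() makes each DAG
# discoverable when the Airflow scheduler parses this file.
for n in range(1, 4):
    dag_id = f"loop_dag_{n}"
    globals()[dag_id] = create_dag(dag_id, n)
```

The same pattern works for `dynamic-dags-variable.py`: replace the `range()` loop with a loop over values read from an Airflow variable.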
`include/`:

- `dag-config/`: Contains two JSON configuration files with parameters used to dynamically generate Python files for `dag_file_1.py` and `dag_file_2.py`.
- `dag-template.py`: Contains the starting DAG template from which other DAG files are dynamically generated.
- `generate-dag-files.py`: Contains a script that dynamically generates a DAG file for each config file in `dag-config/` by making a copy of `dag-template.py` and replacing key parameters with values from the config file.
Project root:

- `Dockerfile`: Defines the custom Airflow image with additional dependencies installed.
- `requirements.txt`: Lists Python dependencies, including the `airflow-clickhouse-plugin` and MinIO-related dependencies.
- `.astro/config.yaml`: Contains the Astronomer project configuration.
The easiest way to run these example DAGs is to use the Astronomer CLI to get an Airflow instance up and running locally:
- Install the Astronomer CLI.
- Clone this repo locally and navigate to it in your terminal.
- Initialize an Astronomer project by running `astro dev init`.
- Start Airflow locally by running `astro dev start`.
- Navigate to `localhost:8080` in your browser; you should see the example DAGs there.
To dynamically generate DAGs using the `generate-dag-files.py` script:

- Place your configuration files in the `include/dag-config/` directory.
- Run the script: `python include/generate-dag-files.py`
- The generated DAG files will appear in the `dags/` directory.
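For reference, the template-and-config pattern behind `generate-dag-files.py` can be sketched as follows. The placeholder strings (`dag_id_to_replace`, `schedule_to_replace`) and config keys are illustrative assumptions; the actual template and config files in `include/` may use different names.

```python
import json
import shutil
from pathlib import Path

CONFIG_DIR = Path("include/dag-config")
TEMPLATE_FILE = Path("include/dag-template.py")
DAGS_DIR = Path("dags")

# For each JSON config (e.g. {"dag_id": "dag_file_1", "schedule": "@daily"}),
# copy the template into dags/ and substitute placeholder strings with the
# values from the config file.
for config_path in CONFIG_DIR.glob("*.json"):
    config = json.loads(config_path.read_text())

    new_dag_path = DAGS_DIR / f"{config['dag_id']}.py"
    shutil.copyfile(TEMPLATE_FILE, new_dag_path)

    text = new_dag_path.read_text()
    text = text.replace("dag_id_to_replace", config["dag_id"])
    text = text.replace("schedule_to_replace", config["schedule"])
    new_dag_path.write_text(text)
```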
This project includes support for MinIO as an object store for data pipelines. The following dependencies and configurations are required:
- The `airflow.providers.amazon` package is used for MinIO integration.
- Ensure the `boto3` library is installed for interacting with MinIO.
- Configure the following variables:
  - `MINIO_BUCKET`: The name of the MinIO bucket where data will be stored.
  - `ENV`: The environment the pipelines run in.
- Update the Airflow connection (`aws_default`) to include MinIO credentials (e.g., access key, secret key, endpoint URL).
- Ensure the `dynamic-dags-connections.py` script is configured to use the `MINIO_BUCKET` and `ENV` variables.
This setup allows seamless integration with MinIO for storing and managing data generated by the dynamically created DAGs.
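As a rough illustration of how a task might reach MinIO through the `aws_default` connection, the sketch below uses the Amazon provider's `S3Hook` together with the `MINIO_BUCKET` variable. The connection is assumed to carry the MinIO endpoint URL in its extras; the key layout and helper name are assumptions, not the exact code in this repo.

```python
import os

from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def upload_extract_to_minio(data: str, key: str) -> None:
    """Upload a string payload to the configured MinIO bucket.

    Relies on the `aws_default` connection, whose extras are expected to
    point at the MinIO endpoint (e.g. {"endpoint_url": "http://minio:9000"}).
    """
    bucket = os.environ["MINIO_BUCKET"]  # assumed to be set as an environment variable
    hook = S3Hook(aws_conn_id="aws_default")
    hook.load_string(string_data=data, key=key, bucket_name=bucket, replace=True)
```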
The `dynamic-dags-connections.py` script creates DAGs dynamically based on database connections and queries. Here's how it works:

- **Database Query**: It queries a database table for active configurations, including table names, SQL queries, and data sources.
- **Dynamic DAG Creation**: For each configuration, it creates a DAG with tasks to process date parameters, execute queries, and transfer data to an object store such as S3 or MinIO.
- **Custom Operators**: It uses operators like `ClickhouseToS3Operator` and `SqlToS3Operator` to handle data extraction and transfer.
- **Date Processing**: It processes date parameters in the IST timezone to ensure accurate data extraction.
- **Scalability**: You can manage multiple data pipelines efficiently by simply updating the database configurations.
This script is ideal for scenarios where data pipelines need to be created or updated frequently based on changing requirements.
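A highly simplified sketch of this pattern is shown below: read the active configurations from a table, then create one DAG per row that extracts the query results to object storage with `SqlToS3Operator`. The config table name and columns, connection IDs, schedule, and S3 key layout are all illustrative assumptions; the real script also handles IST date parameters and the ClickHouse-specific path via `ClickhouseToS3Operator`.

```python
import os
from datetime import datetime

import sqlalchemy
from airflow import DAG
from airflow.hooks.base import BaseHook
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator


def get_active_configs() -> list[dict]:
    """Hypothetical helper: fetch active pipeline configs (table name, SQL
    query, source connection id) from a metadata table in the config DB."""
    conn = BaseHook.get_connection("pipeline_config_db")  # assumed connection id
    engine = sqlalchemy.create_engine(conn.get_uri())
    with engine.connect() as db:
        rows = db.execute(
            sqlalchemy.text(
                "SELECT table_name, query, source_conn_id "
                "FROM dag_configs WHERE active = true"
            )
        )
        return [dict(row._mapping) for row in rows]


# One DAG per active configuration row.
for cfg in get_active_configs():
    dag_id = f"extract_{cfg['table_name']}"
    with DAG(
        dag_id=dag_id,
        start_date=datetime(2023, 1, 1),
        schedule="@daily",
        catchup=False,
    ) as dag:
        SqlToS3Operator(
            task_id="transfer_to_object_store",
            sql_conn_id=cfg["source_conn_id"],
            query=cfg["query"],
            aws_conn_id="aws_default",
            s3_bucket=os.environ.get("MINIO_BUCKET", "my-bucket"),
            s3_key=f"{cfg['table_name']}/{{{{ ds }}}}.csv",
            replace=True,
        )
    globals()[dag_id] = dag
```

Because the configurations are read at DAG-parse time, adding or deactivating a row in the config table adds or removes a pipeline without changing any DAG code.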