dynamic-dags-tutorial

This repo contains an Astronomer project with multiple examples showing how to dynamically generate DAGs in Airflow. A guide discussing these concepts in depth will be published shortly.

DAG Overview

This repo contains DAGs and supporting Python scripts that dynamically generate DAGs using multiple methods. They are described here, organized by folder.

dags

  • dynamic-dags-connections.py
    Dynamically generates DAGs based on database connections and queries. It queries a database for active configurations, processes date parameters, and creates DAGs that transfer data to an object store like MinIO or S3.

  • dynamic-dags-loop.py
    Generates DAGs based on a simple range() loop; a minimal sketch of this pattern appears after this list.

  • dynamic-dags-variable.py
    Generates DAGs based on Airflow variables.

  • dag_file_1.py and dag_file_2.py
    Actual DAG files that were dynamically generated using scripts in the include/ directory, described below.
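
The loop-based and variable-based files share the same basic pattern: a single Python file builds several DAG objects and registers each one in globals() so the Airflow scheduler can discover them. Below is a minimal sketch of that pattern, not the exact contents of dynamic-dags-loop.py; the DAG ids, schedule, and task are placeholders, and it assumes Airflow 2.4 or newer for the schedule argument and EmptyOperator.

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.empty import EmptyOperator

    def create_dag(dag_id: str) -> DAG:
        # Build one DAG object for the given id.
        with DAG(
            dag_id=dag_id,
            start_date=datetime(2023, 1, 1),
            schedule="@daily",
            catchup=False,
        ) as dag:
            EmptyOperator(task_id="placeholder_task")
        return dag

    # One iteration per DAG; registering each object in globals()
    # lets the scheduler discover every DAG defined in this file.
    for n in range(1, 4):
        dag_id = f"loop_generated_dag_{n}"
        globals()[dag_id] = create_dag(dag_id)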

include

  • dag-config/
    Contains two JSON configuration files with parameters used to dynamically generate Python files for dag_file_1.py and dag_file_2.py.

  • dag-template.py
    Contains the starting DAG template from which other DAG files are dynamically generated.

  • generate-dag-files.py
    Contains a script that dynamically generates a DAG file for each config file in dag-config/ by copying dag-template.py and replacing key parameters with values from the config file; a rough sketch of this approach appears after this list.
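
For illustration, a template-and-replace generator in the spirit of generate-dag-files.py might look like the sketch below. The placeholder tokens (dag_id_to_replace, schedule_to_replace) and the config keys are assumptions; the actual template and script in this repo may use different names.

    import json
    from pathlib import Path

    CONFIG_DIR = Path("include/dag-config")
    TEMPLATE_FILE = Path("include/dag-template.py")
    OUTPUT_DIR = Path("dags")

    template = TEMPLATE_FILE.read_text()

    # One generated DAG file per JSON config: every placeholder token in the
    # template is replaced with the corresponding value from the config file.
    for config_path in sorted(CONFIG_DIR.glob("*.json")):
        config = json.loads(config_path.read_text())
        new_dag = (
            template
            .replace("dag_id_to_replace", config["dag_id"])      # hypothetical token and key
            .replace("schedule_to_replace", config["schedule"])  # hypothetical token and key
        )
        out_file = OUTPUT_DIR / f"{config['dag_id']}.py"
        out_file.write_text(new_dag)
        print(f"Generated {out_file}")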

Other Files

  • Dockerfile
    Defines the custom Airflow image with additional dependencies installed.

  • requirements.txt
    Lists Python dependencies, including the airflow-clickhouse-plugin and MinIO-related packages.

  • .astro/config.yaml
    Contains the Astronomer project configuration.

Getting Started

The easiest way to run these example DAGs is to use the Astronomer CLI to get an Airflow instance up and running locally:

  1. Install the Astronomer CLI.
  2. Clone this repo somewhere locally and navigate to it in your terminal.
  3. Initialize an Astronomer project by running:
    astro dev init
  4. Start Airflow locally by running:
    astro dev start
  5. Navigate to localhost:8080 in your browser, and you should see the tutorial DAGs there.

Dynamically Generating DAGs

To dynamically generate DAGs using the generate-dag-files.py script:

  1. Place your configuration files in the include/dag-config/ directory (an example of adding a config file is sketched after these steps).
  2. Run the script:
    python include/generate-dag-files.py
  3. The generated DAG files will appear in the dags/ directory.
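
As a hedged example, adding another DAG can be as simple as dropping one more JSON file into include/dag-config/ and re-running the script. The keys below (dag_id, schedule) are assumptions and must match whatever placeholders your dag-template.py actually uses.

    import json
    from pathlib import Path

    # Hypothetical config; the keys must match the placeholders in dag-template.py.
    new_config = {
        "dag_id": "dag_file_3",
        "schedule": "@daily",
    }
    Path("include/dag-config/dag_file_3_config.json").write_text(
        json.dumps(new_config, indent=2)
    )
    # Then regenerate the DAG files:
    #   python include/generate-dag-files.py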

MinIO Dependencies and Requirements

This project includes support for MinIO as an object store for data pipelines. The following dependencies and configurations are required:

Dependencies

  • The Amazon provider package (apache-airflow-providers-amazon, imported as airflow.providers.amazon) is used for MinIO integration.
  • Ensure the boto3 library is installed for interacting with MinIO.

Environment Variables

  • MINIO_BUCKET: The name of the MinIO bucket where data will be stored.
  • ENV: The name of the target environment.

Configuration

  • Update the Airflow connections (aws_default) to include MinIO credentials (e.g., access key, secret key, endpoint URL).
  • Ensure the dynamic-dags-connections.py script is configured to use the MINIO_BUCKET and ENV variables.

This setup allows seamless integration with MinIO for storing and managing data generated by the dynamically created DAGs.
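
To sanity-check the MinIO credentials and bucket outside Airflow, a short boto3 script like the one below can help; mirror the same access key, secret key, and endpoint URL in the aws_default connection (the endpoint typically goes in the connection extras as endpoint_url). Apart from MINIO_BUCKET, the environment variable names and the endpoint default are assumptions for this sketch.

    import os

    import boto3

    # MINIO_BUCKET comes from this project; the other variables are placeholders
    # for whatever your deployment uses.
    s3 = boto3.client(
        "s3",
        endpoint_url=os.environ.get("MINIO_ENDPOINT", "http://localhost:9000"),
        aws_access_key_id=os.environ["MINIO_ACCESS_KEY"],
        aws_secret_access_key=os.environ["MINIO_SECRET_KEY"],
    )

    bucket = os.environ["MINIO_BUCKET"]
    s3.put_object(Bucket=bucket, Key="healthcheck/ping.txt", Body=b"ok")
    print(f"Wrote test object to s3://{bucket}/healthcheck/ping.txt")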

Detailed Functionality of dynamic-dags-connections.py

The dynamic-dags-connections.py script creates DAGs dynamically based on database connections and queries. Here's how it works:

  1. Database Query
    It queries a database table for active configurations, including table names, SQL queries, and data sources.

  2. Dynamic DAG Creation
    For each configuration, it creates a DAG with tasks to process date parameters, execute queries, and transfer data to an object store like S3 or MinIO.

  3. Custom Operators
    It uses operators like ClickhouseToS3Operator and SqlToS3Operator to handle data extraction and transfer.

  4. Date Processing
    It processes date parameters dynamically in the IST timezone to ensure accurate data extraction.

  5. Scalability
    This approach allows you to manage multiple data pipelines efficiently by simply updating the database configurations.

This script is ideal for scenarios where data pipelines need to be created or updated frequently based on changing requirements.
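
The sketch below illustrates the general shape of this approach under several assumptions: a hypothetical dag_configs table read through a PostgresHook on a config_db connection, the SqlToS3Operator from the Amazon provider standing in for the repo's ClickhouseToS3Operator, and a fixed daily schedule in the IST timezone. The real script's table layout, operators, and date handling differ.

    import os
    from datetime import datetime

    import pendulum
    from airflow import DAG
    from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    MINIO_BUCKET = os.environ.get("MINIO_BUCKET", "example-bucket")
    IST = pendulum.timezone("Asia/Kolkata")

    # Hypothetical configuration table: one row per pipeline to generate.
    rows = PostgresHook(postgres_conn_id="config_db").get_records(
        "SELECT dag_id, source_conn_id, sql_query, s3_key FROM dag_configs WHERE active"
    )

    for dag_id, source_conn_id, sql_query, s3_key in rows:
        with DAG(
            dag_id=dag_id,
            start_date=datetime(2023, 1, 1, tzinfo=IST),
            schedule="@daily",
            catchup=False,
        ) as dag:
            # Extract with the configured query and land the result in the object store.
            SqlToS3Operator(
                task_id="transfer_to_object_store",
                query=sql_query,
                s3_bucket=MINIO_BUCKET,
                s3_key=s3_key,
                sql_conn_id=source_conn_id,
                aws_conn_id="aws_default",
                replace=True,
            )
        globals()[dag_id] = dag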
