Skip to content

Apache Airflow

Podcast episode on Talk Python To Me

Podcast episode - Apache Airflow Open-Source Workflow with Python

Notes

Cron is unable to handle complexities in scheduling and dependencies.

Luigi was used, but relied heavily on XML.

Airflow is an orchestration tool. Airflow Website

Airflow doesn't perform tasks, it tells other services to perform tasks.

Having a workflow orchestration tool allows teams to collaborate on data pipelines.

The components of Airflow are as follows:

  • Sensor - waiting for a condition, e.g. checking whether an object exists in S3
  • Transfer - moving something from A to B
  • Operator - anything in a service, e.g. starting a Kubernetes cluster

[link to Airflow operators or transfer or sensor]

Airflow operators

Airflow sensors

What if something goes wrong? Each task within Airflow should be idempotent: if it is run multiple times, it should produce the same result. Within Airflow there is the option to set the number of retries (e.g. 4).

There are on-failure callbacks, e.g. sending an email to let you know that a task has failed.

airflow principles

A certain number of base operators come with Airflow when it is pip installed.

Other integrations, e.g. Google BigQuery, are pip installable separately from the base installation.

It is possible to build your own providers.

What if I want airflow?

Hosted solutions:

  • Google Cloud Composer
  • AWS MWAA (Managed Workflows for Apache Airflow)

There is a docker image for airflow. Docker compose is the quickstart.

Helm chart is recommended for production Airflow: link

Airflow community summit: link

Apache: value community over code.

If the community is not healthy the code will not do well or last.

In Airflow 2.0 there is a significant improvement to the dashboard UI.

Apache Airflow YouTube channel

Everything is Python in Airflow. It is not possible to edit the DAGs using the user interface.

Example DAG


import json
from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.python import PythonVirtualenvOperator, PythonOperator

import os
import pathlib
import urllib.request
import zipfile
import shutil

# Arguments handed to the DAG as ``default_args``; Airflow applies them to
# every task that does not override them.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    # Window in which the DAG is allowed to be scheduled.
    "start_date": datetime(2021, 7, 14),
    "end_date": datetime(2021, 12, 5),
    # Notification settings: mail on failure only, not on each retry.
    "email": ["[email protected]"],
    "email_on_failure": True,
    "email_on_retry": False,
    # Retry a failed task twice, waiting five minutes between attempts.
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

# Airflow connection id used by the BigQuery task further down.
BQ_CONN_ID = "my_gcp_conn"

# No schedule_interval is passed, so the DAG runs on Airflow's default
# schedule. A cron expression ("00 05 * * *") was drafted for this DAG but
# is currently not enabled.
dag = DAG(
    "dag_for_testing",
    default_args=default_args,
)

# -------------------- Python Functions -------------------- #

def where_am_i() -> None:
    """Print the directory the process is currently running in."""
    current_dir = os.getcwd()
    print(current_dir)

def download_repo() -> None:
    """Fetch the fivethirtyeight data repository as a zip archive.

    The archive is stored as ``data/master.zip`` below the current working
    directory; the ``data`` folder is created first when it does not exist.
    The download URL is printed once the transfer has finished.
    """
    data_dir = pathlib.Path(os.getcwd(), "data")
    data_dir.mkdir(parents=True, exist_ok=True)

    url = "https://github.com/fivethirtyeight/data/archive/refs/heads/master.zip"
    archive_path = pathlib.Path(os.getcwd(), "data", "master.zip")
    urllib.request.urlretrieve(url, archive_path)

    print(url)

def unzip_repo() -> None:
    """Extract ``data/master.zip`` into the ``data`` folder.

    Both paths are resolved against the current working directory.
    """
    data_dir = pathlib.Path(os.getcwd(), "data")
    archive_path = data_dir / "master.zip"

    with zipfile.ZipFile(archive_path, "r") as archive:
        archive.extractall(data_dir)

def upload_dataframe_into_bq(pathlib_path_to_csv):
    """Load a CSV file into a BigQuery staging table named after the file.

    Any existing table with the same name is deleted first. The CSV is then
    loaded with schema autodetection, skipping the first (header) row, and
    the function blocks until the load job has finished.

    Args:
        pathlib_path_to_csv (pathlib.Path): Path to the CSV file. Its stem
            (file name without the extension) is used as the table name.
    """
    # These names are not imported at module level, so they are imported
    # here; without this the function fails with NameError on first use.
    from google.cloud import bigquery
    from icecream import ic

    my_project = 'marquin-personal-tools'
    my_dataset = 'fivethirthyeight_staging'
    my_table = pathlib_path_to_csv.stem

    client = bigquery.Client(project=my_project)
    table_ref = client.dataset(my_dataset).table(my_table)

    # Load-job settings: CSV input, header row skipped, schema inferred.
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.skip_leading_rows = 1
    job_config.autodetect = True

    # Drop any previous version of the table so the load starts clean.
    table_id = my_project + '.' + my_dataset + '.' + my_table
    ic(table_id)
    client.delete_table(table_id, not_found_ok=True)

    with open(pathlib_path_to_csv, "rb") as source_file:
        job = client.load_table_from_file(
            source_file, table_ref, job_config=job_config
        )

    # The load job is asynchronous; wait here until it completes.
    job.result()

    return None


def load_csvs():
    """Upload CSV files found under ``data`` to BigQuery, one table per file.

    Walks the ``data`` folder below the current working directory and hands
    every ``.csv`` file to ``upload_dataframe_into_bq``. A file that fails to
    upload is reported together with the error and skipped, so the remaining
    files are still attempted. The walk stops once 11 files have been
    uploaded successfully.

    NOTE(review): this callable is run through PythonVirtualenvOperator.
    Airflow documents that such callables should not reference names defined
    outside their own body; confirm that ``upload_dataframe_into_bq`` is
    actually reachable at run time.
    """
    # Imports are kept inside the function because it is executed through
    # PythonVirtualenvOperator.
    # The bigquery import is unused here but makes a missing client library
    # fail immediately instead of once per file.
    from google.cloud import bigquery  # noqa: F401
    from icecream import ic
    import os
    import pathlib

    # Cut-off kept from the original `i > 10` check: stop after 11 successes.
    max_uploads = 11
    uploaded = 0

    for root, _dirs, files in os.walk(pathlib.Path(os.getcwd(), "data")):
        for file in files:
            if uploaded >= max_uploads:
                return None

            if not file.endswith('.csv'):
                continue

            temp_filepath = pathlib.Path(root, file)
            ic(temp_filepath)

            try:
                upload_dataframe_into_bq(temp_filepath)
            except Exception as exc:
                # Best effort: report the failure and carry on with the next
                # file. Catching Exception (not a bare except) lets
                # KeyboardInterrupt and SystemExit propagate.
                print("#"*8)
                print("#"*8)
                print("Failed to load the following file to BQ:")
                ic(temp_filepath)
                print("Reason: " + repr(exc))
                print("#"*8)
                print("#"*8)
            else:
                uploaded += 1

    return None


def clean_directory():
    """Delete the ``data`` directory and its contents.

    The directory is looked up below the current working directory. A
    directory that does not exist is treated as already cleaned, so the
    function can be called again (for example on a task retry) without
    raising.
    """
    directory_to_remove = pathlib.Path(os.getcwd(), "data")

    try:
        shutil.rmtree(directory_to_remove)
    except FileNotFoundError:
        # Nothing left to delete; the desired end state is already reached.
        pass

    return None


# -------------------- Construct DAGs -------------------- #

# Step 1: print the current working directory.
t1 = PythonOperator(
    task_id="print_location",
    python_callable=where_am_i,
    dag=dag,
)

# Step 2: download the fivethirtyeight repository archive.
t2 = PythonOperator(
    task_id="get_the_data",
    python_callable=download_repo,
    dag=dag,
)

# Step 3: extract the downloaded archive.
t3 = PythonOperator(
    task_id="unzip_the_data",
    python_callable=unzip_repo,
    dag=dag,
)

# Build the production `drinks_prod` table from the staging `drinks` table.
# The query keeps every staging column and adds a rank per beverage type
# (beer, spirit, wine; highest servings ranked first) plus a total_servings
# column. WRITE_TRUNCATE replaces the destination table's contents each run.
# NOTE(review): the staging dataset is spelled `fivethirthyeight_staging`
# while the prod dataset is `fivethirtyeight_prod`. The staging spelling
# matches upload_dataframe_into_bq, so confirm before changing either one.
# NOTE(review): presumably relies on a `drinks` CSV being among the files
# uploaded by load_csvs - confirm, since that task caps the number of uploads.
t4 = BigQueryOperator(
        task_id='bq_create_prod_version_of_drink',    
        sql='''
        #standardSQL
        SELECT *,  
        RANK() OVER (
                ORDER BY beer_servings desc
            ) AS beer_rank,
        RANK() OVER (
                ORDER BY spirit_servings desc
            ) AS spirit_rank,
        RANK() OVER (
                ORDER BY wine_servings desc
            ) AS  wine_rank,
        beer_servings + spirit_servings + wine_servings as total_servings
        FROM `marquin-personal-tools.fivethirthyeight_staging.drinks` 
        ORDER BY country asc
        ''',
        destination_dataset_table='marquin-personal-tools.fivethirtyeight_prod.drinks_prod',    
        write_disposition='WRITE_TRUNCATE',
        allow_large_results=True,
        use_legacy_sql=False,
        bigquery_conn_id=BQ_CONN_ID,
        dag=dag
    )

# Upload the extracted CSV files to BigQuery from a virtualenv built with the
# requirements listed below.
# NOTE(review): load_csvs imports `google.cloud.bigquery`; confirm that the
# pinned `bigquery` distribution actually provides that module.
t5 = PythonVirtualenvOperator(
    task_id="load_csvs_to_BQ",
    python_callable=load_csvs,
    requirements=[
        "bigquery==0.0.15",
        "icecream==2.1.1",
    ],
    dag=dag,
)

# Remove the downloaded and extracted files.
t6 = PythonOperator(
    task_id="delete_downloaded_data",
    python_callable=clean_directory,
    dag=dag,
)

# -------------------- Set up dependencies -------------------- #

# Linear chain up to the BigQuery load (t5), which then fans out to the
# production query (t4) and the local clean-up (t6).
t1 >> t2 >> t3 >> t5 >> [t4, t6]