fivethirtyeight DAG

import os
import pathlib
import shutil
import urllib.request
import zipfile
from datetime import datetime, timedelta

from airflow import DAG
# Note: on Airflow 2 the BigQuery operator has moved to
# airflow.providers.google.cloud.operators.bigquery; the path below is the
# older contrib location.
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.python import PythonOperator, PythonVirtualenvOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 7, 14),
    'end_date': datetime(2021, 12, 5),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}
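# default_args are applied to every task in this DAG; individual operators
# can override any of them.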

#schedule_interval = "00 05 * * *"
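# "00 05 * * *" would schedule a daily run at 05:00; with it commented out
# the DAG falls back to Airflow's default daily schedule.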

BQ_CONN_ID = "my_gcp_conn"
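# BQ_CONN_ID names an Airflow connection to GCP (e.g. one backed by a
# service account) that is assumed to be configured separately in the
# Airflow UI or CLI.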

dag = DAG(
    'dag_for_testing',
    default_args=default_args,
    #schedule_interval=schedule_interval,
)

# -------------------- Python Functions -------------------- #

def where_am_i():
    """Print the current working directory
    """

    print(os.getcwd())

    return None

def download_repo():
    """Download the fivethirtyeight data repo as a zip file.
    Saves the zip file in a folder called data
    """

    pathlib.Path(os.getcwd(), "data").mkdir(parents=True, exist_ok=True)

    url = "https://github.com/fivethirtyeight/data/archive/refs/heads/master.zip"

    urllib.request.urlretrieve(url, pathlib.Path(os.getcwd(), "data", "master.zip"))

    print(url)

    return None

def unzip_repo():
    """Unzips a file called master.zip that is saved in 
    a data folder.

    """

    zip_path = pathlib.Path(os.getcwd(), "data", "master.zip")

    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(pathlib.Path(os.getcwd(), "data"))

    return None

def upload_dataframe_into_bq(pathlib_path_to_csv):
    """Load a csv file into a BigQuery table, using the filename
    (without its extension) as the table name.

    Args:
        pathlib_path_to_csv (pathlib.Path): path to the csv file to load.
    """

    # imports are kept inside the function so it does not rely on
    # module-level imports when called from load_csvs, which runs in
    # its own virtualenv
    from google.cloud import bigquery
    from icecream import ic

    my_project = 'marquin-personal-tools'
    my_dataset = 'fivethirthyeight_staging'
    my_table = pathlib_path_to_csv.stem

    # upload to BigQuery
    client = bigquery.Client(project=my_project)

    table_ref = client.dataset(my_dataset).table(my_table)

    # set job load configuration settings
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.skip_leading_rows = 1  # ignore the header
    job_config.autodetect = True

    # delete the table if it already exists
    table_id = my_project + '.' + my_dataset + '.' + my_table
    ic(table_id)
    client.delete_table(table_id, not_found_ok=True)

    # load the csv file into the BQ table
    with open(pathlib_path_to_csv, "rb") as source_file:
        job = client.load_table_from_file(
            source_file, table_ref, job_config=job_config
        )

    # the load job is asynchronous, so wait for it to finish
    job.result()

    return None


def load_csvs():
    """Walk the downloaded data folder and load each csv file found
    into BigQuery, capped at a small number of files while testing.
    """

    # imports are inside the function because PythonVirtualenvOperator
    # executes the callable in a separate virtualenv
    from google.cloud import bigquery
    from icecream import ic
    import os
    import pathlib

    # counter used to cap the number of uploads while testing
    i = 0

    for root, dirs, files in os.walk(pathlib.Path(os.getcwd(), "data")):
        for file in files:
            # stop once 11 files have been uploaded successfully
            if i > 10:
                return None

            # only csv files are loaded
            if file.endswith('.csv'):
                temp_filepath = pathlib.Path(root, file)
                ic(temp_filepath)

                try:
                    upload_dataframe_into_bq(temp_filepath)
                    i = i + 1
                except Exception:
                    print("#"*8)
                    print("#"*8)
                    print("Failed to load the following file to BQ:")
                    ic(temp_filepath)
                    print("#"*8)
                    print("#"*8)

    return None


def clean_directory():
    """Deletes the data directory and its contents.
    """

    directory_to_remove = pathlib.Path(os.getcwd(), "data")
    shutil.rmtree(directory_to_remove)

    return None


# -------------------- Construct DAGs -------------------- #

t1 = PythonOperator(
    task_id='print_location',
    python_callable=where_am_i,
    dag=dag
)

t2 = PythonOperator(
    task_id='get_the_data',
    python_callable=download_repo,
    dag=dag
)

t3 = PythonOperator(
    task_id='unzip_the_data',
    python_callable=unzip_repo,
    dag=dag
)

t4 = BigQueryOperator(
    task_id='bq_create_prod_version_of_drink',
    sql='''
    #standardSQL
    SELECT *,
    RANK() OVER (
            ORDER BY beer_servings desc
        ) AS beer_rank,
    RANK() OVER (
            ORDER BY spirit_servings desc
        ) AS spirit_rank,
    RANK() OVER (
            ORDER BY wine_servings desc
        ) AS wine_rank,
    beer_servings + spirit_servings + wine_servings as total_servings
    FROM `marquin-personal-tools.fivethirthyeight_staging.drinks`
    ORDER BY country asc
    ''',
    destination_dataset_table='marquin-personal-tools.fivethirtyeight_prod.drinks_prod',
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    use_legacy_sql=False,
    bigquery_conn_id=BQ_CONN_ID,
    dag=dag
)
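
# Note: with the Airflow 2 Google provider this task would use
# BigQueryExecuteQueryOperator and gcp_conn_id instead of bigquery_conn_id;
# the arguments above match the older contrib operator imported at the top.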

t5 = PythonVirtualenvOperator(
    task_id='load_csvs_to_BQ',
    python_callable=load_csvs,
    # load_csvs needs the BigQuery client library and icecream inside the virtualenv
    requirements=["google-cloud-bigquery", "icecream==2.1.1"],
    dag=dag
)
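
# Note: PythonVirtualenvOperator runs load_csvs in an isolated environment,
# which is why its imports live inside the function body; helpers such as
# upload_dataframe_into_bq also need to be available in that environment
# for the task to succeed.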

t6 = PythonOperator(
    task_id='delete_downloaded_data',
    python_callable=clean_directory,
    dag=dag
)

# -------------------- Set up dependencies -------------------- #

t1 >> t2
t2 >> t3
t3 >> t5
t5 >> t4
t5 >> t6
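
# The chain above is equivalent to:
#   t1 >> t2 >> t3 >> t5 >> [t4, t6]
#
# Individual tasks can be exercised outside the scheduler with the Airflow
# CLI, e.g.:
#   airflow tasks test dag_for_testing get_the_data 2021-07-14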