"""fivethirtyeight DAG"""
import json
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.python import PythonVirtualenvOperator, PythonOperator
import os
import pathlib
import urllib.request
import zipfile
import shutil
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 7, 14),
'end_date': datetime(2021, 12, 5),
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
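# These defaults apply to every task in the DAG. With retries=2 and a
# retry_delay of 5 minutes, a failing task is retried twice before the
# failure email goes out; email_on_retry=False suppresses per-retry emails.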
#schedule_interval = "00 05 * * *"
BQ_CONN_ID = "my_gcp_conn"
dag = DAG(
    'dag_for_testing',
    default_args=default_args,
    #schedule_interval=schedule_interval,
)
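# schedule_interval is commented out while testing, so the DAG runs on
# Airflow's default schedule; restore the cron string above
# ("00 05 * * *", i.e. 05:00 daily) to schedule it explicitly.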
# -------------------- Python Functions -------------------- #
def where_am_i():
"""Print the current working directory
"""
print(os.getcwd())
return None
def download_repo():
"""Download the fivethirtyeight data repo as a zip file.
Saves the zip file in a folder called data
"""
pathlib.Path(os.getcwd(), "data").mkdir(parents=True, exist_ok=True)
url = "https://github.com/fivethirtyeight/data/archive/refs/heads/master.zip"
urllib.request.urlretrieve(url, pathlib.Path(os.getcwd(), "data", "master.zip"))
print(url)
return None
def unzip_repo():
"""Unzips a file called master.zip that is saved in
a data folder.
"""
zip_path = pathlib.Path(os.getcwd(), "data", "master.zip")
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(pathlib.Path(os.getcwd(), "data"))
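    # note: the GitHub archive unpacks into a data-master/ subfolder
    # under data/, which is why load_csvs() walks the tree recursively
    # to find the csv files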
return None
def upload_dataframe_into_bq(pathlib_path_to_csv):
    """Load a csv file into a BigQuery table, using the csv filename
    (without extension) as the table name.
    Args:
        pathlib_path_to_csv (pathlib.Path): path to the csv file to load
    """
    from google.cloud import bigquery
    from icecream import ic
    my_project = 'marquin-personal-tools'
    my_dataset = 'fivethirthyeight_staging'
    # the csv filename (minus extension) becomes the table name
    table_name = pathlib_path_to_csv.stem
    client = bigquery.Client(project=my_project)
    table_ref = client.dataset(my_dataset).table(table_name)
    # set job load configuration settings
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.skip_leading_rows = 1  # ignore the header
    job_config.autodetect = True
    # delete the table if it already exists
    table_id = my_project + '.' + my_dataset + '.' + table_name
    ic(table_id)
    client.delete_table(table_id, not_found_ok=True)
    # load the csv file into the BQ table
    with open(pathlib_path_to_csv, "rb") as source_file:
        job = client.load_table_from_file(
            source_file, table_ref, job_config=job_config
        )
    # the load job runs asynchronously, so wait for it to finish
    job.result()
    return None
def load_csvs():
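    """Walk the downloaded data folder and load every csv file found
    into a staging table in BigQuery.
    """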
from google.cloud import bigquery
from icecream import ic
import os
import pathlib
# This is to get the directory that the program
# is currently running in.
#dir_path = os.path.dirname(os.path.realpath(__file__))
#dir_path = os.path.join(dir_path, "data")
# directory for testing
#dir_path = "/home/fuzzy/Documents/Projects/fivethirtyeight_to_BQ/test_data"
i = 0
for root, dirs, files in os.walk(pathlib.Path(os.getcwd(), "data")):
for file in files:
            # testing cap: stop after the first handful of uploads
            if i > 10:
                return None
            # only process csv files
            if file.endswith('.csv'):
#ic(root)
#ic(dirs)
#ic(file)
temp_filepath = pathlib.Path(root, file)
ic(temp_filepath)
#temp_filepath.stem
try:
upload_dataframe_into_bq(temp_filepath)
i = i + 1
                except Exception:
print("#"*8)
print("#"*8)
print("Failed to load the following file to BQ:")
ic(temp_filepath)
print("#"*8)
print("#"*8)
return None
def clean_directory():
"""Deletes the data directory and its contents.
"""
directory_to_remove = pathlib.Path(os.getcwd(), "data")
shutil.rmtree(directory_to_remove)
return None
# -------------------- Construct DAGs -------------------- #
t1 = PythonOperator(
task_id='print_location',
python_callable=where_am_i,
dag=dag
)
t2 = PythonOperator(
task_id='get_the_data',
python_callable=download_repo,
dag=dag
)
t3 = PythonOperator(
task_id='unzip_the_data',
python_callable=unzip_repo,
dag=dag
)
t4 = BigQueryOperator(
task_id='bq_create_prod_version_of_drink',
sql='''
#standardSQL
SELECT *,
RANK() OVER (
ORDER BY beer_servings desc
) AS beer_rank,
RANK() OVER (
ORDER BY spirit_servings desc
) AS spirit_rank,
RANK() OVER (
ORDER BY wine_servings desc
) AS wine_rank,
beer_servings + spirit_servings + wine_servings as total_servings
FROM `marquin-personal-tools.fivethirthyeight_staging.drinks`
ORDER BY country asc
''',
destination_dataset_table='marquin-personal-tools.fivethirtyeight_prod.drinks_prod',
write_disposition='WRITE_TRUNCATE',
allow_large_results=True,
use_legacy_sql=False,
bigquery_conn_id=BQ_CONN_ID,
dag=dag
)
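# t4 rebuilds the prod drinks table from the staging table, adding a rank
# per drink type and a total_servings column; WRITE_TRUNCATE overwrites the
# destination table on every run.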
t5 = PythonVirtualenvOperator(
task_id='load_csvs_to_BQ',
python_callable=load_csvs,
    requirements=["google-cloud-bigquery", "icecream==2.1.1"],
dag=dag
)
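# Note: PythonVirtualenvOperator runs load_csvs in a fresh virtualenv that
# only has the packages listed in requirements (google-cloud-bigquery is the
# package that provides "from google.cloud import bigquery"), which is why
# the imports live inside the function. The operator only ships the source
# of the callable itself, so module-level helpers such as
# upload_dataframe_into_bq may not be visible inside the virtualenv on some
# Airflow versions; if that bites, inline the helper into load_csvs or run
# this task with a plain PythonOperator instead.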
t6 = PythonOperator(
task_id='delete_downloaded_data',
python_callable=clean_directory,
dag=dag
)
# -------------------- Set up dependencies -------------------- #
t1 >> t2
t2 >> t3
t3 >> t5
t5 >> t4
t5 >> t6
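# Resulting graph: print_location >> get_the_data >> unzip_the_data >>
# load_csvs_to_BQ, which then fans out to bq_create_prod_version_of_drink
# and delete_downloaded_data. Equivalently: t1 >> t2 >> t3 >> t5 >> [t4, t6]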