Apache Airflow
Podcast episode on Talk Python To Me
Podcast episode - Apache Airflow Open-Source Workflow with Python
Notes
Cron is unable to handle complex scheduling and dependencies between tasks.
Luigi was used, but relied heavily on XML.
Airflow is an orchestration tool. Airflow Website
Airflow doesn't perform tasks, it tells other services to perform tasks.
Having a workflow orchestration tool allows teams to collaborate on data pipelines.
The components of Airflow are as follows (a minimal sketch follows the list):
- Sensor - waiting for something to happen, e.g. an object landing in S3
- Transfer - moving something from A to B
- Operator - performing an action in a service, e.g. starting a Kubernetes cluster
[link to Airflow operators or transfer or sensor]
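A minimal sketch of a sensor feeding an operator, assuming the Amazon provider package is installed (the S3KeySensor import path varies slightly between provider versions, and the bucket and key names here are made up):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

with DAG(
    "sensor_then_operator",
    start_date=datetime(2021, 7, 14),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # Sensor: wait until the object appears in S3
    wait_for_file = S3KeySensor(
        task_id="wait_for_file",
        bucket_name="my-bucket",
        bucket_key="incoming/data.csv",
        aws_conn_id="aws_default",
    )
    # Operator: tell another service (here just bash) to do the actual work
    process_file = BashOperator(
        task_id="process_file",
        bash_command="echo 'file arrived, kick off processing'",
    )
    wait_for_file >> process_file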
What if something goes wrong? Each task within Airflow should be idempotent: if run multiple times it should produce the same result. Within Airflow there is the option to set the number of retries (e.g. 4).
There are on-failure callbacks, e.g. sending an email on failure to let you know.
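A sketch of what retries and an on-failure callback look like on a task (the function names here are hypothetical; email_on_failure in default_args, as in the example DAG below, works too):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

def my_idempotent_task():
    # hypothetical task body: written so that running it twice gives the same result
    print("doing repeatable work")

def notify_failure(context):
    # hypothetical callback: could send an email or Slack message instead of printing
    print(f"Task {context['task_instance'].task_id} failed")

with DAG("retry_demo", start_date=datetime(2021, 7, 14), schedule_interval=None) as dag:
    flaky_task = PythonOperator(
        task_id="flaky_task",
        python_callable=my_idempotent_task,
        retries=4,                        # retry up to 4 times before marking the task failed
        retry_delay=timedelta(minutes=5),
        on_failure_callback=notify_failure,
    )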
A certain number of base operators come with Airflow when it is pip installed.
Other integrations, e.g. Google BigQuery, are pip installable separately from the base installation.
It is possible to build your own providers.
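A provider bundles operators and hooks for a service; at its core is usually a custom operator subclassing BaseOperator. A minimal sketch (GreetOperator is made up for illustration):

from airflow.models.baseoperator import BaseOperator


class GreetOperator(BaseOperator):
    # hypothetical custom operator: a real provider would wrap a service's client library
    def __init__(self, name: str, **kwargs):
        super().__init__(**kwargs)
        self.name = name

    def execute(self, context):
        message = f"Hello, {self.name}!"
        print(message)
        return message  # the return value is pushed to XCom by default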
What if I want to run Airflow?
Hosted solutions:
- Google Cloud Composer
- AWS MWAA (Managed Workflows for Apache Airflow)
There is a Docker image for Airflow. Docker Compose is the quickstart.
Helm chart is recommended for production Airflow: link
Airflow community summit: link
Apache: value community over code.
If the community is not healthy the code will not do well or last.
In Airflow 2.0 there are significant improvements to the dashboard UI.
Apache Airflow YouTube channel
Everything is Python in Airflow. It is not possible to edit the DAGs using the user interface.
Example DAG
import json
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator  # legacy contrib path (pre-provider packages)
from airflow.operators.python import PythonVirtualenvOperator, PythonOperator
import os
import pathlib
import urllib.request
import zipfile
import shutil
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 7, 14),
    'end_date': datetime(2021, 12, 5),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}
#schedule_interval = "00 05 * * *"
BQ_CONN_ID = "my_gcp_conn"
dag = DAG(
    'dag_for_testing',
    default_args=default_args,
    #schedule_interval=schedule_interval
)
# -------------------- Python Functions -------------------- #
def where_am_i():
    """Print the current working directory."""
    print(os.getcwd())
    return None


def download_repo():
    """Download the fivethirtyeight data repo as a zip file.

    Saves the zip file in a folder called data.
    """
    pathlib.Path(os.getcwd(), "data").mkdir(parents=True, exist_ok=True)
    url = "https://github.com/fivethirtyeight/data/archive/refs/heads/master.zip"
    urllib.request.urlretrieve(url, pathlib.Path(os.getcwd(), "data", "master.zip"))
    print(url)
    return None


def unzip_repo():
    """Unzip a file called master.zip that is saved in the data folder."""
    zip_path = pathlib.Path(os.getcwd(), "data", "master.zip")
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(pathlib.Path(os.getcwd(), "data"))
    return None
def upload_dataframe_into_bq(pathlib_path_to_csv):
    """Given a file path to a csv file, load the csv file into a table
    in BigQuery. It uses the filename of the csv file as the table name.

    Args:
        pathlib_path_to_csv (pathlib.Path): path to the csv file to load
    """
    # imported inside the function so the main Airflow environment does not
    # need these packages installed at DAG parse time
    from google.cloud import bigquery
    from icecream import ic

    my_project = 'marquin-personal-tools'
    my_dataset = 'fivethirthyeight_staging'
    my_table = pathlib_path_to_csv.stem

    # upload to BigQuery
    client = bigquery.Client(project=my_project)
    table_name = pathlib_path_to_csv.stem
    table_ref = client.dataset(my_dataset).table(table_name)

    # set job load configuration settings
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.skip_leading_rows = 1  # ignore the header
    job_config.autodetect = True

    # delete the table if it already exists
    table_id = my_project + '.' + my_dataset + '.' + my_table
    ic(table_id)
    client.delete_table(table_id, not_found_ok=True)

    # load the csv file into the BQ table
    with open(pathlib_path_to_csv, "rb") as source_file:
        job = client.load_table_from_file(
            source_file, table_ref, job_config=job_config
        )
    # the load job is async, so wait for it to finish
    job.result()
    return None
def load_csvs():
    from google.cloud import bigquery
    from icecream import ic
    import os
    import pathlib

    # This is to get the directory that the program is currently running in.
    #dir_path = os.path.dirname(os.path.realpath(__file__))
    #dir_path = os.path.join(dir_path, "data")
    # directory for testing
    #dir_path = "/home/fuzzy/Documents/Projects/fivethirtyeight_to_BQ/test_data"

    # counter so only a limited number of files are loaded per run
    i = 0
    for root, dirs, files in os.walk(pathlib.Path(os.getcwd(), "data")):
        for file in files:
            if i > 10:
                return None
            # only process csv files
            if file.endswith('.csv'):
                #ic(root)
                #ic(dirs)
                #ic(file)
                temp_filepath = pathlib.Path(root, file)
                ic(temp_filepath)
                #temp_filepath.stem
                try:
                    upload_dataframe_into_bq(temp_filepath)
                    i = i + 1
                except Exception:
                    print("#" * 8)
                    print("#" * 8)
                    print("Failed to load the following file to BQ:")
                    ic(temp_filepath)
                    print("#" * 8)
                    print("#" * 8)
    return None
def clean_directory():
    """Delete the data directory and its contents."""
    directory_to_remove = pathlib.Path(os.getcwd(), "data")
    shutil.rmtree(directory_to_remove)
    return None
# -------------------- Construct DAGs -------------------- #
t1 = PythonOperator(
    task_id='print_location',
    python_callable=where_am_i,
    dag=dag
)

t2 = PythonOperator(
    task_id='get_the_data',
    python_callable=download_repo,
    dag=dag
)

t3 = PythonOperator(
    task_id='unzip_the_data',
    python_callable=unzip_repo,
    dag=dag
)

t4 = BigQueryOperator(
    task_id='bq_create_prod_version_of_drink',
    sql='''
    #standardSQL
    SELECT *,
        RANK() OVER (
            ORDER BY beer_servings desc
        ) AS beer_rank,
        RANK() OVER (
            ORDER BY spirit_servings desc
        ) AS spirit_rank,
        RANK() OVER (
            ORDER BY wine_servings desc
        ) AS wine_rank,
        beer_servings + spirit_servings + wine_servings as total_servings
    FROM `marquin-personal-tools.fivethirthyeight_staging.drinks`
    ORDER BY country asc
    ''',
    destination_dataset_table='marquin-personal-tools.fivethirtyeight_prod.drinks_prod',
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    use_legacy_sql=False,
    bigquery_conn_id=BQ_CONN_ID,
    dag=dag
)
t5 = PythonVirtualenvOperator(
    task_id='load_csvs_to_BQ',
    python_callable=load_csvs,
    # google-cloud-bigquery provides the `google.cloud.bigquery` module imported in load_csvs
    requirements=["google-cloud-bigquery", "icecream==2.1.1"],
    dag=dag
)
t6 = PythonOperator(
    task_id='delete_downloaded_data',
    python_callable=clean_directory,
    dag=dag
)
# -------------------- Set up dependencies -------------------- #
t1 >> t2
t2 >> t3
t3 >> t5
t5 >> t4
t5 >> t6
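The same dependencies can also be written as one chained statement (same graph, just a different style):

t1 >> t2 >> t3 >> t5 >> [t4, t6]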