
Chapter 3: Reading and writing files

Reading and writing files in Python

Writing CSV files with the Python csv library


import csv


# open the output file; newline='' keeps the csv module from writing blank rows on some platforms
output = open('myCSV.csv', mode='w', newline='')

mywriter = csv.writer(output)

# define a header and write it to the file
header = ['name', 'age']
mywriter.writerow(header)

# write a row of data to the file
data = ['Bob Smith', 40]
mywriter.writerow(data)

# close the file so everything is flushed to disk
output.close()
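
The same file can also be produced with a with block, which closes the file automatically, and writerows(), which writes several rows in one call. A minimal sketch of that variation; the file name rows.csv and the sample rows are just examples.

import csv

rows = [['Bob Smith', 40], ['Jane Doe', 35]]

# the with block closes the file automatically when it ends
with open('rows.csv', mode='w', newline='') as output:
    mywriter = csv.writer(output)
    mywriter.writerow(['name', 'age'])   # header row
    mywriter.writerows(rows)             # all data rows in one call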

# create some fake data to use later

from faker import Faker
import csv

output = open('data/data.csv', mode='w', newline='')

fake = Faker()

header = ['name', 'age', 'street', 'city', 'state', 'zip', 'lng', 'lat']

mywriter = csv.writer(output)
mywriter.writerow(header)

# write 1,000 fake records
for i in range(1000):
    mywriter.writerow([fake.name(),
                      fake.random_int(min=18, max=80, step=1),
                      fake.street_address(),
                      fake.city(),
                      fake.state(),
                      fake.zipcode(),
                      fake.longitude(),
                      fake.latitude()])

output.close()
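
If you would rather address columns by name than by position, csv.DictWriter takes a dictionary per row and writes the header for you. A minimal sketch using the same Faker setup; the file name dict_data.csv and the reduced set of columns are just for illustration.

from faker import Faker
import csv

fake = Faker()

fieldnames = ['name', 'age', 'city']

with open('data/dict_data.csv', mode='w', newline='') as output:
    mywriter = csv.DictWriter(output, fieldnames=fieldnames)
    mywriter.writeheader()    # writes the column names as the first row
    for i in range(10):
        mywriter.writerow({'name': fake.name(),
                           'age': fake.random_int(min=18, max=80, step=1),
                           'city': fake.city()})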

Reading CSV data with Python


# reading data using the csv package
import csv

with open('data/data.csv') as f:
    my_reader = csv.DictReader(f)

    # DictReader consumes the header row itself and exposes it as fieldnames
    headers = my_reader.fieldnames

    for row in my_reader:
        print(row['name'])



# reading data using pandas

import pandas as pd

data = pd.read_csv('data/data.csv')

data.head(10)
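
head() only previews the first rows; a few other quick, optional checks on the generated DataFrame are sketched below.

# number of rows and columns
print(data.shape)

# column names and the data types pandas inferred for them
print(data.dtypes)

# summary statistics for the numeric columns
print(data.describe())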

Writing CSV files with the pandas library


# load data from a dictionary into a pandas DataFrame
data = {'Name': ['Paul', 'Bob', 'Susan', 'Yolanda'],
        'Age': [23, 45, 18, 21]}

data = pd.DataFrame(data)

# save the DataFrame to a CSV file with pandas
data.to_csv('data/fromdf.CSV', index=False)
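
To confirm that index=False kept the row index out of the file, the CSV can be read straight back in; a quick check, nothing more.

# read the file back and verify only the Name and Age columns were written
check = pd.read_csv('data/fromdf.CSV')
print(check)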

Writing JSON with Python


from faker import Faker
import json

output = open('data/data.json', 'w')

fake = Faker()


all_data = {}
all_data['records'] = []

for x in range(10):

    data = {"name":fake.name(),
           "age":fake.random_int(min=18, max=80, step = 1),
           "street":fake.street_address(),
           "city":fake.city(),
           "state":fake.state(),
           "zip":fake.zipcode(),
           "lng":float(fake.longitude()),
           "lat":float(fake.latitude())}

    all_data['records'].append(data)

json.dump(all_data, output)

# close the file so the JSON is flushed to disk
output.close()
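
pandas can also serialize the same records directly. A brief sketch, assuming the all_data dictionary built above is still in memory and writing to a separate example file name.

import pandas as pd

# build a DataFrame from the list of record dictionaries
df = pd.DataFrame(all_data['records'])

# orient='records' produces a JSON array of objects, one per row
df.to_json('data/data_pandas.json', orient='records')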

Reading JSON files with Python

with open('data/data.json', 'r') as f:
    data = json.load(f)

# see a particular data point
data['records'][0]

# a particular variable in a data point
data['records'][0]['name']

Reading JSON files with pandas


import json
import pandas as pd

# pandas.io.json.loads is no longer part of the public pandas API,
# so load the file with the standard json module and flatten it with pandas
with open('data/data.json', 'r') as f:
    data = json.load(f)

df = pd.json_normalize(data, record_path='records')
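
Once the nested records are flattened into a DataFrame, they can be handed to any of the writers shown earlier; a small example writing the normalized data back out as CSV, with fromjson.csv as an example file name.

# the flattened records can be saved like any other DataFrame
df.to_csv('data/fromjson.csv', index=False)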

Building data pipelines in Apache Airflow

import datetime as dt
from datetime import timedelta

from airflow import DAG
# in Airflow 2+ these operators are imported from airflow.operators.bash and airflow.operators.python
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

import pandas as pd

def csvToJson():
    # read the CSV written earlier, print each name, and save the rows as JSON records
    df = pd.read_csv('/home/fuzzy/Documents/Projects/data-engineering-with-python/data/data.csv')
    for i, r in df.iterrows():
        print(r['name'])
    df.to_json('/home/fuzzy/Documents/Projects/data-engineering-with-python/data/fromAirflow.json', orient='records')




default_args = {
    'owner': 'marquin',
    'start_date': dt.datetime(2021, 6, 27),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5),
}


with DAG('MyCSVDAG',
         default_args=default_args,
         schedule_interval=timedelta(minutes=5),      # '0 * * * *',
         ) as dag:

    print_starting = BashOperator(task_id='starting',
                               bash_command='echo "I am reading the CSV now....."')

    csvJson = PythonOperator(task_id='convertCSVtoJson',
                                 python_callable=csvToJson)


    # set the task order: the Bash task runs first, then the CSV-to-JSON task
    print_starting >> csvJson
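
The >> operator is Airflow shorthand for declaring that one task runs before another; the same dependency can also be written with the set_downstream and set_upstream methods. A small equivalent sketch, shown separately from the DAG above purely for illustration.

# these lines all declare the same ordering inside the with DAG block
print_starting >> csvJson
print_starting.set_downstream(csvJson)
csvJson.set_upstream(print_starting)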