-
Notifications
You must be signed in to change notification settings - Fork 0
/
etl_code.py
96 lines (70 loc) · 2.97 KB
/
etl_code.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import glob
import pandas as pd
import xml.etree.ElementTree as ET
from datetime import datetime
# File that log_progress() appends its timestamped messages to.
log_file = "log_file.txt"
# Destination path handed to load_data() for the transformed output.
target_file = "transformed_data.csv"
def extract_from_csv(file_to_process):
    """Load a single CSV source into a DataFrame.

    Args:
        file_to_process: Path or file-like object forwarded unchanged to
            ``pandas.read_csv``.

    Returns:
        The DataFrame produced by ``pandas.read_csv`` with default options.
    """
    return pd.read_csv(file_to_process)
def extract_from_json(file_to_process):
    """Load a line-delimited JSON source into a DataFrame.

    Args:
        file_to_process: Path or file-like object forwarded unchanged to
            ``pandas.read_json``.

    Returns:
        The DataFrame produced by ``pandas.read_json`` with ``lines=True``,
        i.e. every input line is parsed as one JSON record.
    """
    return pd.read_json(file_to_process, lines=True)
def extract_from_xml(file_to_process):
    """Parse an XML source into a DataFrame with name/height/weight columns.

    Every direct child of the document root is treated as one record and
    must contain ``name``, ``height`` and ``weight`` sub-elements.

    Args:
        file_to_process: Path or file object forwarded to
            ``xml.etree.ElementTree.parse``.

    Returns:
        A DataFrame with columns ``name``, ``height``, ``weight`` (in that
        order) and one row per child of the root; ``height`` and ``weight``
        are converted with ``float``. A root without children yields an
        empty DataFrame that still carries the three columns.

    Raises:
        AttributeError: If a record lacks one of the three sub-elements
            (``find`` returns ``None``).
        ValueError: If a height/weight text is not a valid float.
        TypeError: If a height/weight element has no text.
    """
    root = ET.parse(file_to_process).getroot()
    # Collect plain records first and build the frame once: concatenating
    # a new one-row DataFrame per record copies all previous rows each time.
    records = [
        {
            "name": person.find("name").text,
            "height": float(person.find("height").text),
            "weight": float(person.find("weight").text),
        }
        for person in root
    ]
    return pd.DataFrame(records, columns=['name', 'height', 'weight'])
def extract():
    """Gather every CSV, JSON and XML file in the working directory.

    Files are discovered with ``glob`` patterns ``*.csv``, ``*.json`` and
    ``*.xml`` (in that order) and read with ``extract_from_csv``,
    ``extract_from_json`` and ``extract_from_xml`` respectively.

    The pipeline's own output file (``target_file``) is skipped: it also
    matches ``*.csv``, so without this check a second run would re-ingest
    rows that were already transformed, plus the index column written on
    save.

    Returns:
        One DataFrame holding all extracted rows with a fresh 0..n-1 index.
        The columns ``name``, ``height`` and ``weight`` are always present,
        even when no source file is found.
    """
    # Function-scope import so this block does not depend on a file-level
    # change to the import list.
    import os.path

    output_path = os.path.abspath(target_file)

    # Seed with an empty frame so the expected columns exist (and come
    # first) regardless of what the source files contain.
    frames = [pd.DataFrame(columns=['name', 'height', 'weight'])]

    for csvfile in glob.glob("*.csv"):
        if os.path.abspath(csvfile) == output_path:
            continue  # never feed our own output back in
        frames.append(extract_from_csv(csvfile))
    for jsonfile in glob.glob("*.json"):
        frames.append(extract_from_json(jsonfile))
    for xmlfile in glob.glob("*.xml"):
        frames.append(extract_from_xml(xmlfile))

    # Single concat instead of one per file: avoids re-copying all
    # previously accumulated rows on every iteration.
    return pd.concat(frames, ignore_index=True)
def transform(data):
    """Convert the height and weight columns to metric units, in place.

    ``height`` is multiplied by 0.0254 (inches to meters) and ``weight`` by
    0.45359237 (pounds to kilograms); both results are rounded to two
    decimals and written back into ``data``.

    Args:
        data: DataFrame with ``height`` and ``weight`` columns. It is
            modified by this call.

    Returns:
        The same ``data`` object that was passed in.
    """
    meters_per_inch = 0.0254
    kilograms_per_pound = 0.45359237

    height_in_meters = data['height'] * meters_per_inch
    data['height'] = height_in_meters.round(2)

    weight_in_kilograms = data.weight * kilograms_per_pound
    data['weight'] = weight_in_kilograms.round(2)

    return data
def load_data(target_file, transformed_data):
    """Write the transformed DataFrame out as CSV.

    Args:
        target_file: Path or writable buffer passed to ``DataFrame.to_csv``.
        transformed_data: DataFrame to serialize. ``to_csv`` is called with
            its defaults, so the index is written as the first column.
    """
    transformed_data.to_csv(path_or_buf=target_file)
def log_progress(message):
    """Append one timestamped line to the log file.

    The line has the form ``<timestamp>,<message>`` followed by a newline,
    where the timestamp is the current local time formatted as
    ``Year-Mon-Day-Hour:Minute:Second`` with an abbreviated month name
    (for example ``2024-Jan-05-13:45:10``).

    Args:
        message: Text to record after the timestamp.
    """
    # '%b' (abbreviated month name) replaces the original '%h'. '%h' is a
    # platform-specific alias for '%b' that is outside Python's documented
    # directive set and cannot be parsed back by datetime.strptime; where
    # '%h' was supported the output is identical.
    timestamp_format = '%Y-%b-%d-%H:%M:%S'
    timestamp = datetime.now().strftime(timestamp_format)
    with open(log_file, "a") as f:
        f.write(timestamp + ',' + message + '\n')
# ---- Run the pipeline; every phase is bracketed by start/end log lines ----
log_progress("ETL Job Started")

# Extract: read all source files into one DataFrame.
log_progress("Extract phase Started")
extracted_data = extract()
log_progress("Extract phase Ended")

# Transform: convert units, then echo the result to stdout.
log_progress("Transform phase Started")
transformed_data = transform(extracted_data)
print("Transformed Data", transformed_data, sep="\n")
log_progress("Transform phase Ended")

# Load: write the transformed rows to the target file.
log_progress("Load phase Started")
load_data(target_file, transformed_data)
log_progress("Load phase Ended")

log_progress("ETL Job Ended")