-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathetl.py
104 lines (80 loc) · 2.95 KB
/
etl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
'''
Author: Leandro Kellermann de Oliveira <[email protected]>
Date: 2021-03-20 09:00:26
Last modified by: Leandro Kellermann de Oliveira <[email protected]>
Last modified: 2021-03-27 23:58:35
myProjects <<MIT>>
'''
import psycopg2
import configparser
from datetime import datetime
from sql_queries import create_table_queries, copy_table_queries, insert_table_queries, rc_queries
def create_tables(cur: object) -> None:
    """Run every statement in ``create_table_queries`` on *cur*.

    Before executing a statement, a banner containing the statement text
    is printed to stdout so the run log shows which table is being built.

    Args:
        cur (object): psycopg cursor used to execute the statements.
    """
    for ddl_statement in create_table_queries:
        announcement = f'Creating table:\n############\nExecuting query:\n{ddl_statement}\n###########\n\n'
        print(announcement)
        cur.execute(ddl_statement)
def load_staging_tables(cur: object, role_arn: str) -> None:
    """Load data into the staging tables by running each copy query.

    Every query in ``copy_table_queries`` is a template with a
    ``{role_arn}`` placeholder. The placeholder is filled with *role_arn*
    via ``str.format`` before the query is executed.

    Args:
        cur (object): psycopg cursor to execute queries.
        role_arn (str): IAM role ARN substituted into each copy query.
    """
    for query in copy_table_queries:
        # The unformatted template is echoed, so the log shows the
        # placeholder rather than the substituted role ARN.
        print(
            f'Copying table:\n############\nExecuting query:\n{query}\n###########\n\n')
        cur.execute(query.format(role_arn=role_arn))
def insert_tables(cur: object) -> None:
    """Execute, in order, each statement listed in ``insert_table_queries``.

    Each statement is echoed to stdout inside a banner right before it is
    sent to the database.

    Args:
        cur (object): psycopg2 cursor object used to execute the statements.
    """
    for sql_text in insert_table_queries:
        print(f'Insert tables:\n############\nExecuting query:\n{sql_text}\n###########\n\n')
        cur.execute(sql_text)
def check(cur: object, start_time: datetime) -> None:
    """Run the row-count queries and record their results.

    For every query in ``rc_queries``:
      * execute it and fetch the first row;
      * print a timestamped report;
      * append the same report to ``etl_<YYYYmmddHHMMSS>.results`` in the
        current working directory.

    The timestamp in the file name comes from *start_time*, so each ETL run
    writes to its own results file.

    Args:
        cur (object): psycopg2 cursor object to execute queries.
        start_time (datetime): time when etl.py started.
    """
    # The file name depends only on start_time, so build it and open the
    # file once rather than once per query.
    results_path = f"etl_{start_time.strftime('%Y%m%d%H%M%S')}.results"
    with open(results_path, 'a', encoding='utf-8') as results_file:
        for query in rc_queries:
            now = datetime.now()
            cur.execute(query)
            results = cur.fetchone()
            txt = f'{now} Count #rows:\n############\nQuery: {query}\nResults{results}\n############\n\n'
            print(txt)
            results_file.write(txt)
def main() -> None:
    """Main method to execute ETL operations.

    Steps:
      1. Read ``dwh.cfg``.
      2. Connect to the cluster in autocommit mode.
      3. Create the tables, load the staging tables and run the inserts.
      4. Record row counts via ``check``.

    The cursor and the connection are always closed, even when one of the
    steps raises.

    Config values read here:
        [CLUSTER]: its first five values, in file order, are used as host,
            dbname, user, password and port. A ``HOST`` key is also read
            for the final status message.
        [IAM_ROLE]: ``arn``, passed to ``load_staging_tables``.

    Raises:
        FileNotFoundError: if ``dwh.cfg`` could not be read.
        ValueError: if [CLUSTER] provides fewer than five values.
    """
    start_time = datetime.now()
    # Get data warehouse access information:
    config = configparser.ConfigParser()
    # ConfigParser.read() silently skips files it cannot open. Fail clearly
    # here instead of with a KeyError on config['CLUSTER'] further down.
    if not config.read('dwh.cfg'):
        raise FileNotFoundError("Could not read config file 'dwh.cfg'")
    # Connect to cluster. The [CLUSTER] values are still consumed
    # positionally, as before. Passing them as keyword arguments lets
    # psycopg2 quote them, so a value with spaces or quotes (e.g. a
    # password) can no longer corrupt the connection string.
    conn_keys = ('host', 'dbname', 'user', 'password', 'port')
    cluster_values = list(config['CLUSTER'].values())
    if len(cluster_values) < len(conn_keys):
        raise ValueError(
            f"[CLUSTER] in dwh.cfg must provide {len(conn_keys)} values "
            f"({', '.join(conn_keys)}); got {len(cluster_values)}")
    con = psycopg2.connect(**dict(zip(conn_keys, cluster_values)))
    try:
        con.set_session(autocommit=True)
        cur = con.cursor()
        try:
            create_tables(cur)
            load_staging_tables(cur, config['IAM_ROLE']['arn'])
            insert_tables(cur)
            endpoint = config['CLUSTER']['HOST']
            print(f'DW created on {endpoint}!')
            print('Checking results...\n')
            check(cur, start_time)
        finally:
            cur.close()
    finally:
        con.close()
    print('\n\nEnd of execution!')


if __name__ == "__main__":
    main()