-
Notifications
You must be signed in to change notification settings - Fork 1
/
BQQuery.py
132 lines (113 loc) · 5.38 KB
/
BQQuery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from __future__ import print_function
from google.cloud import bigquery
import pandas as pd
#from hurry.filesize import size
class Query(object):
    """
    Abstract the querying process for GBQ tables.

    The object is instantiated with Query() and exposes these methods:
        - standardQuery: runs a query and returns the query job
        - pandasQuery: runs a query and returns a pandas DataFrame
        - queryToTable: runs a query and stores its result in a table
        - safeQuery: dry-runs a query, prints how much data it would
          process and asks the user whether to go ahead

    standardQuery and pandasQuery take 4 arguments:
        - query: the query to execute
        - legacy: determines if the SQL dialect used is standard or legacy.
          Defaults to False (standard)
        - location: location forwarded to the BigQuery client when the
          installed client version accepts it. Defaults to 'US'
        - safe: determines if the user must confirm the run before the query
          is executed. If set to True, the amount of data the query would
          process is displayed first.
    """

    # Returned (not raised) when the user declines a "safe" run; kept verbatim
    # because callers may compare against this exact text.
    _ABORT_MESSAGE = 'Query execution has been aborded by user'
    _BAD_ANSWER_MESSAGE = (
        'System could not understand user input. '
        'Make sure you\'ve entered "y" or "n"'
    )

    def __init__(self):
        """Create the BigQuery client shared by every method."""
        self.client = bigquery.Client()

    def standardQuery(self, query, legacy=False, location='US', safe=False):
        """Run ``query`` and return the resulting query job.

        Args:
            query: SQL text to execute.
            legacy: use legacy SQL when True, standard SQL otherwise.
            location: job location (see ``_submit`` for when it is sent).
            safe: when True, dry-run the query and ask for confirmation first.

        Returns:
            The query job, or the abort message string when the user
            answered "n" to the confirmation prompt.

        Raises:
            ValueError: the confirmation answer starts with neither
                "y" nor "n" (this includes an empty answer).
        """
        # Confirm before building any job configuration so a declined run
        # does no further work.
        if safe and not self._confirmed(query, location, legacy):
            return self._ABORT_MESSAGE
        return self._submit(query, location, self._job_config(legacy))

    def pandasQuery(self, query, legacy=False, location='US', safe=False):
        """Run ``query`` and return its result as a pandas DataFrame.

        Args:
            query: SQL text to execute.
            legacy: use legacy SQL when True, standard SQL otherwise.
            location: job location (see ``_submit`` for when it is sent).
            safe: when True, dry-run the query and ask for confirmation first.

        Returns:
            The DataFrame produced by the job's ``to_dataframe()``, or the
            abort message string when the user answered "n".

        Raises:
            ValueError: the confirmation answer starts with neither
                "y" nor "n" (this includes an empty answer).
        """
        if safe and not self._confirmed(query, location, legacy):
            return self._ABORT_MESSAGE
        query_job = self._submit(query, location, self._job_config(legacy))
        return query_job.to_dataframe()

    def queryToTable(self, q, dataset=None, destination_table=None,
                     location='US'):
        """Run ``q`` and write its result into ``dataset.destination_table``.

        Args:
            q: SQL text to execute.
            dataset: id of the dataset holding the destination table.
            destination_table: id of the table receiving the result.
            location: job location, sent when the installed client is
                0.32 or newer. Defaults to 'US'.

        Returns:
            The query job, after waiting for it to finish.

        Raises:
            ValueError: ``dataset`` or ``destination_table`` is missing.
        """
        if dataset is None or destination_table is None:
            raise ValueError(
                'Both dataset and destination_table must be provided')
        # Reuse the shared client rather than creating a new one per call.
        table_ref = self.client.dataset(dataset).table(destination_table)
        job_config = bigquery.QueryJobConfig()
        job_config.destination = table_ref
        kwargs = {'job_config': job_config}
        # Compare (major, minor) as integers: float-parsing the version
        # string fails on pre-releases and mis-orders minors (0.4 vs 0.32).
        if self._version_tuple(bigquery.__version__) >= (0, 32):
            kwargs['location'] = location
        query_job = self.client.query(q, **kwargs)
        query_job.result()  # block until the job has completed
        return query_job

    def safeQuery(self, query, location, legacy):
        """Dry-run ``query``, print its processed size and read the answer.

        Args:
            query: SQL text to evaluate.
            location: job location (see ``_submit`` for when it is sent).
            legacy: use legacy SQL when True, standard SQL otherwise.

        Returns:
            The raw text typed by the user at the prompt.
        """
        query_config = bigquery.QueryJobConfig()
        query_config.dry_run = True
        query_config.use_query_cache = False
        if legacy:
            query_config.use_legacy_sql = True
        query_job = self._submit(query, location, query_config)
        processed_size = query_job.total_bytes_processed
        print(f'Your query will process {processed_size}, do you want to processed? [y/n]')
        return input()

    def _confirmed(self, query, location, legacy):
        """Return True/False for a "y"/"n" answer to the dry-run prompt.

        Only the first non-blank character of the answer is inspected,
        case-insensitively. Raises ValueError for anything else.
        """
        answer = self.safeQuery(query, location, legacy)
        choice = answer.strip()[:1].lower()
        if choice == 'y':
            return True
        if choice == 'n':
            return False
        raise ValueError(self._BAD_ANSWER_MESSAGE)

    @staticmethod
    def _job_config(legacy):
        """Return a legacy-SQL job config, or None for standard SQL."""
        if not legacy:
            return None
        job_config = bigquery.QueryJobConfig()
        job_config.use_legacy_sql = True
        return job_config

    def _submit(self, query, location, job_config=None):
        """Send ``query`` to the client with the applicable keyword arguments."""
        kwargs = {}
        if job_config is not None:
            kwargs['job_config'] = job_config
        # NOTE(review): location is only forwarded for client version 0.32.0
        # exactly, as in the original code. Widening this to ">= 0.32" would
        # start sending the default 'US' on newer clients and could break
        # callers whose data lives elsewhere -- confirm before changing.
        if bigquery.__version__ == '0.32.0':
            kwargs['location'] = location
        return self.client.query(query, **kwargs)

    @staticmethod
    def _version_tuple(version):
        """Return the leading (major, minor) integers of a version string."""
        numbers = []
        for piece in str(version).split('.')[:2]:
            digits = ''
            for char in piece:
                if char not in '0123456789':
                    break  # stop at suffixes such as "rc1" or "dev0"
                digits += char
            numbers.append(int(digits) if digits else 0)
        return tuple(numbers)