-
Notifications
You must be signed in to change notification settings - Fork 2
/
PGUtils.py
148 lines (137 loc) · 5.35 KB
/
PGUtils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import psycopg2
import json
from math import log
class PGConfig:
def __init__(self):
self.keepExecutedPlan =True
self.maxTimes = 5
self.maxTime = 300000
LatencyDict = {}
selectivityDict = {}
LatencyRecordFileHandle = None
class PGRunner:
def __init__(self,dbname = '',user = '',password = '',host = '',port = '',isCostTraining = True,latencyRecord = True,latencyRecordFile = "RecordFile.json"):
"""
:param dbname:
:param user:
:param password:
:param host:
:param port:
:param latencyRecord:-1:loadFromFile
:param latencyRecordFile:
"""
self.con = psycopg2.connect(database="test", user="postgres",password="123", host="localhost", port=5432)
self.cur = self.con.cursor()
self.config = PGConfig()
self.isLatencyRecord = latencyRecord
# self.LatencyRecordFileHandle = None
global LatencyRecordFileHandle
self.isCostTraining = isCostTraining
if latencyRecord:
LatencyRecordFileHandle = self.generateLatencyPool(latencyRecordFile)
def generateLatencyPool(self,fileName):
"""
:param fileName:
:return:
"""
import os
import json
if os.path.exists(fileName):
f = open(fileName,"r")
lines = f.readlines()
for line in lines:
data = json.loads(line)
global LatencyDict
LatencyDict[data[0]] = data[1]
f = open(fileName,"a")
else:
f = open(fileName,"w")
return f
def getLatency(self, sql,sqlwithplan):
"""
:param sql:a sqlSample object
:return: the latency of sql
"""
# query = sql.toSql()
if self.isCostTraining:
return self.getCost(sql,sqlwithplan)
global LatencyDict
if self.isLatencyRecord:
if sqlwithplan in LatencyDict:
return LatencyDict[sqlwithplan]
self.cur.execute("set join_collapse_limit = 1;")
self.cur.execute("SET statement_timeout = "+str(int(sql.timeout()))+ ";")
self.cur.execute("set max_parallel_workers=1;")
self.cur.execute("set max_parallel_workers_per_gather = 1;")
self.cur.execute("set geqo_threshold = 20;")
self.cur.execute("EXPLAIN "+sqlwithplan)
thisQueryCost = self.getCost(sql,sqlwithplan)
print("time",thisQueryCost)
if thisQueryCost / sql.getDPCost()<100:
try:
self.cur.execute("EXPLAIN ANALYZE "+sqlwithplan)
rows = self.cur.fetchall()
row = rows[0][0]
afterCost = float(rows[0][0].split("actual time=")[1].split("..")[1].split(" ")[
0])
except:
self.con.commit()
afterCost = max(thisQueryCost / sql.getDPCost()*sql.getDPlantecy(),sql.timeout())
else:
afterCost = max(thisQueryCost / sql.getDPCost()*sql.getDPlantecy(),sql.timeout())
afterCost += 5
if self.isLatencyRecord:
LatencyDict[sqlwithplan] = afterCost
global LatencyRecordFileHandle
LatencyRecordFileHandle.write(json.dumps([sqlwithplan,afterCost])+"\n")
LatencyRecordFileHandle.flush()
return afterCost
def getCost(self,sql,sqlwithplan):
"""
:param sql: a sqlSample object
:return: the cost of sql
"""
self.cur.execute("set join_collapse_limit = 1;")
self.cur.execute("set max_parallel_workers=1;")
self.cur.execute("set max_parallel_workers_per_gather = 1;")
self.cur.execute("set geqo_threshold = 20;")
self.cur.execute("SET statement_timeout = 100000;")
self.cur.execute("EXPLAIN "+sqlwithplan)
#print("sql",sqlwithplan)
rows = self.cur.fetchall()
row = rows[0][0]
afterCost = float(rows[0][0].split("cost=")[1].split("..")[1].split(" ")[
0])
self.con.commit()
return afterCost
def getDPPlanTime(self,sql,sqlwithplan):
"""
:param sql: a sqlSample object
:return: the planTime of sql
"""
import time
startTime = time.time()
cost = self.getCost(sql,sqlwithplan)
plTime = time.time()-startTime
return plTime
def getSelectivity(self,table,whereCondition):
global selectivityDict
if whereCondition in selectivityDict:
return selectivityDict[whereCondition]
self.cur.execute("SET statement_timeout = "+str(int(100000))+ ";")
totalQuery = "select * from "+table+";"
# print(totalQuery)
self.cur.execute("EXPLAIN "+totalQuery)
rows = self.cur.fetchall()[0][0]
# print(rows)
# print(rows)
total_rows = int(rows.split("rows=")[-1].split(" ")[0])
resQuery = "select * from "+table+" Where "+whereCondition+";"
# print(resQuery)
self.cur.execute("EXPLAIN "+resQuery)
rows = self.cur.fetchall()[0][0]
# print(rows)
select_rows = int(rows.split("rows=")[-1].split(" ")[0])
selectivityDict[whereCondition] = -log(select_rows/total_rows)
# print(stored_selectivity_fake[whereCondition],select_rows,total_rows)
return selectivityDict[whereCondition]