-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbot_data.py
165 lines (141 loc) · 6.57 KB
/
bot_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import sqlite3
import datetime_funcs
# dates used in database query should be formatted as "YYYY-MM-DD HH:MM:SS"
class BotData:
def __init__(self, name="data/BotData.db"):
self.table_meta = {
"BitcoinHistorical": {
"Name": "BitcoinHistorical",
"CreateCmd": "CREATE TABLE IF NOT EXISTS BitcoinHistorical ("
" date text NOT NULL PRIMARY KEY, "
" price real NOT NULL,"
" UNIQUE(date)"
");",
"InsertCmd": "INSERT or REPLACE INTO BitcoinHistorical (date, price) "
"VALUES (?,?);",
"SelectWhereDateCmd": "SELECT * FROM BitcoinHistorical "
"WHERE date >= ? AND date <= ? "
"ORDER BY date ASC;",
"SelectAllCmd": "SELECT * FROM BitcoinHistorical "
"ORDER BY date ASC;",
"SelectNewCmd": "SELECT * "
"FROM "
"("
" SELECT * FROM BitcoinHistorical "
" ORDER BY date DESC "
" LIMIT (?) "
")"
"ORDER BY date ASC;",
"SelectLastCmd": "SELECT * FROM BitcoinRealTime "
"ORDER BY date DESC "
"LIMIT 1 ",
"NewCount": "0",
},
"BitcoinRealTime": {
"Name": "BitcoinRealTime",
"CreateCmd": "CREATE TABLE IF NOT EXISTS BitcoinRealTime ("
" date text NOT NULL PRIMARY KEY, "
" price real NOT NULL,"
" UNIQUE(date)"
");",
"InsertCmd": "INSERT or REPLACE INTO BitcoinRealTime (date, price) "
"VALUES (?,?);",
"SelectWhereDateCmd": "SELECT * FROM BitcoinRealTime "
"WHERE date >= ? AND date <= ? "
"ORDER BY date ASC;",
"SelectAllCmd": "SELECT * FROM BitcoinRealTime "
"ORDER BY date ASC;",
"SelectNewCmd": "SELECT * "
"FROM "
"("
" SELECT * FROM BitcoinRealTime "
" ORDER BY date DESC "
" LIMIT (?) "
")"
"ORDER BY date ASC;",
"SelectLastCmd": "SELECT * FROM BitcoinRealTime "
"ORDER BY date DESC "
"LIMIT 1 ",
"NewCount": "0"
}
}
self.name = name
self.conn = sqlite3.connect(name)
self.cursor = self.conn.cursor()
# Rebuilding the database
#self.rebuild_tables()
self.build_all_tables()
# Opens a connection to database $name
def open_database(self):
self.conn = sqlite3.connect(self.name)
self.cursor = self.conn.cursor()
# Closes current database connection
def close_database(self):
self.conn.commit()
self.conn.close()
def drop_table(self, table):
self.cursor.execute("DROP TABLE "+table+";")
def build_all_tables(self):
for key, item in self.table_meta.items():
sql_cmd = item["CreateCmd"]
self.cursor.execute(sql_cmd)
def drop_all_tables(self):
# Select all tables
self.cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
# Loop through result
for i in self.select_generator():
self.drop_table(i[0])
def rebuild_tables(self):
self.drop_all_tables()
self.build_all_tables()
# ----------------------------------------------- Insert Commands ------------------------------------------
def insert(self, table, array_tuple):
if table in self.table_meta.keys():
sql_cmd = self.table_meta[table]["InsertCmd"]
self.cursor.executemany(sql_cmd, array_tuple)
self.conn.commit()
# Update new element
self.table_meta[table]["NewCount"] = str(int(self.table_meta[table]["NewCount"]) + len(array_tuple))
# ------------------------------------------- Select Commands ------------------------------------------------
def select(self, table, date_range=tuple()):
if table in self.table_meta.keys():
if date_range:
sql_cmd = self.table_meta[table]["SelectAllCmd"]
self.cursor.execute(sql_cmd)
else:
sql_cmd = self.table_meta[table]["SelectWhereDateCmd"]
self.cursor.execute(sql_cmd, date_range)
for i in self.select_generator():
yield i
def select_new(self, table):
if table in self.table_meta.keys():
if self.table_meta[table]["NewCount"]:
sql_cmd = self.table_meta[table]["SelectNewCmd"]
new_count = (self.table_meta[table]["NewCount"], )
self.cursor.execute(sql_cmd, new_count)
# Clear NewCount
self.table_meta[table]["NewCount"] = "0"
for i in self.select_generator():
yield i
def select_last(self, table):
if table in self.table_meta.keys():
sql_cmd = self.table_meta[table]["SelectLastCmd"]
self.cursor.execute(sql_cmd)
for i in self.select_generator():
yield i
def select_all(self, table):
if table in self.table_meta.keys():
sql_cmd = self.table_meta[table]["SelectAllCmd"]
self.cursor.execute(sql_cmd)
for i in self.select_generator():
yield i
# ------------------------------------------- Misc Functions -------------------------------------------
# This function is used as a generator to yield select results. This function is automatically called after each
# select command on the database is executed. The amount parameter specifies how many rows to select at a time.
def select_generator(self, amount=1000):
while True:
rows = self.cursor.fetchmany(amount)
if not rows:
break
for row in rows:
yield row