-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatabase.py
84 lines (77 loc) · 2.89 KB
/
database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# -*- coding: utf-8 -*-
import sqlite3
class DataBase:
    """Small convenience wrapper around a SQLite database.

    On construction the database is opened (SQLite creates the file when it
    does not exist) and the ``DATA`` and ``SETTING`` tables are created if
    missing.  Failures during that step are not raised; their text is
    appended to ``status`` instead.

    Values are always passed to SQLite as bound parameters.  Table and column
    names cannot be bound, so they are interpolated into the statement text:
    NOTE(review): they must come from trusted code, never from user input.
    """

    # Column names per table, in the order used by CREATE TABLE and INSERT.
    tableFields = {
        "DATA": ["ID", "DATE", "MEMO", "RATING"],
        "SETTING": ["KEY", "VALUE"],
    }
    # Error text collected while opening/initialising; stays "" on success.
    status = ""

    def __init__(self, db):
        """Open the database at ``db`` and make sure both tables exist.

        Args:
            db: Database location handed to ``sqlite3.connect``.
        """
        # Assigned before the try block so close()/__del__ stay safe even
        # when connect() itself fails.
        self.connection = None
        data_id, data_date, data_memo, data_rating = self.tableFields["DATA"]
        setting_key, setting_value = self.tableFields["SETTING"]
        try:
            self.connection = sqlite3.connect(db)
            c = self.connection.cursor()
            c.execute(f'''
                CREATE TABLE IF NOT EXISTS DATA
                ({data_id} TEXT PRIMARY KEY NOT NULL,
                {data_date} TEXT,
                {data_memo} TEXT,
                {data_rating} INTEGER);''')
            c.execute(f'''
                CREATE TABLE IF NOT EXISTS SETTING
                ({setting_key} TINYTEXT UNIQUE,
                {setting_value} TINYTEXT);''')
            self.connection.commit()
        except Exception as error:
            # Deliberately best-effort: the error is reported through
            # ``status`` rather than raised, so callers can inspect it.
            self.status += f" {error} {db}"

    def __del__(self):
        self.close()

    def close(self):
        """Close the connection if one was opened; safe to call repeatedly."""
        connection = getattr(self, "connection", None)
        if connection is not None:
            connection.close()
            self.connection = None

    @staticmethod
    def _bind_value(value):
        """Map a Python value to the parameter that is bound for it.

        Blank/whitespace-only strings and the literal string ``"NULL"``
        become ``None`` (SQL NULL), the same values ``sanitize`` renders as
        ``NULL``.  Everything else is bound unchanged.
        """
        if isinstance(value, str) and (value == "NULL" or value.strip() == ""):
            return None
        return value

    def _where_clause(self, where):
        """Build ``("A=? AND B=?", [params])`` from a column -> value mapping.

        Returns an empty string and an empty list for an empty mapping.
        """
        clauses, params = [], []
        for key, value in where.items():
            clauses.append(f"{self.sanitize(key, False)}=?")
            params.append(self._bind_value(value))
        return " AND ".join(clauses), params

    def write(self, table="", key_value=None, where=None):
        """Update rows matching ``where``; insert a row when none matched.

        Args:
            table: Table name; must be a key of ``tableFields``
                (``KeyError`` otherwise).
            key_value: Mapping of column name -> value.  Keys that are not
                columns of ``table`` are ignored; columns that are absent are
                inserted as NULL.
            where: Mapping of column name -> value, combined with ``AND``.
                A NULL value never matches, because the comparison is ``=``.

        Returns:
            True.  The insert uses ``INSERT OR IGNORE``, so a row violating a
            constraint is skipped silently.
        """
        key_value = {} if key_value is None else key_value
        where = {} if where is None else where
        columns = self.tableFields[table]
        condition, condition_params = self._where_clause(where)
        assigned = [column for column in columns if column in key_value]

        updated = False
        # Try to update existing rows first.  With nothing to assign there is
        # no valid UPDATE statement, so the update step is skipped.
        if condition and assigned:
            set_clause = ", ".join(f"{column}=?" for column in assigned)
            set_params = [self._bind_value(key_value[column]) for column in assigned]
            cursor = self.connection.execute(
                f"UPDATE {table} SET {set_clause} WHERE {condition};",
                set_params + condition_params,
            )
            updated = cursor.rowcount > 0

        # Otherwise insert.  Skipping this after a successful update avoids
        # adding a second row (e.g. a SETTING row whose KEY is NULL).
        if not updated:
            placeholders = ", ".join("?" for _ in columns)
            row = [self._bind_value(key_value.get(column)) for column in columns]
            self.connection.execute(
                f"INSERT OR IGNORE INTO {table} ({', '.join(columns)}) "
                f"VALUES ({placeholders});",
                row,
            )
        self.connection.commit()
        return True

    def read(self, fields=None, table="", where=None):
        """Select rows from ``table``.

        Args:
            fields: Column names to select; every column (``*``) when empty
                or omitted.
            table: Table name.
            where: Mapping of column name -> value, combined with ``AND``.

        Returns:
            A list of row tuples (empty when nothing matches).
        """
        if fields:
            selected = ", ".join(self.sanitize(field, False) for field in fields)
        else:
            selected = "*"
        condition, params = self._where_clause(where or {})
        query = f"SELECT {selected} FROM {table}"
        if condition:
            query += f" WHERE {condition}"
        cursor = self.connection.cursor()
        cursor.execute(query + ";", params)
        return cursor.fetchall()

    def delete(self, table="", where=None):
        """Delete the rows of ``table`` matching ``where``.

        An empty ``where`` deletes nothing; use ``clear`` to empty a table.

        Returns:
            True.
        """
        condition, params = self._where_clause(where or {})
        if condition:
            self.connection.execute(f"DELETE FROM {table} WHERE {condition};", params)
        self.connection.commit()
        return True

    def sanitize(self, value="", quotes=True):
        """Escape a string for concatenation into SQL text.

        Single quotes are doubled.  With ``quotes`` the result is wrapped in
        single quotes (a string literal); without, it is returned bare, which
        is how column names are passed.  Blank strings yield ``"NULL"``;
        the string ``"NULL"`` and non-string values are returned unchanged.
        """
        if isinstance(value, str) and value != "NULL":
            if value.strip() == "":
                return "NULL"
            escaped = value.replace("'", "''")
            return f"'{escaped}'" if quotes else escaped
        return value

    def clear(self, tables=None):
        """Delete every row of the given tables, then compact the database.

        Args:
            tables: Iterable of table names; nothing happens when empty or
                omitted.

        Returns:
            True.
        """
        cleared = False
        for table in tables or ():
            self.connection.executescript(f"DELETE FROM {table};")
            cleared = True
        if cleared:
            # One VACUUM after all deletes instead of one per table.
            self.connection.executescript("VACUUM;")
        self.connection.commit()
        return True