-
Notifications
You must be signed in to change notification settings - Fork 0
/
Cli.py
executable file
·371 lines (337 loc) · 16 KB
/
Cli.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
#!/usr/bin/python3
import glob
import optparse
import os
import re
import shutil
import sys
from typing import List
from colorama import Fore
from Database import Database
from FormatError import FormatError
class Cli:
MESSAGE_NORMAL = 0
MESSAGE_IMP = 1
MESSAGE_ERR = 2
def __init__(self):
"""
Gui class constructor. Initialize database file name and database object.
"""
self._database_file = None
self._database = Database()
self.work_path = os.path.join('.', 'workDir')
self._parser = optparse.OptionParser('Usage: ./Cli.py -a | -g | -l | -d ID | -s STRING [-f FILE] \nExamples:\n'
'./Cli.py -a\n'
'./Cli.py -g\n'
'./Cli.py -d 42\n'
'./Cli.py -s bear\n'
'./Cli.py -s bear -f database.yml\n'
'./Cli.py -l')
self._parser.add_option('-a', '--add', default=False,
action="store_true", dest="add_record",
help="Run the process of adding a record into database")
self._parser.add_option('-d', '--delete', type='string',
action="store", dest="delete_id",
help="Delete a database record with the passed ID")
self._parser.add_option('-f', '--file', type='string',
action="store", dest="database_file",
help="Open database in the file, if empty the first yaml file in the directory is used")
self._parser.add_option('-g', '--graph', default=False,
action="store_true", dest="make_graph",
help="Create a graph of the database")
self._parser.add_option('-l', '--list', default=False,
action="store_true", dest="list_all",
help="List all records in database")
self._parser.add_option('-s', '--search', type='string',
action="store", dest="search_string",
help="Search for this string in the database, if empty all records are printed")
self._options, _ = self._parser.parse_args()
option_combination = [self._options.add_record, self._options.delete_id,
self._options.make_graph, self._options.search_string, self._options.list_all]
option_combination = [1 for o in option_combination if o]
if len(option_combination) > 1:
self._parser.error('Only one option can be used at a time')
if not option_combination:
self._parser.error('At least one option is required')
@staticmethod
def print_record(records) -> None:
"""
Nice print the passed list of records.
:param records: List of yaml records
:return: None
"""
for record in records:
for name, values in record.items():
print('\n' + Fore.GREEN + str(name) + Fore.RESET)
print('\t' + 'id: ' + Fore.LIGHTBLUE_EX + str(values['id']) + Fore.RESET)
for attribute, content in values.items():
if attribute == 'id':
continue
if attribute in ['email', 'linkto']:
if attribute == 'email':
if content:
print('\t' + 'e-mails:')
else:
print('\t' + 'e-mails: -')
else:
if content:
print('\t' + 'link to:')
else:
print('\t' + 'link to: -')
if content:
for item in content:
print('\t\t' + Fore.YELLOW + str(item) + Fore.RESET)
else:
print('\t' + str(attribute) + ': ' + (
'-' if not content else Fore.LIGHTBLUE_EX + str(content) + Fore.RESET))
@staticmethod
def print_message(message: str, kind: int) -> None:
"""
Print a nice looking message on the screen.
:param message:
:param kind: One of MESSAGE_NORMAL,IMP,ERR if the message should be an error, normal or important.
:return: None
"""
newline = ''
# If the message begins with a new line add it before the ##
if message[0] == '\n':
newline = '\n'
message = message[1:]
if kind == Cli.MESSAGE_ERR:
print(Fore.RED + newline + '## ' + str(message) + Fore.RESET)
elif kind == Cli.MESSAGE_IMP:
print(Fore.CYAN + newline + '## ' + str(message) + Fore.RESET)
else:
print(newline + '## ' + str(message))
def search(self) -> None:
"""
Run database search and print results.
:return: None
"""
self.print_message('Searching for: ' + self._options.search_string, Cli.MESSAGE_IMP)
records = self._database.find(self._options.search_string)
self.print_record(records)
self.print_message('\nFound: ' + str(len(records)) + ' database records', Cli.MESSAGE_IMP)
def _list_all(self) -> None:
"""
List all records in the database.
:return: None
"""
self.print_message('List all records', Cli.MESSAGE_IMP)
records = self._database.find('')
self.print_record(records)
self.print_message('\nDatabase contains: ' + str(len(records)) + ' records', Cli.MESSAGE_IMP)
def _get_linkto(self, kind: bool) -> List[str]:
"""
Return a list of linkto database records, ask the user to provide them.
:param kind: True if link, False if email.
:return: Return a list of linkto database records, ask the user to provide them.
"""
link_list = []
if kind:
print('This account links to:')
else:
print('Associated e-mail addresses:')
while self.confirm('Add another?'):
link_list.append(str(input('Record: ')).lstrip().rstrip())
return link_list
@staticmethod
def _get_email() -> str:
"""
Ask the user for a valid e-mail address and return it.
:return: str, a valid e-mail address.
"""
email = ''
while not re.match(r"[^@]+@[^@]+\.[^@]+", email):
email = str(input('Address: ')).lstrip().rstrip()
return email
@staticmethod
def _get_simple_attribute(name: str, empty: bool) -> str:
"""
Ask the user for an attribute and return it.
:param name: The name of the attribute to get
:param empty: Can it be empty, True if it can?
:return: str, the attribute from user.
"""
attribute = ''
while not attribute:
attribute = str(input(name + ': ')).lstrip().rstrip()
if empty:
return attribute
return attribute
def _get_web_address(self) -> str:
"""
Ask the user for a valid website address and return it.
:return: Valid website address.
"""
address = ''
while not address.startswith('www.'):
address = self._get_simple_attribute('Website address', False)
return address
def add(self) -> None:
"""
Add a new record into the database, ask the user for details.
:return: None
"""
record_id = self._database.get_new_id()
self.print_message('\nAdd new record ID: ' + str(record_id), Cli.MESSAGE_IMP)
self.print_message('Select category:', Cli.MESSAGE_IMP)
print('Email -> "e"')
print('Website -> "w')
print('Company -> "c"')
selection = None
while selection not in ['e', 'w', 'c']:
selection = str(input('Category [e/w/c]: '))
# Add an e-mail
if selection == 'e':
self.print_message('\nAdding a new e-mail', Cli.MESSAGE_IMP)
email = self._get_email()
login = email.split('@')[0]
password = self._get_simple_attribute('Password', False)
question = self._get_simple_attribute('Security question', True)
linktos = self._get_linkto(True)
notes = self._get_simple_attribute('Additional notes', True)
self.print_message('\nSaving new e-mail record', Cli.MESSAGE_IMP)
# Record example
# {'[email protected]': {'id': 3, 'linkto': ['[email protected]', '[email protected]'], 'login': 'whitebear',
# 'notes': None, 'password': 'thepassword', 'question': 'what question?'}}
new_record = {email: {'id': record_id, 'linkto': (linktos if linktos else None), 'login': login,
'notes': (notes if notes else None), 'password': password,
'question': (question if question else None)}}
if self._database.add('emails', new_record):
self.print_message('Record added, database saved', Cli.MESSAGE_IMP)
# Add a website
if selection == 'w':
self.print_message('\nAdding a new website', Cli.MESSAGE_IMP)
web_name = self._get_web_address()
login = self._get_simple_attribute('Login', False)
password = self._get_simple_attribute('Password', False)
question = self._get_simple_attribute('Security question', True)
emails = self._get_linkto(False)
linktos = self._get_linkto(True)
notes = self._get_simple_attribute('Additional notes', True)
self.print_message('\nSaving new website record', Cli.MESSAGE_IMP)
# Record example
# {'www.github.com': {'email': ['[email protected]'], 'id': 5, 'linkto': None, 'login': 'athwale',
# 'notes': None, 'password': 'probablyisnt', 'question': 'dog'}}
new_record = {web_name: {'email': (emails if emails else None), 'id': record_id,
'linkto': (linktos if linktos else None), 'login': login,
'notes': (notes if notes else None), 'password': password,
'question': (question if question else None)}}
if self._database.add('websites', new_record):
self.print_message('Record added, database saved', Cli.MESSAGE_IMP)
# Add a company
if selection == 'c':
self.print_message('\nAdding a new company', Cli.MESSAGE_IMP)
company_name = self._get_simple_attribute('Company name', False)
emails = self._get_linkto(False)
linktos = self._get_linkto(True)
notes = self._get_simple_attribute('Additional notes', True)
self.print_message('\nSaving new company record', Cli.MESSAGE_IMP)
# Record example
# {'Zoo': {'email': [], 'id': 9, 'linkto': [], 'notes': None}}
new_record = {company_name: {'email': (emails if emails else None), 'id': record_id,
'linkto': (linktos if linktos else None),
'notes': (notes if notes else None)}}
if self._database.add('companies', new_record):
self.print_message('Record added, database saved', Cli.MESSAGE_IMP)
# Save to disk and replace
if not self._replace_database():
raise FormatError('Error replacing database')
def delete(self) -> None:
"""
Remove a record from database based on ID. Get input from the user.
:return: None
"""
self.print_message('\nRemove record ID: ' + str(self._options.delete_id) + ':', Cli.MESSAGE_IMP)
record = self._database.find_id(int(self._options.delete_id))
self.print_record([record])
if self.confirm('\n Are you sure?'):
self.print_message('Removing', Cli.MESSAGE_NORMAL)
if self._database.delete(int(self._options.delete_id)):
self.print_message('Record deleted, database saved', Cli.MESSAGE_IMP)
if not self._replace_database():
raise FormatError('Error replacing database')
else:
self.print_message('Deletion canceled', Cli.MESSAGE_IMP)
def graph(self) -> None:
"""
Create a graph of the database. Back up old version of the graph if exists in the directory.
:return: None
"""
self.print_message('Creating database graph', Cli.MESSAGE_IMP)
# Rename previous graph
file_path = os.path.join('.', 'graph.pdf')
if os.path.exists(file_path):
self.print_message('Backing up previous graph', Cli.MESSAGE_IMP)
os.rename(file_path, os.path.join('.', 'graph.old.pdf'))
self._database.graph('graph')
# Remove intermediate file
os.remove(os.path.join('.', 'graph'))
self.print_message('Graph saved: ' + str(os.path.join('.', 'graph.pdf')), Cli.MESSAGE_IMP)
def _replace_database(self) -> bool:
"""
Replace original database file with the valid workingCopy database after transactions.
:return: True if replaced successfully.
"""
os.replace(os.path.join(self.work_path, 'workCopy.yml'), os.path.join('.', self._database_file))
self.print_message(str(self._database_file + ' replaced successfully'), Cli.MESSAGE_IMP)
return True
@staticmethod
def confirm(prompt: str) -> bool:
"""
Ask the user for confirmation, return True of False.
:param prompt: str, Display this prompt.
:return: True if the user answered yes.
"""
answer = None
while answer not in ['y', 'Y', 'n', 'N']:
answer = str(input(prompt + ' [y/n]: '))
if answer in ['y', 'Y']:
return True
if answer in ['n', 'N']:
return False
def run(self) -> None:
"""
This method begins the user interaction with the database.
:return: None
"""
if self._options.database_file:
self._database_file = self._options.database_file
else:
yml_files = glob.glob('*.yml')
if yml_files:
self._database_file = yml_files[0]
self.print_message('Using database: ' + str(self._database_file), Cli.MESSAGE_IMP)
else:
self._parser.error('no database file found in current directory, use -f to specify '
'or create an empty .yml file')
if not os.path.exists(self.work_path) and not os.path.isdir(self.work_path):
os.mkdir(self.work_path)
try:
if not os.path.exists(self._database_file):
raise FormatError('Database file: ' + str(self._database_file) + ' does not exist')
shutil.copyfile(self._database_file, os.path.join(self.work_path, 'workCopy.yml'))
self.print_message('Creating work copy in: ' + str(os.path.join(self.work_path, 'workCopy.yml')), False)
if self._database.load(os.path.realpath(os.path.join(self.work_path, 'workCopy.yml'))):
self.print_message('Database workCopy.yml load OK', Cli.MESSAGE_IMP)
else:
self._parser.error('Incorrect database file')
if self._options.add_record:
self.add()
elif self._options.delete_id:
self.delete()
elif self._options.search_string:
self.search()
elif self._options.list_all:
self._list_all()
else:
self.graph()
except FormatError as ex:
self.print_message('Database error:', Cli.MESSAGE_ERR)
print(ex, file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
data = 'data.yml'
cli = Cli()
cli.run()