This repository has been archived by the owner on May 18, 2020. It is now read-only.
forked from jirutka/ldap-passwd-webui
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
executable file
·369 lines (288 loc) · 12.7 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
#!/usr/bin/env python3
# coding: utf-8
import bottle
from bottle import get, post, static_file, request, route, template, install
from bottle import SimpleTemplate
from configparser import ConfigParser
from ldap3 import Connection, LDAPBindError, LDAPInvalidCredentialsResult, Server
from ldap3 import AUTH_SIMPLE, SUBTREE, MODIFY_REPLACE, HASHED_MD5
from ldap3.core.exceptions import LDAPConstraintViolationResult, LDAPUserNameIsMandatoryError
import os
from os import path, urandom
import sys
import uuid
from bottle.ext import sqlite
from hashlib import sha1
import time
import datetime
from base64 import encodestring as encode
from base64 import decodestring as decode
from captcha.image import ImageCaptcha
from lib.mail import Email
# from lib.captcha import Captcha
# Устанавливаем кодировку
reload(sys)
sys.setdefaultencoding('utf-8')
@get('/')
def get_index():
return index_tpl()
@get('/email')
def get_email():
global captcha, relative_path_captcha, full_path_captcha
captcha = str(uuid.uuid1())[:5]
relative_path_captcha = '%s%s.png' % (CONF['captcha']['path_image'], captcha[:4])
full_path_captcha = '%s/%s' % (os.getcwdu(), relative_path_captcha)
image = ImageCaptcha(fonts=[CONF['captcha']['font'], CONF['captcha']['font']])
data = image.generate(captcha)
image.write(captcha, full_path_captcha)
return email_tpl(path_captcha=relative_path_captcha,
ok='0',
expire=expire_info)
@post('/email')
def post_email(db):
form = request.forms.getunicode
email = form('email')
if (captcha != form('captcha')):
return email_tpl(alerts=[('error', "Неверный код")],
path_captcha=relative_path_captcha,
ok='0',
expire=expire_info)
# каптча
if find_email(email) != None:
hash_user = sha1(email)
id_user = str(hash_user.hexdigest())
hash_session = sha1(id_user)
id_session = str(hash_session.hexdigest())
ip = str(request.environ.get('HTTP_X_FORWARDED_FOR'))
user_name = find_email(email)['cn'][0]
if check_db_email(db, email):
return email_tpl(alerts=[('error', "Письмо уже было отправлено вам на почту.(Проверьте спам)")],
path_captcha=relative_path_captcha,
ok='0',
expire=expire_info)
db.execute(
"INSERT INTO user_code (id_user, id_session, email, username, date_start, ip) "
"VALUES ('{0:s}','{1:s}','{2:s}','{3:s}','{4:d}','{5:s}')"
.format(id_user, id_session, email, user_name, unixtime(), ip))
sm = Email(CONF['mail']['smtp'],
int(CONF['mail']['port']),
CONF['mail']['login'],
CONF['mail']['passwd'])
html_message = """\
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
</head>
<body>
<p>Восстановление пароля!<br>
На это письмо не нужно отвечать.<br>
Перейдите по <a href="http://{2:s}/restore/{0:s}/{1:s}">ссылке</a> для восстановление пароля.<br>
Или скопируйте ее в буфер обмена и вставьте в браузер.<br>
http://{2:s}/restore/{0:s}/{1:s}
Ссылка действительна в течении 6 часов.
</p>
</body>
</html>
""".format(id_user, id_session, CONF['server']['hostname'])
# Высылаем линк на почту для подтверждения
sm.send_mail(CONF['mail']['login'], email, 'Восстановление пароля', html_message)
return email_tpl(alerts=[('success', "Пароль был отправлен на почту")],
path_captcha=relative_path_captcha, ok='1', expire=expire_info)
else:
return email_tpl(alerts=[('error', "Пользователь с таким ящиком не найден")],
path_captcha=relative_path_captcha, ok='0', expire=expire_info)
@post('/')
def post_index():
form = request.forms.getunicode
def error(msg):
return index_tpl(username=form('username'), alerts=[('error', msg)])
if form('new-password') != form('confirm-password'):
return error("Password doesn't match the confirmation!")
if len(form('new-password')) < 8:
return error("Password must be at least 8 characters long!")
try:
change_password(form('username'), form('old-password'), form('new-password'))
except Error as e:
print("Unsuccessful attemp to change password for %s: %s" % (form('username'), e))
return error(str(e))
print("Password successfully changed for: %s" % form('username'))
return index_tpl(alerts=[('success', "Пароль был успешно изменен")])
# TODO переделать запрос в базу, а также отправку почты.
@post('/restore/<id_user>/<id_session>')
def restore_passwd(db, id_user, id_session):
form = request.forms.getunicode
if (form('passwd_1') == form('passwd_2')):
username = db.execute('SELECT username FROM user_code WHERE id_user="{0:s}" '
'AND id_session="{1:s}" LIMIT 1'.format(id_user, id_session)).fetchone()[0]
email = db.execute('SELECT email FROM user_code WHERE id_user="{0:s}" '
'AND id_session="{1:s}" LIMIT 1'.format(id_user, id_session)).fetchone()[0]
change_password_ldap_privileges(username, makeSecret(form('passwd_1')))
db.execute('DELETE FROM user_code WHERE id_user="{0:s}" AND id_session="{1:s}"'.format(id_user, id_session))
sm = Email(CONF['mail']['smtp'],
int(CONF['mail']['port']),
CONF['mail']['login'],
CONF['mail']['passwd'])
html_message = """\
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
</head>
<body>
<p><br>
Ваш пароль успешно изменен.<br>
Логин:{0:s}<br>
Пароль:{1:s}<br>
</p>
</body>
</html>
""".format(username, form('passwd_1'))
# Высылаем линк на почту для подтверждения
sm.send_mail(CONF['mail']['login'], email, 'Восстановление пароля', html_message)
return passwd_tpl(alerts=[('success', "Пароль успешно изменен, а так же продублирован на почту.")])
else:
return passwd_tpl(alerts=[('error', "Пароли не совпадают")])
@route('/static/<filename>', name='static')
def serve_static(filename):
return static_file(filename, root=path.join(BASE_DIR, 'static'))
@route('/restore/<id_user>/<id_session>')
def serve_static(db, id_user, id_session):
if redirect_to_change_passwd(db, id_user, id_session):
return passwd_tpl()
else:
return '<center><h2><a href="/email">Возспользуйтейсь формой восстановления пароля</a></h2></center>'
# отрисовка шаблонов
def index_tpl(**kwargs):
return template('index', **kwargs)
def email_tpl(**kwargs):
return template('email', **kwargs)
def passwd_tpl(**kwargs):
return template('passwd', **kwargs)
def connect_ldap(**kwargs):
server = Server(CONF['ldap']['host'], int(CONF['ldap']['port']), connect_timeout=5)
return Connection(server, raise_exceptions=True, **kwargs)
def change_password(*args):
try:
if CONF['ldap'].get('type') == 'ad':
change_password_ad(*args)
else:
change_password_ldap(*args)
except (LDAPBindError, LDAPInvalidCredentialsResult, LDAPUserNameIsMandatoryError):
error = 'Ошибка'
raise Error(error)
except LDAPConstraintViolationResult as e:
# Extract useful part of the error message (for Samba 4 / AD).
msg = e.message.split('check_password_restrictions: ')[-1].capitalize()
raise Error(msg)
def change_password_ldap(username, old_pass, new_pass):
"""
Меняем пользователю пароль
:param username:
:param old_pass:
:param new_pass:
:return:
"""
with connect_ldap() as c:
user_dn = find_user_dn(c, username)
# Note: raises LDAPUserNameIsMandatoryError when user_dn is None.
with connect_ldap(authentication=AUTH_SIMPLE, user=user_dn, password=old_pass) as c:
c.bind()
c.extend.standard.modify_password(user_dn, old_pass, new_pass)
def change_password_ad(username, old_pass, new_pass):
user = username + '@' + CONF['ldap']['ad_domain']
with connect_ldap(authentication=AUTH_SIMPLE, user=user, password=old_pass) as c:
c.bind()
user_dn = find_user_dn(c, username)
c.extend.microsoft.modify_password(user_dn, new_pass, old_pass)
# TODO переделать шифрование пароля
def change_password_ldap_privileges(user_ldap, new_passwd):
"""
Изменение пароля с правами администратора
не требует старый пароль.
:param user:
:param new_passwd:
:return:
"""
with connect_ldap(user=CONF['ldap']['admin_user'], password=CONF['ldap']['admin_pass']) as c:
c.bind()
# username = 'cn={0:s},{1:s}'.format(user_ldap, CONF['ldap']['base'])
username = find_user_dn(c, user_ldap)
c.modify(username, {'userPassword': [(MODIFY_REPLACE, [new_passwd])]})
# c.extend.standard.modify_password(username,None,new_passwd,HASHED_MD5)
def find_user_dn(conn, uid):
"""
Ищем пользователя по его uid
:param conn:
:param uid:
:return:
"""
search_filter = CONF['ldap']['search_filter'].replace('{uid}', uid)
conn.search(CONF['ldap']['base'], u"({0:s})".format(search_filter), SUBTREE, attributes=['dn', 'mail'])
return conn.response[0]['dn'] if conn.response else None
def find_email(email):
"""
Поиск почтового ящика в ldap
:param email:
:return:
"""
search_filter = 'mail=%s' % email
response = {}
with connect_ldap() as conn:
conn.search(CONF['ldap']['base'], "(%s)" % search_filter, SUBTREE, attributes=['dn', 'mail', 'cn'])
response['mail'] = conn.response[0]['attributes']['mail'] if conn.response else None
response['cn'] = conn.response[0]['attributes']['cn'] if conn.response else None
return response if conn.response else None
def unixtime():
"""
Возвращает время в формате unixtime
:return:
"""
now = datetime.datetime.now()
return int(time.mktime(now.timetuple()))
def makeSecret(password):
'''
Создание шифрованного пароля
:param password:
:return:
'''
salt = urandom(4)
h = sha1(password)
h.update(salt)
return "{SSHA}" + encode(h.digest() + salt)
def check_db_email(db, email):
"""
Проверяем высылалось ли письмо на почту
если да, то True
:return:
"""
row = db.execute('SELECT email FROM user_code WHERE email="{0:s}" LIMIT 1'.format(email)).fetchone()
return True if row else False
def redirect_to_change_passwd(db, id_user, id_session):
row = db.execute('SELECT email FROM user_code WHERE id_user="{0}" AND id_session="{1}" LIMIT 1'
.format(id_user, id_session)).fetchone()
return True if row else False
def read_config():
"""
Читаем конфиг
:return:
"""
config = ConfigParser()
config.read([path.join(BASE_DIR, 'settings.ini'), os.getenv('CONF_FILE', '')])
return config
class Error(Exception):
pass
BASE_DIR = path.dirname(__file__)
CONF = read_config()
expire_info = CONF['db']['date_expire']
bottle.TEMPLATE_PATH = [BASE_DIR]
# Set default attributes to pass into templates.
SimpleTemplate.defaults = dict(CONF['html'])
SimpleTemplate.defaults['url'] = bottle.url
# подключаем базу
plugin = sqlite.Plugin(dbfile=CONF['db']['dbname'])
install(plugin)
# Run bottle internal server when invoked directly (mainly for development).
if __name__ == '__main__':
bottle.run(**CONF['server'])
# Run bottle in application mode (in production under uWSGI server).
else:
application = bottle.default_app()