forked from torchbox/kube-ldap-authn
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
131 lines (110 loc) · 4.24 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#! /usr/bin/env python3
#
# Copyright (c) 2017 Torchbox Ltd.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely. This software is provided 'as-is', without any express or implied
# warranty.
from flask import Flask, request, jsonify, Response
import ldap, ldap.filter, logging, sys
logger = logging.getLogger('kube-ldap-authn')
logger.setLevel(logging.INFO)
hdl = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(asctime)s [%(levelname)-8s] %(name)-15s: %(message)s')
hdl.setFormatter(formatter)
logger.addHandler(hdl)
app = Flask('kube-ldap-authn')
try:
app.config.from_envvar('KUBE_LDAP_AUTHN_SETTINGS')
except RuntimeError as e:
logger.error(str(e))
sys.exit(1)
if 'LDAP_TLS_CA_FILE' in app.config:
ldap.set_option(ldap.OPT_X_TLS_CACERTFILE, app.config['LDAP_TLS_CA_FILE'])
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)
@app.route('/healthz', methods=['GET'])
def healthz():
return Response("OK\n", mimetype='text/plain')
@app.route('/authn', methods=['POST'])
def authn():
auth_error = jsonify({
'apiVersion': 'authentication.k8s.io/v1beta1',
'kind': 'TokenReview',
'status': {
'authenticated': False,
},
})
req = request.json
if req is None:
logger.info("invalid or missing request json")
return auth_error
if 'apiVersion' not in req or req['apiVersion'] != 'authentication.k8s.io/v1beta1':
logger.info("invalid or missing apiVersion")
return auth_error
if 'kind' not in req or req['kind'] != 'TokenReview':
logger.info("invalid or missing kind")
return auth_error
if 'spec' not in req or 'token' not in req['spec']:
logger.info("invalid or missing spec")
return auth_error
token = req['spec']['token']
try:
ld = ldap.initialize(app.config['LDAP_URL'])
except ldap.LDAPError as e:
logger.info("LDAP connection error: " + str(e))
return auth_error
ld.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
if app.config.get('LDAP_START_TLS', True) == True:
try:
ld.start_tls_s()
except ldap.LDAPError as e:
logger.info("LDAP TLS error: " + str(e))
return auth_error
try:
ld.simple_bind_s(app.config['LDAP_BIND_DN'],
app.config['LDAP_BIND_PASSWORD'])
except ldap.LDAPError as e:
logger.info("LDAP bind error: " + str(e))
return auth_error
user_search = app.config['LDAP_USER_SEARCH_FILTER'].format(
token=ldap.filter.escape_filter_chars(token))
try:
r = ld.search_s(app.config['LDAP_USER_SEARCH_BASE'],
ldap.SCOPE_SUBTREE,
user_search,
[ app.config['LDAP_USER_NAME_ATTRIBUTE'],
app.config['LDAP_USER_UID_ATTRIBUTE'] ])
except ldap.LDAPError as e:
logger.info("LDAP search error: " + str(e))
return auth_error
if len(r) != 1:
logger.info("expected 1 user, found " + str(len(r)))
return auth_error
username = r[0][1][app.config['LDAP_USER_NAME_ATTRIBUTE']][0].decode('ascii')
uid = r[0][1][app.config['LDAP_USER_UID_ATTRIBUTE']][0].decode('ascii')
group_search = app.config['LDAP_GROUP_SEARCH_FILTER'].format(
username=ldap.filter.escape_filter_chars(username),
dn=ldap.filter.escape_filter_chars(r[0][0]))
try:
g = ld.search_s(app.config['LDAP_GROUP_SEARCH_BASE'],
ldap.SCOPE_SUBTREE, group_search,
[ app.config['LDAP_GROUP_NAME_ATTRIBUTE'] ])
except ldap.LDAPError as e:
logger.info("LDAP search error: " + str(e))
return auth_error
groups = [
i[1][app.config['LDAP_GROUP_NAME_ATTRIBUTE']][0].decode('ascii')
for i in g ]
return jsonify({
'apiVersion': 'authentication.k8s.io/v1beta1',
'kind': 'TokenReview',
'status': {
'authenticated': True,
'user': {
'username': username,
'uid': uid,
'groups': groups,
},
},
})