-
Notifications
You must be signed in to change notification settings - Fork 27
/
rbac.py
113 lines (98 loc) · 3.89 KB
/
rbac.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#!/usr/bin/python
# based on https://github.com/23andMe/api-example-flask
import requests
import flask
import os
import sys
import json
from flask import request, config
allowed_population_threshold = 0.51 # minimum allowed match %
ancestry_speculation_threshold = 0.75 # standard ancestry speculation
ancestry_allowed_populations = [ 'French & German', 'British & Irish', 'Finnish',
'Scandinavian', 'Northern European', 'Eastern European', 'Balkan', 'Iberian',
'Italian', 'Sardinian', 'Southern European' ]
# note: does not include "Ashkenazi" or "European"
API_SERVER = "api.23andme.com"
BASE_API_URL = "https://%s" % API_SERVER
DEFAULT_SCOPE = "basic ancestry"
app = flask.Flask(__name__, instance_relative_config=True)
app.config.from_pyfile('rbac.cfg', silent=True)
def load_config(key):
if not key in app.config:
app.config[key] = os.getenv(key)
if not app.config[key]:
print "Config %s missing" % key
sys.exit(1)
return app.config[key]
# required config
CLIENT_ID=load_config('CLIENT_ID')
CLIENT_SECRET=load_config('CLIENT_SECRET')
REDIRECT_URI=load_config('REDIRECT_URI')
DEBUG=os.getenv('DEBUG')
@app.route('/')
def index():
return flask.render_template('index.html', client_id=CLIENT_ID)
@app.route('/receive_code/')
def receive_code():
parameters = {
'client_id': CLIENT_ID,
'client_secret': CLIENT_SECRET,
'grant_type': 'authorization_code',
'code': request.args.get('code'),
'redirect_uri': REDIRECT_URI,
'scope': DEFAULT_SCOPE
}
response = requests.post(
"%s%s" % (BASE_API_URL, "/token"),
data = parameters,
verify = False
)
if response.status_code == 200:
access_token = response.json()['access_token']
# fetch profile ids
user_res = api_req(access_token, "/user/", {})
profiles = user_res.json()['profiles']
if len(profiles):
profiles = [p for p in profiles if p['genotyped']]
if len(profiles):
profile_id = profiles[0]['id'] # assume first genotyped profile
ancestry_res = api_req(access_token, "/ancestry/%s/" % profile_id, {'threshold': ancestry_speculation_threshold})
ancestry = ancestry_res.json()['ancestry']
# check if ancestry is valid
match_total = ancestor_match_pct(ancestry)
valid = match_total >= allowed_population_threshold
return flask.render_template('auth_status.html', valid=valid,match_total=match_total*100)
else:
return "Error: no genotyped profiles found on your account"
else:
return "Error: could not locate any valid profiles with ancestry data"
else:
return "Error fetching OAuth2 access bearer token"
def api_req(token, path, params):
headers = {'Authorization': 'Bearer %s' % token}
res = requests.get("%s/1%s" % (BASE_API_URL, path), # /profileid
params = params,
headers= headers,
verify = False)
if res.status_code == 200:
return res
else:
reponse_text = res.text
print "API error to %s: %s" % (path, reponse_text)
res.raise_for_status()
# returns percentage of allowed ancestor geographies
def ancestor_match_pct(ancestry, total=0.0):
# see if we match a desired pop, if so add it to total
if 'label' in ancestry:
if ancestry['label'] in ancestry_allowed_populations:
proportion = ancestry['proportion']
return proportion
# if no desired pop, recurse into sub-populations
if 'sub_populations' in ancestry:
subtotal = 0.0
for subpop in ancestry['sub_populations']:
subtotal += ancestor_match_pct(subpop, total)
return subtotal
return total
if __name__ == '__main__':
app.run(debug=DEBUG)