-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
Copy pathcommon.py
347 lines (294 loc) · 12.1 KB
/
common.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
"""Common utils to support applying and reverting recommendations in bulk."""
import collections
from concurrent import futures
import json
import logging
import time
from google_auth_httplib2 import AuthorizedHttp
import httplib2
from google.oauth2 import service_account
class Recommendation(object):
    """Encapsulate Recommendation information required to compute hero metrics.

    Attributes:
      name: Value of the payload's "name" field.
      etag: Value of the payload's "etag" field.
      state: State computed by `get_state`.
      principal: (str) The single principal the recommendation is about.
      principal_type: (str) Part of `principal` before the first ":".
      remove_role: (set) Roles named by the "remove" operations.
      add_roles: (set) Roles named by the "add" operations.
      resource: (str) The single resource the operations apply to.
    """

    # Keys of an operation's "pathFilters" mapping read by this class.
    _MEMBER_PATH = "/iamPolicy/bindings/*/members/*"
    _ROLE_PATH = "/iamPolicy/bindings/*/role"

    def __init__(self, data):
        """Build a recommendation from a recommendation payload.

        Args:
          data: (dict) Recommendation payload. Must contain "name", "etag" and
            "stateInfo"; operations are read from "content" when present.

        Raises:
          ValueError: If an operation has an action other than "remove"/"add".
          AssertionError: If the payload does not describe exactly one
            principal, one removed role and one resource.
        """
        self.name = data["name"]
        self.etag = data["etag"]
        self.state = self.get_state(data)
        # These start as sets while the operations are collected;
        # `update_data` later collapses principal and resource to strings.
        self.principal = set()
        self.principal_type = ""
        self.remove_role = set()
        self.add_roles = set()
        self.resource = set()
        self.extract_recommendation(data)
        self.check_integrity()
        self.update_data()

    def __repr__(self):
        return repr(
            (self.state, self.principal, self.principal_type,
             self.remove_role, self.add_roles, self.resource, self.name,
             self.etag))

    def get_state(self, data):
        """Get state of the recommendation.

        Args:
          data: (dict) Recommendation payload containing "stateInfo".

        Returns:
          "SUCCEEDED_REVERTED" when the state is "SUCCEEDED" and the state
          metadata has "reverted" set to "true"; otherwise the state string
          from the payload unchanged.
        """
        state_info = data["stateInfo"]
        state = state_info["state"]
        if state == "SUCCEEDED":
            metadata = state_info.get("stateMetadata", {})
            if metadata.get("reverted", "false") == "true":
                return "SUCCEEDED_REVERTED"
        return state

    def extract_recommendation(self, data):
        """Populate recommendation data from a recommendation payload.

        Args:
          data: (dict) Recommendation payload; a missing "content" or
            "operationGroups" entry is treated as having no operations.

        Raises:
          ValueError: If an operation's action is neither "remove" nor "add".
        """
        operation_groups = data.get("content", {}).get("operationGroups", [])
        for group in operation_groups:
            for op in group["operations"]:
                action = op["action"]
                if action == "remove":
                    # For removals the principal is carried in the path filter.
                    self.principal.add(op["pathFilters"][self._MEMBER_PATH])
                    self.resource.add(op["resource"])
                    self.remove_role.add(op["pathFilters"][self._ROLE_PATH])
                elif action == "add":
                    # For additions the principal is carried in "value".
                    self.resource.add(op["resource"])
                    self.add_roles.add(op["pathFilters"][self._ROLE_PATH])
                    self.principal.add(op["value"])
                else:
                    raise ValueError("Wrong action : " + action)

    def check_integrity(self):
        """Check invariance of a recommendation payload.

        Raises:
          AssertionError: If there is not exactly one principal, exactly one
            removed role, or exactly one resource.
        """
        # Raised explicitly instead of using `assert` statements so the
        # validation is not stripped when Python runs with -O.
        checks = (
            (self.principal,
             "there should be exactly one principal. principal : "),
            (self.remove_role,
             "there should be exactly one removed role. remove_role: "),
            (self.resource,
             "there should be exactly one resource. resource: "),
        )
        for values, message in checks:
            if len(values) != 1:
                raise AssertionError(message + str(values))

    def update_data(self):
        """Update recommendation data after checking the integrity.

        Collapses the single-element `principal` and `resource` sets into
        plain strings and derives `principal_type` from the principal.
        """
        self.principal = self.principal.pop()
        self.principal_type = self.principal.split(":")[0]
        self.resource = self.resource.pop()
def rate_limit_execution(f, rate_limit, *args):
    """Execute multiple threads of function f for args while respecting the rate limit.

    The argument sequences are processed in batches of `max_request` items.
    Each batch runs on its own thread pool, and a new batch is not started
    until at least `duration` seconds have passed since the previous one began.

    Args:
      f: function to execute; called with one item from each sequence in args.
      rate_limit: (max_request, duration) pair - the batch size and the
        minimum number of seconds between the starts of two batches.
      *args: Parallel, sliceable sequences of arguments for f. The length of
        the first sequence determines how many items are processed.

    Returns:
      List with the output of executing f on args, in input order. An empty
      list when no argument sequences are given.
    """
    if not args:
        # Nothing to iterate over; the original code raised IndexError here.
        return []
    max_request, duration = rate_limit
    total = len(args[0])
    all_output = []
    done = 0
    while done < total:
        # A monotonic clock with sub-second resolution: truncating wall-clock
        # time to whole seconds could under-report the wait that is required.
        started = time.monotonic()
        batch = [arg[done:done + max_request] for arg in args]
        with futures.ThreadPoolExecutor(max_workers=max_request) as executor:
            all_output.extend(executor.map(f, *batch))
        done += max_request
        elapsed = time.monotonic() - started
        # No need to wait after the final batch.
        if elapsed < duration and done < total:
            time.sleep(duration - elapsed)
        logging.info("Finish investigating %d items out of total %d items.",
                     min(done, total), total)
    return all_output
def get_recommendations(project_id, recommender, state, credentials):
    """Returns the IAM recommendations of a project that are in a given state.

    A single list request is issued against the
    `google.iam.policy.Recommender` recommender of the project; no page
    token is followed.
    NOTE(review): if the API paginates this listing, only the first page is
    considered - confirm against the API reference.

    Args:
      project_id: (str) Project for which to get the recommendation.
      recommender: Recommender stub to call recommender API
      state: state of the recommendation, compared with
        `Recommendation.state`.
      credentials: client credentials

    Returns:
      List of `Recommendation` objects whose state equals `state`. An empty
      list when the request or the parsing of the response fails; the failure
      is logged with its traceback.
    """
    http = httplib2.Http()
    authorize_http = AuthorizedHttp(credentials, http=http)
    parent = ("projects/{}/locations/global/recommenders/"
              "google.iam.policy.Recommender").format(project_id)
    # Restrict the response to the fields `Recommendation` reads.
    fields = [
        "recommendations/stateInfo/state", "recommendations/content",
        "recommendations/etag", "recommendations/name",
        "recommendations/stateInfo/stateMetadata"
    ]
    try:
        request = recommender.projects().locations().recommenders(
        ).recommendations().list(parent=parent, fields=",".join(fields))
        response = request.execute(http=authorize_http)
        recommendation_data = [
            Recommendation(r) for r in response.get("recommendations", [])
        ]
    except Exception:  # Best effort: report the failure and return nothing.
        # `Exception` rather than a bare `except`, so KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        logging.exception("Failed to get recommendations for project `%s`.",
                          project_id)
        return []
    return [r for r in recommendation_data if r.state == state]
def update_recommendation_status(recommendation, recommender_client, metadata,
                                 credentials):
    """Mark a recommendation as succeeded through the recommender API.

    Args:
      recommendation: (dict) Recommendation on IAM policy; its "id" and
        "etag" entries are sent with the request.
      recommender_client: Iam recommender client.
      metadata: (Dict) metadata to update the recommendation state.
      credentials: service account credentials.

    Returns:
      The result of executing the markSucceeded request.
    """
    authorize_http = AuthorizedHttp(credentials, http=httplib2.Http())
    recommendations_api = (
        recommender_client.projects().locations().recommenders()
        .recommendations())
    mark_request = recommendations_api.markSucceeded(
        name=recommendation["id"],
        body={
            "etag": recommendation["etag"],
            "stateMetadata": metadata
        })
    return mark_request.execute(http=authorize_http)
def get_current_policy(resourcemanager_v1, project_id, credentials):
    """Fetch the IAM policy currently set on a project, without its etag.

    Args:
      resourcemanager_v1: ResourcemanagerV1 stub to call IAM API
      project_id: (str) Project for which to get the policy.
      credentials: client credentials

    Returns:
      The getIamPolicy response with its "etag" entry deleted.
    """
    authorize_http = AuthorizedHttp(credentials, http=httplib2.Http())
    policy_request = resourcemanager_v1.projects().getIamPolicy(
        resource=project_id)
    cur_policy = policy_request.execute(http=authorize_http)
    # NOTE(review): presumably the etag is dropped so that a later
    # setIamPolicy is not tied to this read - confirm with the callers.
    del cur_policy["etag"]
    return cur_policy
def update_policy(resourcemanager_v1, project_id, credentials, new_policy):
    """Set a new IAM policy on a project.

    Args:
      resourcemanager_v1: ResourcemanagerV1 stub to call IAM API
      project_id: (str) Project on which to set the policy.
      credentials: client credentials
      new_policy: New policy to set on the project

    Returns:
      The result of executing the setIamPolicy request.
    """
    authorize_http = AuthorizedHttp(credentials, http=httplib2.Http())
    request_body = {"policy": new_policy}
    projects_api = resourcemanager_v1.projects()
    return projects_api.setIamPolicy(
        resource=project_id, body=request_body).execute(http=authorize_http)
def get_credentials(service_account_file_path, scopes=None):
    """Load service account credentials from a key file.

    Args:
      service_account_file_path: (str) Path to service account key.
      scopes: List of scopes for the service account. When None, the
        cloud-platform scope is used.

    Returns:
      Credentials created by
      `service_account.Credentials.from_service_account_file`.
    """
    effective_scopes = (
        ["https://www.googleapis.com/auth/cloud-platform"]
        if scopes is None else scopes)
    return service_account.Credentials.from_service_account_file(
        service_account_file_path, scopes=effective_scopes)
def _roles_by_principal(policy):
    """Map each principal to the set of roles its unconditional bindings grant."""
    roles = collections.defaultdict(set)
    # A policy without a "bindings" entry is treated as having no bindings.
    for binding in policy.get("bindings", []):
        if "condition" in binding:
            # Conditional bindings are left out of the comparison.
            continue
        for principal in binding["members"]:
            roles[principal].add(binding["role"])
    return roles


def diff_between_policies(old_policy, new_policy):
    """Returns the difference between two policies.

    Only bindings without a "condition" are compared. Principals whose set of
    roles is the same in both policies are omitted.

    Args:
      old_policy: (dict) Old policy
      new_policy: (dict) New policy

    Returns:
      JSON string of the form {"diff_policy": [...]} with one entry per
      changed principal, holding "principal", "removed_roles" and
      "added_roles". Entries are ordered by principal and the role lists are
      sorted, so the output is deterministic.
    """
    old_bindings = _roles_by_principal(old_policy)
    new_bindings = _roles_by_principal(new_policy)
    entries = []
    for principal in sorted(set(old_bindings) | set(new_bindings)):
        old_roles = old_bindings.get(principal, set())
        new_roles = new_bindings.get(principal, set())
        if new_roles == old_roles:
            continue
        entries.append({
            "principal": principal,
            # Sorted: plain set iteration order is not stable across runs.
            "removed_roles": sorted(old_roles - new_roles),
            "added_roles": sorted(new_roles - old_roles),
        })
    return json.dumps({"diff_policy": entries}, sort_keys=True, indent=4)
def remove_role_from_policy(policy, recommendation):
    """Remove roles for a policy based on recommendations.

    For every unconditional binding whose role is recommended for removal,
    the recommendation's principal is removed from the binding's members.
    Both arguments are changed in place: the policy loses the member, and
    each role removed successfully is also deleted from
    recommendation["role_recommended_to_be_removed"].

    Args:
      policy: (dict) IAM policy.
      recommendation: (dict) Recommendation on IAM policy, with "principal"
        and "role_recommended_to_be_removed" entries.

    Returns:
      True when there was no role to remove, or when the principal was
      removed from at least one binding. False otherwise; the reason is
      logged.
    """
    roles_to_remove = recommendation["role_recommended_to_be_removed"]
    if not roles_to_remove:
        return True  # No role to be removed.
    principal = recommendation["principal"]
    is_acted_recommendation = False
    acted_and_succeeded = False
    for binding in policy["bindings"]:
        if binding["role"] not in roles_to_remove or "condition" in binding:
            continue
        is_acted_recommendation = True
        try:
            binding["members"].remove(principal)
        except (KeyError, ValueError):
            # The binding exists but the principal is not one of its members.
            logging.error("`%s` does not have `role:%s`.", principal,
                          roles_to_remove)
            continue
        roles_to_remove.remove(binding["role"])
        acted_and_succeeded = True
    if not is_acted_recommendation:
        # No unconditional binding carries any of the recommended roles.
        logging.error("`%s` does not have `role:%s`.", principal,
                      roles_to_remove)
    return acted_and_succeeded
def add_roles_in_policy(policy, recommendation):
    """Add roles in the policy based on recommendations.

    The recommendation's principal is added to the first unconditional
    binding of each recommended role. A new binding is appended for every
    recommended role that has no unconditional binding yet. The policy is
    changed in place; the recommendation is not modified.

    Args:
      policy: (dict) IAM policy.
      recommendation: (dict) Recommendation on IAM policy, with "principal"
        and "roles_recommended_to_be_replaced_with" entries.

    Returns:
      True when the policy was changed, i.e. the principal was added to an
      existing binding or a new binding was created. False otherwise.
    """
    principal = recommendation["principal"]
    pending_roles = set(
        recommendation["roles_recommended_to_be_replaced_with"])
    policy_changed = False
    for binding in policy["bindings"]:
        if binding["role"] not in pending_roles or "condition" in binding:
            continue
        # Do not list the same member twice when it already has the role.
        if principal not in binding["members"]:
            binding["members"].append(principal)
            policy_changed = True
        pending_roles.remove(binding["role"])
    # Sorted so the new bindings are appended in a deterministic order.
    for role in sorted(pending_roles):
        policy["bindings"].append({"role": role, "members": [principal]})
        policy_changed = True
    return policy_changed
def writefile(data, output_file):
    """Write `data` to the file at `output_file`, replacing its content.

    Args:
      data: (str) Text to write.
      output_file: (str) Path of the file to open in "w" mode.
    """
    with open(output_file, "w") as handle:
        handle.write(data)
def describe_recommendations(recommendations):
    """Returns a json string representation of recommendation with selected fields.

    Args:
      recommendations: List(common.Recommendation)

    Returns:
      JSON string of the form {"recommendations": [...]} with one entry per
      recommendation, ordered by principal. Each entry holds "id", "etag",
      "principal", "role_recommended_to_be_removed" and
      "roles_recommended_to_be_replaced_with"; the two role lists are sorted
      so the output is deterministic.
    """
    data = [
        {
            "id": r.name,
            "etag": r.etag,
            "principal": r.principal,
            # Sorted: plain set iteration order is not stable across runs.
            "role_recommended_to_be_removed": sorted(r.remove_role),
            "roles_recommended_to_be_replaced_with": sorted(r.add_roles),
        }
        for r in sorted(recommendations, key=lambda x: x.principal)
    ]
    return json.dumps({"recommendations": data}, indent=4, sort_keys=True)