Skip to content

Commit

Permalink
Merge pull request #65 from ambitus/dev
Browse files Browse the repository at this point in the history
Release 1.0b4
  • Loading branch information
ElijahSwiftIBM authored Jan 30, 2024
2 parents 8a2c28d + d857745 commit 8d96ac1
Show file tree
Hide file tree
Showing 38 changed files with 869 additions and 46 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

[tool.poetry]
name="pyracf"
version="1.0b3"
version="1.0b4"
description="Python interface to RACF using IRRSMO00 RACF Callable Service."
license = "Apache-2.0"
authors = [
Expand Down
60 changes: 40 additions & 20 deletions pyracf/common/security_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,27 +465,9 @@ def __format_data_set_generic_profile_data(
and messages[i + 1] is not None
and ("-" in messages[i + 1])
):
field = " ".join(
[
txt.lower().strip()
for txt in list(filter(None, messages[i].split(" ")))
]
self.__format_tabular_data(
messages, profile, current_segment, i, list_fields
)
field = self._profile_field_to_camel_case(current_segment, field)
value = messages[i + 2]
if "(" in value:
value_tokens = value.split("(")
subfield = self._profile_field_to_camel_case(
current_segment, value_tokens[0].lower()
)
profile[current_segment][field] = {
subfield: self._clean_and_separate(value_tokens[-1].rstrip(")"))
}
elif field in list_fields:
profile[current_segment][field] = []
profile[current_segment][field].append(self._clean_and_separate(value))
else:
profile[current_segment][field] = self._clean_and_separate(value)
i += 1
elif "NO INSTALLATION DATA" in messages[i]:
profile[current_segment]["installationData"] = None
Expand Down Expand Up @@ -668,6 +650,44 @@ def __format_semi_tabular_data(
messages[i + 2][indexes[j] : ind_e1]
)

def __format_tabular_data(
self,
messages: List[str],
profile: dict,
current_segment: str,
i: int,
list_fields: List[str] = ["volumes"],
) -> None:
field = " ".join(
[txt.lower().strip() for txt in list(filter(None, messages[i].split(" ")))]
)
field = self._profile_field_to_camel_case(current_segment, field)
values = (
[messages[i + 2]]
if "," not in messages[i + 2]
else messages[i + 2].split(",")
)
for value in values:
if "(" in value:
value_tokens = value.split("(")
subfield = self._profile_field_to_camel_case(
current_segment, value_tokens[0].lower()
)
if field not in profile[current_segment]:
profile[current_segment][field] = {
subfield: self._clean_and_separate(value_tokens[-1].rstrip(")"))
}
else:
profile[current_segment][field][
subfield
] = self._clean_and_separate(value_tokens[-1].rstrip(")"))
elif field in list_fields:
profile[current_segment][field] = []
profile[current_segment][field].append(self._clean_and_separate(value))
else:
profile[current_segment][field] = self._clean_and_separate(value)
return

def __add_key_value_pairs_to_segment(
self,
segment_name: str,
Expand Down
4 changes: 2 additions & 2 deletions pyracf/data_set/data_set_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ def alter(
profile = self.extract(
data_set, volume=volume, generic=generic, profile_only=True
)
except SecurityRequestError:
raise AlterOperationError(data_set, self._profile_type)
except SecurityRequestError as exception:
raise AlterOperationError(data_set, self._profile_type) from exception
if not self._get_field(profile, "base", "name") == data_set.lower():
raise AlterOperationError(data_set, self._profile_type)
self._build_segment_trait_dictionary(traits)
Expand Down
4 changes: 2 additions & 2 deletions pyracf/group/group_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ def alter(self, group: str, traits: dict) -> Union[dict, bytes]:
return self._make_request(group_request, irrsmo00_precheck=True)
try:
self.extract(group)
except SecurityRequestError:
raise AlterOperationError(group, self._profile_type)
except SecurityRequestError as exception:
raise AlterOperationError(group, self._profile_type) from exception
self._build_segment_trait_dictionary(traits)
group_request = GroupRequest(group, "set")
self._build_xml_segments(group_request, alter=True)
Expand Down
192 changes: 189 additions & 3 deletions pyracf/resource/resource_admin.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""General Resource Profile Administration."""

from collections import Counter
from typing import List, Union

from pyracf.common.add_operation_error import AddOperationError
Expand All @@ -25,7 +26,7 @@ def __init__(
self._valid_segment_traits = {
"base": {
"base:application_data": "racf:appldata",
"base:audit_alter:": "racf:audaltr",
"base:audit_alter": "racf:audaltr",
"base:audit_control": "racf:audcntl",
"base:audit_none": "racf:audnone",
"base:audit_read": "racf:audread",
Expand Down Expand Up @@ -285,6 +286,133 @@ def get_user_access(
self.set_running_userid(original_userid)
return self._get_field(profile, "base", "yourAccess")

# ============================================================================
# Auditing Rules
# ============================================================================
def get_audit_rules(
self, resource: str, class_name: str
) -> Union[dict, bytes, None]:
"""Get the auditing rules associated with this general resource profile."""
profile = self.extract(resource, class_name, profile_only=True)
return self._get_field(profile, "base", "auditing")

def overwrite_audit_rules_by_attempt(
self,
resource: str,
class_name: str,
success: Union[str, None] = None,
failure: Union[str, None] = None,
all: Union[str, None] = None,
) -> Union[dict, bytes]:
"""
Overwrites the auditing rules for this general resource profile with new
rules to audit based on specified access attempts.
"""
self.__validate_access_levels(success, failure, all)
traits = {}
if success is not None:
traits[f"base:audit_{success}"] = "success"
if failure is not None:
traits[f"base:audit_{failure}"] = "failure"
if all is not None:
traits[f"base:audit_{all}"] = "all"
result = self.alter(resource, class_name, traits=traits)
return self._to_steps(result)

def alter_audit_rules_by_attempt(
self,
resource: str,
class_name: str,
success: Union[str, None] = None,
failure: Union[str, None] = None,
all: Union[str, None] = None,
) -> Union[dict, bytes]:
"""
Alters the auditing rules for this general resource profile with new rules
to audit by access level, preserving existing non-conflicting rules.
"""
self.__validate_access_levels(success, failure, all)
audit_rules = self.get_audit_rules(resource, class_name)
traits = {}
if "success" in audit_rules:
traits[f"base:audit_{audit_rules['success']}"] = "success"
if "failures" in audit_rules:
traits[f"base:audit_{audit_rules['failures']}"] = "failure"
if "all" in audit_rules:
traits[f"base:audit_{audit_rules['all']}"] = "all"
if success is not None:
traits[f"base:audit_{success}"] = "success"
if failure is not None:
traits[f"base:audit_{failure}"] = "failure"
if all is not None:
traits[f"base:audit_{all}"] = "all"
result = self.alter(resource, class_name, traits=traits)
return self._to_steps(result)

def overwrite_audit_rules_by_access_level(
self,
resource: str,
class_name: str,
alter: Union[str, None] = None,
control: Union[str, None] = None,
read: Union[str, None] = None,
update: Union[str, None] = None,
) -> Union[dict, bytes]:
"""
Overwrites the auditing rules for this general resource profile with new
rules to audit based on specified access levels.
"""
traits = {}
if alter is not None:
traits["base:audit_alter"] = alter
if control is not None:
traits["base:audit_control"] = control
if read is not None:
traits["base:audit_read"] = read
if update is not None:
traits["base:audit_update"] = update
result = self.alter(resource, class_name, traits=traits)
return self._to_steps(result)

def alter_audit_rules_by_access_level(
self,
resource: str,
class_name: str,
alter: Union[str, None] = None,
control: Union[str, None] = None,
read: Union[str, None] = None,
update: Union[str, None] = None,
) -> Union[dict, bytes]:
"""
Alters the auditing rules for this general resource profile with a new
rule to audit alter access, preserving existing non-conflicting rules.
"""
audit_rules = self.get_audit_rules(resource, class_name)
traits = {}
if "success" in audit_rules:
traits[f"base:audit_{audit_rules['success']}"] = "success"
if "failures" in audit_rules:
traits[f"base:audit_{audit_rules['failures']}"] = "failure"
if "all" in audit_rules:
traits[f"base:audit_{audit_rules['all']}"] = "all"
if alter is not None:
traits["base:audit_alter"] = alter
if control is not None:
traits["base:audit_control"] = control
if read is not None:
traits["base:audit_read"] = read
if update is not None:
traits["base:audit_update"] = update
result = self.alter(resource, class_name, traits=traits)
return self._to_steps(result)

def remove_all_audit_rules(
self, resource: str, class_name: str
) -> Union[dict, bytes]:
"""Clears the auditing rules completely."""
result = self.alter(resource, class_name, {"base:audit_none": True})
return self._to_steps(result)

# ============================================================================
# Class Administration
# ============================================================================
Expand Down Expand Up @@ -527,8 +655,8 @@ def alter(self, resource: str, class_name: str, traits: dict) -> Union[dict, byt
return self._make_request(profile_request, irrsmo00_precheck=True)
try:
profile = self.extract(resource, class_name, profile_only=True)
except SecurityRequestError:
raise AlterOperationError(resource, class_name)
except SecurityRequestError as exception:
raise AlterOperationError(resource, class_name) from exception
if not self._get_field(profile, "base", "name") == resource.lower():
raise AlterOperationError(resource, class_name)
self._build_segment_trait_dictionary(traits)
Expand Down Expand Up @@ -589,3 +717,61 @@ def _format_profile(self, result: dict) -> None:

del result["securityResult"]["resource"]["commands"][0]["messages"]
result["securityResult"]["resource"]["commands"][0]["profiles"] = profiles

def __validate_access_levels(
self,
success: Union[str, None] = None,
failure: Union[str, None] = None,
all: Union[str, None] = None,
):
valid_access_levels = ("alter", "control", "read", "update")
value_error_text = (
"Valid access levels include 'alter', 'control', 'read', and 'update'."
)
bad_access_levels = []
for attempt_argument in (success, failure, all):
if (
attempt_argument is not None
and str(attempt_argument).lower() not in valid_access_levels
):
bad_access_levels.append(attempt_argument)
match len(bad_access_levels):
case 0:
self.__check_for_duplicates([success, failure, all], "Access Level")
return
case 1:
value_error_text = (
f"'{bad_access_levels[0]}' is not a valid access level. "
+ f"{value_error_text}"
)
case 2:
value_error_text = (
f"'{bad_access_levels[0]}' and '{bad_access_levels[1]}' are not valid "
+ f"access levels. {value_error_text}"
)
case _:
bad_access_levels = [
f"'{bad_access_level}'" for bad_access_level in bad_access_levels
]
bad_access_levels[-1] = f"and {bad_access_levels[-1]} "
value_error_text = (
f"{', '.join(bad_access_levels)}are not valid access levels. "
+ f"{value_error_text}"
)
raise ValueError(value_error_text)

def __check_for_duplicates(self, argument_list: list, argument: str) -> None:
duplicates = [
key
for (key, value) in Counter(argument_list).items()
if (value > 1 and key is not None)
]
if duplicates == []:
return
value_error_text = []
for duplicate in duplicates:
value_error_text.append(
f"'{duplicate}' is provided as an '{argument}' multiple times, which is not "
+ "allowed."
)
raise ValueError("\n".join(value_error_text))
19 changes: 12 additions & 7 deletions pyracf/user/user_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,12 +277,13 @@ def take_away_auditor_authority(self, userid: str) -> Union[dict, bytes]:
# Password
# ============================================================================
def set_password(
self,
userid: str,
password: Union[str, bool],
self, userid: str, password: Union[str, bool], expired: Union[bool, None] = None
) -> Union[dict, bytes]:
"""Set a user's password."""
result = self.alter(userid, traits={"base:password": password})
traits = {"base:password": password}
if expired is not None:
traits["base:password_expired"] = expired
result = self.alter(userid, traits=traits)
return self._to_steps(result)

# ============================================================================
Expand All @@ -292,9 +293,13 @@ def set_passphrase(
self,
userid: str,
passphrase: Union[str, bool],
expired: Union[bool, None] = None,
) -> Union[dict, bytes]:
"""Set a user's passphrase."""
result = self.alter(userid, traits={"base:passphrase": passphrase})
traits = {"base:passphrase": passphrase}
if expired is not None:
traits["base:password_expired"] = expired
result = self.alter(userid, traits=traits)
return self._to_steps(result)

# ============================================================================
Expand Down Expand Up @@ -795,8 +800,8 @@ def alter(self, userid: str, traits: dict) -> Union[dict, bytes]:
return self._make_request(user_request, irrsmo00_precheck=True)
try:
self.extract(userid)
except SecurityRequestError:
raise AlterOperationError(userid, self._profile_type)
except SecurityRequestError as exception:
raise AlterOperationError(userid, self._profile_type) from exception
self._build_segment_trait_dictionary(traits)
user_request = UserRequest(userid, "set")
self._build_xml_segments(user_request, alter=True)
Expand Down
5 changes: 3 additions & 2 deletions tests/resource/resource_log_samples/alter_resource_error.log
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
<message> </message>
<message>AUDITING</message>
<message>--------</message>
<message>FAILURES(READ)</message>
<message>SUCCESS(UPDATE),FAILURES(READ)</message>
<message> </message>
<message>NOTIFY</message>
<message>------</message>
Expand Down Expand Up @@ -97,7 +97,7 @@
" ",
"AUDITING",
"--------",
"FAILURES(READ)",
"SUCCESS(UPDATE),FAILURES(READ)",
" ",
"NOTIFY",
"------",
Expand Down Expand Up @@ -144,6 +144,7 @@
"installationData": null,
"applicationData": null,
"auditing": {
"success": "update",
"failures": "read"
},
"notify": null,
Expand Down
Loading

0 comments on commit 8d96ac1

Please sign in to comment.