From 0af62cac740da40db4f95c601f5a6107205efb1f Mon Sep 17 00:00:00 2001 From: Enrique Date: Tue, 5 Mar 2024 20:48:23 -0600 Subject: [PATCH] schedule-orders: Enable access_rules check (#43) --- src/orders/delivery/app.py | 8 +- .../delivery_modules/utils/doorman.py | 81 ++++++++++++++++--- 2 files changed, 75 insertions(+), 14 deletions(-) diff --git a/src/orders/delivery/app.py b/src/orders/delivery/app.py index 45c999f..cf70e28 100644 --- a/src/orders/delivery/app.py +++ b/src/orders/delivery/app.py @@ -36,10 +36,10 @@ def set_delivery_schedule_order( logger.info("Initializing set_delivery_schedule_order function") try: doorman = DoormanUtil(event, logger) - username = doorman.get_username_from_token() + username = doorman.get_username_from_context() is_auth = doorman.auth_user() if is_auth is False: - raise AuthError("User is not allow to retrieve orders") + raise AuthError(f"User {username} is not authorized to schedule orders") body = doorman.get_body_from_request() logger.debug(f"Incoming data is {body=} and {username=}") @@ -66,8 +66,8 @@ def set_delivery_schedule_order( logger.warning("No orders to process today, check DB if this is ok") return doorman.build_response(payload={"message": "scheduling completed"}, status_code=200) - except AuthError: - error_details = f"user {username} was not auth to fetch orders" + except AuthError as auth_error: + error_details = f"Not authorized. {auth_error}" logger.error(error_details) output_data = {"message": error_details} return doorman.build_response(payload=output_data, status_code=403) diff --git a/src/orders/delivery/delivery_modules/utils/doorman.py b/src/orders/delivery/delivery_modules/utils/doorman.py index ad84606..9998160 100644 --- a/src/orders/delivery/delivery_modules/utils/doorman.py +++ b/src/orders/delivery/delivery_modules/utils/doorman.py @@ -1,12 +1,20 @@ # Python's libraries import json +import os # Own's modules from delivery_modules.errors.util_error import UtilError +from delivery_modules.errors.auth_error import AuthError +from settings import environment # Third-party libraries from aws_lambda_powertools import Logger +ACCESS_RULES = { + "Admin": ["ScheduleOrdersFunction"], + "MesaDeControl": ["ScheduleOrdersFunction"], +} + class DoormanUtil(object): @@ -19,20 +27,20 @@ def get_body_from_request(self): raise UtilError( _message="There is no body in request data", _error=None, - _logger=self.logger + _logger=self.logger, ) - if self.request['body'] is None: + if self.request["body"] is None: raise UtilError( _message="The body node is null", _error=None, _logger=self.logger, ) # Check if body is already a dict and return it directly - if isinstance(self.request['body'], dict): - return self.request['body'] + if isinstance(self.request["body"], dict): + return self.request["body"] try: - body = json.loads(self.request['body']) + body = json.loads(self.request["body"]) except Exception as e: raise UtilError( _message=f"The body was not a JSON object. Details: {e}", @@ -66,14 +74,67 @@ def build_response(self, payload: dict, status_code: int) -> dict: "Access-Control-Allow-Headers": "Content-Type,Authorization,x-apigateway-header,X-Amz-Date,X-Api-Key,X-Amz-Security-Token", "Access-Control-Allow-Methods": "GET, POST, PATCH, OPTIONS, DELETE", }, - "body": json.dumps(payload) + "body": json.dumps(payload), } return response - def get_username_from_token(self): - # ToDO - return "Admin" + def _is_any_group_authorized(self, group_names: list) -> bool: + """ + Checks if any of the user's groups are authorized to access the current Lambda function. + + Parameters: + - group_names (list): A list of group names to which the user belongs. + - function_name (str): The name of the currently executing Lambda function. + + Returns: + - bool: True if the group is authorized, False otherwise. + """ + current_function_name = os.environ["AWS_LAMBDA_FUNCTION_NAME"] + for group_name in group_names: + if group_name in ACCESS_RULES: + for allowed_function in ACCESS_RULES[group_name]: + if current_function_name.startswith(allowed_function): + return True + + self.logger.error( + f"Group/s: {group_names} are not authorized to access {current_function_name}." + ) + return False + + def get_username_from_context(self): + if environment == "local": + return "Admin" + + try: + email = self.request["requestContext"]["authorizer"]["claims"]["email"] + except KeyError: + raise AuthError(f"Missing context from Api gateway authorizer.") + + return email def auth_user(self): - return True + """ + Authorizes a user based on their Cognito group memberships. This function is intended for a Lambda + function triggered by AWS API Gateway with a Cognito Authorizer. + + The Cognito Authorizer adds user group information to the 'requestContext' in the Lambda event object. + This function extracts these group memberships and checks them against predefined access rules to + determine if the user is authorized to access the current Lambda function. + + Returns: + - bool: True if the user is authorized; False otherwise. + """ + if environment == "local": + return True + + user_groups = [] + try: + groups_string = self.request["requestContext"]["authorizer"]["claims"][ + "cognito:groups" + ] + user_groups = groups_string.split(",") + except KeyError: + return False + + return self._is_any_group_authorized(user_groups)