From ebd3dad33390f774a417d4162ba981afb38e9de9 Mon Sep 17 00:00:00 2001 From: Luke Hendrick Date: Wed, 14 Feb 2024 10:25:59 -0600 Subject: [PATCH 1/3] Update subprocess arguments to use expanded user path --- awsume/awsumepy/default_plugins.py | 18 ++++++- .../awsume/awsumepy/test_default_plugins.py | 49 +++++++++++++++++++ 2 files changed, 66 insertions(+), 1 deletion(-) diff --git a/awsume/awsumepy/default_plugins.py b/awsume/awsumepy/default_plugins.py index 5f645f9..e2a9495 100644 --- a/awsume/awsumepy/default_plugins.py +++ b/awsume/awsumepy/default_plugins.py @@ -2,6 +2,7 @@ import json import os import subprocess +from pathlib import Path import colorama import dateutil @@ -519,7 +520,8 @@ def get_credentials_from_credential_process(config: dict, arguments: argparse.Na return_session = {} credential_process_env = os.environ.copy() credential_process_env['AWS_PROFILE'] = target_profile_name - result = subprocess.run(target_profile.get('credential_process').split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=credential_process_env) + credential_process_target_and_arguments = get_credentials_process_target_and_arguments(target_profile.get('credential_process')) + result = subprocess.run(credential_process_target_and_arguments, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=credential_process_env) logger.info('credential_process returncode: {}'.format(result.returncode)) logger.debug('credential_process stdout: {}'.format(result.stdout.decode('utf-8'))) logger.debug('credential_process stderr: {}'.format(result.stderr.decode('utf-8'))) @@ -547,6 +549,20 @@ def get_credentials_from_credential_process(config: dict, arguments: argparse.Na logger.debug("credential_process session: {}".format(return_session)) return return_session +def get_credentials_process_target_and_arguments(target_profile: dict): + credential_process = target_profile.get("credential_process", None) + if credential_process is None: + raise exceptions.ValidationException(f"credential_process not found in profile: {json.dumps(target_profile)}") + parts = credential_process.split() + target = Path(parts[0]).expanduser() if len(parts) > 0 else None + if target is None: + logger.debug(f"Unable to find credentials target from provided process string: {credential_process}") + raise exceptions.ValidationException('credential_process target not found: {}'.format(target)) + if not target.is_file(): + logger.debug(f"{credential_process} is not a file. Derived target is {str(target)}") + raise exceptions.ValidationException('credential_process target is not a file: {}'.format(target)) + return [str(target), *parts[1:]] + def get_session_token_credentials(config: dict, arguments: argparse.Namespace, profiles: dict, target_profile: dict, target_profile_name: str): logger.info('Getting session token credentials') diff --git a/test/unit/awsume/awsumepy/test_default_plugins.py b/test/unit/awsume/awsumepy/test_default_plugins.py index ef8800d..24c37f4 100644 --- a/test/unit/awsume/awsumepy/test_default_plugins.py +++ b/test/unit/awsume/awsumepy/test_default_plugins.py @@ -1,9 +1,12 @@ import argparse +from pathlib import Path + import pytest from io import StringIO from unittest.mock import MagicMock, patch from awsume.awsumepy import default_plugins from awsume.awsumepy.lib import exceptions, autoawsume +from awsume.awsumepy.lib.constants import AWSUME_DIR def generate_namespace_with_defaults( @@ -1006,3 +1009,49 @@ def test_post_add_arguments_session_tags(aws_lib: MagicMock): ], region=arguments.region, ) + +def test_get_credentials_process_target_and_arguments_with_file(): + process_file = Path(f"{AWSUME_DIR}/test.sh").expanduser() + process_file.open('w').close() # create test script file in AWSUME_DIR + expected = [str(process_file), "arg1", "arg2"] + target_profile = { + "credential_process": f"{str(process_file)} arg1 arg2" + } + actual = default_plugins.get_credentials_process_target_and_arguments(target_profile) + assert actual == expected + process_file.unlink() # remove test script file in AWSUME_DIR + +def test_get_credentials_process_target_and_arguments_without_file(): + process_file = Path(f"{AWSUME_DIR}/nonexistent.sh").expanduser() + target_profile = { + "credential_process": f"{str(process_file)} arg1 arg2" + } + with pytest.raises(exceptions.ValidationException): + default_plugins.get_credentials_process_target_and_arguments(target_profile) + +def test_get_credentials_process_target_and_arguments_invalid_arguments(): + process_file = Path(f"{AWSUME_DIR}/test.sh").expanduser() + process_file.open('w').close() + target_profile = { + "credential_process": "" + } + with pytest.raises(exceptions.ValidationException): + default_plugins.get_credentials_process_target_and_arguments(target_profile) + process_file.unlink() + +def test_get_credentials_process_target_and_arguments_invalid_input(): + target_profile = {} + with pytest.raises(exceptions.ValidationException): + default_plugins.get_credentials_process_target_and_arguments(target_profile) + +def test_get_credentials_process_target_and_arguments_expands_user(): + process_file = Path(f"~/test.sh") + expanded_process_file = process_file.expanduser() + expanded_process_file.open('w').close() + target_profile = { + "credential_process": f"{str(process_file)} arg1 arg2" + } + print(target_profile) + expected = [str(expanded_process_file), "arg1", "arg2"] + actual = default_plugins.get_credentials_process_target_and_arguments(target_profile) + assert actual == expected From 7ef5e17c6341ec273db23b620b1fd9a9d1375298 Mon Sep 17 00:00:00 2001 From: Luke Hendrick Date: Wed, 14 Feb 2024 11:16:14 -0600 Subject: [PATCH 2/3] Pass the entire profile --- awsume/awsumepy/default_plugins.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awsume/awsumepy/default_plugins.py b/awsume/awsumepy/default_plugins.py index e2a9495..507fbb7 100644 --- a/awsume/awsumepy/default_plugins.py +++ b/awsume/awsumepy/default_plugins.py @@ -520,7 +520,7 @@ def get_credentials_from_credential_process(config: dict, arguments: argparse.Na return_session = {} credential_process_env = os.environ.copy() credential_process_env['AWS_PROFILE'] = target_profile_name - credential_process_target_and_arguments = get_credentials_process_target_and_arguments(target_profile.get('credential_process')) + credential_process_target_and_arguments = get_credentials_process_target_and_arguments(target_profile) result = subprocess.run(credential_process_target_and_arguments, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=credential_process_env) logger.info('credential_process returncode: {}'.format(result.returncode)) logger.debug('credential_process stdout: {}'.format(result.stdout.decode('utf-8'))) From 212b0a36c34e15226fedfff07fb8ffcca42db407 Mon Sep 17 00:00:00 2001 From: Luke Hendrick Date: Wed, 14 Feb 2024 13:31:19 -0600 Subject: [PATCH 3/3] Update for test failures --- test/unit/awsume/awsumepy/test_default_plugins.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/test/unit/awsume/awsumepy/test_default_plugins.py b/test/unit/awsume/awsumepy/test_default_plugins.py index 24c37f4..c9c9dce 100644 --- a/test/unit/awsume/awsumepy/test_default_plugins.py +++ b/test/unit/awsume/awsumepy/test_default_plugins.py @@ -1010,16 +1010,16 @@ def test_post_add_arguments_session_tags(aws_lib: MagicMock): region=arguments.region, ) -def test_get_credentials_process_target_and_arguments_with_file(): +@patch.object(Path, 'is_file') +def test_get_credentials_process_target_and_arguments_with_file(is_file: MagicMock): + is_file.return_value(True) process_file = Path(f"{AWSUME_DIR}/test.sh").expanduser() - process_file.open('w').close() # create test script file in AWSUME_DIR expected = [str(process_file), "arg1", "arg2"] target_profile = { "credential_process": f"{str(process_file)} arg1 arg2" } actual = default_plugins.get_credentials_process_target_and_arguments(target_profile) assert actual == expected - process_file.unlink() # remove test script file in AWSUME_DIR def test_get_credentials_process_target_and_arguments_without_file(): process_file = Path(f"{AWSUME_DIR}/nonexistent.sh").expanduser() @@ -1030,14 +1030,11 @@ def test_get_credentials_process_target_and_arguments_without_file(): default_plugins.get_credentials_process_target_and_arguments(target_profile) def test_get_credentials_process_target_and_arguments_invalid_arguments(): - process_file = Path(f"{AWSUME_DIR}/test.sh").expanduser() - process_file.open('w').close() target_profile = { "credential_process": "" } with pytest.raises(exceptions.ValidationException): default_plugins.get_credentials_process_target_and_arguments(target_profile) - process_file.unlink() def test_get_credentials_process_target_and_arguments_invalid_input(): target_profile = {}