diff --git a/securesystemslib/signer/_aws_signer.py b/securesystemslib/signer/_aws_signer.py index 73fae09f..06a680c3 100644 --- a/securesystemslib/signer/_aws_signer.py +++ b/securesystemslib/signer/_aws_signer.py @@ -4,7 +4,6 @@ from typing import Optional, Tuple from urllib import parse -import securesystemslib.hash as sslib_hash from securesystemslib import exceptions from securesystemslib.exceptions import UnsupportedLibraryError from securesystemslib.signer._key import Key, SSlibKey @@ -58,15 +57,28 @@ class AWSSigner(Signer): SCHEME = "awskms" + aws_signing_algorithms = { + "ecdsa-sha2-nistp256": "ECDSA_SHA_256", + "ecdsa-sha2-nistp384": "ECDSA_SHA_384", + "ecdsa-sha2-nistp512": "ECDSA_SHA_512", + "rsassa-pss-sha256": "RSASSA_PSS_SHA_256", + "rsassa-pss-sha384": "RSASSA_PSS_SHA_384", + "rsassa-pss-sha512": "RSASSA_PSS_SHA_512", + "rsa-pkcs1v15-sha256": "RSASSA_PKCS1_V1_5_SHA_256", + "rsa-pkcs1v15-sha384": "RSASSA_PKCS1_V1_5_SHA_384", + "rsa-pkcs1v15-sha512": "RSASSA_PKCS1_V1_5_SHA_512", + } + def __init__(self, aws_key_id: str, public_key: Key): if AWS_IMPORT_ERROR: raise UnsupportedLibraryError(AWS_IMPORT_ERROR) - self.hash_algorithm = self._get_hash_algorithm(public_key) self.aws_key_id = aws_key_id self.public_key = public_key self.client = boto3.client("kms") - self.aws_algo = self._get_aws_signing_algo(self.public_key.scheme) + self.aws_algo = AWSSigner.aws_signing_algorithms.get( + self.public_key.scheme + ) @classmethod def from_priv_key_uri( @@ -83,7 +95,9 @@ def from_priv_key_uri( return cls(uri.path, public_key) @classmethod - def import_(cls, aws_key_id: str, local_scheme: str) -> Tuple[str, Key]: + def import_( + cls, aws_key_id: str, local_scheme: Optional[str] = None + ) -> Tuple[str, Key]: """Loads a key and signer details from AWS KMS. Returns the private key uri and the public key. This method should only @@ -91,7 +105,8 @@ def import_(cls, aws_key_id: str, local_scheme: str) -> Tuple[str, Key]: Arguments: aws_key_id (str): AWS KMS key ID. - local_scheme (str): Local scheme to use. + local_scheme (Optional[str]): The Secure Systems Library RSA or ECDSA scheme. + Defaults to 'rsassa-pss-sha256' if not provided and RSA. Returns: Tuple[str, Key]: A tuple where the first element is a string @@ -101,26 +116,43 @@ def import_(cls, aws_key_id: str, local_scheme: str) -> Tuple[str, Key]: Raises: UnsupportedAlgorithmError: If the AWS KMS signing algorithm is unsupported. - BotoCoreError: Errors from the botocore.exceptions library. + BotoCoreError: Errors from the botocore library. ClientError: Errors related to AWS KMS client. """ if AWS_IMPORT_ERROR: raise UnsupportedLibraryError(AWS_IMPORT_ERROR) - client = boto3.client("kms") - request = client.get_public_key(KeyId=aws_key_id) + if local_scheme and local_scheme not in cls.aws_signing_algorithms: + raise ValueError(f"Unsupported scheme: {local_scheme}") + + try: + client = boto3.client("kms") + request = client.get_public_key(KeyId=aws_key_id) + except (BotoCoreError, ClientError) as e: + logger.error( + "Failed to import key using AWS KMS key ID %s: %s", + aws_key_id, + str(e), + ) + raise e + + keytype = cls._get_keytype_from_aws_response(request) + aws_scheme = request["SigningAlgorithms"][0] + + if not local_scheme: + if keytype == "ecdsa": + local_scheme = cls._get_ecdsa_scheme(aws_scheme) + elif keytype == "rsa": + local_scheme = "rsassa-pss-sha256" + else: + raise ValueError(f"Unsupported key type: {keytype}") + kms_pubkey = serialization.load_der_public_key(request["PublicKey"]) public_key_pem = kms_pubkey.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo, ).decode("utf-8") - try: - keytype = cls._get_keytype_for_scheme(local_scheme) - except KeyError as e: - raise exceptions.UnsupportedAlgorithmError( - f"{local_scheme} is not a supported signing algorithm" - ) from e keyval = {"public": public_key_pem} keyid = compute_default_keyid(keytype, local_scheme, keyval) @@ -128,109 +160,73 @@ def import_(cls, aws_key_id: str, local_scheme: str) -> Tuple[str, Key]: return f"{cls.SCHEME}:{aws_key_id}", public_key @staticmethod - def _get_keytype_for_scheme( - scheme: str, - ) -> str: - """Returns the Secure Systems Library key type. + def _get_keytype_from_aws_response(reponse: dict) -> str: + """Determines the key type from the AWS KMS get_public_key response. Arguments: - (str): The Secure Systems Library scheme. + response (dict): The response from AWS KMS get_public_key request. Returns: - str: The Secure Systems Library key type. + str: The key type, either 'ecdsa' or 'rsa'. """ - keytype_for_scheme = { - "ecdsa-sha2-nistp256": "ecdsa", - "ecdsa-sha2-nistp384": "ecdsa", - "ecdsa-sha2-nistp512": "ecdsa", - "rsassa-pss-sha256": "rsa", - "rsassa-pss-sha384": "rsa", - "rsassa-pss-sha512": "rsa", - "rsa-pkcs1v15-sha256": "rsa", - "rsa-pkcs1v15-sha384": "rsa", - "rsa-pkcs1v15-sha512": "rsa", - } - return keytype_for_scheme[scheme] + algo = reponse["SigningAlgorithms"][0] + if "ECDSA" in algo: + return "ecdsa" + if "RSASSA" in algo: + return "rsa" + raise exceptions.UnsupportedAlgorithmError( + f"Unsupported algorithm in AWS response: {algo}" + ) @staticmethod - def _get_aws_signing_algo( - scheme: str, - ) -> str: - """Returns AWS signing algorithm + def _get_ecdsa_scheme(aws_algo: str) -> str: + """Returns ECDSA signing scheme based on AWS signing algorithm. Arguments: - scheme (str): The Secure Systems Library signing scheme. + aws_algo (str): The AWS ECDSA signing algorithm. Returns: - str: AWS signing scheme. + str: The Secure Systems Library ECDSA scheme. """ - aws_signing_algorithms = { - "ecdsa-sha2-nistp256": "ECDSA_SHA_256", - "ecdsa-sha2-nistp384": "ECDSA_SHA_384", - "ecdsa-sha2-nistp512": "ECDSA_SHA_512", - "rsassa-pss-sha256": "RSASSA_PSS_SHA_256", - "rsassa-pss-sha384": "RSASSA_PSS_SHA_384", - "rsassa-pss-sha512": "RSASSA_PSS_SHA_512", - "rsa-pkcs1v15-sha256": "RSASSA_PKCS1_V1_5_SHA_256", - "rsa-pkcs1v15-sha384": "RSASSA_PKCS1_V1_5_SHA_384", - "rsa-pkcs1v15-sha512": "RSASSA_PKCS1_V1_5_SHA_512", + ecdsa_signing_algorithms = { + "ECDSA_SHA_256": "ecdsa-sha2-nistp256", + "ECDSA_SHA_384": "ecdsa-sha2-nistp384", + "ECDSA_SHA_512": "ecdsa-sha2-nistp512", } - return aws_signing_algorithms[scheme] - - @staticmethod - def _get_hash_algorithm(public_key: Key) -> str: - """Helper function to return payload hash algorithm used for this key - - Arguments: - public_key (Key): Public key object - - Returns: - str: Hash algorithm - """ - if public_key.keytype == "rsa": - # hash algorithm is encoded as last scheme portion - algo = public_key.scheme.split("-")[-1] - if public_key.keytype in [ - "ecdsa", - "ecdsa-sha2-nistp256", - "ecdsa-sha2-nistp384", - ]: - # nistp256 uses sha-256, nistp384 uses sha-384 - bits = public_key.scheme.split("-nistp")[-1] - algo = f"sha{bits}" - - # trigger UnsupportedAlgorithm if appropriate - _ = sslib_hash.digest(algo) - return algo + return ecdsa_signing_algorithms[aws_algo] def sign(self, payload: bytes) -> Signature: """Sign the payload with the AWS KMS key + This method sends the payload to AWS KMS, where it is signed using the specified + key and algorithm using the raw message type. + Arguments: - payload: bytes to be signed. + payload (bytes): The payload to be signed. Raises: - BotoCoreError: Errors from the botocore.exceptions library. - ClientError: Errors related to AWS KMS client. + BotoCoreError, ClientError: If an error occurs during the signing process. Returns: - Signature. + Signature: A signature object containing the key ID and the signature. """ try: - request = self.client.sign( + sign_request = self.client.sign( KeyId=self.aws_key_id, Message=payload, MessageType="RAW", SigningAlgorithm=self.aws_algo, ) - hasher = sslib_hash.digest(self.hash_algorithm) - hasher.update(payload) - logger.debug("signing response %s", request) - response = request["Signature"] - logger.debug("signing response %s", response) + logger.debug("Signing response: %s", sign_request) + response = sign_request["Signature"] + logger.debug("Signature response: %s", response) return Signature(self.public_key.keyid, response.hex()) except (BotoCoreError, ClientError) as e: - logger.error("Failed to sign with AWS KMS: %s", str(e)) + logger.error( + "Failed to sign using AWS KMS key ID %s: %s", + self.aws_key_id, + str(e), + ) raise e diff --git a/tests/check_aws_signer.py b/tests/check_aws_signer.py index e2603cff..66bbdcc7 100644 --- a/tests/check_aws_signer.py +++ b/tests/check_aws_signer.py @@ -50,13 +50,18 @@ def test_aws_sign(self): with self.assertRaises(UnverifiedSignatureError): self.pubkey.verify_signature(sig, b"NOT DATA") - def test_aws_import(self): - """Test that AWS KMS key can be imported""" - + def test_aws_import_with_scheme(self): + """Test that AWS KMS key can be imported with a specified scheme.""" uri, key = AWSSigner.import_(self.aws_key_id, self.pubkey.scheme) self.assertEqual(key.keytype, self.pubkey.keytype) self.assertEqual(uri, f"awskms:{self.aws_key_id}") + def test_aws_import_without_scheme(self): + """Test that AWS KMS key can be imported without specifying a scheme.""" + uri, key = AWSSigner.import_(self.aws_key_id) + self.assertEqual(key.keytype, self.pubkey.keytype) + self.assertEqual(uri, f"awskms:{self.aws_key_id}") + if __name__ == "__main__": unittest.main(verbosity=1)