Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactors AWSSigner; scheme no longer required. #724

Merged
merged 11 commits into from
Apr 11, 2024
168 changes: 82 additions & 86 deletions securesystemslib/signer/_aws_signer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from typing import Optional, Tuple
from urllib import parse

import securesystemslib.hash as sslib_hash
from securesystemslib import exceptions
from securesystemslib.exceptions import UnsupportedLibraryError
from securesystemslib.signer._key import Key, SSlibKey
Expand Down Expand Up @@ -58,15 +57,28 @@ class AWSSigner(Signer):

SCHEME = "awskms"

aws_signing_algorithms = {
"ecdsa-sha2-nistp256": "ECDSA_SHA_256",
"ecdsa-sha2-nistp384": "ECDSA_SHA_384",
"ecdsa-sha2-nistp512": "ECDSA_SHA_512",
"rsassa-pss-sha256": "RSASSA_PSS_SHA_256",
"rsassa-pss-sha384": "RSASSA_PSS_SHA_384",
"rsassa-pss-sha512": "RSASSA_PSS_SHA_512",
"rsa-pkcs1v15-sha256": "RSASSA_PKCS1_V1_5_SHA_256",
"rsa-pkcs1v15-sha384": "RSASSA_PKCS1_V1_5_SHA_384",
"rsa-pkcs1v15-sha512": "RSASSA_PKCS1_V1_5_SHA_512",
}

def __init__(self, aws_key_id: str, public_key: Key):
if AWS_IMPORT_ERROR:
raise UnsupportedLibraryError(AWS_IMPORT_ERROR)

self.hash_algorithm = self._get_hash_algorithm(public_key)
self.aws_key_id = aws_key_id
self.public_key = public_key
self.client = boto3.client("kms")
self.aws_algo = self._get_aws_signing_algo(self.public_key.scheme)
self.aws_algo = AWSSigner.aws_signing_algorithms.get(
self.public_key.scheme
)

@classmethod
def from_priv_key_uri(
Expand All @@ -83,15 +95,18 @@ def from_priv_key_uri(
return cls(uri.path, public_key)

@classmethod
def import_(cls, aws_key_id: str, local_scheme: str) -> Tuple[str, Key]:
def import_(
cls, aws_key_id: str, local_scheme: Optional[str] = None
) -> Tuple[str, Key]:
"""Loads a key and signer details from AWS KMS.

Returns the private key uri and the public key. This method should only
be called once per key: the uri and Key should be stored for later use.

Arguments:
aws_key_id (str): AWS KMS key ID.
local_scheme (str): Local scheme to use.
local_scheme (Optional[str]): The Secure Systems Library RSA or ECDSA scheme.
Defaults to 'rsassa-pss-sha256' if not provided and RSA.

Returns:
Tuple[str, Key]: A tuple where the first element is a string
Expand All @@ -101,136 +116,117 @@ def import_(cls, aws_key_id: str, local_scheme: str) -> Tuple[str, Key]:
Raises:
UnsupportedAlgorithmError: If the AWS KMS signing algorithm is
unsupported.
BotoCoreError: Errors from the botocore.exceptions library.
BotoCoreError: Errors from the botocore library.
ClientError: Errors related to AWS KMS client.
"""
if AWS_IMPORT_ERROR:
raise UnsupportedLibraryError(AWS_IMPORT_ERROR)

client = boto3.client("kms")
request = client.get_public_key(KeyId=aws_key_id)
if local_scheme and local_scheme not in cls.aws_signing_algorithms:
raise ValueError(f"Unsupported scheme: {local_scheme}")

try:
client = boto3.client("kms")
request = client.get_public_key(KeyId=aws_key_id)
except (BotoCoreError, ClientError) as e:
ianhundere marked this conversation as resolved.
Show resolved Hide resolved
logger.error(
"Failed to import key using AWS KMS key ID %s: %s",
aws_key_id,
str(e),
)
raise e

keytype = cls._get_keytype_from_aws_response(request)
aws_scheme = request["SigningAlgorithms"][0]

if not local_scheme:
if keytype == "ecdsa":
local_scheme = cls._get_ecdsa_scheme(aws_scheme)
elif keytype == "rsa":
local_scheme = "rsassa-pss-sha256"
else:
raise ValueError(f"Unsupported key type: {keytype}")

kms_pubkey = serialization.load_der_public_key(request["PublicKey"])

public_key_pem = kms_pubkey.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode("utf-8")
try:
keytype = cls._get_keytype_for_scheme(local_scheme)
except KeyError as e:
raise exceptions.UnsupportedAlgorithmError(
f"{local_scheme} is not a supported signing algorithm"
) from e

keyval = {"public": public_key_pem}
keyid = compute_default_keyid(keytype, local_scheme, keyval)
public_key = SSlibKey(keyid, keytype, local_scheme, keyval)
return f"{cls.SCHEME}:{aws_key_id}", public_key

@staticmethod
def _get_keytype_for_scheme(
scheme: str,
) -> str:
"""Returns the Secure Systems Library key type.
def _get_keytype_from_aws_response(reponse: dict) -> str:
"""Determines the key type from the AWS KMS get_public_key response.

Arguments:
(str): The Secure Systems Library scheme.
response (dict): The response from AWS KMS get_public_key request.

Returns:
str: The Secure Systems Library key type.
str: The key type, either 'ecdsa' or 'rsa'.
"""
keytype_for_scheme = {
"ecdsa-sha2-nistp256": "ecdsa",
"ecdsa-sha2-nistp384": "ecdsa",
"ecdsa-sha2-nistp512": "ecdsa",
"rsassa-pss-sha256": "rsa",
"rsassa-pss-sha384": "rsa",
"rsassa-pss-sha512": "rsa",
"rsa-pkcs1v15-sha256": "rsa",
"rsa-pkcs1v15-sha384": "rsa",
"rsa-pkcs1v15-sha512": "rsa",
}
return keytype_for_scheme[scheme]
algo = reponse["SigningAlgorithms"][0]
if "ECDSA" in algo:
return "ecdsa"
if "RSASSA" in algo:
return "rsa"
raise exceptions.UnsupportedAlgorithmError(
f"Unsupported algorithm in AWS response: {algo}"
)

@staticmethod
def _get_aws_signing_algo(
scheme: str,
) -> str:
"""Returns AWS signing algorithm
def _get_ecdsa_scheme(aws_algo: str) -> str:
"""Returns ECDSA signing scheme based on AWS signing algorithm.

Arguments:
scheme (str): The Secure Systems Library signing scheme.
aws_algo (str): The AWS ECDSA signing algorithm.

Returns:
str: AWS signing scheme.
str: The Secure Systems Library ECDSA scheme.
"""
aws_signing_algorithms = {
"ecdsa-sha2-nistp256": "ECDSA_SHA_256",
"ecdsa-sha2-nistp384": "ECDSA_SHA_384",
"ecdsa-sha2-nistp512": "ECDSA_SHA_512",
"rsassa-pss-sha256": "RSASSA_PSS_SHA_256",
"rsassa-pss-sha384": "RSASSA_PSS_SHA_384",
"rsassa-pss-sha512": "RSASSA_PSS_SHA_512",
"rsa-pkcs1v15-sha256": "RSASSA_PKCS1_V1_5_SHA_256",
"rsa-pkcs1v15-sha384": "RSASSA_PKCS1_V1_5_SHA_384",
"rsa-pkcs1v15-sha512": "RSASSA_PKCS1_V1_5_SHA_512",
ecdsa_signing_algorithms = {
"ECDSA_SHA_256": "ecdsa-sha2-nistp256",
"ECDSA_SHA_384": "ecdsa-sha2-nistp384",
"ECDSA_SHA_512": "ecdsa-sha2-nistp512",
}
return aws_signing_algorithms[scheme]

@staticmethod
def _get_hash_algorithm(public_key: Key) -> str:
"""Helper function to return payload hash algorithm used for this key

Arguments:
public_key (Key): Public key object

Returns:
str: Hash algorithm
"""
if public_key.keytype == "rsa":
# hash algorithm is encoded as last scheme portion
algo = public_key.scheme.split("-")[-1]
if public_key.keytype in [
"ecdsa",
"ecdsa-sha2-nistp256",
"ecdsa-sha2-nistp384",
]:
# nistp256 uses sha-256, nistp384 uses sha-384
bits = public_key.scheme.split("-nistp")[-1]
algo = f"sha{bits}"

# trigger UnsupportedAlgorithm if appropriate
_ = sslib_hash.digest(algo)
return algo
return ecdsa_signing_algorithms[aws_algo]

def sign(self, payload: bytes) -> Signature:
"""Sign the payload with the AWS KMS key

This method sends the payload to AWS KMS, where it is signed using the specified
key and algorithm using the raw message type.
ianhundere marked this conversation as resolved.
Show resolved Hide resolved

Arguments:
payload: bytes to be signed.
payload (bytes): The payload to be signed.

Raises:
BotoCoreError: Errors from the botocore.exceptions library.
ClientError: Errors related to AWS KMS client.
BotoCoreError, ClientError: If an error occurs during the signing process.

Returns:
Signature.
Signature: A signature object containing the key ID and the signature.
"""
try:
request = self.client.sign(
sign_request = self.client.sign(
KeyId=self.aws_key_id,
Message=payload,
MessageType="RAW",
SigningAlgorithm=self.aws_algo,
)

hasher = sslib_hash.digest(self.hash_algorithm)
hasher.update(payload)
logger.debug("signing response %s", request)
response = request["Signature"]
logger.debug("signing response %s", response)
logger.debug("Signing response: %s", sign_request)
response = sign_request["Signature"]
logger.debug("Signature response: %s", response)

return Signature(self.public_key.keyid, response.hex())
except (BotoCoreError, ClientError) as e:
logger.error("Failed to sign with AWS KMS: %s", str(e))
logger.error(
"Failed to sign using AWS KMS key ID %s: %s",
self.aws_key_id,
str(e),
)
raise e
11 changes: 8 additions & 3 deletions tests/check_aws_signer.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,18 @@ def test_aws_sign(self):
with self.assertRaises(UnverifiedSignatureError):
self.pubkey.verify_signature(sig, b"NOT DATA")

def test_aws_import(self):
"""Test that AWS KMS key can be imported"""

def test_aws_import_with_scheme(self):
"""Test that AWS KMS key can be imported with a specified scheme."""
uri, key = AWSSigner.import_(self.aws_key_id, self.pubkey.scheme)
self.assertEqual(key.keytype, self.pubkey.keytype)
self.assertEqual(uri, f"awskms:{self.aws_key_id}")

def test_aws_import_without_scheme(self):
"""Test that AWS KMS key can be imported without specifying a scheme."""
uri, key = AWSSigner.import_(self.aws_key_id)
self.assertEqual(key.keytype, self.pubkey.keytype)
self.assertEqual(uri, f"awskms:{self.aws_key_id}")


if __name__ == "__main__":
unittest.main(verbosity=1)