From 4a2c02c22a449e16dae35eab1aae0a9d0f48a122 Mon Sep 17 00:00:00 2001 From: karen-avetisyan-mc Date: Wed, 21 Feb 2024 17:06:07 +0000 Subject: [PATCH] reformatting the code --- client_encryption/api_encryption.py | 12 ++++++---- client_encryption/encoding_utils.py | 1 + client_encryption/encryption_utils.py | 24 +++++++++++-------- client_encryption/field_level_encryption.py | 8 +++---- .../field_level_encryption_config.py | 21 +++++++++------- client_encryption/json_path_utils.py | 1 - client_encryption/jwe_encryption.py | 1 - client_encryption/jwe_encryption_config.py | 10 ++++---- client_encryption/session_key_params.py | 16 +++++++------ 9 files changed, 52 insertions(+), 42 deletions(-) diff --git a/client_encryption/api_encryption.py b/client_encryption/api_encryption.py index 470900b..11384da 100644 --- a/client_encryption/api_encryption.py +++ b/client_encryption/api_encryption.py @@ -1,14 +1,15 @@ -import json import inspect +import json from enum import Enum from functools import wraps from warnings import warn -from client_encryption.field_level_encryption_config import FieldLevelEncryptionConfig -from client_encryption.jwe_encryption_config import JweEncryptionConfig -from client_encryption.session_key_params import SessionKeyParams + from client_encryption.field_level_encryption import encrypt_payload as encrypt_field_level, \ decrypt_payload as decrypt_field_level +from client_encryption.field_level_encryption_config import FieldLevelEncryptionConfig from client_encryption.jwe_encryption import encrypt_payload as encrypt_jwe, decrypt_payload as decrypt_jwe +from client_encryption.jwe_encryption_config import JweEncryptionConfig +from client_encryption.session_key_params import SessionKeyParams class ApiEncryption(object): @@ -34,7 +35,7 @@ def field_encryption_call_api(self, func): @wraps(func) def call_api_function(*args, **kwargs): - original_parameters = inspect.signature(func.__self__.call_api).parameters + original_parameters = inspect.signature(func.__self__.call_api).parameters check_type_is_none = original_parameters.get("_check_type") is None preload_content_is_not_none = original_parameters.get("_preload_content") is not None if check_type_is_none and preload_content_is_not_none: @@ -160,6 +161,7 @@ def encrypt_field_level_payload(headers, conf, body): return encrypted_payload + def _contains_param(param_name, headers): return param_name and param_name in headers diff --git a/client_encryption/encoding_utils.py b/client_encryption/encoding_utils.py index ba9133b..22aa376 100644 --- a/client_encryption/encoding_utils.py +++ b/client_encryption/encoding_utils.py @@ -1,5 +1,6 @@ import base64 from enum import Enum + from client_encryption.encryption_exception import EncodingError diff --git a/client_encryption/encryption_utils.py b/client_encryption/encryption_utils.py index 0ea1ad8..ce01b1a 100644 --- a/client_encryption/encryption_utils.py +++ b/client_encryption/encryption_utils.py @@ -1,14 +1,16 @@ -from Crypto.PublicKey import RSA from Crypto.Hash import SHA1, SHA224, SHA256, SHA384, SHA512 -from client_encryption.encryption_exception import CertificateError, PrivateKeyError, HashAlgorithmError +from Crypto.PublicKey import RSA from cryptography import x509 -from cryptography.hazmat.primitives.serialization import pkcs12 from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.serialization import Encoding +from cryptography.hazmat.primitives.serialization import pkcs12 from enum import IntEnum +from client_encryption.encryption_exception import CertificateError, PrivateKeyError, HashAlgorithmError + _SUPPORTED_HASH = {"SHA1": SHA1, "SHA224": SHA224, "SHA256": SHA256, "SHA384": SHA384, "SHA512": SHA512} + class FileType(IntEnum): FILETYPE_PEM = 0 FILETYPE_ASN1 = 1 @@ -18,15 +20,15 @@ class FileType(IntEnum): def load_encryption_certificate(certificate_path): """Load X509 encryption certificate data at the given file path.""" - try: + try: with open(certificate_path, "rb") as cert_content: certificate = cert_content.read() except IOError: - raise CertificateError ("Unable to load certificate.") - - try: + raise CertificateError("Unable to load certificate.") + + try: cert_type = __get_crypto_file_type(certificate) - + if cert_type == FileType.FILETYPE_PEM: cert = x509.load_pem_x509_certificate(certificate) return cert, Encoding.PEM @@ -36,7 +38,8 @@ def load_encryption_certificate(certificate_path): if cert_type == FileType.FILETYPE_INVALID: raise CertificateError("Wrong certificate format.") except ValueError: - raise CertificateError ("Invalid certificate format.") + raise CertificateError("Invalid certificate format.") + def write_encryption_certificate(certificate_path, certificate, cert_type): with open(certificate_path, "wb") as f: @@ -61,7 +64,8 @@ def load_decryption_key(key_file_path, decryption_key_password=None): def __load_pkcs12_private_key(pkcs_file, password): private_key, certs, addcerts = pkcs12.load_key_and_certificates(pkcs_file, password.encode("utf-8")) - return private_key.private_bytes(serialization.Encoding.PEM, serialization.PrivateFormat.TraditionalOpenSSL, serialization.NoEncryption()) + return private_key.private_bytes(serialization.Encoding.PEM, serialization.PrivateFormat.TraditionalOpenSSL, + serialization.NoEncryption()) def __get_crypto_file_type(file_content): diff --git a/client_encryption/field_level_encryption.py b/client_encryption/field_level_encryption.py index e5eb337..bde4410 100644 --- a/client_encryption/field_level_encryption.py +++ b/client_encryption/field_level_encryption.py @@ -1,11 +1,12 @@ -import json import copy +import json from Crypto.Cipher import AES from Crypto.Util.Padding import pad, unpad -from client_encryption.session_key_params import SessionKeyParams + from client_encryption.encoding_utils import encode_bytes, decode_value -from client_encryption.json_path_utils import get_node, pop_node, update_node, cleanup_node from client_encryption.encryption_exception import EncryptionError +from client_encryption.json_path_utils import get_node, pop_node, update_node, cleanup_node +from client_encryption.session_key_params import SessionKeyParams def encrypt_payload(payload, config, _params=None): @@ -119,4 +120,3 @@ def _remove_fingerprint_from_node(node, config): del node[config.encryption_certificate_fingerprint_field_name] if config.encryption_key_fingerprint_field_name in node: del node[config.encryption_key_fingerprint_field_name] - diff --git a/client_encryption/field_level_encryption_config.py b/client_encryption/field_level_encryption_config.py index dad6cdc..707d61c 100644 --- a/client_encryption/field_level_encryption_config.py +++ b/client_encryption/field_level_encryption_config.py @@ -1,9 +1,9 @@ import json from Crypto.Hash import SHA256 -from client_encryption import encoding_utils -from client_encryption.encryption_utils import load_encryption_certificate, load_decryption_key, validate_hash_algorithm from cryptography.hazmat.primitives.serialization import PublicFormat, Encoding +from client_encryption import encoding_utils +from client_encryption.encryption_utils import load_encryption_certificate, load_decryption_key, validate_hash_algorithm class FieldLevelEncryptionConfig(object): @@ -28,12 +28,14 @@ def __init__(self, conf): x509_cert, cert_type = load_encryption_certificate(json_config["encryptionCertificate"]) self._encryption_certificate = x509_cert # Fixed encoding is required, regardless of initial certificate encoding to ensure correct calculation of fingerprint value - self._encryption_certificate_type = Encoding.DER + self._encryption_certificate_type = Encoding.DER self._encryption_key_fingerprint = \ - json_config.get("encryptionKeyFingerprint",self.__compute_fingerprint(x509_cert.public_key().public_bytes(Encoding.DER , PublicFormat.SubjectPublicKeyInfo))) + json_config.get("encryptionKeyFingerprint", self.__compute_fingerprint( + x509_cert.public_key().public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo))) self._encryption_certificate_fingerprint = \ - json_config.get("encryptionCertificateFingerprint", self.__compute_fingerprint(x509_cert.public_bytes(Encoding.DER))) - + json_config.get("encryptionCertificateFingerprint", + self.__compute_fingerprint(x509_cert.public_bytes(Encoding.DER))) + else: self._encryption_certificate = None self._encryption_key_fingerprint = None @@ -54,11 +56,11 @@ def __init__(self, conf): self._encrypted_key_field_name = json_config["encryptedKeyFieldName"] self._encrypted_value_field_name = json_config["encryptedValueFieldName"] - self._encryption_certificate_fingerprint_field_name =\ + self._encryption_certificate_fingerprint_field_name = \ json_config.get("encryptionCertificateFingerprintFieldName", None) - self._encryption_key_fingerprint_field_name =\ + self._encryption_key_fingerprint_field_name = \ json_config.get("encryptionKeyFingerprintFieldName", None) - self._oaep_padding_digest_algorithm_field_name =\ + self._oaep_padding_digest_algorithm_field_name = \ json_config.get("oaepPaddingDigestAlgorithmFieldName", None) self._use_http_headers = json_config.get("useHttpHeaders", False) @@ -74,6 +76,7 @@ def encryption_certificate(self): @property def encryption_certificate_type(self): return self._encryption_certificate_type + @property def encryption_key_fingerprint(self): return self._encryption_key_fingerprint diff --git a/client_encryption/json_path_utils.py b/client_encryption/json_path_utils.py index 59331eb..c79a005 100644 --- a/client_encryption/json_path_utils.py +++ b/client_encryption/json_path_utils.py @@ -1,6 +1,5 @@ import json - _SEPARATOR = "." _ROOT_SYMBOL = "$" diff --git a/client_encryption/jwe_encryption.py b/client_encryption/jwe_encryption.py index 0ea6db8..832e879 100644 --- a/client_encryption/jwe_encryption.py +++ b/client_encryption/jwe_encryption.py @@ -1,6 +1,5 @@ import copy import json - from Crypto.Cipher import AES from client_encryption.encoding_utils import url_encode_bytes, decode_jwe diff --git a/client_encryption/jwe_encryption_config.py b/client_encryption/jwe_encryption_config.py index 8e7a42c..c14a46d 100644 --- a/client_encryption/jwe_encryption_config.py +++ b/client_encryption/jwe_encryption_config.py @@ -1,10 +1,9 @@ import json - from Crypto.Hash import SHA256 +from cryptography.hazmat.primitives.serialization import PublicFormat, Encoding from client_encryption.encoding_utils import ClientEncoding from client_encryption.encryption_utils import load_encryption_certificate, load_decryption_key -from cryptography.hazmat.primitives.serialization import PublicFormat, Encoding class JweEncryptionConfig(object): @@ -29,9 +28,10 @@ def __init__(self, conf): x509_cert, cert_type = load_encryption_certificate(json_config["encryptionCertificate"]) self._encryption_certificate = x509_cert # Fixed encoding is required, regardless of initial certificate encoding to ensure correct calculation of fingerprint value - self._encryption_certificate_type = Encoding.DER + self._encryption_certificate_type = Encoding.DER self._encryption_key_fingerprint = \ - json_config.get("encryptionKeyFingerprint",self.__compute_fingerprint(x509_cert.public_key().public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo))) + json_config.get("encryptionKeyFingerprint", self.__compute_fingerprint( + x509_cert.public_key().public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo))) else: self._encryption_certificate = None self._encryption_key_fingerprint = None @@ -64,7 +64,7 @@ def oaep_padding_digest_algorithm(self): @property def encryption_certificate(self): return self._encryption_certificate - + @property def encryption_certificate_type(self): return self._encryption_certificate_type diff --git a/client_encryption/session_key_params.py b/client_encryption/session_key_params.py index 7e45f9f..849cb21 100644 --- a/client_encryption/session_key_params.py +++ b/client_encryption/session_key_params.py @@ -1,19 +1,19 @@ -from binascii import Error from Crypto.Cipher import PKCS1_OAEP, AES -from Crypto.Random import get_random_bytes from Crypto.PublicKey import RSA +from Crypto.Random import get_random_bytes +from binascii import Error +from cryptography.hazmat.primitives.serialization import PublicFormat + from client_encryption.encoding_utils import encode_bytes, decode_value, url_encode_bytes -from client_encryption.encryption_utils import load_hash_algorithm from client_encryption.encryption_exception import KeyWrappingError +from client_encryption.encryption_utils import load_hash_algorithm from client_encryption.field_level_encryption_config import FieldLevelEncryptionConfig -from cryptography.hazmat.primitives.serialization import PublicFormat - class SessionKeyParams(object): """Class implementing private session key and its params. Provide key and iv random generation functionality""" - _JWE_KEY_SIZE = 256//8 + _JWE_KEY_SIZE = 256 // 8 _MASTERCARD_KEY_SIZE = 128 // 8 _BLOCK_SIZE = AES.block_size @@ -87,7 +87,9 @@ def generate(config): def __wrap_secret_key(plain_key, config): try: hash_algo = load_hash_algorithm(config.oaep_padding_digest_algorithm) - _cipher = PKCS1_OAEP.new(key=RSA.import_key(config.encryption_certificate.public_key().public_bytes(config.encryption_certificate_type, PublicFormat.SubjectPublicKeyInfo)), + _cipher = PKCS1_OAEP.new(key=RSA.import_key( + config.encryption_certificate.public_key().public_bytes(config.encryption_certificate_type, + PublicFormat.SubjectPublicKeyInfo)), hashAlgo=hash_algo) encrypted_secret_key = _cipher.encrypt(plain_key)