Skip to content

Commit

Permalink
reformatting the code
Browse files Browse the repository at this point in the history
  • Loading branch information
karen-avetisyan-mc committed Feb 21, 2024
1 parent 80e74bc commit 4a2c02c
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 42 deletions.
12 changes: 7 additions & 5 deletions client_encryption/api_encryption.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import json
import inspect
import json
from enum import Enum
from functools import wraps
from warnings import warn
from client_encryption.field_level_encryption_config import FieldLevelEncryptionConfig
from client_encryption.jwe_encryption_config import JweEncryptionConfig
from client_encryption.session_key_params import SessionKeyParams

from client_encryption.field_level_encryption import encrypt_payload as encrypt_field_level, \
decrypt_payload as decrypt_field_level
from client_encryption.field_level_encryption_config import FieldLevelEncryptionConfig
from client_encryption.jwe_encryption import encrypt_payload as encrypt_jwe, decrypt_payload as decrypt_jwe
from client_encryption.jwe_encryption_config import JweEncryptionConfig
from client_encryption.session_key_params import SessionKeyParams


class ApiEncryption(object):
Expand All @@ -34,7 +35,7 @@ def field_encryption_call_api(self, func):

@wraps(func)
def call_api_function(*args, **kwargs):
original_parameters = inspect.signature(func.__self__.call_api).parameters
original_parameters = inspect.signature(func.__self__.call_api).parameters
check_type_is_none = original_parameters.get("_check_type") is None
preload_content_is_not_none = original_parameters.get("_preload_content") is not None
if check_type_is_none and preload_content_is_not_none:
Expand Down Expand Up @@ -160,6 +161,7 @@ def encrypt_field_level_payload(headers, conf, body):

return encrypted_payload


def _contains_param(param_name, headers): return param_name and param_name in headers


Expand Down
1 change: 1 addition & 0 deletions client_encryption/encoding_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import base64
from enum import Enum

from client_encryption.encryption_exception import EncodingError


Expand Down
24 changes: 14 additions & 10 deletions client_encryption/encryption_utils.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
from Crypto.PublicKey import RSA
from Crypto.Hash import SHA1, SHA224, SHA256, SHA384, SHA512
from client_encryption.encryption_exception import CertificateError, PrivateKeyError, HashAlgorithmError
from Crypto.PublicKey import RSA
from cryptography import x509
from cryptography.hazmat.primitives.serialization import pkcs12
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import pkcs12
from enum import IntEnum

from client_encryption.encryption_exception import CertificateError, PrivateKeyError, HashAlgorithmError

_SUPPORTED_HASH = {"SHA1": SHA1, "SHA224": SHA224, "SHA256": SHA256, "SHA384": SHA384, "SHA512": SHA512}


class FileType(IntEnum):
FILETYPE_PEM = 0
FILETYPE_ASN1 = 1
Expand All @@ -18,15 +20,15 @@ class FileType(IntEnum):
def load_encryption_certificate(certificate_path):
"""Load X509 encryption certificate data at the given file path."""

try:
try:
with open(certificate_path, "rb") as cert_content:
certificate = cert_content.read()
except IOError:
raise CertificateError ("Unable to load certificate.")
try:
raise CertificateError("Unable to load certificate.")

try:
cert_type = __get_crypto_file_type(certificate)

if cert_type == FileType.FILETYPE_PEM:
cert = x509.load_pem_x509_certificate(certificate)
return cert, Encoding.PEM
Expand All @@ -36,7 +38,8 @@ def load_encryption_certificate(certificate_path):
if cert_type == FileType.FILETYPE_INVALID:
raise CertificateError("Wrong certificate format.")
except ValueError:
raise CertificateError ("Invalid certificate format.")
raise CertificateError("Invalid certificate format.")


def write_encryption_certificate(certificate_path, certificate, cert_type):
with open(certificate_path, "wb") as f:
Expand All @@ -61,7 +64,8 @@ def load_decryption_key(key_file_path, decryption_key_password=None):

def __load_pkcs12_private_key(pkcs_file, password):
private_key, certs, addcerts = pkcs12.load_key_and_certificates(pkcs_file, password.encode("utf-8"))
return private_key.private_bytes(serialization.Encoding.PEM, serialization.PrivateFormat.TraditionalOpenSSL, serialization.NoEncryption())
return private_key.private_bytes(serialization.Encoding.PEM, serialization.PrivateFormat.TraditionalOpenSSL,
serialization.NoEncryption())


def __get_crypto_file_type(file_content):
Expand Down
8 changes: 4 additions & 4 deletions client_encryption/field_level_encryption.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import json
import copy
import json
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from client_encryption.session_key_params import SessionKeyParams

from client_encryption.encoding_utils import encode_bytes, decode_value
from client_encryption.json_path_utils import get_node, pop_node, update_node, cleanup_node
from client_encryption.encryption_exception import EncryptionError
from client_encryption.json_path_utils import get_node, pop_node, update_node, cleanup_node
from client_encryption.session_key_params import SessionKeyParams


def encrypt_payload(payload, config, _params=None):
Expand Down Expand Up @@ -119,4 +120,3 @@ def _remove_fingerprint_from_node(node, config):
del node[config.encryption_certificate_fingerprint_field_name]
if config.encryption_key_fingerprint_field_name in node:
del node[config.encryption_key_fingerprint_field_name]

21 changes: 12 additions & 9 deletions client_encryption/field_level_encryption_config.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import json
from Crypto.Hash import SHA256
from client_encryption import encoding_utils
from client_encryption.encryption_utils import load_encryption_certificate, load_decryption_key, validate_hash_algorithm
from cryptography.hazmat.primitives.serialization import PublicFormat, Encoding

from client_encryption import encoding_utils
from client_encryption.encryption_utils import load_encryption_certificate, load_decryption_key, validate_hash_algorithm


class FieldLevelEncryptionConfig(object):
Expand All @@ -28,12 +28,14 @@ def __init__(self, conf):
x509_cert, cert_type = load_encryption_certificate(json_config["encryptionCertificate"])
self._encryption_certificate = x509_cert
# Fixed encoding is required, regardless of initial certificate encoding to ensure correct calculation of fingerprint value
self._encryption_certificate_type = Encoding.DER
self._encryption_certificate_type = Encoding.DER
self._encryption_key_fingerprint = \
json_config.get("encryptionKeyFingerprint",self.__compute_fingerprint(x509_cert.public_key().public_bytes(Encoding.DER , PublicFormat.SubjectPublicKeyInfo)))
json_config.get("encryptionKeyFingerprint", self.__compute_fingerprint(
x509_cert.public_key().public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo)))
self._encryption_certificate_fingerprint = \
json_config.get("encryptionCertificateFingerprint", self.__compute_fingerprint(x509_cert.public_bytes(Encoding.DER)))

json_config.get("encryptionCertificateFingerprint",
self.__compute_fingerprint(x509_cert.public_bytes(Encoding.DER)))

else:
self._encryption_certificate = None
self._encryption_key_fingerprint = None
Expand All @@ -54,11 +56,11 @@ def __init__(self, conf):
self._encrypted_key_field_name = json_config["encryptedKeyFieldName"]
self._encrypted_value_field_name = json_config["encryptedValueFieldName"]

self._encryption_certificate_fingerprint_field_name =\
self._encryption_certificate_fingerprint_field_name = \
json_config.get("encryptionCertificateFingerprintFieldName", None)
self._encryption_key_fingerprint_field_name =\
self._encryption_key_fingerprint_field_name = \
json_config.get("encryptionKeyFingerprintFieldName", None)
self._oaep_padding_digest_algorithm_field_name =\
self._oaep_padding_digest_algorithm_field_name = \
json_config.get("oaepPaddingDigestAlgorithmFieldName", None)

self._use_http_headers = json_config.get("useHttpHeaders", False)
Expand All @@ -74,6 +76,7 @@ def encryption_certificate(self):
@property
def encryption_certificate_type(self):
return self._encryption_certificate_type

@property
def encryption_key_fingerprint(self):
return self._encryption_key_fingerprint
Expand Down
1 change: 0 additions & 1 deletion client_encryption/json_path_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import json


_SEPARATOR = "."
_ROOT_SYMBOL = "$"

Expand Down
1 change: 0 additions & 1 deletion client_encryption/jwe_encryption.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import copy
import json

from Crypto.Cipher import AES

from client_encryption.encoding_utils import url_encode_bytes, decode_jwe
Expand Down
10 changes: 5 additions & 5 deletions client_encryption/jwe_encryption_config.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import json

from Crypto.Hash import SHA256
from cryptography.hazmat.primitives.serialization import PublicFormat, Encoding

from client_encryption.encoding_utils import ClientEncoding
from client_encryption.encryption_utils import load_encryption_certificate, load_decryption_key
from cryptography.hazmat.primitives.serialization import PublicFormat, Encoding


class JweEncryptionConfig(object):
Expand All @@ -29,9 +28,10 @@ def __init__(self, conf):
x509_cert, cert_type = load_encryption_certificate(json_config["encryptionCertificate"])
self._encryption_certificate = x509_cert
# Fixed encoding is required, regardless of initial certificate encoding to ensure correct calculation of fingerprint value
self._encryption_certificate_type = Encoding.DER
self._encryption_certificate_type = Encoding.DER
self._encryption_key_fingerprint = \
json_config.get("encryptionKeyFingerprint",self.__compute_fingerprint(x509_cert.public_key().public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo)))
json_config.get("encryptionKeyFingerprint", self.__compute_fingerprint(
x509_cert.public_key().public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo)))
else:
self._encryption_certificate = None
self._encryption_key_fingerprint = None
Expand Down Expand Up @@ -64,7 +64,7 @@ def oaep_padding_digest_algorithm(self):
@property
def encryption_certificate(self):
return self._encryption_certificate

@property
def encryption_certificate_type(self):
return self._encryption_certificate_type
Expand Down
16 changes: 9 additions & 7 deletions client_encryption/session_key_params.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
from binascii import Error
from Crypto.Cipher import PKCS1_OAEP, AES
from Crypto.Random import get_random_bytes
from Crypto.PublicKey import RSA
from Crypto.Random import get_random_bytes
from binascii import Error
from cryptography.hazmat.primitives.serialization import PublicFormat

from client_encryption.encoding_utils import encode_bytes, decode_value, url_encode_bytes
from client_encryption.encryption_utils import load_hash_algorithm
from client_encryption.encryption_exception import KeyWrappingError
from client_encryption.encryption_utils import load_hash_algorithm
from client_encryption.field_level_encryption_config import FieldLevelEncryptionConfig
from cryptography.hazmat.primitives.serialization import PublicFormat



class SessionKeyParams(object):
"""Class implementing private session key and its params. Provide key and iv random generation functionality"""

_JWE_KEY_SIZE = 256//8
_JWE_KEY_SIZE = 256 // 8
_MASTERCARD_KEY_SIZE = 128 // 8
_BLOCK_SIZE = AES.block_size

Expand Down Expand Up @@ -87,7 +87,9 @@ def generate(config):
def __wrap_secret_key(plain_key, config):
try:
hash_algo = load_hash_algorithm(config.oaep_padding_digest_algorithm)
_cipher = PKCS1_OAEP.new(key=RSA.import_key(config.encryption_certificate.public_key().public_bytes(config.encryption_certificate_type, PublicFormat.SubjectPublicKeyInfo)),
_cipher = PKCS1_OAEP.new(key=RSA.import_key(
config.encryption_certificate.public_key().public_bytes(config.encryption_certificate_type,
PublicFormat.SubjectPublicKeyInfo)),
hashAlgo=hash_algo)

encrypted_secret_key = _cipher.encrypt(plain_key)
Expand Down

0 comments on commit 4a2c02c

Please sign in to comment.