From 2597fb22888eaf158d805ce4c1e68de656753238 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Szil=C3=A1rd=20Pfeiffer?= Date: Sat, 4 Nov 2023 14:47:38 +0100 Subject: [PATCH 1/3] refactor!: Move HTTP fetcher class to utils (#15) --- cryptodatahub/common/utils.py | 51 +++++++++++++++++++++++++++++++++++ test/common/classes.py | 10 +++++++ test/common/test_utils.py | 43 +++++++++++++++++++++++++++-- test/updaters/test_common.py | 44 +++--------------------------- updaters/common.py | 35 +----------------------- 5 files changed, 106 insertions(+), 77 deletions(-) diff --git a/cryptodatahub/common/utils.py b/cryptodatahub/common/utils.py index 9661ffc..16d71ea 100644 --- a/cryptodatahub/common/utils.py +++ b/cryptodatahub/common/utils.py @@ -2,7 +2,9 @@ import binascii +import attr import six +import urllib3 def bytes_to_hex_string(byte_array, separator='', lowercase=False): @@ -35,3 +37,52 @@ def name_to_enum_item_name(name): converted_name += '_' return converted_name.rstrip('_').upper() + + +@attr.s +class HttpFetcher(object): + connect_timeout = attr.ib(default=2, validator=attr.validators.instance_of((int, float))) + read_timeout = attr.ib(default=1, validator=attr.validators.instance_of((int, float))) + retry = attr.ib(default=1, validator=attr.validators.instance_of(int)) + _request_params = attr.ib(default=None, init=False) + _response = attr.ib(default=None, init=False) + + def __attrs_post_init__(self): + request_params = { + 'preload_content': False, + 'timeout': urllib3.Timeout(connect=self.connect_timeout, read=self.read_timeout), + 'retries': urllib3.Retry( + self.retry, status_forcelist=urllib3.Retry.RETRY_AFTER_STATUS_CODES | frozenset([502]) + ), + } + + object.__setattr__(self, '_request_params', request_params) + + def get_response_header(self, header_name): + if self._response is None: + raise AttributeError() + + return self._response.headers.get(header_name, None) + + @property + def response_data(self): + if self._response is None: + raise AttributeError() + + return self._response.data + + def fetch(self, url): + pool_manager = urllib3.PoolManager() + + try: + self._response = pool_manager.request('GET', str(url), **self._request_params) + except BaseException as e: # pylint: disable=broad-except + if e.__class__.__name__ != 'TimeoutError' and not isinstance(e, urllib3.exceptions.HTTPError): + raise e + + pool_manager.clear() + + def __call__(self, url): + self.fetch(url) + + return self.response_data diff --git a/test/common/classes.py b/test/common/classes.py index 7e3f11d..7e8a649 100644 --- a/test/common/classes.py +++ b/test/common/classes.py @@ -32,6 +32,16 @@ ) +TEST_URL_PREFIX = '/'.join([ + 'https://gist.githubusercontent.com', + 'c0r0n3r', + '54386701406df6e7299bd95c46a4c8d1', + 'raw', + 'e0b9cf606739d3fc1d97cc7f21e501e118bc4e07', + '' +]) + + @attr.s class TestJsonObjectComplex(CryptoDataParamsBase): attr = attr.ib() diff --git a/test/common/test_utils.py b/test/common/test_utils.py index edea8fb..0a3be79 100644 --- a/test/common/test_utils.py +++ b/test/common/test_utils.py @@ -1,10 +1,18 @@ # -*- coding: utf-8 -*- -import unittest +try: + import unittest + from unittest import mock +except ImportError: + import unittest2 as unittest + import mock + +from test.common.classes import TEST_URL_PREFIX import six +import urllib3 -from cryptodatahub.common.utils import bytes_from_hex_string, bytes_to_hex_string, name_to_enum_item_name +from cryptodatahub.common.utils import bytes_from_hex_string, bytes_to_hex_string, name_to_enum_item_name, HttpFetcher class TestBytesToHexString(unittest.TestCase): @@ -48,3 +56,34 @@ def test_convert_multipart_name(self): def test_convert_i18n_name(self): self.assertEqual(name_to_enum_item_name(six.ensure_text('αβγ')), six.ensure_text('ΑΒΓ')) + + +class TestHttpFetcher(unittest.TestCase): + @mock.patch.object(urllib3.poolmanager.PoolManager, 'request', side_effect=NotImplementedError) + def test_error_unhandaled_exception(self, _): + with self.assertRaises(NotImplementedError): + HttpFetcher()('http://example.com') + + def test_error_fetch_timeout(self): + http_fetcher = HttpFetcher( + connect_timeout=0.001, read_timeout=0.001, retry=0 + ) + with self.assertRaises(AttributeError): + http_fetcher(TEST_URL_PREFIX + 'test.csv') + with self.assertRaises(AttributeError): + http_fetcher.get_response_header('Server') + with self.assertRaises(AttributeError): + _ = http_fetcher.response_data + + def test_fetch(self): + http_fetcher = HttpFetcher() + http_fetcher.fetch(TEST_URL_PREFIX + 'test.html') + self.assertEqual(http_fetcher.get_response_header('Content-Type'), 'text/plain; charset=utf-8') + self.assertEqual(http_fetcher.response_data, b'\n'.join([ + b'', + b'', + b' ', + b' Page content', + b' ', + b'', + ])) diff --git a/test/updaters/test_common.py b/test/updaters/test_common.py index f5b0d34..90ebf85 100644 --- a/test/updaters/test_common.py +++ b/test/updaters/test_common.py @@ -1,29 +1,17 @@ # -*- coding: utf-8 -*- -import urllib3 - try: import unittest - from unittest import mock except ImportError: import unittest2 as unittest - import mock + +from test.common.classes import TEST_URL_PREFIX import attr from cryptodatahub.common.utils import name_to_enum_item_name -from updaters.common import FetcherBase, FetcherCsvBase, HttpFetcher - - -TEST_URL_PREFIX = '/'.join([ - 'https://gist.githubusercontent.com', - 'c0r0n3r', - '54386701406df6e7299bd95c46a4c8d1', - 'raw', - 'e0b9cf606739d3fc1d97cc7f21e501e118bc4e07', - '' -]) +from updaters.common import FetcherBase, FetcherCsvBase @attr.s @@ -66,29 +54,3 @@ def test_current_data(self): {'Col 1': 'Row 2, Col 1', 'Col 2': 'Row 2, Col 2'}, ]) ) - - -class TestHttpFetcher(unittest.TestCase): - @mock.patch.object(urllib3.poolmanager.PoolManager, 'request', side_effect=NotImplementedError) - def test_error_unhandaled_exception(self, _): - with self.assertRaises(NotImplementedError): - HttpFetcher()('http://example.com') - - def test_error_fetch_timeout(self): - response = HttpFetcher( - connect_timeout=0.001, read_timeout=0.001, retry=0 - )( - TEST_URL_PREFIX + 'test.csv' - ) - self.assertEqual(response, None) - - def test_fetch(self): - data = HttpFetcher()(TEST_URL_PREFIX + 'test.html') - self.assertEqual(data, b'\n'.join([ - b'', - b'', - b' ', - b' Page content', - b' ', - b'', - ])) diff --git a/updaters/common.py b/updaters/common.py index c7b2eb8..37c0866 100644 --- a/updaters/common.py +++ b/updaters/common.py @@ -8,44 +8,11 @@ from six.moves import collections_abc import attr -import urllib3 +from cryptodatahub.common.utils import HttpFetcher from cryptodatahub.common.stores import RootCertificate -@attr.s(frozen=True) -class HttpFetcher(object): - connect_timeout = attr.ib(default=2, validator=attr.validators.instance_of((int, float))) - read_timeout = attr.ib(default=1, validator=attr.validators.instance_of((int, float))) - retry = attr.ib(default=1, validator=attr.validators.instance_of(int)) - _request_params = attr.ib(default=None, init=False) - - def __attrs_post_init__(self): - request_params = { - 'preload_content': False, - 'timeout': urllib3.Timeout(connect=self.connect_timeout, read=self.read_timeout), - 'retries': urllib3.Retry( - self.retry, status_forcelist=urllib3.Retry.RETRY_AFTER_STATUS_CODES | frozenset([502]) - ), - } - - object.__setattr__(self, '_request_params', request_params) - - def __call__(self, url): - pool_manager = urllib3.PoolManager() - try: - response = pool_manager.request('GET', url, **self._request_params) - except BaseException as e: # pylint: disable=broad-except - if e.__class__.__name__ != 'TimeoutError' and not isinstance(e, urllib3.exceptions.HTTPError): - raise e - - return None - - pool_manager.clear() - - return response.data - - @attr.s(frozen=True) class CertificatePemFetcher(object): http_fetcher = attr.ib( From abf9117a2599dde0a7895dc79fda29f46fde4590 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Szil=C3=A1rd=20Pfeiffer?= Date: Sat, 4 Nov 2023 14:49:13 +0100 Subject: [PATCH 2/3] refactor!: Move hash generator function to utils (#15) --- cryptodatahub/common/key.py | 23 +++++------------------ cryptodatahub/common/stores.py | 4 +--- cryptodatahub/common/utils.py | 19 +++++++++++++++++++ test/common/test_key.py | 8 ++------ test/common/test_utils.py | 22 +++++++++++++++++++++- 5 files changed, 48 insertions(+), 28 deletions(-) diff --git a/cryptodatahub/common/key.py b/cryptodatahub/common/key.py index 8d30b0c..c164a5e 100644 --- a/cryptodatahub/common/key.py +++ b/cryptodatahub/common/key.py @@ -1,7 +1,6 @@ # -*- coding: utf-8 -*- import abc -import hashlib import base64 import collections import datetime @@ -26,7 +25,7 @@ Vulnerability, ) from cryptodatahub.common.types import _ConverterBase -from cryptodatahub.common.utils import bytes_to_hex_string +from cryptodatahub.common.utils import bytes_to_hex_string, hash_bytes from cryptodatahub.tls.algorithm import TlsExtensionType @@ -156,12 +155,6 @@ class PublicKeyParamsRsa(PublicKeyParamBase): @attr.s(eq=False) class PublicKey(object): - _HASHLIB_FUNCS = { - Hash.MD5: hashlib.md5, - Hash.SHA1: hashlib.sha1, - Hash.SHA2_256: hashlib.sha256 - } - _public_key = attr.ib(validator=attr.validators.instance_of(asn1crypto.keys.PublicKeyInfo)) @classmethod @@ -317,17 +310,11 @@ def key_size(self): def key_bytes(self): return PublicKey.der.fget(self) - @classmethod - def get_digest(cls, hash_type, key_bytes): - try: - hashlib_funcs = cls._HASHLIB_FUNCS[hash_type] - except KeyError as e: - six.raise_from(NotImplementedError(hash_type), e) - - return hashlib_funcs(key_bytes).digest() + def get_digest(self, hash_type): + return hash_bytes(hash_type, self.der) def fingerprint(self, hash_type): - return bytes_to_hex_string(self.get_digest(hash_type, self.der), ':') + return bytes_to_hex_string(self.get_digest(hash_type), ':') @property def fingerprints(self): @@ -472,7 +459,7 @@ def public_key(self): @property def public_key_pin(self): - return base64.b64encode(self.get_digest(Hash.SHA2_256, self.key_bytes)).decode('ascii') + return base64.b64encode(hash_bytes(Hash.SHA2_256, self.key_bytes)).decode('ascii') def _has_any_policy_value(self, oid_values): if self._certificate.certificate_policies_value is None: diff --git a/cryptodatahub/common/stores.py b/cryptodatahub/common/stores.py index de036c7..04c73de 100644 --- a/cryptodatahub/common/stores.py +++ b/cryptodatahub/common/stores.py @@ -281,9 +281,7 @@ def get_json_encoding(cls): def get_item_by_sha2_256_fingerprint(cls, fingerprint_value): if not hasattr(cls, '_ITEMS_BY_SHA2_256_HASH'): cls._ITEMS_BY_SHA2_256_HASH = { - bytes_to_hex_string( - item.value.certificate.get_digest(Hash.SHA2_256, item.value.certificate.der) - ): item + bytes_to_hex_string(item.value.certificate.get_digest(Hash.SHA2_256)): item for item in cls } diff --git a/cryptodatahub/common/utils.py b/cryptodatahub/common/utils.py index 16d71ea..ff81d6c 100644 --- a/cryptodatahub/common/utils.py +++ b/cryptodatahub/common/utils.py @@ -1,11 +1,14 @@ # -*- coding: utf-8 -*- import binascii +import hashlib import attr import six import urllib3 +from cryptodatahub.common.algorithm import Hash + def bytes_to_hex_string(byte_array, separator='', lowercase=False): if lowercase: @@ -39,6 +42,22 @@ def name_to_enum_item_name(name): return converted_name.rstrip('_').upper() +_HASHLIB_FUNCS = { + Hash.MD5: hashlib.md5, + Hash.SHA1: hashlib.sha1, + Hash.SHA2_256: hashlib.sha256, +} + + +def hash_bytes(hash_algorithm, hashable_value): + try: + hashlib_funcs = _HASHLIB_FUNCS[hash_algorithm] + except KeyError as e: + six.raise_from(NotImplementedError(hash_algorithm), e) + + return hashlib_funcs(hashable_value).digest() + + @attr.s class HttpFetcher(object): connect_timeout = attr.ib(default=2, validator=attr.validators.instance_of((int, float))) diff --git a/test/common/test_key.py b/test/common/test_key.py index 2dcd9b0..521d8a5 100644 --- a/test/common/test_key.py +++ b/test/common/test_key.py @@ -244,14 +244,10 @@ def test_der(self): def test_digest(self): public_key = self._get_public_key('snakeoil_cert_pubkey') self.assertEqual( - PublicKey.get_digest(Hash.MD5, public_key.der), + public_key.get_digest(Hash.MD5), bytes_from_hex_string('5D:B2:D9:9F:97:5C:C6:19:B3:91:7E:F8:1A:37:2C:78', ':') ) - with self.assertRaises(NotImplementedError) as context_manager: - PublicKey.get_digest(Hash.SHA2_512, public_key.der) - self.assertEqual(context_manager.exception.args, (Hash.SHA2_512, )) - def test_fingerprints(self): public_key = self._get_public_key('snakeoil_cert_pubkey') self.assertEqual( @@ -341,7 +337,7 @@ def test_der(self): def test_digest(self): public_key_x509 = self._get_public_key_x509('expired.badssl.com') self.assertEqual( - PublicKeyX509Base.get_digest(Hash.MD5, public_key_x509.der), + public_key_x509.get_digest(Hash.MD5), bytes_from_hex_string('67:34:4E:61:C0:43:1C:F1:F7:25:7C:1D:6D:E7:A7:85', ':') ) diff --git a/test/common/test_utils.py b/test/common/test_utils.py index 0a3be79..206b9b3 100644 --- a/test/common/test_utils.py +++ b/test/common/test_utils.py @@ -12,7 +12,14 @@ import six import urllib3 -from cryptodatahub.common.utils import bytes_from_hex_string, bytes_to_hex_string, name_to_enum_item_name, HttpFetcher +from cryptodatahub.common.algorithm import Hash +from cryptodatahub.common.utils import ( + HttpFetcher, + bytes_from_hex_string, + bytes_to_hex_string, + hash_bytes, + name_to_enum_item_name, +) class TestBytesToHexString(unittest.TestCase): @@ -44,6 +51,19 @@ def test_separator(self): self.assertEqual(bytes_from_hex_string('DE:AD:BE:EF', separator=':'), b'\xde\xad\xbe\xef') +class TestHashBytes(unittest.TestCase): + def test_error_unknown_hash_algorithm(self): + with self.assertRaises(NotImplementedError) as context_manager: + hash_bytes(Hash.SHA3_512, b'') + self.assertEqual(context_manager.exception.args, (Hash.SHA3_512, )) + + def test_hash(self): + self.assertEqual( + hash_bytes(Hash.SHA1, b'abc'), + b'\xA9\x99\x3E\x36\x47\x06\x81\x6A\xBA\x3E\x25\x71\x78\x50\xC2\x6C\x9C\xD0\xD8\x9D' + ) + + class TestNameToEnumItemName(unittest.TestCase): def test_convert_simple_name(self): self.assertEqual(name_to_enum_item_name('lower'), 'LOWER') From 060505b35b854a69bb827656ad72ff36dd5f38ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Szil=C3=A1rd=20Pfeiffer?= Date: Sat, 4 Nov 2023 17:45:59 +0100 Subject: [PATCH 3/3] feat(utils)!: Add hash function implemented by each supported Python version (#15) --- cryptodatahub/common/utils.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cryptodatahub/common/utils.py b/cryptodatahub/common/utils.py index ff81d6c..6c5123a 100644 --- a/cryptodatahub/common/utils.py +++ b/cryptodatahub/common/utils.py @@ -45,7 +45,10 @@ def name_to_enum_item_name(name): _HASHLIB_FUNCS = { Hash.MD5: hashlib.md5, Hash.SHA1: hashlib.sha1, + Hash.SHA2_224: hashlib.sha224, Hash.SHA2_256: hashlib.sha256, + Hash.SHA2_384: hashlib.sha384, + Hash.SHA2_512: hashlib.sha512, }