Skip to content

Commit

Permalink
Merge branch '15-http-subresource-integrity-check'
Browse files Browse the repository at this point in the history
Closes: #15
  • Loading branch information
c0r0n3r committed Nov 7, 2023
2 parents 927779b + 060505b commit acc33d3
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 104 deletions.
23 changes: 5 additions & 18 deletions cryptodatahub/common/key.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-

import abc
import hashlib
import base64
import collections
import datetime
Expand All @@ -26,7 +25,7 @@
Vulnerability,
)
from cryptodatahub.common.types import _ConverterBase
from cryptodatahub.common.utils import bytes_to_hex_string
from cryptodatahub.common.utils import bytes_to_hex_string, hash_bytes

from cryptodatahub.tls.algorithm import TlsExtensionType

Expand Down Expand Up @@ -156,12 +155,6 @@ class PublicKeyParamsRsa(PublicKeyParamBase):

@attr.s(eq=False)
class PublicKey(object):
_HASHLIB_FUNCS = {
Hash.MD5: hashlib.md5,
Hash.SHA1: hashlib.sha1,
Hash.SHA2_256: hashlib.sha256
}

_public_key = attr.ib(validator=attr.validators.instance_of(asn1crypto.keys.PublicKeyInfo))

@classmethod
Expand Down Expand Up @@ -317,17 +310,11 @@ def key_size(self):
def key_bytes(self):
return PublicKey.der.fget(self)

@classmethod
def get_digest(cls, hash_type, key_bytes):
try:
hashlib_funcs = cls._HASHLIB_FUNCS[hash_type]
except KeyError as e:
six.raise_from(NotImplementedError(hash_type), e)

return hashlib_funcs(key_bytes).digest()
def get_digest(self, hash_type):
return hash_bytes(hash_type, self.der)

def fingerprint(self, hash_type):
return bytes_to_hex_string(self.get_digest(hash_type, self.der), ':')
return bytes_to_hex_string(self.get_digest(hash_type), ':')

@property
def fingerprints(self):
Expand Down Expand Up @@ -472,7 +459,7 @@ def public_key(self):

@property
def public_key_pin(self):
return base64.b64encode(self.get_digest(Hash.SHA2_256, self.key_bytes)).decode('ascii')
return base64.b64encode(hash_bytes(Hash.SHA2_256, self.key_bytes)).decode('ascii')

def _has_any_policy_value(self, oid_values):
if self._certificate.certificate_policies_value is None:
Expand Down
4 changes: 1 addition & 3 deletions cryptodatahub/common/stores.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,7 @@ def get_json_encoding(cls):
def get_item_by_sha2_256_fingerprint(cls, fingerprint_value):
if not hasattr(cls, '_ITEMS_BY_SHA2_256_HASH'):
cls._ITEMS_BY_SHA2_256_HASH = {
bytes_to_hex_string(
item.value.certificate.get_digest(Hash.SHA2_256, item.value.certificate.der)
): item
bytes_to_hex_string(item.value.certificate.get_digest(Hash.SHA2_256)): item
for item in cls
}

Expand Down
73 changes: 73 additions & 0 deletions cryptodatahub/common/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
# -*- coding: utf-8 -*-

import binascii
import hashlib

import attr
import six
import urllib3

from cryptodatahub.common.algorithm import Hash


def bytes_to_hex_string(byte_array, separator='', lowercase=False):
Expand Down Expand Up @@ -35,3 +40,71 @@ def name_to_enum_item_name(name):
converted_name += '_'

return converted_name.rstrip('_').upper()


_HASHLIB_FUNCS = {
Hash.MD5: hashlib.md5,
Hash.SHA1: hashlib.sha1,
Hash.SHA2_224: hashlib.sha224,
Hash.SHA2_256: hashlib.sha256,
Hash.SHA2_384: hashlib.sha384,
Hash.SHA2_512: hashlib.sha512,
}


def hash_bytes(hash_algorithm, hashable_value):
try:
hashlib_funcs = _HASHLIB_FUNCS[hash_algorithm]
except KeyError as e:
six.raise_from(NotImplementedError(hash_algorithm), e)

return hashlib_funcs(hashable_value).digest()


@attr.s
class HttpFetcher(object):
connect_timeout = attr.ib(default=2, validator=attr.validators.instance_of((int, float)))
read_timeout = attr.ib(default=1, validator=attr.validators.instance_of((int, float)))
retry = attr.ib(default=1, validator=attr.validators.instance_of(int))
_request_params = attr.ib(default=None, init=False)
_response = attr.ib(default=None, init=False)

def __attrs_post_init__(self):
request_params = {
'preload_content': False,
'timeout': urllib3.Timeout(connect=self.connect_timeout, read=self.read_timeout),
'retries': urllib3.Retry(
self.retry, status_forcelist=urllib3.Retry.RETRY_AFTER_STATUS_CODES | frozenset([502])
),
}

object.__setattr__(self, '_request_params', request_params)

def get_response_header(self, header_name):
if self._response is None:
raise AttributeError()

return self._response.headers.get(header_name, None)

@property
def response_data(self):
if self._response is None:
raise AttributeError()

return self._response.data

def fetch(self, url):
pool_manager = urllib3.PoolManager()

try:
self._response = pool_manager.request('GET', str(url), **self._request_params)
except BaseException as e: # pylint: disable=broad-except
if e.__class__.__name__ != 'TimeoutError' and not isinstance(e, urllib3.exceptions.HTTPError):
raise e

pool_manager.clear()

def __call__(self, url):
self.fetch(url)

return self.response_data
10 changes: 10 additions & 0 deletions test/common/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@
)


TEST_URL_PREFIX = '/'.join([
'https://gist.githubusercontent.com',
'c0r0n3r',
'54386701406df6e7299bd95c46a4c8d1',
'raw',
'e0b9cf606739d3fc1d97cc7f21e501e118bc4e07',
''
])


@attr.s
class TestJsonObjectComplex(CryptoDataParamsBase):
attr = attr.ib()
Expand Down
8 changes: 2 additions & 6 deletions test/common/test_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,14 +244,10 @@ def test_der(self):
def test_digest(self):
public_key = self._get_public_key('snakeoil_cert_pubkey')
self.assertEqual(
PublicKey.get_digest(Hash.MD5, public_key.der),
public_key.get_digest(Hash.MD5),
bytes_from_hex_string('5D:B2:D9:9F:97:5C:C6:19:B3:91:7E:F8:1A:37:2C:78', ':')
)

with self.assertRaises(NotImplementedError) as context_manager:
PublicKey.get_digest(Hash.SHA2_512, public_key.der)
self.assertEqual(context_manager.exception.args, (Hash.SHA2_512, ))

def test_fingerprints(self):
public_key = self._get_public_key('snakeoil_cert_pubkey')
self.assertEqual(
Expand Down Expand Up @@ -341,7 +337,7 @@ def test_der(self):
def test_digest(self):
public_key_x509 = self._get_public_key_x509('expired.badssl.com')
self.assertEqual(
PublicKeyX509Base.get_digest(Hash.MD5, public_key_x509.der),
public_key_x509.get_digest(Hash.MD5),
bytes_from_hex_string('67:34:4E:61:C0:43:1C:F1:F7:25:7C:1D:6D:E7:A7:85', ':')
)

Expand Down
63 changes: 61 additions & 2 deletions test/common/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,25 @@
# -*- coding: utf-8 -*-

import unittest
try:
import unittest
from unittest import mock
except ImportError:
import unittest2 as unittest
import mock

from test.common.classes import TEST_URL_PREFIX

import six
import urllib3

from cryptodatahub.common.utils import bytes_from_hex_string, bytes_to_hex_string, name_to_enum_item_name
from cryptodatahub.common.algorithm import Hash
from cryptodatahub.common.utils import (
HttpFetcher,
bytes_from_hex_string,
bytes_to_hex_string,
hash_bytes,
name_to_enum_item_name,
)


class TestBytesToHexString(unittest.TestCase):
Expand Down Expand Up @@ -36,6 +51,19 @@ def test_separator(self):
self.assertEqual(bytes_from_hex_string('DE:AD:BE:EF', separator=':'), b'\xde\xad\xbe\xef')


class TestHashBytes(unittest.TestCase):
def test_error_unknown_hash_algorithm(self):
with self.assertRaises(NotImplementedError) as context_manager:
hash_bytes(Hash.SHA3_512, b'')
self.assertEqual(context_manager.exception.args, (Hash.SHA3_512, ))

def test_hash(self):
self.assertEqual(
hash_bytes(Hash.SHA1, b'abc'),
b'\xA9\x99\x3E\x36\x47\x06\x81\x6A\xBA\x3E\x25\x71\x78\x50\xC2\x6C\x9C\xD0\xD8\x9D'
)


class TestNameToEnumItemName(unittest.TestCase):
def test_convert_simple_name(self):
self.assertEqual(name_to_enum_item_name('lower'), 'LOWER')
Expand All @@ -48,3 +76,34 @@ def test_convert_multipart_name(self):

def test_convert_i18n_name(self):
self.assertEqual(name_to_enum_item_name(six.ensure_text('αβγ')), six.ensure_text('ΑΒΓ'))


class TestHttpFetcher(unittest.TestCase):
@mock.patch.object(urllib3.poolmanager.PoolManager, 'request', side_effect=NotImplementedError)
def test_error_unhandaled_exception(self, _):
with self.assertRaises(NotImplementedError):
HttpFetcher()('http://example.com')

def test_error_fetch_timeout(self):
http_fetcher = HttpFetcher(
connect_timeout=0.001, read_timeout=0.001, retry=0
)
with self.assertRaises(AttributeError):
http_fetcher(TEST_URL_PREFIX + 'test.csv')
with self.assertRaises(AttributeError):
http_fetcher.get_response_header('Server')
with self.assertRaises(AttributeError):
_ = http_fetcher.response_data

def test_fetch(self):
http_fetcher = HttpFetcher()
http_fetcher.fetch(TEST_URL_PREFIX + 'test.html')
self.assertEqual(http_fetcher.get_response_header('Content-Type'), 'text/plain; charset=utf-8')
self.assertEqual(http_fetcher.response_data, b'\n'.join([
b'<!DOCTYPE html>',
b'<html>',
b' <body>',
b' Page content',
b' </body>',
b'</html>',
]))
44 changes: 3 additions & 41 deletions test/updaters/test_common.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,17 @@
# -*- coding: utf-8 -*-

import urllib3

try:
import unittest
from unittest import mock
except ImportError:
import unittest2 as unittest
import mock

from test.common.classes import TEST_URL_PREFIX

import attr

from cryptodatahub.common.utils import name_to_enum_item_name

from updaters.common import FetcherBase, FetcherCsvBase, HttpFetcher


TEST_URL_PREFIX = '/'.join([
'https://gist.githubusercontent.com',
'c0r0n3r',
'54386701406df6e7299bd95c46a4c8d1',
'raw',
'e0b9cf606739d3fc1d97cc7f21e501e118bc4e07',
''
])
from updaters.common import FetcherBase, FetcherCsvBase


@attr.s
Expand Down Expand Up @@ -66,29 +54,3 @@ def test_current_data(self):
{'Col 1': 'Row 2, Col 1', 'Col 2': 'Row 2, Col 2'},
])
)


class TestHttpFetcher(unittest.TestCase):
@mock.patch.object(urllib3.poolmanager.PoolManager, 'request', side_effect=NotImplementedError)
def test_error_unhandaled_exception(self, _):
with self.assertRaises(NotImplementedError):
HttpFetcher()('http://example.com')

def test_error_fetch_timeout(self):
response = HttpFetcher(
connect_timeout=0.001, read_timeout=0.001, retry=0
)(
TEST_URL_PREFIX + 'test.csv'
)
self.assertEqual(response, None)

def test_fetch(self):
data = HttpFetcher()(TEST_URL_PREFIX + 'test.html')
self.assertEqual(data, b'\n'.join([
b'<!DOCTYPE html>',
b'<html>',
b' <body>',
b' Page content',
b' </body>',
b'</html>',
]))
35 changes: 1 addition & 34 deletions updaters/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,44 +8,11 @@
from six.moves import collections_abc

import attr
import urllib3

from cryptodatahub.common.utils import HttpFetcher
from cryptodatahub.common.stores import RootCertificate


@attr.s(frozen=True)
class HttpFetcher(object):
connect_timeout = attr.ib(default=2, validator=attr.validators.instance_of((int, float)))
read_timeout = attr.ib(default=1, validator=attr.validators.instance_of((int, float)))
retry = attr.ib(default=1, validator=attr.validators.instance_of(int))
_request_params = attr.ib(default=None, init=False)

def __attrs_post_init__(self):
request_params = {
'preload_content': False,
'timeout': urllib3.Timeout(connect=self.connect_timeout, read=self.read_timeout),
'retries': urllib3.Retry(
self.retry, status_forcelist=urllib3.Retry.RETRY_AFTER_STATUS_CODES | frozenset([502])
),
}

object.__setattr__(self, '_request_params', request_params)

def __call__(self, url):
pool_manager = urllib3.PoolManager()
try:
response = pool_manager.request('GET', url, **self._request_params)
except BaseException as e: # pylint: disable=broad-except
if e.__class__.__name__ != 'TimeoutError' and not isinstance(e, urllib3.exceptions.HTTPError):
raise e

return None

pool_manager.clear()

return response.data


@attr.s(frozen=True)
class CertificatePemFetcher(object):
http_fetcher = attr.ib(
Expand Down

0 comments on commit acc33d3

Please sign in to comment.