Skip to content

Commit

Permalink
Merge branch 'release/0.4.10'
Browse files Browse the repository at this point in the history
  • Loading branch information
3c7 committed Jul 28, 2022
2 parents 67f28ee + 6eeceea commit 813e7de
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 169 deletions.
228 changes: 128 additions & 100 deletions common_osint_model/models/ssh.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import base64
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Union

from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicNumbers
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
Expand All @@ -9,7 +9,7 @@
from common_osint_model.utils import hash_all


class SSHComponentAlgorithms(BaseModel, ShodanDataHandler, CensysDataHandler, BinaryEdgeDataHandler):
class SSHComponentAlgorithms(BaseModel, ShodanDataHandler, CensysDataHandler, BinaryEdgeDataHandler, Logger):
"""Represents algorithms supported by SSH server."""
encryption: Optional[List[str]]
key_exchange: Optional[List[str]]
Expand All @@ -18,40 +18,52 @@ class SSHComponentAlgorithms(BaseModel, ShodanDataHandler, CensysDataHandler, Bi
compression: Optional[List[str]]

@classmethod
def from_shodan(cls, d: Dict):
def from_shodan(cls, d: Dict) -> Union["SSHComponentAlgorithms", None]:
"""Returns an instance of this class based on Shodan data given as dictionary."""
if not isinstance(d, Dict):
raise TypeError(f"Method SSHComponentAlgorithms.from_shodan expects parameter d to be a dictionary, "
f"but it was {type(d)}.")
return SSHComponentAlgorithms(
encryption=d["ssh"]["kex"]["encryption_algorithms"],
key_exchange=d["ssh"]["kex"]["kex_algorithms"],
mac=d["ssh"]["kex"]["mac_algorithms"],
key_algorithms=d["ssh"]["kex"]["server_host_key_algorithms"],
compression=d["ssh"]["kex"]["compression_algorithms"]
)
try:
return SSHComponentAlgorithms(
encryption=d["ssh"]["kex"]["encryption_algorithms"],
key_exchange=d["ssh"]["kex"]["kex_algorithms"],
mac=d["ssh"]["kex"]["mac_algorithms"],
key_algorithms=d["ssh"]["kex"]["server_host_key_algorithms"],
compression=d["ssh"]["kex"]["compression_algorithms"]
)
except KeyError as ke:
cls.warning(f"Shodan data is missing key: {ke}")
return None

@classmethod
def from_censys(cls, d: Dict):
def from_censys(cls, d: Dict) -> Union["SSHComponentAlgorithms", None]:
"""Returns an instance of this class based on Censys data given as dictionary."""
return SSHComponentAlgorithms(
encyption=d["ssh"]["kex_init_message"]["client_to_server_ciphers"],
key_exchange=d["ssh"]["kex_init_message"]["kex_algorithms"],
mac=d["ssh"]["kex_init_message"]["server_to_client_macs"],
key_algorithms=d["ssh"]["kex_init_message"]["host_key_algorithms"],
compression=d["ssh"]["kex_init_message"]["server_to_client_compression"]
)
try:
return SSHComponentAlgorithms(
encyption=d["ssh"]["kex_init_message"]["client_to_server_ciphers"],
key_exchange=d["ssh"]["kex_init_message"]["kex_algorithms"],
mac=d["ssh"]["kex_init_message"]["server_to_client_macs"],
key_algorithms=d["ssh"]["kex_init_message"]["host_key_algorithms"],
compression=d["ssh"]["kex_init_message"]["server_to_client_compression"]
)
except KeyError as ke:
cls.warning(f"Censys data is missing key: {ke}")
return None

@classmethod
def from_binaryedge(cls, d: Dict):
def from_binaryedge(cls, d: Dict) -> Union["SSHComponentAlgorithms", None]:
"""Returns an instance of this class based on BinaryEdge data given as dictionary."""
return SSHComponentAlgorithms(
encryption=d["encryption"],
key_exchange=d["kex"],
mac=d["mac"],
key_algorithms=d["server_host_key"],
compression=d["compression"]
)
try:
return SSHComponentAlgorithms(
encryption=d["encryption"],
key_exchange=d["kex"],
mac=d["mac"],
key_algorithms=d["server_host_key"],
compression=d["compression"]
)
except KeyError as ke:
cls.warning(f"BinaryEdge data is missing key: {ke}")
return None


class SSHComponentKey(BaseModel, ShodanDataHandler, CensysDataHandler, BinaryEdgeDataHandler, Logger):
Expand All @@ -65,89 +77,101 @@ class SSHComponentKey(BaseModel, ShodanDataHandler, CensysDataHandler, BinaryEdg
murmur: Optional[str]

@classmethod
def from_shodan(cls, d: Dict):
def from_shodan(cls, d: Dict) -> Union["SSHComponentKey", None]:
"""Returns an instance of this class based on Shodan data given as dictionary."""
if not isinstance(d, Dict):
raise TypeError(f"Method SSHComponentKey.from_shodan expects parameter d to be a dictionary, but it was "
f"{type(d)}.")

key = d["ssh"]["key"]
key = base64.b64decode(key)
md5, sha1, sha256, murmur = hash_all(key)
return SSHComponentKey(
raw=d["ssh"]["key"],
type=d["ssh"]["type"],
md5=md5,
sha1=sha1,
sha256=sha256,
murmur=murmur
)
try:
key = d["ssh"]["key"]
key = base64.b64decode(key)
md5, sha1, sha256, murmur = hash_all(key)
return SSHComponentKey(
raw=d["ssh"]["key"],
type=d["ssh"]["type"],
md5=md5,
sha1=sha1,
sha256=sha256,
murmur=murmur
)
except KeyError as ke:
cls.warning(f"Shodan data is missing key: {ke}")
return None

@classmethod
def from_censys(cls, d: Dict):
def from_censys(cls, d: Dict) -> Union["SSHComponentKey", None]:
"""Returns an instance of this class based on Censys data given as dictionary."""
cls.info("Censys data does not contain the key as raw data. The public key can be constructed with given "
"data, however, currently this is only supported for RSA keys.")

if "rsa_public_key" in d["ssh"]["server_host_key"]:
cls.debug("Seems to be a RSA key. Trying to create public key from modulus and exponent.")
public_numbers = RSAPublicNumbers(
e=int.from_bytes(
base64.b64decode(d["ssh"]["server_host_key"]["rsa_public_key"]["exponent"]),
byteorder="big",
signed=False
),
n=int.from_bytes(
base64.b64decode(d["ssh"]["server_host_key"]["rsa_public_key"]["modulus"]),
byteorder="big",
signed=False
try:
if "rsa_public_key" in d["ssh"]["server_host_key"]:
cls.debug("Seems to be a RSA key. Trying to create public key from modulus and exponent.")
public_numbers = RSAPublicNumbers(
e=int.from_bytes(
base64.b64decode(d["ssh"]["server_host_key"]["rsa_public_key"]["exponent"]),
byteorder="big",
signed=False
),
n=int.from_bytes(
base64.b64decode(d["ssh"]["server_host_key"]["rsa_public_key"]["modulus"]),
byteorder="big",
signed=False
)
)
)
public_key = public_numbers.public_key()
public_key_string = public_key.public_bytes(
encoding=Encoding.OpenSSH,
format=PublicFormat.OpenSSH
).decode("utf-8")
cls.debug(f"Created public key from modulus and exponent: {public_key_string}")
public_key_b64 = public_key_string.split(" ", maxsplit=1)[1]
public_key_raw_data = base64.b64decode(public_key_b64)
public_key = public_numbers.public_key()
public_key_string = public_key.public_bytes(
encoding=Encoding.OpenSSH,
format=PublicFormat.OpenSSH
).decode("utf-8")
cls.debug(f"Created public key from modulus and exponent: {public_key_string}")
public_key_b64 = public_key_string.split(" ", maxsplit=1)[1]
public_key_raw_data = base64.b64decode(public_key_b64)
md5, sha1, sha256, murmur = hash_all(public_key_raw_data)
return SSHComponentKey(
raw=public_key_b64,
type="ssh-rsa",
md5=md5,
sha1=sha1,
sha256=sha256,
murmur=murmur
)
else:
key_type = "unknown"
for key in d["ssh"]["server_host_key"].keys():
if "public_key" in key:
key_type = key.replace("_public_key", "")

cls.info(f"SSH key type is {key_type}. Currently, only RSA SSH keys are supported in Censys model.")
return SSHComponentKey(
type=key_type,
sha256=d["ssh"]["server_host_key"]["fingerprint_sha256"]
)
except KeyError as ke:
cls.warning(f"Censys data is missing key: {ke}")
return None

@classmethod
def from_binaryedge(cls, d: Dict) -> Union["SSHComponentKey", None]:
"""Returns an instance of this class based on BinaryEdge data given as dictionary."""
try:
public_key_raw_data = base64.b64decode(d["key"])
md5, sha1, sha256, murmur = hash_all(public_key_raw_data)
return SSHComponentKey(
raw=public_key_b64,
type="ssh-rsa",
raw=d["key"],
type=d["cypher"],
md5=md5,
sha1=sha1,
sha256=sha256,
murmur=murmur
)
else:
key_type = "unknown"
for key in d["ssh"]["server_host_key"].keys():
if "public_key" in key:
key_type = key.replace("_public_key", "")

cls.info(f"SSH key type is {key_type}. Currently, only RSA SSH keys are supported in Censys model.")
return SSHComponentKey(
type=key_type,
sha256=d["ssh"]["server_host_key"]["fingerprint_sha256"]
)

@classmethod
def from_binaryedge(cls, d: Dict):
"""Returns an instance of this class based on BinaryEdge data given as dictionary."""
public_key_raw_data = base64.b64decode(d["key"])
md5, sha1, sha256, murmur = hash_all(public_key_raw_data)
return SSHComponentKey(
raw=d["key"],
type=d["cypher"],
md5=md5,
sha1=sha1,
sha256=sha256,
murmur=murmur
)
except KeyError as ke:
cls.warning(f"BinaryEdge data is missing key: {ke}")
return None


class SSHComponent(BaseModel, ShodanDataHandler, CensysDataHandler, BinaryEdgeDataHandler):
class SSHComponent(BaseModel, ShodanDataHandler, CensysDataHandler, BinaryEdgeDataHandler, Logger):
"""Represents the SSH component of services."""
algorithms: Optional[SSHComponentAlgorithms]
key: Optional[SSHComponentKey]
Expand All @@ -173,16 +197,20 @@ def from_censys(cls, d: Dict):
)

@classmethod
def from_binaryedge(cls, d: Dict):
def from_binaryedge(cls, d: Dict) -> Union["SSHComponent", None]:
"""Creates an instance of this class based on BinaryEdge data given as dictionary."""
cyphers = d["result"]["data"]["cyphers"]
algorithms = d["result"]["data"]["algorithms"]
cypher = None
for c in cyphers:
if c["cypher"] == "ssh-dss":
continue
cypher = c
return SSHComponent(
algorithms=SSHComponentAlgorithms.from_binaryedge(algorithms),
key=SSHComponentKey.from_binaryedge(cypher)
)
try:
cyphers = d["result"]["data"]["cyphers"]
algorithms = d["result"]["data"]["algorithms"]
cypher = None
for c in cyphers:
if c["cypher"] == "ssh-dss":
continue
cypher = c
return SSHComponent(
algorithms=SSHComponentAlgorithms.from_binaryedge(algorithms),
key=SSHComponentKey.from_binaryedge(cypher)
)
except KeyError as ke:
cls.warning(f"BinaryEdge data is missing key: {ke}")
return None
36 changes: 31 additions & 5 deletions common_osint_model/models/tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,10 @@ def from_binaryedge(cls, d: Union[Dict, List]):

class TLSComponent(BaseModel, ShodanDataHandler, CensysDataHandler, BinaryEdgeDataHandler, Logger):
"""Represents the TLS component of services."""
certificate: TLSComponentCertificate
certificate: Optional[TLSComponentCertificate]
ja3: Optional[str]
ja3s: Optional[str]
jarm: Optional[str]

# Todo: Add other attributes relevant to TLS such as CipherSuits etc.

Expand All @@ -331,15 +334,19 @@ def from_shodan(cls, d: Dict):
f"but it was {type(d)}.")

return TLSComponent(
certificate=TLSComponentCertificate.from_shodan(d)
certificate=TLSComponentCertificate.from_shodan(d),
ja3s=d.get("ssl", {}).get("ja3s", None),
jarm=d.get("ssl", {}).get("jarm", None)
)

@classmethod
def from_censys(cls, d: Dict):
try:
tls = d["tls"]
return TLSComponent(
certificate=TLSComponentCertificate.from_censys(tls["certificates"]["leaf_data"])
certificate=TLSComponentCertificate.from_censys(tls["certificates"]["leaf_data"]),
ja3s=tls.get("ja3s", None),
jarm=d.get("jarm", {}).get("fingerprint", None)
)
except KeyError as e:
cls.error(f"Exception during certificate data extraction. "
Expand All @@ -349,7 +356,26 @@ def from_censys(cls, d: Dict):
@classmethod
def from_binaryedge(cls, d: Union[Dict, List]):
"""Creates an instance of this class based on BinaryEdge data given as dictionary."""
certificate_chain = d["result"]["data"]["cert_info"]["certificate_chain"]
data = d.get("result", {}).get("data", None)
if not data:
cls.error("No data key available in binary edge dictionary. Returning None...")
return None

cert = None
if "cert_info" in data:
cert = TLSComponentCertificate.from_binaryedge(
data["cert_info"]["certificate_chain"][0]
)
ja3 = None
if "server_info" in data:
ja3 = data["server_info"]["ja3_digest"]

jarm = None
if "jarm_hash" in data:
jarm = data["jarm_hash"]

return TLSComponent(
certificate=TLSComponentCertificate.from_binaryedge(certificate_chain[0])
certificate=cert,
ja3=ja3,
jarm=jarm
)
Loading

0 comments on commit 813e7de

Please sign in to comment.