-
Notifications
You must be signed in to change notification settings - Fork 10
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
27 changed files
with
4,467 additions
and
58 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -140,3 +140,4 @@ cython_debug/ | |
# Project files | ||
.idea | ||
|
||
/test_data/test.json |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,17 @@ | ||
from common_osint_model.shodan import from_shodan, from_shodan_flattened | ||
from common_osint_model.censys import ( | ||
from common_osint_model.censys.v1 import ( | ||
from_censys_ipv4, | ||
from_censys_ipv4_flattened, | ||
from_censys_certificates, | ||
from_censys_certificates_flattened | ||
) | ||
from common_osint_model.censys.v2 import from_censys, from_censys_flattened | ||
from common_osint_model.certificate import from_x509_pem, from_x509_pem_flattened | ||
|
||
from common_osint_model.models.host import Host | ||
from common_osint_model.models.domain import Domain | ||
from common_osint_model.models.service import Service | ||
from common_osint_model.models.autonomous_system import AutonomousSystem | ||
from common_osint_model.models.http import * | ||
from common_osint_model.models.tls import * | ||
from common_osint_model.models.ssh import * |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,169 @@ | ||
import warnings | ||
from typing import Dict, List, Any | ||
|
||
from DateTime import DateTime | ||
from mmh3 import hash as mmh3_hash | ||
|
||
from common_osint_model.utils import sha256_from_body_string, flatten | ||
|
||
|
||
def from_censys(raw: Dict) -> Dict: | ||
""" | ||
Convert Censys data model to the common data model. | ||
:param raw: Censys Search 2.0 dictionary | ||
""" | ||
warnings.warn("This function was deprecated in v0.4.0.", DeprecationWarning) | ||
common = {} | ||
common.update( | ||
censys_meta_extraction(raw) | ||
) | ||
|
||
for service in raw.get("services", []): | ||
common.update(censys_extract_service(service)) | ||
return common | ||
|
||
|
||
def from_censys_flattened(raw: Dict) -> Dict: | ||
warnings.warn("This function was deprecated in v0.4.0.", DeprecationWarning) | ||
return flatten(from_censys(raw)) | ||
|
||
|
||
def censys_meta_extraction(raw: Dict) -> Dict: | ||
""" | ||
Returns all metadata. | ||
:param raw: Censys Search 2.0 dictionary | ||
""" | ||
_as = raw.get("autonomous_system", {}) | ||
return { | ||
"ip": raw.get("ip", "None"), | ||
"as": { | ||
"name": _as.get("name", "None"), | ||
"number": _as.get("asn", "None"), | ||
"description": _as.get("description", "None"), | ||
"location": _as.get("country_code", "None"), | ||
"prefix": _as.get("bgp_prefix", "None") | ||
}, | ||
"location": { | ||
**raw.get("location", {}) | ||
}, | ||
"ports": [service["port"] for service in raw.get("services", [])] | ||
} | ||
|
||
|
||
def censys_extract_service(service: Dict) -> Dict: | ||
""" | ||
Extracts relevant information from a service object/dict. | ||
:param service: Censys Search 2.0 service dictionary | ||
""" | ||
port = service["port"] | ||
timestamp = service.get("observed_at", None) | ||
if timestamp: | ||
timestamp = DateTime(timestamp) | ||
s_common = { | ||
"banner": service.get("banner", None), | ||
"timestamp": int(timestamp), | ||
"timestamp_readable": timestamp.ISO8601() | ||
} | ||
if "http" in service: | ||
s_common.update({"http": censys_extract_http_service(service)}) | ||
if "tls" in service: | ||
s_common.update({"tls": censys_extract_tls_service(service)}) | ||
return { | ||
port: s_common | ||
} | ||
|
||
|
||
def censys_extract_http_service(service: Dict) -> Dict: | ||
"""Extracts relevant http service fields. | ||
:param service: Censys Search 2.0 service dictionary | ||
""" | ||
s_http = {} | ||
res = service.get("http", {}).get("response", None) | ||
if not res: | ||
return {} | ||
|
||
headers = res.get("headers", None) | ||
if headers: | ||
s_http["headers"] = {} | ||
for k, v in headers.items(): | ||
if k == "_encoding": | ||
continue | ||
|
||
s_http["headers"][k.lower()] = v[0] | ||
s_http["content"] = { | ||
"html": res.get("body"), | ||
"hash": { | ||
"shodan": mmh3_hash(res.get("body", None) or ""), | ||
"sha256": sha256_from_body_string(res.get("body", None) or ""), | ||
"censys": res.get("body_hash", None) | ||
} | ||
} | ||
return s_http | ||
|
||
|
||
def censys_extract_tls_service(service: Dict) -> Dict: | ||
"""Extracts relevant tls service fields. | ||
:param service: Censys Search 2.0 service dictionary | ||
""" | ||
s_tls = {} | ||
cert = service.get("tls", {}).get("certificates", {}).get("leaf_data", None) | ||
c_issuer = cert.get("issuer", None) or dict() | ||
c_subject = cert.get("subject", None) or dict() | ||
common_name = c_subject.get("common_name", []) | ||
common_name.extend(cert.get("names", [])) | ||
if len(common_name) == 0: | ||
common_name = None | ||
else: | ||
common_name = sorted(list(set(common_name))) | ||
if not cert: | ||
return {} | ||
|
||
s_tls["certificate"] = { | ||
"issuer_dn": cert.get("issuer_dn", None), | ||
"subject_dn": cert.get("subject_dn", None), | ||
"issuer": { | ||
"common_name": _first_or_none(c_issuer["common_name"]), | ||
# MISSING! "country": _first_or_none(c_issuer["country"]), | ||
"locality": _first_or_none(c_issuer["locality"]), | ||
"province": _first_or_none(c_issuer["province"]), | ||
"organization": _first_or_none(c_issuer["organization"]), | ||
"organizational_unit": _first_or_none(c_issuer["organizational_unit"]), | ||
# MISSING! "email_address": _first_or_none(c_issuer["email_address"]), | ||
}, | ||
"subject": { | ||
"common_name": common_name, | ||
# MISSING! "country": _first_or_none(c_issuer["country"]), | ||
"locality": _first_or_none(c_subject["locality"]), | ||
"province": _first_or_none(c_subject["province"]), | ||
"organization": _first_or_none(c_subject["organization"]), | ||
"organizational_unit": _first_or_none(c_subject["organizational_unit"]), | ||
# MISSING! "email_address": _first_or_none(c_subject["email_address"]), | ||
}, | ||
"fingerprint": { | ||
"sha256": cert.get("fingerprint", None) | ||
} | ||
} | ||
return s_tls | ||
|
||
|
||
def censys_extract_ssh(service: Dict) -> Dict: | ||
""" | ||
Extracts relevant ssh service fields. | ||
:param service: Censys Search 2.0 service dictionary | ||
""" | ||
s_ssh = {} | ||
|
||
|
||
def _first_or_none(l: List) -> Any: | ||
"""Returns first element of list or none, if list is empty.""" | ||
if not l: | ||
return None | ||
if len(l) > 0: | ||
return l[0] | ||
return None |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
from abc import ABC | ||
from logging import getLogger, basicConfig | ||
from typing import Dict, List, Union | ||
|
||
basicConfig(level="INFO") | ||
|
||
|
||
class Logger(ABC): | ||
"""Abstract class which implements just an info method printing a message to stdout via Logger class.""" | ||
|
||
@classmethod | ||
def info(cls, message: str): | ||
logger = getLogger(cls.__name__) | ||
logger.info(message) | ||
|
||
@classmethod | ||
def debug(cls, message: str): | ||
logger = getLogger(cls.__name__) | ||
logger.debug(message) | ||
|
||
|
||
class ShodanDataHandler(ABC): | ||
"""Abstract base class indicating that a class implements from_shodan().""" | ||
|
||
@classmethod | ||
def from_shodan(cls, d: Dict): | ||
pass | ||
|
||
|
||
class CensysDataHandler(ABC): | ||
"""Abstract base class indicating that a class implements from_censys().""" | ||
|
||
@classmethod | ||
def from_censys(cls, d: Dict): | ||
pass | ||
|
||
|
||
class BinaryEdgeDataHandler(ABC): | ||
"""Abstract base class indicating that a class implements from_binaryedge().""" | ||
|
||
@classmethod | ||
def from_binaryedge(cls, d: Union[Dict, List]): | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
import ipaddress | ||
from typing import Dict, List, Optional | ||
|
||
from pydantic import BaseModel, validator | ||
|
||
from common_osint_model.models import ShodanDataHandler, CensysDataHandler, Logger | ||
|
||
|
||
class AutonomousSystem(BaseModel, ShodanDataHandler, CensysDataHandler, Logger): | ||
"""Represents an autonomous system""" | ||
number: int | ||
name: str | ||
country: Optional[str] | ||
prefix: Optional[str] | ||
source: str | ||
|
||
@validator("prefix") | ||
def validate_prefix(cls, v): | ||
if not v: | ||
return v | ||
try: | ||
ipaddress.ip_network(v) | ||
except Exception as e: | ||
raise ValueError(f"Prefix given could not be parsed by ipaddress module. Likely \"{v}\" has a " | ||
f"wrong format: {e}") | ||
return v | ||
|
||
@classmethod | ||
def from_shodan(cls, d: Dict): | ||
"""Creates an instance of this class using a typical Shodan dictionary.""" | ||
if isinstance(d, List): | ||
cls.debug("Got a list instead of a dictionary. Usually multiple services of the same host are represented" | ||
" as multiple list items by shodan, so this should not be a problem as the AS is the same for all." | ||
" Using the first item.") | ||
d = d[0] | ||
return AutonomousSystem( | ||
number=int(d.get("asn").replace("AS", "")), | ||
name=d.get("isp"), | ||
country=d.get("location", {}).get("country_code", None), | ||
prefix=None, # Not available in Shodan data | ||
source="shodan" | ||
) | ||
|
||
@classmethod | ||
def from_censys(cls, d: Dict): | ||
return AutonomousSystem( | ||
number=d["autonomous_system"]["asn"], | ||
name=d["autonomous_system"]["name"], | ||
country=d["autonomous_system"]["country_code"], | ||
prefix=d["autonomous_system"]["bgp_prefix"], | ||
source="censys" | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
from datetime import datetime | ||
from typing import Optional | ||
|
||
from pydantic import BaseModel | ||
|
||
|
||
class Domain(BaseModel): | ||
"""Represents a domain pointing to a specific host.""" | ||
domain: str | ||
first_seen: datetime = datetime.utcnow() | ||
last_seen: datetime = datetime.utcnow() | ||
source: Optional[str] | ||
type: Optional[str] |
Oops, something went wrong.