Skip to content

Commit

Permalink
Merge pull request #21 from PitterPatterPython/spycloud-ato-refactor
Browse files Browse the repository at this point in the history
Spycloud ato refactor
  • Loading branch information
robd518 authored Oct 23, 2024
2 parents e234e69 + fa1bbaa commit 15610b9
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 104 deletions.
144 changes: 41 additions & 103 deletions ppp_connectors/spycloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def spycloud_sip_cookie_domains(cookie_domains: str, **kwargs: Dict[str, Any]) -
result in only that specific cookie subdomain returned.
Returns:
Response: requests.Response json response from the request
Response: requests.Response object from the request
"""

# Define required environment variables
Expand All @@ -41,45 +41,15 @@ def spycloud_sip_cookie_domains(cookie_domains: str, **kwargs: Dict[str, Any]) -

return result

def spycloud_ato_email(email_addresses:str, **kwargs: Dict[str, Any]) -> Response:
"""Return account takeover (ATO) breach data related to a comma-separated string of emails

Args:
email_addresses (str): a comma-separated list of email addresses (limit of 10 at a time)
Returns:
Response: requests.Respone json response from the request
"""

# Define required environment variables
required_vars: List[str] = [
'SPYCLOUD_API_ATO_KEY'
]

# Check and ensure that required variables are present, exits if not
check_required_env_vars(env_config, required_vars)

method: str = 'get'
url: str = f'https://api.spycloud.io/sp-v2/breach/data/emails/{email_addresses}'
headers: Dict = {
'accept': 'application/json',
'x-api-key': env_config['SPYCLOUD_API_ATO_KEY']
}
params: Dict = dict(kwargs)

result: Response = make_request(method=method, url=url, headers=headers, params=params)

return result

def spycloud_ato_ip(ip_address:str, **kwargs: Dict[str, Any]) -> Response:
"""Return account takeover (ATO) breach data related to an IP address
def spycloud_ato_breach_catalog(query:str, **kwargs: Dict[str, Any]) -> Response:
"""List or Query the Breach Catalog
Args:
ip_address (str): IP address or network CIDR notation to search \
for. For CIDR notation, use an underscore instead of a slash.
query (str): Query value to search the breach catalog for.
Returns:
Response: requests.Respone json response from the request
Response: requests.Response object from the request
"""

# Define required environment variables
Expand All @@ -91,70 +61,60 @@ def spycloud_ato_ip(ip_address:str, **kwargs: Dict[str, Any]) -> Response:
check_required_env_vars(env_config, required_vars)

method: str = 'get'
url: str = f'https://api.spycloud.io/sp-v2/breach/data/ips/{ip_address}'
url: str = f'https://api.spycloud.io/sp-v2/breach/catalog'
headers: Dict = {
'accept': 'application/json',
'x-api-key': env_config['SPYCLOUD_API_ATO_KEY']
}
params: Dict = dict(kwargs)
params: Dict = {
'query': query,
**kwargs
}

result: Response = make_request(method=method, url=url, headers=headers, params=params)

return result

def spycloud_ato_username(username:str, **kwargs: Dict[str, Any]) -> Response:
"""Return account takeover (ATO) breach data related to a username

def spycloud_ato_search(search_type: str, query:str, **kwargs: Dict[str, Any]) -> Response:
"""Perform search against Spycloud's Consumer ATO API to query its vast collection of
breach records and surrounding metadata
Args:
username (str): Username you wish to search for. You can also \
search for the sha1, sha256, or sha512 hash of the username.
search_type (str): can be one of domain, email, ip, username, or phone-number
query (str): the search query
Returns:
Response: requests.Respone json response from the request
Response: requests.Response object from the request
"""

# Define required environment variables
required_vars: List[str] = [
'SPYCLOUD_API_ATO_KEY'
]

# Check and ensure that required variables are present, exits if not
check_required_env_vars(env_config, required_vars)

method: str = 'get'
url: str = f'https://api.spycloud.io/sp-v2/breach/data/usernames/{username}'
headers: Dict = {
'accept': 'application/json',
'x-api-key': env_config['SPYCLOUD_API_ATO_KEY']
# These are valid endpoints and their corresponding full URLs. We'll use these
# to check that the user passed a valid 'search_type' parameter
base_url: str = f'https://api.spycloud.io/sp-v2/breach/data'
valid_endpoints: Dict[str, str] = {
'domain': f'{base_url}/domains',
'email': f'{base_url}/emails',
'ip': f'{base_url}/ips',
'username': f'{base_url}/usernames',
'phone-number': f'{base_url}/phone-numbers',
}
params: Dict = dict(kwargs)

result: Response = make_request(method=method, url=url, headers=headers, params=params)

return result

def spycloud_ato_phone_number(phone_number:str, **kwargs: Dict[str, Any]) -> Response:
"""Return account takeover (ATO) breach data related to a phone number
Args:
username (str): phone number you wish to search for. Must only be \
numerical values of length 7 to 15 characters. You can also \
search for the sha1, sha256, or sha512 hash of the phone number.
Returns:
Response: requests.Response json response from the request
"""

# Define required environment variables
required_vars: List[str] = [
'SPYCLOUD_API_ATO_KEY'
]

# Check and ensure that required variables are present, exits if not
check_required_env_vars(env_config, required_vars)
# Completely exit if they supply an invalid search_type
if search_type not in valid_endpoints:
print(f'[!] Error: "{search_type}" is not a valid search type. Must be one of '
f'{", ".join(valid_endpoints.keys())}', file=sys.stderr)
sys.exit(1)

method: str = 'get'
url: str = f'https://api.spycloud.io/sp-v2/breach/data/phone-numbers/{phone_number}'
url: str = f'{valid_endpoints[search_type]}/{query}'

headers: Dict = {
'accept': 'application/json',
'x-api-key': env_config['SPYCLOUD_API_ATO_KEY']
Expand All @@ -165,43 +125,21 @@ def spycloud_ato_phone_number(phone_number:str, **kwargs: Dict[str, Any]) -> Res

return result

def spycloud_ato_breach_catalog(query:str, **kwargs: Dict[str, Any]) -> Response:
"""List or Query the Breach Catalog

def spycloud_inv_search(search_type: str, query:str, **kwargs: Dict[str, Any]) -> Response:
"""Perform search against Spycloud's Investigations API to query its vast collection of
breach records and surrounding metadata
Args:
query (str): Query value to search the breach catalog for.
search_type (str): can be one of domain, email, ip, infected-machine-id, log-id,
password, username, email-username, phone-number, social-handle, bank-number,
cc-number, drivers-license, national-id, passport-number, or ssn
query (str): the search query
Returns:
Response: requests.Response json response from the request
Response: requests.Response object from the request
"""

# Define required environment variables
required_vars: List[str] = [
'SPYCLOUD_API_ATO_KEY'
]

# Check and ensure that required variables are present, exits if not
check_required_env_vars(env_config, required_vars)

method: str = 'get'
url: str = f'https://api.spycloud.io/sp-v2/breach/catalog'
headers: Dict = {
'accept': 'application/json',
'x-api-key': env_config['SPYCLOUD_API_ATO_KEY']
}
params: Dict = {
'query': query,
**kwargs
}

result: Response = make_request(method=method, url=url, headers=headers, params=params)

return result

def spycloud_inv_search(search_type: str, query:str, **kwargs: Dict[str, Any]) -> Response:

# Define required environment variables
required_vars: List[str] = [
'SPYCLOUD_API_INV_KEY'
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
name = "ppp-connectors"
packages = [{ include = "ppp_connectors" }]
version = "0.4.0"
version = "0.4.1"
description = "A simple, lightweight set of connectors and functions to various APIs, controlled by a central broker."
authors = [
"Rob D'Aveta <[email protected]>",
Expand Down

0 comments on commit 15610b9

Please sign in to comment.