Skip to content

Commit

Permalink
Merge pull request #22 from PitterPatterPython/timeout-param
Browse files Browse the repository at this point in the history
add optional time parameter. linting.
  • Loading branch information
robd518 authored Mar 7, 2025
2 parents 15610b9 + 3693d81 commit 3563dd8
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 41 deletions.
23 changes: 13 additions & 10 deletions ppp_connectors/broker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import sys
from typing import Callable, Dict, Any, List
import requests
from requests.auth import HTTPBasicAuth
Expand All @@ -15,7 +14,8 @@ def make_request(
auth: HTTPBasicAuth = None,
params: Dict[str, Any] = None,
data: Dict[str, Any] = None,
json: Dict[str, Any] = None
json: Dict[str, Any] = None,
timeout: int = 15
) -> requests.Response:
"""Perform an HTTP request on behalf of a calling function
Expand Down Expand Up @@ -62,11 +62,14 @@ def make_request(
if not request_func:
raise ValueError(f'Unsupported HTTP method: {method}')

return request_func(url,
headers=headers,
auth=auth,
params=params,
data=data,
json=json,
proxies=proxies,
verify=verify)
return request_func(
url,
headers=headers,
auth=auth,
params=params,
data=data,
json=json,
proxies=proxies,
verify=verify,
timeout=timeout
)
37 changes: 31 additions & 6 deletions ppp_connectors/flashpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

env_config: Dict[str, Any] = combine_env_configs()


def flashpoint_search_communities(query: str, **kwargs: Dict[str, Any]) -> Response:
"""Communities Search allows search requests over article and conversation data.
Article data is made up of things like blogs and paste sites. Conversation data
Expand Down Expand Up @@ -37,10 +38,17 @@ def flashpoint_search_communities(query: str, **kwargs: Dict[str, Any]) -> Respo
**kwargs
}

result: Response = make_request(method=method, url=url, headers=headers, json=payload)
result: Response = make_request(
method=method,
url=url,
headers=headers,
json=payload,
timeout=kwargs.get("timeout")
)

return result


def flashpoint_search_media(query: str, **kwargs: Dict[str, Any]) -> Response:
"""Media search allows search requests over our media data, specifically
media that have been through our Optical Character Recogintion (OCR) process.
Expand Down Expand Up @@ -73,10 +81,17 @@ def flashpoint_search_media(query: str, **kwargs: Dict[str, Any]) -> Response:
**kwargs
}

result: Response = make_request(method=method, url=url, headers=headers, json=payload)
result: Response = make_request(
method=method,
url=url,
headers=headers,
json=payload,
timeout=kwargs.get("timeout")
)

return result


def flashpoint_get_media_object(id: str) -> Response:
"""Media ID request allows users to directly lookup the document based on the media ID provided.
Expand All @@ -102,10 +117,15 @@ def flashpoint_get_media_object(id: str) -> Response:
'Authorization': f'Bearer {env_config["FLASHPOINT_API_KEY"]}'
}

result: Response = make_request(method=method, url=url, headers=headers)
result: Response = make_request(
method=method,
url=url,
headers=headers
)

return result


def flashpoint_get_media_image(storage_uri: str) -> Response:
"""Download the media from a media object by its storage_uri field
Expand All @@ -124,7 +144,7 @@ def flashpoint_get_media_image(storage_uri: str) -> Response:
check_required_env_vars(env_config, required_vars)

method: str = 'get'
url: str = f'https://api.flashpoint.io/sources/v1/media/'
url: str = 'https://api.flashpoint.io/sources/v1/media/'
headers: Dict = {
'accept': 'application/json',
'content-type': 'application/json',
Expand All @@ -135,6 +155,11 @@ def flashpoint_get_media_image(storage_uri: str) -> Response:
"asset_id": storage_uri
}

result: Response = make_request(method=method, url=url, headers=headers, params=params)
result: Response = make_request(
method=method,
url=url,
headers=headers,
params=params
)

return result
return result
6 changes: 4 additions & 2 deletions ppp_connectors/helpers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import date, datetime
from datetime import datetime
from dotenv import dotenv_values, find_dotenv
import os
import sys
Expand All @@ -24,6 +24,7 @@ def check_required_env_vars(config: Dict[str, str], required_vars: List[str]) ->
'system\'s environment variables.', file=sys.stderr)
sys.exit(1)


def combine_env_configs() -> Dict[str, Any]:
"""Find a .env file if it exists, and combine it with system environment
variables to form a "combined_config" dictionary of environment variables
Expand All @@ -39,6 +40,7 @@ def combine_env_configs() -> Dict[str, Any]:

return combined_config


def validate_date_string(date_str: str) -> bool:
"""Validates that a date string is, well, a valid date string
Expand All @@ -52,4 +54,4 @@ def validate_date_string(date_str: str) -> bool:
datetime.strptime(date_str, "%Y-%m-%d")
return True
except ValueError:
return False
return False
13 changes: 10 additions & 3 deletions ppp_connectors/ipqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

env_config: Dict[str, Any] = combine_env_configs()


def ipqs_malicious_url(query: str, **kwargs: Dict[str, Any]) -> Response:
"""IPQualityScore's Malicious URL Scanner API scans links in real-time
to detect suspicious URLs. Accurately identify phishing links, malware
Expand All @@ -29,7 +30,7 @@ def ipqs_malicious_url(query: str, **kwargs: Dict[str, Any]) -> Response:
check_required_env_vars(env_config, required_vars)

method: str = 'post'
url: str = f'https://ipqualityscore.com/api/json/url'
url: str = 'https://ipqualityscore.com/api/json/url'
headers: Dict = {'accept': 'application/json'}
encoded_query: str = quote(query)
params: Dict = {
Expand All @@ -38,6 +39,12 @@ def ipqs_malicious_url(query: str, **kwargs: Dict[str, Any]) -> Response:
**kwargs
}

result: Response = make_request(method=method, url=url, headers=headers, json=params)
result: Response = make_request(
method=method,
url=url,
headers=headers,
json=params,
timeout=kwargs.get("timeout")
)

return result
return result
34 changes: 26 additions & 8 deletions ppp_connectors/spycloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def spycloud_sip_cookie_domains(cookie_domains: str, **kwargs: Dict[str, Any]) -
return result


def spycloud_ato_breach_catalog(query:str, **kwargs: Dict[str, Any]) -> Response:
def spycloud_ato_breach_catalog(query: str, **kwargs: Dict[str, Any]) -> Response:
"""List or Query the Breach Catalog
Args:
Expand All @@ -61,7 +61,7 @@ def spycloud_ato_breach_catalog(query:str, **kwargs: Dict[str, Any]) -> Response
check_required_env_vars(env_config, required_vars)

method: str = 'get'
url: str = f'https://api.spycloud.io/sp-v2/breach/catalog'
url: str = 'https://api.spycloud.io/sp-v2/breach/catalog'
headers: Dict = {
'accept': 'application/json',
'x-api-key': env_config['SPYCLOUD_API_ATO_KEY']
Expand All @@ -71,12 +71,18 @@ def spycloud_ato_breach_catalog(query:str, **kwargs: Dict[str, Any]) -> Response
**kwargs
}

result: Response = make_request(method=method, url=url, headers=headers, params=params)
result: Response = make_request(
method=method,
url=url,
headers=headers,
params=params,
timeout=kwargs.get("timeout")
)

return result


def spycloud_ato_search(search_type: str, query:str, **kwargs: Dict[str, Any]) -> Response:
def spycloud_ato_search(search_type: str, query: str, **kwargs: Dict[str, Any]) -> Response:
"""Perform search against Spycloud's Consumer ATO API to query its vast collection of
breach records and surrounding metadata
Expand All @@ -97,7 +103,7 @@ def spycloud_ato_search(search_type: str, query:str, **kwargs: Dict[str, Any]) -

# These are valid endpoints and their corresponding full URLs. We'll use these
# to check that the user passed a valid 'search_type' parameter
base_url: str = f'https://api.spycloud.io/sp-v2/breach/data'
base_url: str = 'https://api.spycloud.io/sp-v2/breach/data'
valid_endpoints: Dict[str, str] = {
'domain': f'{base_url}/domains',
'email': f'{base_url}/emails',
Expand All @@ -121,12 +127,18 @@ def spycloud_ato_search(search_type: str, query:str, **kwargs: Dict[str, Any]) -
}
params: Dict = dict(kwargs)

result: Response = make_request(method=method, url=url, headers=headers, params=params)
result: Response = make_request(
method=method,
url=url,
headers=headers,
params=params,
timeout=kwargs.get("timeout")
)

return result


def spycloud_inv_search(search_type: str, query:str, **kwargs: Dict[str, Any]) -> Response:
def spycloud_inv_search(search_type: str, query: str, **kwargs: Dict[str, Any]) -> Response:
"""Perform search against Spycloud's Investigations API to query its vast collection of
breach records and surrounding metadata
Expand Down Expand Up @@ -185,6 +197,12 @@ def spycloud_inv_search(search_type: str, query:str, **kwargs: Dict[str, Any]) -
}
params: Dict = dict(kwargs)

result: Response = make_request(method=method, url=url, headers=headers, params=params)
result: Response = make_request(
method=method,
url=url,
headers=headers,
params=params,
timeout=kwargs.get("timeout")
)

return result
26 changes: 19 additions & 7 deletions ppp_connectors/twilio.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

env_config: Dict[str, Any] = combine_env_configs()

def twilio_lookup(phone_number: str, data_packages: list=[], **kwargs: Dict[str, Any]) -> Response:

def twilio_lookup(phone_number: str, data_packages: list = [], **kwargs: Dict[str, Any]) -> Response:
"""query information on a phone number so that you can make a trusted interaction with your user.
With this endpoint, you can format and validate phone numbers with the free Basic Lookup request
and add on data packages to get even more in-depth carrier and caller information.
Expand Down Expand Up @@ -39,7 +40,7 @@ def twilio_lookup(phone_number: str, data_packages: list=[], **kwargs: Dict[str,
'line_type_intelligence', 'identity_match', 'reassigned_number',
'sms_pumping_risk', 'phone_number_quality_score', 'pre_fill'}
data_packages_set: Set = set(data_packages)
invalid_packages = data_packages_set - valid_data_packages
invalid_packages = data_packages_set - valid_data_packages
if len(invalid_packages) != 0:
print(f'[!] Error: "{", ".join(invalid_packages)}" are not valid data packages. Valid '
f'packages include {", ".join(valid_data_packages)}', file=sys.stderr)
Expand All @@ -55,12 +56,19 @@ def twilio_lookup(phone_number: str, data_packages: list=[], **kwargs: Dict[str,
**kwargs
}

result: Response = make_request(method=method, url=url, auth=auth, params=params)
result: Response = make_request(
method=method,
url=url,
auth=auth,
params=params,
timeout=kwargs.get("timeout")
)

return result


def twilio_usage_report(start_date: Union[str, date],
end_date: Optional[Union[str, date]]=None) -> Response:
end_date: Optional[Union[str, date]] = None) -> Response:
"""Return a usage report for all activities between the start_date and end_date.
Args:
Expand Down Expand Up @@ -91,7 +99,6 @@ def twilio_usage_report(start_date: Union[str, date],
'does not match the format YYYY-MM-DD')
sys.exit()


method: str = 'get'
url: str = f'https://api.twilio.com/2010-04-01/Accounts/{env_config["TWILIO_ACCOUNT_SID"]}/Usage/Records.json'

Expand All @@ -102,6 +109,11 @@ def twilio_usage_report(start_date: Union[str, date],
'EndDate': end_date
}

result: Response = make_request(method=method, url=url, auth=auth, params=params)
result: Response = make_request(
method=method,
url=url,
auth=auth,
params=params
)

return result
return result
Loading

0 comments on commit 3563dd8

Please sign in to comment.