-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #17 from PitterPatterPython/urlscan-and-ipqualitys…
…core-connectors Urlscan and ipqualityscore connectors
- Loading branch information
Showing
5 changed files
with
167 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
from typing import Dict, Any, List | ||
from requests import Response | ||
from urllib.parse import quote | ||
from .broker import make_request | ||
from .helpers import check_required_env_vars, combine_env_configs | ||
|
||
env_config: Dict[str, Any] = combine_env_configs() | ||
|
||
def ipqs_malicious_url(query: str, **kwargs: Dict[str, Any]) -> Response: | ||
"""IPQualityScore's Malicious URL Scanner API scans links in real-time | ||
to detect suspicious URLs. Accurately identify phishing links, malware | ||
URLs and viruses, parked domains, and suspicious URLs with real-time risk | ||
scores. Industry leading phishing detection and domain reputation provide | ||
better signals for more accurate decision making. | ||
Args: | ||
query (str): The URL to scan | ||
Returns: | ||
Response: requests.Response json response from the request | ||
""" | ||
|
||
# Define required environment variables | ||
required_vars: List[str] = [ | ||
'IPQS_API_KEY' | ||
] | ||
|
||
# Check and ensure that required variables are present, exits if not | ||
check_required_env_vars(env_config, required_vars) | ||
|
||
method: str = 'post' | ||
url: str = f'https://ipqualityscore.com/api/json/url' | ||
headers: Dict = {'accept': 'application/json'} | ||
encoded_query: str = quote(query) | ||
params: Dict = { | ||
'key': env_config['IPQS_API_KEY'], | ||
'url': encoded_query, | ||
**kwargs | ||
} | ||
|
||
result: Response = make_request(method=method, url=url, headers=headers, json=params) | ||
|
||
return result |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
from typing import Dict, Any, List | ||
from requests import Response | ||
from .broker import make_request | ||
from .helpers import check_required_env_vars, combine_env_configs | ||
|
||
env_config: Dict[str, Any] = combine_env_configs() | ||
|
||
def urlscan_search(query: str, **kwargs: Dict[str, Any]) -> Response: | ||
"""Find archived scans of URLs on urlscan.io. Search query syntax can | ||
be found at https://urlscan.io/docs/search/ | ||
Args: | ||
query (str): The query term (ElasticSearch Query String Query). Default: "*" | ||
Returns: | ||
Response: requests.Response json response from the request | ||
""" | ||
|
||
# Define required environment variables | ||
required_vars: List[str] = [ | ||
'URLSCAN_API_KEY' | ||
] | ||
|
||
# Check and ensure that required variables are present, exits if not | ||
check_required_env_vars(env_config, required_vars) | ||
|
||
method: str = 'get' | ||
url: str = f'https://urlscan.io/api/v1/search/' | ||
headers: Dict = { | ||
'accept': 'application/json', | ||
'API-Key': env_config['URLSCAN_API_KEY'] | ||
} | ||
params: Dict = { | ||
'q': query, | ||
**kwargs | ||
} | ||
|
||
result: Response = make_request(method=method, url=url, headers=headers, params=params) | ||
|
||
return result | ||
|
||
def urlscan_scan(query: str, **kwargs: Dict[str, Any]) -> Response: | ||
"""Submit a URL to be scanned | ||
Args: | ||
query (str): the URL to be scanned | ||
Returns: | ||
Response: requests.Response json response from the request | ||
""" | ||
|
||
required_vars: List[str] = [ | ||
'URLSCAN_API_KEY' | ||
] | ||
|
||
# Check and ensure that required variables are present, exits if not | ||
check_required_env_vars(env_config, required_vars) | ||
|
||
method: str = 'post' | ||
url: str = 'https://urlscan.io/api/v1/scan' | ||
headers: Dict = { | ||
'accept': 'application/json', | ||
'API-Key': env_config['URLSCAN_API_KEY'] | ||
} | ||
payload: Dict = { | ||
'url': query, | ||
**kwargs | ||
} | ||
|
||
result: Response = make_request(method=method, url=url, headers=headers, json=payload) | ||
|
||
return result | ||
|
||
def urlscan_results(uuid: str, **kwargs: Dict[str, Any]) -> Response: | ||
"""Retrieve results of a URLScan scan | ||
Args: | ||
uuid (str): the UUID of the submitted URL scan | ||
Returns: | ||
Response: requests.Response json response from the request | ||
""" | ||
|
||
# Define required environment variables | ||
required_vars: List[str] = [ | ||
'URLSCAN_API_KEY' | ||
] | ||
|
||
# Check and ensure that required variables are present, exits if not | ||
check_required_env_vars(env_config, required_vars) | ||
|
||
method: str = 'get' | ||
url: str = f'https://urlscan.io/api/v1/result/{uuid}' | ||
headers: Dict = { | ||
'accept': 'application/json', | ||
'API-Key': env_config['URLSCAN_API_KEY'] | ||
} | ||
params: Dict = dict(kwargs) | ||
|
||
result: Response = make_request(method=method, url=url, headers=headers, params=params) | ||
|
||
return result |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,7 @@ | ||
[tool.poetry] | ||
name = "ppp-connectors" | ||
packages = [{ include = "ppp_connectors" }] | ||
version = "0.2.2" | ||
version = "0.3.0" | ||
description = "A simple, lightweight set of connectors and functions to various APIs, controlled by a central broker." | ||
authors = [ | ||
"Rob D'Aveta <[email protected]>", | ||
|