Skip to content

Commit

Permalink
More robust time-based SQL injection detector (#1507)
Browse files Browse the repository at this point in the history
  • Loading branch information
kazet authored Feb 5, 2025
1 parent 3a19073 commit 50e707f
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 32 deletions.
43 changes: 21 additions & 22 deletions artemis/modules/sql_injection_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,27 +91,17 @@ def change_url_params(url: str, payload: str, param_batch: tuple[Any]) -> str:
new_url = f"{new_url}" + concatenation + "&".join([f"{key}={value}" for key, value in assignments.items()])
return unquote(new_url)

@staticmethod
def is_response_time_within_threshold(elapsed_time: float) -> bool:
if elapsed_time < Config.Modules.SqlInjectionDetector.SQL_INJECTION_TIME_THRESHOLD:
return True
return False

def are_requests_time_efficient(self, url: str, **kwargs: Dict[str, Any]) -> bool:
def measure_request_time(self, url: str, **kwargs: Dict[str, Any]) -> float:
start = timer()
try:
if "headers" not in kwargs:
self.http_get(url)
else:
self.http_get(url, headers=kwargs.get("headers"))
except requests.exceptions.Timeout:
return False

elapsed_time = datetime.timedelta(seconds=timer() - start).seconds

flag = self.is_response_time_within_threshold(elapsed_time)
return Config.Modules.SqlInjectionDetector.SQL_INJECTION_TIME_THRESHOLD

return flag
return datetime.timedelta(seconds=timer() - start).seconds

def contains_error(self, url: str, response: HTTPResponse) -> str | None:
# 500 error code will not be matched as it's a significant source of FPs
Expand Down Expand Up @@ -199,9 +189,12 @@ def scan(self, urls: List[str], task: Task) -> List[Dict[str, Any]]:
flags = []
for _ in range(Config.Modules.SqlInjectionDetector.SQL_INJECTION_NUM_RETRIES_TIME_BASED):
# We explicitely want to re-check whether current URL is still time efficient
if self.are_requests_time_efficient(
url_with_no_sleep_payload
) and not self.are_requests_time_efficient(url_with_sleep_payload):
if (
self.measure_request_time(url_with_no_sleep_payload)
< Config.Modules.SqlInjectionDetector.SQL_INJECTION_TIME_THRESHOLD / 2
and self.measure_request_time(url_with_sleep_payload)
>= Config.Modules.SqlInjectionDetector.SQL_INJECTION_TIME_THRESHOLD
):
flags.append(True)
else:
flags.append(False)
Expand Down Expand Up @@ -253,9 +246,12 @@ def scan(self, urls: List[str], task: Task) -> List[Dict[str, Any]]:

for _ in range(Config.Modules.SqlInjectionDetector.SQL_INJECTION_NUM_RETRIES_TIME_BASED):
# We explicitely want to re-check whether current URL is still time efficient
if self.are_requests_time_efficient(
url_with_no_sleep_payload
) and not self.are_requests_time_efficient(url_with_sleep_payload):
if (
self.measure_request_time(url_with_no_sleep_payload)
< Config.Modules.SqlInjectionDetector.SQL_INJECTION_TIME_THRESHOLD / 2
and self.measure_request_time(url_with_sleep_payload)
>= Config.Modules.SqlInjectionDetector.SQL_INJECTION_TIME_THRESHOLD
):
flags.append(True)
else:
flags.append(False)
Expand Down Expand Up @@ -303,9 +299,12 @@ def scan(self, urls: List[str], task: Task) -> List[Dict[str, Any]]:

for _ in range(Config.Modules.SqlInjectionDetector.SQL_INJECTION_NUM_RETRIES_TIME_BASED):
# We explicitely want to re-check whether current URL is still time efficient
if self.are_requests_time_efficient(
current_url, headers=headers_no_sleep_payload
) and not self.are_requests_time_efficient(current_url, headers=headers):
if (
self.measure_request_time(current_url, headers=headers_no_sleep_payload)
< Config.Modules.SqlInjectionDetector.SQL_INJECTION_TIME_THRESHOLD / 2
and self.measure_request_time(current_url, headers=headers)
>= Config.Modules.SqlInjectionDetector.SQL_INJECTION_TIME_THRESHOLD
):
flags.append(True)
else:
flags.append(False)
Expand Down
1 change: 1 addition & 0 deletions artemis/reporting/severity.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class Severity(str, Enum):
ReportType("old_drupal"): Severity.MEDIUM,
ReportType("old_joomla"): Severity.MEDIUM,
ReportType("xss"): Severity.HIGH,
ReportType("moodle_vulnerability_found"): Severity.HIGH,
# This doesn't mean that a version is insecure, as WordPress maintains a separate list
# of insecure versions. This just means "turn on the automatic updates"
ReportType("old_wordpress"): Severity.LOW,
Expand Down
20 changes: 10 additions & 10 deletions test/modules/test_sql_injection_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,17 @@ def test_is_url_with_parameters(self) -> None:
self.assertTrue(self.karton_class.is_url_with_parameters(url_with_payload))
self.assertFalse(self.karton_class.is_url_with_parameters(current_url))

def test_are_request_efficient(self) -> None:
def test_measure_request_time(self) -> None:
current_url = "http://test-apache-with-sql-injection-postgres:80/sql_injection.php?id=1"
url_with_sleep_payload = (
"http://test-apache-with-sql-injection-postgres:80/sql_injection.php?id='||pg_sleep(5)||'"
)
url_to_headers_vuln = "http://test-apache-with-sql-injection-postgres:80/headers_vuln.php"

self.assertTrue(self.karton.are_requests_time_efficient(current_url))
self.assertFalse(self.karton.are_requests_time_efficient(url_with_sleep_payload))
self.assertFalse(
self.karton.are_requests_time_efficient(url_to_headers_vuln, headers={"User-Agent": "'||pg_sleep(5)||'"})
self.assertTrue(self.karton.measure_request_time(current_url) < 1)
self.assertTrue(self.karton.measure_request_time(url_with_sleep_payload) >= 5)
self.assertTrue(
self.karton.measure_request_time(url_to_headers_vuln, headers={"User-Agent": "'||pg_sleep(5)||'"}) >= 5
)

def test_contains_error(self) -> None:
Expand Down Expand Up @@ -148,15 +148,15 @@ def test_is_url_with_parameters(self) -> None:
self.assertTrue(self.karton_class.is_url_with_parameters(url_with_payload))
self.assertFalse(self.karton_class.is_url_with_parameters(current_url))

def test_are_request_efficient(self) -> None:
def test_measure_request_time(self) -> None:
current_url = "http://test-apache-with-sql-injection-mysql/sql_injection.php?id=5"
url_with_sleep_payload = "http://test-apache-with-sql-injection-mysql/sql_injection.php?id='||sleep(5)||'"
url_to_headers_vuln = "http://test-apache-with-sql-injection-mysql/headers_vuln.php"

self.assertTrue(self.karton.are_requests_time_efficient(current_url))
self.assertFalse(self.karton.are_requests_time_efficient(url_with_sleep_payload))
self.assertFalse(
self.karton.are_requests_time_efficient(url_to_headers_vuln, headers={"User-Agent": "'||sleep(5)||'"})
self.assertTrue(self.karton.measure_request_time(current_url) < 1)
self.assertTrue(self.karton.measure_request_time(url_with_sleep_payload) >= 5)
self.assertTrue(
self.karton.measure_request_time(url_to_headers_vuln, headers={"User-Agent": "'||sleep(5)||'"}) >= 5
)

def test_contains_error(self) -> None:
Expand Down

0 comments on commit 50e707f

Please sign in to comment.