From eff31558bf05bb9d2e57faf1701a476e4edffc92 Mon Sep 17 00:00:00 2001 From: Alberto Vara Date: Tue, 4 Feb 2025 10:54:58 +0100 Subject: [PATCH] feat(iast): xss vulnerability for django applications (#12116) XSS vulnerability detection. System tests (merge after this PR): https://github.com/DataDog/system-tests/pull/3923 ## Checklist - [x] PR author has checked that all the criteria below are met - The PR description includes an overview of the change - The PR description articulates the motivation for the change - The change includes tests OR the PR description describes a testing strategy - The PR description notes risks associated with the change, if any - Newly-added code is easy to change - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) - The change includes or references documentation updates if necessary - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Reviewer has checked that all the criteria below are met - Title is accurate - All changes are related to the pull request's stated goal - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - Testing strategy adequately addresses listed risks - Newly-added code is easy to change - Release note makes sense to a user of the library - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) --------- Co-authored-by: Federico Mon --- .../_evidence_redaction/_sensitive_handler.py | 2 + ddtrace/appsec/_iast/_patch_modules.py | 1 + ddtrace/appsec/_iast/constants.py | 1 + ddtrace/appsec/_iast/taint_sinks/xss.py | 78 ++ docker-compose.yml | 1 - ...iast-feat-xss-django-6781a8b9a4092832.yaml | 6 + .../iast/taint_sinks/_taint_sinks_utils.py | 1 + .../iast/taint_sinks/test_xss_redacted.py | 48 + .../integrations/django_tests/conftest.py | 14 +- .../django_tests/django_app/settings.py | 2 +- .../django_app/templates/index.html | 5 + .../templates/index_autoescape.html | 7 + .../django_app/templates/index_safe.html | 5 + .../django_tests/django_app/urls.py | 4 + .../django_tests/django_app/views.py | 30 + .../django_tests/test_django_appsec_iast.py | 1008 +++++++++-------- .../integrations/pygoat_tests/test_pygoat.py | 34 +- 17 files changed, 736 insertions(+), 511 deletions(-) create mode 100644 ddtrace/appsec/_iast/taint_sinks/xss.py create mode 100644 releasenotes/notes/iast-feat-xss-django-6781a8b9a4092832.yaml create mode 100644 tests/appsec/iast/taint_sinks/test_xss_redacted.py create mode 100644 tests/appsec/integrations/django_tests/django_app/templates/index.html create mode 100644 tests/appsec/integrations/django_tests/django_app/templates/index_autoescape.html create mode 100644 tests/appsec/integrations/django_tests/django_app/templates/index_safe.html diff --git a/ddtrace/appsec/_iast/_evidence_redaction/_sensitive_handler.py b/ddtrace/appsec/_iast/_evidence_redaction/_sensitive_handler.py index a10455dee42..dccc18a39b6 100644 --- a/ddtrace/appsec/_iast/_evidence_redaction/_sensitive_handler.py +++ b/ddtrace/appsec/_iast/_evidence_redaction/_sensitive_handler.py @@ -10,6 +10,7 @@ from ..constants import VULN_HEADER_INJECTION from ..constants import VULN_SQL_INJECTION from ..constants import VULN_SSRF +from ..constants import VULN_XSS from .command_injection_sensitive_analyzer import command_injection_sensitive_analyzer from .default_sensitive_analyzer import default_sensitive_analyzer from .header_injection_sensitive_analyzer import header_injection_sensitive_analyzer @@ -45,6 +46,7 @@ def __init__(self): VULN_SQL_INJECTION: sql_sensitive_analyzer, VULN_SSRF: url_sensitive_analyzer, VULN_HEADER_INJECTION: header_injection_sensitive_analyzer, + VULN_XSS: default_sensitive_analyzer, VULN_CODE_INJECTION: default_sensitive_analyzer, } diff --git a/ddtrace/appsec/_iast/_patch_modules.py b/ddtrace/appsec/_iast/_patch_modules.py index e91438ebd49..634cd6399c5 100644 --- a/ddtrace/appsec/_iast/_patch_modules.py +++ b/ddtrace/appsec/_iast/_patch_modules.py @@ -7,6 +7,7 @@ "header_injection": True, "weak_cipher": True, "weak_hash": True, + "xss": True, } diff --git a/ddtrace/appsec/_iast/constants.py b/ddtrace/appsec/_iast/constants.py index 9ac6edb0ab1..3d0edc31b83 100644 --- a/ddtrace/appsec/_iast/constants.py +++ b/ddtrace/appsec/_iast/constants.py @@ -14,6 +14,7 @@ VULN_CMDI = "COMMAND_INJECTION" VULN_HEADER_INJECTION = "HEADER_INJECTION" VULN_CODE_INJECTION = "CODE_INJECTION" +VULN_XSS = "XSS" VULN_SSRF = "SSRF" VULN_STACKTRACE_LEAK = "STACKTRACE_LEAK" diff --git a/ddtrace/appsec/_iast/taint_sinks/xss.py b/ddtrace/appsec/_iast/taint_sinks/xss.py new file mode 100644 index 00000000000..425affac77a --- /dev/null +++ b/ddtrace/appsec/_iast/taint_sinks/xss.py @@ -0,0 +1,78 @@ +from typing import Text + +from ddtrace.appsec._common_module_patches import try_unwrap +from ddtrace.appsec._constants import IAST_SPAN_TAGS +from ddtrace.appsec._iast import oce +from ddtrace.appsec._iast._iast_request_context import is_iast_request_enabled +from ddtrace.appsec._iast._metrics import _set_metric_iast_executed_sink +from ddtrace.appsec._iast._metrics import _set_metric_iast_instrumented_sink +from ddtrace.appsec._iast._metrics import increment_iast_span_metric +from ddtrace.appsec._iast._patch import set_and_check_module_is_patched +from ddtrace.appsec._iast._patch import set_module_unpatched +from ddtrace.appsec._iast._patch import try_wrap_function_wrapper +from ddtrace.appsec._iast._taint_tracking._taint_objects import is_pyobject_tainted +from ddtrace.appsec._iast.constants import VULN_XSS +from ddtrace.appsec._iast.taint_sinks._base import VulnerabilityBase +from ddtrace.internal.logger import get_logger +from ddtrace.settings.asm import config as asm_config + + +log = get_logger(__name__) + + +@oce.register +class XSS(VulnerabilityBase): + vulnerability_type = VULN_XSS + + +def get_version() -> Text: + return "" + + +def patch(): + if not asm_config._iast_enabled: + return + + if not set_and_check_module_is_patched("flask", default_attr="_datadog_xss_patch"): + return + if not set_and_check_module_is_patched("django", default_attr="_datadog_xss_patch"): + return + if not set_and_check_module_is_patched("fastapi", default_attr="_datadog_xss_patch"): + return + + try_wrap_function_wrapper( + "django.utils.safestring", + "mark_safe", + _iast_django_xss, + ) + + try_wrap_function_wrapper( + "django.template.defaultfilters", + "mark_safe", + _iast_django_xss, + ) + + _set_metric_iast_instrumented_sink(VULN_XSS) + + +def unpatch(): + try_unwrap("django.utils.safestring", "mark_safe") + try_unwrap("django.template.defaultfilters", "mark_safe") + + set_module_unpatched("flask", default_attr="_datadog_xss_patch") + set_module_unpatched("django", default_attr="_datadog_xss_patch") + set_module_unpatched("fastapi", default_attr="_datadog_xss_patch") + + +def _iast_django_xss(wrapped, instance, args, kwargs): + if args and len(args) >= 1: + _iast_report_xss(args[0]) + return wrapped(*args, **kwargs) + + +def _iast_report_xss(code_string: Text): + increment_iast_span_metric(IAST_SPAN_TAGS.TELEMETRY_EXECUTED_SINK, XSS.vulnerability_type) + _set_metric_iast_executed_sink(XSS.vulnerability_type) + if is_iast_request_enabled(): + if is_pyobject_tainted(code_string): + XSS.report(evidence_value=code_string) diff --git a/docker-compose.yml b/docker-compose.yml index 701b5a7d0f0..dfcee9a54ce 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -198,7 +198,6 @@ services: ports: - "127.0.0.1:8321:8321" environment: - - DD_APPSEC_ENABLED=true - DD_IAST_ENABLED=true - DD_IAST_REQUEST_SAMPLING=100 - DD_IAST_VULNERABILITIES_PER_REQUEST=100 diff --git a/releasenotes/notes/iast-feat-xss-django-6781a8b9a4092832.yaml b/releasenotes/notes/iast-feat-xss-django-6781a8b9a4092832.yaml new file mode 100644 index 00000000000..db470196b36 --- /dev/null +++ b/releasenotes/notes/iast-feat-xss-django-6781a8b9a4092832.yaml @@ -0,0 +1,6 @@ +--- +features: + - | + Code Security (IAST): XSS detection for Django applications, + which will be displayed on your DataDog Vulnerability Explorer dashboard. + See the `Application Vulnerability Management `_ documentation for more information about this feature. diff --git a/tests/appsec/iast/taint_sinks/_taint_sinks_utils.py b/tests/appsec/iast/taint_sinks/_taint_sinks_utils.py index 288b72d015c..ae69fbda120 100644 --- a/tests/appsec/iast/taint_sinks/_taint_sinks_utils.py +++ b/tests/appsec/iast/taint_sinks/_taint_sinks_utils.py @@ -22,6 +22,7 @@ def get_parametrize(vuln_type, ignore_list=None): "$1 - Tainted range based redaction - multiple ranges", "Redacted source that needs to be truncated", "Query with single quoted string literal and null source", + "No redacted that needs to be truncated - whole text", ): continue diff --git a/tests/appsec/iast/taint_sinks/test_xss_redacted.py b/tests/appsec/iast/taint_sinks/test_xss_redacted.py new file mode 100644 index 00000000000..c192962e53e --- /dev/null +++ b/tests/appsec/iast/taint_sinks/test_xss_redacted.py @@ -0,0 +1,48 @@ +import os + +import pytest + +from ddtrace.appsec._iast._taint_tracking import origin_to_str +from ddtrace.appsec._iast._taint_tracking import str_to_origin +from ddtrace.appsec._iast.constants import VULN_XSS +from ddtrace.appsec._iast.taint_sinks.xss import XSS +from tests.appsec.iast.taint_sinks._taint_sinks_utils import _taint_pyobject_multiranges +from tests.appsec.iast.taint_sinks._taint_sinks_utils import get_parametrize +from tests.appsec.iast.taint_sinks.conftest import _get_iast_data + + +ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) + + +@pytest.mark.parametrize( + "evidence_input, sources_expected, vulnerabilities_expected,element", list(get_parametrize(VULN_XSS)) +) +def test_xss_redaction_suite( + evidence_input, sources_expected, vulnerabilities_expected, iast_context_defaults, element +): + tainted_object = evidence_input_value = evidence_input.get("value", "") + if evidence_input_value: + tainted_object = _taint_pyobject_multiranges( + evidence_input_value, + [ + ( + input_ranges["iinfo"]["parameterName"], + input_ranges["iinfo"]["parameterValue"], + str_to_origin(input_ranges["iinfo"]["type"]), + input_ranges["start"], + input_ranges["end"] - input_ranges["start"], + ) + for input_ranges in evidence_input.get("ranges", {}) + ], + ) + + XSS.report(tainted_object) + + data = _get_iast_data() + vulnerability = list(data["vulnerabilities"])[0] + source = list(data["sources"])[0] + source["origin"] = origin_to_str(source["origin"]) + + assert vulnerability["type"] == VULN_XSS + assert vulnerability["evidence"] == vulnerabilities_expected["evidence"] + assert source == sources_expected diff --git a/tests/appsec/integrations/django_tests/conftest.py b/tests/appsec/integrations/django_tests/conftest.py index d150edf68be..57bd68db6a6 100644 --- a/tests/appsec/integrations/django_tests/conftest.py +++ b/tests/appsec/integrations/django_tests/conftest.py @@ -5,6 +5,7 @@ import pytest from ddtrace.appsec._iast import enable_iast_propagation +from ddtrace.appsec._iast._patch_modules import patch_iast from ddtrace.contrib.internal.django.patch import patch from ddtrace.trace import Pin from tests.appsec.iast.conftest import _end_iast_context_and_oce @@ -27,11 +28,22 @@ def pytest_configure(): ) ): settings.DEBUG = False - enable_iast_propagation() + patch_iast() patch() + enable_iast_propagation() django.setup() +@pytest.fixture +def debug_mode(): + from django.conf import settings + + original_debug = settings.DEBUG + settings.DEBUG = True + yield + settings.DEBUG = original_debug + + @pytest.fixture def tracer(): tracer = DummyTracer() diff --git a/tests/appsec/integrations/django_tests/django_app/settings.py b/tests/appsec/integrations/django_tests/django_app/settings.py index 836c7602c5f..cbd7eea9c25 100644 --- a/tests/appsec/integrations/django_tests/django_app/settings.py +++ b/tests/appsec/integrations/django_tests/django_app/settings.py @@ -40,7 +40,7 @@ { "BACKEND": "django.template.backends.django.DjangoTemplates", "DIRS": [ - os.path.join(BASE_DIR, "templates"), + os.path.join(BASE_DIR, "django_app", "templates"), ], "APP_DIRS": True, "OPTIONS": { diff --git a/tests/appsec/integrations/django_tests/django_app/templates/index.html b/tests/appsec/integrations/django_tests/django_app/templates/index.html new file mode 100644 index 00000000000..7135619ca9d --- /dev/null +++ b/tests/appsec/integrations/django_tests/django_app/templates/index.html @@ -0,0 +1,5 @@ + + +

Input: {{ user_input }}

+ + \ No newline at end of file diff --git a/tests/appsec/integrations/django_tests/django_app/templates/index_autoescape.html b/tests/appsec/integrations/django_tests/django_app/templates/index_autoescape.html new file mode 100644 index 00000000000..ef5f5a64ed4 --- /dev/null +++ b/tests/appsec/integrations/django_tests/django_app/templates/index_autoescape.html @@ -0,0 +1,7 @@ + + +

{% autoescape on %} + {{ user_input }} +{% endautoescape %}

+ + diff --git a/tests/appsec/integrations/django_tests/django_app/templates/index_safe.html b/tests/appsec/integrations/django_tests/django_app/templates/index_safe.html new file mode 100644 index 00000000000..8bc39da3351 --- /dev/null +++ b/tests/appsec/integrations/django_tests/django_app/templates/index_safe.html @@ -0,0 +1,5 @@ + + +

Input: {{ user_input|safe }}

+ + \ No newline at end of file diff --git a/tests/appsec/integrations/django_tests/django_app/urls.py b/tests/appsec/integrations/django_tests/django_app/urls.py index dd1d069ad77..e79b6bee284 100644 --- a/tests/appsec/integrations/django_tests/django_app/urls.py +++ b/tests/appsec/integrations/django_tests/django_app/urls.py @@ -73,6 +73,10 @@ def shutdown(request): handler("appsec/insecure-cookie/test_insecure/$", views.view_insecure_cookies_insecure), handler("appsec/insecure-cookie/test_secure/$", views.view_insecure_cookies_secure), handler("appsec/insecure-cookie/test_empty_cookie/$", views.view_insecure_cookies_empty), + handler("appsec/xss/$", views.xss_http_request_parameter_mark_safe), + handler("appsec/xss/secure/$", views.xss_secure), + handler("appsec/xss/safe/$", views.xss_http_request_parameter_template_safe), + handler("appsec/xss/autoscape/$", views.xss_http_request_parameter_autoscape), path( "appsec/sqli_http_path_parameter//", views.sqli_http_path_parameter, diff --git a/tests/appsec/integrations/django_tests/django_app/views.py b/tests/appsec/integrations/django_tests/django_app/views.py index 693a9eab365..685b3c598c2 100644 --- a/tests/appsec/integrations/django_tests/django_app/views.py +++ b/tests/appsec/integrations/django_tests/django_app/views.py @@ -8,6 +8,8 @@ from django.db import connection from django.http import HttpResponse from django.http import JsonResponse +from django.shortcuts import render +from django.utils.safestring import mark_safe from ddtrace.appsec import _asm_request_context from ddtrace.appsec._iast._taint_tracking import OriginType @@ -68,6 +70,34 @@ def checkuser_view(request, user_id): return HttpResponse(status=200) +def xss_http_request_parameter_mark_safe(request): + user_input = request.GET.get("input", "") + + # label xss_http_request_parameter_mark_safe + return render(request, "index.html", {"user_input": mark_safe(user_input)}) + + +def xss_secure(request): + user_input = request.GET.get("input", "") + + # label xss_http_request_parameter_mark_safe + return render(request, "index.html", {"user_input": user_input}) + + +def xss_http_request_parameter_template_safe(request): + user_input = request.GET.get("input", "") + + # label xss_http_request_parameter_template_safe + return render(request, "index_safe.html", {"user_input": user_input}) + + +def xss_http_request_parameter_autoscape(request): + user_input = request.GET.get("input", "") + + # label xss_http_request_parameter_autoscape + return render(request, "index_autoescape.html", {"user_input": user_input}) + + def sqli_http_request_parameter(request): import bcrypt from django.contrib.auth.hashers import BCryptSHA256PasswordHasher diff --git a/tests/appsec/integrations/django_tests/test_django_appsec_iast.py b/tests/appsec/integrations/django_tests/test_django_appsec_iast.py index dd400c64df6..657688f5760 100644 --- a/tests/appsec/integrations/django_tests/test_django_appsec_iast.py +++ b/tests/appsec/integrations/django_tests/test_django_appsec_iast.py @@ -3,20 +3,16 @@ import pytest -from ddtrace.appsec._asm_request_context import start_context +from ddtrace.appsec._common_module_patches import patch_common_modules from ddtrace.appsec._constants import IAST -from ddtrace.appsec._iast import oce -from ddtrace.appsec._iast._patch_modules import patch_iast from ddtrace.appsec._iast.constants import VULN_CMDI from ddtrace.appsec._iast.constants import VULN_HEADER_INJECTION from ddtrace.appsec._iast.constants import VULN_INSECURE_COOKIE from ddtrace.appsec._iast.constants import VULN_SQL_INJECTION from ddtrace.appsec._iast.constants import VULN_STACKTRACE_LEAK -from ddtrace.ext import SpanTypes from ddtrace.internal.compat import urlencode from ddtrace.settings.asm import config as asm_config from tests.appsec.iast.iast_utils import get_line_and_hash -from tests.utils import override_env from tests.utils import override_global_config @@ -25,7 +21,9 @@ @pytest.fixture(autouse=True) def iast_context(): - with override_env({IAST.ENV: "True", IAST.ENV_REQUEST_SAMPLING: "100", "DD_IAST_DEDUPLICATION_ENABLED": "false"}): + with override_global_config( + dict(_iast_enabled=True, _iast_deduplication_enabled=False, _iast_request_sampling=100.0) + ): yield @@ -85,32 +83,28 @@ def _aux_appsec_get_root_span_with_exception( @pytest.mark.skipif(not asm_config._iast_supported, reason="Python version not supported by IAST") def test_django_weak_hash(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=True, _iast_deduplication_enabled=False)): - oce.reconfigure() - patch_iast({"weak_hash": True}) - root_span, _ = _aux_appsec_get_root_span(client, test_spans, tracer, url="/appsec/weak-hash/") - str_json = root_span.get_tag(IAST.JSON) - assert str_json is not None, "no JSON tag in root span" - vulnerability = json.loads(str_json)["vulnerabilities"][0] - assert vulnerability["location"]["path"].endswith(TEST_FILE) - assert vulnerability["evidence"]["value"] == "md5" + root_span, _ = _aux_appsec_get_root_span(client, test_spans, tracer, url="/appsec/weak-hash/") + str_json = root_span.get_tag(IAST.JSON) + assert str_json is not None, "no JSON tag in root span" + vulnerability = json.loads(str_json)["vulnerabilities"][0] + assert vulnerability["location"]["path"].endswith(TEST_FILE) + assert vulnerability["evidence"]["value"] == "md5" @pytest.mark.skipif(not asm_config._iast_supported, reason="Python version not supported by IAST") def test_django_tainted_user_agent_iast_enabled(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=True, _iast_deduplication_enabled=False)): - root_span, response = _aux_appsec_get_root_span( - client, - test_spans, - tracer, - payload=urlencode({"mytestingbody_key": "mytestingbody_value"}), - content_type="application/x-www-form-urlencoded", - url="/appsec/taint-checking-enabled/?q=aaa", - headers={"HTTP_USER_AGENT": "test/1.2.3"}, - ) + root_span, response = _aux_appsec_get_root_span( + client, + test_spans, + tracer, + payload=urlencode({"mytestingbody_key": "mytestingbody_value"}), + content_type="application/x-www-form-urlencoded", + url="/appsec/taint-checking-enabled/?q=aaa", + headers={"HTTP_USER_AGENT": "test/1.2.3"}, + ) - assert response.status_code == 200 - assert response.content == b"test/1.2.3" + assert response.status_code == 200 + assert response.content == b"test/1.2.3" @pytest.mark.parametrize( @@ -156,8 +150,6 @@ def test_django_view_with_exception(client, test_spans, tracer, payload, content @pytest.mark.skipif(not asm_config._iast_supported, reason="Python version not supported by IAST") def test_django_tainted_user_agent_iast_disabled(client, test_spans, tracer): with override_global_config(dict(_iast_enabled=False, _iast_deduplication_enabled=False)): - oce.reconfigure() - root_span, response = _aux_appsec_get_root_span( client, test_spans, @@ -177,191 +169,181 @@ def test_django_tainted_user_agent_iast_disabled(client, test_spans, tracer): @pytest.mark.django_db() @pytest.mark.skipif(not asm_config._iast_supported, reason="Python version not supported by IAST") def test_django_tainted_user_agent_iast_enabled_sqli_http_request_parameter(client, test_spans, tracer): - with override_global_config( - dict(_iast_enabled=True, _iast_deduplication_enabled=False, _iast_request_sampling=100.0) - ): - root_span, response = _aux_appsec_get_root_span( - client, - test_spans, - tracer, - payload=urlencode({"mytestingbody_key": "mytestingbody_value"}), - content_type="application/x-www-form-urlencoded", - url="/appsec/sqli_http_request_parameter/?q=SELECT 1 FROM sqlite_master WHERE name='", - headers={"HTTP_USER_AGENT": "test/1.2.3"}, - ) + root_span, response = _aux_appsec_get_root_span( + client, + test_spans, + tracer, + payload=urlencode({"mytestingbody_key": "mytestingbody_value"}), + content_type="application/x-www-form-urlencoded", + url="/appsec/sqli_http_request_parameter/?q=SELECT 1 FROM sqlite_master WHERE name='", + headers={"HTTP_USER_AGENT": "test/1.2.3"}, + ) - vuln_type = "SQL_INJECTION" + vuln_type = "SQL_INJECTION" - assert response.status_code == 200 - assert response.content == b"test/1.2.3" + assert response.status_code == 200 + assert response.content == b"test/1.2.3" - loaded = json.loads(root_span.get_tag(IAST.JSON)) + loaded = json.loads(root_span.get_tag(IAST.JSON)) - line, hash_value = get_line_and_hash("iast_enabled_sqli_http_request_parameter", vuln_type, filename=TEST_FILE) + line, hash_value = get_line_and_hash("iast_enabled_sqli_http_request_parameter", vuln_type, filename=TEST_FILE) - assert loaded["sources"] == [ - { - "name": "q", - "origin": "http.request.parameter", - "pattern": "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMN", - "redacted": True, - } - ] - - assert loaded["vulnerabilities"][0]["type"] == vuln_type - assert loaded["vulnerabilities"][0]["evidence"] == { - "valueParts": [ - {"source": 0, "value": "SELECT "}, - {"pattern": "h", "redacted": True, "source": 0}, - {"source": 0, "value": " FROM sqlite_master WHERE name='"}, - {"redacted": True}, - {"value": "'"}, - ] + assert loaded["sources"] == [ + { + "name": "q", + "origin": "http.request.parameter", + "pattern": "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMN", + "redacted": True, } - assert loaded["vulnerabilities"][0]["location"]["path"] == TEST_FILE - assert loaded["vulnerabilities"][0]["location"]["line"] == line - assert loaded["vulnerabilities"][0]["hash"] == hash_value + ] + + assert loaded["vulnerabilities"][0]["type"] == vuln_type + assert loaded["vulnerabilities"][0]["evidence"] == { + "valueParts": [ + {"source": 0, "value": "SELECT "}, + {"pattern": "h", "redacted": True, "source": 0}, + {"source": 0, "value": " FROM sqlite_master WHERE name='"}, + {"redacted": True}, + {"value": "'"}, + ] + } + assert loaded["vulnerabilities"][0]["location"]["path"] == TEST_FILE + assert loaded["vulnerabilities"][0]["location"]["line"] == line + assert loaded["vulnerabilities"][0]["hash"] == hash_value @pytest.mark.django_db() @pytest.mark.skipif(not asm_config._iast_supported, reason="Python version not supported by IAST") def test_django_sqli_http_request_parameter_name_get(client, test_spans, tracer): - with override_global_config( - dict(_iast_enabled=True, _iast_deduplication_enabled=False, _iast_request_sampling=100.0) - ): - root_span, response = _aux_appsec_get_root_span( - client, - test_spans, - tracer, - content_type="application/x-www-form-urlencoded", - url="/appsec/sqli_http_request_parameter_name_get/?SELECT=unused", - headers={"HTTP_USER_AGENT": "test/1.2.3"}, - ) + root_span, response = _aux_appsec_get_root_span( + client, + test_spans, + tracer, + content_type="application/x-www-form-urlencoded", + url="/appsec/sqli_http_request_parameter_name_get/?SELECT=unused", + headers={"HTTP_USER_AGENT": "test/1.2.3"}, + ) - vuln_type = "SQL_INJECTION" + vuln_type = "SQL_INJECTION" - assert response.status_code == 200 - assert response.content == b"test/1.2.3" + assert response.status_code == 200 + assert response.content == b"test/1.2.3" - loaded = json.loads(root_span.get_tag(IAST.JSON)) + loaded = json.loads(root_span.get_tag(IAST.JSON)) - line, hash_value = get_line_and_hash( - "iast_enabled_sqli_http_request_parameter_name_get", vuln_type, filename=TEST_FILE - ) + line, hash_value = get_line_and_hash( + "iast_enabled_sqli_http_request_parameter_name_get", vuln_type, filename=TEST_FILE + ) - assert loaded["sources"] == [ + assert loaded["sources"] == [ + { + "name": "SELECT", + "origin": "http.request.parameter.name", + "value": "SELECT", + } + ] + + assert loaded["vulnerabilities"][0]["type"] == vuln_type + assert loaded["vulnerabilities"][0]["evidence"] == { + "valueParts": [ + {"source": 0, "value": "SELECT"}, { - "name": "SELECT", - "origin": "http.request.parameter.name", - "value": "SELECT", - } + "value": " ", + }, + { + "redacted": True, + }, ] - - assert loaded["vulnerabilities"][0]["type"] == vuln_type - assert loaded["vulnerabilities"][0]["evidence"] == { - "valueParts": [ - {"source": 0, "value": "SELECT"}, - { - "value": " ", - }, - { - "redacted": True, - }, - ] - } - assert loaded["vulnerabilities"][0]["location"]["path"] == TEST_FILE - assert loaded["vulnerabilities"][0]["location"]["line"] == line - assert loaded["vulnerabilities"][0]["hash"] == hash_value + } + assert loaded["vulnerabilities"][0]["location"]["path"] == TEST_FILE + assert loaded["vulnerabilities"][0]["location"]["line"] == line + assert loaded["vulnerabilities"][0]["hash"] == hash_value @pytest.mark.django_db() @pytest.mark.skipif(not asm_config._iast_supported, reason="Python version not supported by IAST") def test_django_sqli_http_request_parameter_name_post(client, test_spans, tracer): - with override_global_config( - dict(_iast_enabled=True, _iast_deduplication_enabled=False, _iast_request_sampling=100.0) - ): - root_span, response = _aux_appsec_get_root_span( - client, - test_spans, - tracer, - payload=urlencode({"SELECT": "unused"}), - content_type="application/x-www-form-urlencoded", - url="/appsec/sqli_http_request_parameter_name_post/", - headers={"HTTP_USER_AGENT": "test/1.2.3"}, - ) + root_span, response = _aux_appsec_get_root_span( + client, + test_spans, + tracer, + payload=urlencode({"SELECT": "unused"}), + content_type="application/x-www-form-urlencoded", + url="/appsec/sqli_http_request_parameter_name_post/", + headers={"HTTP_USER_AGENT": "test/1.2.3"}, + ) - vuln_type = "SQL_INJECTION" + vuln_type = "SQL_INJECTION" - assert response.status_code == 200 - assert response.content == b"test/1.2.3" + assert response.status_code == 200 + assert response.content == b"test/1.2.3" + + loaded = json.loads(root_span.get_tag(IAST.JSON)) - loaded = json.loads(root_span.get_tag(IAST.JSON)) + line, hash_value = get_line_and_hash( + "iast_enabled_sqli_http_request_parameter_name_post", vuln_type, filename=TEST_FILE + ) - line, hash_value = get_line_and_hash( - "iast_enabled_sqli_http_request_parameter_name_post", vuln_type, filename=TEST_FILE - ) + assert loaded["sources"] == [ + { + "name": "SELECT", + "origin": "http.request.parameter.name", + "value": "SELECT", + } + ] - assert loaded["sources"] == [ + assert loaded["vulnerabilities"][0]["type"] == vuln_type + assert loaded["vulnerabilities"][0]["evidence"] == { + "valueParts": [ + {"source": 0, "value": "SELECT"}, + { + "value": " ", + }, { - "name": "SELECT", - "origin": "http.request.parameter.name", - "value": "SELECT", - } + "redacted": True, + }, ] - - assert loaded["vulnerabilities"][0]["type"] == vuln_type - assert loaded["vulnerabilities"][0]["evidence"] == { - "valueParts": [ - {"source": 0, "value": "SELECT"}, - { - "value": " ", - }, - { - "redacted": True, - }, - ] - } - assert loaded["vulnerabilities"][0]["location"]["path"] == TEST_FILE - assert loaded["vulnerabilities"][0]["location"]["line"] == line - assert loaded["vulnerabilities"][0]["hash"] == hash_value + } + assert loaded["vulnerabilities"][0]["location"]["path"] == TEST_FILE + assert loaded["vulnerabilities"][0]["location"]["line"] == line + assert loaded["vulnerabilities"][0]["hash"] == hash_value @pytest.mark.django_db() @pytest.mark.skipif(not asm_config._iast_supported, reason="Python version not supported by IAST") def test_django_sqli_http_request_header_value(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=True, _iast_deduplication_enabled=False)): - root_span, response = _aux_appsec_get_root_span( - client, - test_spans, - tracer, - payload=urlencode({"mytestingbody_key": "mytestingbody_value"}), - content_type="application/x-www-form-urlencoded", - url="/appsec/sqli_http_request_header_value/", - headers={"HTTP_USER_AGENT": "master"}, - ) + root_span, response = _aux_appsec_get_root_span( + client, + test_spans, + tracer, + payload=urlencode({"mytestingbody_key": "mytestingbody_value"}), + content_type="application/x-www-form-urlencoded", + url="/appsec/sqli_http_request_header_value/", + headers={"HTTP_USER_AGENT": "master"}, + ) - assert response.status_code == 200 - assert response.content == b"master" + assert response.status_code == 200 + assert response.content == b"master" - loaded = json.loads(root_span.get_tag(IAST.JSON)) - - assert loaded["sources"] == [{"origin": "http.request.header", "name": "HTTP_USER_AGENT", "value": "master"}] - assert loaded["vulnerabilities"][0]["type"] == VULN_SQL_INJECTION - assert loaded["vulnerabilities"][0]["evidence"] == { - "valueParts": [ - {"value": "SELECT "}, - {"redacted": True}, - {"value": " FROM sqlite_"}, - {"source": 0, "value": "master"}, - ] - } + loaded = json.loads(root_span.get_tag(IAST.JSON)) - line, hash_value = get_line_and_hash( - "iast_enabled_sqli_http_request_header_value", VULN_SQL_INJECTION, filename=TEST_FILE - ) - assert loaded["vulnerabilities"][0]["location"]["path"] == TEST_FILE - assert loaded["vulnerabilities"][0]["location"]["line"] == line - assert loaded["vulnerabilities"][0]["hash"] == hash_value + assert loaded["sources"] == [{"origin": "http.request.header", "name": "HTTP_USER_AGENT", "value": "master"}] + assert loaded["vulnerabilities"][0]["type"] == VULN_SQL_INJECTION + assert loaded["vulnerabilities"][0]["evidence"] == { + "valueParts": [ + {"value": "SELECT "}, + {"redacted": True}, + {"value": " FROM sqlite_"}, + {"source": 0, "value": "master"}, + ] + } + + line, hash_value = get_line_and_hash( + "iast_enabled_sqli_http_request_header_value", VULN_SQL_INJECTION, filename=TEST_FILE + ) + assert loaded["vulnerabilities"][0]["location"]["path"] == TEST_FILE + assert loaded["vulnerabilities"][0]["location"]["line"] == line + assert loaded["vulnerabilities"][0]["hash"] == hash_value @pytest.mark.django_db() @@ -387,39 +369,38 @@ def test_django_iast_disabled_sqli_http_request_header_value(client, test_spans, @pytest.mark.django_db() @pytest.mark.skipif(not asm_config._iast_supported, reason="Python version not supported by IAST") def test_django_sqli_http_request_header_name(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=True, _iast_deduplication_enabled=False)): - root_span, response = _aux_appsec_get_root_span( - client, - test_spans, - tracer, - payload=urlencode({"mytestingbody_key": "mytestingbody_value"}), - content_type="application/x-www-form-urlencoded", - url="/appsec/sqli_http_request_header_name/", - headers={"master": "test/1.2.3"}, - ) + root_span, response = _aux_appsec_get_root_span( + client, + test_spans, + tracer, + payload=urlencode({"mytestingbody_key": "mytestingbody_value"}), + content_type="application/x-www-form-urlencoded", + url="/appsec/sqli_http_request_header_name/", + headers={"master": "test/1.2.3"}, + ) - assert response.status_code == 200 - assert response.content == b"test/1.2.3" + assert response.status_code == 200 + assert response.content == b"test/1.2.3" - loaded = json.loads(root_span.get_tag(IAST.JSON)) - - assert loaded["sources"] == [{"origin": "http.request.header.name", "name": "master", "value": "master"}] - assert loaded["vulnerabilities"][0]["type"] == VULN_SQL_INJECTION - assert loaded["vulnerabilities"][0]["evidence"] == { - "valueParts": [ - {"value": "SELECT "}, - {"redacted": True}, - {"value": " FROM sqlite_"}, - {"value": "master", "source": 0}, - ] - } + loaded = json.loads(root_span.get_tag(IAST.JSON)) - line, hash_value = get_line_and_hash( - "iast_enabled_sqli_http_request_header_name", VULN_SQL_INJECTION, filename=TEST_FILE - ) - assert loaded["vulnerabilities"][0]["location"]["path"] == TEST_FILE - assert loaded["vulnerabilities"][0]["location"]["line"] == line - assert loaded["vulnerabilities"][0]["hash"] == hash_value + assert loaded["sources"] == [{"origin": "http.request.header.name", "name": "master", "value": "master"}] + assert loaded["vulnerabilities"][0]["type"] == VULN_SQL_INJECTION + assert loaded["vulnerabilities"][0]["evidence"] == { + "valueParts": [ + {"value": "SELECT "}, + {"redacted": True}, + {"value": " FROM sqlite_"}, + {"value": "master", "source": 0}, + ] + } + + line, hash_value = get_line_and_hash( + "iast_enabled_sqli_http_request_header_name", VULN_SQL_INJECTION, filename=TEST_FILE + ) + assert loaded["vulnerabilities"][0]["location"]["path"] == TEST_FILE + assert loaded["vulnerabilities"][0]["location"]["line"] == line + assert loaded["vulnerabilities"][0]["hash"] == hash_value @pytest.mark.django_db() @@ -498,41 +479,38 @@ def test_django_iast_disabled_sqli_http_path_parameter(client, test_spans, trace @pytest.mark.django_db() @pytest.mark.skipif(not asm_config._iast_supported, reason="Python version not supported by IAST") def test_django_sqli_http_cookies_name(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=True, _iast_deduplication_enabled=False)): - root_span, response = _aux_appsec_get_root_span( - client, - test_spans, - tracer, - url="/appsec/sqli_http_request_cookie_name/", - cookies={"master": "test/1.2.3"}, - ) - assert response.status_code == 200 - assert response.content == b"test/1.2.3" + root_span, response = _aux_appsec_get_root_span( + client, + test_spans, + tracer, + url="/appsec/sqli_http_request_cookie_name/", + cookies={"master": "test/1.2.3"}, + ) + assert response.status_code == 200 + assert response.content == b"test/1.2.3" - loaded = json.loads(root_span.get_tag(IAST.JSON)) + loaded = json.loads(root_span.get_tag(IAST.JSON)) - vulnerability = False - for vuln in loaded["vulnerabilities"]: - if vuln["type"] == VULN_SQL_INJECTION: - vulnerability = vuln + vulnerability = False + for vuln in loaded["vulnerabilities"]: + if vuln["type"] == VULN_SQL_INJECTION: + vulnerability = vuln - assert vulnerability, "No {} reported".format(VULN_SQL_INJECTION) + assert vulnerability, "No {} reported".format(VULN_SQL_INJECTION) - assert loaded["sources"] == [{"origin": "http.request.cookie.name", "name": "master", "value": "master"}] - assert vulnerability["evidence"] == { - "valueParts": [ - {"value": "SELECT "}, - {"redacted": True}, - {"value": " FROM sqlite_"}, - {"value": "master", "source": 0}, - ] - } - line, hash_value = get_line_and_hash( - "iast_enabled_sqli_http_cookies_name", VULN_SQL_INJECTION, filename=TEST_FILE - ) - assert vulnerability["location"]["path"] == TEST_FILE - assert vulnerability["location"]["line"] == line - assert vulnerability["hash"] == hash_value + assert loaded["sources"] == [{"origin": "http.request.cookie.name", "name": "master", "value": "master"}] + assert vulnerability["evidence"] == { + "valueParts": [ + {"value": "SELECT "}, + {"redacted": True}, + {"value": " FROM sqlite_"}, + {"value": "master", "source": 0}, + ] + } + line, hash_value = get_line_and_hash("iast_enabled_sqli_http_cookies_name", VULN_SQL_INJECTION, filename=TEST_FILE) + assert vulnerability["location"]["path"] == TEST_FILE + assert vulnerability["location"]["line"] == line + assert vulnerability["hash"] == hash_value @pytest.mark.django_db() @@ -556,43 +534,40 @@ def test_django_iast_disabled_sqli_http_cookies_name(client, test_spans, tracer) @pytest.mark.django_db() @pytest.mark.skipif(not asm_config._iast_supported, reason="Python version not supported by IAST") def test_django_sqli_http_cookies_value(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=True, _iast_deduplication_enabled=False)): - root_span, response = _aux_appsec_get_root_span( - client, - test_spans, - tracer, - url="/appsec/sqli_http_request_cookie_value/", - cookies={"master": "master"}, - ) - assert response.status_code == 200 - assert response.content == b"master" + root_span, response = _aux_appsec_get_root_span( + client, + test_spans, + tracer, + url="/appsec/sqli_http_request_cookie_value/", + cookies={"master": "master"}, + ) + assert response.status_code == 200 + assert response.content == b"master" - loaded = json.loads(root_span.get_tag(IAST.JSON)) + loaded = json.loads(root_span.get_tag(IAST.JSON)) - vulnerability = False - for vuln in loaded["vulnerabilities"]: - if vuln["type"] == VULN_SQL_INJECTION: - vulnerability = vuln + vulnerability = False + for vuln in loaded["vulnerabilities"]: + if vuln["type"] == VULN_SQL_INJECTION: + vulnerability = vuln - assert vulnerability, "No {} reported".format(VULN_SQL_INJECTION) - assert loaded["sources"] == [{"origin": "http.request.cookie.value", "name": "master", "value": "master"}] - assert vulnerability["type"] == "SQL_INJECTION" + assert vulnerability, "No {} reported".format(VULN_SQL_INJECTION) + assert loaded["sources"] == [{"origin": "http.request.cookie.value", "name": "master", "value": "master"}] + assert vulnerability["type"] == "SQL_INJECTION" - assert vulnerability["evidence"] == { - "valueParts": [ - {"value": "SELECT "}, - {"redacted": True}, - {"value": " FROM sqlite_"}, - {"value": "master", "source": 0}, - ] - } + assert vulnerability["evidence"] == { + "valueParts": [ + {"value": "SELECT "}, + {"redacted": True}, + {"value": " FROM sqlite_"}, + {"value": "master", "source": 0}, + ] + } - line, hash_value = get_line_and_hash( - "iast_enabled_sqli_http_cookies_value", VULN_SQL_INJECTION, filename=TEST_FILE - ) - assert vulnerability["location"]["line"] == line - assert vulnerability["location"]["path"] == TEST_FILE - assert vulnerability["hash"] == hash_value + line, hash_value = get_line_and_hash("iast_enabled_sqli_http_cookies_value", VULN_SQL_INJECTION, filename=TEST_FILE) + assert vulnerability["location"]["line"] == line + assert vulnerability["location"]["path"] == TEST_FILE + assert vulnerability["hash"] == hash_value @pytest.mark.django_db() @@ -623,35 +598,34 @@ def test_django_iast_disabled_sqli_http_cookies_value(client, test_spans, tracer @pytest.mark.django_db() @pytest.mark.skipif(not asm_config._iast_supported, reason="Python version not supported by IAST") def test_django_sqli_http_body(client, test_spans, tracer, payload, content_type): - with override_global_config(dict(_iast_enabled=True, _iast_deduplication_enabled=False)): - root_span, response = _aux_appsec_get_root_span( - client, - test_spans, - tracer, - url="/appsec/sqli_http_request_body/", - payload=payload, - content_type=content_type, - ) - loaded = json.loads(root_span.get_tag(IAST.JSON)) - - line, hash_value = get_line_and_hash("iast_enabled_sqli_http_body", VULN_SQL_INJECTION, filename=TEST_FILE) - - assert loaded["sources"] == [{"origin": "http.request.body", "name": "http.request.body", "value": "master"}] - assert loaded["vulnerabilities"][0]["type"] == VULN_SQL_INJECTION - assert loaded["vulnerabilities"][0]["hash"] == hash_value - assert loaded["vulnerabilities"][0]["evidence"] == { - "valueParts": [ - {"value": "SELECT "}, - {"redacted": True}, - {"value": " FROM sqlite_"}, - {"value": "master", "source": 0}, - ] - } - assert loaded["vulnerabilities"][0]["location"]["line"] == line - assert loaded["vulnerabilities"][0]["location"]["path"] == TEST_FILE + root_span, response = _aux_appsec_get_root_span( + client, + test_spans, + tracer, + url="/appsec/sqli_http_request_body/", + payload=payload, + content_type=content_type, + ) + loaded = json.loads(root_span.get_tag(IAST.JSON)) - assert response.status_code == 200 - assert response.content == b"master" + line, hash_value = get_line_and_hash("iast_enabled_sqli_http_body", VULN_SQL_INJECTION, filename=TEST_FILE) + + assert loaded["sources"] == [{"origin": "http.request.body", "name": "http.request.body", "value": "master"}] + assert loaded["vulnerabilities"][0]["type"] == VULN_SQL_INJECTION + assert loaded["vulnerabilities"][0]["hash"] == hash_value + assert loaded["vulnerabilities"][0]["evidence"] == { + "valueParts": [ + {"value": "SELECT "}, + {"redacted": True}, + {"value": " FROM sqlite_"}, + {"value": "master", "source": 0}, + ] + } + assert loaded["vulnerabilities"][0]["location"]["line"] == line + assert loaded["vulnerabilities"][0]["location"]["path"] == TEST_FILE + + assert response.status_code == 200 + assert response.content == b"master" @pytest.mark.parametrize( @@ -718,244 +692,314 @@ def test_django_iast_disabled_sqli_http_body(client, test_spans, tracer): @pytest.mark.skipif(not asm_config._iast_supported, reason="Python version not supported by IAST") def test_django_querydict(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=True)): - root_span, response = _aux_appsec_get_root_span( - client, - test_spans, - tracer, - url="/appsec/validate_querydict/?x=1&y=2&x=3", - ) + root_span, response = _aux_appsec_get_root_span( + client, + test_spans, + tracer, + url="/appsec/validate_querydict/?x=1&y=2&x=3", + ) - assert root_span.get_tag(IAST.JSON) is None - assert response.status_code == 200 - assert ( - response.content == b"x=['1', '3'], all=[('x', ['1', '3']), ('y', ['2'])]," - b" keys=['x', 'y'], urlencode=x=1&x=3&y=2" - ) + assert root_span.get_tag(IAST.JSON) is None + assert response.status_code == 200 + assert ( + response.content == b"x=['1', '3'], all=[('x', ['1', '3']), ('y', ['2'])]," + b" keys=['x', 'y'], urlencode=x=1&x=3&y=2" + ) @pytest.mark.skipif(not asm_config._iast_supported, reason="Python version not supported by IAST") def test_django_command_injection(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=True, _iast_deduplication_enabled=False)): - oce.reconfigure() - patch_iast({"command_injection": True}) - from ddtrace.appsec._common_module_patches import patch_common_modules - - patch_common_modules() - root_span, _ = _aux_appsec_get_root_span( - client, - test_spans, - tracer, - url="/appsec/command-injection/", - payload="master", - content_type="application/json", - ) + patch_common_modules() + root_span, _ = _aux_appsec_get_root_span( + client, + test_spans, + tracer, + url="/appsec/command-injection/", + payload="master", + content_type="application/json", + ) - loaded = json.loads(root_span.get_tag(IAST.JSON)) + loaded = json.loads(root_span.get_tag(IAST.JSON)) - line, hash_value = get_line_and_hash("iast_command_injection", VULN_CMDI, filename=TEST_FILE) + line, hash_value = get_line_and_hash("iast_command_injection", VULN_CMDI, filename=TEST_FILE) - assert loaded["sources"] == [ - {"name": "http.request.body", "origin": "http.request.body", "pattern": "abcdef", "redacted": True} - ] - assert loaded["vulnerabilities"][0]["type"] == VULN_CMDI - assert loaded["vulnerabilities"][0]["hash"] == hash_value - assert loaded["vulnerabilities"][0]["evidence"] == { - "valueParts": [{"value": "dir "}, {"redacted": True}, {"pattern": "abcdef", "redacted": True, "source": 0}] - } - assert loaded["vulnerabilities"][0]["location"]["line"] == line - assert loaded["vulnerabilities"][0]["location"]["path"] == TEST_FILE + assert loaded["sources"] == [ + {"name": "http.request.body", "origin": "http.request.body", "pattern": "abcdef", "redacted": True} + ] + assert loaded["vulnerabilities"][0]["type"] == VULN_CMDI + assert loaded["vulnerabilities"][0]["hash"] == hash_value + assert loaded["vulnerabilities"][0]["evidence"] == { + "valueParts": [{"value": "dir "}, {"redacted": True}, {"pattern": "abcdef", "redacted": True, "source": 0}] + } + assert loaded["vulnerabilities"][0]["location"]["line"] == line + assert loaded["vulnerabilities"][0]["location"]["path"] == TEST_FILE @pytest.mark.skipif(not asm_config._iast_supported, reason="Python version not supported by IAST") def test_django_header_injection(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=True, _iast_deduplication_enabled=False)): - oce.reconfigure() - patch_iast({"header_injection": True}) - root_span, _ = _aux_appsec_get_root_span( - client, - test_spans, - tracer, - url="/appsec/header-injection/", - payload="master", - content_type="application/json", - ) + root_span, _ = _aux_appsec_get_root_span( + client, + test_spans, + tracer, + url="/appsec/header-injection/", + payload="master", + content_type="application/json", + ) - loaded = json.loads(root_span.get_tag(IAST.JSON)) + loaded = json.loads(root_span.get_tag(IAST.JSON)) - line, hash_value = get_line_and_hash("iast_header_injection", VULN_HEADER_INJECTION, filename=TEST_FILE) + line, hash_value = get_line_and_hash("iast_header_injection", VULN_HEADER_INJECTION, filename=TEST_FILE) - assert loaded["sources"] == [{"origin": "http.request.body", "name": "http.request.body", "value": "master"}] - assert loaded["vulnerabilities"][0]["type"] == VULN_HEADER_INJECTION - assert loaded["vulnerabilities"][0]["hash"] == hash_value - assert loaded["vulnerabilities"][0]["evidence"] == { - "valueParts": [{"value": "Header-Injection: "}, {"source": 0, "value": "master"}] - } - assert loaded["vulnerabilities"][0]["location"]["line"] == line - assert loaded["vulnerabilities"][0]["location"]["path"] == TEST_FILE + assert loaded["sources"] == [{"origin": "http.request.body", "name": "http.request.body", "value": "master"}] + assert loaded["vulnerabilities"][0]["type"] == VULN_HEADER_INJECTION + assert loaded["vulnerabilities"][0]["hash"] == hash_value + assert loaded["vulnerabilities"][0]["evidence"] == { + "valueParts": [{"value": "Header-Injection: "}, {"source": 0, "value": "master"}] + } + assert loaded["vulnerabilities"][0]["location"]["line"] == line + assert loaded["vulnerabilities"][0]["location"]["path"] == TEST_FILE @pytest.mark.skipif(not asm_config._iast_supported, reason="Python version not supported by IAST") def test_django_insecure_cookie(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=True, _iast_deduplication_enabled=False)): - oce.reconfigure() - root_span, _ = _aux_appsec_get_root_span( - client, - test_spans, - tracer, - url="/appsec/insecure-cookie/test_insecure/", - ) + root_span, _ = _aux_appsec_get_root_span( + client, + test_spans, + tracer, + url="/appsec/insecure-cookie/test_insecure/", + ) - assert root_span.get_metric(IAST.ENABLED) == 1.0 + assert root_span.get_metric(IAST.ENABLED) == 1.0 - loaded = json.loads(root_span.get_tag(IAST.JSON)) - assert loaded["sources"] == [] - assert len(loaded["vulnerabilities"]) == 1 - vulnerability = loaded["vulnerabilities"][0] - assert vulnerability["type"] == VULN_INSECURE_COOKIE - assert vulnerability["evidence"] == {"valueParts": [{"value": "insecure"}]} - assert "path" not in vulnerability["location"].keys() - assert "line" not in vulnerability["location"].keys() - assert vulnerability["location"]["spanId"] - assert vulnerability["hash"] + loaded = json.loads(root_span.get_tag(IAST.JSON)) + assert loaded["sources"] == [] + assert len(loaded["vulnerabilities"]) == 1 + vulnerability = loaded["vulnerabilities"][0] + assert vulnerability["type"] == VULN_INSECURE_COOKIE + assert vulnerability["evidence"] == {"valueParts": [{"value": "insecure"}]} + assert "path" not in vulnerability["location"].keys() + assert "line" not in vulnerability["location"].keys() + assert vulnerability["location"]["spanId"] + assert vulnerability["hash"] @pytest.mark.skipif(not asm_config._iast_supported, reason="Python version not supported by IAST") def test_django_insecure_cookie_secure(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=True, _iast_deduplication_enabled=False)): - oce.reconfigure() - root_span, _ = _aux_appsec_get_root_span( - client, - test_spans, - tracer, - url="/appsec/insecure-cookie/test_secure/", - ) + root_span, _ = _aux_appsec_get_root_span( + client, + test_spans, + tracer, + url="/appsec/insecure-cookie/test_secure/", + ) - assert root_span.get_metric(IAST.ENABLED) == 1.0 + assert root_span.get_metric(IAST.ENABLED) == 1.0 - assert root_span.get_tag(IAST.JSON) is None + assert root_span.get_tag(IAST.JSON) is None @pytest.mark.skipif(not asm_config._iast_supported, reason="Python version not supported by IAST") def test_django_insecure_cookie_empty_cookie(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=True, _iast_deduplication_enabled=False)): - oce.reconfigure() - root_span, _ = _aux_appsec_get_root_span( - client, - test_spans, - tracer, - url="/appsec/insecure-cookie/test_empty_cookie/", - ) + root_span, _ = _aux_appsec_get_root_span( + client, + test_spans, + tracer, + url="/appsec/insecure-cookie/test_empty_cookie/", + ) - assert root_span.get_metric(IAST.ENABLED) == 1.0 + assert root_span.get_metric(IAST.ENABLED) == 1.0 - assert root_span.get_tag(IAST.JSON) is None + assert root_span.get_tag(IAST.JSON) is None @pytest.mark.skipif(not asm_config._iast_supported, reason="Python version not supported by IAST") def test_django_insecure_cookie_2_insecure_1_secure(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=True, _iast_deduplication_enabled=False)): - oce.reconfigure() - root_span, _ = _aux_appsec_get_root_span( - client, - test_spans, - tracer, - url="/appsec/insecure-cookie/test_insecure_2_1/", - ) + root_span, _ = _aux_appsec_get_root_span( + client, + test_spans, + tracer, + url="/appsec/insecure-cookie/test_insecure_2_1/", + ) - assert root_span.get_metric(IAST.ENABLED) == 1.0 + assert root_span.get_metric(IAST.ENABLED) == 1.0 - loaded = json.loads(root_span.get_tag(IAST.JSON)) - assert loaded["sources"] == [] - assert len(loaded["vulnerabilities"]) == 2 + loaded = json.loads(root_span.get_tag(IAST.JSON)) + assert loaded["sources"] == [] + assert len(loaded["vulnerabilities"]) == 2 @pytest.mark.skipif(not asm_config._iast_supported, reason="Python version not supported by IAST") def test_django_insecure_cookie_special_characters(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=True, _iast_deduplication_enabled=False)): - oce.reconfigure() - root_span, _ = _aux_appsec_get_root_span( - client, - test_spans, - tracer, - url="/appsec/insecure-cookie/test_insecure_special/", - ) + root_span, _ = _aux_appsec_get_root_span( + client, + test_spans, + tracer, + url="/appsec/insecure-cookie/test_insecure_special/", + ) - assert root_span.get_metric(IAST.ENABLED) == 1.0 + assert root_span.get_metric(IAST.ENABLED) == 1.0 - loaded = json.loads(root_span.get_tag(IAST.JSON)) - assert loaded["sources"] == [] - assert len(loaded["vulnerabilities"]) == 1 - vulnerability = loaded["vulnerabilities"][0] - assert vulnerability["type"] == VULN_INSECURE_COOKIE - assert vulnerability["evidence"] == {"valueParts": [{"value": "insecure"}]} - assert "path" not in vulnerability["location"].keys() - assert "line" not in vulnerability["location"].keys() - assert vulnerability["location"]["spanId"] - assert vulnerability["hash"] + loaded = json.loads(root_span.get_tag(IAST.JSON)) + assert loaded["sources"] == [] + assert len(loaded["vulnerabilities"]) == 1 + vulnerability = loaded["vulnerabilities"][0] + assert vulnerability["type"] == VULN_INSECURE_COOKIE + assert vulnerability["evidence"] == {"valueParts": [{"value": "insecure"}]} + assert "path" not in vulnerability["location"].keys() + assert "line" not in vulnerability["location"].keys() + assert vulnerability["location"]["spanId"] + assert vulnerability["hash"] @pytest.mark.skipif(not asm_config._iast_supported, reason="Python version not supported by IAST") def test_django_stacktrace_leak(client, test_spans, tracer): - with override_global_config(dict(_iast_enabled=True, _deduplication_enabled=False)): - oce.reconfigure() - root_span, _ = _aux_appsec_get_root_span( - client, - test_spans, - tracer, - url="/appsec/stacktrace_leak/", - ) + root_span, _ = _aux_appsec_get_root_span( + client, + test_spans, + tracer, + url="/appsec/stacktrace_leak/", + ) + + assert root_span.get_metric(IAST.ENABLED) == 1.0 + + loaded = json.loads(root_span.get_tag(IAST.JSON)) + assert loaded["sources"] == [] + assert len(loaded["vulnerabilities"]) == 1 + vulnerability = loaded["vulnerabilities"][0] + assert vulnerability["type"] == VULN_STACKTRACE_LEAK + assert vulnerability["evidence"] == { + "valueParts": [ + {"value": 'Module: ".home.foobaruser.sources.minimal-django-example.app.py"\nException: IndexError'} + ] + } + assert vulnerability["hash"] + + +def test_django_stacktrace_from_technical_500_response(client, test_spans, tracer, debug_mode): + root_span, response = _aux_appsec_get_root_span( + client, + test_spans, + tracer, + url="/appsec/stacktrace_leak_500/", + content_type="text/html", + ) + + assert response.status_code == 500, "Expected a 500 status code" + assert root_span.get_metric(IAST.ENABLED) == 1.0 + + loaded = json.loads(root_span.get_tag(IAST.JSON)) + # technical_500_response reports a XSS also + vulnerability = [vln for vln in loaded["vulnerabilities"] if vln["type"] == VULN_STACKTRACE_LEAK][0] + assert vulnerability["evidence"] == { + "valueParts": [ + {"value": "Module: tests.appsec.integrations.django_tests.django_app.views\nException: Exception"} + ] + } + assert vulnerability["hash"] + + +def test_django_xss(client, test_spans, tracer): + root_span, response = _aux_appsec_get_root_span( + client, + test_spans, + tracer, + url="/appsec/xss/?input=", + ) + + vuln_type = "XSS" - assert root_span.get_metric(IAST.ENABLED) == 1.0 - - loaded = json.loads(root_span.get_tag(IAST.JSON)) - assert loaded["sources"] == [] - assert len(loaded["vulnerabilities"]) == 1 - vulnerability = loaded["vulnerabilities"][0] - assert vulnerability["type"] == VULN_STACKTRACE_LEAK - assert vulnerability["evidence"] == { - "valueParts": [ - {"value": 'Module: ".home.foobaruser.sources.minimal-django-example.app.py"\nException: IndexError'} - ] + assert response.status_code == 200 + assert response.content == b"\n\n

Input:

\n\n" + + loaded = json.loads(root_span.get_tag(IAST.JSON)) + + line, hash_value = get_line_and_hash("xss_http_request_parameter_mark_safe", vuln_type, filename=TEST_FILE) + + assert loaded["sources"] == [ + { + "name": "input", + "origin": "http.request.parameter", + "value": "", } - assert vulnerability["hash"] + ] + assert loaded["vulnerabilities"][0]["type"] == vuln_type + assert loaded["vulnerabilities"][0]["evidence"] == { + "valueParts": [ + {"source": 0, "value": ""}, + ] + } + assert loaded["vulnerabilities"][0]["location"]["path"] == TEST_FILE + assert loaded["vulnerabilities"][0]["location"]["line"] == line + assert loaded["vulnerabilities"][0]["hash"] == hash_value -@pytest.fixture -def debug_mode(): - from django.conf import settings - original_debug = settings.DEBUG - settings.DEBUG = True - yield - settings.DEBUG = original_debug +def test_django_xss_safe_template_tag(client, test_spans, tracer): + root_span, response = _aux_appsec_get_root_span( + client, + test_spans, + tracer, + url="/appsec/xss/safe/?input=", + ) + vuln_type = "XSS" -@pytest.mark.skipif(not asm_config._iast_supported, reason="Python version not supported by IAST") -def test_django_stacktrace_from_technical_500_response(client, test_spans, tracer, debug_mode): - with override_global_config(dict(_iast_enabled=True, _deduplication_enabled=False)): - with tracer.trace("test", span_type=SpanTypes.WEB, service="test") as span: - start_context(span) - oce.reconfigure() - root_span, response = _aux_appsec_get_root_span( - client, - test_spans, - tracer, - url="/appsec/stacktrace_leak_500/", - content_type="text/html", - ) - - assert response.status_code == 500, "Expected a 500 status code" - assert root_span.get_metric(IAST.ENABLED) == 1.0 - - loaded = json.loads(root_span.get_tag(IAST.JSON)) - assert loaded["sources"] == [] - assert len(loaded["vulnerabilities"]) == 1 - vulnerability = loaded["vulnerabilities"][0] - assert vulnerability["type"] == VULN_STACKTRACE_LEAK - assert vulnerability["evidence"] == { - "valueParts": [ - {"value": "Module: tests.appsec.integrations.django_tests.django_app.views\nException: Exception"} - ] - } - assert vulnerability["hash"] + assert response.status_code == 200 + assert response.content == b"\n\n

Input:

\n\n" + + loaded = json.loads(root_span.get_tag(IAST.JSON)) + + line, hash_value = get_line_and_hash("xss_http_request_parameter_template_safe", vuln_type, filename=TEST_FILE) + + assert loaded["sources"] == [ + { + "name": "input", + "origin": "http.request.parameter", + "value": "", + } + ] + + assert loaded["vulnerabilities"][0]["type"] == vuln_type + assert loaded["vulnerabilities"][0]["evidence"] == { + "valueParts": [ + {"source": 0, "value": ""}, + ] + } + assert loaded["vulnerabilities"][0]["location"]["path"] == TEST_FILE + assert loaded["vulnerabilities"][0]["location"]["line"] == line + assert loaded["vulnerabilities"][0]["hash"] == hash_value + + +def test_django_xss_autoscape(client, test_spans, tracer): + root_span, response = _aux_appsec_get_root_span( + client, + test_spans, + tracer, + url="/appsec/xss/autoscape/?input=", + ) + + assert response.status_code == 200 + assert ( + response.content + == b"\n\n

\n <script>alert('XSS')</script>\n

\n\n\n" + ), f"Error. content is {response.content}" + + loaded = root_span.get_tag(IAST.JSON) + assert loaded is None + + +def test_django_xss_secure(client, test_spans, tracer): + root_span, response = _aux_appsec_get_root_span( + client, + test_spans, + tracer, + url="/appsec/xss/secure/?input=", + ) + + assert response.status_code == 200 + assert ( + response.content + == b"\n\n

Input: <script>alert('XSS')</script>

\n\n" + ) + + loaded = root_span.get_tag(IAST.JSON) + assert loaded is None diff --git a/tests/appsec/integrations/pygoat_tests/test_pygoat.py b/tests/appsec/integrations/pygoat_tests/test_pygoat.py index 8bb8baae1bd..8e6ff5da0ff 100644 --- a/tests/appsec/integrations/pygoat_tests/test_pygoat.py +++ b/tests/appsec/integrations/pygoat_tests/test_pygoat.py @@ -5,7 +5,6 @@ import requests from tests.appsec.iast.conftest import iast_context_defaults -from tests.utils import flaky span_defaults = iast_context_defaults # So ruff does not remove it @@ -108,7 +107,6 @@ def test_nohttponly_cookie(client): assert vulnerability_in_traces("NO_HTTPONLY_COOKIE", client.agent_session) -@flaky(1735812000) def test_weak_random(client): reply = client.pygoat_session.get(PYGOAT_URL + "/otp?email=test%40test.com", headers=TESTAGENT_HEADERS) assert reply.status_code == 200 @@ -124,7 +122,6 @@ def test_weak_hash(client): assert vulnerability_in_traces("WEAK_HASH", client.agent_session) -@flaky(1735812000) def test_cmdi(client): payload = {"domain": "google.com && ls", "csrfmiddlewaretoken": client.csrftoken} reply = client.pygoat_session.post(PYGOAT_URL + "/cmd_lab", data=payload, headers=TESTAGENT_HEADERS) @@ -132,7 +129,6 @@ def test_cmdi(client): assert vulnerability_in_traces("COMMAND_INJECTION", client.agent_session) -@pytest.mark.skip("TODO: fix interaction with new RASP rules") def test_sqli(client): payload = {"name": "admin", "pass": "anything' OR '1' ='1", "csrfmiddlewaretoken": client.csrftoken} reply = client.pygoat_session.post(PYGOAT_URL + "/sql_lab", data=payload, headers=TESTAGENT_HEADERS) @@ -142,34 +138,20 @@ def test_sqli(client): @pytest.mark.skip("TODO: SSRF is not implemented for open()") def test_ssrf1(client, iast_context_defaults): - from ddtrace.appsec._iast._taint_tracking import OriginType - from ddtrace.appsec._iast._taint_tracking._taint_objects import taint_pyobject - - s = "templates/Lab/ssrf/blogs/blog2.txt" - tainted_path = taint_pyobject( - pyobject=s, - source_name="test_ssrf", - source_value=s, - source_origin=OriginType.PARAMETER, - ) - payload = {"blog": tainted_path, "csrfmiddlewaretoken": client.csrftoken} + payload = {"blog": "templates/Lab/ssrf/blogs/blog2.txt", "csrfmiddlewaretoken": client.csrftoken} reply = client.pygoat_session.post(PYGOAT_URL + "/ssrf_lab", data=payload, headers=TESTAGENT_HEADERS) assert reply.status_code == 200 assert vulnerability_in_traces("SSRF", client.agent_session) def test_ssrf2(client, iast_context_defaults): - from ddtrace.appsec._iast._taint_tracking import OriginType - from ddtrace.appsec._iast._taint_tracking._taint_objects import taint_pyobject - - s = "http://example.com" - tainted_path = taint_pyobject( - pyobject=s, - source_name="test_ssrf", - source_value=s, - source_origin=OriginType.PARAMETER, - ) - payload = {"url": tainted_path, "csrfmiddlewaretoken": client.csrftoken} + payload = {"url": "http://example.com", "csrfmiddlewaretoken": client.csrftoken} reply = client.pygoat_session.post(PYGOAT_URL + "/ssrf_lab2", data=payload, headers=TESTAGENT_HEADERS) assert reply.status_code == 200 assert vulnerability_in_traces("SSRF", client.agent_session) + + +def test_xss(client): + reply = client.pygoat_session.get(PYGOAT_URL + '/xssL?q=', headers=TESTAGENT_HEADERS) + assert reply.status_code == 200 + assert vulnerability_in_traces("XSS", client.agent_session)