Skip to content

Commit

Permalink
chore(iast): Header source in werkzeug 3.1
Browse files Browse the repository at this point in the history
  • Loading branch information
avara1986 committed Feb 4, 2025
1 parent 2ccaaef commit e18e4ac
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 8 deletions.
19 changes: 12 additions & 7 deletions ddtrace/appsec/_iast/_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,23 +82,28 @@ def _on_flask_patch(flask_version):
"Headers.items",
functools.partial(if_iast_taint_yield_tuple_for, (OriginType.HEADER_NAME, OriginType.HEADER)),
)
_set_metric_iast_instrumented_source(OriginType.HEADER_NAME)
_set_metric_iast_instrumented_source(OriginType.HEADER)

try_wrap_function_wrapper(
"werkzeug.datastructures",
"ImmutableMultiDict.__getitem__",
functools.partial(if_iast_taint_returned_object_for, OriginType.PARAMETER),
"EnvironHeaders.__getitem__",
functools.partial(if_iast_taint_returned_object_for, OriginType.HEADER),
)
_set_metric_iast_instrumented_source(OriginType.PARAMETER)

# Since werkzeug 3.1.0 get doesn't call to __getitem__
try_wrap_function_wrapper(
"werkzeug.datastructures",
"EnvironHeaders.__getitem__",
"EnvironHeaders.get",
functools.partial(if_iast_taint_returned_object_for, OriginType.HEADER),
)
_set_metric_iast_instrumented_source(OriginType.HEADER_NAME)
_set_metric_iast_instrumented_source(OriginType.HEADER)

try_wrap_function_wrapper(
"werkzeug.datastructures",
"ImmutableMultiDict.__getitem__",
functools.partial(if_iast_taint_returned_object_for, OriginType.PARAMETER),
)
_set_metric_iast_instrumented_source(OriginType.PARAMETER)

if flask_version >= (2, 0, 0):
# instance.query_string: raising an error on werkzeug/_internal.py "AttributeError: read only property"
try_wrap_function_wrapper("werkzeug.wrappers.request", "Request.__init__", _on_request_init)
Expand Down
2 changes: 1 addition & 1 deletion hatch.toml
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ flask = ["~=2.2"]

[[envs.appsec_integrations_flask.matrix]]
python = ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13"]
flask = ["~=3.0"]
flask = ["~=3.1"]


[envs.appsec_integrations_fastapi]
Expand Down
56 changes: 56 additions & 0 deletions tests/appsec/integrations/flask_tests/test_iast_flask.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,62 @@ def sqli_2(param_str):
assert vulnerability["location"]["path"] == TEST_FILE_PATH
assert vulnerability["hash"] == hash_value

@pytest.mark.skipif(not asm_config._iast_supported, reason="Python version not supported by IAST")
def test_flask_full_sqli_iast_enabled_http_request_header_get(self):
@self.app.route("/sqli/<string:param_str>/", methods=["GET", "POST"])
def sqli_2(param_str):
import sqlite3

from flask import request

from ddtrace.appsec._iast._taint_tracking.aspects import add_aspect

con = sqlite3.connect(":memory:")
cur = con.cursor()
# label test_flask_full_sqli_iast_enabled_http_request_header_get
cur.execute(add_aspect("SELECT 1 FROM ", request.headers.get("User-Agent")))

return "OK", 200

with override_global_config(
dict(
_iast_enabled=True,
_iast_deduplication_enabled=False,
)
):
resp = self.client.post(
"/sqli/sqlite_master/", data={"name": "test"}, headers={"User-Agent": "sqlite_master"}
)
assert resp.status_code == 200

root_span = self.pop_spans()[0]
assert root_span.get_metric(IAST.ENABLED) == 1.0

loaded = json.loads(root_span.get_tag(IAST.JSON))
assert loaded["sources"] == [
{"origin": "http.request.header", "name": "User-Agent", "value": "sqlite_master"}
]

line, hash_value = get_line_and_hash(
"test_flask_full_sqli_iast_enabled_http_request_header_get",
VULN_SQL_INJECTION,
filename=TEST_FILE_PATH,
)
vulnerability = loaded["vulnerabilities"][0]

assert vulnerability["type"] == VULN_SQL_INJECTION
assert vulnerability["evidence"] == {
"valueParts": [
{"value": "SELECT "},
{"redacted": True},
{"value": " FROM "},
{"value": "sqlite_master", "source": 0},
]
}
assert vulnerability["location"]["line"] == line
assert vulnerability["location"]["path"] == TEST_FILE_PATH
assert vulnerability["hash"] == hash_value

@pytest.mark.skipif(not asm_config._iast_supported, reason="Python version not supported by IAST")
def test_flask_full_sqli_iast_enabled_http_request_header_name_keys(self):
@self.app.route("/sqli/<string:param_str>/", methods=["GET", "POST"])
Expand Down

0 comments on commit e18e4ac

Please sign in to comment.