From 99157ff2e373c4c47136fc2e7598255ef82a3cb6 Mon Sep 17 00:00:00 2001 From: bitterpanda Date: Wed, 20 May 2026 13:32:19 -0700 Subject: [PATCH 1/2] Add json.loads on bytes & surrogatepass --- aikido_zen/context/__init__.py | 15 ++++++- aikido_zen/context/init_test.py | 25 ++++++++++- aikido_zen/helpers/path_to_string.py | 8 ++-- aikido_zen/helpers/path_to_string_test.py | 6 ++- aikido_zen/sources/flask/extract_form_data.py | 2 +- aikido_zen/sources/quart.py | 2 +- .../detect_path_traversal_test.py | 13 ++++++ end2end/django_mysql_test.py | 42 +++++++++++++++++++ end2end/flask_mongo_test.py | 42 +++++++++++++++++++ sample-apps/django-mysql/sample_app/urls.py | 4 +- sample-apps/django-mysql/sample_app/views.py | 31 +++++++++++++- sample-apps/flask-mongo/app.py | 28 +++++++++++++ 12 files changed, 206 insertions(+), 12 deletions(-) diff --git a/aikido_zen/context/__init__.py b/aikido_zen/context/__init__.py index bb8eeb3b3..04a00e6a5 100644 --- a/aikido_zen/context/__init__.py +++ b/aikido_zen/context/__init__.py @@ -119,7 +119,20 @@ def set_body_internal(self, body): # Make sure that empty bodies like b"" don't get sent. self.body = None if isinstance(self.body, bytes): - self.body = self.body.decode("utf-8") # Decode byte input to string. + # json.loads on bytes uses surrogatepass internally, so try it first. + # This handles bodies with surrogate/invalid bytes that would otherwise + # cause decode("utf-8") to raise and leave the JSON unparsed. + try: + parsed_body = json.loads(self.body) + if parsed_body: + self.body = parsed_body + return + except (JSONDecodeError, ValueError): + pass + # Use errors="replace" so invalid bytes become � instead of raising. + # A strict decode would let attackers bypass detection by prepending a + # single invalid byte to any payload. + self.body = self.body.decode("utf-8", errors="replace") if not isinstance(self.body, str): return if self.body.strip()[0] in ["{", "[", '"']: diff --git a/aikido_zen/context/init_test.py b/aikido_zen/context/init_test.py index dff7e3632..a8ca625e0 100644 --- a/aikido_zen/context/init_test.py +++ b/aikido_zen/context/init_test.py @@ -175,9 +175,10 @@ def test_set_normal_byte_string(): def test_set_byte_string_wrong_encoding(): - body = "hello world! 😊".encode("utf-16") # UTF-16 unique character + body = "hello world! 😊".encode("utf-16") # UTF-16 bytes are not valid UTF-8 context = Context(req=basic_wsgi_req, body=body, source="flask") - assert context.body == body # Body remains unchanged because utf-8 failed. + # Invalid bytes are replaced with � so the body is still scannable. + assert context.body == body.decode("utf-8", errors="replace") def test_set_none(): @@ -296,3 +297,23 @@ def test_set_protection_forced_off(): assert context.protection_forced_off is False context.set_force_protection_off(None) assert context.protection_forced_off is None + + +def test_set_bytes_with_invalid_utf8_prefix(): + # Regression: AIKIDO-5RDTZW1V — a single invalid UTF-8 byte (e.g. \xff) prepended + # to a path traversal payload must not bypass detection. The body must be decoded + # with errors="replace" so the traversal string remains visible to sinks. + body = b"\xff/../../../../../etc/passwd" + context = Context(req=basic_wsgi_req, body=body, source="flask") + assert isinstance(context.body, str) + assert "/../../../../../etc/passwd" in context.body + + +def test_set_bytes_json_with_surrogate_bytes(): + # Regression: AIKIDO-B3YABOSP — surrogate bytes embedded in a JSON body must not + # bypass detection. json.loads(bytes) uses surrogatepass internally, so the dict + # is parsed and the attack payload (e.g. {"$regex": ".*"}) is visible. + body = b'{"username": {"$regex": ".*"}, "bypass": "\xed\xa0\x80"}' + context = Context(req=basic_wsgi_req, body=body, source="flask") + assert isinstance(context.body, dict) + assert context.body.get("username") == {"$regex": ".*"} diff --git a/aikido_zen/helpers/path_to_string.py b/aikido_zen/helpers/path_to_string.py index 0e96fe2a3..3839e1b97 100644 --- a/aikido_zen/helpers/path_to_string.py +++ b/aikido_zen/helpers/path_to_string.py @@ -16,10 +16,10 @@ def path_to_string(path): return path if isinstance(path, bytes): - try: - return path.decode("utf-8") - except UnicodeDecodeError: - return None + # Use errors="replace" so invalid bytes (e.g. \xff, surrogate sequences) + # don't silently suppress path traversal detection — the replacement char + # preserves the traversal components that follow. + return path.decode("utf-8", errors="replace") if isinstance(path, PurePath): # Stringify PurePath. This can still allow path traversal but in extremely # limited cases so it's safe to just stringify for now. diff --git a/aikido_zen/helpers/path_to_string_test.py b/aikido_zen/helpers/path_to_string_test.py index 0a3547edd..8986bccb8 100644 --- a/aikido_zen/helpers/path_to_string_test.py +++ b/aikido_zen/helpers/path_to_string_test.py @@ -16,7 +16,11 @@ def test_path_to_string_with_valid_url(): def test_path_to_string_with_bytes(): assert path_to_string(b"test.txt") == "test.txt" assert path_to_string(b"/home/user/file.txt") == "/home/user/file.txt" - assert path_to_string(b"\xff") is None # Invalid UTF-8 byte sequence + # Invalid bytes are replaced with � so traversal components are preserved. + assert path_to_string(b"\xff") == "�" + assert path_to_string(b"\xff/../../../etc/passwd") == "�/../../../etc/passwd" + # Surrogate bytes (AIKIDO-B3YABOSP pattern) also survive as replacement chars. + assert path_to_string(b"\xed\xa0\x80/../etc/passwd") == "���/../etc/passwd" def test_path_to_string_with_empty_string(): diff --git a/aikido_zen/sources/flask/extract_form_data.py b/aikido_zen/sources/flask/extract_form_data.py index 18f261730..991ed16cf 100644 --- a/aikido_zen/sources/flask/extract_form_data.py +++ b/aikido_zen/sources/flask/extract_form_data.py @@ -10,7 +10,7 @@ def extract_form_data_from_flask_request_and_save_data(req): if req.form: context.set_body(req.form) else: - context.set_body(req.data.decode("utf-8")) + context.set_body(req.data) context.set_as_current_context() except Exception as e: logger.debug("Exception occurred whilst extracting flask body data: %s", e) diff --git a/aikido_zen/sources/quart.py b/aikido_zen/sources/quart.py index 9e0fa6f63..bc56b93b8 100644 --- a/aikido_zen/sources/quart.py +++ b/aikido_zen/sources/quart.py @@ -32,7 +32,7 @@ async def _handle_request_before(func, instance, args, kwargs): context.set_body(form) else: data = await request.data - context.set_body(data.decode("utf-8")) + context.set_body(data) context.cookies = request.cookies.to_dict() context.set_as_current_context() diff --git a/aikido_zen/vulnerabilities/path_traversal/detect_path_traversal_test.py b/aikido_zen/vulnerabilities/path_traversal/detect_path_traversal_test.py index 53d508552..15e714ab3 100644 --- a/aikido_zen/vulnerabilities/path_traversal/detect_path_traversal_test.py +++ b/aikido_zen/vulnerabilities/path_traversal/detect_path_traversal_test.py @@ -132,3 +132,16 @@ def test_path_normalization(): # Combined slashes and dot: ///.///etc/passwd should normalize to /etc/passwd assert detect_path_traversal("///.///etc/passwd", "///.///etc") is True assert detect_path_traversal("///.///etc/passwd", "///.///etc/passwd") is True + + +def test_replacement_char_prefix_does_not_hide_traversal(): + # Regression: AIKIDO-5RDTZW1V / AIKIDO-B3YABOSP — an attacker prepends + # invalid UTF-8 bytes (\xff or surrogate sequences) to a traversal payload. + # After decode("utf-8", errors="replace") both the stored body string and the + # path_to_string() output start with the replacement character �, so the + # user-input substring is still found in the file path and traversal is detected. + replacement = "�" + traversal = "/../../../../../etc/passwd" + assert detect_path_traversal(replacement + traversal, replacement + traversal) is True + # Three replacement chars (from \xed\xa0\x80, three separate bad bytes) + assert detect_path_traversal(replacement * 3 + traversal, replacement * 3 + traversal) is True diff --git a/end2end/django_mysql_test.py b/end2end/django_mysql_test.py index 06161b875..771335600 100644 --- a/end2end/django_mysql_test.py +++ b/end2end/django_mysql_test.py @@ -111,3 +111,45 @@ def test_initial_heartbeat(): assert req_stats["rateLimited"] == 0 assert req_stats["attacksDetected"] == {"blocked": 2, "total": 2} assert req_stats["attackWaves"] == {"total": 0, "blocked": 0} + + +# --- AIKIDO-5RDTZW1V regression: invalid UTF-8 bytes must not bypass detection --- + +def test_bypass_invalid_utf8_bytes_path_traversal(): + # An attacker prepends \xff (invalid UTF-8) to a path traversal payload. + # Before the fix, decode("utf-8") raised UnicodeDecodeError and the body was + # never stored, so the firewall saw nothing. After the fix the body is decoded + # with errors="replace" and the traversal is still detected. + body = b"\xff/../../../../../etc/passwd" + res = requests.post(base_url_fw + "/read", data=body) + assert res.status_code == 500 + + time.sleep(5) + events = fetch_events_from_mock("http://localhost:5000") + attacks = filter_on_event_type(events, "detected_attack") + + assert len(attacks) == 3 + assert attacks[2]["attack"]["kind"] == "path_traversal" + assert attacks[2]["attack"]["blocked"] is True + assert attacks[2]["attack"]["source"] == "body" + + +# --- AIKIDO-B3YABOSP regression: surrogate bytes in JSON must not bypass detection --- + +def test_bypass_surrogate_bytes_sql_injection(): + # Surrogate bytes (\xed\xa0\x80) make decode("utf-8") raise, so the old code + # never parsed the body as JSON and the SQL injection payload was invisible. + # After the fix, json.loads(bytes) is tried first (it uses surrogatepass internally) + # so the dict is extracted and the injection is caught when the cursor executes. + body = b'{"dog_name": "Dangerous bobby\\", 1); -- ", "bypass": "\xed\xa0\x80"}' + res = requests.post(base_url_fw + "/json-sql", data=body) + assert res.status_code == 500 + + time.sleep(5) + events = fetch_events_from_mock("http://localhost:5000") + attacks = filter_on_event_type(events, "detected_attack") + + assert len(attacks) == 4 + assert attacks[3]["attack"]["kind"] == "sql_injection" + assert attacks[3]["attack"]["blocked"] is True + assert attacks[3]["attack"]["source"] == "body" diff --git a/end2end/flask_mongo_test.py b/end2end/flask_mongo_test.py index bb32589c9..3f545a0b8 100644 --- a/end2end/flask_mongo_test.py +++ b/end2end/flask_mongo_test.py @@ -126,3 +126,45 @@ def test_dangerous_auth_fw_force(): 'source': "body", 'user': None } + + +# --- AIKIDO-5RDTZW1V regression: invalid UTF-8 bytes must not bypass detection --- + +def test_bypass_invalid_utf8_bytes_path_traversal(): + # An attacker prepends \xff (invalid UTF-8) to a path traversal payload. + # Before the fix, decode("utf-8") raised UnicodeDecodeError and the body was + # never stored, so the firewall saw nothing. After the fix the body is decoded + # with errors="replace" and the traversal is still detected. + body = b"\xff/../../../../../etc/passwd" + res = requests.post("http://localhost:8094/read", data=body) + assert res.status_code == 500 + + time.sleep(5) + events = fetch_events_from_mock("http://localhost:5000") + attacks = filter_on_event_type(events, "detected_attack") + + assert len(attacks) == 3 + assert attacks[2]["attack"]["kind"] == "path_traversal" + assert attacks[2]["attack"]["blocked"] is True + assert attacks[2]["attack"]["source"] == "body" + + +# --- AIKIDO-B3YABOSP regression: surrogate bytes in JSON must not bypass detection --- + +def test_bypass_surrogate_bytes_nosql_injection(): + # Surrogate bytes (\xed\xa0\x80) make decode("utf-8") raise, so the old code + # never parsed the JSON and the NoSQL injection payload {"$ne":""} was invisible. + # After the fix, json.loads(bytes) is tried first (it uses surrogatepass internally) + # so the dict body is fully parsed and the injection is caught. + body = b'{"dog_name": "bobby_tables", "pswd": {"$ne": ""}, "bypass": "\xed\xa0\x80"}' + res = requests.post("http://localhost:8094/auth-raw", data=body) + assert res.status_code == 500 + + time.sleep(5) + events = fetch_events_from_mock("http://localhost:5000") + attacks = filter_on_event_type(events, "detected_attack") + + assert len(attacks) == 4 + assert attacks[3]["attack"]["kind"] == "nosql_injection" + assert attacks[3]["attack"]["blocked"] is True + assert attacks[3]["attack"]["source"] == "body" diff --git a/sample-apps/django-mysql/sample_app/urls.py b/sample-apps/django-mysql/sample_app/urls.py index 296442ee1..393635382 100644 --- a/sample-apps/django-mysql/sample_app/urls.py +++ b/sample-apps/django-mysql/sample_app/urls.py @@ -6,5 +6,7 @@ path("", views.index, name="index"), path("dogpage/", views.dog_page, name="dog_page"), path("shell/", views.shell_url, name="shell"), - path("create", views.create_dogpage, name="create") + path("create", views.create_dogpage, name="create"), + path("read", views.read_file, name="read"), + path("json-sql", views.json_sql, name="json_sql"), ] diff --git a/sample-apps/django-mysql/sample_app/views.py b/sample-apps/django-mysql/sample_app/views.py index 37e45702d..c33e9d0a4 100644 --- a/sample-apps/django-mysql/sample_app/views.py +++ b/sample-apps/django-mysql/sample_app/views.py @@ -4,7 +4,7 @@ from .models import Dogs from django.db import connection from django.views.decorators.csrf import csrf_exempt -# Create your views here. +import json import subprocess def index(request): @@ -37,3 +37,32 @@ def create_dogpage(request): print("QUERY : ", query) cursor.execute(query) return HttpResponse("Dog page created") + + +# --- bypass regression endpoints --- + +@csrf_exempt +def read_file(request): + # Passes raw bytes body directly to open() — path traversal sink. + # Used by AIKIDO-5RDTZW1V regression test: a leading \xff byte must not + # prevent the firewall from detecting the traversal in the rest of the path. + if request.method == 'POST': + with open(request.body) as f: + return HttpResponse(f.read()) + return HttpResponse("Use POST") + + +@csrf_exempt +def json_sql(request): + # Parses body via json.loads(bytes) without relying on Content-Type. + # Used by AIKIDO-B3YABOSP regression test: surrogate bytes (\xed\xa0\x80) + # embedded in the JSON body must not prevent the firewall from parsing the + # body and detecting the SQL injection payload. + if request.method == 'POST': + data = json.loads(request.body) + dog_name = data.get('dog_name', '') + with connection.cursor() as cursor: + query = 'INSERT INTO sample_app_dogs (dog_name, dog_boss) VALUES ("%s", "N/A")' % dog_name + cursor.execute(query) + return HttpResponse("OK") + return HttpResponse("Use POST") diff --git a/sample-apps/flask-mongo/app.py b/sample-apps/flask-mongo/app.py index 240f1fe97..80fc24ae1 100644 --- a/sample-apps/flask-mongo/app.py +++ b/sample-apps/flask-mongo/app.py @@ -67,3 +67,31 @@ def post_auth2(): return f'Dog with name {dog_name} authenticated successfully' else: return f'Auth failed' + + +# --- bypass regression endpoints --- + +@app.route("/read", methods=['POST']) +def read_file(): + # Passes the raw bytes body directly to open() — path traversal sink. + # Used by AIKIDO-5RDTZW1V regression test: a leading \xff byte must not + # prevent the firewall from detecting the traversal in the rest of the path. + with open(request.data) as f: + return f.read() + + +@app.route("/auth-raw", methods=['POST']) +def post_auth_raw(): + # Parses the body via json.loads(bytes) without relying on Content-Type. + # Used by AIKIDO-B3YABOSP regression test: surrogate bytes (\xed\xa0\x80) + # embedded in the JSON body must not prevent the firewall from parsing the + # body and detecting the NoSQL injection payload. + data = json.loads(request.data) + dog_info = { + 'dog_name': data.get('dog_name'), + 'pswd': data.get('pswd'), + } + dog = mongo.db.dogs.find_one(dog_info) + if dog: + return f'Dog with name {dog["dog_name"]} authenticated successfully' + return 'Auth failed' From 526d09df65ee6efef428f0bb12e7aa31aff39abf Mon Sep 17 00:00:00 2001 From: bitterpanda Date: Wed, 20 May 2026 13:33:04 -0700 Subject: [PATCH 2/2] lint --- .../path_traversal/detect_path_traversal_test.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/aikido_zen/vulnerabilities/path_traversal/detect_path_traversal_test.py b/aikido_zen/vulnerabilities/path_traversal/detect_path_traversal_test.py index 15e714ab3..f627e3754 100644 --- a/aikido_zen/vulnerabilities/path_traversal/detect_path_traversal_test.py +++ b/aikido_zen/vulnerabilities/path_traversal/detect_path_traversal_test.py @@ -142,6 +142,11 @@ def test_replacement_char_prefix_does_not_hide_traversal(): # user-input substring is still found in the file path and traversal is detected. replacement = "�" traversal = "/../../../../../etc/passwd" - assert detect_path_traversal(replacement + traversal, replacement + traversal) is True + assert ( + detect_path_traversal(replacement + traversal, replacement + traversal) is True + ) # Three replacement chars (from \xed\xa0\x80, three separate bad bytes) - assert detect_path_traversal(replacement * 3 + traversal, replacement * 3 + traversal) is True + assert ( + detect_path_traversal(replacement * 3 + traversal, replacement * 3 + traversal) + is True + )