diff --git a/python/agb/modules/file_transfer.py b/python/agb/modules/file_transfer.py index fc8a2c8..10be6df 100644 --- a/python/agb/modules/file_transfer.py +++ b/python/agb/modules/file_transfer.py @@ -7,6 +7,7 @@ import os import time from pathlib import PurePosixPath +from urllib.parse import urlsplit from typing import Callable, Dict, Optional, Tuple import httpx @@ -24,6 +25,12 @@ logger = get_logger("file_transfer") +def _sanitize_url(url: str) -> str: + """Return URL with query string removed to avoid leaking pre-signed tokens.""" + parts = urlsplit(url) + return parts._replace(query="", fragment="").geturl() + + class FileTransfer: """ Provides pre-signed URL upload/download functionality between local and OSS, @@ -222,7 +229,7 @@ def upload( upload_url = url_res.url req_id_upload = getattr(url_res, "request_id", None) - logger.info(f"Uploading {local_path} to OSS (URL: {upload_url})") + logger.info(f"Uploading {local_path} to OSS (URL: {_sanitize_url(upload_url)})") # 2. PUT upload to pre-signed URL try: @@ -483,7 +490,7 @@ def download( error_message=f"Destination exists and overwrite=False: {local_path}", ) - logger.info(f"Downloading from OSS to {local_path} (URL: {download_url})") + logger.info(f"Downloading from OSS to {local_path} (URL: {_sanitize_url(download_url)})") http_status, bytes_received = self._get_file_sync( download_url, local_path, diff --git a/python/tests/unit/test_cwe532_presigned_url_leak.py b/python/tests/unit/test_cwe532_presigned_url_leak.py new file mode 100644 index 0000000..f14ebfc --- /dev/null +++ b/python/tests/unit/test_cwe532_presigned_url_leak.py @@ -0,0 +1,199 @@ +""" +PoC test for CWE-532: Pre-signed URLs logged in plaintext (file_transfer.py). + +Pre-signed upload/download URLs contain authentication tokens in query-string +parameters (e.g. Signature, OSSAccessKeyId, Expires). These are bearer tokens: +anyone who reads log output can use them. + +The test captures log output emitted by FileTransfer.upload() and +FileTransfer.download() and asserts it does NOT contain the query parameters. +""" +from __future__ import annotations + +import os +from types import SimpleNamespace +from unittest.mock import patch + +import pytest +from loguru import logger + +from agb.modules.file_transfer import FileTransfer + +# --------------------------------------------------------------------------- +# Fakes (same pattern as existing unit tests) +# --------------------------------------------------------------------------- + +PRESIGNED_UPLOAD_URL = ( + "https://bucket.oss-cn-hangzhou.aliyuncs.com/path/file.txt" + "?OSSAccessKeyId=AKID1234567890" + "&Expires=1719999999" + "&Signature=dGhpcyBpcyBhIHNlY3JldA%3D%3D" +) + +PRESIGNED_DOWNLOAD_URL = ( + "https://bucket.oss-cn-hangzhou.aliyuncs.com/path/remote.txt" + "?OSSAccessKeyId=AKID0987654321" + "&Expires=1720000000" + "&Signature=c2Vuc2l0aXZlc2lnbmF0dXJl" +) + + +class _FakeInternalContextResponse: + def __init__(self, ok=True, data=None, err=None): + self._ok = ok + self._data = data + self._err = err + + def is_successful(self): + return self._ok + + def get_error_message(self): + return self._err + + def get_context_list(self): + return self._data + + +class _FakeSession: + def __init__(self, sid="s1"): + self._sid = sid + self.context = SimpleNamespace() + + def get_session_id(self): + return self._sid + + +class _FakeClient: + def __init__(self, resp): + self._resp = resp + + def get_and_load_internal_context(self, request): + return self._resp + + +class _FakeContextSvc: + def __init__(self, upload_url=None, download_url=None): + self._up = upload_url + self._dl = download_url + + def get_file_upload_url(self, cid, path): + return self._up + + def get_file_download_url(self, cid, path): + return self._dl + + +class _FakeAGB: + def __init__(self, api_key="ak", client=None, context=None): + self.api_key = api_key + self.client = client + self.context = context + + +# --------------------------------------------------------------------------- +# Helper: capture loguru output as a list of strings +# --------------------------------------------------------------------------- + +class _LogCapture: + """Sink for loguru that stores messages.""" + + def __init__(self): + self.messages: list[str] = [] + + def write(self, message): + self.messages.append(str(message)) + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + +def test_upload_does_not_log_presigned_url_query_params(tmp_path): + """upload() must not emit pre-signed URL query parameters to logs.""" + # Prepare local file + local = tmp_path / "data.bin" + local.write_bytes(b"hello") + + resp = _FakeInternalContextResponse( + ok=True, data=[{"contextId": "cid", "contextPath": "/tmp"}] + ) + url_res = SimpleNamespace( + success=True, + url=PRESIGNED_UPLOAD_URL, + request_id="rid-up", + message=None, + ) + agb = _FakeAGB( + client=_FakeClient(resp), + context=_FakeContextSvc(upload_url=url_res), + ) + ft = FileTransfer(agb, _FakeSession("s-1")) + + # Capture logs + capture = _LogCapture() + handler_id = logger.add(capture, format="{message}", level="DEBUG") + + try: + with patch.object(ft, "_put_file_sync", return_value=(200, "etag", 5)): + with patch.object(ft, "_await_sync", return_value="rid-sync"): + ft.upload(str(local), "/path/file.txt", wait=False) + finally: + logger.remove(handler_id) + + all_logs = "\n".join(capture.messages) + + # The URL path is fine to log, but query params with secrets must NOT appear + assert "OSSAccessKeyId" not in all_logs, ( + f"Pre-signed URL query parameter leaked in logs:\n{all_logs}" + ) + assert "Signature" not in all_logs, ( + f"Pre-signed URL signature leaked in logs:\n{all_logs}" + ) + assert "Expires" not in all_logs, ( + f"Pre-signed URL expiry leaked in logs:\n{all_logs}" + ) + + +def test_download_does_not_log_presigned_url_query_params(tmp_path): + """download() must not emit pre-signed URL query parameters to logs.""" + resp = _FakeInternalContextResponse( + ok=True, data=[{"contextId": "cid", "contextPath": "/tmp"}] + ) + dl_url_res = SimpleNamespace( + success=True, + url=PRESIGNED_DOWNLOAD_URL, + request_id="rid-dl", + message=None, + ) + agb = _FakeAGB( + client=_FakeClient(resp), + context=_FakeContextSvc(download_url=dl_url_res), + ) + ft = FileTransfer(agb, _FakeSession("s-2")) + + capture = _LogCapture() + handler_id = logger.add(capture, format="{message}", level="DEBUG") + + def _fake_get(url, dest, *a, **k): + with open(dest, "wb") as f: + f.write(b"data") + return 200, 4 + + try: + with patch.object(ft, "_await_sync", return_value="rid-sync"): + with patch.object(ft, "_get_file_sync", side_effect=_fake_get): + ft.download("/path/remote.txt", str(tmp_path / "out.bin"), wait=False) + finally: + logger.remove(handler_id) + + all_logs = "\n".join(capture.messages) + + assert "OSSAccessKeyId" not in all_logs, ( + f"Pre-signed URL query parameter leaked in logs:\n{all_logs}" + ) + assert "Signature" not in all_logs, ( + f"Pre-signed URL signature leaked in logs:\n{all_logs}" + ) + assert "Expires" not in all_logs, ( + f"Pre-signed URL expiry leaked in logs:\n{all_logs}" + )