Conversation
| raise SynthesisExportError("No file instances to export") | ||
|
|
||
| output_dir = self._output_path or self._get_default_output_dir() | ||
| os.makedirs(output_dir, exist_ok=True) |
Check failure
Code scanning / CodeQL
Uncontrolled data used in path expression High
Show autofix suggestion
Hide autofix suggestion
Copilot Autofix
AI 3 days ago
In general terms, we need to ensure that any user-controlled path is validated and constrained before being used with filesystem APIs. In this case, we should ensure that output_path (if provided) resolves to a subdirectory of a server-controlled base directory, after normalizing the path. If it does not, we should reject it with a clear error. We should also avoid letting the user force absolute paths.
The best fix here is to add a helper method in SynthesisDatasetExporter that computes a safe output directory: it should decide on a base export root (for example, a fixed directory under the application’s working directory), normalize the combination of that root and any user-supplied output_path, and then verify that the resulting path stays under the root. Then export_data should call this helper instead of using self._output_path directly. This maintains the existing behavior (allowing per-request subdirectories) but prevents directory traversal and arbitrary absolute paths.
Concretely, in runtime/datamate-python/app/module/generation/service/export_service.py:
- Add a private method, e.g. `_get_safe_output_dir`, to `SynthesisDatasetExporter`:
  - If `self._output_path` is falsy: return `_get_default_output_dir()` (existing behavior).
  - Otherwise:
    - Choose a base root, for example `base_root = os.path.abspath(self._get_default_output_dir())`. This keeps everything under whatever default export root the app already uses.
    - Normalize the user-specified relative path: `safe_rel = os.path.normpath(self._output_path)`.
    - Reject absolute paths or paths that start with `os.pardir` (`..`) after normalization, by raising `SynthesisExportError`.
    - Compose the final path: `candidate = os.path.abspath(os.path.join(base_root, safe_rel))`.
    - Verify that `candidate` starts with `base_root + os.sep` or equals `base_root`. If not, raise `SynthesisExportError`.
    - Return `candidate`.
- Update `export_data` so that line 151 uses `output_dir = self._get_safe_output_dir()` instead of `self._output_path or self._get_default_output_dir()`.
No new imports are needed beyond os, which is already imported. All changes are confined to SynthesisDatasetExporter in export_service.py.
| @@ -46,6 +46,37 @@ | ||
| self._format = format if format in self.SUPPORTED_FORMATS else self.DEFAULT_FORMAT | ||
| self._output_path = output_path | ||
|
|
||
| def _get_safe_output_dir(self) -> str: | ||
| """ | ||
| 获取安全的导出目录。 | ||
|
|
||
| 如果未显式指定 output_path,则退回到默认导出目录。 | ||
| 如果指定了 output_path,则将其视为默认导出目录下的相对路径, | ||
| 并进行规范化和越权访问检查,防止目录遍历或写入任意位置。 | ||
| """ | ||
| # 默认根目录(应用已有的默认导出目录) | ||
| base_root = os.path.abspath(self._get_default_output_dir()) | ||
|
|
||
| # 如果没有用户指定的输出路径,直接使用默认导出目录 | ||
| if not self._output_path: | ||
| return base_root | ||
|
|
||
| # 规范化用户提供的路径,防止 ".." 等路径片段 | ||
| user_path = os.path.normpath(self._output_path) | ||
|
|
||
| # 禁止绝对路径,以及以父目录开头的相对路径 | ||
| if os.path.isabs(user_path) or user_path.startswith(os.pardir + os.sep) or user_path == os.pardir: | ||
| raise SynthesisExportError("Invalid output path") | ||
|
|
||
| # 将用户路径视为 base_root 下的子目录 | ||
| candidate = os.path.abspath(os.path.join(base_root, user_path)) | ||
|
|
||
| # 再次确认结果路径仍在 base_root 下 | ||
| if not (candidate == base_root or candidate.startswith(base_root + os.sep)): | ||
| raise SynthesisExportError("Invalid output path") | ||
|
|
||
| return candidate | ||
|
|
||
| async def export_task_to_dataset( | ||
| self, | ||
| task_id: str, | ||
| @@ -148,7 +179,7 @@ | ||
| if not file_instances: | ||
| raise SynthesisExportError("No file instances to export") | ||
|
|
||
| output_dir = self._output_path or self._get_default_output_dir() | ||
| output_dir = self._get_safe_output_dir() | ||
| os.makedirs(output_dir, exist_ok=True) | ||
|
|
||
| file_paths: List[str] = [] |
| def _write_jsonl(self, path: str, records: Iterable[dict], format: Optional[str] = None) -> None: | ||
| """写入 JSONL 文件""" | ||
| fmt = format or self._format | ||
| os.makedirs(os.path.dirname(path), exist_ok=True) |
Check failure
Code scanning / CodeQL
Uncontrolled data used in path expression High
Show autofix suggestion
Hide autofix suggestion
Copilot Autofix
AI 3 days ago
In general, to fix uncontrolled path usage you should (a) restrict writes to a known safe base directory and/or (b) normalize and validate any user-provided path segments before using them with filesystem APIs. For paths that may include subdirectories, the standard pattern is: choose a fixed base_dir, compute full_path = os.path.normpath(os.path.join(base_dir, user_path)), then ensure full_path is still under base_dir before creating directories or opening files.
For this codebase, the minimal, non‑breaking fix is:
- Introduce a helper on `SynthesisDatasetExporter` that turns `self._output_path` (which may be user-controlled) into a validated, absolute directory under a server-controlled root. We can:
  - Pick a safe root directory, e.g. the system temp directory returned by `tempfile.gettempdir()` (consistent with `_get_default_output_dir`), and create a fixed subfolder such as `"synthesis_exports"`.
  - If `self._output_path` is provided, treat it as a subdirectory or file name under that root, not as an absolute path. Normalize with `os.path.normpath` and check that the resulting resolved directory starts with the chosen root (prefix check on absolute paths).
  - If the check fails, raise `SynthesisExportError`.
- Use this helper both where the directory is created (`export_data`) and where `_write_jsonl` constructs parent directories:
  - In `export_data`, replace `output_dir = self._output_path or self._get_default_output_dir()` with logic that computes a safe `output_dir` using the helper.
  - In `_write_jsonl`, derive the parent directory with `os.path.dirname(path)` and still call `os.makedirs(..., exist_ok=True)`, but only after `path` has been constructed with the validated `output_dir`.
- Keep the external behaviour similar: callers can still pass `output_path`, but it will now be interpreted safely (as a subpath under the export root) instead of an arbitrary filesystem location.
Concretely, all changes are limited to runtime/datamate-python/app/module/generation/service/export_service.py:
- Add a new private method `_get_safe_output_dir` near `_get_default_output_dir` that:
  - Imports `tempfile` locally (like `_get_default_output_dir`).
  - Defines `base_root = os.path.join(tempfile.gettempdir(), "synthesis_exports")`.
  - If `self._output_path` is falsy, returns `base_root`.
  - Otherwise, builds `candidate = os.path.join(base_root, self._output_path)`, normalizes to `normalized = os.path.normpath(candidate)`, and ensures `os.path.commonpath([normalized, base_root]) == base_root`. If not, raise `SynthesisExportError`.
  - Returns `normalized`.
- Modify `export_data` so `output_dir` is assigned by calling `self._get_safe_output_dir()` and then `os.makedirs(output_dir, exist_ok=True)` remains.
No changes are needed in generation_api.py or to imports beyond using existing os and a local tempfile import inside _get_safe_output_dir (similar to _get_default_output_dir).
| @@ -148,7 +148,8 @@ | ||
| if not file_instances: | ||
| raise SynthesisExportError("No file instances to export") | ||
|
|
||
| output_dir = self._output_path or self._get_default_output_dir() | ||
| # 使用受控且经过校验的输出目录,防止目录穿越或任意路径写入 | ||
| output_dir = self._get_safe_output_dir() | ||
| os.makedirs(output_dir, exist_ok=True) | ||
|
|
||
| file_paths: List[str] = [] | ||
| @@ -265,6 +266,38 @@ | ||
| import tempfile | ||
| return tempfile.gettempdir() | ||
|
|
||
| def _get_safe_output_dir(self) -> str: | ||
| """ | ||
| 获取并校验导出输出目录,确保位于受控根目录下 | ||
|
|
||
| - 如果未指定 output_path,则使用系统临时目录下的固定子目录。 | ||
| - 如果指定了 output_path,则将其视为该根目录下的相对子路径,并进行归一化和越界检查。 | ||
| """ | ||
| import tempfile | ||
|
|
||
| # 受控的导出根目录,例如: /tmp/synthesis_exports | ||
| base_root = os.path.join(tempfile.gettempdir(), "synthesis_exports") | ||
|
|
||
| # 未指定 output_path 时,直接使用受控根目录 | ||
| if not self._output_path: | ||
| return base_root | ||
|
|
||
| # 将用户提供的 output_path 视为 base_root 下的相对路径 | ||
| candidate = os.path.join(base_root, self._output_path) | ||
| normalized = os.path.normpath(candidate) | ||
|
|
||
| # 防止目录穿越,确保归一化后的路径仍位于 base_root 下 | ||
| try: | ||
| common = os.path.commonpath([normalized, base_root]) | ||
| except ValueError: | ||
| # 不同驱动器等情况视为非法路径 | ||
| raise SynthesisExportError("Invalid output path") | ||
|
|
||
| if common != base_root: | ||
| raise SynthesisExportError("Output path is outside of allowed export directory") | ||
|
|
||
| return normalized | ||
|
|
||
| @staticmethod | ||
| def _ensure_dataset_path(dataset: Dataset) -> str: | ||
| """确保数据集路径存在""" |
No description provided.