96 changes: 96 additions & 0 deletions gigl/src/common/custom_launcher.py
@@ -0,0 +1,96 @@
"""Subprocess dispatch for ``CustomLauncherConfig``-backed launchers.

Takes ``CustomLauncherConfig.command`` and ``CustomLauncherConfig.args``
verbatim and shells out via ``subprocess.run(shell_line, shell=True)``.
The shell-style invocation honors leading ``KEY=VALUE`` env-var
assignments in ``command`` so callers can self-document required env
without forcing the dispatcher to parse env separately.

The receiving subprocess follows no special protocol; it is expected to
be a plain CLI that parses (e.g. with argparse) whatever flags the YAML
wires up via ``args[]``. The dispatcher performs no template substitution; any
dynamic content (runtime URIs, image refs, etc.) is the caller's
responsibility — typically resolved at YAML-load time before the
proto reaches this module.
"""

import shlex
import subprocess
from collections.abc import Mapping
from typing import Optional

from gigl.common import Uri
from gigl.common.logger import Logger
from gigl.src.common.constants.components import GiGLComponents
from snapchat.research.gbml.gigl_resource_config_pb2 import CustomLauncherConfig

logger = Logger()

_LAUNCHABLE_COMPONENTS: frozenset[GiGLComponents] = frozenset(
{GiGLComponents.Trainer, GiGLComponents.Inferencer}
)


def launch_custom(
    custom_launcher_config: CustomLauncherConfig,
    applied_task_identifier: str,
    task_config_uri: Uri,
    resource_config_uri: Uri,
    process_command: str,
    process_runtime_args: Mapping[str, str],
    cpu_docker_uri: Optional[str],
    cuda_docker_uri: Optional[str],
    component: GiGLComponents,
) -> None:
    """Shell out to ``custom_launcher_config.command`` with ``args[]`` appended.

    Composes a shell line as ``command`` followed by each ``args[]``
    element passed through ``shlex.quote``, then invokes
    ``subprocess.run(shell_line, shell=True, check=True)``.

    The dispatcher takes ``command`` and ``args[]`` verbatim; no
    template substitution of any kind is performed. Any placeholder text
    in those fields reaches ``subprocess.run`` literally; consumers that
    want substitution should resolve it at YAML-load time before the
    proto reaches this module.

    ``applied_task_identifier``, ``task_config_uri``,
    ``resource_config_uri``, ``process_command``,
    ``process_runtime_args``, ``cpu_docker_uri``, and ``cuda_docker_uri``
    are accepted for API symmetry with the GLT-side Vertex AI launchers
    but are intentionally not plumbed into the subprocess; the
    receiving CLI is expected to source whatever context it needs from
    the resource config it gets handed (or from env vars inherited from
    the parent process).

    Args:
        custom_launcher_config: Proto whose ``command`` is the shell
            snippet to execute and whose ``args`` are positional
            arguments appended verbatim.
        applied_task_identifier: Accepted for API symmetry; ignored.
        task_config_uri: Accepted for API symmetry; ignored.
        resource_config_uri: Accepted for API symmetry; ignored.
        process_command: Accepted for API symmetry; ignored.
        process_runtime_args: Accepted for API symmetry; ignored.
        cpu_docker_uri: Accepted for API symmetry; ignored.
        cuda_docker_uri: Accepted for API symmetry; ignored.
        component: Which GiGL component is being launched. Must be in
            ``_LAUNCHABLE_COMPONENTS``.

    Raises:
        ValueError: If ``component`` is not Trainer or Inferencer, or if
            ``custom_launcher_config.command`` is empty.
        subprocess.CalledProcessError: If the spawned subprocess exits
            non-zero.
    """
    if component not in _LAUNCHABLE_COMPONENTS:
        raise ValueError(
            f"Invalid component for custom launch: {component}; "
            f"must be one of {sorted(c.name for c in _LAUNCHABLE_COMPONENTS)}."
        )
    if not custom_launcher_config.command:
        raise ValueError("CustomLauncherConfig.command must be set")

    command: str = custom_launcher_config.command
    args: list[str] = list(custom_launcher_config.args)

    shell_line = " ".join([command, *(shlex.quote(a) for a in args)])
    logger.info(f"Launching {component.name} via subprocess: {shell_line!r}")
    subprocess.run(shell_line, shell=True, check=True)
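To make the composition concrete, here is a self-contained sketch of the shell-line assembly the dispatcher performs. The command string and flags below are hypothetical examples, not values from this PR:

```python
import shlex


def compose_shell_line(command: str, args: list[str]) -> str:
    # Mirrors launch_custom: command is taken verbatim (so a leading
    # KEY=VALUE env assignment survives shell interpretation), while each
    # arg is quoted individually so spaces and metacharacters stay inside
    # a single argv slot.
    return " ".join([command, *(shlex.quote(a) for a in args)])


line = compose_shell_line(
    "LOG_LEVEL=debug python -m my_org.trainer",  # hypothetical command
    ["--epochs", "10", "--note", "smoke test"],
)
print(line)
# LOG_LEVEL=debug python -m my_org.trainer --epochs 10 --note 'smoke test'
```

Note that only the args are quoted; the command itself is deliberately left raw, which is exactly the surface the Semgrep finding below is about.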
Semgrep identified an issue in your code:

The subprocess.run() call uses shell=True on user-controlled command strings, allowing shell injection attacks that could execute arbitrary commands with the process's privileges.


The subprocess.run() call executes shell_line with shell=True, which spawns a shell process to interpret the command. This is dangerous because if command or any element of args comes from untrusted input (e.g., user-provided configuration, external data), an attacker can inject arbitrary shell commands.

For example, if an attacker controls custom_launcher_config.command and sets it to id; rm -rf /, the shell will execute both id and the destructive rm -rf / command. Even though shlex.quote() escapes individual arguments, it does not protect against injection in the command itself, only in the args list. An attacker who controls the command field can bypass this protection entirely.

Exploit scenario:

  1. Attacker provides custom_launcher_config.command = "echo test; cat /etc/passwd #"
  2. After joining with args, shell_line becomes something like "echo test; cat /etc/passwd #"
  3. When subprocess.run() executes this with shell=True, the shell interprets the semicolon as a command separator
  4. Both echo test and cat /etc/passwd execute, leaking sensitive system files
  5. If the process runs with elevated privileges, the attacker can exfiltrate or modify sensitive data

The shell inherits environment variables and settings from the parent process, which further expands the attack surface. Using shell=False would treat the entire string as a single command name rather than allowing shell metacharacters to be interpreted.

To resolve this comment:

💡 Follow autofix suggestion

Suggested change
subprocess.run(shell_line, shell=True, check=True)
subprocess.run(shlex.split(command) + args, check=True)
View step-by-step instructions
  1. Replace subprocess.run(shell_line, shell=True, check=True) with an invocation that does not use shell=True.
  2. Build an argument list instead of a joined string: tokenize the command once with shlex.split, then append the args unchanged, e.g. subprocess.run(shlex.split(command) + args, check=True).
  3. Remove uses of shlex.quote when building the argument list, since quoting is only necessary when passing a string to a shell.
  4. Note that without a shell, leading KEY=VALUE environment assignments in command are no longer interpreted; pass them via subprocess.run's env= keyword instead.

Passing the command and arguments as a list with shell=False (the default) is safer, because it avoids any interpretation by a shell, preventing command injection vulnerabilities.
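As a runnable illustration of the list-based approach (the payload and module names are hypothetical, and the sketch assumes the interpreter path contains no spaces):

```python
import shlex
import subprocess
import sys


def run_without_shell(command: str, args: list[str]) -> subprocess.CompletedProcess:
    # Tokenize the command string exactly once; no shell ever runs, so
    # metacharacters like ';' remain literal text inside one argv entry.
    argv = shlex.split(command) + list(args)
    return subprocess.run(argv, check=True, capture_output=True, text=True)


# The would-be injection payload is delivered as a plain argument:
completed = run_without_shell(
    f"{sys.executable} -c",
    ["import sys; print(sys.argv[1:])", "test; cat /etc/passwd"],
)
print(completed.stdout.strip())  # ['test; cat /etc/passwd'] -- nothing executed
```

The semicolon reaches the child process as ordinary argument text; only the program named in argv[0] ever runs.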

💬 Ignore this finding

Reply with Semgrep commands to ignore this finding.

  • /fp <comment> for false positive
  • /ar <comment> for acceptable risk
  • /other <comment> for all other reasons

Alternatively, triage in Semgrep AppSec Platform to ignore the finding created by subprocess-shell-true.


22 changes: 20 additions & 2 deletions gigl/src/common/types/pb_wrappers/gigl_resource_config.py
@@ -8,6 +8,7 @@
from gigl.common.logger import Logger
from gigl.src.common.constants.components import GiGLComponents
from snapchat.research.gbml.gigl_resource_config_pb2 import (
    CustomLauncherConfig,
    DataflowResourceConfig,
    DataPreprocessorConfig,
    DistributedTrainerConfig,
@@ -37,12 +38,14 @@
_KFP_TRAINER_CONFIG = "kfp_trainer_config"
_LOCAL_TRAINER_CONFIG = "local_trainer_config"
_VERTEX_AI_GRAPH_STORE_TRAINER_CONFIG = "vertex_ai_graph_store_trainer_config"
_CUSTOM_TRAINER_CONFIG = "custom_trainer_config"

_INFERENCER_CONFIG_FIELD = "inferencer_config"
_VERTEX_AI_INFERENCER_CONFIG = "vertex_ai_inferencer_config"
_DATAFLOW_INFERENCER_CONFIG = "dataflow_inferencer_config"
_LOCAL_INFERENCER_CONFIG = "local_inferencer_config"
_VERTEX_AI_GRAPH_STORE_INFERENCER_CONFIG = "vertex_ai_graph_store_inferencer_config"
_CUSTOM_INFERENCER_CONFIG = "custom_inferencer_config"


@dataclass
@@ -55,6 +58,7 @@ class GiglResourceConfigWrapper:
KFPResourceConfig,
LocalResourceConfig,
VertexAiGraphStoreConfig,
CustomLauncherConfig,
]
] = None
_inference_config: Optional[
@@ -63,6 +67,7 @@
VertexAiResourceConfig,
LocalResourceConfig,
VertexAiGraphStoreConfig,
CustomLauncherConfig,
]
] = None

@@ -283,9 +288,10 @@ def trainer_config(
KFPResourceConfig,
LocalResourceConfig,
VertexAiGraphStoreConfig,
CustomLauncherConfig,
]:
"""
Returns the trainer config specified in the resource config. (e.g. Vertex AI, KFP, Local)
Returns the trainer config specified in the resource config (e.g. Vertex AI, KFP, Local, Custom).
"""

if not self._trainer_config:
@@ -305,6 +311,7 @@
KFPResourceConfig,
LocalResourceConfig,
VertexAiGraphStoreConfig,
CustomLauncherConfig,
]
if (
deprecated_config.WhichOneof(_TRAINER_CONFIG_FIELD) # type: ignore[arg-type]
@@ -365,6 +372,11 @@ def trainer_config(
== _VERTEX_AI_GRAPH_STORE_TRAINER_CONFIG
):
_trainer_config = config.vertex_ai_graph_store_trainer_config
elif (
config.WhichOneof(_TRAINER_CONFIG_FIELD) # type: ignore[arg-type]
== _CUSTOM_TRAINER_CONFIG
):
_trainer_config = config.custom_trainer_config
else:
raise ValueError(f"Invalid trainer_config type: {config}")
else:
@@ -383,9 +395,10 @@ def inferencer_config(
VertexAiResourceConfig,
LocalResourceConfig,
VertexAiGraphStoreConfig,
CustomLauncherConfig,
]:
"""
Returns the inferencer config specified in the resource config. (Dataflow)
Returns the inferencer config specified in the resource config (e.g. Dataflow, Vertex AI, Local, Custom).
"""
if self._inference_config is None:
# TODO: (svij) Marked for deprecation
@@ -421,6 +434,11 @@
self._inference_config = (
config.vertex_ai_graph_store_inferencer_config
)
elif (
config.WhichOneof(_INFERENCER_CONFIG_FIELD) # type: ignore[arg-type]
== _CUSTOM_INFERENCER_CONFIG
):
self._inference_config = config.custom_inferencer_config
else:
raise ValueError("Invalid inferencer_config type")
else:
17 changes: 17 additions & 0 deletions gigl/src/validation_check/config_validator.py
@@ -16,13 +16,15 @@
    assert_trained_model_exists,
)
from gigl.src.validation_check.libs.gbml_and_resource_config_compatibility_checks import (
    check_custom_launcher_config_requires_glt_backend,
    check_inferencer_graph_store_compatibility,
    check_trainer_graph_store_compatibility,
)
from gigl.src.validation_check.libs.name_checks import (
    check_if_kfp_pipeline_job_name_valid,
)
from gigl.src.validation_check.libs.resource_config_checks import (
    check_custom_launcher_config_shape,
    check_if_inferencer_resource_config_valid,
    check_if_preprocessor_resource_config_valid,
    check_if_shared_resource_config_valid,
@@ -202,25 +204,31 @@
    GiGLComponents.ConfigPopulator.value: [
        check_trainer_graph_store_compatibility,
        check_inferencer_graph_store_compatibility,
        check_custom_launcher_config_requires_glt_backend,
    ],
    GiGLComponents.DataPreprocessor.value: [
        check_trainer_graph_store_compatibility,
        check_inferencer_graph_store_compatibility,
        check_custom_launcher_config_requires_glt_backend,
    ],
    GiGLComponents.SubgraphSampler.value: [
        check_trainer_graph_store_compatibility,
        check_inferencer_graph_store_compatibility,
        check_custom_launcher_config_requires_glt_backend,
    ],
    GiGLComponents.SplitGenerator.value: [
        check_trainer_graph_store_compatibility,
        check_inferencer_graph_store_compatibility,
        check_custom_launcher_config_requires_glt_backend,
    ],
    GiGLComponents.Trainer.value: [
        check_trainer_graph_store_compatibility,
        check_inferencer_graph_store_compatibility,
        check_custom_launcher_config_requires_glt_backend,
    ],
    GiGLComponents.Inferencer.value: [
        check_inferencer_graph_store_compatibility,
        check_custom_launcher_config_requires_glt_backend,
    ],
    # PostProcessor doesn't need graph store compatibility checks
}
@@ -347,6 +355,15 @@ def kfp_validation_checks(
        resource_config_wrapper=resource_config_wrapper,
    )

    # Validate that any populated CustomLauncherConfig has a non-empty command.
    # This check runs unconditionally: it is shape-only and does not call out
    # to any external service.
    for component in (GiGLComponents.Trainer, GiGLComponents.Inferencer):
        check_custom_launcher_config_shape(
            resource_config_pb=resource_config_pb,
            component=component,
        )
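The body of check_custom_launcher_config_shape is not part of this diff. As a hedged sketch only, a shape-only check of this kind might look like the following, using a stand-in dataclass in place of the real proto (the actual implementation in resource_config_checks.py may differ):

```python
from dataclasses import dataclass, field


@dataclass
class StubCustomLauncherConfig:
    # Stand-in for the CustomLauncherConfig proto fields exercised here;
    # the real check receives the full resource config proto instead.
    command: str = ""
    args: list[str] = field(default_factory=list)


def check_shape(config: StubCustomLauncherConfig) -> None:
    # Shape-only validation: no network or filesystem access.
    if not config.command.strip():
        raise ValueError("CustomLauncherConfig.command must be a non-empty string")


check_shape(StubCustomLauncherConfig(command="python -m my_org.trainer"))  # passes
try:
    check_shape(StubCustomLauncherConfig())
except ValueError as err:
    print(err)  # CustomLauncherConfig.command must be a non-empty string
```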

    # Check that a trained model file exists when training is skipped.
    if gbml_config_pb.shared_config.should_skip_training:
        assert_trained_model_exists(gbml_config_pb=gbml_config_pb)
gigl/src/validation_check/libs/gbml_and_resource_config_compatibility_checks.py
@@ -135,3 +135,63 @@ def check_inferencer_graph_store_compatibility(
        raise AssertionError(
            f"If one of GbmlConfig.inferencer_config.graph_store_storage_config or GiglResourceConfig.inferencer_resource_config is set, the other must also be set. GbmlConfig.inferencer_config.graph_store_storage_config is set: {gbml_has_graph_store}, GiglResourceConfig.inferencer_resource_config is set: {resource_has_graph_store}."
        )


def check_custom_launcher_config_requires_glt_backend(
    gbml_config_pb_wrapper: GbmlConfigPbWrapper,
    resource_config_wrapper: GiglResourceConfigWrapper,
) -> None:
    """Enforce that ``CustomLauncherConfig`` is only used with the GLT (v2) backend.

    The v1 trainer/inferencer dispatchers never consult the
    ``custom_trainer_config`` / ``custom_inferencer_config`` oneof, so pairing
    a ``CustomLauncherConfig`` with a task config that has
    ``should_use_glt_backend=False`` would silently fall through the v1 path
    and fail at runtime. Catch it up-front here so the failure is loud and
    actionable at validation time.

    Note on naming: the wrapper exposes ``should_use_glt_backend`` (bool) but
    the raw YAML key users set is ``feature_flags.should_run_glt_backend``.
    The wrapper translates one into the other; this check always reads the
    wrapper property and never the raw map.

    Args:
        gbml_config_pb_wrapper: The GbmlConfig wrapper (template config).
        resource_config_wrapper: The GiglResourceConfig wrapper (resource config).

    Raises:
        ValueError: If either the trainer or inferencer resource config is a
            ``CustomLauncherConfig`` and ``should_use_glt_backend`` is False.
    """
    logger.info(
        "Config validation check: CustomLauncherConfig requires GLT (v2) backend."
    )
    trainer_is_custom = isinstance(
        resource_config_wrapper.trainer_config,
        gigl_resource_config_pb2.CustomLauncherConfig,
    )
    inferencer_is_custom = isinstance(
        resource_config_wrapper.inferencer_config,
        gigl_resource_config_pb2.CustomLauncherConfig,
    )
    if not (trainer_is_custom or inferencer_is_custom):
        return

    if not gbml_config_pb_wrapper.should_use_glt_backend:
        offending: list[str] = []
        if trainer_is_custom:
            offending.append("trainer_resource_config.custom_trainer_config")
        if inferencer_is_custom:
            offending.append("inferencer_resource_config.custom_inferencer_config")
        raise ValueError(
            "CustomLauncherConfig is only wired into the GLT (v2) dispatchers "
            "(glt_trainer.py / glt_inferencer.py); the v1 trainer/inferencer "
            "never consult the custom oneof and would fall through to an "
            "'Unsupported resource config' error at runtime. The following "
            f"custom resource configs were set: {offending}, but the task "
            "config has should_use_glt_backend=False (raw YAML key: "
            "feature_flags.should_run_glt_backend). Either set "
            "feature_flags.should_run_glt_backend='True' in the task config, "
            "or replace the CustomLauncherConfig with a built-in resource "
            "config."
        )
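Concretely, a pairing that passes this check might look like the following YAML sketch. The field nesting is inferred from the error message and this diff, so treat it as an illustration rather than an authoritative schema; the command and module names are hypothetical:

```yaml
# Resource config: route the Trainer through a custom launcher.
trainer_resource_config:
  custom_trainer_config:
    command: "MY_ENV=1 python -m my_org.train_entrypoint"  # hypothetical
    args: ["--job-name", "demo"]

# Task config: the raw YAML key named in the docstring above.
feature_flags:
  should_run_glt_backend: "True"
```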