Commits (19)
2b69b8f - Add `CustomResourceConfig` proto for shell-command launchers (May 6, 2026)
ddbafc2 - Merge branch 'main' into kmonte/custom-resource-config-pr1-proto (kmontemayor2-sc, May 7, 2026)
0425547 - Skip machine-config validation for CustomResourceConfig (May 8, 2026)
066c473 - Add `launch_custom` subprocess dispatcher for CustomResourceConfig (May 6, 2026)
5742480 - Validate `CustomResourceConfig` shape + GLT-backend compatibility (May 6, 2026)
917f11b - Dispatch `CustomResourceConfig` through the v2 GLT trainer + inferencer (May 6, 2026)
bbdea89 - Inject GIGL_* env vars on launch_custom subprocess dispatch (May 7, 2026)
67a6584 - Skip enumeration ThreadPoolExecutor on empty input (Apr 30, 2026)
423b0d0 - Skip Dataflow ThreadPoolExecutor on empty preprocessing spec (May 1, 2026)
b606b28 - Merge branch 'main' into kmonte/custom-resource-config-pr1-proto (kmontemayor2-sc, May 11, 2026)
844c434 - Update docstring (kmonte, May 12, 2026)
937276f - Merge branch 'kmonte/custom-resource-config-pr1-proto' of ssh://githu… (kmonte, May 12, 2026)
d0fb63f - Rename CustomResourceConfig -> CustomLauncherConfig (May 12, 2026)
a3b8e82 - Merge branch 'kmonte/custom-resource-config-pr1-proto' into kmonte/cu… (May 12, 2026)
f23fb44 - Apply CustomResourceConfig -> CustomLauncherConfig rename to launcher (May 12, 2026)
4d583d4 - Merge branch 'kmonte/custom-resource-config-pr2-launcher' into kmonte… (May 12, 2026)
009a7af - Apply CustomResourceConfig -> CustomLauncherConfig rename to validators (May 12, 2026)
24ba3c6 - Merge branch 'kmonte/custom-resource-config-pr3-validation' into kmon… (May 12, 2026)
9045cb0 - Apply CustomResourceConfig -> CustomLauncherConfig rename to v2 wiring (May 12, 2026)
133 changes: 133 additions & 0 deletions gigl/src/common/custom_launcher.py
@@ -0,0 +1,133 @@
"""Subprocess dispatch for ``CustomLauncherConfig``-backed launchers.

Takes ``CustomLauncherConfig.command`` and ``CustomLauncherConfig.args``
verbatim and shells out via ``subprocess.run(shell_line, shell=True, check=True)``.
The shell-style invocation honors leading ``KEY=VALUE`` env-var
assignments in ``command`` so callers can self-document required env
without forcing the dispatcher to parse env separately.

Before invoking the subprocess, the dispatcher composes a per-call
``env`` dict from ``os.environ.copy()`` plus the launcher-managed
``GIGL_*`` keys below, and passes it via ``subprocess.run(..., env=...)``.
The parent process's ``os.environ`` is **not** mutated, so concurrent
``launch_custom`` calls in the same process do not race or leak
context across launches.

Env-var contract (set on every call):

- ``GIGL_TASK_CONFIG_URI`` — ``str(task_config_uri)``
- ``GIGL_RESOURCE_CONFIG_URI`` — ``str(resource_config_uri)``
- ``GIGL_COMPONENT`` — ``component.name`` (``"Trainer"`` / ``"Inferencer"``)
- ``GIGL_APPLIED_TASK_IDENTIFIER`` — ``str(applied_task_identifier)``
- ``GIGL_CUDA_DOCKER_IMAGE`` — ``cuda_docker_uri or ""``
- ``GIGL_CPU_DOCKER_IMAGE`` — ``cpu_docker_uri or ""``

``GIGL_*`` is a reserved prefix for launcher-managed context. Custom-
launcher YAML authors must not collide with these names: a leading
``KEY=VALUE python ...`` shell prefix on ``command`` overrides the
inherited env *for that command*, so a stray ``GIGL_FOO=...`` prefix
would silently shadow what the dispatcher set.

The receiving subprocess is otherwise a plain CLI: it parses whatever
flags the YAML wires up via ``args[]`` and reads any ``GIGL_*`` keys it
needs from ``os.environ``.
"""

import os
import shlex
import subprocess
from collections.abc import Mapping
from typing import Optional

from gigl.common import Uri
from gigl.common.logger import Logger
from gigl.src.common.constants.components import GiGLComponents
from snapchat.research.gbml.gigl_resource_config_pb2 import CustomLauncherConfig

logger = Logger()

_LAUNCHABLE_COMPONENTS: frozenset[GiGLComponents] = frozenset(
{GiGLComponents.Trainer, GiGLComponents.Inferencer}
)


def launch_custom(
custom_launcher_config: CustomLauncherConfig,
applied_task_identifier: str,
task_config_uri: Uri,
resource_config_uri: Uri,
process_command: str,
process_runtime_args: Mapping[str, str],
cpu_docker_uri: Optional[str],
cuda_docker_uri: Optional[str],
component: GiGLComponents,
) -> None:
"""Shell out to ``custom_launcher_config.command`` with ``args[]`` appended.

Composes a shell line as ``command`` followed by each ``args[]``
element passed through ``shlex.quote``, then invokes
``subprocess.run(shell_line, shell=True, check=True, env=...)``.

The dispatcher takes ``command`` and ``args[]`` verbatim — no
template substitution of any kind. Any placeholder text in those
fields reaches ``subprocess.run`` literally; consumers that want
runtime context should read it from the ``GIGL_*`` env vars the
dispatcher injects on the subprocess (see module docstring).

The subprocess's env is built as ``os.environ.copy()`` plus the
six launcher-managed ``GIGL_*`` keys; the parent process's
``os.environ`` is not mutated.

Args:
custom_launcher_config: Proto whose ``command`` is the shell
snippet to execute and whose ``args`` are positional
arguments appended verbatim.
applied_task_identifier: Stable job identifier; exposed to the
subprocess as ``GIGL_APPLIED_TASK_IDENTIFIER``.
task_config_uri: Frozen GbmlConfig URI; exposed as
``GIGL_TASK_CONFIG_URI``.
resource_config_uri: GiglResourceConfig URI; exposed as
``GIGL_RESOURCE_CONFIG_URI``.
process_command: Accepted for API symmetry with the GLT-side
Vertex AI launchers; not plumbed to the subprocess.
process_runtime_args: Accepted for API symmetry; not plumbed.
cpu_docker_uri: CPU image URI; exposed as
``GIGL_CPU_DOCKER_IMAGE`` (empty string when ``None``).
cuda_docker_uri: CUDA image URI; exposed as
``GIGL_CUDA_DOCKER_IMAGE`` (empty string when ``None``).
component: Which GiGL component is being launched. Must be in
``_LAUNCHABLE_COMPONENTS``. Exposed as ``GIGL_COMPONENT``.

Raises:
ValueError: If ``component`` is not Trainer or Inferencer, or if
``custom_launcher_config.command`` is empty.
subprocess.CalledProcessError: If the spawned subprocess exits
non-zero.
"""
if component not in _LAUNCHABLE_COMPONENTS:
raise ValueError(f"Invalid component: {component}")
if not custom_launcher_config.command:
raise ValueError("CustomLauncherConfig.command must be set")

command: str = custom_launcher_config.command
args: list[str] = list(custom_launcher_config.args)

shell_line = " ".join([command, *(shlex.quote(a) for a in args)])

# Per-call env dict: copy the parent env so the subprocess inherits
# PATH / GCP creds / etc., then overlay the launcher-managed
# GIGL_* keys. Parent ``os.environ`` is not mutated.
env = os.environ.copy()
env.update(
{
"GIGL_TASK_CONFIG_URI": str(task_config_uri),
"GIGL_RESOURCE_CONFIG_URI": str(resource_config_uri),
"GIGL_COMPONENT": component.name,
"GIGL_APPLIED_TASK_IDENTIFIER": str(applied_task_identifier),
"GIGL_CUDA_DOCKER_IMAGE": cuda_docker_uri or "",
"GIGL_CPU_DOCKER_IMAGE": cpu_docker_uri or "",
}
)

logger.info(f"Launching {component.name} via subprocess: {shell_line!r}")
subprocess.run(shell_line, shell=True, check=True, env=env)
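
For orientation, a minimal usage sketch of the new dispatcher. Everything org-specific below is hypothetical (the module path, the MY_FLAG prefix, and the --epochs flag); only the command/args fields and the composed shell line come from the code above.

# Hypothetical sketch, not part of this PR's diff.
from snapchat.research.gbml.gigl_resource_config_pb2 import CustomLauncherConfig

cfg = CustomLauncherConfig(
    command="MY_FLAG=1 python -m my_org.train_entrypoint",  # hypothetical module
    args=["--epochs", "10"],
)
# launch_custom(custom_launcher_config=cfg, ...) composes and runs:
#   MY_FLAG=1 python -m my_org.train_entrypoint --epochs 10
# with check=True and the six GIGL_* keys overlaid on os.environ.copy().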
22 changes: 20 additions & 2 deletions gigl/src/common/types/pb_wrappers/gigl_resource_config.py
@@ -8,6 +8,7 @@
from gigl.common.logger import Logger
from gigl.src.common.constants.components import GiGLComponents
from snapchat.research.gbml.gigl_resource_config_pb2 import (
CustomLauncherConfig,
DataflowResourceConfig,
DataPreprocessorConfig,
DistributedTrainerConfig,
@@ -37,12 +38,14 @@
_KFP_TRAINER_CONFIG = "kfp_trainer_config"
_LOCAL_TRAINER_CONFIG = "local_trainer_config"
_VERTEX_AI_GRAPH_STORE_TRAINER_CONFIG = "vertex_ai_graph_store_trainer_config"
_CUSTOM_TRAINER_CONFIG = "custom_trainer_config"

_INFERENCER_CONFIG_FIELD = "inferencer_config"
_VERTEX_AI_INFERENCER_CONFIG = "vertex_ai_inferencer_config"
_DATAFLOW_INFERENCER_CONFIG = "dataflow_inferencer_config"
_LOCAL_INFERENCER_CONFIG = "local_inferencer_config"
_VERTEX_AI_GRAPH_STORE_INFERENCER_CONFIG = "vertex_ai_graph_store_inferencer_config"
_CUSTOM_INFERENCER_CONFIG = "custom_inferencer_config"


@dataclass
@@ -55,6 +58,7 @@ class GiglResourceConfigWrapper:
KFPResourceConfig,
LocalResourceConfig,
VertexAiGraphStoreConfig,
CustomLauncherConfig,
]
] = None
_inference_config: Optional[
@@ … @@
VertexAiResourceConfig,
LocalResourceConfig,
VertexAiGraphStoreConfig,
CustomLauncherConfig,
]
] = None

@@ -283,9 +288,10 @@ def trainer_config(
KFPResourceConfig,
LocalResourceConfig,
VertexAiGraphStoreConfig,
CustomLauncherConfig,
]:
"""
Returns the trainer config specified in the resource config. (e.g. Vertex AI, KFP, Local)
Returns the trainer config specified in the resource config. (e.g. Vertex AI, KFP, Local, Custom)
"""

if not self._trainer_config:
@@ … @@
KFPResourceConfig,
LocalResourceConfig,
VertexAiGraphStoreConfig,
CustomLauncherConfig,
]
if (
deprecated_config.WhichOneof(_TRAINER_CONFIG_FIELD) # type: ignore[arg-type]
@@ -365,6 +372,11 @@ def trainer_config(
== _VERTEX_AI_GRAPH_STORE_TRAINER_CONFIG
):
_trainer_config = config.vertex_ai_graph_store_trainer_config
elif (
config.WhichOneof(_TRAINER_CONFIG_FIELD) # type: ignore[arg-type]
== _CUSTOM_TRAINER_CONFIG
):
_trainer_config = config.custom_trainer_config
else:
raise ValueError(f"Invalid trainer_config type: {config}")
else:
@@ … @@
VertexAiResourceConfig,
LocalResourceConfig,
VertexAiGraphStoreConfig,
CustomLauncherConfig,
]:
"""
Returns the inferencer config specified in the resource config. (Dataflow)
Returns the inferencer config specified in the resource config. (e.g. Dataflow, Vertex AI, Local, Custom)
"""
if self._inference_config is None:
# TODO: (svij) Marked for deprecation
@@ -421,6 +434,11 @@
self._inference_config = (
config.vertex_ai_graph_store_inferencer_config
)
elif (
config.WhichOneof(_INFERENCER_CONFIG_FIELD) # type: ignore[arg-type]
== _CUSTOM_INFERENCER_CONFIG
):
self._inference_config = config.custom_inferencer_config
else:
raise ValueError("Invalid inferencer_config type")
else:
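
For readers unfamiliar with the ``WhichOneof`` dispatch above, a short runnable sketch. It assumes ``DistributedTrainerConfig`` is the message that declares the ``trainer_config`` oneof (the imports in this file suggest so, but the proto itself is not shown in this diff):

# Sketch of proto oneof semantics, assuming DistributedTrainerConfig
# declares the "trainer_config" oneof that this wrapper dispatches on.
from snapchat.research.gbml.gigl_resource_config_pb2 import DistributedTrainerConfig

config = DistributedTrainerConfig()
# Assigning into a oneof member selects it...
config.custom_trainer_config.command = "python -m my_org.train"  # hypothetical
assert config.WhichOneof("trainer_config") == "custom_trainer_config"
# ...and WhichOneof returns None when no member is set, which is why
# each branch above compares the result against a _*_CONFIG constant.
assert DistributedTrainerConfig().WhichOneof("trainer_config") is None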
6 changes: 6 additions & 0 deletions gigl/src/data_preprocessor/data_preprocessor.py
@@ -352,6 +352,12 @@ def __build_data_reference_str(references: Iterable[DataReference]) -> str:
edge_ref_to_preprocessing_spec
)

if num_dataflow_jobs == 0:
logger.info("No data references to preprocess; skipping Dataflow.")
return PreprocessedMetadataReferences(
node_data=node_refs_and_results, edge_data=edge_refs_and_results
)

with concurrent.futures.ThreadPoolExecutor(
max_workers=num_dataflow_jobs
) as executor:
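
The early return above (and the matching guards in the enumeration utilities below) is needed because ``ThreadPoolExecutor`` rejects a zero worker count. A standalone illustration:

# Standalone illustration of why dispatching with zero jobs must be
# short-circuited before the executor is constructed.
import concurrent.futures

try:
    concurrent.futures.ThreadPoolExecutor(max_workers=0)
except ValueError as err:
    print(err)  # max_workers must be greater than 0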
8 changes: 8 additions & 0 deletions gigl/src/data_preprocessor/lib/enumerate/utils.py
@@ -247,6 +247,10 @@ def __enumerate_all_node_references(
) -> list[EnumeratorNodeTypeMetadata]:
results: list[EnumeratorNodeTypeMetadata] = []

if not node_data_references:
logger.info("No node references to enumerate; skipping.")
return results

logger.info(
f"Launch {len(node_data_references)} node enumeration jobs in parallel."
)
@@ -274,6 +278,10 @@ def __enumerate_all_edge_references(
) -> list[EnumeratorEdgeTypeMetadata]:
results: list[EnumeratorEdgeTypeMetadata] = []

if not edge_data_references:
logger.info("No edge references to enumerate; skipping.")
return results

logger.info(
f"Launch {len(edge_data_references)} edge enumeration jobs in parallel."
)
30 changes: 26 additions & 4 deletions gigl/src/inference/v2/glt_inferencer.py
@@ -5,6 +5,7 @@
from gigl.common.logger import Logger
from gigl.env.pipelines_config import get_resource_config
from gigl.src.common.constants.components import GiGLComponents
from gigl.src.common.custom_launcher import launch_custom
from gigl.src.common.types import AppliedTaskIdentifier
from gigl.src.common.types.pb_wrappers.gbml_config import GbmlConfigPbWrapper
from gigl.src.common.types.pb_wrappers.gigl_resource_config import (
@@ -16,6 +17,7 @@
launch_single_pool_job,
)
from snapchat.research.gbml.gigl_resource_config_pb2 import (
CustomLauncherConfig,
LocalResourceConfig,
VertexAiGraphStoreConfig,
VertexAiResourceConfig,
@@ -90,6 +92,20 @@ def __execute_VAI_inference(
cuda_docker_uri=cuda_docker_uri,
component=GiGLComponents.Inferencer,
)
elif isinstance(
resource_config_wrapper.inferencer_config, CustomLauncherConfig
):
launch_custom(
custom_launcher_config=resource_config_wrapper.inferencer_config,
applied_task_identifier=applied_task_identifier,
task_config_uri=task_config_uri,
resource_config_uri=resource_config_uri,
process_command=inference_process_command,
process_runtime_args=inference_process_runtime_args,
cpu_docker_uri=cpu_docker_uri,
cuda_docker_uri=cuda_docker_uri,
component=GiGLComponents.Inferencer,
)
else:
raise NotImplementedError(
f"Unsupported resource config for glt inference: {type(resource_config_wrapper.inferencer_config).__name__}"
@@ -112,10 +128,16 @@ def run(
raise NotImplementedError(
f"Local GLT Inferencer is not yet supported, please specify a {VertexAiResourceConfig.__name__} or {VertexAiGraphStoreConfig.__name__} resource config field."
)
elif isinstance(
resource_config_wrapper.inferencer_config, VertexAiResourceConfig
) or isinstance(
resource_config_wrapper.inferencer_config, VertexAiGraphStoreConfig
elif (
isinstance(
resource_config_wrapper.inferencer_config, VertexAiResourceConfig
)
or isinstance(
resource_config_wrapper.inferencer_config, VertexAiGraphStoreConfig
)
or isinstance(
resource_config_wrapper.inferencer_config, CustomLauncherConfig
)
):
self.__execute_VAI_inference(
applied_task_identifier=applied_task_identifier,
20 changes: 18 additions & 2 deletions gigl/src/training/v2/glt_trainer.py
@@ -5,6 +5,7 @@
from gigl.common.logger import Logger
from gigl.env.pipelines_config import get_resource_config
from gigl.src.common.constants.components import GiGLComponents
from gigl.src.common.custom_launcher import launch_custom
from gigl.src.common.types import AppliedTaskIdentifier
from gigl.src.common.types.pb_wrappers.gbml_config import GbmlConfigPbWrapper
from gigl.src.common.types.pb_wrappers.gigl_resource_config import (
@@ -16,6 +17,7 @@
launch_single_pool_job,
)
from snapchat.research.gbml.gigl_resource_config_pb2 import (
CustomLauncherConfig,
LocalResourceConfig,
VertexAiGraphStoreConfig,
VertexAiResourceConfig,
@@ -86,6 +88,18 @@ def __execute_VAI_training(
cuda_docker_uri=cuda_docker_uri,
component=GiGLComponents.Trainer,
)
elif isinstance(resource_config.trainer_config, CustomLauncherConfig):
launch_custom(
custom_launcher_config=resource_config.trainer_config,
applied_task_identifier=applied_task_identifier,
task_config_uri=task_config_uri,
resource_config_uri=resource_config_uri,
process_command=training_process_command,
process_runtime_args=training_process_runtime_args,
cpu_docker_uri=cpu_docker_uri,
cuda_docker_uri=cuda_docker_uri,
component=GiGLComponents.Trainer,
)
else:
raise NotImplementedError(
f"Unsupported resource config for glt training: {type(resource_config.trainer_config).__name__}"
@@ -110,8 +124,10 @@ def run(
raise NotImplementedError(
f"Local GLT Trainer is not yet supported, please specify a {VertexAiResourceConfig.__name__} or {VertexAiGraphStoreConfig.__name__} resource config field."
)
elif isinstance(trainer_config, VertexAiResourceConfig) or isinstance(
trainer_config, VertexAiGraphStoreConfig
elif (
isinstance(trainer_config, VertexAiResourceConfig)
or isinstance(trainer_config, VertexAiGraphStoreConfig)
or isinstance(trainer_config, CustomLauncherConfig)
):
self.__execute_VAI_training(
applied_task_identifier=applied_task_identifier,
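
Finally, the consumer side of the contract: a minimal sketch of a custom entrypoint that a ``custom_trainer_config`` command could point at. The module path and the ``--epochs`` flag are hypothetical; the ``GIGL_*`` keys match the env-var contract documented in ``custom_launcher.py``.

# my_org/train_entrypoint.py: hypothetical consumer of the GIGL_* contract.
import argparse
import os


def main() -> None:
    parser = argparse.ArgumentParser()
    # Wired up via CustomLauncherConfig.args in the resource config.
    parser.add_argument("--epochs", type=int, default=1)
    flags = parser.parse_args()

    # Launcher-injected context (see gigl/src/common/custom_launcher.py).
    task_config_uri = os.environ["GIGL_TASK_CONFIG_URI"]
    component = os.environ["GIGL_COMPONENT"]  # "Trainer" or "Inferencer"
    job_id = os.environ["GIGL_APPLIED_TASK_IDENTIFIER"]
    cuda_image = os.environ.get("GIGL_CUDA_DOCKER_IMAGE", "")  # "" when unset

    print(
        f"{component} {job_id}: {flags.epochs} epochs, "
        f"task config {task_config_uri}, cuda image {cuda_image!r}"
    )


if __name__ == "__main__":
    main()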