From ba1698ee964333210e1c452f05da598522899b70 Mon Sep 17 00:00:00 2001 From: xyuzh Date: Tue, 10 Feb 2026 19:02:19 -0800 Subject: [PATCH 1/5] Add SGLang Ray inference example Add offline and online inference drivers with Dockerfile and Anyscale job configs for running SGLang on Ray. --- sglang_ray_inference/Dockerfile | 61 +++++++++++++ sglang_ray_inference/driver_offline.py | 102 ++++++++++++++++++++++ sglang_ray_inference/driver_online.py | 116 +++++++++++++++++++++++++ sglang_ray_inference/job_offline.yaml | 37 ++++++++ sglang_ray_inference/job_online.yaml | 41 +++++++++ 5 files changed, 357 insertions(+) create mode 100644 sglang_ray_inference/Dockerfile create mode 100644 sglang_ray_inference/driver_offline.py create mode 100644 sglang_ray_inference/driver_online.py create mode 100644 sglang_ray_inference/job_offline.yaml create mode 100644 sglang_ray_inference/job_online.yaml diff --git a/sglang_ray_inference/Dockerfile b/sglang_ray_inference/Dockerfile new file mode 100644 index 0000000..c787649 --- /dev/null +++ b/sglang_ray_inference/Dockerfile @@ -0,0 +1,61 @@ +FROM anyscale/ray:2.53.0-py312-cu129 + +# ============================================================================= +# System Dependencies +# ============================================================================= +RUN sudo apt-get update && \ + sudo apt-get install -y --no-install-recommends \ + build-essential \ + cmake \ + ninja-build \ + libnuma1 \ + libnuma-dev \ + numactl \ + git \ + curl \ + wget \ + netcat \ + && sudo rm -rf /var/lib/apt/lists/* + +# ============================================================================= +# CUDA Toolkit (nvcc compiler) - CUDA 12.9 to match base image +# ============================================================================= +RUN curl -fsSL https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64/cuda-keyring_1.1-1_all.deb -o /tmp/cuda-keyring.deb && \ + sudo dpkg -i /tmp/cuda-keyring.deb && \ + rm /tmp/cuda-keyring.deb && \ + sudo apt-get update && \ + sudo apt-get install -y --no-install-recommends \ + cuda-nvcc-12-9 \ + cuda-cudart-dev-12-9 \ + cuda-crt-12-9 \ + && sudo rm -rf /var/lib/apt/lists/* + +# Create/update CUDA symlink +RUN sudo rm -rf /usr/local/cuda && \ + sudo ln -s /usr/local/cuda-12.9 /usr/local/cuda + +# ============================================================================= +# Environment Variables +# ============================================================================= +ENV PATH="/usr/local/cuda/bin:${PATH}" +ENV CUDA_HOME="/usr/local/cuda" +ENV LD_LIBRARY_PATH="/usr/local/cuda/lib64:${LD_LIBRARY_PATH}" +ENV RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES="1" + +# ============================================================================= +# Python Dependencies +# ============================================================================= +RUN curl -LsSf https://astral.sh/uv/install.sh | sh +ENV PATH="/home/ray/.local/bin:${PATH}" + +RUN uv pip install --system \ + "sglang[all] @ git+https://github.com/xyuzh/sglang.git@feature/ray-actor-scheduler#subdirectory=python" \ + numpy \ + transformers \ + accelerate \ + huggingface_hub \ + requests \ + httpx + + +WORKDIR /home/ray/default diff --git a/sglang_ray_inference/driver_offline.py b/sglang_ray_inference/driver_offline.py new file mode 100644 index 0000000..38d4cc0 --- /dev/null +++ b/sglang_ray_inference/driver_offline.py @@ -0,0 +1,102 @@ +""" +Offline (batch) inference with SGLang on Ray. + +Wraps sglang.Engine in a Ray actor for multi-node batch generation. +The driver (head node) needs no GPU — sglang is imported only inside the actor. + +Usage: + python driver_offline.py --model-path Qwen/Qwen3-1.7B --tp-size 4 --nnodes 1 +""" + +import argparse +import time + +import ray +from ray.util.placement_group import placement_group +from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy + + +@ray.remote +class EngineActor: + """Thin wrapper that creates an sglang.Engine inside a Ray actor.""" + + def __init__(self, **kwargs): + from sglang import Engine + + self.engine = Engine(**kwargs) + + def ready(self): + return True + + def generate(self, prompts, sampling_params): + return [ + self.engine.generate(prompt=p, sampling_params=sampling_params) + for p in prompts + ] + + def shutdown(self): + self.engine.shutdown() + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--model-path", type=str, default="Qwen/Qwen3-1.7B") + parser.add_argument("--tp-size", type=int, default=4) + parser.add_argument("--pp-size", type=int, default=1) + parser.add_argument("--nnodes", type=int, default=1) + parser.add_argument("--port", type=int, default=30000) + args = parser.parse_args() + + gpus_per_node = (args.tp_size * args.pp_size) // args.nnodes + + # Reserve GPUs across nodes + pg = placement_group( + bundles=[{"CPU": 1, "GPU": gpus_per_node}] * args.nnodes, + strategy="STRICT_PACK" if args.nnodes == 1 else "STRICT_SPREAD", + ) + ray.get(pg.ready()) + + # Start engine actor on the first bundle + engine = EngineActor.options( + num_cpus=1, + num_gpus=0, + scheduling_strategy=PlacementGroupSchedulingStrategy( + placement_group=pg, placement_group_bundle_index=0, + ), + ).remote( + model_path=args.model_path, + tp_size=args.tp_size, + pp_size=args.pp_size, + nnodes=args.nnodes, + port=args.port, + use_ray=True, + ) + + ray.get(engine.ready.remote()) + print("Engine ready.") + + # Batch generate + prompts = [ + "The capital of France is", + "Explain quantum computing in simple terms:", + "Write a haiku about programming:", + "What is 2 + 2?", + ] + + t0 = time.time() + results = ray.get( + engine.generate.remote(prompts, {"max_new_tokens": 64, "temperature": 0.0}) + ) + print(f"Generated {len(results)} responses in {time.time() - t0:.2f}s\n") + + for prompt, result in zip(prompts, results): + print(f"Prompt: {prompt}") + print(f"Response: {result['text'][:200]}\n") + + # Cleanup + ray.get(engine.shutdown.remote()) + ray.util.remove_placement_group(pg) + + +if __name__ == "__main__": + main() diff --git a/sglang_ray_inference/driver_online.py b/sglang_ray_inference/driver_online.py new file mode 100644 index 0000000..6cf27ac --- /dev/null +++ b/sglang_ray_inference/driver_online.py @@ -0,0 +1,116 @@ +""" +Online (HTTP server) inference with SGLang on Ray. + +Launches the SGLang HTTP server inside a Ray task for multi-node serving. +The driver (head node) needs no GPU — sglang is imported only inside the task. + +Usage: + python driver_online.py --model-path Qwen/Qwen3-1.7B --tp-size 4 --nnodes 1 +""" + +import argparse +import time + +import ray +import requests +from ray.util.placement_group import placement_group +from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy + + +@ray.remote +def get_node_ip(): + """Return the IP of the node this task lands on.""" + return ray.util.get_node_ip_address() + + +@ray.remote +def launch_server(**kwargs): + """Start the SGLang HTTP server (blocks until the server exits).""" + from sglang.srt.entrypoints.http_server import launch_server + from sglang.srt.server_args import ServerArgs + + launch_server(ServerArgs(**kwargs)) + + +def wait_for_healthy(url, server_ref, timeout=600, poll=5): + """Poll the health endpoint until the server is ready or timeout.""" + t0 = time.time() + while time.time() - t0 < timeout: + # If the task already finished, it crashed + done, _ = ray.wait([server_ref], timeout=0) + if done: + ray.get(server_ref) # raises on error + raise RuntimeError("Server exited before becoming healthy.") + try: + if requests.get(f"{url}/health", timeout=5).status_code == 200: + return + except requests.RequestException: + pass + time.sleep(poll) + raise TimeoutError(f"Server not healthy after {timeout}s.") + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--model-path", type=str, default="Qwen/Qwen3-1.7B") + parser.add_argument("--tp-size", type=int, default=4) + parser.add_argument("--pp-size", type=int, default=1) + parser.add_argument("--nnodes", type=int, default=1) + parser.add_argument("--port", type=int, default=30000) + args = parser.parse_args() + + gpus_per_node = (args.tp_size * args.pp_size) // args.nnodes + + # Reserve GPUs across nodes + pg = placement_group( + bundles=[{"CPU": 1, "GPU": gpus_per_node}] * args.nnodes, + strategy="STRICT_PACK" if args.nnodes == 1 else "STRICT_SPREAD", + ) + ray.get(pg.ready()) + + pg_strategy = PlacementGroupSchedulingStrategy( + placement_group=pg, placement_group_bundle_index=0, + ) + + # Resolve the IP of the node where the server will run + node_ip = ray.get( + get_node_ip.options(num_cpus=0, scheduling_strategy=pg_strategy).remote() + ) + url = f"http://{node_ip}:{args.port}" + print(f"Server URL: {url}") + + # Launch the HTTP server (runs until cancelled) + server_ref = launch_server.options( + num_cpus=1, num_gpus=0, scheduling_strategy=pg_strategy, + ).remote( + model_path=args.model_path, + tp_size=args.tp_size, + pp_size=args.pp_size, + nnodes=args.nnodes, + port=args.port, + host="0.0.0.0", + use_ray=True, + ) + + wait_for_healthy(url, server_ref) + print("Server healthy.") + + # Test request + resp = requests.post( + f"{url}/generate", + json={ + "text": "The capital of France is", + "sampling_params": {"max_new_tokens": 32, "temperature": 0.0}, + }, + timeout=60, + ) + resp.raise_for_status() + print(f"Test response: {resp.json()}") + + # Cleanup + ray.cancel(server_ref, force=True) + ray.util.remove_placement_group(pg) + + +if __name__ == "__main__": + main() diff --git a/sglang_ray_inference/job_offline.yaml b/sglang_ray_inference/job_offline.yaml new file mode 100644 index 0000000..fc7b305 --- /dev/null +++ b/sglang_ray_inference/job_offline.yaml @@ -0,0 +1,37 @@ +# Anyscale Job: SGLang Offline (Batch) Inference with Ray Actor Backend +# +# Configuration: TP=8 across 2 nodes (4 GPUs per node) +# Submit: anyscale job submit -f job_offline.yaml + +name: sglang-offline-inference + +cloud: + +compute_config: + head_node: + instance_type: m5.2xlarge # CPU-only head + worker_nodes: + - instance_type: g5.12xlarge # 4x A10G + min_nodes: 2 + max_nodes: 2 + +containerfile: ./Dockerfile + +working_dir: . + +entrypoint: | + set -e + + # Run offline inference + echo "Running offline inference..." + echo "==========================================" + python driver_offline.py \ + --model-path Qwen/Qwen3-1.7B --tp-size 8 --nnodes 2 + +env_vars: + NCCL_IB_DISABLE: "0" + NCCL_NET_GDR_LEVEL: "2" + RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES: "1" + SGLANG_SKIP_SGL_KERNEL_VERSION_CHECK: "1" + +max_retries: 0 diff --git a/sglang_ray_inference/job_online.yaml b/sglang_ray_inference/job_online.yaml new file mode 100644 index 0000000..ac8bcab --- /dev/null +++ b/sglang_ray_inference/job_online.yaml @@ -0,0 +1,41 @@ +# Anyscale Job: SGLang Online (HTTP Server) Inference with Ray Actor Backend +# +# Configuration: TP=8 across 2 nodes (4 GPUs per node) +# Submit: anyscale job submit -f examples/anyscale/job_online.yaml + +name: sglang-online-inference + +cloud: + +compute_config: + head_node: + instance_type: m5.2xlarge # CPU-only head + worker_nodes: + - instance_type: g5.12xlarge # 4x A10G + min_nodes: 2 + max_nodes: 2 + +containerfile: ./Dockerfile + +working_dir: . + +entrypoint: | + set -e + + echo "==========================================" + echo "Running online inference..." + echo "==========================================" + + # Start online HTTP server + echo "Starting online server..." + echo "==========================================" + python driver_online.py \ + --model-path Qwen/Qwen3-1.7B --tp-size 8 --nnodes 2 + +env_vars: + NCCL_IB_DISABLE: "0" + NCCL_NET_GDR_LEVEL: "2" + RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES: "1" + SGLANG_SKIP_SGL_KERNEL_VERSION_CHECK: "1" + +max_retries: 0 From 4d0dad545298a95a22c632d78854d411f4aa0d88 Mon Sep 17 00:00:00 2001 From: xyuzh Date: Tue, 10 Feb 2026 19:03:43 -0800 Subject: [PATCH 2/5] Add README for SGLang Ray inference example --- sglang_ray_inference/README.md | 65 ++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 sglang_ray_inference/README.md diff --git a/sglang_ray_inference/README.md b/sglang_ray_inference/README.md new file mode 100644 index 0000000..f27c593 --- /dev/null +++ b/sglang_ray_inference/README.md @@ -0,0 +1,65 @@ +# SGLang Multi-Node Inference on Ray (Anyscale) + +This example shows how to run [SGLang](https://github.com/sgl-project/sglang) inference across multiple nodes using Ray as the distributed backend, deployed as Anyscale Jobs. + +Two modes are provided: + +| Mode | Driver | Description | +|------|--------|-------------| +| **Offline (batch)** | `driver_offline.py` | Wraps `sglang.Engine` in a Ray actor for batch generation | +| **Online (HTTP server)** | `driver_online.py` | Launches the SGLang HTTP server inside a Ray task for real-time serving | + +In both modes the driver runs on a CPU-only head node while SGLang workers are distributed across GPU worker nodes via Ray placement groups. + +## Files + +``` +sglang_ray_inference/ +├── Dockerfile # Container image (Ray 2.53 + CUDA 12.9 + SGLang) +├── driver_offline.py # Offline batch inference driver +├── driver_online.py # Online HTTP server driver +├── job_offline.yaml # Anyscale job config for offline mode +└── job_online.yaml # Anyscale job config for online mode +``` + +## Quick Start + +### Prerequisites + +- An [Anyscale](https://www.anyscale.com/) account with GPU quota +- The `anyscale` CLI installed and configured + +### Run Offline Inference + +```bash +cd sglang_ray_inference +anyscale job submit -f job_offline.yaml +``` + +### Run Online Inference + +```bash +cd sglang_ray_inference +anyscale job submit -f job_online.yaml +``` + +## Configuration + +Both job configs default to **TP=8 across 2 nodes** (4x A10G per node via `g5.12xlarge`). + +To change the model, tensor-parallelism degree, or node count, edit the `entrypoint` in the YAML file: + +```bash +python driver_offline.py \ + --model-path \ + --tp-size \ + --nnodes +``` + +| Argument | Default | Description | +|----------|---------|-------------| +| `--model-path` | `Qwen/Qwen3-1.7B` | HuggingFace model ID or path | +| `--tp-size` | `4` | Tensor parallelism degree (total GPUs) | +| `--pp-size` | `1` | Pipeline parallelism degree | +| `--nnodes` | `1` | Number of nodes to spread across | +| `--port` | `30000` | Port for the SGLang server (online mode) | From 8cc286ad9a056df4063650af530d8720a424a689 Mon Sep 17 00:00:00 2001 From: xyuzh Date: Sat, 21 Feb 2026 19:32:28 -0800 Subject: [PATCH 3/5] Update sglang_ray_inference to use released packages and improve robustness - Dockerfile: use sglang[all]==0.5.8 + sgl-kernel==0.3.21 instead of git fork - Drivers: add logging, named placement groups, exit codes, better error handling - Job configs: add NCCL_DEBUG, fix submit path comment - README: add How It Works, Troubleshooting, local run examples Co-Authored-By: Claude Opus 4.6 --- sglang_ray_inference/Dockerfile | 9 +- sglang_ray_inference/README.md | 73 ++++++++------ sglang_ray_inference/driver_offline.py | 94 +++++++++-------- sglang_ray_inference/driver_online.py | 134 +++++++++++++++---------- sglang_ray_inference/job_offline.yaml | 6 +- sglang_ray_inference/job_online.yaml | 8 +- 6 files changed, 188 insertions(+), 136 deletions(-) diff --git a/sglang_ray_inference/Dockerfile b/sglang_ray_inference/Dockerfile index c787649..a10cfcd 100644 --- a/sglang_ray_inference/Dockerfile +++ b/sglang_ray_inference/Dockerfile @@ -49,7 +49,7 @@ RUN curl -LsSf https://astral.sh/uv/install.sh | sh ENV PATH="/home/ray/.local/bin:${PATH}" RUN uv pip install --system \ - "sglang[all] @ git+https://github.com/xyuzh/sglang.git@feature/ray-actor-scheduler#subdirectory=python" \ + "sglang[all]==0.5.8" \ numpy \ transformers \ accelerate \ @@ -57,5 +57,12 @@ RUN uv pip install --system \ requests \ httpx +RUN uv pip install --system "sgl-kernel==0.3.21" + +# ============================================================================= +# Verification +# ============================================================================= +RUN nvcc --version && \ + python -c "import sglang; print(f'SGLang version: {sglang.__version__}')" WORKDIR /home/ray/default diff --git a/sglang_ray_inference/README.md b/sglang_ray_inference/README.md index f27c593..85ceef1 100644 --- a/sglang_ray_inference/README.md +++ b/sglang_ray_inference/README.md @@ -1,26 +1,22 @@ -# SGLang Multi-Node Inference on Ray (Anyscale) +# Multi-Node SGLang on Anyscale with Ray Actor Backend -This example shows how to run [SGLang](https://github.com/sgl-project/sglang) inference across multiple nodes using Ray as the distributed backend, deployed as Anyscale Jobs. +Run SGLang across multiple GPU nodes on Anyscale using Ray placement groups. +The head node needs no GPU — sglang is imported only inside Ray actors. -Two modes are provided: +Two driver scripts: -| Mode | Driver | Description | -|------|--------|-------------| -| **Offline (batch)** | `driver_offline.py` | Wraps `sglang.Engine` in a Ray actor for batch generation | -| **Online (HTTP server)** | `driver_online.py` | Launches the SGLang HTTP server inside a Ray task for real-time serving | - -In both modes the driver runs on a CPU-only head node while SGLang workers are distributed across GPU worker nodes via Ray placement groups. +- **`driver_offline.py`** — batch inference via `sglang.Engine` +- **`driver_online.py`** — HTTP server via `sglang.srt.entrypoints.http_server` ## Files -``` -sglang_ray_inference/ -├── Dockerfile # Container image (Ray 2.53 + CUDA 12.9 + SGLang) -├── driver_offline.py # Offline batch inference driver -├── driver_online.py # Online HTTP server driver -├── job_offline.yaml # Anyscale job config for offline mode -└── job_online.yaml # Anyscale job config for online mode -``` +| File | Purpose | +|------|---------| +| `driver_offline.py` | Batch inference driver | +| `driver_online.py` | HTTP server driver | +| `job_offline.yaml` | Anyscale job config for batch inference | +| `job_online.yaml` | Anyscale job config for HTTP server | +| `Dockerfile` | Base image (`anyscale/ray` + sglang) | ## Quick Start @@ -43,23 +39,38 @@ cd sglang_ray_inference anyscale job submit -f job_online.yaml ``` -## Configuration - -Both job configs default to **TP=8 across 2 nodes** (4x A10G per node via `g5.12xlarge`). - -To change the model, tensor-parallelism degree, or node count, edit the `entrypoint` in the YAML file: +### Run Locally (single node, requires GPU) ```bash -python driver_offline.py \ - --model-path \ - --tp-size \ - --nnodes +python driver_offline.py --model-path Qwen/Qwen3-1.7B --tp-size 1 --nnodes 1 +python driver_online.py --model-path Qwen/Qwen3-1.7B --tp-size 1 --nnodes 1 ``` +## CLI Arguments + +Both scripts accept the same arguments: + | Argument | Default | Description | |----------|---------|-------------| -| `--model-path` | `Qwen/Qwen3-1.7B` | HuggingFace model ID or path | -| `--tp-size` | `4` | Tensor parallelism degree (total GPUs) | -| `--pp-size` | `1` | Pipeline parallelism degree | -| `--nnodes` | `1` | Number of nodes to spread across | -| `--port` | `30000` | Port for the SGLang server (online mode) | +| `--model-path` | `Qwen/Qwen3-1.7B` | HuggingFace model ID or local path | +| `--tp-size` | `4` | Tensor parallelism size | +| `--pp-size` | `1` | Pipeline parallelism size | +| `--nnodes` | `2` | Number of nodes | +| `--port` | `30000` | Server port | + +## How It Works + +1. **Placement group** — one bundle per node (`{"CPU": 1, "GPU": gpus_per_node}`). + `STRICT_PACK` for 1 node, `STRICT_SPREAD` for multi-node. +2. **Driver actor** — scheduled on bundle 0 with `num_cpus=1, num_gpus=0`. + Imports sglang inside the actor to avoid GPU-less head node issues. +3. **SchedulerActors** — created internally by sglang, each claiming `num_gpus=1` + from the correct node's bundle. + +## Troubleshooting + +- **Server timeout** — increase the 600s health-check timeout or check NCCL + connectivity (`NCCL_DEBUG=INFO`). +- **OOM** — reduce model size or use more GPUs. +- **Connection refused** — ensure security groups allow inter-node traffic on + the server port and NCCL ports. diff --git a/sglang_ray_inference/driver_offline.py b/sglang_ray_inference/driver_offline.py index 38d4cc0..9b5f0f5 100644 --- a/sglang_ray_inference/driver_offline.py +++ b/sglang_ray_inference/driver_offline.py @@ -1,14 +1,15 @@ """ -Offline (batch) inference with SGLang on Ray. +Offline (Batch) Inference with SGLang on Ray -Wraps sglang.Engine in a Ray actor for multi-node batch generation. -The driver (head node) needs no GPU — sglang is imported only inside the actor. +Runs sglang.Engine inside a Ray actor for batch generation. +The head node needs no GPU — sglang is imported only inside the actor. Usage: - python driver_offline.py --model-path Qwen/Qwen3-1.7B --tp-size 4 --nnodes 1 + python driver_offline.py --model-path Qwen/Qwen3-1.7B --tp-size 8 --nnodes 2 """ import argparse +import sys import time import ray @@ -16,47 +17,53 @@ from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy -@ray.remote -class EngineActor: - """Thin wrapper that creates an sglang.Engine inside a Ray actor.""" - - def __init__(self, **kwargs): - from sglang import Engine - - self.engine = Engine(**kwargs) - - def ready(self): - return True - - def generate(self, prompts, sampling_params): - return [ - self.engine.generate(prompt=p, sampling_params=sampling_params) - for p in prompts - ] - - def shutdown(self): - self.engine.shutdown() - - def main(): - parser = argparse.ArgumentParser() + parser = argparse.ArgumentParser(description="SGLang offline inference via Ray") parser.add_argument("--model-path", type=str, default="Qwen/Qwen3-1.7B") parser.add_argument("--tp-size", type=int, default=4) parser.add_argument("--pp-size", type=int, default=1) - parser.add_argument("--nnodes", type=int, default=1) + parser.add_argument("--nnodes", type=int, default=2) parser.add_argument("--port", type=int, default=30000) args = parser.parse_args() - gpus_per_node = (args.tp_size * args.pp_size) // args.nnodes + world_size = args.tp_size * args.pp_size + gpus_per_node = world_size // args.nnodes - # Reserve GPUs across nodes + # --- Ray init --- + print(f"Model={args.model_path} TP={args.tp_size} PP={args.pp_size} " + f"nodes={args.nnodes} GPUs/node={gpus_per_node}") + + # --- Placement group: one bundle per node --- + strategy = "STRICT_PACK" if args.nnodes == 1 else "STRICT_SPREAD" pg = placement_group( + name="engine_group", bundles=[{"CPU": 1, "GPU": gpus_per_node}] * args.nnodes, - strategy="STRICT_PACK" if args.nnodes == 1 else "STRICT_SPREAD", + strategy=strategy, ) ray.get(pg.ready()) + print("Placement group ready.") + + # --- Engine actor (sglang imported inside, not on head node) --- + @ray.remote + class EngineActor: + def __init__(self, **kwargs): + from sglang import Engine + self.engine = Engine(**kwargs) + + def is_ready(self): + return True + + def generate(self, prompts, sampling_params): + return [ + {"prompt": p, "text": self.engine.generate(prompt=p, sampling_params=sampling_params)["text"]} + for p in prompts + ] + + def shutdown(self): + if self.engine: + self.engine.shutdown() + self.engine = None - # Start engine actor on the first bundle engine = EngineActor.options( num_cpus=1, num_gpus=0, @@ -72,10 +79,11 @@ def main(): use_ray=True, ) - ray.get(engine.ready.remote()) - print("Engine ready.") + print("Waiting for engine...") + ray.get(engine.is_ready.remote()) + print("Engine ready.\n") - # Batch generate + # --- Generate --- prompts = [ "The capital of France is", "Explain quantum computing in simple terms:", @@ -84,19 +92,19 @@ def main(): ] t0 = time.time() - results = ray.get( - engine.generate.remote(prompts, {"max_new_tokens": 64, "temperature": 0.0}) - ) + results = ray.get(engine.generate.remote(prompts, {"max_new_tokens": 64, "temperature": 0.0})) print(f"Generated {len(results)} responses in {time.time() - t0:.2f}s\n") - for prompt, result in zip(prompts, results): - print(f"Prompt: {prompt}") - print(f"Response: {result['text'][:200]}\n") + for r in results: + print(f"Prompt: {r['prompt']}") + print(f"Response: {r['text'][:200]}") + print("-" * 60) - # Cleanup + # --- Cleanup --- ray.get(engine.shutdown.remote()) ray.util.remove_placement_group(pg) + print("\nDone.") if __name__ == "__main__": - main() + sys.exit(main()) diff --git a/sglang_ray_inference/driver_online.py b/sglang_ray_inference/driver_online.py index 6cf27ac..1235bcc 100644 --- a/sglang_ray_inference/driver_online.py +++ b/sglang_ray_inference/driver_online.py @@ -1,14 +1,15 @@ """ -Online (HTTP server) inference with SGLang on Ray. +Online (HTTP Server) Inference with SGLang on Ray -Launches the SGLang HTTP server inside a Ray task for multi-node serving. -The driver (head node) needs no GPU — sglang is imported only inside the task. +Launches the SGLang HTTP server as a Ray remote function. +The head node needs no GPU — sglang is imported only inside the task. Usage: - python driver_online.py --model-path Qwen/Qwen3-1.7B --tp-size 4 --nnodes 1 + python driver_online.py --model-path Qwen/Qwen3-1.7B --tp-size 8 --nnodes 2 """ import argparse +import sys import time import ray @@ -18,70 +19,62 @@ @ray.remote -def get_node_ip(): - """Return the IP of the node this task lands on.""" +def _get_node_ip(): return ray.util.get_node_ip_address() @ray.remote -def launch_server(**kwargs): - """Start the SGLang HTTP server (blocks until the server exits).""" +def _launch_server(**server_kwargs): from sglang.srt.entrypoints.http_server import launch_server from sglang.srt.server_args import ServerArgs - launch_server(ServerArgs(**kwargs)) - - -def wait_for_healthy(url, server_ref, timeout=600, poll=5): - """Poll the health endpoint until the server is ready or timeout.""" - t0 = time.time() - while time.time() - t0 < timeout: - # If the task already finished, it crashed - done, _ = ray.wait([server_ref], timeout=0) - if done: - ray.get(server_ref) # raises on error - raise RuntimeError("Server exited before becoming healthy.") - try: - if requests.get(f"{url}/health", timeout=5).status_code == 200: - return - except requests.RequestException: - pass - time.sleep(poll) - raise TimeoutError(f"Server not healthy after {timeout}s.") + launch_server(ServerArgs(**server_kwargs)) def main(): - parser = argparse.ArgumentParser() + parser = argparse.ArgumentParser(description="SGLang HTTP server via Ray") parser.add_argument("--model-path", type=str, default="Qwen/Qwen3-1.7B") parser.add_argument("--tp-size", type=int, default=4) parser.add_argument("--pp-size", type=int, default=1) - parser.add_argument("--nnodes", type=int, default=1) + parser.add_argument("--nnodes", type=int, default=2) parser.add_argument("--port", type=int, default=30000) args = parser.parse_args() - gpus_per_node = (args.tp_size * args.pp_size) // args.nnodes + world_size = args.tp_size * args.pp_size + gpus_per_node = world_size // args.nnodes + + print(f"Model={args.model_path} TP={args.tp_size} PP={args.pp_size} " + f"nodes={args.nnodes} GPUs/node={gpus_per_node}") - # Reserve GPUs across nodes + # --- Placement group: one bundle per node --- + strategy = "STRICT_PACK" if args.nnodes == 1 else "STRICT_SPREAD" pg = placement_group( + name="engine_group", bundles=[{"CPU": 1, "GPU": gpus_per_node}] * args.nnodes, - strategy="STRICT_PACK" if args.nnodes == 1 else "STRICT_SPREAD", + strategy=strategy, ) ray.get(pg.ready()) + print("Placement group ready.") pg_strategy = PlacementGroupSchedulingStrategy( placement_group=pg, placement_group_bundle_index=0, ) - # Resolve the IP of the node where the server will run + # --- Resolve the node IP where the server will run --- node_ip = ray.get( - get_node_ip.options(num_cpus=0, scheduling_strategy=pg_strategy).remote() + _get_node_ip.options( + num_cpus=0, + scheduling_strategy=pg_strategy, + ).remote() ) url = f"http://{node_ip}:{args.port}" print(f"Server URL: {url}") - # Launch the HTTP server (runs until cancelled) - server_ref = launch_server.options( - num_cpus=1, num_gpus=0, scheduling_strategy=pg_strategy, + # --- Launch the HTTP server as a Ray task (blocks until server exits) --- + server_ref = _launch_server.options( + num_cpus=1, + num_gpus=0, + scheduling_strategy=pg_strategy, ).remote( model_path=args.model_path, tp_size=args.tp_size, @@ -92,25 +85,58 @@ def main(): use_ray=True, ) - wait_for_healthy(url, server_ref) - print("Server healthy.") - - # Test request - resp = requests.post( - f"{url}/generate", - json={ - "text": "The capital of France is", - "sampling_params": {"max_new_tokens": 32, "temperature": 0.0}, - }, - timeout=60, - ) - resp.raise_for_status() - print(f"Test response: {resp.json()}") - - # Cleanup + # --- Health check --- + print("Waiting for server to be healthy...") + t0 = time.time() + timeout = 600 + healthy = False + while time.time() - t0 < timeout: + # Check if the task crashed early + ready, _ = ray.wait([server_ref], timeout=0) + if ready: + # Task finished unexpectedly — surface the error + ray.get(server_ref) + print("ERROR: server task exited before becoming healthy.") + ray.util.remove_placement_group(pg) + return 1 + try: + if requests.get(f"{url}/health", timeout=5).status_code == 200: + healthy = True + break + except requests.exceptions.RequestException: + pass + time.sleep(5) + elapsed = int(time.time() - t0) + if elapsed % 30 == 0: + print(f" {elapsed}s elapsed...") + + if not healthy: + print("ERROR: server did not become healthy within timeout.") + ray.cancel(server_ref, force=True) + ray.util.remove_placement_group(pg) + return 1 + + print(f"Server healthy ({int(time.time() - t0)}s).") + + # --- Test request --- + try: + resp = requests.post( + f"{url}/generate", + json={"text": "The capital of France is", + "sampling_params": {"max_new_tokens": 32, "temperature": 0.0}}, + timeout=60, + ) + resp.raise_for_status() + print(f"Test response: {resp.json()}") + except requests.exceptions.RequestException as e: + print(f"Warning: test request failed: {e}") + + # --- Shutdown --- ray.cancel(server_ref, force=True) ray.util.remove_placement_group(pg) + print("Done.") + return 0 if __name__ == "__main__": - main() + sys.exit(main()) diff --git a/sglang_ray_inference/job_offline.yaml b/sglang_ray_inference/job_offline.yaml index fc7b305..4b70570 100644 --- a/sglang_ray_inference/job_offline.yaml +++ b/sglang_ray_inference/job_offline.yaml @@ -22,13 +22,15 @@ working_dir: . entrypoint: | set -e - # Run offline inference - echo "Running offline inference..." echo "==========================================" + echo "SGLang Offline Inference" + echo "==========================================" + python driver_offline.py \ --model-path Qwen/Qwen3-1.7B --tp-size 8 --nnodes 2 env_vars: + NCCL_DEBUG: INFO NCCL_IB_DISABLE: "0" NCCL_NET_GDR_LEVEL: "2" RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES: "1" diff --git a/sglang_ray_inference/job_online.yaml b/sglang_ray_inference/job_online.yaml index ac8bcab..141b697 100644 --- a/sglang_ray_inference/job_online.yaml +++ b/sglang_ray_inference/job_online.yaml @@ -1,7 +1,7 @@ # Anyscale Job: SGLang Online (HTTP Server) Inference with Ray Actor Backend # # Configuration: TP=8 across 2 nodes (4 GPUs per node) -# Submit: anyscale job submit -f examples/anyscale/job_online.yaml +# Submit: anyscale job submit -f job_online.yaml name: sglang-online-inference @@ -23,16 +23,14 @@ entrypoint: | set -e echo "==========================================" - echo "Running online inference..." + echo "SGLang Online Server" echo "==========================================" - # Start online HTTP server - echo "Starting online server..." - echo "==========================================" python driver_online.py \ --model-path Qwen/Qwen3-1.7B --tp-size 8 --nnodes 2 env_vars: + NCCL_DEBUG: INFO NCCL_IB_DISABLE: "0" NCCL_NET_GDR_LEVEL: "2" RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES: "1" From 785a4c3f87c2467a6f0a99e15b541cbc29f72289 Mon Sep 17 00:00:00 2001 From: xyuzh Date: Sat, 21 Feb 2026 19:38:17 -0800 Subject: [PATCH 4/5] Revert Dockerfile to use sglang from feature branch fork Co-Authored-By: Claude Opus 4.6 --- sglang_ray_inference/Dockerfile | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/sglang_ray_inference/Dockerfile b/sglang_ray_inference/Dockerfile index a10cfcd..c787649 100644 --- a/sglang_ray_inference/Dockerfile +++ b/sglang_ray_inference/Dockerfile @@ -49,7 +49,7 @@ RUN curl -LsSf https://astral.sh/uv/install.sh | sh ENV PATH="/home/ray/.local/bin:${PATH}" RUN uv pip install --system \ - "sglang[all]==0.5.8" \ + "sglang[all] @ git+https://github.com/xyuzh/sglang.git@feature/ray-actor-scheduler#subdirectory=python" \ numpy \ transformers \ accelerate \ @@ -57,12 +57,5 @@ RUN uv pip install --system \ requests \ httpx -RUN uv pip install --system "sgl-kernel==0.3.21" - -# ============================================================================= -# Verification -# ============================================================================= -RUN nvcc --version && \ - python -c "import sglang; print(f'SGLang version: {sglang.__version__}')" WORKDIR /home/ray/default From c58a970d1cd0e22ef1c279941ae6d4ed34d8ff84 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Sat, 21 Feb 2026 19:20:54 -0800 Subject: [PATCH 5/5] Refactor SGLang example: batch inference works, serve in progress - Rename sglang_ray_inference -> sglang_inference - Batch inference (job.yaml + driver_offline.py) fully working with multi-node TP=4, PP=2 using SGLang's use_ray=True mode - Ray Serve deployment (service.yaml + serve.py) uses same pattern as official Ray LLM SGLang integration with signal monkey-patching - Add query.py script for testing the service - Simplify configuration with environment variables The serving example is still being validated with multi-replica autoscaling. Single replica works; investigating occasional timeouts with multiple replicas. Co-Authored-By: Claude Opus 4.5 Signed-off-by: Robert Nishihara --- sglang_inference/Dockerfile | 39 +++++++ sglang_inference/README.md | 93 ++++++++++++++++ sglang_inference/driver_offline.py | 105 ++++++++++++++++++ sglang_inference/job.yaml | 36 +++++++ sglang_inference/query.py | 29 +++++ sglang_inference/serve.py | 80 ++++++++++++++ sglang_inference/service.yaml | 35 ++++++ sglang_ray_inference/Dockerfile | 61 ----------- sglang_ray_inference/README.md | 76 ------------- sglang_ray_inference/driver_offline.py | 110 ------------------- sglang_ray_inference/driver_online.py | 142 ------------------------- sglang_ray_inference/job_offline.yaml | 39 ------- sglang_ray_inference/job_online.yaml | 39 ------- 13 files changed, 417 insertions(+), 467 deletions(-) create mode 100644 sglang_inference/Dockerfile create mode 100644 sglang_inference/README.md create mode 100644 sglang_inference/driver_offline.py create mode 100644 sglang_inference/job.yaml create mode 100644 sglang_inference/query.py create mode 100644 sglang_inference/serve.py create mode 100644 sglang_inference/service.yaml delete mode 100644 sglang_ray_inference/Dockerfile delete mode 100644 sglang_ray_inference/README.md delete mode 100644 sglang_ray_inference/driver_offline.py delete mode 100644 sglang_ray_inference/driver_online.py delete mode 100644 sglang_ray_inference/job_offline.yaml delete mode 100644 sglang_ray_inference/job_online.yaml diff --git a/sglang_inference/Dockerfile b/sglang_inference/Dockerfile new file mode 100644 index 0000000..061792a --- /dev/null +++ b/sglang_inference/Dockerfile @@ -0,0 +1,39 @@ +FROM anyscale/ray:2.53.0-py312-cu129 + +# System dependencies for building SGLang extensions +RUN sudo apt-get update && \ + sudo apt-get install -y --no-install-recommends \ + build-essential \ + cmake \ + ninja-build \ + libnuma-dev \ + curl \ + && sudo rm -rf /var/lib/apt/lists/* + +# CUDA toolkit (nvcc) for compiling CUDA kernels +RUN curl -fsSL https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64/cuda-keyring_1.1-1_all.deb -o /tmp/cuda-keyring.deb && \ + sudo dpkg -i /tmp/cuda-keyring.deb && \ + rm /tmp/cuda-keyring.deb && \ + sudo apt-get update && \ + sudo apt-get install -y --no-install-recommends \ + cuda-nvcc-12-9 \ + cuda-cudart-dev-12-9 \ + cuda-crt-12-9 \ + && sudo rm -rf /var/lib/apt/lists/* && \ + sudo rm -rf /usr/local/cuda && \ + sudo ln -s /usr/local/cuda-12.9 /usr/local/cuda + +ENV PATH="/usr/local/cuda/bin:${PATH}" +ENV CUDA_HOME="/usr/local/cuda" +ENV LD_LIBRARY_PATH="/usr/local/cuda/lib64:${LD_LIBRARY_PATH}" + +# Python dependencies +RUN curl -LsSf https://astral.sh/uv/install.sh | sh +ENV PATH="/home/ray/.local/bin:${PATH}" + +# Install SGLang from a feature branch that supports Ray actor scheduling. +# Replace with a released version after this feature is merged into a future SGLang release. +RUN uv pip install --system \ + "sglang[all] @ git+https://github.com/xyuzh/sglang.git@feature/ray-actor-scheduler#subdirectory=python" + +WORKDIR /home/ray/default diff --git a/sglang_inference/README.md b/sglang_inference/README.md new file mode 100644 index 0000000..94affe0 --- /dev/null +++ b/sglang_inference/README.md @@ -0,0 +1,93 @@ +# Deploy SGLang Multi-Node Inference + +This example deploys [SGLang](https://github.com/sgl-project/sglang) for multi-node tensor-parallel inference using Ray on Anyscale. + +## Install the Anyscale CLI + +```bash +pip install -U anyscale +anyscale login +``` + +## Clone the example + +```bash +git clone https://github.com/anyscale/examples.git +cd examples/sglang_inference +``` + +## Batch inference + +Run batch inference as an Anyscale job: + +```bash +anyscale job submit -f job.yaml +``` + +Or with the larger model: + +```bash +anyscale job submit -f job.yaml --env MODEL_PATH=Qwen/Qwen3-30B-A3B-Instruct-2507 +``` + +## Deploy as a service + +Deploy as an HTTP endpoint with Ray Serve: + +```bash +anyscale service deploy -f service.yaml +``` + +Or with the larger model: + +```bash +anyscale service deploy -f service.yaml --env MODEL_PATH=Qwen/Qwen3-30B-A3B-Instruct-2507 +``` + +Wait for the service to be ready: + +```bash +anyscale service wait --name sglang-inference --state RUNNING --timeout-s 900 +``` + +The `anyscale service deploy` command outputs a line that looks like: + +```text +curl -H "Authorization: Bearer " +``` + +Set the environment variables from this output and query the model: + +```bash +export SERVICE_URL= +export SERVICE_TOKEN= + +pip install requests +python query.py +``` + +Shutdown the service when done: + +```bash +anyscale service terminate --name sglang-inference +``` + +## Understanding the example + +- [serve.py](https://github.com/anyscale/examples/blob/main/sglang_inference/serve.py) uses Ray Serve's [`placement_group_bundles`](https://docs.ray.io/en/latest/serve/advanced-guides/replica-scheduling.html) to reserve GPUs across multiple nodes for tensor-parallel inference. +- [driver_offline.py](https://github.com/anyscale/examples/blob/main/sglang_inference/driver_offline.py) wraps SGLang in a Ray actor for batch inference. +- SGLang is imported inside the actor because it initializes CUDA and cannot be imported on CPU-only nodes. +- The default configuration uses TP=4, PP=2 across 2 nodes (8 GPUs per replica) on A10G GPUs. Other GPU types like L4, L40S, A100, and H100 would also work. +- The service autoscales from 1-4 replicas based on queue depth. See [AutoscalingConfig](https://docs.ray.io/en/latest/serve/api/doc/ray.serve.config.AutoscalingConfig.html) for tuning. +- The [Dockerfile](https://github.com/anyscale/examples/blob/main/sglang_inference/Dockerfile) installs CUDA toolkit and SGLang dependencies on top of the Ray base image. + +**Environment variables:** + +Override any variable at deploy/submit time with `--env`: + +| Variable | Default | Description | +|----------|---------|-------------| +| `MODEL_PATH` | `Qwen/Qwen3-1.7B` | HuggingFace model ID | +| `TP_SIZE` | `4` | Tensor parallelism (GPUs per pipeline stage) | +| `PP_SIZE` | `2` | Pipeline parallelism (number of stages) | +| `NUM_NODES` | `2` | Nodes per replica | diff --git a/sglang_inference/driver_offline.py b/sglang_inference/driver_offline.py new file mode 100644 index 0000000..361b8fa --- /dev/null +++ b/sglang_inference/driver_offline.py @@ -0,0 +1,105 @@ +""" +Offline (batch) inference with SGLang on Ray. + +Wraps sglang.Engine in a Ray actor for multi-node batch generation. +The driver (head node) needs no GPU — sglang is imported only inside the actor. + +Usage: + python driver_offline.py +""" + +import os +import time + +import ray +from ray.util.placement_group import placement_group +from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy + +# Configuration from environment (same as serve.py) +MODEL_PATH = os.environ.get("MODEL_PATH", "Qwen/Qwen3-1.7B") +TP_SIZE = int(os.environ.get("TP_SIZE", "4")) +PP_SIZE = int(os.environ.get("PP_SIZE", "2")) +NUM_NODES = int(os.environ.get("NUM_NODES", "2")) + + +@ray.remote +class EngineActor: + """Thin wrapper that creates an sglang.Engine inside a Ray actor. + + We import sglang inside the actor because it initializes CUDA and + cannot be imported on the CPU-only head node where the driver runs. + """ + + def __init__(self, **kwargs): + from sglang import Engine + + self.engine = Engine(**kwargs) + + def generate(self, prompts, sampling_params): + return [ + self.engine.generate(prompt=p, sampling_params=sampling_params) + for p in prompts + ] + + def shutdown(self): + self.engine.shutdown() + + +def main(): + gpus_per_node = (TP_SIZE * PP_SIZE) // NUM_NODES + + print(f"Configuration: MODEL_PATH={MODEL_PATH}, TP={TP_SIZE}, PP={PP_SIZE}, NUM_NODES={NUM_NODES}") + print(f"GPUs per node: {gpus_per_node}") + + # Reserve GPUs across nodes + pg = placement_group( + bundles=[{"CPU": 1, "GPU": gpus_per_node}] * NUM_NODES, + ) + ray.get(pg.ready()) + print("Placement group ready.") + + # Start engine actor on the first bundle + engine = EngineActor.options( + num_cpus=1, + num_gpus=0, + scheduling_strategy=PlacementGroupSchedulingStrategy( + placement_group=pg, placement_group_bundle_index=0, + ), + ).remote( + model_path=MODEL_PATH, + tp_size=TP_SIZE, + pp_size=PP_SIZE, + nnodes=NUM_NODES, + use_ray=True, + ) + + # Wait for engine to be ready (model loaded) + print("Loading model...") + ray.get(engine.generate.remote(["warmup"], {"max_new_tokens": 1})) + print("Engine ready.") + + # Batch generate + prompts = [ + "The capital of France is", + "Explain quantum computing in simple terms:", + "Write a haiku about programming:", + "What is 2 + 2?", + ] + + t0 = time.time() + results = ray.get( + engine.generate.remote(prompts, {"max_new_tokens": 64, "temperature": 0.0}) + ) + print(f"Generated {len(results)} responses in {time.time() - t0:.2f}s\n") + + for prompt, result in zip(prompts, results): + print(f"Prompt: {prompt}") + print(f"Response: {result['text'][:200]}\n") + + # Cleanup + ray.get(engine.shutdown.remote()) + ray.util.remove_placement_group(pg) + + +if __name__ == "__main__": + main() diff --git a/sglang_inference/job.yaml b/sglang_inference/job.yaml new file mode 100644 index 0000000..c32a7ca --- /dev/null +++ b/sglang_inference/job.yaml @@ -0,0 +1,36 @@ +# Anyscale Job: SGLang Offline (Batch) Inference +# +# Configuration: TP=4, PP=2 across 2 nodes (4 GPUs per node) +# +# Submit (small model, fast): +# anyscale job submit -f job.yaml +# +# Submit (large model): +# anyscale job submit -f job.yaml --env MODEL_PATH=Qwen/Qwen3-30B-A3B-Instruct-2507 + +name: sglang-offline-inference + +containerfile: ./Dockerfile + +compute_config: + head_node: + instance_type: m5.2xlarge # CPU-only head + worker_nodes: + - instance_type: g5.12xlarge # 4x A10G + min_nodes: 4 + max_nodes: 4 + +env_vars: + MODEL_PATH: "Qwen/Qwen3-1.7B" + TP_SIZE: "4" + PP_SIZE: "2" + NUM_NODES: "2" + # Ray normally sets CUDA_VISIBLE_DEVICES for each worker process. + # Disable this because SGLang assumes CUDA_VISIBLE_DEVICES lists all GPUs. + RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES: "1" + +working_dir: . + +entrypoint: python driver_offline.py + +max_retries: 0 diff --git a/sglang_inference/query.py b/sglang_inference/query.py new file mode 100644 index 0000000..90af229 --- /dev/null +++ b/sglang_inference/query.py @@ -0,0 +1,29 @@ +"""Query the SGLang inference service.""" + +import os +import requests + +SERVICE_URL = os.environ.get("SERVICE_URL") +SERVICE_TOKEN = os.environ.get("SERVICE_TOKEN") + +if not SERVICE_URL or not SERVICE_TOKEN: + print("Set SERVICE_URL and SERVICE_TOKEN from 'anyscale service deploy' output") + raise SystemExit(1) + +prompts = [ + "The capital of France is", + "Explain quantum computing in one sentence:", + "Write a haiku about programming:", + "What is 2 + 2?", + "The largest planet in our solar system is", +] + +for prompt in prompts: + response = requests.post( + SERVICE_URL, + headers={"Authorization": f"Bearer {SERVICE_TOKEN}"}, + json={"text": prompt, "sampling_params": {"max_new_tokens": 32}}, + timeout=120, + ) + response.raise_for_status() + print(f"{prompt}{response.json()['text']}\n") diff --git a/sglang_inference/serve.py b/sglang_inference/serve.py new file mode 100644 index 0000000..0bc5533 --- /dev/null +++ b/sglang_inference/serve.py @@ -0,0 +1,80 @@ +"""Ray Serve deployment for SGLang inference. + +This deployment uses Ray Serve's placement_group_bundles to reserve GPUs +across multiple nodes for tensor-parallel inference with SGLang. + +Based on the Ray Serve LLM + SGLang integration pattern from: +https://github.com/ray-project/ray/pull/58366 +""" + +import os +import signal + +from fastapi import FastAPI +from ray import serve + +# Configuration from environment (same defaults as driver_offline.py) +MODEL_PATH = os.environ.get("MODEL_PATH", "Qwen/Qwen3-1.7B") +TP_SIZE = int(os.environ.get("TP_SIZE", "4")) +PP_SIZE = int(os.environ.get("PP_SIZE", "2")) +NUM_NODES = int(os.environ.get("NUM_NODES", "2")) + +gpus_per_node = (TP_SIZE * PP_SIZE) // NUM_NODES + +app = FastAPI() + + +@serve.deployment( + autoscaling_config={ + "min_replicas": 1, + "max_replicas": 4, + "target_ongoing_requests": 4, + }, + ray_actor_options={ + "num_cpus": 1, + "num_gpus": 0, + }, + # Reserve resources across multiple nodes for tensor parallelism. + # Each bundle reserves GPUs on one node. + placement_group_bundles=[{"CPU": 1, "GPU": gpus_per_node}] * NUM_NODES, +) +@serve.ingress(app) +class SGLangDeployment: + def __init__(self): + # Import sglang inside the actor because it initializes CUDA and + # cannot be imported on the CPU-only head node where the Serve + # controller runs. + from sglang import Engine + + # Monkey patch signal.signal to avoid "signal only works in main thread" + # error. SGLang tries to register signal handlers for graceful shutdown, + # but Ray Serve workers are not in the main thread. + original_signal = signal.signal + + def noop_signal_handler(sig, action): + return signal.SIG_DFL + + try: + signal.signal = noop_signal_handler + self.engine = Engine( + model_path=MODEL_PATH, + tp_size=TP_SIZE, + pp_size=PP_SIZE, + nnodes=NUM_NODES, + use_ray=True, + ) + finally: + signal.signal = original_signal + + @app.post("/") + async def generate(self, request: dict) -> dict: + text = request.get("text", "") + sampling_params = request.get("sampling_params", {"max_new_tokens": 64}) + result = await self.engine.async_generate( + prompt=text, + sampling_params=sampling_params, + ) + return {"text": result["text"]} + + +app_deploy = SGLangDeployment.bind() diff --git a/sglang_inference/service.yaml b/sglang_inference/service.yaml new file mode 100644 index 0000000..c3473b5 --- /dev/null +++ b/sglang_inference/service.yaml @@ -0,0 +1,35 @@ +# Anyscale Service: SGLang Online Inference +# +# Configuration: TP=4, PP=2 across 2 nodes (4 GPUs per node) +# +# Deploy (small model, fast): +# anyscale service deploy -f service.yaml +# +# Deploy (large model): +# anyscale service deploy -f service.yaml --env MODEL_PATH=Qwen/Qwen3-30B-A3B-Instruct-2507 + +name: sglang-inference + +containerfile: ./Dockerfile + +compute_config: + head_node: + instance_type: m5.2xlarge # CPU-only head + worker_nodes: + - instance_type: g5.12xlarge # 4x A10G + min_nodes: 4 + max_nodes: 8 + +env_vars: + MODEL_PATH: "Qwen/Qwen3-1.7B" + TP_SIZE: "4" + PP_SIZE: "2" + NUM_NODES: "2" + # Ray normally sets CUDA_VISIBLE_DEVICES for each worker process. + # Disable this because SGLang assumes CUDA_VISIBLE_DEVICES lists all GPUs. + RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES: "1" + +working_dir: . + +applications: + - import_path: serve:app_deploy diff --git a/sglang_ray_inference/Dockerfile b/sglang_ray_inference/Dockerfile deleted file mode 100644 index c787649..0000000 --- a/sglang_ray_inference/Dockerfile +++ /dev/null @@ -1,61 +0,0 @@ -FROM anyscale/ray:2.53.0-py312-cu129 - -# ============================================================================= -# System Dependencies -# ============================================================================= -RUN sudo apt-get update && \ - sudo apt-get install -y --no-install-recommends \ - build-essential \ - cmake \ - ninja-build \ - libnuma1 \ - libnuma-dev \ - numactl \ - git \ - curl \ - wget \ - netcat \ - && sudo rm -rf /var/lib/apt/lists/* - -# ============================================================================= -# CUDA Toolkit (nvcc compiler) - CUDA 12.9 to match base image -# ============================================================================= -RUN curl -fsSL https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64/cuda-keyring_1.1-1_all.deb -o /tmp/cuda-keyring.deb && \ - sudo dpkg -i /tmp/cuda-keyring.deb && \ - rm /tmp/cuda-keyring.deb && \ - sudo apt-get update && \ - sudo apt-get install -y --no-install-recommends \ - cuda-nvcc-12-9 \ - cuda-cudart-dev-12-9 \ - cuda-crt-12-9 \ - && sudo rm -rf /var/lib/apt/lists/* - -# Create/update CUDA symlink -RUN sudo rm -rf /usr/local/cuda && \ - sudo ln -s /usr/local/cuda-12.9 /usr/local/cuda - -# ============================================================================= -# Environment Variables -# ============================================================================= -ENV PATH="/usr/local/cuda/bin:${PATH}" -ENV CUDA_HOME="/usr/local/cuda" -ENV LD_LIBRARY_PATH="/usr/local/cuda/lib64:${LD_LIBRARY_PATH}" -ENV RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES="1" - -# ============================================================================= -# Python Dependencies -# ============================================================================= -RUN curl -LsSf https://astral.sh/uv/install.sh | sh -ENV PATH="/home/ray/.local/bin:${PATH}" - -RUN uv pip install --system \ - "sglang[all] @ git+https://github.com/xyuzh/sglang.git@feature/ray-actor-scheduler#subdirectory=python" \ - numpy \ - transformers \ - accelerate \ - huggingface_hub \ - requests \ - httpx - - -WORKDIR /home/ray/default diff --git a/sglang_ray_inference/README.md b/sglang_ray_inference/README.md deleted file mode 100644 index 85ceef1..0000000 --- a/sglang_ray_inference/README.md +++ /dev/null @@ -1,76 +0,0 @@ -# Multi-Node SGLang on Anyscale with Ray Actor Backend - -Run SGLang across multiple GPU nodes on Anyscale using Ray placement groups. -The head node needs no GPU — sglang is imported only inside Ray actors. - -Two driver scripts: - -- **`driver_offline.py`** — batch inference via `sglang.Engine` -- **`driver_online.py`** — HTTP server via `sglang.srt.entrypoints.http_server` - -## Files - -| File | Purpose | -|------|---------| -| `driver_offline.py` | Batch inference driver | -| `driver_online.py` | HTTP server driver | -| `job_offline.yaml` | Anyscale job config for batch inference | -| `job_online.yaml` | Anyscale job config for HTTP server | -| `Dockerfile` | Base image (`anyscale/ray` + sglang) | - -## Quick Start - -### Prerequisites - -- An [Anyscale](https://www.anyscale.com/) account with GPU quota -- The `anyscale` CLI installed and configured - -### Run Offline Inference - -```bash -cd sglang_ray_inference -anyscale job submit -f job_offline.yaml -``` - -### Run Online Inference - -```bash -cd sglang_ray_inference -anyscale job submit -f job_online.yaml -``` - -### Run Locally (single node, requires GPU) - -```bash -python driver_offline.py --model-path Qwen/Qwen3-1.7B --tp-size 1 --nnodes 1 -python driver_online.py --model-path Qwen/Qwen3-1.7B --tp-size 1 --nnodes 1 -``` - -## CLI Arguments - -Both scripts accept the same arguments: - -| Argument | Default | Description | -|----------|---------|-------------| -| `--model-path` | `Qwen/Qwen3-1.7B` | HuggingFace model ID or local path | -| `--tp-size` | `4` | Tensor parallelism size | -| `--pp-size` | `1` | Pipeline parallelism size | -| `--nnodes` | `2` | Number of nodes | -| `--port` | `30000` | Server port | - -## How It Works - -1. **Placement group** — one bundle per node (`{"CPU": 1, "GPU": gpus_per_node}`). - `STRICT_PACK` for 1 node, `STRICT_SPREAD` for multi-node. -2. **Driver actor** — scheduled on bundle 0 with `num_cpus=1, num_gpus=0`. - Imports sglang inside the actor to avoid GPU-less head node issues. -3. **SchedulerActors** — created internally by sglang, each claiming `num_gpus=1` - from the correct node's bundle. - -## Troubleshooting - -- **Server timeout** — increase the 600s health-check timeout or check NCCL - connectivity (`NCCL_DEBUG=INFO`). -- **OOM** — reduce model size or use more GPUs. -- **Connection refused** — ensure security groups allow inter-node traffic on - the server port and NCCL ports. diff --git a/sglang_ray_inference/driver_offline.py b/sglang_ray_inference/driver_offline.py deleted file mode 100644 index 9b5f0f5..0000000 --- a/sglang_ray_inference/driver_offline.py +++ /dev/null @@ -1,110 +0,0 @@ -""" -Offline (Batch) Inference with SGLang on Ray - -Runs sglang.Engine inside a Ray actor for batch generation. -The head node needs no GPU — sglang is imported only inside the actor. - -Usage: - python driver_offline.py --model-path Qwen/Qwen3-1.7B --tp-size 8 --nnodes 2 -""" - -import argparse -import sys -import time - -import ray -from ray.util.placement_group import placement_group -from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy - - -def main(): - parser = argparse.ArgumentParser(description="SGLang offline inference via Ray") - parser.add_argument("--model-path", type=str, default="Qwen/Qwen3-1.7B") - parser.add_argument("--tp-size", type=int, default=4) - parser.add_argument("--pp-size", type=int, default=1) - parser.add_argument("--nnodes", type=int, default=2) - parser.add_argument("--port", type=int, default=30000) - args = parser.parse_args() - - world_size = args.tp_size * args.pp_size - gpus_per_node = world_size // args.nnodes - - # --- Ray init --- - print(f"Model={args.model_path} TP={args.tp_size} PP={args.pp_size} " - f"nodes={args.nnodes} GPUs/node={gpus_per_node}") - - # --- Placement group: one bundle per node --- - strategy = "STRICT_PACK" if args.nnodes == 1 else "STRICT_SPREAD" - pg = placement_group( - name="engine_group", - bundles=[{"CPU": 1, "GPU": gpus_per_node}] * args.nnodes, - strategy=strategy, - ) - ray.get(pg.ready()) - print("Placement group ready.") - - # --- Engine actor (sglang imported inside, not on head node) --- - @ray.remote - class EngineActor: - def __init__(self, **kwargs): - from sglang import Engine - self.engine = Engine(**kwargs) - - def is_ready(self): - return True - - def generate(self, prompts, sampling_params): - return [ - {"prompt": p, "text": self.engine.generate(prompt=p, sampling_params=sampling_params)["text"]} - for p in prompts - ] - - def shutdown(self): - if self.engine: - self.engine.shutdown() - self.engine = None - - engine = EngineActor.options( - num_cpus=1, - num_gpus=0, - scheduling_strategy=PlacementGroupSchedulingStrategy( - placement_group=pg, placement_group_bundle_index=0, - ), - ).remote( - model_path=args.model_path, - tp_size=args.tp_size, - pp_size=args.pp_size, - nnodes=args.nnodes, - port=args.port, - use_ray=True, - ) - - print("Waiting for engine...") - ray.get(engine.is_ready.remote()) - print("Engine ready.\n") - - # --- Generate --- - prompts = [ - "The capital of France is", - "Explain quantum computing in simple terms:", - "Write a haiku about programming:", - "What is 2 + 2?", - ] - - t0 = time.time() - results = ray.get(engine.generate.remote(prompts, {"max_new_tokens": 64, "temperature": 0.0})) - print(f"Generated {len(results)} responses in {time.time() - t0:.2f}s\n") - - for r in results: - print(f"Prompt: {r['prompt']}") - print(f"Response: {r['text'][:200]}") - print("-" * 60) - - # --- Cleanup --- - ray.get(engine.shutdown.remote()) - ray.util.remove_placement_group(pg) - print("\nDone.") - - -if __name__ == "__main__": - sys.exit(main()) diff --git a/sglang_ray_inference/driver_online.py b/sglang_ray_inference/driver_online.py deleted file mode 100644 index 1235bcc..0000000 --- a/sglang_ray_inference/driver_online.py +++ /dev/null @@ -1,142 +0,0 @@ -""" -Online (HTTP Server) Inference with SGLang on Ray - -Launches the SGLang HTTP server as a Ray remote function. -The head node needs no GPU — sglang is imported only inside the task. - -Usage: - python driver_online.py --model-path Qwen/Qwen3-1.7B --tp-size 8 --nnodes 2 -""" - -import argparse -import sys -import time - -import ray -import requests -from ray.util.placement_group import placement_group -from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy - - -@ray.remote -def _get_node_ip(): - return ray.util.get_node_ip_address() - - -@ray.remote -def _launch_server(**server_kwargs): - from sglang.srt.entrypoints.http_server import launch_server - from sglang.srt.server_args import ServerArgs - - launch_server(ServerArgs(**server_kwargs)) - - -def main(): - parser = argparse.ArgumentParser(description="SGLang HTTP server via Ray") - parser.add_argument("--model-path", type=str, default="Qwen/Qwen3-1.7B") - parser.add_argument("--tp-size", type=int, default=4) - parser.add_argument("--pp-size", type=int, default=1) - parser.add_argument("--nnodes", type=int, default=2) - parser.add_argument("--port", type=int, default=30000) - args = parser.parse_args() - - world_size = args.tp_size * args.pp_size - gpus_per_node = world_size // args.nnodes - - print(f"Model={args.model_path} TP={args.tp_size} PP={args.pp_size} " - f"nodes={args.nnodes} GPUs/node={gpus_per_node}") - - # --- Placement group: one bundle per node --- - strategy = "STRICT_PACK" if args.nnodes == 1 else "STRICT_SPREAD" - pg = placement_group( - name="engine_group", - bundles=[{"CPU": 1, "GPU": gpus_per_node}] * args.nnodes, - strategy=strategy, - ) - ray.get(pg.ready()) - print("Placement group ready.") - - pg_strategy = PlacementGroupSchedulingStrategy( - placement_group=pg, placement_group_bundle_index=0, - ) - - # --- Resolve the node IP where the server will run --- - node_ip = ray.get( - _get_node_ip.options( - num_cpus=0, - scheduling_strategy=pg_strategy, - ).remote() - ) - url = f"http://{node_ip}:{args.port}" - print(f"Server URL: {url}") - - # --- Launch the HTTP server as a Ray task (blocks until server exits) --- - server_ref = _launch_server.options( - num_cpus=1, - num_gpus=0, - scheduling_strategy=pg_strategy, - ).remote( - model_path=args.model_path, - tp_size=args.tp_size, - pp_size=args.pp_size, - nnodes=args.nnodes, - port=args.port, - host="0.0.0.0", - use_ray=True, - ) - - # --- Health check --- - print("Waiting for server to be healthy...") - t0 = time.time() - timeout = 600 - healthy = False - while time.time() - t0 < timeout: - # Check if the task crashed early - ready, _ = ray.wait([server_ref], timeout=0) - if ready: - # Task finished unexpectedly — surface the error - ray.get(server_ref) - print("ERROR: server task exited before becoming healthy.") - ray.util.remove_placement_group(pg) - return 1 - try: - if requests.get(f"{url}/health", timeout=5).status_code == 200: - healthy = True - break - except requests.exceptions.RequestException: - pass - time.sleep(5) - elapsed = int(time.time() - t0) - if elapsed % 30 == 0: - print(f" {elapsed}s elapsed...") - - if not healthy: - print("ERROR: server did not become healthy within timeout.") - ray.cancel(server_ref, force=True) - ray.util.remove_placement_group(pg) - return 1 - - print(f"Server healthy ({int(time.time() - t0)}s).") - - # --- Test request --- - try: - resp = requests.post( - f"{url}/generate", - json={"text": "The capital of France is", - "sampling_params": {"max_new_tokens": 32, "temperature": 0.0}}, - timeout=60, - ) - resp.raise_for_status() - print(f"Test response: {resp.json()}") - except requests.exceptions.RequestException as e: - print(f"Warning: test request failed: {e}") - - # --- Shutdown --- - ray.cancel(server_ref, force=True) - ray.util.remove_placement_group(pg) - print("Done.") - return 0 - - -if __name__ == "__main__": - sys.exit(main()) diff --git a/sglang_ray_inference/job_offline.yaml b/sglang_ray_inference/job_offline.yaml deleted file mode 100644 index 4b70570..0000000 --- a/sglang_ray_inference/job_offline.yaml +++ /dev/null @@ -1,39 +0,0 @@ -# Anyscale Job: SGLang Offline (Batch) Inference with Ray Actor Backend -# -# Configuration: TP=8 across 2 nodes (4 GPUs per node) -# Submit: anyscale job submit -f job_offline.yaml - -name: sglang-offline-inference - -cloud: - -compute_config: - head_node: - instance_type: m5.2xlarge # CPU-only head - worker_nodes: - - instance_type: g5.12xlarge # 4x A10G - min_nodes: 2 - max_nodes: 2 - -containerfile: ./Dockerfile - -working_dir: . - -entrypoint: | - set -e - - echo "==========================================" - echo "SGLang Offline Inference" - echo "==========================================" - - python driver_offline.py \ - --model-path Qwen/Qwen3-1.7B --tp-size 8 --nnodes 2 - -env_vars: - NCCL_DEBUG: INFO - NCCL_IB_DISABLE: "0" - NCCL_NET_GDR_LEVEL: "2" - RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES: "1" - SGLANG_SKIP_SGL_KERNEL_VERSION_CHECK: "1" - -max_retries: 0 diff --git a/sglang_ray_inference/job_online.yaml b/sglang_ray_inference/job_online.yaml deleted file mode 100644 index 141b697..0000000 --- a/sglang_ray_inference/job_online.yaml +++ /dev/null @@ -1,39 +0,0 @@ -# Anyscale Job: SGLang Online (HTTP Server) Inference with Ray Actor Backend -# -# Configuration: TP=8 across 2 nodes (4 GPUs per node) -# Submit: anyscale job submit -f job_online.yaml - -name: sglang-online-inference - -cloud: - -compute_config: - head_node: - instance_type: m5.2xlarge # CPU-only head - worker_nodes: - - instance_type: g5.12xlarge # 4x A10G - min_nodes: 2 - max_nodes: 2 - -containerfile: ./Dockerfile - -working_dir: . - -entrypoint: | - set -e - - echo "==========================================" - echo "SGLang Online Server" - echo "==========================================" - - python driver_online.py \ - --model-path Qwen/Qwen3-1.7B --tp-size 8 --nnodes 2 - -env_vars: - NCCL_DEBUG: INFO - NCCL_IB_DISABLE: "0" - NCCL_NET_GDR_LEVEL: "2" - RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES: "1" - SGLANG_SKIP_SGL_KERNEL_VERSION_CHECK: "1" - -max_retries: 0