diff --git a/.gitignore b/.gitignore
index 7336dcd..2a3efb4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,3 +2,4 @@ Container-Root/version.txt
 Container-Root/ray/anyscale/efs_env.sh
+__pycache__/
diff --git a/Container-Root/ray/rayservice/disaggregated_prefill_decode/README.md b/Container-Root/ray/rayservice/disaggregated_prefill_decode/README.md
new file mode 100644
index 0000000..57630a4
--- /dev/null
+++ b/Container-Root/ray/rayservice/disaggregated_prefill_decode/README.md
@@ -0,0 +1,163 @@
+# Disaggregated Prefill/Decode Serving
+
+This example demonstrates **disaggregated prefill/decode** serving for LLMs using [Ray Serve LLM](https://docs.ray.io/en/latest/serve/llm/user-guides/prefill-decode.html) with vLLM's [NIXLConnector](https://docs.vllm.ai/en/stable/features/nixl_connector_usage.html).
+
+## What is Disaggregated Serving?
+
+Traditional LLM inference colocates two phases on the same GPU:
+
+1. **Prefill** — processes the full input prompt in parallel (compute-bound, high FLOPS)
+2. **Decode** — generates tokens one at a time autoregressively (memory-bandwidth-bound)
+
+When colocated, these phases interfere with each other: a long prefill blocks decode and increases inter-token latency (ITL), while ongoing decode delays new prefill requests and increases time-to-first-token (TTFT).
+
+**Disaggregated serving** separates them onto dedicated instances connected via high-speed KV cache transfer (NIXL), enabling:
+
+| Benefit | Description |
+|---|---|
+| **Independent scaling** | Scale prefill and decode replicas separately based on demand |
+| **Reduced interference** | Prefill doesn't block decode and vice versa |
+| **Cost optimization** | Use different instance types for different workloads |
+| **Better latency** | Optimize TTFT and ITL independently |
+
+## Architecture
+
+```
+                   ┌─────────────────┐
+                   │    Ray Serve    │
+                   │     Router      │
+                   └────────┬────────┘
+                            │
+             ┌──────────────┴──────────────┐
+             │                             │
+    ┌────────▼────────┐           ┌────────▼────────┐
+    │  Prefill Pool   │   NIXL    │   Decode Pool   │
+    │  (compute-bound)│ ──────►   │  (memory-bound) │
+    │  1-2 replicas   │ KV cache  │  1-4 replicas   │
+    └─────────────────┘ transfer  └─────────────────┘
+```
+
+## Files
+
+| File | Description |
+|---|---|
+| `disaggregated_prefill_decode.py` | Python deployment script (can run standalone) |
+| `rayservice.disaggregated_prefill_decode.yaml` | KubeRay RayService manifest for Kubernetes |
+| `disaggregated_prefill_decode_req.py` | Test client with chat completion and streaming |
+
+## Prerequisites
+
+- **Ray** >= 2.44 with `ray[serve]`
+- **vLLM** v1 (default engine in Ray Serve LLM)
+- **NIXL**: `pip install nixl` (pre-installed in `rayproject/ray-llm` images)
+- **GPU nodes** with sufficient VRAM (e.g., NVIDIA A10G for Llama-3.1-8B)
+- **HuggingFace token** with access to `meta-llama/Llama-3.1-8B-Instruct`
+
+## Quick Start
+
+### Option 1: Kubernetes (RayService)
+
+```bash
+# From the rayservice directory
+./rayservice-create.sh disaggregated_prefill_decode
+
+# Check status
+./rayservice-status.sh
+
+# Wait for pods to be ready, then test
+./rayservice-test.sh disaggregated_prefill_decode
+```
+
+### Option 2: Direct Python
+
+```bash
+# Install dependencies
+pip install "ray[serve]" vllm nixl
+
+# Set HuggingFace token
+export HF_TOKEN=
+
+# Deploy
+python disaggregated_prefill_decode.py
+
+# In another terminal, test
+python disaggregated_prefill_decode_req.py
+```
+
+### Option 3: Ray Serve CLI
+
+```bash
+# Deploy from YAML (extract the serveConfigV2 section)
+serve deploy rayservice.disaggregated_prefill_decode.yaml
+
+# Test
+curl -X POST http://localhost:8000/v1/chat/completions \
+  -H "Content-Type: application/json" \
+  -d '{
+    "model": "meta-llama/Llama-3.1-8B-Instruct",
+    "messages": [{"role": "user", "content": "Hello!"}],
+    "max_tokens": 64
+  }'
+```
+
+## Configuration
+
+### Changing the Model
+
+Update `model_id` in both the prefill and decode configs:
+
+```yaml
+prefill_config:
+  model_loading_config:
+    model_id: your-org/your-model
+decode_config:
+  model_loading_config:
+    model_id: your-org/your-model
+```
+
+### Scaling
+
+Adjust `min_replicas` / `max_replicas` independently for each phase. A typical pattern is more decode replicas than prefill, since decode is the longer-running phase:
+
+```yaml
+prefill_config:
+  deployment_config:
+    autoscaling_config:
+      min_replicas: 2
+      max_replicas: 4
+
+decode_config:
+  deployment_config:
+    autoscaling_config:
+      min_replicas: 6
+      max_replicas: 10
+```
+
+### GPU Instance Types (AWS)
+
+| Instance | GPU | VRAM | Best For |
+|---|---|---|---|
+| g5.xlarge | 1x A10G | 24 GB | Small models (7-8B) |
+| g5.2xlarge | 1x A10G | 24 GB | Small models, more CPU/RAM |
+| p4d.24xlarge | 8x A100 | 320 GB | Large models (70B+) |
+| p5.48xlarge | 8x H100 | 640 GB | Largest models, highest throughput |
+
+### Alternative KV Transfer Backends
+
+This example uses NIXLConnector. For advanced caching, you can switch to LMCacheConnectorV1:
+
+```yaml
+engine_kwargs:
+  kv_transfer_config:
+    kv_connector: LMCacheConnectorV1
+    kv_role: kv_producer  # or kv_consumer
+```
+
+See the [Ray Serve docs](https://docs.ray.io/en/latest/serve/llm/user-guides/prefill-decode.html) for LMCache and Mooncake backend configurations.
+
+## References
+
+- [Ray Serve Prefill/Decode Disaggregation Guide](https://docs.ray.io/en/latest/serve/llm/user-guides/prefill-decode.html)
+- [vLLM NIXLConnector Usage](https://docs.vllm.ai/en/stable/features/nixl_connector_usage.html)
+- [DistServe Paper — Disaggregated Inference](https://arxiv.org/abs/2401.09670)
+- [Anyscale Blog — Wide-EP and Disaggregated Serving](https://www.anyscale.com/blog/ray-serve-llm-anyscale-apis-wide-ep-disaggregated-serving-vllm)
diff --git a/Container-Root/ray/rayservice/disaggregated_prefill_decode/disaggregated_prefill_decode.py b/Container-Root/ray/rayservice/disaggregated_prefill_decode/disaggregated_prefill_decode.py
new file mode 100644
index 0000000..4ec37b3
--- /dev/null
+++ b/Container-Root/ray/rayservice/disaggregated_prefill_decode/disaggregated_prefill_decode.py
@@ -0,0 +1,106 @@
+"""
+Disaggregated Prefill/Decode Serving with Ray Serve LLM
+
+This example demonstrates how to deploy an LLM with prefill/decode
+disaggregation using Ray Serve's built-in LLM APIs and vLLM's
+NIXLConnector for KV cache transfer.
+
+Disaggregated serving separates the prefill phase (processing input
+prompts) from the decode phase (generating tokens), enabling:
+
+  - Independent scaling of prefill and decode replicas
+  - Reduced interference between compute-bound prefill and
+    memory-bound decode
+  - Cost optimization via heterogeneous hardware
+
+Prerequisites:
+  - Ray >= 2.44 with ray[serve] installed
+  - vLLM v1 (default engine)
+  - NIXL: pip install nixl (pre-installed in ray-llm images)
+  - GPU workers with enough VRAM for the model
+
+Usage:
+  # Deploy via Ray Serve config (recommended for Kubernetes)
+  serve deploy rayservice.disaggregated_prefill_decode.yaml
+
+  # Or run directly with Python
+  python disaggregated_prefill_decode.py
+
+  # Test the endpoint
+  python disaggregated_prefill_decode_req.py
+"""
+
+from ray.serve.llm import LLMConfig, build_pd_openai_app
+import ray.serve as serve
+
+# Model to serve — change to any HuggingFace model you have access to.
+MODEL_ID = "meta-llama/Llama-3.1-8B-Instruct"
+
+# ── Prefill instance configuration ──────────────────────────────────
+# The prefill instance processes input prompts and produces KV cache
+# entries that are transferred to decode instances via NIXL.
+prefill_config = LLMConfig(
+    model_loading_config={
+        "model_id": MODEL_ID,
+    },
+    deployment_config={
+        "autoscaling_config": {
+            "min_replicas": 1,
+            "max_replicas": 2,
+        }
+    },
+    accelerator_type="A10G",
+    engine_kwargs={
+        "kv_transfer_config": {
+            "kv_connector": "NixlConnector",
+            "kv_role": "kv_both",
+        },
+    },
+)
+
+# ── Decode instance configuration ───────────────────────────────────
+# The decode instance generates tokens autoregressively, consuming
+# KV cache entries produced by the prefill instance.
+decode_config = LLMConfig(
+    model_loading_config={
+        "model_id": MODEL_ID,
+    },
+    deployment_config={
+        "autoscaling_config": {
+            "min_replicas": 1,
+            "max_replicas": 4,
+        }
+    },
+    accelerator_type="A10G",
+    engine_kwargs={
+        "kv_transfer_config": {
+            "kv_connector": "NixlConnector",
+            "kv_role": "kv_both",
+        },
+    },
+)
+
+# ── Build and deploy ────────────────────────────────────────────────
+# build_pd_openai_app creates an OpenAI-compatible API with a router
+# that directs requests to prefill instances first, then hands off
+# KV cache to decode instances for token generation.
+pd_config = dict(
+    prefill_config=prefill_config,
+    decode_config=decode_config,
+)
+
+app = build_pd_openai_app(pd_config)
+
+if __name__ == "__main__":
+    serve.run(app)
+    print(f"\nDisaggregated serving is running for {MODEL_ID}")
+    print("Send requests to http://localhost:8000/v1/chat/completions")
+    print("Press Ctrl+C to stop.\n")
+
+    # Keep the process alive
+    import time
+    try:
+        while True:
+            time.sleep(10)
+    except KeyboardInterrupt:
+        print("Shutting down...")
diff --git a/Container-Root/ray/rayservice/disaggregated_prefill_decode/disaggregated_prefill_decode_req.py b/Container-Root/ray/rayservice/disaggregated_prefill_decode/disaggregated_prefill_decode_req.py
new file mode 100644
index 0000000..544e24e
--- /dev/null
+++ b/Container-Root/ray/rayservice/disaggregated_prefill_decode/disaggregated_prefill_decode_req.py
@@ -0,0 +1,131 @@
+"""
+Test client for the disaggregated prefill/decode serving example.
+
+Sends a chat completion request to the Ray Serve endpoint and
+streams the response.
+
+Prerequisites:
+  - The disaggregated serving deployment must be running
+    (see disaggregated_prefill_decode.py or the RayService YAML)
+  - Port 8000 must be accessible (use kubectl port-forward for K8s)
+
+Usage:
+  python disaggregated_prefill_decode_req.py
+"""
+
+import requests
+import json
+import sys
+
+URL = "http://127.0.0.1:8000/v1/chat/completions"
+
+MODEL_ID = "Qwen/Qwen2.5-7B-Instruct"  # must match the model_id served by the running deployment
+
+
+def test_chat_completion():
+    """Send a basic chat completion request."""
+    payload = {
+        "model": MODEL_ID,
+        "messages": [
+            {
+                "role": "system",
+                "content": "You are a helpful assistant. Be concise.",
+            },
+            {
+                "role": "user",
+                "content": "Explain the benefits of disaggregated prefill and decode for LLM serving in 3 bullet points.",
+            },
+        ],
+        "max_tokens": 256,
+        "temperature": 0.7,
+    }
+
+    print(f"Sending request to {URL} ...")
+    print(f"Model: {MODEL_ID}")
+    print("-" * 60)
+
+    try:
+        response = requests.post(
+            URL,
+            headers={"Content-Type": "application/json"},
+            json=payload,
+            timeout=120,
+        )
+        response.raise_for_status()
+        result = response.json()
+
+        # Print the response
+        message = result["choices"][0]["message"]["content"]
+        print(f"\nResponse:\n{message}")
+
+        # Print usage stats if available
+        if "usage" in result:
+            usage = result["usage"]
+            print(f"\nTokens — prompt: {usage.get('prompt_tokens', 'N/A')}, "
+                  f"completion: {usage.get('completion_tokens', 'N/A')}, "
+                  f"total: {usage.get('total_tokens', 'N/A')}")
+
+    except requests.exceptions.ConnectionError:
+        print("ERROR: Could not connect to the serving endpoint.")
+        print("Make sure the deployment is running and port 8000 is forwarded.")
+        print("\n kubectl port-forward svc/ 8000")
+        sys.exit(1)
+    except requests.exceptions.HTTPError as e:
+        print(f"ERROR: HTTP {e.response.status_code}")
+        print(e.response.text)
+        sys.exit(1)
+
+
+def test_streaming():
+    """Send a streaming chat completion request."""
+    payload = {
+        "model": MODEL_ID,
+        "messages": [
+            {
+                "role": "user",
+                "content": "Write a haiku about distributed systems.",
+            },
+        ],
+        "max_tokens": 64,
+        "temperature": 0.9,
+        "stream": True,
+    }
+
+    print("\nStreaming request:")
+    print("-" * 60)
+
+    try:
+        response = requests.post(
+            URL,
+            headers={"Content-Type": "application/json"},
+            json=payload,
+            stream=True,
+            timeout=120,
+        )
+        response.raise_for_status()
+
+        for line in response.iter_lines():
+            if line:
+                decoded = line.decode("utf-8")
+                if decoded.startswith("data: "):
+                    data = decoded[6:]
+                    if data.strip() == "[DONE]":
+                        break
+                    try:
+                        chunk = json.loads(data)
+                        delta = chunk["choices"][0].get("delta", {})
+                        content = delta.get("content", "")
+                        if content:
+                            print(content, end="", flush=True)
+                    except json.JSONDecodeError:
+                        pass
+        print("\n")
+
+    except requests.exceptions.ConnectionError:
+        print("ERROR: Could not connect for streaming.")
+        sys.exit(1)
+
+
+if __name__ == "__main__":
+    test_chat_completion()
+    test_streaming()
diff --git a/Container-Root/ray/rayservice/disaggregated_prefill_decode/rayservice.disaggregated_prefill_decode.yaml b/Container-Root/ray/rayservice/disaggregated_prefill_decode/rayservice.disaggregated_prefill_decode.yaml
new file mode 100644
index 0000000..193047d
--- /dev/null
+++ b/Container-Root/ray/rayservice/disaggregated_prefill_decode/rayservice.disaggregated_prefill_decode.yaml
@@ -0,0 +1,147 @@
+apiVersion: ray.io/v1
+kind: RayService
+metadata:
+  name: disaggregated-prefill-decode
+spec:
+  # ── Ray Serve application ─────────────────────────────────────────
+  # Uses Ray Serve LLM's build_pd_openai_app to deploy an
+  # OpenAI-compatible endpoint with disaggregated prefill/decode.
+  #
+  # The prefill instances process input prompts and transfer KV cache
+  # to decode instances via NIXL for token generation.
+  serveConfigV2: |
+    applications:
+      - name: disaggregated-prefill-decode
+        import_path: ray.serve.llm:build_pd_openai_app
+        route_prefix: "/"
+        args:
+          prefill_config:
+            model_loading_config:
+              model_id: Qwen/Qwen2.5-7B-Instruct
+            accelerator_type: H200
+            engine_kwargs:
+              kv_transfer_config:
+                kv_connector: NixlConnector
+                kv_role: kv_both
+            deployment_config:
+              autoscaling_config:
+                min_replicas: 1
+                max_replicas: 2
+
+          decode_config:
+            model_loading_config:
+              model_id: Qwen/Qwen2.5-7B-Instruct
+            accelerator_type: H200
+            engine_kwargs:
+              kv_transfer_config:
+                kv_connector: NixlConnector
+                kv_role: kv_both
+            deployment_config:
+              autoscaling_config:
+                min_replicas: 1
+                max_replicas: 4
+
+  # ── Ray cluster configuration ─────────────────────────────────────
+  rayClusterConfig:
+    rayVersion: '2.44.0'
+
+    headGroupSpec:
+      rayStartParams:
+        dashboard-host: '0.0.0.0'
+        num-gpus: '0'
+      template:
+        spec:
+          containers:
+            - name: ray-head
+              image: rayproject/ray-llm:latest-py311-cu128
+              ports:
+                - containerPort: 6379
+                  name: gcs
+                - containerPort: 8265
+                  name: dashboard
+                - containerPort: 10001
+                  name: client
+                - containerPort: 8000
+                  name: serve
+              volumeMounts:
+                - mountPath: /tmp/ray
+                  name: ray-logs
+              resources:
+                limits:
+                  cpu: "4"
+                  memory: "16G"
+                requests:
+                  cpu: "4"
+                  memory: "16G"
+          volumes:
+            - name: ray-logs
+              emptyDir: {}
+
+    workerGroupSpecs:
+      # ── Prefill workers ──────────────────────────────────────────────
+      # Prefill is compute-bound: benefits from high FLOPS GPUs.
+      - replicas: 1
+        minReplicas: 1
+        maxReplicas: 2
+        groupName: prefill-group
+        rayStartParams: {}
+        template:
+          spec:
+            containers:
+              - name: ray-worker-prefill
+                image: rayproject/ray-llm:latest-py311-cu128
+                lifecycle:
+                  preStop:
+                    exec:
+                      command: ["/bin/sh", "-c", "ray stop"]
+                resources:
+                  limits:
+                    cpu: "8"
+                    memory: "32G"
+                    nvidia.com/gpu: 1
+                  requests:
+                    cpu: "6"
+                    memory: "24G"
+                    nvidia.com/gpu: 1
+            # Uncomment for node affinity / tolerations:
+            # tolerations:
+            #   - key: "ray.io/node-type"
+            #     operator: "Equal"
+            #     value: "worker"
+            #     effect: "NoSchedule"
+            # nodeSelector:
+            #   node.kubernetes.io/instance-type: g5.2xlarge
+
+      # ── Decode workers ───────────────────────────────────────────────
+      # Decode is memory-bandwidth-bound: benefits from high HBM GPUs.
+      - replicas: 1
+        minReplicas: 1
+        maxReplicas: 4
+        groupName: decode-group
+        rayStartParams: {}
+        template:
+          spec:
+            containers:
+              - name: ray-worker-decode
+                image: rayproject/ray-llm:latest-py311-cu128
+                lifecycle:
+                  preStop:
+                    exec:
+                      command: ["/bin/sh", "-c", "ray stop"]
+                resources:
+                  limits:
+                    cpu: "8"
+                    memory: "32G"
+                    nvidia.com/gpu: 1
+                  requests:
+                    cpu: "6"
+                    memory: "24G"
+                    nvidia.com/gpu: 1
+            # Uncomment for node affinity / tolerations:
+            # tolerations:
+            #   - key: "ray.io/node-type"
+            #     operator: "Equal"
+            #     value: "worker"
+            #     effect: "NoSchedule"
+            # nodeSelector:
+            #   node.kubernetes.io/instance-type: g5.2xlarge
diff --git a/Container-Root/ray/rayservice/rayservice-create.sh b/Container-Root/ray/rayservice/rayservice-create.sh
index 019c485..59e861e 100755
--- a/Container-Root/ray/rayservice/rayservice-create.sh
+++ b/Container-Root/ray/rayservice/rayservice-create.sh
@@ -6,7 +6,7 @@ if [ -z "$1" ]; then
     echo ""
     echo "Error: No model name provided."
     echo "Usage: ./rayservice-create.sh "
-    echo "Available model names: detr, mobilenet, stable-diffusion"
+    echo "Available model names: detr, mobilenet, stable-diffusion, disaggregated_prefill_decode"
     echo ""
     exit 1
 fi
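
Review note (not part of the patch): the streaming loop in `disaggregated_prefill_decode_req.py` interleaves the HTTP request with the parsing of `data: ` lines, so the parsing can only be exercised against a live deployment. Below is a minimal sketch of how that parsing could be factored into a pure function and checked offline. The name `iter_sse_content` and the sample byte lines are hypothetical and do not appear in the patch; the sketch assumes the OpenAI-style chunk shape (`choices[0].delta.content`) that the patch's loop already reads.

```python
import json
from typing import Iterable, Iterator


def iter_sse_content(lines: Iterable[bytes]) -> Iterator[str]:
    """Yield text deltas from OpenAI-style server-sent-event lines.

    Hypothetical helper: mirrors the parsing done inline in test_streaming().
    """
    for raw in lines:
        if not raw:
            continue  # skip blank keep-alive lines
        decoded = raw.decode("utf-8")
        if not decoded.startswith("data: "):
            continue  # ignore anything that is not a data event
        data = decoded[len("data: "):].strip()
        if data == "[DONE]":
            return  # end-of-stream sentinel
        try:
            chunk = json.loads(data)
        except json.JSONDecodeError:
            continue  # skip malformed chunks, as the original loop does
        choices = chunk.get("choices") or []
        if not choices:
            continue
        content = choices[0].get("delta", {}).get("content")
        if content:
            yield content


# Made-up sample stream for an offline check (no server required).
sample = [
    b'data: {"choices": [{"delta": {"role": "assistant"}}]}',
    b"",
    b'data: {"choices": [{"delta": {"content": "Hello"}}]}',
    b'data: {"choices": [{"delta": {"content": ", world"}}]}',
    b"data: [DONE]",
    b'data: {"choices": [{"delta": {"content": "ignored"}}]}',
]
print("".join(iter_sse_content(sample)))  # prints "Hello, world"
```

In the client, the nested loop body could then shrink to `for piece in iter_sse_content(response.iter_lines()): print(piece, end="", flush=True)`, leaving the request and error handling unchanged.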