Skip to content
Open
3 changes: 3 additions & 0 deletions src/vllm_router/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,13 @@ def initialize_all(app: FastAPI, args):
initialize_routing_logic(
args.routing_logic,
session_key=args.session_key,
tolerate_waiting_requests=args.tolerate_waiting_requests,
lmcache_controller_port=args.lmcache_controller_port,
prefill_model_labels=args.prefill_model_labels,
decode_model_labels=args.decode_model_labels,
kv_aware_threshold=args.kv_aware_threshold,
enable_request_logging=args.enable_request_logging,
request_log_dir=args.request_log_dir,
)

# Initialize feature gates
Expand Down
26 changes: 26 additions & 0 deletions src/vllm_router/parsers/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ def validate_args(args):
raise ValueError(
"Session key must be provided when using session routing logic."
)
if args.routing_logic == "cache_aware_load_balancing" and args.session_key is None:
raise ValueError(
"Session key must be provided when using cache_aware_load_balancing routing logic."
)
if args.log_stats and args.log_stats_interval <= 0:
raise ValueError("Log stats interval must be greater than 0.")
if args.engine_stats_interval <= 0:
Expand Down Expand Up @@ -180,12 +184,34 @@ def parse_args():
choices=[
"roundrobin",
"session",
"cache_aware_load_balancing",
"kvaware",
"prefixaware",
"disaggregated_prefill",
],
help="The routing logic to use",
)

parser.add_argument(
"--tolerate-waiting-requests",
type=int,
default=10,
help="The number of waiting requests to tolerate in cache-aware load balancing router.",
)

parser.add_argument(
"--enable-request-logging",
action="store_true",
help="Enable request logging, record the routing decision and performance data of each request",
)

parser.add_argument(
"--request-log-dir",
type=str,
default=None,
help="The directory to store the request log file, if provided, the log will be written to this file",
)

parser.add_argument(
"--lmcache-controller-port",
type=int,
Expand Down
150 changes: 150 additions & 0 deletions src/vllm_router/request_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
# Copyright 2024-2025 The vLLM Production Stack Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os
import time
from datetime import datetime
from typing import Optional

from fastapi import Request

from vllm_router.log import init_logger

logger = init_logger(__name__)


class RequestLogger:
    """Request logger for recording routing decisions and performance data.

    Writes one CSV line per routed request to a timestamped log file and can
    dump raw request bodies into a side directory. All ``log_*`` methods are
    no-ops until ``enable_logging(True, ...)`` succeeds.
    """

    def __init__(self):
        """Initialize the request logger in the disabled state."""
        # When False, every log_* method returns immediately.
        self.log_enabled = False
        # Path of the CSV log file; None while logging is disabled or if
        # the file could not be created.
        self.log_file = None
        # Directory where raw request bodies are stored, one file per request.
        self.requests_bodies_dir = None
        # Dictionary for temporarily storing request arrival times,
        # cleared via clear_logs().
        self.arrival_times = {}

    def enable_logging(self, enabled: bool = True, log_dir: Optional[str] = None):
        """
        Enable or disable logging.

        Args:
            enabled: Whether to enable logging.
            log_dir: Log directory path; if not provided, the current
                working directory is used.
        """
        self.log_enabled = enabled
        self.log_file = None  # Reset log_file on every (re)configuration

        if not enabled:
            # Bug fix: the original logged "has been enabled" even when
            # logging was being turned off.
            logger.info("Request routing logging has been disabled")
            return

        # If log_dir is not specified, fall back to the working directory.
        if not log_dir:
            log_dir = os.getcwd()
            logger.info(
                f"No log directory specified, will use current working directory: {log_dir}"
            )

        try:
            # Ensure the log directory exists.
            os.makedirs(log_dir, exist_ok=True)

            # Create the request-body storage directory.
            self.requests_bodies_dir = os.path.join(log_dir, "request_bodies")
            os.makedirs(self.requests_bodies_dir, exist_ok=True)

            # Timestamp the file name so router restarts do not clobber
            # previous logs.
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            log_file = os.path.join(
                log_dir, f"router_requests_logs_{timestamp}.csv"
            )

            # Create the log file and write the CSV header.
            with open(log_file, "w") as f:
                f.write(
                    "timestamp,request_id,conversation_id,arrival_time,routing_method,target_engine,process_time\n"
                )

            self.log_file = log_file
            logger.info(f"Request routing logs will be written to file: {log_file}")
        except Exception as e:
            # Best-effort: keep the router running even if the log file
            # cannot be created; log_file stays None so writes are skipped.
            logger.error(f"Failed to create log file: {str(e)}")
            self.log_file = None

        logger.info("Request routing logging has been enabled")

    def log_request_routed(
        self,
        arrival_time: float,
        request_id: str,
        routing_method: str,
        target_engine: str,
        session_id: Optional[str] = None,
        process_time: Optional[float] = None,
    ):
        """Record a routing decision as one CSV line in the log file.

        Args:
            arrival_time: Time (epoch seconds) the request arrived.
            request_id: Unique identifier of the request.
            routing_method: Name of the routing logic that made the decision.
            target_engine: Identifier/URL of the engine the request was sent to.
            session_id: Conversation/session identifier; "unknown" if absent.
            process_time: Routing processing time, if measured.
        """
        if not self.log_enabled or not self.log_file:
            return

        # Ensure session_id has a value.
        session_id = session_id or "unknown"

        try:
            timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
            # NOTE(review): fields are not CSV-escaped; a value containing a
            # comma would corrupt the row. Current callers pass comma-free
            # identifiers — confirm before logging free-form values.
            log_line = (
                f"{timestamp},{request_id},{session_id},{arrival_time},"
                f"{routing_method},{target_engine},{process_time}\n"
            )

            with open(self.log_file, "a") as f:
                f.write(log_line)

        except Exception as e:
            logger.error(f"Failed to write to log file: {str(e)}")

    def log_request_body(self, request_body, request_id: Optional[str] = None):
        """
        Log a raw request body to a separate JSON file.

        Args:
            request_body: Request body content (str or bytes).
            request_id: Request ID; if None, try to extract it from the
                request body, falling back to a timestamp-based id.
        """
        if not self.log_enabled or not self.requests_bodies_dir:
            return

        if not request_id:
            # Try to extract the id from the request body; fall back to a
            # nanosecond timestamp to reduce the chance of name collisions
            # (the original second-granularity fallback collided easily).
            try:
                body_json = json.loads(request_body)
                request_id = body_json.get("id") or str(time.time_ns())
            except (ValueError, TypeError, AttributeError):
                # Invalid JSON, wrong input type, or a non-dict top-level
                # value (was a bare `except:`, which also swallowed
                # KeyboardInterrupt/SystemExit).
                request_id = str(time.time_ns())

        # Use only the basename so a request-supplied id cannot escape the
        # request_bodies directory via path separators (path traversal).
        safe_name = os.path.basename(str(request_id))
        file_path = os.path.join(self.requests_bodies_dir, f"{safe_name}.json")

        # Write the request body to file, encoding text as UTF-8.
        try:
            with open(file_path, "wb") as f:
                if isinstance(request_body, bytes):
                    f.write(request_body)
                else:
                    f.write(request_body.encode("utf-8"))
        except Exception as e:
            logger.error(f"Failed to write request body file: {str(e)}")

    def clear_logs(self):
        """Clear temporarily stored arrival times."""
        self.arrival_times.clear()


# Create global request logger instance (module-level singleton shared by
# the router components).
request_logger = RequestLogger()
22 changes: 22 additions & 0 deletions src/vllm_router/routers/metrics_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
num_prefill_requests,
num_requests_running,
num_requests_swapped,
routing_method_qps,
)
from vllm_router.stats.engine_stats import get_engine_stats_scraper
from vllm_router.stats.request_stats import get_request_stats_monitor
Expand Down Expand Up @@ -99,6 +100,27 @@ async def metrics():
avg_itl.labels(server=server).set(stat.avg_itl)
num_requests_swapped.labels(server=server).set(stat.num_swapped_requests)

# -----------------------------------------------------------------------------

# Routing method QPS
routing_methods_qps = get_request_stats_monitor().get_routing_methods_qps()
# Save all known routing methods, for resetting
known_methods = set()
# Find registered routing method labels
for labels, _ in routing_method_qps._metrics.items():
if labels and len(labels) > 0:
method = labels[0] # Assuming labels is (method,) format
known_methods.add(method)
# Reset QPS of all known but currently inactive routing methods to 0
for method in known_methods:
if method not in routing_methods_qps:
routing_method_qps.labels(method=method).set(0)
# Set QPS of currently active routing methods
for method, qps_value in routing_methods_qps.items():
routing_method_qps.labels(method=method).set(qps_value)

# -----------------------------------------------------------------------------

# Engine statistics (GPU prefix cache metrics)
engine_stats = get_engine_stats_scraper().get_engine_stats()
for server, engine_stat in engine_stats.items():
Expand Down
Loading
Loading