
auto set schedule way. #1235

Merged
hiworldwzj merged 13 commits into main from wzj_fix on Mar 18, 2026
Conversation

@hiworldwzj
Collaborator

@hiworldwzj hiworldwzj commented Mar 17, 2026

PD separation with request segmentation + automatic enabling of advanced scheduling + automatic adjustment of max_new_tokens back to a normal value.

@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request refines the request scheduling and resource management within the system. It introduces an automatic mechanism for setting the router_token_ratio and replaces a static request output length parameter with a dynamic, EMA-based estimation. Furthermore, it implements a request splitting strategy for PD separation mode to enhance memory efficiency and throughput when handling long generation requests.

Highlights

  • Dynamic Scheduling Parameter: The router_token_ratio parameter can now be automatically configured based on the run_mode if not explicitly set, enhancing ease of use and system adaptability.
  • Removed Fixed Output Length Parameter: The --router_max_new_token_len argument and its internal usage have been removed, streamlining the configuration and relying on more dynamic estimations.
  • Exponential Moving Average (EMA) for Request Output Length: A new RouterStatics class tracks an Exponential Moving Average (EMA) of request output lengths (ema_req_out_len), which dynamically informs scheduling decisions, replacing the previously fixed router_max_new_token_len.
  • Request Splitting in PD Separation Mode: Requests with large max_new_tokens in PD separation mode are now automatically split into smaller blocks. This prevents excessive memory reservation and aims to improve overall system throughput by optimizing resource utilization.
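
The EMA behavior described above can be sketched in a few lines. `RouterStatics` and `ema_req_out_len` are names taken from this PR's changelog; the smoothing factor, initial value, and `update` method here are illustrative assumptions, not the actual implementation.

```python
# Minimal sketch of an EMA tracker for finished-request output lengths.
# ALPHA, the initial estimate, and the update() signature are assumptions.
ALPHA = 0.1  # weight given to the newest observation


class RouterStatics:
    def __init__(self, init_len: int = 1024):
        self.ema_req_out_len = init_len  # current EMA estimate used by the scheduler

    def update(self, finished_out_len: int):
        # standard exponential moving average update on request completion
        self.ema_req_out_len = int(
            ALPHA * finished_out_len + (1 - ALPHA) * self.ema_req_out_len
        )


stats = RouterStatics(init_len=1024)
for out_len in [200, 300, 250]:  # observed output lengths of finished requests
    stats.update(out_len)
print(stats.ema_req_out_len)  # → 813
```

The estimate decays toward the recent workload, so short-output workloads stop reserving memory as if every request could run to a fixed worst-case length.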


Changelog
  • docs/CN/source/tutorial/api_server_args.rst
    • Removed documentation for the --router_max_new_token_len option.
  • docs/EN/source/tutorial/api_server_args.rst
    • Removed documentation for the --router_max_new_token_len option.
  • lightllm/server/api_cli.py
    • Modified the --router_token_ratio argument to default to None and updated its help message to describe automatic determination.
    • Removed the --router_max_new_token_len argument.
  • lightllm/server/api_start.py
    • Added logic to automatically set args.router_token_ratio based on the run_mode if it is None.
    • Relocated and updated the assertion related to diverse_mode and router_token_ratio.
  • lightllm/server/core/objs/req.py
    • Updated the get_tuple_tokens method signature and its implementations to use ema_req_out_len instead of router_max_new_token_len.
  • lightllm/server/core/objs/start_args_type.py
    • Removed the router_max_new_token_len field from the StartArgs class.
  • lightllm/server/httpserver_for_pd_master/manager.py
    • Imported get_pd_split_max_new_tokens.
    • Implemented request splitting logic for max_new_tokens within the generate method, specifically for PD separation scenarios.
    • Added a new private method _split_max_new_tokens to handle token splitting.
  • lightllm/server/router/batch.py
    • Imported RouterStatics.
    • Modified filter_out_finished_req to update router_statics with candetoken_out_len for non-aborted requests.
  • lightllm/server/router/manager.py
    • Imported RouterStatics.
    • Initialized self.router_statics in the __init__ method.
    • Added logging for router_statics within the loop_for_fwd method.
    • Modified _filter_reqs_from_running_batch to pass the router_statics object.
  • lightllm/server/router/req_queue/base_queue.py
    • Removed the self.router_max_new_token_len attribute.
  • lightllm/server/router/req_queue/chunked_prefill/beam_impl.py
    • Updated calls to get_tuple_tokens to use self.router.router_statics.ema_req_out_len.
  • lightllm/server/router/req_queue/chunked_prefill/impl.py
    • Updated calls to get_tuple_tokens to use self.router.router_statics.ema_req_out_len.
  • lightllm/server/router/req_queue/chunked_prefill/impl_for_nixl_pd.py
    • Updated calls to get_tuple_tokens to use self.router.router_statics.ema_req_out_len.
  • lightllm/server/router/req_queue/chunked_prefill/impl_for_pd_decode.py
    • Updated calls to get_tuple_tokens to use self.router.router_statics.ema_req_out_len.
  • lightllm/server/router/stats.py
    • Refactored the Stats class into RouterStatics to specifically track ema_req_out_len and busy_token_used_ratio.
    • Removed token counting and throughput logging logic.
  • lightllm/utils/envs_utils.py
    • Added a new function get_pd_split_max_new_tokens to retrieve a configurable maximum new tokens value for PD splitting.
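
A hedged sketch of what such an env-driven helper could look like. The actual environment-variable name and default used in lightllm/utils/envs_utils.py are not shown on this page, so both are assumptions here.

```python
import os


def get_pd_split_max_new_tokens(default: int = 1024) -> int:
    # Read the PD-split block size from the environment, falling back to a
    # default. "PD_SPLIT_MAX_NEW_TOKENS" is a hypothetical variable name.
    return int(os.getenv("PD_SPLIT_MAX_NEW_TOKENS", default))


print(get_pd_split_max_new_tokens())  # → 1024 when the env var is unset
```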


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request refactors the scheduling logic by making it more dynamic. It removes the fixed router_max_new_token_len parameter and introduces a new RouterStatics class to calculate an exponential moving average of request output lengths (ema_req_out_len). This allows the scheduler to adapt to the actual workload. Additionally, it introduces request chunking in the pd_master to handle long generation requests more efficiently by splitting them into smaller blocks.

My review focuses on a critical bug in the exception handling logic for the new request chunking feature, which could lead to runtime errors. I've also suggested a minor refactoring for code clarity. The rest of the changes for dynamic scheduling appear to be well-implemented and consistent across the codebase.

Comment on lines 107 to 166
 try:
-    sampling_params.group_request_id = group_request_id
     # log request-arrival info
-    await self._log_req_header(request, group_request_id)
+    await self._log_req_header(request, origin_group_request_id)
     # metrics
     self.metric_client.counter_inc("lightllm_request_count")
-    self.metric_client.histogram_observe("lightllm_request_max_new_tokens", sampling_params.max_new_tokens)
+    self.metric_client.histogram_observe(
+        "lightllm_request_max_new_tokens", origin_sampling_params.max_new_tokens
+    )

-    p_node, d_node = await self.select_p_d_node(prompt, sampling_params, multimodal_params)
+    p_node, d_node = await self.select_p_d_node(prompt, origin_sampling_params, multimodal_params)
+    history_gen_token_strs = []

     if not p_node or not d_node:
-        logger.error(f"{group_request_id}: No p_node or d_node found")
-        raise Exception(f"{group_request_id}: No p_node or d_node found")
+        logger.error(f"{origin_group_request_id}: No p_node or d_node found")
+        raise Exception(f"{origin_group_request_id}: No p_node or d_node found")

-    results_generator = self._wait_to_token_package(
-        p_node,
-        d_node,
-        start_time,
-        prompt,
-        sampling_params,
-        multimodal_params,
-        request,
-    )
-    async for sub_req_id, request_output, metadata, finish_status in results_generator:
-        yield sub_req_id, request_output, metadata, finish_status
+    for iter_index, block_max_new_tokens in enumerate(max_new_tokens_list):
+        sampling_params = SamplingParams.from_buffer_copy(origin_sampling_params)
+        block_group_request_id = self.id_gen.generate_id()
+        sampling_params.group_request_id = block_group_request_id
+        logger.info(
+            f"pd log gen sub req id {block_group_request_id}" f" for main req id {origin_group_request_id}"
+        )
+        sampling_params.max_new_tokens = block_max_new_tokens
+
+        results_generator = self._wait_to_token_package(
+            p_node,
+            d_node,
+            start_time,
+            prompt + "".join(history_gen_token_strs),
+            sampling_params,
+            multimodal_params,
+            request,
+        )
+        is_last_block = iter_index == len(max_new_tokens_list) - 1
+        prompt_tokens = sys.maxsize  # because of request segmentation
+        async for sub_req_id, request_output, metadata, finish_status in results_generator:
+            # in PD separation mode, the ordering info in the returned metadata may be inaccurate
+            assert sub_req_id == block_group_request_id
+            if finish_status.get_finish_reason() == "length" and (not is_last_block):
+                finish_status = FinishStatus()  # convert to NoFinished
+            history_gen_token_strs.append(request_output)
+            prompt_tokens = min(prompt_tokens, metadata["prompt_tokens"])
+            metadata["prompt_tokens"] = prompt_tokens
+            yield origin_group_request_id, request_output, metadata, finish_status
+
+        await self.remove_req(group_request_id=block_group_request_id)

 except BaseException as e:
     logger.error(f"has exception {str(e)}")
     try:
-        await self.abort(group_request_id, p_node=p_node, d_node=d_node)
+        await self.abort(block_group_request_id, p_node=p_node, d_node=d_node)
     except:
-        await self.abort(group_request_id)
+        await self.abort(block_group_request_id)
     raise e

 finally:
-    await self.remove_req(group_request_id)
+    await self.remove_req(block_group_request_id)
     return


critical

The exception and finalization logic in the generate method has a couple of critical issues that could lead to runtime errors:

  1. Potential NameError: block_group_request_id is defined inside the for loop. If an exception occurs before the loop starts (e.g., in select_p_d_node), the except and finally blocks will raise a NameError when trying to access block_group_request_id.
  2. Double remove_req call: On successful completion of all chunks, remove_req is called for the last chunk at the end of the for loop (line 154), and then called again in the finally block. This will likely cause an error (e.g., KeyError) as remove_req is probably not idempotent.

To fix this, I suggest initializing block_group_request_id = None before the try block and ensuring cleanup logic handles all edge cases correctly without causing errors. For instance, the finally block could be removed and cleanup handled within the except block for failures, while the success path is already handled inside the loop.
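
A minimal, self-contained sketch of that shape. The stubs (`IdGen`, `remove_req`, `generate`) are illustrative stand-ins, not the project's real API; the point is only that the id is bound before the `try` and the cleanup path guards against `None`.

```python
import asyncio

removed = []  # records which request ids were cleaned up


class IdGen:
    """Tiny stand-in for the server's id generator (illustrative)."""

    def __init__(self):
        self._i = 0

    def generate_id(self):
        self._i += 1
        return self._i


async def remove_req(req_id):
    # cleanup stub; records each id at most once
    if req_id not in removed:
        removed.append(req_id)


async def generate(max_new_tokens_list, fail_at=None):
    id_gen = IdGen()
    block_group_request_id = None  # bound before try, so no NameError on early failure
    try:
        for i, _block in enumerate(max_new_tokens_list):
            block_group_request_id = id_gen.generate_id()
            if fail_at == i:
                raise RuntimeError("simulated failure")
            await remove_req(block_group_request_id)  # success-path cleanup per block
    except BaseException:
        if block_group_request_id is not None:  # guard covers pre-loop failures
            await remove_req(block_group_request_id)
        raise


asyncio.run(generate([1000, 1000, 500]))
print(removed)  # → [1, 2, 3]
```

With cleanup on the success path inside the loop and failure cleanup only in `except`, no id is removed twice and no name is referenced before assignment.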

Comment on lines +503 to +506
ans_list = [block_max_new_tokens for _ in range(max_new_tokens // block_max_new_tokens)]
left_token = max_new_tokens - (max_new_tokens // block_max_new_tokens) * block_max_new_tokens
if left_token > 0:
    ans_list.append(left_token)

medium

This implementation can be made more concise and Pythonic by using the modulo operator (%) for the remainder and list multiplication for creating the list of blocks.

Suggested change
-ans_list = [block_max_new_tokens for _ in range(max_new_tokens // block_max_new_tokens)]
-left_token = max_new_tokens - (max_new_tokens // block_max_new_tokens) * block_max_new_tokens
-if left_token > 0:
-    ans_list.append(left_token)
+ans_list = [block_max_new_tokens] * (max_new_tokens // block_max_new_tokens)
+remainder = max_new_tokens % block_max_new_tokens
+if remainder > 0:
+    ans_list.append(remainder)
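
The splitting arithmetic can be exercised standalone. This helper mirrors the reviewer's suggested form; the real method (`_split_max_new_tokens` in the pd_master manager) may differ in name and signature.

```python
def split_max_new_tokens(max_new_tokens: int, block_max_new_tokens: int) -> list:
    # full-size blocks first, then a remainder block if the budget
    # does not divide evenly
    ans_list = [block_max_new_tokens] * (max_new_tokens // block_max_new_tokens)
    remainder = max_new_tokens % block_max_new_tokens
    if remainder > 0:
        ans_list.append(remainder)
    return ans_list


print(split_max_new_tokens(2500, 1000))  # → [1000, 1000, 500]
print(split_max_new_tokens(2000, 1000))  # → [1000, 1000]
```

The blocks always sum to the original `max_new_tokens`, so a long generation request is served as several shorter sub-requests without changing its total budget.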

@hiworldwzj hiworldwzj merged commit ff4eaa5 into main Mar 18, 2026
1 check passed
@hiworldwzj hiworldwzj deleted the wzj_fix branch March 18, 2026 07:15