Merged
@@ -36,8 +36,9 @@ void SpeculateSaveWithOutputMsg(const paddle::Tensor& accept_tokens,
int msg_queue_id,
int save_each_rank,
bool skip_prefill) {
// printf("enter save output");
if (!save_each_rank && rank_id > 0) {
// NOTE(yaohuicong): Skip non-zero TP ranks — they share identical sampling
// outputs, so only rank 0 needs to send results to the message queue.
if (rank_id > 0) {
❓ Question: the semantics of the save_each_rank parameter have been removed (the guard changed from if (!save_each_rank && rank_id > 0) to if (rank_id > 0)), which means that regardless of the value of save_each_rank, only rank 0 will send results to the message queue.

Potential impact: if the speculate path supports EP (Expert Parallelism), then in a mixed EP + TP setup different EP ranks produce different outputs, and removing this check would drop the outputs of all non-zero ranks.

Suggestions:

  1. Confirm whether EP mode is supported in the speculative decoding scenario.
  2. If EP is not supported, state explicitly in the comment that this change applies only to TP mode.
  3. If EP is supported, consider whether the save_each_rank check should be kept.

Additionally, the XPU version of speculate_save_output.cc (custom_ops/xpu_ops/src/ops/mtp/speculate_save_output.cc) still keeps the original logic; consider syncing this change there to stay consistent across hardware backends.

return;
}
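To make the behavioral change in the guard concrete, here is a minimal Python sketch (purely illustrative, not FastDeploy code) simulating which ranks would send to the message queue before and after the change:

```python
def should_send_old(rank_id: int, save_each_rank: bool) -> bool:
    # Original guard: skip only when save_each_rank is off AND rank is non-zero.
    return not (not save_each_rank and rank_id > 0)

def should_send_new(rank_id: int) -> bool:
    # New guard: only rank 0 ever sends, regardless of save_each_rank.
    return not (rank_id > 0)

ranks = [0, 1, 2, 3]

# TP-only case (save_each_rank=False): outputs are identical across ranks,
# and both guards behave the same, so the simplification is safe here.
assert [should_send_old(r, False) for r in ranks] == [should_send_new(r) for r in ranks]

# With save_each_rank=True (e.g. an EP setup where per-rank outputs differ),
# the old guard let every rank send; the new guard silently drops ranks 1..3.
print([should_send_old(r, True) for r in ranks])  # [True, True, True, True]
print([should_send_new(r) for r in ranks])        # [True, False, False, False]
```

This is exactly the data-loss scenario the review comment raises: the two guards only coincide when every rank's output is redundant.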

@@ -53,7 +53,9 @@ void SpeculateSaveOutMmsgTopK(const paddle::Tensor& sampled_token_ids,
int message_flag, // Target: 3, Draft: 4
int64_t rank_id,
bool save_each_rank) {
if (!save_each_rank && rank_id > 0) {
// NOTE(yaohuicong): Skip non-zero TP ranks — they share identical sampling
// outputs, so only rank 0 needs to send results to the message queue.
if (rank_id > 0) {
return;
}

Same as above: the semantics of the save_each_rank parameter have been removed. If this function is called in EP mode, this may lead to data loss.

4 changes: 1 addition & 3 deletions fastdeploy/worker/gpu_model_runner.py
@@ -345,9 +345,7 @@ def _predict_next_launch_token_num(self) -> int:
is_block_step_cpu = self.share_inputs["is_block_step_cpu"].numpy()
next_real_bsz = (seq_lens_this_time_cpu > 0).sum().item() + (is_block_step_cpu > 0).sum().item()
token_num_one_step = (self.speculative_config.num_speculative_tokens + 1) if self.speculative_decoding else 1
next_launch_token_num = (
seq_lens_this_time_cpu.sum().item() + is_block_step_cpu.sum().item() * token_num_one_step
)
next_launch_token_num = next_real_bsz * token_num_one_step
🟡 Suggestion: this simplification looks reasonable, because in the speculative decoding scenario each sequence should process a fixed number of tokens (token_num_one_step) per step.

However, consider adding a comment explaining why the original calculation can be simplified, for example:

# In MTP (Multi-Token Prediction) mode, each sequence processes a fixed number of
# tokens per step (num_speculative_tokens + 1), so we can simplify the calculation
# from seq_lens.sum() + is_block_step.sum() * token_num_one_step to
# next_real_bsz * token_num_one_step.

return next_launch_token_num, next_real_bsz

def only_prefill(self):
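As a toy sanity check of that simplification (illustrative values only, not FastDeploy code), the sketch below shows that when every active sequence contributes exactly token_num_one_step tokens, the old and new formulas agree:

```python
num_speculative_tokens = 2
token_num_one_step = num_speculative_tokens + 1  # MTP: draft tokens + 1

# Hypothetical batch of 4 slots: two active decode sequences, one block-stepped
# slot, one idle slot. In MTP decode, every active sequence contributes exactly
# token_num_one_step tokens this step.
seq_lens_this_time = [3, 3, 0, 0]
is_block_step = [0, 0, 1, 0]

next_real_bsz = sum(s > 0 for s in seq_lens_this_time) + sum(b > 0 for b in is_block_step)

# Old formula: sum of per-sequence token counts plus block-stepped slots.
old_value = sum(seq_lens_this_time) + sum(is_block_step) * token_num_one_step
# New formula: active batch size times the fixed per-step token count.
new_value = next_real_bsz * token_num_one_step

print(next_real_bsz, old_value, new_value)  # 3 9 9
```

The equivalence holds only under the fixed-tokens-per-step assumption, which is why the suggested code comment is worth adding.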