3 changes: 1 addition & 2 deletions fastdeploy/engine/common_engine.py
@@ -909,8 +909,7 @@ def _fetch_request():
                     time.sleep(0.005)

             except RuntimeError as e:
-                if "cannot schedule new futures after shutdown" in str(e):
-                    break
+                raise e
❓ Question: RuntimeError handling behavior change

The original code gracefully broke out of the loop on a "cannot schedule new futures after shutdown" error; it now re-raises the exception via raise e.

Questions:

  1. This change makes shutdown raise an exception instead of exiting silently. Is that the intended behavior?
  2. How does this change relate to the "slot accounting" fix described in the PR?
  3. Could it disrupt the normal service shutdown flow?

Please confirm that this behavior change is intentional.
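For context on the question above: CPython's concurrent.futures executors raise exactly this RuntimeError once shutdown() has been called, which is what the deleted break used to swallow. A minimal standalone sketch of that stdlib behavior (not FastDeploy code):

```python
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)
executor.shutdown()  # after this, submit() is no longer allowed

try:
    executor.submit(print, "task")
except RuntimeError as e:
    # CPython's executors raise RuntimeError with this message after shutdown
    print("cannot schedule new futures after shutdown" in str(e))  # → True
```

With the break removed, this exception now propagates out of the fetch loop instead of terminating it quietly, which is why the shutdown-flow question matters.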

             except Exception as e:
                 err_msg = "Error happend while insert task to engine: {}, {}.".format(e, str(traceback.format_exc()))
                 self.llm_logger.error(err_msg)
7 changes: 6 additions & 1 deletion fastdeploy/engine/sched/resource_manager_v1.py
@@ -717,7 +717,12 @@ def _allocate_decode_and_extend():
         if not preempted_reqs:
             skip_requests: list[Request] = []
             while self.waiting and token_budget > 0:
-                if len(self.running) == self.max_num_seqs:
+                if (
+                    len(self.running)
+                    + len(self.to_be_rescheduled_request_id_set)
+                    + sum([req.status == RequestStatus.PREEMPTED for req in self.waiting])
🟢 LGTM: slot accounting fix

The slot occupancy count now correctly includes all three categories of requests:

  • self.running: requests currently running
  • self.to_be_rescheduled_request_id_set: requests waiting to be rescheduled
  • waiting requests in PREEMPTED status: preempted requests awaiting resumption

Changing == to >= also makes the boundary check safer.

Minor suggestion (non-blocking): consider replacing sum([req.status == ... for req in self.waiting]) with a generator expression, sum(req.status == ... for req in self.waiting), to avoid allocating an intermediate list.
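The two forms are behaviorally identical; sum() treats True as 1 either way, and the generator simply skips the intermediate list. A minimal sketch with a stand-in Req class and RequestStatus enum (names assumed for illustration, not the FastDeploy definitions):

```python
from enum import Enum


class RequestStatus(Enum):
    WAITING = 0
    PREEMPTED = 1


class Req:
    def __init__(self, status):
        self.status = status


waiting = [Req(RequestStatus.PREEMPTED), Req(RequestStatus.WAITING), Req(RequestStatus.PREEMPTED)]

# List comprehension: materializes a list of booleans before summing
n_list = sum([req.status == RequestStatus.PREEMPTED for req in waiting])

# Generator expression: sums lazily, no intermediate list
n_gen = sum(req.status == RequestStatus.PREEMPTED for req in waiting)

assert n_list == n_gen == 2
```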

+                    >= self.max_num_seqs
+                ):
                     break

             request = self.waiting[0]