Skip to content

Commit 4a93a62

Browse files
authored
fix(api-nodes): add separate retry budget for 429 rate limit responses (Comfy-Org#12421)
1 parent 66c1852 commit 4a93a62

4 files changed

Lines changed: 176 additions & 180 deletions

File tree

comfy_api_nodes/util/client.py

Lines changed: 104 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ class _RequestConfig:
5757
files: dict[str, Any] | list[tuple[str, Any]] | None
5858
multipart_parser: Callable | None
5959
max_retries: int
60+
max_retries_on_rate_limit: int
6061
retry_delay: float
6162
retry_backoff: float
6263
wait_label: str = "Waiting"
@@ -65,6 +66,7 @@ class _RequestConfig:
6566
final_label_on_success: str | None = "Completed"
6667
progress_origin_ts: float | None = None
6768
price_extractor: Callable[[dict[str, Any]], float | None] | None = None
69+
is_rate_limited: Callable[[int, Any], bool] | None = None
6870

6971

7072
@dataclass
@@ -78,7 +80,7 @@ class _PollUIState:
7880
active_since: float | None = None # start time of current active interval (None if queued)
7981

8082

81-
_RETRY_STATUS = {408, 429, 500, 502, 503, 504}
83+
_RETRY_STATUS = {408, 500, 502, 503, 504} # status 429 is handled separately
8284
COMPLETED_STATUSES = ["succeeded", "succeed", "success", "completed", "finished", "done", "complete"]
8385
FAILED_STATUSES = ["cancelled", "canceled", "canceling", "fail", "failed", "error"]
8486
QUEUED_STATUSES = ["created", "queued", "queueing", "submitted", "initializing"]
@@ -103,6 +105,8 @@ async def sync_op(
103105
final_label_on_success: str | None = "Completed",
104106
progress_origin_ts: float | None = None,
105107
monitor_progress: bool = True,
108+
max_retries_on_rate_limit: int = 16,
109+
is_rate_limited: Callable[[int, Any], bool] | None = None,
106110
) -> M:
107111
raw = await sync_op_raw(
108112
cls,
@@ -122,6 +126,8 @@ async def sync_op(
122126
final_label_on_success=final_label_on_success,
123127
progress_origin_ts=progress_origin_ts,
124128
monitor_progress=monitor_progress,
129+
max_retries_on_rate_limit=max_retries_on_rate_limit,
130+
is_rate_limited=is_rate_limited,
125131
)
126132
if not isinstance(raw, dict):
127133
raise Exception("Expected JSON response to validate into a Pydantic model, got non-JSON (binary or text).")
@@ -194,6 +200,8 @@ async def sync_op_raw(
194200
final_label_on_success: str | None = "Completed",
195201
progress_origin_ts: float | None = None,
196202
monitor_progress: bool = True,
203+
max_retries_on_rate_limit: int = 16,
204+
is_rate_limited: Callable[[int, Any], bool] | None = None,
197205
) -> dict[str, Any] | bytes:
198206
"""
199207
Make a single network request.
@@ -222,6 +230,8 @@ async def sync_op_raw(
222230
final_label_on_success=final_label_on_success,
223231
progress_origin_ts=progress_origin_ts,
224232
price_extractor=price_extractor,
233+
max_retries_on_rate_limit=max_retries_on_rate_limit,
234+
is_rate_limited=is_rate_limited,
225235
)
226236
return await _request_base(cfg, expect_binary=as_binary)
227237

@@ -506,7 +516,7 @@ def _friendly_http_message(status: int, body: Any) -> str:
506516
if status == 409:
507517
return "There is a problem with your account. Please contact support@comfy.org."
508518
if status == 429:
509-
return "Rate Limit Exceeded: Please try again later."
519+
return "Rate Limit Exceeded: The server returned 429 after all retry attempts. Please wait and try again."
510520
try:
511521
if isinstance(body, dict):
512522
err = body.get("error")
@@ -586,6 +596,8 @@ async def _monitor(stop_evt: asyncio.Event, start_ts: float):
586596
start_time = cfg.progress_origin_ts if cfg.progress_origin_ts is not None else time.monotonic()
587597
attempt = 0
588598
delay = cfg.retry_delay
599+
rate_limit_attempts = 0
600+
rate_limit_delay = cfg.retry_delay
589601
operation_succeeded: bool = False
590602
final_elapsed_seconds: int | None = None
591603
extracted_price: float | None = None
@@ -653,17 +665,14 @@ async def _monitor(stop_evt: asyncio.Event, start_ts: float):
653665
payload_headers["Content-Type"] = "application/json"
654666
payload_kw["json"] = cfg.data or {}
655667

656-
try:
657-
request_logger.log_request_response(
658-
operation_id=operation_id,
659-
request_method=method,
660-
request_url=url,
661-
request_headers=dict(payload_headers) if payload_headers else None,
662-
request_params=dict(params) if params else None,
663-
request_data=request_body_log,
664-
)
665-
except Exception as _log_e:
666-
logging.debug("[DEBUG] request logging failed: %s", _log_e)
668+
request_logger.log_request_response(
669+
operation_id=operation_id,
670+
request_method=method,
671+
request_url=url,
672+
request_headers=dict(payload_headers) if payload_headers else None,
673+
request_params=dict(params) if params else None,
674+
request_data=request_body_log,
675+
)
667676

668677
req_coro = sess.request(method, url, params=params, **payload_kw)
669678
req_task = asyncio.create_task(req_coro)
@@ -688,52 +697,61 @@ async def _monitor(stop_evt: asyncio.Event, start_ts: float):
688697
body = await resp.json()
689698
except (ContentTypeError, json.JSONDecodeError):
690699
body = await resp.text()
691-
if resp.status in _RETRY_STATUS and attempt <= cfg.max_retries:
700+
should_retry = False
701+
wait_time = 0.0
702+
retry_label = ""
703+
is_rl = resp.status == 429 or (
704+
cfg.is_rate_limited is not None and cfg.is_rate_limited(resp.status, body)
705+
)
706+
if is_rl and rate_limit_attempts < cfg.max_retries_on_rate_limit:
707+
rate_limit_attempts += 1
708+
wait_time = min(rate_limit_delay, 30.0)
709+
rate_limit_delay *= cfg.retry_backoff
710+
retry_label = f"rate-limit retry {rate_limit_attempts} of {cfg.max_retries_on_rate_limit}"
711+
should_retry = True
712+
elif resp.status in _RETRY_STATUS and (attempt - rate_limit_attempts) <= cfg.max_retries:
713+
wait_time = delay
714+
delay *= cfg.retry_backoff
715+
retry_label = f"retry {attempt - rate_limit_attempts} of {cfg.max_retries}"
716+
should_retry = True
717+
718+
if should_retry:
692719
logging.warning(
693-
"HTTP %s %s -> %s. Retrying in %.2fs (retry %d of %d).",
720+
"HTTP %s %s -> %s. Waiting %.2fs (%s).",
694721
method,
695722
url,
696723
resp.status,
697-
delay,
698-
attempt,
699-
cfg.max_retries,
724+
wait_time,
725+
retry_label,
726+
)
727+
request_logger.log_request_response(
728+
operation_id=operation_id,
729+
request_method=method,
730+
request_url=url,
731+
response_status_code=resp.status,
732+
response_headers=dict(resp.headers),
733+
response_content=body,
734+
error_message=f"HTTP {resp.status} ({retry_label}, will retry in {wait_time:.1f}s)",
700735
)
701-
try:
702-
request_logger.log_request_response(
703-
operation_id=operation_id,
704-
request_method=method,
705-
request_url=url,
706-
response_status_code=resp.status,
707-
response_headers=dict(resp.headers),
708-
response_content=body,
709-
error_message=_friendly_http_message(resp.status, body),
710-
)
711-
except Exception as _log_e:
712-
logging.debug("[DEBUG] response logging failed: %s", _log_e)
713-
714736
await sleep_with_interrupt(
715-
delay,
737+
wait_time,
716738
cfg.node_cls,
717739
cfg.wait_label if cfg.monitor_progress else None,
718740
start_time if cfg.monitor_progress else None,
719741
cfg.estimated_total,
720742
display_callback=_display_time_progress if cfg.monitor_progress else None,
721743
)
722-
delay *= cfg.retry_backoff
723744
continue
724745
msg = _friendly_http_message(resp.status, body)
725-
try:
726-
request_logger.log_request_response(
727-
operation_id=operation_id,
728-
request_method=method,
729-
request_url=url,
730-
response_status_code=resp.status,
731-
response_headers=dict(resp.headers),
732-
response_content=body,
733-
error_message=msg,
734-
)
735-
except Exception as _log_e:
736-
logging.debug("[DEBUG] response logging failed: %s", _log_e)
746+
request_logger.log_request_response(
747+
operation_id=operation_id,
748+
request_method=method,
749+
request_url=url,
750+
response_status_code=resp.status,
751+
response_headers=dict(resp.headers),
752+
response_content=body,
753+
error_message=msg,
754+
)
737755
raise Exception(msg)
738756

739757
if expect_binary:
@@ -753,17 +771,14 @@ async def _monitor(stop_evt: asyncio.Event, start_ts: float):
753771
bytes_payload = bytes(buff)
754772
operation_succeeded = True
755773
final_elapsed_seconds = int(time.monotonic() - start_time)
756-
try:
757-
request_logger.log_request_response(
758-
operation_id=operation_id,
759-
request_method=method,
760-
request_url=url,
761-
response_status_code=resp.status,
762-
response_headers=dict(resp.headers),
763-
response_content=bytes_payload,
764-
)
765-
except Exception as _log_e:
766-
logging.debug("[DEBUG] response logging failed: %s", _log_e)
774+
request_logger.log_request_response(
775+
operation_id=operation_id,
776+
request_method=method,
777+
request_url=url,
778+
response_status_code=resp.status,
779+
response_headers=dict(resp.headers),
780+
response_content=bytes_payload,
781+
)
767782
return bytes_payload
768783
else:
769784
try:
@@ -780,45 +795,39 @@ async def _monitor(stop_evt: asyncio.Event, start_ts: float):
780795
extracted_price = cfg.price_extractor(payload) if cfg.price_extractor else None
781796
operation_succeeded = True
782797
final_elapsed_seconds = int(time.monotonic() - start_time)
783-
try:
784-
request_logger.log_request_response(
785-
operation_id=operation_id,
786-
request_method=method,
787-
request_url=url,
788-
response_status_code=resp.status,
789-
response_headers=dict(resp.headers),
790-
response_content=response_content_to_log,
791-
)
792-
except Exception as _log_e:
793-
logging.debug("[DEBUG] response logging failed: %s", _log_e)
798+
request_logger.log_request_response(
799+
operation_id=operation_id,
800+
request_method=method,
801+
request_url=url,
802+
response_status_code=resp.status,
803+
response_headers=dict(resp.headers),
804+
response_content=response_content_to_log,
805+
)
794806
return payload
795807

796808
except ProcessingInterrupted:
797809
logging.debug("Polling was interrupted by user")
798810
raise
799811
except (ClientError, OSError) as e:
800-
if attempt <= cfg.max_retries:
812+
if (attempt - rate_limit_attempts) <= cfg.max_retries:
801813
logging.warning(
802814
"Connection error calling %s %s. Retrying in %.2fs (%d/%d): %s",
803815
method,
804816
url,
805817
delay,
806-
attempt,
818+
attempt - rate_limit_attempts,
807819
cfg.max_retries,
808820
str(e),
809821
)
810-
try:
811-
request_logger.log_request_response(
812-
operation_id=operation_id,
813-
request_method=method,
814-
request_url=url,
815-
request_headers=dict(payload_headers) if payload_headers else None,
816-
request_params=dict(params) if params else None,
817-
request_data=request_body_log,
818-
error_message=f"{type(e).__name__}: {str(e)} (will retry)",
819-
)
820-
except Exception as _log_e:
821-
logging.debug("[DEBUG] request error logging failed: %s", _log_e)
822+
request_logger.log_request_response(
823+
operation_id=operation_id,
824+
request_method=method,
825+
request_url=url,
826+
request_headers=dict(payload_headers) if payload_headers else None,
827+
request_params=dict(params) if params else None,
828+
request_data=request_body_log,
829+
error_message=f"{type(e).__name__}: {str(e)} (will retry)",
830+
)
822831
await sleep_with_interrupt(
823832
delay,
824833
cfg.node_cls,
@@ -831,34 +840,28 @@ async def _monitor(stop_evt: asyncio.Event, start_ts: float):
831840
continue
832841
diag = await _diagnose_connectivity()
833842
if not diag["internet_accessible"]:
834-
try:
835-
request_logger.log_request_response(
836-
operation_id=operation_id,
837-
request_method=method,
838-
request_url=url,
839-
request_headers=dict(payload_headers) if payload_headers else None,
840-
request_params=dict(params) if params else None,
841-
request_data=request_body_log,
842-
error_message=f"LocalNetworkError: {str(e)}",
843-
)
844-
except Exception as _log_e:
845-
logging.debug("[DEBUG] final error logging failed: %s", _log_e)
846-
raise LocalNetworkError(
847-
"Unable to connect to the API server due to local network issues. "
848-
"Please check your internet connection and try again."
849-
) from e
850-
try:
851843
request_logger.log_request_response(
852844
operation_id=operation_id,
853845
request_method=method,
854846
request_url=url,
855847
request_headers=dict(payload_headers) if payload_headers else None,
856848
request_params=dict(params) if params else None,
857849
request_data=request_body_log,
858-
error_message=f"ApiServerError: {str(e)}",
850+
error_message=f"LocalNetworkError: {str(e)}",
859851
)
860-
except Exception as _log_e:
861-
logging.debug("[DEBUG] final error logging failed: %s", _log_e)
852+
raise LocalNetworkError(
853+
"Unable to connect to the API server due to local network issues. "
854+
"Please check your internet connection and try again."
855+
) from e
856+
request_logger.log_request_response(
857+
operation_id=operation_id,
858+
request_method=method,
859+
request_url=url,
860+
request_headers=dict(payload_headers) if payload_headers else None,
861+
request_params=dict(params) if params else None,
862+
request_data=request_body_log,
863+
error_message=f"ApiServerError: {str(e)}",
864+
)
862865
raise ApiServerError(
863866
f"The API server at {default_base_url()} is currently unreachable. "
864867
f"The service may be experiencing issues."

comfy_api_nodes/util/download_helpers.py

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -167,27 +167,25 @@ async def _monitor():
167167
with contextlib.suppress(Exception):
168168
dest.seek(0)
169169

170-
with contextlib.suppress(Exception):
171-
request_logger.log_request_response(
172-
operation_id=op_id,
173-
request_method="GET",
174-
request_url=url,
175-
response_status_code=resp.status,
176-
response_headers=dict(resp.headers),
177-
response_content=f"[streamed {written} bytes to dest]",
178-
)
170+
request_logger.log_request_response(
171+
operation_id=op_id,
172+
request_method="GET",
173+
request_url=url,
174+
response_status_code=resp.status,
175+
response_headers=dict(resp.headers),
176+
response_content=f"[streamed {written} bytes to dest]",
177+
)
179178
return
180179
except asyncio.CancelledError:
181180
raise ProcessingInterrupted("Task cancelled") from None
182181
except (ClientError, OSError) as e:
183182
if attempt <= max_retries:
184-
with contextlib.suppress(Exception):
185-
request_logger.log_request_response(
186-
operation_id=op_id,
187-
request_method="GET",
188-
request_url=url,
189-
error_message=f"{type(e).__name__}: {str(e)} (will retry)",
190-
)
183+
request_logger.log_request_response(
184+
operation_id=op_id,
185+
request_method="GET",
186+
request_url=url,
187+
error_message=f"{type(e).__name__}: {str(e)} (will retry)",
188+
)
191189
await sleep_with_interrupt(delay, cls, None, None, None)
192190
delay *= retry_backoff
193191
continue

0 commit comments

Comments
 (0)