Skip to content

Commit b707112

Browse files
Expand broken-executor and shutdown retry regression matrix
Co-authored-by: Shri Sukhani <shrisukhani@users.noreply.github.com>
1 parent 7f903ca commit b707112

File tree

1 file changed

+206
-0
lines changed

1 file changed

+206
-0
lines changed

tests/test_polling.py

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,28 @@ def get_status() -> str:
242242
assert attempts["count"] == 1
243243

244244

245+
def test_poll_until_terminal_status_does_not_retry_executor_shutdown_runtime_errors():
246+
attempts = {"count": 0}
247+
248+
def get_status() -> str:
249+
attempts["count"] += 1
250+
raise RuntimeError("cannot schedule new futures after shutdown")
251+
252+
with pytest.raises(
253+
RuntimeError, match="cannot schedule new futures after shutdown"
254+
):
255+
poll_until_terminal_status(
256+
operation_name="sync poll executor-shutdown passthrough",
257+
get_status=get_status,
258+
is_terminal_status=lambda value: value == "completed",
259+
poll_interval_seconds=0.0001,
260+
max_wait_seconds=1.0,
261+
max_status_failures=5,
262+
)
263+
264+
assert attempts["count"] == 1
265+
266+
245267
def test_poll_until_terminal_status_retries_rate_limit_errors():
246268
attempts = {"count": 0}
247269

@@ -1185,6 +1207,29 @@ async def operation() -> str:
11851207
asyncio.run(run())
11861208

11871209

1210+
def test_retry_operation_async_does_not_retry_executor_shutdown_runtime_errors():
1211+
async def run() -> None:
1212+
attempts = {"count": 0}
1213+
1214+
async def operation() -> str:
1215+
attempts["count"] += 1
1216+
raise RuntimeError("cannot schedule new futures after shutdown")
1217+
1218+
with pytest.raises(
1219+
RuntimeError, match="cannot schedule new futures after shutdown"
1220+
):
1221+
await retry_operation_async(
1222+
operation_name="async retry executor-shutdown passthrough",
1223+
operation=operation,
1224+
max_attempts=5,
1225+
retry_delay_seconds=0.0001,
1226+
)
1227+
1228+
assert attempts["count"] == 1
1229+
1230+
asyncio.run(run())
1231+
1232+
11881233
def test_retry_operation_async_retries_server_errors():
11891234
async def run() -> None:
11901235
attempts = {"count": 0}
@@ -2279,6 +2324,30 @@ def get_next_page(page: int) -> dict:
22792324
assert attempts["count"] == 1
22802325

22812326

2327+
def test_collect_paginated_results_does_not_retry_executor_shutdown_runtime_errors():
2328+
attempts = {"count": 0}
2329+
2330+
def get_next_page(page: int) -> dict:
2331+
attempts["count"] += 1
2332+
raise RuntimeError("cannot schedule new futures after shutdown")
2333+
2334+
with pytest.raises(
2335+
RuntimeError, match="cannot schedule new futures after shutdown"
2336+
):
2337+
collect_paginated_results(
2338+
operation_name="sync paginated executor-shutdown passthrough",
2339+
get_next_page=get_next_page,
2340+
get_current_page_batch=lambda response: response["current"],
2341+
get_total_page_batches=lambda response: response["total"],
2342+
on_page_success=lambda response: None,
2343+
max_wait_seconds=1.0,
2344+
max_attempts=5,
2345+
retry_delay_seconds=0.0001,
2346+
)
2347+
2348+
assert attempts["count"] == 1
2349+
2350+
22822351
def test_collect_paginated_results_retries_server_errors():
22832352
attempts = {"count": 0}
22842353
collected = []
@@ -2670,6 +2739,33 @@ async def get_next_page(page: int) -> dict:
26702739
asyncio.run(run())
26712740

26722741

2742+
def test_collect_paginated_results_async_does_not_retry_executor_shutdown_runtime_errors():
2743+
async def run() -> None:
2744+
attempts = {"count": 0}
2745+
2746+
async def get_next_page(page: int) -> dict:
2747+
attempts["count"] += 1
2748+
raise RuntimeError("cannot schedule new futures after shutdown")
2749+
2750+
with pytest.raises(
2751+
RuntimeError, match="cannot schedule new futures after shutdown"
2752+
):
2753+
await collect_paginated_results_async(
2754+
operation_name="async paginated executor-shutdown passthrough",
2755+
get_next_page=get_next_page,
2756+
get_current_page_batch=lambda response: response["current"],
2757+
get_total_page_batches=lambda response: response["total"],
2758+
on_page_success=lambda response: None,
2759+
max_wait_seconds=1.0,
2760+
max_attempts=5,
2761+
retry_delay_seconds=0.0001,
2762+
)
2763+
2764+
assert attempts["count"] == 1
2765+
2766+
asyncio.run(run())
2767+
2768+
26732769
def test_collect_paginated_results_async_retries_server_errors():
26742770
async def run() -> None:
26752771
attempts = {"count": 0}
@@ -2889,6 +2985,35 @@ def fetch_result() -> dict:
28892985
assert fetch_attempts["count"] == 0
28902986

28912987

2988+
def test_wait_for_job_result_does_not_retry_broken_executor_status_errors():
2989+
status_attempts = {"count": 0}
2990+
fetch_attempts = {"count": 0}
2991+
2992+
def get_status() -> str:
2993+
status_attempts["count"] += 1
2994+
raise ConcurrentBrokenExecutor("executor is broken")
2995+
2996+
def fetch_result() -> dict:
2997+
fetch_attempts["count"] += 1
2998+
return {"ok": True}
2999+
3000+
with pytest.raises(ConcurrentBrokenExecutor, match="executor is broken"):
3001+
wait_for_job_result(
3002+
operation_name="sync wait helper status broken-executor",
3003+
get_status=get_status,
3004+
is_terminal_status=lambda value: value == "completed",
3005+
fetch_result=fetch_result,
3006+
poll_interval_seconds=0.0001,
3007+
max_wait_seconds=1.0,
3008+
max_status_failures=5,
3009+
fetch_max_attempts=5,
3010+
fetch_retry_delay_seconds=0.0001,
3011+
)
3012+
3013+
assert status_attempts["count"] == 1
3014+
assert fetch_attempts["count"] == 0
3015+
3016+
28923017
def test_wait_for_job_result_does_not_retry_timeout_status_errors():
28933018
status_attempts = {"count": 0}
28943019
fetch_attempts = {"count": 0}
@@ -3098,6 +3223,29 @@ def fetch_result() -> dict:
30983223
assert fetch_attempts["count"] == 1
30993224

31003225

3226+
def test_wait_for_job_result_does_not_retry_broken_executor_fetch_errors():
3227+
fetch_attempts = {"count": 0}
3228+
3229+
def fetch_result() -> dict:
3230+
fetch_attempts["count"] += 1
3231+
raise ConcurrentBrokenExecutor("executor is broken")
3232+
3233+
with pytest.raises(ConcurrentBrokenExecutor, match="executor is broken"):
3234+
wait_for_job_result(
3235+
operation_name="sync wait helper fetch broken-executor",
3236+
get_status=lambda: "completed",
3237+
is_terminal_status=lambda value: value == "completed",
3238+
fetch_result=fetch_result,
3239+
poll_interval_seconds=0.0001,
3240+
max_wait_seconds=1.0,
3241+
max_status_failures=5,
3242+
fetch_max_attempts=5,
3243+
fetch_retry_delay_seconds=0.0001,
3244+
)
3245+
3246+
assert fetch_attempts["count"] == 1
3247+
3248+
31013249
def test_wait_for_job_result_does_not_retry_timeout_fetch_errors():
31023250
fetch_attempts = {"count": 0}
31033251

@@ -3388,6 +3536,38 @@ async def fetch_result() -> dict:
33883536
asyncio.run(run())
33893537

33903538

3539+
def test_wait_for_job_result_async_does_not_retry_broken_executor_status_errors():
3540+
async def run() -> None:
3541+
status_attempts = {"count": 0}
3542+
fetch_attempts = {"count": 0}
3543+
3544+
async def get_status() -> str:
3545+
status_attempts["count"] += 1
3546+
raise ConcurrentBrokenExecutor("executor is broken")
3547+
3548+
async def fetch_result() -> dict:
3549+
fetch_attempts["count"] += 1
3550+
return {"ok": True}
3551+
3552+
with pytest.raises(ConcurrentBrokenExecutor, match="executor is broken"):
3553+
await wait_for_job_result_async(
3554+
operation_name="async wait helper status broken-executor",
3555+
get_status=get_status,
3556+
is_terminal_status=lambda value: value == "completed",
3557+
fetch_result=fetch_result,
3558+
poll_interval_seconds=0.0001,
3559+
max_wait_seconds=1.0,
3560+
max_status_failures=5,
3561+
fetch_max_attempts=5,
3562+
fetch_retry_delay_seconds=0.0001,
3563+
)
3564+
3565+
assert status_attempts["count"] == 1
3566+
assert fetch_attempts["count"] == 0
3567+
3568+
asyncio.run(run())
3569+
3570+
33913571
def test_wait_for_job_result_async_does_not_retry_timeout_status_errors():
33923572
async def run() -> None:
33933573
status_attempts = {"count": 0}
@@ -3615,6 +3795,32 @@ async def fetch_result() -> dict:
36153795
asyncio.run(run())
36163796

36173797

3798+
def test_wait_for_job_result_async_does_not_retry_broken_executor_fetch_errors():
3799+
async def run() -> None:
3800+
fetch_attempts = {"count": 0}
3801+
3802+
async def fetch_result() -> dict:
3803+
fetch_attempts["count"] += 1
3804+
raise ConcurrentBrokenExecutor("executor is broken")
3805+
3806+
with pytest.raises(ConcurrentBrokenExecutor, match="executor is broken"):
3807+
await wait_for_job_result_async(
3808+
operation_name="async wait helper fetch broken-executor",
3809+
get_status=lambda: asyncio.sleep(0, result="completed"),
3810+
is_terminal_status=lambda value: value == "completed",
3811+
fetch_result=fetch_result,
3812+
poll_interval_seconds=0.0001,
3813+
max_wait_seconds=1.0,
3814+
max_status_failures=5,
3815+
fetch_max_attempts=5,
3816+
fetch_retry_delay_seconds=0.0001,
3817+
)
3818+
3819+
assert fetch_attempts["count"] == 1
3820+
3821+
asyncio.run(run())
3822+
3823+
36183824
def test_wait_for_job_result_async_does_not_retry_timeout_fetch_errors():
36193825
async def run() -> None:
36203826
fetch_attempts = {"count": 0}

0 commit comments

Comments
 (0)