From c1722c33877397995d148faa1a54a0ef4274e580 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 8 Apr 2026 22:28:57 +0000 Subject: [PATCH] fix: resolve FixedWindowCallRatePolicy deadlock on 429 without ratelimit-reset header Four interacting bugs caused connectors using FixedWindowCallRatePolicy to deadlock when a 429 response arrived from an API that lacks a ratelimit-reset header: 1. next_reset_ts initialized 10 days in the future instead of now + period 2. get_reset_ts_from_response() never fell back to retry-after header 3. _update_current_window() only advanced by one period instead of catching up 4. _do_acquire() had no upper bound on sleep duration Fixes: - Initialize next_reset_ts to now + period in model_to_component_factory.py - Fall back to retry-after header in get_reset_ts_from_response() - Use while loop in _update_current_window() to advance past all elapsed periods - Cap maximum sleep in _do_acquire() to 600 seconds with a warning log Co-Authored-By: bot_apk --- .../parsers/model_to_component_factory.py | 7 +- airbyte_cdk/sources/streams/call_rate.py | 16 +- unit_tests/sources/streams/test_call_rate.py | 148 ++++++++++++++++++ 3 files changed, 166 insertions(+), 5 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index aefe01364..043239d82 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -4340,11 +4340,10 @@ def create_fixed_window_call_rate_policy( for matcher in model.matchers ] - # Set the initial reset timestamp to 10 days from now. - # This value will be updated by the first request. 
+ period = parse_duration(model.period) return FixedWindowCallRatePolicy( - next_reset_ts=datetime.datetime.now() + datetime.timedelta(days=10), - period=parse_duration(model.period), + next_reset_ts=datetime.datetime.now() + period, + period=period, call_limit=model.call_limit, matchers=matchers, ) diff --git a/airbyte_cdk/sources/streams/call_rate.py b/airbyte_cdk/sources/streams/call_rate.py index 4a06db3b2..7964a7b74 100644 --- a/airbyte_cdk/sources/streams/call_rate.py +++ b/airbyte_cdk/sources/streams/call_rate.py @@ -415,7 +415,7 @@ def update( def _update_current_window(self) -> None: now = datetime.datetime.now() - if now > self._next_reset_ts: + while now > self._next_reset_ts: logger.debug("started new window, %s calls available now", self._call_limit) self._next_reset_ts = self._next_reset_ts + self._offset self._calls_num = 0 @@ -646,6 +646,14 @@ def _do_acquire( f"Policy {policy} reached call limit for endpoint {endpoint} ({exc.rate}). " f"Sleeping for {time_to_wait} on attempt {attempt}." ) + max_sleep = timedelta(seconds=600) + if time_to_wait > max_sleep: + logger.warning( + "Rate limit wait time %s exceeds maximum of %s. 
Capping to maximum.", + time_to_wait, + max_sleep, + ) + time_to_wait = max_sleep time.sleep(time_to_wait.total_seconds()) else: logger.debug( @@ -700,6 +708,12 @@ def get_reset_ts_from_response( return datetime.datetime.fromtimestamp( int(response.headers[self._ratelimit_reset_header]) ) + retry_after = response.headers.get("retry-after") + if retry_after is not None: + try: + return datetime.datetime.now() + datetime.timedelta(seconds=int(retry_after)) + except (ValueError, OverflowError): + logger.warning("Could not parse retry-after header value: %s", retry_after) return None def get_calls_left_from_response(self, response: requests.Response) -> Optional[int]: diff --git a/unit_tests/sources/streams/test_call_rate.py b/unit_tests/sources/streams/test_call_rate.py index a423fe573..2f92f599e 100644 --- a/unit_tests/sources/streams/test_call_rate.py +++ b/unit_tests/sources/streams/test_call_rate.py @@ -6,6 +6,7 @@ import time from datetime import datetime, timedelta from typing import Any, Iterable, Mapping, Optional +from unittest.mock import patch import pytest import requests @@ -16,6 +17,7 @@ APIBudget, CallRateLimitHit, FixedWindowCallRatePolicy, + HttpAPIBudget, HttpRequestMatcher, HttpRequestRegexMatcher, MovingWindowCallRatePolicy, @@ -562,3 +564,149 @@ def test_combined_criteria(self): assert not matcher(req_bad_path) assert not matcher(req_bad_param) assert not matcher(req_bad_header) + + +def test_fixed_window_update_current_window_advances_past_multiple_periods(): + """_update_current_window should advance past all elapsed periods, not just one.""" + now = datetime.now() + # Set next_reset_ts to 5 periods in the past + period = timedelta(minutes=1) + past_reset = now - (period * 5) + policy = FixedWindowCallRatePolicy( + next_reset_ts=past_reset, + period=period, + call_limit=10, + matchers=[], + ) + # Trigger window update via try_acquire + policy.try_acquire("request", weight=1) + # After advancing, next_reset_ts should be in the future + assert 
policy._next_reset_ts > now, ( + "next_reset_ts should have advanced past now after multiple elapsed periods" + ) + + +def test_fixed_window_deadlock_scenario_429_without_ratelimit_reset(): + """Reproduce the deadlock: 429 with no ratelimit-reset header should not cause extreme wait times. + + The original bug chain: + 1. FixedWindowCallRatePolicy created with next_reset_ts = now + 10 days + 2. 429 arrives without ratelimit-reset header + 3. available_calls set to 0, next_reset_ts unchanged + 4. try_acquire raises CallRateLimitHit with time_to_wait ≈ 10 days + """ + now = datetime.now() + period = timedelta(hours=1) + policy = FixedWindowCallRatePolicy( + next_reset_ts=now + period, + period=period, + call_limit=10, + matchers=[], + ) + + budget = HttpAPIBudget( + policies=[policy], + status_codes_for_ratelimit_hit=[429], + ) + + # Simulate a 429 response without ratelimit-reset but with retry-after + mock_response = requests.Response() + mock_response.status_code = 429 + mock_response.headers["retry-after"] = "60" + # No ratelimit-reset header + + mock_request = Request("GET", "http://example.com/api") + budget.update_from_response(mock_request, mock_response) + + # After update, available_calls should be 0 and reset_ts should be ~60s from now + # The policy should NOT have a 10-day wait + with pytest.raises(CallRateLimitHit) as exc_info: + policy.try_acquire("request", weight=1) + + # The wait should be about the retry-after value (~60s), at worst the 1-hour period, and never 10 days + assert exc_info.value.time_to_wait < timedelta(hours=2), ( + f"Wait time {exc_info.value.time_to_wait} is too large, likely the old 10-day bug" + ) + + +def test_http_api_budget_get_reset_ts_from_retry_after_header(): + """get_reset_ts_from_response should fall back to retry-after when ratelimit-reset is absent.""" + budget = HttpAPIBudget(policies=[]) + + mock_response = requests.Response() + mock_response.status_code = 429 + mock_response.headers["retry-after"] = "120" + + now = datetime.now() + result = 
budget.get_reset_ts_from_response(mock_response) + assert result is not None + # Should be approximately 120 seconds from now + expected = now + timedelta(seconds=120) + assert abs((result - expected).total_seconds()) < 5, ( + f"Expected reset_ts ~{expected}, got {result}" + ) + + +def test_http_api_budget_get_reset_ts_prefers_ratelimit_reset_over_retry_after(): + """ratelimit-reset header should be preferred over retry-after.""" + budget = HttpAPIBudget(policies=[]) + + mock_response = requests.Response() + mock_response.status_code = 200 + future_ts = int((datetime.now() + timedelta(hours=1)).timestamp()) + mock_response.headers["ratelimit-reset"] = str(future_ts) + mock_response.headers["retry-after"] = "30" + + result = budget.get_reset_ts_from_response(mock_response) + assert result is not None + # Should use ratelimit-reset (1 hour from now), not retry-after (30s) + assert result > datetime.now() + timedelta(minutes=30) + + +def test_http_api_budget_get_reset_ts_invalid_retry_after(): + """Invalid retry-after header value should return None gracefully.""" + budget = HttpAPIBudget(policies=[]) + + mock_response = requests.Response() + mock_response.status_code = 429 + mock_response.headers["retry-after"] = "not-a-number" + + result = budget.get_reset_ts_from_response(mock_response) + assert result is None + + +def test_http_api_budget_get_reset_ts_no_headers(): + """No rate limit headers at all should return None.""" + budget = HttpAPIBudget(policies=[]) + + mock_response = requests.Response() + mock_response.status_code = 429 + + result = budget.get_reset_ts_from_response(mock_response) + assert result is None + + +def test_do_acquire_caps_sleep_duration(): + """_do_acquire should cap sleep time to 600 seconds maximum.""" + # Use call_limit=1 so try_acquire doesn't reject weight=1, then exhaust budget + policy = FixedWindowCallRatePolicy( + next_reset_ts=datetime.now() + timedelta(days=10), + period=timedelta(days=10), + call_limit=1, + matchers=[], + ) + # 
Exhaust the budget so next call triggers CallRateLimitHit with ~10 day wait + policy.try_acquire("warmup", weight=1) + + budget = APIBudget(policies=[policy], maximum_attempts_to_acquire=2) + + with patch("airbyte_cdk.sources.streams.call_rate.time.sleep") as mock_sleep: + mock_sleep.side_effect = [None, None] + with pytest.raises(CallRateLimitHit): + budget.acquire_call(Request("GET", "http://example.com"), block=True) + + # Sleep should have been called with at most 600 seconds + assert mock_sleep.call_count > 0, "Expected sleep to be called at least once" + for call_args in mock_sleep.call_args_list: + sleep_seconds = call_args[0][0] + assert sleep_seconds <= 600, f"Sleep duration {sleep_seconds}s exceeds the 600s cap"