Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4340,11 +4340,10 @@ def create_fixed_window_call_rate_policy(
for matcher in model.matchers
]

# Set the initial reset timestamp to 10 days from now.
# This value will be updated by the first request.
period = parse_duration(model.period)
return FixedWindowCallRatePolicy(
next_reset_ts=datetime.datetime.now() + datetime.timedelta(days=10),
period=parse_duration(model.period),
next_reset_ts=datetime.datetime.now() + period,
period=period,
call_limit=model.call_limit,
matchers=matchers,
)
Expand Down
16 changes: 15 additions & 1 deletion airbyte_cdk/sources/streams/call_rate.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ def update(

def _update_current_window(self) -> None:
now = datetime.datetime.now()
if now > self._next_reset_ts:
while now > self._next_reset_ts:
logger.debug("started new window, %s calls available now", self._call_limit)
self._next_reset_ts = self._next_reset_ts + self._offset
self._calls_num = 0
Expand Down Expand Up @@ -646,6 +646,14 @@ def _do_acquire(
f"Policy {policy} reached call limit for endpoint {endpoint} ({exc.rate}). "
f"Sleeping for {time_to_wait} on attempt {attempt}."
)
max_sleep = timedelta(seconds=600)
if time_to_wait > max_sleep:
logger.warning(
"Rate limit wait time %s exceeds maximum of %s. Capping to maximum.",
time_to_wait,
max_sleep,
)
time_to_wait = max_sleep
time.sleep(time_to_wait.total_seconds())
else:
logger.debug(
Expand Down Expand Up @@ -700,6 +708,12 @@ def get_reset_ts_from_response(
return datetime.datetime.fromtimestamp(
int(response.headers[self._ratelimit_reset_header])
)
retry_after = response.headers.get("retry-after")
if retry_after is not None:
try:
return datetime.datetime.now() + datetime.timedelta(seconds=int(retry_after))
except (ValueError, OverflowError):
logger.warning("Could not parse retry-after header value: %s", retry_after)
return None

def get_calls_left_from_response(self, response: requests.Response) -> Optional[int]:
Expand Down
148 changes: 148 additions & 0 deletions unit_tests/sources/streams/test_call_rate.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import time
from datetime import datetime, timedelta
from typing import Any, Iterable, Mapping, Optional
from unittest.mock import patch

import pytest
import requests
Expand All @@ -16,6 +17,7 @@
APIBudget,
CallRateLimitHit,
FixedWindowCallRatePolicy,
HttpAPIBudget,
HttpRequestMatcher,
HttpRequestRegexMatcher,
MovingWindowCallRatePolicy,
Expand Down Expand Up @@ -562,3 +564,149 @@ def test_combined_criteria(self):
assert not matcher(req_bad_path)
assert not matcher(req_bad_param)
assert not matcher(req_bad_header)


def test_fixed_window_update_current_window_advances_past_multiple_periods():
    """A reset timestamp that is several periods stale must be rolled forward past the present."""
    window = timedelta(minutes=1)
    reference_time = datetime.now()
    # Start with a reset timestamp that lies five full periods in the past.
    stale_policy = FixedWindowCallRatePolicy(
        next_reset_ts=reference_time - 5 * window,
        period=window,
        call_limit=10,
        matchers=[],
    )

    # Acquiring a call is used here to trigger the window update.
    stale_policy.try_acquire("request", weight=1)

    # A single-period advance would still leave the timestamp in the past.
    assert stale_policy._next_reset_ts > reference_time, (
        "next_reset_ts should have advanced past now after multiple elapsed periods"
    )


def test_fixed_window_deadlock_scenario_429_without_ratelimit_reset():
    """A 429 that carries only retry-after must produce a wait driven by that header.

    The original bug chain:
    1. FixedWindowCallRatePolicy created with next_reset_ts = now + 10 days
    2. 429 arrives without ratelimit-reset header
    3. available_calls set to 0, next_reset_ts unchanged
    4. try_acquire raises CallRateLimitHit with time_to_wait ≈ 10 days

    The policy here starts with a one-hour window, so a loose upper bound such
    as "less than two hours" would pass even if retry-after were ignored. The
    assertion therefore pins the wait to the 60-second retry-after interval.
    """
    retry_after_seconds = 60
    period = timedelta(hours=1)
    policy = FixedWindowCallRatePolicy(
        next_reset_ts=datetime.now() + period,
        period=period,
        call_limit=10,
        matchers=[],
    )
    budget = HttpAPIBudget(
        policies=[policy],
        status_codes_for_ratelimit_hit=[429],
    )

    # Simulate a 429 response that has retry-after but no ratelimit-reset header.
    mock_response = requests.Response()
    mock_response.status_code = 429
    mock_response.headers["retry-after"] = str(retry_after_seconds)

    mock_request = Request("GET", "http://example.com/api")
    budget.update_from_response(mock_request, mock_response)

    # The 429 exhausts the budget, so the next acquire attempt must be rejected.
    with pytest.raises(CallRateLimitHit) as exc_info:
        policy.try_acquire("request", weight=1)

    time_to_wait = exc_info.value.time_to_wait
    # The reset timestamp should now be ~60s away (from retry-after), neither the
    # one-hour period the policy started with nor the historical 10-day default.
    assert time_to_wait <= timedelta(seconds=retry_after_seconds), (
        f"Wait time {time_to_wait} exceeds the retry-after interval; "
        "the retry-after header was probably ignored"
    )
    # Generous lower bound: tolerates a slow runner while still ruling out a
    # reset timestamp that was never moved to the retry-after deadline.
    assert time_to_wait > timedelta(seconds=retry_after_seconds // 2), (
        f"Wait time {time_to_wait} is far shorter than the retry-after interval"
    )


def test_http_api_budget_get_reset_ts_from_retry_after_header():
    """With no ratelimit-reset header, the reset timestamp is derived from retry-after."""
    budget = HttpAPIBudget(policies=[])

    response = requests.Response()
    response.status_code = 429
    response.headers["retry-after"] = "120"

    started_at = datetime.now()
    result = budget.get_reset_ts_from_response(response)

    assert result is not None
    # The timestamp should land about 120 seconds after the call; allow a few
    # seconds of slack for the time spent between the two clock reads.
    expected = started_at + timedelta(seconds=120)
    drift_seconds = abs((result - expected).total_seconds())
    assert drift_seconds < 5, (
        f"Expected reset_ts ~{expected}, got {result}"
    )


def test_http_api_budget_get_reset_ts_prefers_ratelimit_reset_over_retry_after():
    """When both headers are present, ratelimit-reset takes precedence over retry-after."""
    budget = HttpAPIBudget(policies=[])

    response = requests.Response()
    response.status_code = 200
    # ratelimit-reset points one hour ahead; retry-after only asks for 30 seconds.
    one_hour_ahead = datetime.now() + timedelta(hours=1)
    response.headers["ratelimit-reset"] = str(int(one_hour_ahead.timestamp()))
    response.headers["retry-after"] = "30"

    result = budget.get_reset_ts_from_response(response)

    assert result is not None
    # Anything beyond the half-hour mark can only have come from ratelimit-reset.
    half_hour_ahead = datetime.now() + timedelta(minutes=30)
    assert result > half_hour_ahead


def test_http_api_budget_get_reset_ts_invalid_retry_after():
    """A retry-after value that is not an integer yields None instead of raising."""
    response = requests.Response()
    response.status_code = 429
    response.headers["retry-after"] = "not-a-number"

    budget = HttpAPIBudget(policies=[])
    reset_ts = budget.get_reset_ts_from_response(response)

    assert reset_ts is None


def test_http_api_budget_get_reset_ts_no_headers():
    """A response with neither ratelimit-reset nor retry-after yields None."""
    bare_response = requests.Response()
    bare_response.status_code = 429

    budget = HttpAPIBudget(policies=[])
    reset_ts = budget.get_reset_ts_from_response(bare_response)

    assert reset_ts is None


def test_do_acquire_caps_sleep_duration():
    """_do_acquire should cap an oversized wait to exactly 600 seconds.

    The policy is built so that the computed wait is ~10 days. Asserting only
    ``<= 600`` would also accept a zero-length sleep, so the test checks that
    every sleep equals the cap.
    """
    max_sleep_seconds = 600

    # Use call_limit=1 so try_acquire doesn't reject weight=1, then exhaust budget
    policy = FixedWindowCallRatePolicy(
        next_reset_ts=datetime.now() + timedelta(days=10),
        period=timedelta(days=10),
        call_limit=1,
        matchers=[],
    )
    # Exhaust the budget so next call triggers CallRateLimitHit with ~10 day wait
    policy.try_acquire("warmup", weight=1)

    budget = APIBudget(policies=[policy], maximum_attempts_to_acquire=2)

    # The patched sleep returns immediately. No fixed-length side_effect list
    # is configured, so the mock cannot run out of values if the number of
    # attempts ever changes.
    with patch("airbyte_cdk.sources.streams.call_rate.time.sleep") as mock_sleep:
        with pytest.raises(CallRateLimitHit):
            budget.acquire_call(Request("GET", "http://example.com"), block=True)

    assert mock_sleep.call_count > 0, "Expected sleep to be called at least once"
    for call_args in mock_sleep.call_args_list:
        sleep_seconds = call_args[0][0]
        assert sleep_seconds == max_sleep_seconds, (
            f"Sleep duration {sleep_seconds}s should equal the {max_sleep_seconds}s cap"
        )
Loading