From ed427011b767896c1f4ad8c86fa0bacbc34d3493 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 6 Apr 2026 23:58:40 +0000 Subject: [PATCH 1/4] refactor(http-client): replace nested backoff decorators with explicit retry loop - Add RetryRequestException to exceptions.py extending BaseBackoffException - Rewrite _send_with_retry as explicit while-loop with _compute_backoff helper - Simplify _handle_error_resolution to raise single RetryRequestException - Delete 3 internal backoff handlers from rate_limiting.py (keep default_backoff_handler) - Remove unused imports from http_client.py - Update test_http_client.py to use RetryRequestException - Preserve backward compatibility: all old exception classes remain importable Co-Authored-By: bot_apk --- .../sources/streams/http/exceptions.py | 22 +++ .../sources/streams/http/http_client.py | 130 +++++++++--------- .../sources/streams/http/rate_limiting.py | 102 +------------- .../sources/streams/http/test_http_client.py | 42 +++--- 4 files changed, 108 insertions(+), 188 deletions(-) diff --git a/airbyte_cdk/sources/streams/http/exceptions.py b/airbyte_cdk/sources/streams/http/exceptions.py index 73d2947fa..8139aa5cb 100644 --- a/airbyte_cdk/sources/streams/http/exceptions.py +++ b/airbyte_cdk/sources/streams/http/exceptions.py @@ -69,3 +69,25 @@ class DefaultBackoffException(BaseBackoffException): class RateLimitBackoffException(BaseBackoffException): pass + + +class RetryRequestException(BaseBackoffException): + """Unified retry signal raised by HttpClient when a request should be retried.""" + + def __init__( + self, + request: requests.PreparedRequest, + response: Optional[Union[requests.Response, Exception]], + error_message: str = "", + failure_type: Optional[FailureType] = None, + backoff_time: Optional[float] = None, + retry_endlessly: bool = False, + ): + self.backoff_time = backoff_time + self.retry_endlessly = retry_endlessly + super().__init__( + request=request, + response=response, + error_message=error_message, + failure_type=failure_type, + ) diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index 8f70b6e35..73aef5145 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -4,6 +4,7 @@ import logging import os +import time import urllib from pathlib import Path from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Union @@ -36,20 +37,12 @@ ResponseAction, ) from airbyte_cdk.sources.streams.http.exceptions import ( - BaseBackoffException, - DefaultBackoffException, - RateLimitBackoffException, RequestBodyException, - UserDefinedBackoffException, + RetryRequestException, ) from airbyte_cdk.sources.streams.http.pagination_reset_exception import ( PaginationResetRequiredException, ) -from airbyte_cdk.sources.streams.http.rate_limiting import ( - http_client_default_backoff_handler, - rate_limit_default_backoff_handler, - user_defined_backoff_handler, -) from airbyte_cdk.sources.utils.types import JsonType from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets from airbyte_cdk.utils.constants import ENV_REQUEST_CACHE_PATH @@ -258,6 +251,18 @@ def _max_time(self) -> int: else self._DEFAULT_MAX_TIME ) + def _compute_backoff(self, exc: RetryRequestException, attempt: int) -> float: + """Compute the backoff duration in seconds for a retry attempt. + + If the exception carries a user-defined `backoff_time`, that value plus + one second is returned (preserving the legacy +1 s behaviour). Otherwise + an exponential back-off with base 2 and no jitter is used: + ``2 ** attempt`` seconds. + """ + if exc.backoff_time is not None: + return exc.backoff_time + 1 # extra second to cover fractions + return float(2**attempt) + def _send_with_retry( self, request: requests.PreparedRequest, @@ -265,47 +270,57 @@ def _send_with_retry( log_formatter: Optional[Callable[[requests.Response], Any]] = None, exit_on_rate_limit: Optional[bool] = False, ) -> requests.Response: - """ - Sends a request with retry logic. - - Args: - request (requests.PreparedRequest): The prepared HTTP request to send. - request_kwargs (Mapping[str, Any]): Additional keyword arguments for the request. + """Send a request with an explicit retry loop. - Returns: - requests.Response: The HTTP response received from the server after retries. + Replaces the previous three-layer ``backoff`` decorator chain with a + single ``while True`` loop that catches `RetryRequestException`, + computes the appropriate back-off, and sleeps before retrying. """ - - max_retries = self._max_retries - max_tries = max(0, max_retries) + 1 + max_tries = max(0, self._max_retries) + 1 max_time = self._max_time + attempt = 0 + start_time = time.monotonic() - user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries, max_time=max_time)( - self._send - ) - rate_limit_backoff_handler = rate_limit_default_backoff_handler(max_tries=max_tries) - backoff_handler = http_client_default_backoff_handler( - max_tries=max_tries, max_time=max_time - ) - # backoff handlers wrap _send, so it will always return a response -- except when all retries are exhausted - try: - response = backoff_handler(rate_limit_backoff_handler(user_backoff_handler))( - request, - request_kwargs, - log_formatter=log_formatter, - exit_on_rate_limit=exit_on_rate_limit, - ) # type: ignore # mypy can't infer that backoff_handler wraps _send - - return response - except BaseBackoffException as e: - self._logger.error(f"Retries exhausted with backoff exception.", exc_info=True) - raise AirbyteTracedException( - internal_message=f"Exhausted available request attempts. Exception: {e}", - message=f"Exhausted available request attempts. Please see logs for more details. Exception: {e}", - failure_type=e.failure_type or FailureType.system_error, - exception=e, - stream_descriptor=StreamDescriptor(name=self._name), - ) + while True: + try: + return self._send( + request, + request_kwargs, + log_formatter=log_formatter, + exit_on_rate_limit=exit_on_rate_limit, + ) + except RetryRequestException as exc: + attempt += 1 + elapsed = time.monotonic() - start_time + + # Determine whether we have exhausted retries. + budget_exhausted = False + if attempt >= max_tries: + budget_exhausted = True + elif elapsed >= max_time: + budget_exhausted = True + + if budget_exhausted: + self._logger.error("Retries exhausted with backoff exception.", exc_info=True) + raise AirbyteTracedException( + internal_message=f"Exhausted available request attempts. Exception: {exc}", + message=f"Exhausted available request attempts. Please see logs for more details. Exception: {exc}", + failure_type=exc.failure_type or FailureType.system_error, + exception=exc, + stream_descriptor=StreamDescriptor(name=self._name), + ) + + backoff_seconds = self._compute_backoff(exc, attempt) + + if exc.response is not None and isinstance(exc.response, requests.Response): + self._logger.info( + f"Status code: {exc.response.status_code!r}, Response Content: {exc.response.content!r}" + ) + self._logger.info( + f"Caught retryable error '{exc!s}' after {attempt} tries. " + f"Waiting {backoff_seconds} seconds then retrying..." + ) + time.sleep(backoff_seconds) def _send( self, @@ -503,7 +518,7 @@ def _handle_error_resolution( ResponseAction.RATE_LIMITED, ResponseAction.REFRESH_TOKEN_THEN_RETRY, ): - user_defined_backoff_time = None + user_defined_backoff_time: Optional[float] = None for backoff_strategy in self._backoff_strategies: backoff_time = backoff_strategy.backoff_time( response_or_exception=response if response is not None else exc, @@ -522,28 +537,13 @@ def _handle_error_resolution( and not exit_on_rate_limit ) - if user_defined_backoff_time: - raise UserDefinedBackoffException( - backoff=user_defined_backoff_time, - request=request, - response=(response if response is not None else exc), - error_message=error_message, - failure_type=error_resolution.failure_type, - ) - - elif retry_endlessly: - raise RateLimitBackoffException( - request=request, - response=(response if response is not None else exc), - error_message=error_message, - failure_type=error_resolution.failure_type, - ) - - raise DefaultBackoffException( + raise RetryRequestException( request=request, response=(response if response is not None else exc), error_message=error_message, failure_type=error_resolution.failure_type, + backoff_time=user_defined_backoff_time, + retry_endlessly=retry_endlessly, ) elif response: diff --git a/airbyte_cdk/sources/streams/http/rate_limiting.py b/airbyte_cdk/sources/streams/http/rate_limiting.py index 0137aa504..d0c7e8b89 100644 --- a/airbyte_cdk/sources/streams/http/rate_limiting.py +++ b/airbyte_cdk/sources/streams/http/rate_limiting.py @@ -4,17 +4,12 @@ import logging import sys -import time from typing import Any, Callable, Mapping, Optional import backoff from requests import PreparedRequest, RequestException, Response, codes, exceptions -from .exceptions import ( - DefaultBackoffException, - RateLimitBackoffException, - UserDefinedBackoffException, -) +from .exceptions import DefaultBackoffException TRANSIENT_EXCEPTIONS = ( DefaultBackoffException, @@ -69,98 +64,3 @@ def should_give_up(exc: Exception) -> bool: factor=factor, **kwargs, ) - - -def http_client_default_backoff_handler( - max_tries: Optional[int], max_time: Optional[int] = None, **kwargs: Any -) -> Callable[[SendRequestCallableType], SendRequestCallableType]: - def log_retry_attempt(details: Mapping[str, Any]) -> None: - _, exc, _ = sys.exc_info() - if isinstance(exc, RequestException) and exc.response: - logger.info( - f"Status code: {exc.response.status_code!r}, Response Content: {exc.response.content!r}" - ) - logger.info( - f"Caught retryable error '{str(exc)}' after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..." - ) - - def should_give_up(exc: Exception) -> bool: - # If made it here, the ResponseAction was RETRY and therefore should not give up - return False - - return backoff.on_exception( # type: ignore # Decorator function returns a function with a different signature than the input function, so mypy can't infer the type of the returned function - backoff.expo, - TRANSIENT_EXCEPTIONS, - jitter=None, - on_backoff=log_retry_attempt, - giveup=should_give_up, - max_tries=max_tries, - max_time=max_time, - **kwargs, - ) - - -def user_defined_backoff_handler( - max_tries: Optional[int], max_time: Optional[int] = None, **kwargs: Any -) -> Callable[[SendRequestCallableType], SendRequestCallableType]: - def sleep_on_ratelimit(details: Mapping[str, Any]) -> None: - _, exc, _ = sys.exc_info() - if isinstance(exc, UserDefinedBackoffException): - retry_after = exc.backoff - sleep_time = retry_after + 1 # extra second to cover any fractions of second - if exc.response is not None: - logger.info( - f"UserDefinedBackoffException: Rate limit exceeded (HTTP {exc.response.status_code}). Retrying in {sleep_time} seconds." - ) - else: - logger.info( - f"UserDefinedBackoffException: Rate limit exceeded. Retrying in {sleep_time} seconds." - ) - time.sleep(sleep_time) - - def log_give_up(details: Mapping[str, Any]) -> None: - _, exc, _ = sys.exc_info() - if isinstance(exc, RequestException): - logger.error( - f"Max retry limit reached after {details['elapsed']:.1f}s. Request: {exc.request}, Response: {exc.response}" - ) - else: - logger.error("Max retry limit reached for unknown request and response") - - # Suppress the backoff library's default log that misleadingly reports interval (0s) instead of actual sleep time - kwargs.pop("logger", None) - - return backoff.on_exception( # type: ignore # Decorator function returns a function with a different signature than the input function, so mypy can't infer the type of the returned function - backoff.constant, - UserDefinedBackoffException, - interval=0, # skip waiting, we'll wait in on_backoff handler - on_backoff=sleep_on_ratelimit, - on_giveup=log_give_up, - jitter=None, - max_tries=max_tries, - max_time=max_time, - logger=None, - **kwargs, - ) - - -def rate_limit_default_backoff_handler( - **kwargs: Any, -) -> Callable[[SendRequestCallableType], SendRequestCallableType]: - def log_retry_attempt(details: Mapping[str, Any]) -> None: - _, exc, _ = sys.exc_info() - if isinstance(exc, RequestException) and exc.response: - logger.info( - f"Status code: {exc.response.status_code!r}, Response Content: {exc.response.content!r}" - ) - logger.info( - f"Caught retryable error '{str(exc)}' after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..." - ) - - return backoff.on_exception( # type: ignore # Decorator function returns a function with a different signature than the input function, so mypy can't infer the type of the returned function - backoff.expo, - RateLimitBackoffException, - jitter=None, - on_backoff=log_retry_attempt, - **kwargs, - ) diff --git a/unit_tests/sources/streams/http/test_http_client.py b/unit_tests/sources/streams/http/test_http_client.py index 7dc8054b5..6fb609635 100644 --- a/unit_tests/sources/streams/http/test_http_client.py +++ b/unit_tests/sources/streams/http/test_http_client.py @@ -20,10 +20,8 @@ ResponseAction, ) from airbyte_cdk.sources.streams.http.exceptions import ( - DefaultBackoffException, - RateLimitBackoffException, RequestBodyException, - UserDefinedBackoffException, + RetryRequestException, ) from airbyte_cdk.sources.streams.http.http_client import MessageRepresentationAirbyteTracedErrors from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator @@ -265,7 +263,7 @@ def backoff_time(self, *args, **kwargs) -> float: @pytest.mark.parametrize( "backoff_time_value, exception_type", - [(0.1, UserDefinedBackoffException), (None, DefaultBackoffException)], + [(0.1, RetryRequestException), (None, RetryRequestException)], ) def test_raises_backoff_exception_with_retry_response_action( mocker, backoff_time_value, exception_type @@ -306,7 +304,7 @@ def test_raises_backoff_exception_with_retry_response_action( @pytest.mark.parametrize( "backoff_time_value, exception_type", - [(0.1, UserDefinedBackoffException), (None, DefaultBackoffException)], + [(0.1, RetryRequestException), (None, RetryRequestException)], ) def test_raises_backoff_exception_with_response_with_unmapped_error( mocker, backoff_time_value, exception_type @@ -444,7 +442,7 @@ def test_session_request_exception_raises_backoff_exception(): ) prepared_request = requests.PreparedRequest() - with pytest.raises(DefaultBackoffException): + with pytest.raises(RetryRequestException): http_client._send(prepared_request, {}) @@ -693,7 +691,7 @@ def backoff_time(self, *args, **kwargs): @pytest.mark.parametrize( "exit_on_rate_limit, expected_call_count, expected_error", - [[True, 6, DefaultBackoffException], [False, 6, RateLimitBackoffException]], + [[True, 6, RetryRequestException], [False, 6, RetryRequestException]], ) @pytest.mark.usefixtures("mock_sleep") def test_backoff_strategy_endless( @@ -785,21 +783,21 @@ def test_send_request_respects_environment_variables(): @pytest.mark.parametrize( "response_code, expected_failure_type, error_message, exception_class", [ - (400, FailureType.system_error, "test error message", UserDefinedBackoffException), - (401, FailureType.config_error, "test error message", UserDefinedBackoffException), - (403, FailureType.transient_error, "test error message", UserDefinedBackoffException), - (400, FailureType.system_error, "test error message", DefaultBackoffException), - (401, FailureType.config_error, "test error message", DefaultBackoffException), - (403, FailureType.transient_error, "test error message", DefaultBackoffException), - (400, FailureType.system_error, "test error message", RateLimitBackoffException), - (401, FailureType.config_error, "test error message", RateLimitBackoffException), - (403, FailureType.transient_error, "test error message", RateLimitBackoffException), + (400, FailureType.system_error, "test error message", "user_defined"), + (401, FailureType.config_error, "test error message", "user_defined"), + (403, FailureType.transient_error, "test error message", "user_defined"), + (400, FailureType.system_error, "test error message", "default"), + (401, FailureType.config_error, "test error message", "default"), + (403, FailureType.transient_error, "test error message", "default"), + (400, FailureType.system_error, "test error message", "rate_limited"), + (401, FailureType.config_error, "test error message", "rate_limited"), + (403, FailureType.transient_error, "test error message", "rate_limited"), ], ) def test_send_with_retry_raises_airbyte_traced_exception_with_failure_type( response_code, expected_failure_type, error_message, exception_class, requests_mock ): - if exception_class == UserDefinedBackoffException: + if exception_class == "user_defined": class CustomBackoffStrategy: def backoff_time(self, response_or_exception, attempt_count): @@ -807,7 +805,7 @@ def backoff_time(self, response_or_exception, attempt_count): backoff_strategy = CustomBackoffStrategy() response_action = ResponseAction.RETRY - elif exception_class == RateLimitBackoffException: + elif exception_class == "rate_limited": backoff_strategy = None response_action = ResponseAction.RATE_LIMITED else: @@ -884,7 +882,7 @@ def test_refresh_token_then_retry_action_refreshes_oauth_token(mocker): mocked_response.ok = False mocked_session.send.return_value = mocked_response - with pytest.raises(DefaultBackoffException): + with pytest.raises(RetryRequestException): http_client._send(prepared_request, {}) assert mock_authenticator.refresh_called @@ -920,7 +918,7 @@ def test_refresh_token_then_retry_action_without_oauth_authenticator_proceeds_wi mocked_response.ok = False mocked_session.send.return_value = mocked_response - with pytest.raises(DefaultBackoffException): + with pytest.raises(RetryRequestException): http_client._send(prepared_request, {}) mocked_logger.warning.assert_called() @@ -965,7 +963,7 @@ def __call__(self, request): mocked_response.ok = False mocked_session.send.return_value = mocked_response - with pytest.raises(DefaultBackoffException): + with pytest.raises(RetryRequestException): http_client._send(prepared_request, {}) mocked_logger.warning.assert_called() @@ -1004,7 +1002,7 @@ def test_refresh_token_then_retry_action_with_single_use_refresh_token_authentic mocked_response.ok = False mocked_session.send.return_value = mocked_response - with pytest.raises(DefaultBackoffException): + with pytest.raises(RetryRequestException): http_client._send(prepared_request, {}) mock_authenticator.refresh_and_set_access_token.assert_called_once() From 3191bd142034695781ab4d3931e06b68bf127564 Mon Sep 17 00:00:00 2001 From: Sophie Cui Date: Tue, 7 Apr 2026 13:27:55 -0700 Subject: [PATCH 2/4] fix: correct backoff timing, wire retry_endlessly, add backoff cap - Fix exponential backoff sequence: 2^(attempt-1) to match old backoff.expo (1, 2, 4, 8s) instead of 2^attempt (2, 4, 8, 16s) - Wire retry_endlessly into the retry loop so rate-limited requests without exit_on_rate_limit genuinely retry past the budget - Guard retry_endlessly with user_defined_backoff_time is None to preserve the old mutually-exclusive branching (custom backoff always bounded, endless only for rate limits without a strategy) - Add _MAX_BACKOFF_SECONDS (300s) cap on exponential backoff - Split test_backoff_strategy_endless into two tests that verify bounded vs endless behavior independently Co-Authored-By: Claude Opus 4.6 (1M context) --- .../sources/streams/http/http_client.py | 26 +++++---- .../sources/streams/http/test_http_client.py | 55 ++++++++++++++----- 2 files changed, 58 insertions(+), 23 deletions(-) diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index 73aef5145..6deecb585 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -251,17 +251,20 @@ def _max_time(self) -> int: else self._DEFAULT_MAX_TIME ) + _MAX_BACKOFF_SECONDS: float = 300 # 5-minute ceiling for exponential backoff + def _compute_backoff(self, exc: RetryRequestException, attempt: int) -> float: """Compute the backoff duration in seconds for a retry attempt. - If the exception carries a user-defined `backoff_time`, that value plus + If the exception carries a user-defined ``backoff_time``, that value plus one second is returned (preserving the legacy +1 s behaviour). Otherwise - an exponential back-off with base 2 and no jitter is used: - ``2 ** attempt`` seconds. + an exponential back-off of ``2 ** (attempt - 1)`` seconds is used (matching + the previous ``backoff.expo`` with base=2, factor=1), capped at + ``_MAX_BACKOFF_SECONDS``. """ if exc.backoff_time is not None: return exc.backoff_time + 1 # extra second to cover fractions - return float(2**attempt) + return min(float(2 ** (attempt - 1)), self._MAX_BACKOFF_SECONDS) def _send_with_retry( self, @@ -293,12 +296,11 @@ def _send_with_retry( attempt += 1 elapsed = time.monotonic() - start_time - # Determine whether we have exhausted retries. - budget_exhausted = False - if attempt >= max_tries: - budget_exhausted = True - elif elapsed >= max_time: - budget_exhausted = True + # Rate-limited requests retry indefinitely unless exit_on_rate_limit was set. + # All other retryable errors are bounded by max_tries / max_time. + budget_exhausted = not exc.retry_endlessly and ( + attempt >= max_tries or elapsed >= max_time + ) if budget_exhausted: self._logger.error("Retries exhausted with backoff exception.", exc_info=True) @@ -532,9 +534,13 @@ def _handle_error_resolution( or f"Request to {request.url} failed with failure type {error_resolution.failure_type}, response action {error_resolution.response_action}." ) + # Only retry endlessly when rate-limited AND no custom backoff strategy matched. + # When a strategy provides a specific backoff_time, retries are always bounded + # by max_tries/max_time (matching the old mutually-exclusive branching). retry_endlessly = ( error_resolution.response_action == ResponseAction.RATE_LIMITED and not exit_on_rate_limit + and user_defined_backoff_time is None ) raise RetryRequestException( diff --git a/unit_tests/sources/streams/http/test_http_client.py b/unit_tests/sources/streams/http/test_http_client.py index 6fb609635..7fd22d21a 100644 --- a/unit_tests/sources/streams/http/test_http_client.py +++ b/unit_tests/sources/streams/http/test_http_client.py @@ -689,14 +689,9 @@ def backoff_time(self, *args, **kwargs): assert len(trace_messages) == mocked_send.call_count -@pytest.mark.parametrize( - "exit_on_rate_limit, expected_call_count, expected_error", - [[True, 6, RetryRequestException], [False, 6, RetryRequestException]], -) @pytest.mark.usefixtures("mock_sleep") -def test_backoff_strategy_endless( - exit_on_rate_limit: bool, expected_call_count: int, expected_error: Exception -): +def test_backoff_strategy_rate_limited_with_exit_on_rate_limit(): + """When exit_on_rate_limit=True, 429 responses exhaust max_tries then raise.""" http_client = HttpClient( name="test", logger=MagicMock(), error_handler=HttpStatusErrorHandler(logger=MagicMock()) ) @@ -705,18 +700,47 @@ def test_backoff_strategy_endless( mocked_response.status_code = 429 mocked_response.headers = {} mocked_response.ok = False - session_send = MagicMock(spec=requests.Session.send) - session_send.return_value = mocked_response with patch.object(requests.Session, "send", return_value=mocked_response) as mocked_send: - with pytest.raises(AirbyteTracedException) as e: + with pytest.raises(AirbyteTracedException): http_client.send_request( http_method="get", url="https://test_base_url.com/v1/endpoint", request_kwargs={}, - exit_on_rate_limit=exit_on_rate_limit, + exit_on_rate_limit=True, ) - assert mocked_send.call_count == expected_call_count + assert mocked_send.call_count == 6 # 1 initial + 5 retries + + +@pytest.mark.usefixtures("mock_sleep") +def test_backoff_strategy_rate_limited_retries_endlessly(): + """When exit_on_rate_limit=False, 429 responses retry past max_tries until success.""" + http_client = HttpClient( + name="test", logger=MagicMock(), error_handler=HttpStatusErrorHandler(logger=MagicMock()) + ) + + rate_limited_response = MagicMock(spec=requests.Response) + rate_limited_response.status_code = 429 + rate_limited_response.headers = {} + rate_limited_response.ok = False + + success_response = MagicMock(spec=requests.Response) + success_response.status_code = 200 + success_response.headers = {} + success_response.ok = True + + # Fail 10 times (well past max_tries=6), then succeed + side_effects = [rate_limited_response] * 10 + [success_response] + + with patch.object(requests.Session, "send", side_effect=side_effects) as mocked_send: + _, response = http_client.send_request( + http_method="get", + url="https://test_base_url.com/v1/endpoint", + request_kwargs={}, + exit_on_rate_limit=False, + ) + assert response.status_code == 200 + assert mocked_send.call_count == 11 # 10 rate-limited + 1 success def test_given_different_headers_then_response_is_not_cached(requests_mock): @@ -834,7 +858,12 @@ def backoff_time(self, response_or_exception, attempt_count): ) with pytest.raises(AirbyteTracedException) as e: - http_client.send_request(http_method="get", url="https://airbyte.io/", request_kwargs={}) + http_client.send_request( + http_method="get", + url="https://airbyte.io/", + request_kwargs={}, + exit_on_rate_limit=True, # ensure rate-limited retries are bounded so the test terminates + ) assert e.value.failure_type == expected_failure_type From 241f621b7af52fac2fa4a1b322451d0c14aa707c Mon Sep 17 00:00:00 2001 From: Sophie Cui Date: Tue, 7 Apr 2026 13:35:38 -0700 Subject: [PATCH 3/4] fix: inline backoff computation, remove backoff cap Drop _compute_backoff helper and inline the logic directly in the retry loop. Remove the 300s backoff cap to preserve the original unbounded exponential behavior. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../sources/streams/http/http_client.py | 18 ++---------------- 1 file changed, 2 insertions(+), 16 deletions(-) diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index 6deecb585..7ba581d34 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -251,21 +251,6 @@ def _max_time(self) -> int: else self._DEFAULT_MAX_TIME ) - _MAX_BACKOFF_SECONDS: float = 300 # 5-minute ceiling for exponential backoff - - def _compute_backoff(self, exc: RetryRequestException, attempt: int) -> float: - """Compute the backoff duration in seconds for a retry attempt. - - If the exception carries a user-defined ``backoff_time``, that value plus - one second is returned (preserving the legacy +1 s behaviour). Otherwise - an exponential back-off of ``2 ** (attempt - 1)`` seconds is used (matching - the previous ``backoff.expo`` with base=2, factor=1), capped at - ``_MAX_BACKOFF_SECONDS``. - """ - if exc.backoff_time is not None: - return exc.backoff_time + 1 # extra second to cover fractions - return min(float(2 ** (attempt - 1)), self._MAX_BACKOFF_SECONDS) - def _send_with_retry( self, request: requests.PreparedRequest, @@ -312,7 +297,8 @@ def _send_with_retry( stream_descriptor=StreamDescriptor(name=self._name), ) - backoff_seconds = self._compute_backoff(exc, attempt) + # User-defined backoff gets +1s to cover fractions; otherwise exponential 2^(n-1) + backoff_seconds = exc.backoff_time + 1 if exc.backoff_time is not None else float(2 ** (attempt - 1)) if exc.response is not None and isinstance(exc.response, requests.Response): self._logger.info( From 9fa863196e56e3cf198500f2dcfc6cc144b86e2c Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 13 Apr 2026 21:16:00 +0000 Subject: [PATCH 4/4] fix: address Copilot review - format backoff line, rename exception_class to retry_scenario Co-Authored-By: bot_apk --- airbyte_cdk/sources/streams/http/http_client.py | 6 +++++- unit_tests/sources/streams/http/test_http_client.py | 8 ++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index 7ba581d34..a7fd556d4 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -298,7 +298,11 @@ def _send_with_retry( ) # User-defined backoff gets +1s to cover fractions; otherwise exponential 2^(n-1) - backoff_seconds = exc.backoff_time + 1 if exc.backoff_time is not None else float(2 ** (attempt - 1)) + backoff_seconds = ( + exc.backoff_time + 1 + if exc.backoff_time is not None + else float(2 ** (attempt - 1)) + ) if exc.response is not None and isinstance(exc.response, requests.Response): self._logger.info( diff --git a/unit_tests/sources/streams/http/test_http_client.py b/unit_tests/sources/streams/http/test_http_client.py index 7fd22d21a..bf100ad5a 100644 --- a/unit_tests/sources/streams/http/test_http_client.py +++ b/unit_tests/sources/streams/http/test_http_client.py @@ -805,7 +805,7 @@ def test_send_request_respects_environment_variables(): @pytest.mark.usefixtures("mock_sleep") @pytest.mark.parametrize( - "response_code, expected_failure_type, error_message, exception_class", + "response_code, expected_failure_type, error_message, retry_scenario", [ (400, FailureType.system_error, "test error message", "user_defined"), (401, FailureType.config_error, "test error message", "user_defined"), @@ -819,9 +819,9 @@ def test_send_request_respects_environment_variables(): ], ) def test_send_with_retry_raises_airbyte_traced_exception_with_failure_type( - response_code, expected_failure_type, error_message, exception_class, requests_mock + response_code, expected_failure_type, error_message, retry_scenario, requests_mock ): - if exception_class == "user_defined": + if retry_scenario == "user_defined": class CustomBackoffStrategy: def backoff_time(self, response_or_exception, attempt_count): @@ -829,7 +829,7 @@ def backoff_time(self, response_or_exception, attempt_count): backoff_strategy = CustomBackoffStrategy() response_action = ResponseAction.RETRY - elif exception_class == "rate_limited": + elif retry_scenario == "rate_limited": backoff_strategy = None response_action = ResponseAction.RATE_LIMITED else: