class RetryRequestException(BaseBackoffException):
    """Unified retry signal raised by HttpClient when a request should be retried.

    Besides the arguments forwarded unchanged to ``BaseBackoffException``, the
    exception carries the two values the retry loop needs to decide how long to
    wait and whether the retry budget applies.

    Args:
        request: The prepared request that failed.
        response: The response received for the request, or the exception
            raised while sending it; may be ``None``.
        error_message: Human-readable description of the failure.
        failure_type: Failure classification reported if retries are given up.
        backoff_time: Wait in seconds requested for this retry. ``None`` means
            no specific wait was requested and the retry loop picks its own.
        retry_endlessly: When ``True`` the retry loop does not apply its
            max-tries / max-time budget to this failure.
    """

    def __init__(
        self,
        request: requests.PreparedRequest,
        response: Optional[Union[requests.Response, Exception]],
        error_message: str = "",
        failure_type: Optional[FailureType] = None,
        backoff_time: Optional[float] = None,
        retry_endlessly: bool = False,
    ) -> None:
        # Retry metadata is stored on the instance before delegating to the
        # base class, which receives only the arguments it already knows.
        self.backoff_time: Optional[float] = backoff_time
        self.retry_endlessly: bool = retry_endlessly
        super().__init__(
            request=request,
            response=response,
            error_message=error_message,
            failure_type=failure_type,
        )
def _send_with_retry(
    self,
    request: requests.PreparedRequest,
    request_kwargs: Mapping[str, Any],
    log_formatter: Optional[Callable[[requests.Response], Any]] = None,
    exit_on_rate_limit: Optional[bool] = False,
) -> requests.Response:
    """Send ``request`` and retry for as long as ``_send`` asks for a retry.

    ``_send`` signals a retryable failure by raising ``RetryRequestException``.
    Each time that happens this method either gives up (budget exhausted) or
    sleeps and tries again.

    Args:
        request: The prepared HTTP request to send.
        request_kwargs: Additional keyword arguments forwarded to ``_send``.
        log_formatter: Optional response formatter forwarded to ``_send``.
        exit_on_rate_limit: Forwarded to ``_send``.

    Returns:
        The response returned by the first ``_send`` call that does not raise.

    Raises:
        AirbyteTracedException: When a failure that is not flagged
            ``retry_endlessly`` has used up ``max_retries + 1`` attempts or
            ``max_time`` seconds. The original exception is chained as the
            cause and its ``failure_type`` is reused (``system_error`` when
            unset).
    """
    max_tries = max(0, self._max_retries) + 1
    max_time = self._max_time
    attempt = 0
    start_time = time.monotonic()

    while True:
        try:
            return self._send(
                request,
                request_kwargs,
                log_formatter=log_formatter,
                exit_on_rate_limit=exit_on_rate_limit,
            )
        except RetryRequestException as exc:
            attempt += 1
            elapsed = time.monotonic() - start_time

            # Failures flagged retry_endlessly ignore the budget; everything
            # else is bounded by max_tries / max_time.
            budget_exhausted = not exc.retry_endlessly and (
                attempt >= max_tries or elapsed >= max_time
            )

            if budget_exhausted:
                self._logger.error("Retries exhausted with backoff exception.", exc_info=True)
                raise AirbyteTracedException(
                    internal_message=f"Exhausted available request attempts. Exception: {exc}",
                    message=f"Exhausted available request attempts. Please see logs for more details. Exception: {exc}",
                    failure_type=exc.failure_type or FailureType.system_error,
                    exception=exc,
                    stream_descriptor=StreamDescriptor(name=self._name),
                ) from exc

            if exc.backoff_time is not None:
                # Requested backoff gets +1s to cover fractions of a second.
                backoff_seconds = exc.backoff_time + 1
            else:
                # Exponential 2^(n-1), but never unbounded:
                #  * bounded retries wait at most the remaining time budget
                #    (what `backoff.on_exception` did before it was replaced);
                #  * endless retries wait at most `max_time` per attempt, so the
                #    sleep cannot keep doubling into hours/days and the int ->
                #    float conversion cannot overflow. The floor of 1 second
                #    prevents a busy loop if max_time is configured as 0.
                wait_ceiling = max(max_time, 1) if exc.retry_endlessly else max_time - elapsed
                backoff_seconds = float(min(2 ** (attempt - 1), wait_ceiling))

            # isinstance() already rejects None, so no separate None check is needed.
            if isinstance(exc.response, requests.Response):
                self._logger.info(
                    f"Status code: {exc.response.status_code!r}, Response Content: {exc.response.content!r}"
                )
            self._logger.info(
                f"Caught retryable error '{exc!s}' after {attempt} tries. "
                f"Waiting {backoff_seconds} seconds then retrying..."
            )
            time.sleep(backoff_seconds)
+++ b/airbyte_cdk/sources/streams/http/rate_limiting.py @@ -4,17 +4,12 @@ import logging import sys -import time from typing import Any, Callable, Mapping, Optional import backoff from requests import PreparedRequest, RequestException, Response, codes, exceptions -from .exceptions import ( - DefaultBackoffException, - RateLimitBackoffException, - UserDefinedBackoffException, -) +from .exceptions import DefaultBackoffException TRANSIENT_EXCEPTIONS = ( DefaultBackoffException, @@ -69,98 +64,3 @@ def should_give_up(exc: Exception) -> bool: factor=factor, **kwargs, ) - - -def http_client_default_backoff_handler( - max_tries: Optional[int], max_time: Optional[int] = None, **kwargs: Any -) -> Callable[[SendRequestCallableType], SendRequestCallableType]: - def log_retry_attempt(details: Mapping[str, Any]) -> None: - _, exc, _ = sys.exc_info() - if isinstance(exc, RequestException) and exc.response: - logger.info( - f"Status code: {exc.response.status_code!r}, Response Content: {exc.response.content!r}" - ) - logger.info( - f"Caught retryable error '{str(exc)}' after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..." 
- ) - - def should_give_up(exc: Exception) -> bool: - # If made it here, the ResponseAction was RETRY and therefore should not give up - return False - - return backoff.on_exception( # type: ignore # Decorator function returns a function with a different signature than the input function, so mypy can't infer the type of the returned function - backoff.expo, - TRANSIENT_EXCEPTIONS, - jitter=None, - on_backoff=log_retry_attempt, - giveup=should_give_up, - max_tries=max_tries, - max_time=max_time, - **kwargs, - ) - - -def user_defined_backoff_handler( - max_tries: Optional[int], max_time: Optional[int] = None, **kwargs: Any -) -> Callable[[SendRequestCallableType], SendRequestCallableType]: - def sleep_on_ratelimit(details: Mapping[str, Any]) -> None: - _, exc, _ = sys.exc_info() - if isinstance(exc, UserDefinedBackoffException): - retry_after = exc.backoff - sleep_time = retry_after + 1 # extra second to cover any fractions of second - if exc.response is not None: - logger.info( - f"UserDefinedBackoffException: Rate limit exceeded (HTTP {exc.response.status_code}). Retrying in {sleep_time} seconds." - ) - else: - logger.info( - f"UserDefinedBackoffException: Rate limit exceeded. Retrying in {sleep_time} seconds." - ) - time.sleep(sleep_time) - - def log_give_up(details: Mapping[str, Any]) -> None: - _, exc, _ = sys.exc_info() - if isinstance(exc, RequestException): - logger.error( - f"Max retry limit reached after {details['elapsed']:.1f}s. 
Request: {exc.request}, Response: {exc.response}" - ) - else: - logger.error("Max retry limit reached for unknown request and response") - - # Suppress the backoff library's default log that misleadingly reports interval (0s) instead of actual sleep time - kwargs.pop("logger", None) - - return backoff.on_exception( # type: ignore # Decorator function returns a function with a different signature than the input function, so mypy can't infer the type of the returned function - backoff.constant, - UserDefinedBackoffException, - interval=0, # skip waiting, we'll wait in on_backoff handler - on_backoff=sleep_on_ratelimit, - on_giveup=log_give_up, - jitter=None, - max_tries=max_tries, - max_time=max_time, - logger=None, - **kwargs, - ) - - -def rate_limit_default_backoff_handler( - **kwargs: Any, -) -> Callable[[SendRequestCallableType], SendRequestCallableType]: - def log_retry_attempt(details: Mapping[str, Any]) -> None: - _, exc, _ = sys.exc_info() - if isinstance(exc, RequestException) and exc.response: - logger.info( - f"Status code: {exc.response.status_code!r}, Response Content: {exc.response.content!r}" - ) - logger.info( - f"Caught retryable error '{str(exc)}' after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..." 
- ) - - return backoff.on_exception( # type: ignore # Decorator function returns a function with a different signature than the input function, so mypy can't infer the type of the returned function - backoff.expo, - RateLimitBackoffException, - jitter=None, - on_backoff=log_retry_attempt, - **kwargs, - ) diff --git a/unit_tests/sources/streams/http/test_http_client.py b/unit_tests/sources/streams/http/test_http_client.py index 7dc8054b5..bf100ad5a 100644 --- a/unit_tests/sources/streams/http/test_http_client.py +++ b/unit_tests/sources/streams/http/test_http_client.py @@ -20,10 +20,8 @@ ResponseAction, ) from airbyte_cdk.sources.streams.http.exceptions import ( - DefaultBackoffException, - RateLimitBackoffException, RequestBodyException, - UserDefinedBackoffException, + RetryRequestException, ) from airbyte_cdk.sources.streams.http.http_client import MessageRepresentationAirbyteTracedErrors from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator @@ -265,7 +263,7 @@ def backoff_time(self, *args, **kwargs) -> float: @pytest.mark.parametrize( "backoff_time_value, exception_type", - [(0.1, UserDefinedBackoffException), (None, DefaultBackoffException)], + [(0.1, RetryRequestException), (None, RetryRequestException)], ) def test_raises_backoff_exception_with_retry_response_action( mocker, backoff_time_value, exception_type @@ -306,7 +304,7 @@ def test_raises_backoff_exception_with_retry_response_action( @pytest.mark.parametrize( "backoff_time_value, exception_type", - [(0.1, UserDefinedBackoffException), (None, DefaultBackoffException)], + [(0.1, RetryRequestException), (None, RetryRequestException)], ) def test_raises_backoff_exception_with_response_with_unmapped_error( mocker, backoff_time_value, exception_type @@ -444,7 +442,7 @@ def test_session_request_exception_raises_backoff_exception(): ) prepared_request = requests.PreparedRequest() - with pytest.raises(DefaultBackoffException): + with pytest.raises(RetryRequestException): 
@pytest.mark.usefixtures("mock_sleep")
def test_backoff_strategy_rate_limited_retries_endlessly():
    """With exit_on_rate_limit=False, 429 replies are retried beyond max_tries until one succeeds."""

    def fake_response(status_code, ok):
        # Minimal stand-in exposing only the attributes this test sets.
        fake = MagicMock(spec=requests.Response)
        fake.status_code = status_code
        fake.headers = {}
        fake.ok = ok
        return fake

    throttled = fake_response(429, ok=False)
    succeeded = fake_response(200, ok=True)

    client = HttpClient(
        name="test", logger=MagicMock(), error_handler=HttpStatusErrorHandler(logger=MagicMock())
    )

    # Ten throttled replies (well past max_tries=6) followed by a single success.
    replies = [throttled for _ in range(10)]
    replies.append(succeeded)

    with patch.object(requests.Session, "send", side_effect=replies) as session_send:
        _, final_response = client.send_request(
            http_method="get",
            url="https://test_base_url.com/v1/endpoint",
            request_kwargs={},
            exit_on_rate_limit=False,
        )

    assert final_response.status_code == 200
    assert session_send.call_count == 11  # 10 rate-limited + 1 success
FailureType.system_error, "test error message", "default"), + (401, FailureType.config_error, "test error message", "default"), + (403, FailureType.transient_error, "test error message", "default"), + (400, FailureType.system_error, "test error message", "rate_limited"), + (401, FailureType.config_error, "test error message", "rate_limited"), + (403, FailureType.transient_error, "test error message", "rate_limited"), ], ) def test_send_with_retry_raises_airbyte_traced_exception_with_failure_type( - response_code, expected_failure_type, error_message, exception_class, requests_mock + response_code, expected_failure_type, error_message, retry_scenario, requests_mock ): - if exception_class == UserDefinedBackoffException: + if retry_scenario == "user_defined": class CustomBackoffStrategy: def backoff_time(self, response_or_exception, attempt_count): @@ -807,7 +829,7 @@ def backoff_time(self, response_or_exception, attempt_count): backoff_strategy = CustomBackoffStrategy() response_action = ResponseAction.RETRY - elif exception_class == RateLimitBackoffException: + elif retry_scenario == "rate_limited": backoff_strategy = None response_action = ResponseAction.RATE_LIMITED else: @@ -836,7 +858,12 @@ def backoff_time(self, response_or_exception, attempt_count): ) with pytest.raises(AirbyteTracedException) as e: - http_client.send_request(http_method="get", url="https://airbyte.io/", request_kwargs={}) + http_client.send_request( + http_method="get", + url="https://airbyte.io/", + request_kwargs={}, + exit_on_rate_limit=True, # ensure rate-limited retries are bounded so the test terminates + ) assert e.value.failure_type == expected_failure_type @@ -884,7 +911,7 @@ def test_refresh_token_then_retry_action_refreshes_oauth_token(mocker): mocked_response.ok = False mocked_session.send.return_value = mocked_response - with pytest.raises(DefaultBackoffException): + with pytest.raises(RetryRequestException): http_client._send(prepared_request, {}) assert 
mock_authenticator.refresh_called @@ -920,7 +947,7 @@ def test_refresh_token_then_retry_action_without_oauth_authenticator_proceeds_wi mocked_response.ok = False mocked_session.send.return_value = mocked_response - with pytest.raises(DefaultBackoffException): + with pytest.raises(RetryRequestException): http_client._send(prepared_request, {}) mocked_logger.warning.assert_called() @@ -965,7 +992,7 @@ def __call__(self, request): mocked_response.ok = False mocked_session.send.return_value = mocked_response - with pytest.raises(DefaultBackoffException): + with pytest.raises(RetryRequestException): http_client._send(prepared_request, {}) mocked_logger.warning.assert_called() @@ -1004,7 +1031,7 @@ def test_refresh_token_then_retry_action_with_single_use_refresh_token_authentic mocked_response.ok = False mocked_session.send.return_value = mocked_response - with pytest.raises(DefaultBackoffException): + with pytest.raises(RetryRequestException): http_client._send(prepared_request, {}) mock_authenticator.refresh_and_set_access_token.assert_called_once()