From c1cb2c76ffee9d465d20f9d04e7959af41fb1524 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 7 Apr 2026 23:49:04 +0000 Subject: [PATCH] fix: handle EOFError in monkey_patched_get_item for corrupted cache entries When the SQLite cache contains corrupted/truncated pickle data (which can happen due to fast_save=True + synchronous=OFF), the requests_cache deserialization pipeline raises EOFError. This was not caught, causing the sync to crash. Catch EOFError during deserialization and convert it to KeyError, which requests_cache treats as a cache miss and transparently re-fetches from the upstream API. Co-Authored-By: bot_apk --- .../sources/streams/http/http_client.py | 10 +++- ...test_requests_cache_monkeypatch_version.py | 48 +++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index 8f70b6e35..51370ea04 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -71,6 +71,11 @@ def monkey_patched_get_item(self, key): # type: ignore # this interface is a co con.execute can lead to `sqlite3.InterfaceError: bad parameter or other API misuse`. There was a fix implemented [here](https://github.com/requests-cache/requests-cache/commit/5ca6b9cdcb2797dd2fed485872110ccd72aee55d#diff-f43db4a5edf931647c32dec28ea7557aae4cae8444af4b26c8ecbe88d8c925aaL330-R332) but there is still no official releases of requests_cache that this is part of. Hence, we will monkeypatch it for now. + + Additionally, ``self.deserialize`` can raise ``EOFError`` when the SQLite cache contains + corrupted or truncated pickle data (which may happen due to ``fast_save=True`` combined with + ``synchronous=OFF``). We treat this the same as a cache miss by converting it to ``KeyError``, + so ``requests_cache`` will transparently re-fetch from the upstream API. """ with self.connection() as con: # Using placeholders here with python 3.12+ and concurrency results in the error: @@ -81,7 +86,10 @@ def monkey_patched_get_item(self, key): # type: ignore # this interface is a co if not row: raise KeyError(key) - return self.deserialize(key, row[0]) + try: + return self.deserialize(key, row[0]) + except EOFError: + raise KeyError(key) requests_cache.SQLiteDict.__getitem__ = monkey_patched_get_item # type: ignore # see the method doc for more information diff --git a/unit_tests/test_requests_cache_monkeypatch_version.py b/unit_tests/test_requests_cache_monkeypatch_version.py index d9c7bc510..98066e4a7 100644 --- a/unit_tests/test_requests_cache_monkeypatch_version.py +++ b/unit_tests/test_requests_cache_monkeypatch_version.py @@ -1,5 +1,10 @@ +from unittest.mock import MagicMock + +import pytest import requests_cache +from airbyte_cdk.sources.streams.http.http_client import monkey_patched_get_item + def test_assert_requests_cache_version(): """ @@ -9,3 +14,46 @@ def test_assert_requests_cache_version(): For more information about the reasons of this test, see monkey_patched_get_item in http_client.py """ assert requests_cache.__version__ == "1.2.1" + + +def test_monkey_patched_get_item_raises_key_error_on_missing_row(): + """Verify that a missing cache key raises KeyError (standard cache miss).""" + mock_self = MagicMock() + mock_cursor = MagicMock() + mock_cursor.fetchone.return_value = None + mock_self.connection().__enter__().execute.return_value = mock_cursor + + with pytest.raises(KeyError): + monkey_patched_get_item(mock_self, "missing_key") + + +def test_monkey_patched_get_item_returns_deserialized_value(): + """Verify that a valid cache entry is deserialized and returned.""" + mock_self = MagicMock() + mock_cursor = MagicMock() + mock_cursor.fetchone.return_value = ("serialized_data",) + mock_self.connection().__enter__().execute.return_value = mock_cursor + mock_self.deserialize.return_value = "deserialized_value" + + result = monkey_patched_get_item(mock_self, "valid_key") + + assert result == "deserialized_value" + mock_self.deserialize.assert_called_once_with("valid_key", "serialized_data") + + +def test_monkey_patched_get_item_converts_eoferror_to_key_error(): + """ + Verify that EOFError during deserialization of corrupted cache data + is caught and converted to KeyError, treating it as a cache miss. + + This handles the case where fast_save=True + synchronous=OFF leads + to truncated pickle data in the SQLite cache. + """ + mock_self = MagicMock() + mock_cursor = MagicMock() + mock_cursor.fetchone.return_value = ("corrupted_data",) + mock_self.connection().__enter__().execute.return_value = mock_cursor + mock_self.deserialize.side_effect = EOFError("Ran out of input") + + with pytest.raises(KeyError): + monkey_patched_get_item(mock_self, "corrupted_key")