Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion airbyte_cdk/sources/streams/http/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ def monkey_patched_get_item(self, key): # type: ignore # this interface is a co
con.execute can lead to `sqlite3.InterfaceError: bad parameter or other API misuse`. There was a fix implemented
[here](https://github.com/requests-cache/requests-cache/commit/5ca6b9cdcb2797dd2fed485872110ccd72aee55d#diff-f43db4a5edf931647c32dec28ea7557aae4cae8444af4b26c8ecbe88d8c925aaL330-R332)
but there is still no official release of requests_cache that includes it. Hence, we will monkeypatch it for now.

Additionally, ``self.deserialize`` can raise ``EOFError`` when the SQLite cache contains
corrupted or truncated pickle data (which may happen due to ``fast_save=True`` combined with
``synchronous=OFF``). We treat this the same as a cache miss by converting it to ``KeyError``,
so ``requests_cache`` will transparently re-fetch from the upstream API.
"""
with self.connection() as con:
# Using placeholders here with python 3.12+ and concurrency results in the error:
Expand All @@ -81,7 +86,10 @@ def monkey_patched_get_item(self, key): # type: ignore # this interface is a co
if not row:
raise KeyError(key)

return self.deserialize(key, row[0])
try:
return self.deserialize(key, row[0])
except EOFError:
raise KeyError(key)


requests_cache.SQLiteDict.__getitem__ = monkey_patched_get_item # type: ignore # see the method doc for more information
Expand Down
48 changes: 48 additions & 0 deletions unit_tests/test_requests_cache_monkeypatch_version.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
from unittest.mock import MagicMock

import pytest
import requests_cache

from airbyte_cdk.sources.streams.http.http_client import monkey_patched_get_item


def test_assert_requests_cache_version():
"""
Expand All @@ -9,3 +14,46 @@ def test_assert_requests_cache_version():
For more information about the reasons for this test, see monkey_patched_get_item in http_client.py
"""
assert requests_cache.__version__ == "1.2.1"


def test_monkey_patched_get_item_raises_key_error_on_missing_row():
    """A lookup whose query yields no row must raise KeyError (ordinary cache miss)."""
    cache = MagicMock()

    # Cursor whose fetchone() reports that nothing was found.
    empty_cursor = MagicMock()
    empty_cursor.fetchone.return_value = None

    # Object bound by ``with self.connection() as con`` inside the patched method.
    connection = cache.connection().__enter__()
    connection.execute.return_value = empty_cursor

    with pytest.raises(KeyError):
        monkey_patched_get_item(cache, "missing_key")


def test_monkey_patched_get_item_returns_deserialized_value():
    """A stored row is handed to ``deserialize`` and that result is returned."""
    cache = MagicMock()
    cache.deserialize.return_value = "deserialized_value"

    # Cursor that yields a single-column row holding the serialized payload.
    row_cursor = MagicMock()
    row_cursor.fetchone.return_value = ("serialized_data",)

    # Object bound by ``with self.connection() as con`` inside the patched method.
    connection = cache.connection().__enter__()
    connection.execute.return_value = row_cursor

    value = monkey_patched_get_item(cache, "valid_key")

    assert value == "deserialized_value"
    cache.deserialize.assert_called_once_with("valid_key", "serialized_data")


def test_monkey_patched_get_item_converts_eoferror_to_key_error():
    """
    An EOFError raised while deserializing a cached row must surface as KeyError.

    Corrupted or truncated pickle data in the SQLite cache (possible with
    fast_save=True combined with synchronous=OFF) is thereby treated as a
    cache miss rather than a hard failure.
    """
    cache = MagicMock()
    # Simulate pickle running out of input on a truncated blob.
    cache.deserialize.side_effect = EOFError("Ran out of input")

    # Cursor that yields a row, so the code path reaches deserialize().
    row_cursor = MagicMock()
    row_cursor.fetchone.return_value = ("corrupted_data",)

    # Object bound by ``with self.connection() as con`` inside the patched method.
    connection = cache.connection().__enter__()
    connection.execute.return_value = row_cursor

    with pytest.raises(KeyError):
        monkey_patched_get_item(cache, "corrupted_key")
Loading