From 866ad48a74901a8ca1caf1ed804e888f5725f2ee Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 2 Apr 2026 10:03:09 +0000 Subject: [PATCH 1/5] fix: prevent deadlock by returning messages directly instead of queueing Changes the Cursor ABC and all implementations so that ensure_at_least_one_state_emitted() returns Iterable[AirbyteMessage] instead of putting messages on the shared queue. The main thread (consumer) yields these messages directly, eliminating the deadlock where it would block on queue.put() into its own full queue. Also changes on_partition() to yield slice log messages directly instead of emitting through the message repository. Modified files: - cursor.py: ABC + FinalStateCursor + ConcurrentCursor - concurrent_partition_cursor.py: Added _create_state_message() - file_based_concurrent_cursor.py - file_based_final_state_cursor.py - abstract_concurrent_file_based_cursor.py - concurrent_read_processor.py: on_partition() and _on_stream_is_done() - concurrent_source.py: yield from on_partition() - substream_partition_router.py: consume returned iterator Co-Authored-By: gl_anatolii.yatsuk --- .../concurrent_read_processor.py | 11 +++---- .../concurrent_source/concurrent_source.py | 2 +- .../concurrent_partition_cursor.py | 32 +++++++++++++++++-- .../substream_partition_router.py | 9 +++++- .../abstract_concurrent_file_based_cursor.py | 3 +- .../cursor/file_based_concurrent_cursor.py | 15 +++++++-- .../cursor/file_based_final_state_cursor.py | 6 ++-- .../sources/streams/concurrent/cursor.py | 26 +++++++++++---- .../test_concurrent_read_processor.py | 29 +++++++++-------- .../sources/streams/test_stream_read.py | 4 +-- 10 files changed, 101 insertions(+), 36 deletions(-) diff --git a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py index a78905e72..68d5ce5ca 100644 --- a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py +++ b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py @@ -140,20 +140,19 @@ def on_partition_generation_completed( if status_message: yield status_message - def on_partition(self, partition: Partition) -> None: + def on_partition(self, partition: Partition) -> Iterable[AirbyteMessage]: """ This method is called when a partition is generated. 1. Add the partition to the set of partitions for the stream - 2. Log the slice if necessary + 2. Log the slice if necessary — yield the log message directly instead of + putting it on the shared queue (prevents deadlock when queue is full) 3. Submit the partition to the thread pool manager """ stream_name = partition.stream_name() self._streams_to_running_partitions[stream_name].add(partition) cursor = self._stream_name_to_instance[stream_name].cursor if self._slice_logger.should_log_slice_message(self._logger): - self._message_repository.emit_message( - self._slice_logger.create_slice_log_message(partition.to_slice()) - ) + yield self._slice_logger.create_slice_log_message(partition.to_slice()) self._thread_pool_manager.submit( self._partition_reader.process_partition, partition, cursor ) @@ -426,7 +425,7 @@ def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]: ) self._logger.info(f"Marking stream {stream_name} as STOPPED") stream = self._stream_name_to_instance[stream_name] - stream.cursor.ensure_at_least_one_state_emitted() + yield from stream.cursor.ensure_at_least_one_state_emitted() yield from self._message_repository.consume_queue() self._logger.info(f"Finished syncing {stream.name}") self._streams_done.add(stream_name) diff --git a/airbyte_cdk/sources/concurrent_source/concurrent_source.py b/airbyte_cdk/sources/concurrent_source/concurrent_source.py index de2d93523..4b92ba203 100644 --- a/airbyte_cdk/sources/concurrent_source/concurrent_source.py +++ b/airbyte_cdk/sources/concurrent_source/concurrent_source.py @@ -166,7 +166,7 @@ def _handle_item( elif isinstance(queue_item, PartitionGenerationCompletedSentinel): yield from concurrent_stream_processor.on_partition_generation_completed(queue_item) elif isinstance(queue_item, Partition): - concurrent_stream_processor.on_partition(queue_item) + yield from concurrent_stream_processor.on_partition(queue_item) elif isinstance(queue_item, PartitionCompleteSentinel): yield from concurrent_stream_processor.on_partition_complete_sentinel(queue_item) elif isinstance(queue_item, Record): diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index d1f2ca41e..6ad98f5c9 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -13,6 +13,7 @@ from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, TypeVar from airbyte_cdk.models import ( + AirbyteMessage, AirbyteStateBlob, AirbyteStateMessage, AirbyteStateType, @@ -268,10 +269,11 @@ def _check_and_update_parent_state(self) -> None: if last_closed_state is not None: self._parent_state = last_closed_state - def ensure_at_least_one_state_emitted(self) -> None: + def ensure_at_least_one_state_emitted(self) -> Iterable[AirbyteMessage]: """ The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be called. + Returns the state message directly instead of putting it on the shared queue. """ if not any( semaphore_item[1]._value for semaphore_item in self._semaphore_per_partition.items() @@ -281,7 +283,7 @@ def ensure_at_least_one_state_emitted(self) -> None: self._global_cursor = self._new_global_cursor self._lookback_window = self._timer.finish() self._parent_state = self._partition_router.get_stream_state() - self._emit_state_message(throttle=False) + yield from self._create_state_message(throttle=False) def _throttle_state_message(self) -> Optional[float]: """ @@ -292,7 +294,33 @@ def _throttle_state_message(self) -> Optional[float]: return None return current_time + def _create_state_message(self, throttle: bool = True) -> Iterable[AirbyteMessage]: + """ + Build and return the state message directly instead of emitting through the message repository. + Used by ensure_at_least_one_state_emitted() to avoid deadlock when the main thread + would otherwise call queue.put() on a full queue. + """ + if throttle: + current_time = self._throttle_state_message() + if current_time is None: + return + self._last_emission_time = current_time + # Skip state emit for global cursor if parent state is empty + if self._use_global_cursor and not self._parent_state: + return + + self._connector_state_manager.update_state_for_stream( + self._stream_name, + self._stream_namespace, + self.state, + ) + state_message = self._connector_state_manager.create_state_message( + self._stream_name, self._stream_namespace + ) + yield state_message + def _emit_state_message(self, throttle: bool = True) -> None: + """Emit state message via message repository. Used by close_partition() on worker threads.""" if throttle: current_time = self._throttle_state_message() if current_time is None: diff --git a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py index 5d2524b55..916d2026f 100644 --- a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py +++ b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py @@ -256,7 +256,14 @@ def stream_slices(self) -> Iterable[StreamSlice]: if is_last_record_in_slice: parent_stream.cursor.close_partition(partition) if is_last_slice: - parent_stream.cursor.ensure_at_least_one_state_emitted() + # ensure_at_least_one_state_emitted now returns messages directly. + # On this worker thread we need to consume the returned iterator + # so the cursor's internal state updates happen, but the messages + # themselves are discarded — the parent cursor's close_partition() + # above already emitted state through the queue. This call just + # ensures internal bookkeeping is finalized. + for _msg in parent_stream.cursor.ensure_at_least_one_state_emitted(): + pass if emit_slice: yield StreamSlice( diff --git a/airbyte_cdk/sources/file_based/stream/concurrent/cursor/abstract_concurrent_file_based_cursor.py b/airbyte_cdk/sources/file_based/stream/concurrent/cursor/abstract_concurrent_file_based_cursor.py index 5c30fda4a..e4fc25e68 100644 --- a/airbyte_cdk/sources/file_based/stream/concurrent/cursor/abstract_concurrent_file_based_cursor.py +++ b/airbyte_cdk/sources/file_based/stream/concurrent/cursor/abstract_concurrent_file_based_cursor.py @@ -7,6 +7,7 @@ from datetime import datetime from typing import TYPE_CHECKING, Any, Iterable, List, MutableMapping +from airbyte_cdk.models import AirbyteMessage from airbyte_cdk.sources.file_based.remote_file import RemoteFile from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor from airbyte_cdk.sources.file_based.types import StreamState @@ -56,4 +57,4 @@ def get_start_time(self) -> datetime: ... def emit_state_message(self) -> None: ... @abstractmethod - def ensure_at_least_one_state_emitted(self) -> None: ... + def ensure_at_least_one_state_emitted(self) -> Iterable[AirbyteMessage]: ... diff --git a/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py b/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py index 0ae9178c0..29ae0750a 100644 --- a/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py +++ b/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py @@ -309,8 +309,19 @@ def get_state(self) -> MutableMapping[str, Any]: def set_initial_state(self, value: StreamState) -> None: pass - def ensure_at_least_one_state_emitted(self) -> None: - self.emit_state_message() + def ensure_at_least_one_state_emitted(self) -> Iterable[AirbyteMessage]: + """Return the state message directly instead of putting it on the shared queue.""" + with self._state_lock: + new_state = self.get_state() + self._connector_state_manager.update_state_for_stream( + self._stream_name, + self._stream_namespace, + new_state, + ) + state_message = self._connector_state_manager.create_state_message( + self._stream_name, self._stream_namespace + ) + yield state_message def should_be_synced(self, record: Record) -> bool: return True diff --git a/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_final_state_cursor.py b/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_final_state_cursor.py index b9cb621a5..a57c3ddb3 100644 --- a/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_final_state_cursor.py +++ b/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_final_state_cursor.py @@ -6,6 +6,7 @@ from datetime import datetime from typing import TYPE_CHECKING, Any, Iterable, List, MutableMapping, Optional +from airbyte_cdk.models import AirbyteMessage from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig from airbyte_cdk.sources.file_based.remote_file import RemoteFile @@ -73,14 +74,15 @@ def get_start_time(self) -> datetime: def emit_state_message(self) -> None: pass - def ensure_at_least_one_state_emitted(self) -> None: + def ensure_at_least_one_state_emitted(self) -> Iterable[AirbyteMessage]: + """Return the state message directly instead of putting it on the shared queue.""" self._connector_state_manager.update_state_for_stream( self._stream_name, self._stream_namespace, self.state ) state_message = self._connector_state_manager.create_state_message( self._stream_name, self._stream_namespace ) - self._message_repository.emit_message(state_message) + yield state_message def should_be_synced(self, record: Record) -> bool: return True diff --git a/airbyte_cdk/sources/streams/concurrent/cursor.py b/airbyte_cdk/sources/streams/concurrent/cursor.py index 11eaad235..6145834d9 100644 --- a/airbyte_cdk/sources/streams/concurrent/cursor.py +++ b/airbyte_cdk/sources/streams/concurrent/cursor.py @@ -19,6 +19,7 @@ Union, ) +from airbyte_cdk.models import AirbyteMessage from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.message import MessageRepository, NoopMessageRepository from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY @@ -72,10 +73,13 @@ def close_partition(self, partition: Partition) -> None: raise NotImplementedError() @abstractmethod - def ensure_at_least_one_state_emitted(self) -> None: + def ensure_at_least_one_state_emitted(self) -> Iterable[AirbyteMessage]: """ State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per stream. Hence, if no partitions are generated, this method needs to be called. + + Returns the state messages directly instead of putting them on the shared queue, + so the caller (main thread) can yield them without risk of deadlock. """ raise NotImplementedError() @@ -140,9 +144,10 @@ def observe(self, record: Record) -> None: def close_partition(self, partition: Partition) -> None: pass - def ensure_at_least_one_state_emitted(self) -> None: + def ensure_at_least_one_state_emitted(self) -> Iterable[AirbyteMessage]: """ - Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync + Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync. + Returns the state message directly instead of putting it on the shared queue. """ self._connector_state_manager.update_state_for_stream( @@ -151,7 +156,7 @@ def ensure_at_least_one_state_emitted(self) -> None: state_message = self._connector_state_manager.create_state_message( self._stream_name, self._stream_namespace ) - self._message_repository.emit_message(state_message) + yield state_message def should_be_synced(self, record: Record) -> bool: return True @@ -397,12 +402,21 @@ def _extract_from_slice(self, partition: Partition, key: str) -> CursorValueType f"Partition is expected to have key `{key}` but could not be found" ) from exception - def ensure_at_least_one_state_emitted(self) -> None: + def ensure_at_least_one_state_emitted(self) -> Iterable[AirbyteMessage]: """ The platform expect to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be called. + Returns the state message directly instead of putting it on the shared queue. """ - self._emit_state_message() + self._connector_state_manager.update_state_for_stream( + self._stream_name, + self._stream_namespace, + self.state, + ) + state_message = self._connector_state_manager.create_state_message( + self._stream_name, self._stream_namespace + ) + yield state_message def stream_slices(self) -> Iterable[StreamSlice]: """ diff --git a/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py b/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py index 910111a05..78f7f0a6d 100644 --- a/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py +++ b/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py @@ -79,6 +79,7 @@ def setUp(self): json_schema={}, supported_sync_modes=[SyncMode.full_refresh], ) + self._stream.cursor.ensure_at_least_one_state_emitted.side_effect = lambda: iter([]) self._another_stream = Mock(spec=AbstractStream) self._another_stream.name = _ANOTHER_STREAM_NAME self._another_stream.as_airbyte_stream.return_value = AirbyteStream( @@ -86,6 +87,7 @@ def setUp(self): json_schema={}, supported_sync_modes=[SyncMode.full_refresh], ) + self._another_stream.cursor.ensure_at_least_one_state_emitted.side_effect = lambda: iter([]) self._record_data = {"id": 1, "value": "A"} self._partition = Mock(spec=Partition) @@ -122,7 +124,7 @@ def test_handle_partition_done_no_other_streams_to_generate_partitions_for(self) self._partition_reader, ) handler.start_next_partition_generator() - handler.on_partition(self._an_open_partition) + list(handler.on_partition(self._an_open_partition)) sentinel = PartitionGenerationCompletedSentinel(self._stream) messages = list(handler.on_partition_generation_completed(sentinel)) @@ -186,7 +188,7 @@ def test_handle_partition(self): expected_cursor = handler._stream_name_to_instance[_ANOTHER_STREAM_NAME].cursor - handler.on_partition(self._a_closed_partition) + list(handler.on_partition(self._a_closed_partition)) self._thread_pool_manager.submit.assert_called_with( self._partition_reader.process_partition, self._a_closed_partition, expected_cursor @@ -213,12 +215,13 @@ def test_handle_partition_emits_log_message_if_it_should_be_logged(self): expected_cursor = handler._stream_name_to_instance[_STREAM_NAME].cursor - handler.on_partition(self._an_open_partition) + messages = list(handler.on_partition(self._an_open_partition)) self._thread_pool_manager.submit.assert_called_with( self._partition_reader.process_partition, self._an_open_partition, expected_cursor ) - self._message_repository.emit_message.assert_called_with(self._log_message) + # Log message is now yielded directly instead of emitted through the repository + assert self._log_message in messages assert self._an_open_partition in handler._streams_to_running_partitions[_STREAM_NAME] @@ -240,7 +243,7 @@ def test_handle_on_partition_complete_sentinel_with_messages_from_repository(sel self._partition_reader, ) handler.start_next_partition_generator() - handler.on_partition(partition) + list(handler.on_partition(partition)) sentinel = PartitionCompleteSentinel(partition) @@ -285,7 +288,7 @@ def test_handle_on_partition_complete_sentinel_yields_status_message_if_the_stre self._partition_reader, ) handler.start_next_partition_generator() - handler.on_partition(self._a_closed_partition) + list(handler.on_partition(self._a_closed_partition)) list( handler.on_partition_generation_completed( PartitionGenerationCompletedSentinel(self._another_stream) @@ -560,7 +563,7 @@ def test_on_exception_return_trace_message_and_on_stream_complete_return_stream_ ) handler.start_next_partition_generator() - handler.on_partition(self._an_open_partition) + list(handler.on_partition(self._an_open_partition)) list( handler.on_partition_generation_completed( PartitionGenerationCompletedSentinel(self._stream) @@ -627,7 +630,7 @@ def test_given_underlying_exception_is_traced_exception_on_exception_return_trac ) handler.start_next_partition_generator() - handler.on_partition(self._an_open_partition) + list(handler.on_partition(self._an_open_partition)) list( handler.on_partition_generation_completed( PartitionGenerationCompletedSentinel(self._stream) @@ -688,7 +691,7 @@ def test_given_partition_completion_is_not_success_then_do_not_close_partition(s ) handler.start_next_partition_generator() - handler.on_partition(self._an_open_partition) + list(handler.on_partition(self._an_open_partition)) list( handler.on_partition_generation_completed( PartitionGenerationCompletedSentinel(self._stream) @@ -749,7 +752,7 @@ def test_is_done_is_false_if_all_partitions_are_not_closed(self): ) handler.start_next_partition_generator() - handler.on_partition(self._an_open_partition) + list(handler.on_partition(self._an_open_partition)) handler.on_partition_generation_completed( PartitionGenerationCompletedSentinel(self._stream) ) @@ -789,7 +792,7 @@ def test_on_exception_non_ate_uses_templated_message_with_correct_failure_type(s ) handler.start_next_partition_generator() - handler.on_partition(self._an_open_partition) + list(handler.on_partition(self._an_open_partition)) list( handler.on_partition_generation_completed( PartitionGenerationCompletedSentinel(self._stream) @@ -874,7 +877,7 @@ def _create_mock_stream(self, name: str, block_simultaneous_read: str = ""): json_schema={}, supported_sync_modes=[SyncMode.full_refresh], ) - stream.cursor.ensure_at_least_one_state_emitted = Mock() + stream.cursor.ensure_at_least_one_state_emitted.side_effect = lambda: iter([]) return stream def _create_mock_stream_with_parent( @@ -889,7 +892,7 @@ def _create_mock_stream_with_parent( json_schema={}, supported_sync_modes=[SyncMode.full_refresh], ) - stream.cursor.ensure_at_least_one_state_emitted = Mock() + stream.cursor.ensure_at_least_one_state_emitted.side_effect = lambda: iter([]) mock_partition_router = Mock(spec=SubstreamPartitionRouter) mock_parent_config = Mock() diff --git a/unit_tests/sources/streams/test_stream_read.py b/unit_tests/sources/streams/test_stream_read.py index cf550f8cf..1b998c459 100644 --- a/unit_tests/sources/streams/test_stream_read.py +++ b/unit_tests/sources/streams/test_stream_read.py @@ -145,8 +145,8 @@ def close_partition(self, partition: Partition) -> None: ) ) - def ensure_at_least_one_state_emitted(self) -> None: - pass + def ensure_at_least_one_state_emitted(self): + yield from [] def should_be_synced(self, record: Record) -> bool: return True From 26150b30d4283214cb42d7223aabdec499399d45 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 2 Apr 2026 10:05:05 +0000 Subject: [PATCH 2/5] style: fix ruff formatting in substream_partition_router.py Co-Authored-By: gl_anatolii.yatsuk --- .../partition_routers/substream_partition_router.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py index 916d2026f..f3116dad3 100644 --- a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py +++ b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py @@ -262,7 +262,9 @@ def stream_slices(self) -> Iterable[StreamSlice]: # themselves are discarded — the parent cursor's close_partition() # above already emitted state through the queue. This call just # ensures internal bookkeeping is finalized. - for _msg in parent_stream.cursor.ensure_at_least_one_state_emitted(): + for ( + _msg + ) in parent_stream.cursor.ensure_at_least_one_state_emitted(): pass if emit_slice: From b9ac43f3177c7bf7855b778739339563ea6bf6b4 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 2 Apr 2026 10:14:36 +0000 Subject: [PATCH 3/5] fix: add expected STATE messages to integration test for direct-return approach Co-Authored-By: gl_anatolii.yatsuk --- unit_tests/sources/test_source_read.py | 28 +++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/unit_tests/sources/test_source_read.py b/unit_tests/sources/test_source_read.py index a25f54a5a..5ad5f1660 100644 --- a/unit_tests/sources/test_source_read.py +++ b/unit_tests/sources/test_source_read.py @@ -6,10 +6,12 @@ from unittest.mock import Mock import freezegun +from airbyte_protocol_dataclasses.models import AirbyteStateType from airbyte_cdk.models import ( AirbyteMessage, AirbyteRecordMessage, + AirbyteStateMessage, AirbyteStream, AirbyteStreamStatus, AirbyteStreamStatusTraceMessage, @@ -22,11 +24,15 @@ TraceType, ) from airbyte_cdk.models import Type as MessageType +from airbyte_cdk.models.airbyte_protocol import ( + AirbyteStateBlob, + AirbyteStreamState, +) from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource from airbyte_cdk.sources.concurrent_source.concurrent_source_adapter import ConcurrentSourceAdapter from airbyte_cdk.sources.message import InMemoryMessageRepository -from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY, Stream from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor from airbyte_cdk.sources.streams.core import StreamData @@ -212,6 +218,16 @@ def test_concurrent_source_yields_the_same_messages_as_abstract_source_when_no_e emitted_at=1577836800000, ), ), + AirbyteMessage( + type=MessageType.STATE, + state=AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="stream0"), + stream_state=AirbyteStateBlob(**{NO_CURSOR_STATE_KEY: True}), + ), + ), + ), AirbyteMessage( type=MessageType.TRACE, trace=AirbyteTraceMessage( @@ -283,6 +299,16 @@ def test_concurrent_source_yields_the_same_messages_as_abstract_source_when_no_e emitted_at=1577836800000, ), ), + AirbyteMessage( + type=MessageType.STATE, + state=AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="stream1"), + stream_state=AirbyteStateBlob(**{NO_CURSOR_STATE_KEY: True}), + ), + ), + ), AirbyteMessage( type=MessageType.TRACE, trace=AirbyteTraceMessage( From 87db0cc2e81caf44043323ff6324a48c8a629e65 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 2 Apr 2026 10:23:44 +0000 Subject: [PATCH 4/5] fix: consume ensure_at_least_one_state_emitted generator in perpartition cursor tests Co-Authored-By: gl_anatolii.yatsuk --- .../test_concurrent_perpartitioncursor.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index 28b9b8460..5032bc661 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -3739,7 +3739,7 @@ def test_given_unfinished_first_parent_partition_no_parent_state_update(): record_counter=RecordCounter(), ) ) - cursor.ensure_at_least_one_state_emitted() + list(cursor.ensure_at_least_one_state_emitted()) state = cursor.state assert state == { @@ -3835,7 +3835,7 @@ def test_given_unfinished_last_parent_partition_with_partial_parent_state_update record_counter=RecordCounter(), ) ) - cursor.ensure_at_least_one_state_emitted() + list(cursor.ensure_at_least_one_state_emitted()) state = cursor.state assert state == { @@ -3927,7 +3927,7 @@ def test_given_all_partitions_finished_when_close_partition_then_final_state_emi ) ) - cursor.ensure_at_least_one_state_emitted() + state_messages = list(cursor.ensure_at_least_one_state_emitted()) final_state = cursor.state assert final_state["use_global_cursor"] is False @@ -3935,7 +3935,10 @@ def test_given_all_partitions_finished_when_close_partition_then_final_state_emi assert final_state["state"]["updated_at"] == "2024-01-02T00:00:00Z" assert final_state["parent_state"] == {"posts": {"updated_at": "2024-01-06T00:00:00Z"}} assert final_state["lookback_window"] == 86400 - assert cursor._message_repository.emit_message.call_count == 2 + # close_partition() emits 1 state via message_repository (second is throttled) + # ensure_at_least_one_state_emitted() returns 1 state directly (no longer uses message_repository) + assert cursor._message_repository.emit_message.call_count == 1 + assert len(state_messages) == 1 assert mock_cursor.stream_slices.call_count == 2 # Called once for each partition # Checks that all internal variables are cleaned up @@ -4001,7 +4004,7 @@ def test_given_partition_limit_exceeded_when_close_partition_then_switch_to_glob record_counter=RecordCounter(), ) ) - cursor.ensure_at_least_one_state_emitted() + list(cursor.ensure_at_least_one_state_emitted()) final_state = cursor.state assert len(slices) == 3 From e154d08f807ce2dbdc12dfaa09cee93a9de4342d Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 2 Apr 2026 10:32:01 +0000 Subject: [PATCH 5/5] fix: make mock cursor ensure_at_least_one_state_emitted return iterable in substream tests Co-Authored-By: gl_anatolii.yatsuk --- .../partition_routers/test_substream_partition_router.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py b/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py index b69849ebe..2f5e7c969 100644 --- a/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py +++ b/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py @@ -1208,6 +1208,7 @@ def test_substream_partition_router_closes_all_partitions_even_when_no_records() mock_cursor = Mock() mock_cursor.stream_slices.return_value = [] + mock_cursor.ensure_at_least_one_state_emitted.return_value = [] partition_router = SubstreamPartitionRouter( parent_stream_configs=[ @@ -1270,6 +1271,7 @@ def test_substream_partition_router_closes_partition_even_when_parent_key_missin mock_cursor = Mock() mock_cursor.stream_slices.return_value = [] + mock_cursor.ensure_at_least_one_state_emitted.return_value = [] partition_router = SubstreamPartitionRouter( parent_stream_configs=[