diff --git a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py index a78905e72..68d5ce5ca 100644 --- a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py +++ b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py @@ -140,20 +140,19 @@ def on_partition_generation_completed( if status_message: yield status_message - def on_partition(self, partition: Partition) -> None: + def on_partition(self, partition: Partition) -> Iterable[AirbyteMessage]: """ This method is called when a partition is generated. 1. Add the partition to the set of partitions for the stream - 2. Log the slice if necessary + 2. Log the slice if necessary — yield the log message directly instead of + putting it on the shared queue (prevents deadlock when queue is full) 3. Submit the partition to the thread pool manager """ stream_name = partition.stream_name() self._streams_to_running_partitions[stream_name].add(partition) cursor = self._stream_name_to_instance[stream_name].cursor if self._slice_logger.should_log_slice_message(self._logger): - self._message_repository.emit_message( - self._slice_logger.create_slice_log_message(partition.to_slice()) - ) + yield self._slice_logger.create_slice_log_message(partition.to_slice()) self._thread_pool_manager.submit( self._partition_reader.process_partition, partition, cursor ) @@ -426,7 +425,7 @@ def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]: ) self._logger.info(f"Marking stream {stream_name} as STOPPED") stream = self._stream_name_to_instance[stream_name] - stream.cursor.ensure_at_least_one_state_emitted() + yield from stream.cursor.ensure_at_least_one_state_emitted() yield from self._message_repository.consume_queue() self._logger.info(f"Finished syncing {stream.name}") self._streams_done.add(stream_name) diff --git a/airbyte_cdk/sources/concurrent_source/concurrent_source.py b/airbyte_cdk/sources/concurrent_source/concurrent_source.py index de2d93523..4b92ba203 100644 --- a/airbyte_cdk/sources/concurrent_source/concurrent_source.py +++ b/airbyte_cdk/sources/concurrent_source/concurrent_source.py @@ -166,7 +166,7 @@ def _handle_item( elif isinstance(queue_item, PartitionGenerationCompletedSentinel): yield from concurrent_stream_processor.on_partition_generation_completed(queue_item) elif isinstance(queue_item, Partition): - concurrent_stream_processor.on_partition(queue_item) + yield from concurrent_stream_processor.on_partition(queue_item) elif isinstance(queue_item, PartitionCompleteSentinel): yield from concurrent_stream_processor.on_partition_complete_sentinel(queue_item) elif isinstance(queue_item, Record): diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index d1f2ca41e..6ad98f5c9 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -13,6 +13,7 @@ from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, TypeVar from airbyte_cdk.models import ( + AirbyteMessage, AirbyteStateBlob, AirbyteStateMessage, AirbyteStateType, @@ -268,10 +269,11 @@ def _check_and_update_parent_state(self) -> None: if last_closed_state is not None: self._parent_state = last_closed_state - def ensure_at_least_one_state_emitted(self) -> None: + def ensure_at_least_one_state_emitted(self) -> Iterable[AirbyteMessage]: """ The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be called. + Returns the state message directly instead of putting it on the shared queue. """ if not any( semaphore_item[1]._value for semaphore_item in self._semaphore_per_partition.items() @@ -281,7 +283,7 @@ def ensure_at_least_one_state_emitted(self) -> None: self._global_cursor = self._new_global_cursor self._lookback_window = self._timer.finish() self._parent_state = self._partition_router.get_stream_state() - self._emit_state_message(throttle=False) + yield from self._create_state_message(throttle=False) def _throttle_state_message(self) -> Optional[float]: """ @@ -292,7 +294,33 @@ def _throttle_state_message(self) -> Optional[float]: return None return current_time + def _create_state_message(self, throttle: bool = True) -> Iterable[AirbyteMessage]: + """ + Build and return the state message directly instead of emitting through the message repository. + Used by ensure_at_least_one_state_emitted() to avoid deadlock when the main thread + would otherwise call queue.put() on a full queue. + """ + if throttle: + current_time = self._throttle_state_message() + if current_time is None: + return + self._last_emission_time = current_time + # Skip state emit for global cursor if parent state is empty + if self._use_global_cursor and not self._parent_state: + return + + self._connector_state_manager.update_state_for_stream( + self._stream_name, + self._stream_namespace, + self.state, + ) + state_message = self._connector_state_manager.create_state_message( + self._stream_name, self._stream_namespace + ) + yield state_message + def _emit_state_message(self, throttle: bool = True) -> None: + """Emit state message via message repository. Used by close_partition() on worker threads.""" if throttle: current_time = self._throttle_state_message() if current_time is None: diff --git a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py index 5d2524b55..f3116dad3 100644 --- a/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py +++ b/airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py @@ -256,7 +256,16 @@ def stream_slices(self) -> Iterable[StreamSlice]: if is_last_record_in_slice: parent_stream.cursor.close_partition(partition) if is_last_slice: - parent_stream.cursor.ensure_at_least_one_state_emitted() + # ensure_at_least_one_state_emitted now returns messages directly. + # On this worker thread we need to consume the returned iterator + # so the cursor's internal state updates happen, but the messages + # themselves are discarded — the parent cursor's close_partition() + # above already emitted state through the queue. This call just + # ensures internal bookkeeping is finalized. + for ( + _msg + ) in parent_stream.cursor.ensure_at_least_one_state_emitted(): + pass if emit_slice: yield StreamSlice( diff --git a/airbyte_cdk/sources/file_based/stream/concurrent/cursor/abstract_concurrent_file_based_cursor.py b/airbyte_cdk/sources/file_based/stream/concurrent/cursor/abstract_concurrent_file_based_cursor.py index 5c30fda4a..e4fc25e68 100644 --- a/airbyte_cdk/sources/file_based/stream/concurrent/cursor/abstract_concurrent_file_based_cursor.py +++ b/airbyte_cdk/sources/file_based/stream/concurrent/cursor/abstract_concurrent_file_based_cursor.py @@ -7,6 +7,7 @@ from datetime import datetime from typing import TYPE_CHECKING, Any, Iterable, List, MutableMapping +from airbyte_cdk.models import AirbyteMessage from airbyte_cdk.sources.file_based.remote_file import RemoteFile from airbyte_cdk.sources.file_based.stream.cursor import AbstractFileBasedCursor from airbyte_cdk.sources.file_based.types import StreamState @@ -56,4 +57,4 @@ def get_start_time(self) -> datetime: ... def emit_state_message(self) -> None: ... @abstractmethod - def ensure_at_least_one_state_emitted(self) -> None: ... + def ensure_at_least_one_state_emitted(self) -> Iterable[AirbyteMessage]: ... diff --git a/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py b/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py index 0ae9178c0..29ae0750a 100644 --- a/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py +++ b/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py @@ -309,8 +309,19 @@ def get_state(self) -> MutableMapping[str, Any]: def set_initial_state(self, value: StreamState) -> None: pass - def ensure_at_least_one_state_emitted(self) -> None: - self.emit_state_message() + def ensure_at_least_one_state_emitted(self) -> Iterable[AirbyteMessage]: + """Return the state message directly instead of putting it on the shared queue.""" + with self._state_lock: + new_state = self.get_state() + self._connector_state_manager.update_state_for_stream( + self._stream_name, + self._stream_namespace, + new_state, + ) + state_message = self._connector_state_manager.create_state_message( + self._stream_name, self._stream_namespace + ) + yield state_message def should_be_synced(self, record: Record) -> bool: return True diff --git a/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_final_state_cursor.py b/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_final_state_cursor.py index b9cb621a5..a57c3ddb3 100644 --- a/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_final_state_cursor.py +++ b/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_final_state_cursor.py @@ -6,6 +6,7 @@ from datetime import datetime from typing import TYPE_CHECKING, Any, Iterable, List, MutableMapping, Optional +from airbyte_cdk.models import AirbyteMessage from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig from airbyte_cdk.sources.file_based.remote_file import RemoteFile @@ -73,14 +74,15 @@ def get_start_time(self) -> datetime: def emit_state_message(self) -> None: pass - def ensure_at_least_one_state_emitted(self) -> None: + def ensure_at_least_one_state_emitted(self) -> Iterable[AirbyteMessage]: + """Return the state message directly instead of putting it on the shared queue.""" self._connector_state_manager.update_state_for_stream( self._stream_name, self._stream_namespace, self.state ) state_message = self._connector_state_manager.create_state_message( self._stream_name, self._stream_namespace ) - self._message_repository.emit_message(state_message) + yield state_message def should_be_synced(self, record: Record) -> bool: return True diff --git a/airbyte_cdk/sources/streams/concurrent/cursor.py b/airbyte_cdk/sources/streams/concurrent/cursor.py index 11eaad235..6145834d9 100644 --- a/airbyte_cdk/sources/streams/concurrent/cursor.py +++ b/airbyte_cdk/sources/streams/concurrent/cursor.py @@ -19,6 +19,7 @@ Union, ) +from airbyte_cdk.models import AirbyteMessage from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.message import MessageRepository, NoopMessageRepository from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY @@ -72,10 +73,13 @@ def close_partition(self, partition: Partition) -> None: raise NotImplementedError() @abstractmethod - def ensure_at_least_one_state_emitted(self) -> None: + def ensure_at_least_one_state_emitted(self) -> Iterable[AirbyteMessage]: """ State messages are emitted when a partition is closed. However, the platform expects at least one state to be emitted per sync per stream. Hence, if no partitions are generated, this method needs to be called. + + Returns the state messages directly instead of putting them on the shared queue, + so the caller (main thread) can yield them without risk of deadlock. """ raise NotImplementedError() @@ -140,9 +144,10 @@ def observe(self, record: Record) -> None: def close_partition(self, partition: Partition) -> None: pass - def ensure_at_least_one_state_emitted(self) -> None: + def ensure_at_least_one_state_emitted(self) -> Iterable[AirbyteMessage]: """ - Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync + Used primarily for full refresh syncs that do not have a valid cursor value to emit at the end of a sync. + Returns the state message directly instead of putting it on the shared queue. """ self._connector_state_manager.update_state_for_stream( @@ -151,7 +156,7 @@ def ensure_at_least_one_state_emitted(self) -> None: state_message = self._connector_state_manager.create_state_message( self._stream_name, self._stream_namespace ) - self._message_repository.emit_message(state_message) + yield state_message def should_be_synced(self, record: Record) -> bool: return True @@ -397,12 +402,21 @@ def _extract_from_slice(self, partition: Partition, key: str) -> CursorValueType f"Partition is expected to have key `{key}` but could not be found" ) from exception - def ensure_at_least_one_state_emitted(self) -> None: + def ensure_at_least_one_state_emitted(self) -> Iterable[AirbyteMessage]: """ The platform expect to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be called. + Returns the state message directly instead of putting it on the shared queue. """ - self._emit_state_message() + self._connector_state_manager.update_state_for_stream( + self._stream_name, + self._stream_namespace, + self.state, + ) + state_message = self._connector_state_manager.create_state_message( + self._stream_name, self._stream_namespace + ) + yield state_message def stream_slices(self) -> Iterable[StreamSlice]: """ diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index 28b9b8460..5032bc661 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -3739,7 +3739,7 @@ def test_given_unfinished_first_parent_partition_no_parent_state_update(): record_counter=RecordCounter(), ) ) - cursor.ensure_at_least_one_state_emitted() + list(cursor.ensure_at_least_one_state_emitted()) state = cursor.state assert state == { @@ -3835,7 +3835,7 @@ def test_given_unfinished_last_parent_partition_with_partial_parent_state_update record_counter=RecordCounter(), ) ) - cursor.ensure_at_least_one_state_emitted() + list(cursor.ensure_at_least_one_state_emitted()) state = cursor.state assert state == { @@ -3927,7 +3927,7 @@ def test_given_all_partitions_finished_when_close_partition_then_final_state_emi ) ) - cursor.ensure_at_least_one_state_emitted() + state_messages = list(cursor.ensure_at_least_one_state_emitted()) final_state = cursor.state assert final_state["use_global_cursor"] is False @@ -3935,7 +3935,10 @@ def test_given_all_partitions_finished_when_close_partition_then_final_state_emi assert final_state["state"]["updated_at"] == "2024-01-02T00:00:00Z" assert final_state["parent_state"] == {"posts": {"updated_at": "2024-01-06T00:00:00Z"}} assert final_state["lookback_window"] == 86400 - assert cursor._message_repository.emit_message.call_count == 2 + # close_partition() emits 1 state via message_repository (second is throttled) + # ensure_at_least_one_state_emitted() returns 1 state directly (no longer uses message_repository) + assert cursor._message_repository.emit_message.call_count == 1 + assert len(state_messages) == 1 assert mock_cursor.stream_slices.call_count == 2 # Called once for each partition # Checks that all internal variables are cleaned up @@ -4001,7 +4004,7 @@ def test_given_partition_limit_exceeded_when_close_partition_then_switch_to_glob record_counter=RecordCounter(), ) ) - cursor.ensure_at_least_one_state_emitted() + list(cursor.ensure_at_least_one_state_emitted()) final_state = cursor.state assert len(slices) == 3 diff --git a/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py b/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py index b69849ebe..2f5e7c969 100644 --- a/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py +++ b/unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py @@ -1208,6 +1208,7 @@ def test_substream_partition_router_closes_all_partitions_even_when_no_records() mock_cursor = Mock() mock_cursor.stream_slices.return_value = [] + mock_cursor.ensure_at_least_one_state_emitted.return_value = [] partition_router = SubstreamPartitionRouter( parent_stream_configs=[ @@ -1270,6 +1271,7 @@ def test_substream_partition_router_closes_partition_even_when_parent_key_missin mock_cursor = Mock() mock_cursor.stream_slices.return_value = [] + mock_cursor.ensure_at_least_one_state_emitted.return_value = [] partition_router = SubstreamPartitionRouter( parent_stream_configs=[ diff --git a/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py b/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py index 910111a05..78f7f0a6d 100644 --- a/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py +++ b/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py @@ -79,6 +79,7 @@ def setUp(self): json_schema={}, supported_sync_modes=[SyncMode.full_refresh], ) + self._stream.cursor.ensure_at_least_one_state_emitted.side_effect = lambda: iter([]) self._another_stream = Mock(spec=AbstractStream) self._another_stream.name = _ANOTHER_STREAM_NAME self._another_stream.as_airbyte_stream.return_value = AirbyteStream( @@ -86,6 +87,7 @@ def setUp(self): json_schema={}, supported_sync_modes=[SyncMode.full_refresh], ) + self._another_stream.cursor.ensure_at_least_one_state_emitted.side_effect = lambda: iter([]) self._record_data = {"id": 1, "value": "A"} self._partition = Mock(spec=Partition) @@ -122,7 +124,7 @@ def test_handle_partition_done_no_other_streams_to_generate_partitions_for(self) self._partition_reader, ) handler.start_next_partition_generator() - handler.on_partition(self._an_open_partition) + list(handler.on_partition(self._an_open_partition)) sentinel = PartitionGenerationCompletedSentinel(self._stream) messages = list(handler.on_partition_generation_completed(sentinel)) @@ -186,7 +188,7 @@ def test_handle_partition(self): expected_cursor = handler._stream_name_to_instance[_ANOTHER_STREAM_NAME].cursor - handler.on_partition(self._a_closed_partition) + list(handler.on_partition(self._a_closed_partition)) self._thread_pool_manager.submit.assert_called_with( self._partition_reader.process_partition, self._a_closed_partition, expected_cursor @@ -213,12 +215,13 @@ def test_handle_partition_emits_log_message_if_it_should_be_logged(self): expected_cursor = handler._stream_name_to_instance[_STREAM_NAME].cursor - handler.on_partition(self._an_open_partition) + messages = list(handler.on_partition(self._an_open_partition)) self._thread_pool_manager.submit.assert_called_with( self._partition_reader.process_partition, self._an_open_partition, expected_cursor ) - self._message_repository.emit_message.assert_called_with(self._log_message) + # Log message is now yielded directly instead of emitted through the repository + assert self._log_message in messages assert self._an_open_partition in handler._streams_to_running_partitions[_STREAM_NAME] @@ -240,7 +243,7 @@ def test_handle_on_partition_complete_sentinel_with_messages_from_repository(sel self._partition_reader, ) handler.start_next_partition_generator() - handler.on_partition(partition) + list(handler.on_partition(partition)) sentinel = PartitionCompleteSentinel(partition) @@ -285,7 +288,7 @@ def test_handle_on_partition_complete_sentinel_yields_status_message_if_the_stre self._partition_reader, ) handler.start_next_partition_generator() - handler.on_partition(self._a_closed_partition) + list(handler.on_partition(self._a_closed_partition)) list( handler.on_partition_generation_completed( PartitionGenerationCompletedSentinel(self._another_stream) @@ -560,7 +563,7 @@ def test_on_exception_return_trace_message_and_on_stream_complete_return_stream_ ) handler.start_next_partition_generator() - handler.on_partition(self._an_open_partition) + list(handler.on_partition(self._an_open_partition)) list( handler.on_partition_generation_completed( PartitionGenerationCompletedSentinel(self._stream) @@ -627,7 +630,7 @@ def test_given_underlying_exception_is_traced_exception_on_exception_return_trac ) handler.start_next_partition_generator() - handler.on_partition(self._an_open_partition) + list(handler.on_partition(self._an_open_partition)) list( handler.on_partition_generation_completed( PartitionGenerationCompletedSentinel(self._stream) @@ -688,7 +691,7 @@ def test_given_partition_completion_is_not_success_then_do_not_close_partition(s ) handler.start_next_partition_generator() - handler.on_partition(self._an_open_partition) + list(handler.on_partition(self._an_open_partition)) list( handler.on_partition_generation_completed( PartitionGenerationCompletedSentinel(self._stream) @@ -749,7 +752,7 @@ def test_is_done_is_false_if_all_partitions_are_not_closed(self): ) handler.start_next_partition_generator() - handler.on_partition(self._an_open_partition) + list(handler.on_partition(self._an_open_partition)) handler.on_partition_generation_completed( PartitionGenerationCompletedSentinel(self._stream) ) @@ -789,7 +792,7 @@ def test_on_exception_non_ate_uses_templated_message_with_correct_failure_type(s ) handler.start_next_partition_generator() - handler.on_partition(self._an_open_partition) + list(handler.on_partition(self._an_open_partition)) list( handler.on_partition_generation_completed( PartitionGenerationCompletedSentinel(self._stream) @@ -874,7 +877,7 @@ def _create_mock_stream(self, name: str, block_simultaneous_read: str = ""): json_schema={}, supported_sync_modes=[SyncMode.full_refresh], ) - stream.cursor.ensure_at_least_one_state_emitted = Mock() + stream.cursor.ensure_at_least_one_state_emitted.side_effect = lambda: iter([]) return stream def _create_mock_stream_with_parent( @@ -889,7 +892,7 @@ def _create_mock_stream_with_parent( json_schema={}, supported_sync_modes=[SyncMode.full_refresh], ) - stream.cursor.ensure_at_least_one_state_emitted = Mock() + stream.cursor.ensure_at_least_one_state_emitted.side_effect = lambda: iter([]) mock_partition_router = Mock(spec=SubstreamPartitionRouter) mock_parent_config = Mock() diff --git a/unit_tests/sources/streams/test_stream_read.py b/unit_tests/sources/streams/test_stream_read.py index cf550f8cf..1b998c459 100644 --- a/unit_tests/sources/streams/test_stream_read.py +++ b/unit_tests/sources/streams/test_stream_read.py @@ -145,8 +145,8 @@ def close_partition(self, partition: Partition) -> None: ) ) - def ensure_at_least_one_state_emitted(self) -> None: - pass + def ensure_at_least_one_state_emitted(self): + yield from [] def should_be_synced(self, record: Record) -> bool: return True diff --git a/unit_tests/sources/test_source_read.py b/unit_tests/sources/test_source_read.py index a25f54a5a..5ad5f1660 100644 --- a/unit_tests/sources/test_source_read.py +++ b/unit_tests/sources/test_source_read.py @@ -6,10 +6,12 @@ from unittest.mock import Mock import freezegun +from airbyte_protocol_dataclasses.models import AirbyteStateType from airbyte_cdk.models import ( AirbyteMessage, AirbyteRecordMessage, + AirbyteStateMessage, AirbyteStream, AirbyteStreamStatus, AirbyteStreamStatusTraceMessage, @@ -22,11 +24,15 @@ TraceType, ) from airbyte_cdk.models import Type as MessageType +from airbyte_cdk.models.airbyte_protocol import ( + AirbyteStateBlob, + AirbyteStreamState, +) from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource from airbyte_cdk.sources.concurrent_source.concurrent_source_adapter import ConcurrentSourceAdapter from airbyte_cdk.sources.message import InMemoryMessageRepository -from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY, Stream from airbyte_cdk.sources.streams.concurrent.adapters import StreamFacade from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor from airbyte_cdk.sources.streams.core import StreamData @@ -212,6 +218,16 @@ def test_concurrent_source_yields_the_same_messages_as_abstract_source_when_no_e emitted_at=1577836800000, ), ), + AirbyteMessage( + type=MessageType.STATE, + state=AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="stream0"), + stream_state=AirbyteStateBlob(**{NO_CURSOR_STATE_KEY: True}), + ), + ), + ), AirbyteMessage( type=MessageType.TRACE, trace=AirbyteTraceMessage( @@ -283,6 +299,16 @@ def test_concurrent_source_yields_the_same_messages_as_abstract_source_when_no_e emitted_at=1577836800000, ), ), + AirbyteMessage( + type=MessageType.STATE, + state=AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="stream1"), + stream_state=AirbyteStateBlob(**{NO_CURSOR_STATE_KEY: True}), + ), + ), + ), AirbyteMessage( type=MessageType.TRACE, trace=AirbyteTraceMessage(