Version 6.34.0 of the CDK removes support for stream_state in the Jinja interpolation context. This change is breaking for any low-code connectors that use stream_state in the interpolation context.
The following components are impacted by this change:
HttpRequesterrequest_parametersrequest_body_jsonrequest_body_datarequest_headers
RecordFilterAddField
Where applicable, we recommend updating to use stream_interval instead.
# Before
record_filter:
type: RecordFilter
condition: "{{ stream_state['updated_at'] }}"
# After
record_filter:
type: RecordFilter
condition: "{{ stream_interval['start_date'] }}"Starting from version 6.28.0, the CDK no longer includes Pendulum as a transitive dependency. If your connector relies on Pendulum without explicitly declaring it as a dependency, you will need to add it to your connector's dependencies going forward.
More info:
Version 6.x.x of the CDK introduces concurrent processing of low-code incremental streams. This is breaking because non-manifest only connectors must update their self-managed run.py and source.py files. This section is intended to clarify how to upgrade a low-code connector to use the Concurrent CDK to sync incremental streams.
Note
This version introduces parallel processing of only incremental streams. It does not include the parallel processing of substreams that rely on a parent stream It also does not include processing of full-refresh streams in parallel.
Low-code incremental streams that match any of the following criteria are not supported by concurrent as of this version:
- Uses a custom implementation of the
DatetimeBasedCursorcomponent - The
DatetimeBasedCursordefines astepwhich will partition a stream's request into time intervals AND aAddedField/HttpRequester/RecordFilterthat relies on interpolation of thestream_statevalue. See below for the complete list
In order to enable concurrency for a low-code connector, the following changes must be made:
- In the connector's
source.py, change the method signature to accept catalog, config, and state parameters. Change the invocation ofsuper()to pass in those new parameters
class SourceName(YamlDeclarativeSource):
def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], state: TState, **kwargs):
super().__init__(catalog=catalog, config=config, state=state, **{"path_to_yaml": "manifest.yaml"})- In the connector's
run.py, update it to pass variables
def _get_source(args: List[str]):
catalog_path = AirbyteEntrypoint.extract_catalog(args)
config_path = AirbyteEntrypoint.extract_config(args)
state_path = AirbyteEntrypoint.extract_state(args)
try:
return SourceName(
SourceName.read_catalog(catalog_path) if catalog_path else None,
SourceName.read_config(config_path) if config_path else None,
SourceName.read_state(state_path) if state_path else None,
)
except Exception as error:
print(
orjson.dumps(
AirbyteMessageSerializer.dump(
AirbyteMessage(
type=Type.TRACE,
trace=AirbyteTraceMessage(
type=TraceType.ERROR,
emitted_at=int(datetime.now().timestamp() * 1000),
error=AirbyteErrorTraceMessage(
message=f"Error starting the sync. This could be due to an invalid configuration or catalog. Please contact Support for assistance. Error: {error}",
stack_trace=traceback.format_exc(),
),
),
)
)
).decode()
)
return None
def run():
_args = sys.argv[1:]
source = _get_source(_args)
if source:
launch(source, _args)- Add the
ConcurrencyLevelcomponent to the connector'smanifest.yamlfile
concurrency_level:
type: ConcurrencyLevel
default_concurrency: "{{ config['num_workers'] or 10 }}"
max_concurrency: 20Connectors that have streams that use stream_state during interpolation and must be run synchronously until they are fixed or updated:
- Http Requester
source-insightly: Uses an DatetimeBasedCursor with a step interval and the HttpRequester has request_parameters relying onstream_state. This should be replaced bystep_intervalsource-intercom: Uses a customincremental_synccomponent andstream_stateused as part of the HttpRequester request_body_json. However, because this processed on a single slice,stream_intervalcan be used
- Record Filter
source-chargebee: Uses a customincremental_synccomponent andstream_statein the RecordFilter condition. However, because this processed on a single slice,stream_intervalcan be usedsource-intercom: Uses a customincremental_synccomponent andstream_stateused as part of the RecordFilter condition. However, because this processed on a single slice,stream_intervalcan be usedsource-railz: Uses a customincremental_synccomponent andstream_stateused as part of the RecordFilter condition. This also uses multiple one month time intervals and is not currently compatible for concurrentsource-tiktok-marketing: Contains DatetimeBasedCursor with a step interval and relies on a CustomRecordFilter with a condition relying onstream_state. This should be replaced bystream_interval
AddFields: No connectors usestream_statewhen performing an additive transformation for a record
To enable concurrency on these streams, stream_state should be removed from the interpolated value and replaced
by a thread safe interpolation context like stream_interval or stream_partition.
All manifest-only sources are run using the source-declarative-manifest which serves as the base image with the common code and flows for connectors that only define a manifest.yaml file.
Within this package, to enable concurrent processing:
- Modify
airbyte-cdkpackage inpyproject.tomlto the current version - In
run.py, parse all entrypoint arguments into the respective config, catalog, and state objects - In
run.py, modify the flow that instantiates aManifestDeclarativeSourcefrom the__injected_declarative_manifestto instantiate aConcurrentDeclarativeSource - In
run.pymodify theSourceLocalYamlclass to accept config, catalog, and state. And use that in theYamlDeclarativeSource.__init__. This should look similar to the migration of sources that are not manifest-only
Version 5.0.0 of the CDK updates the airbyte_cdk.models dependency to replace Pydantic v2 models with Python dataclasses. It also
updates the airbyte-protocol-models dependency to a version that uses dataclasses models.
The changes to Airbyte CDK itself are backwards-compatible, but some changes are required if the connector:
- uses the
airbyte_protocolmodels directly, orairbyte_cdk.models, which points toairbyte_protocolmodels - uses third-party libraries, such as
pandas, to read data from sources, which output non-native Python objects that cannot be serialized by the orjson library.
Note
All Serializers have omit_none=True parameter that is applied recursively. Thus, all None values are excluded from output. This is expected behaviour and does not break anything in protocol.
- If the connector uses Pydantic based Airbyte Protocol Models, the code will need to be updated to reflect the changes
pydantic. - It is recommended to import protocol classes not directly by
import airbyte_protocolstatement, but fromairbyte_cdk.modelspackage. - It is also recommended to use *-
Serializerfromairbyte_cdk.modelsto manipulate the data or convert to/from JSON. These are based on the serpyco-rs library. - These classes have a
dumpmethod that converts the model to a dictionary and aloadmethod that converts a dictionary to a model. - The recommended serialization strategy is to pass the dictionary to the
orjsonlibrary when serializing as a JSON string.
E.g.
import orjson
from airbyte_cdk.models import AirbyteMessage, AirbyteMessageSerializer
# Before (pydantic model message serialization)
AirbyteMessage().model_dump_json()
# After (dataclass model serialization)
orjson.dumps(AirbyteMessageSerializer.dump(AirbyteMessage())).decode()For example, if pandas outputs data from the source, which has date-time pandas.Timestamp object in
it, Orjson supported Types, these fields should be transformed to native JSON
objects.
# Before
yield from df.to_dict(orient="records")
# After - Option 1
yield orjson.loads(df.to_json(orient="records", date_format="iso", date_unit="us"))In this release, we are no longer supporting the legacy state format in favor of the current per-stream state
format which has been running in production for over 2 years. The impacts to connectors should be minimal, but for
the small number of connectors that instantiate their own ConnectorStateManager, the fix to upgrade to the latest
version of the CDK is to stop passing the stream_instance_map parameter to the ConnectorStateManager constructor.
We are unifying the BackoffStrategy interface as it currently differs from the Python CDK package to the declarative one. The different is that the interface will require the attempt_count to be passed.
Main impact: This change is mostly internal but we spotted a couple of tests that expect backoff_time to not have the attempt_count parameter so these tests would fail (example).
This change should not impact the following classes even though they have a different interface as they accept kwargs and attempt_count is currently passed as a keyword argument within the CDK. However, once there is a CDK change where backoff_time is called not as a keyword argument, they will fail:
- Zendesk Support: ZendeskSupportBackoffStrategy (this one will be updated shortly after as it is used for CI to validate CDK changes)
- Klaviyo: KlaviyoBackoffStrategy (the logic has been generified so we will remove this custom component shortly after this update)
- GitHub: GithubStreamABCBackoffStrategy and ContributorActivityBackoffStrategy
- Airtable: AirtableBackoffStrategy
- Slack: SlackBackoffStrategy
This change should not impact WaitUntilMidnightBackoffStrategy from source-gnews as well but it is interesting to note that its interface is also wrong as it considers the first parameter as a requests.Response instead of a Optional[Union[requests.Response, requests.RequestException]].
Updated the codebase to utilize new Python syntax features. As a result, support for Python 3.9 has been dropped. The minimum required Python version is now 3.10.
Version 3.0.0 of the CDK updates the HTTPStream class by reusing the HTTPClient under the hood.
backoff_timeandshould_retrymethods are removed from HttpStreamHttpStreamAdapterHttpStatusErrorHandlerandHttpStreamAdapterBackoffStrategyadapters are marked asdeprecatedraise_on_http_errors,max_retries,max_time,retry_factorare marked asdeprecated
Exceptions from the requests library should no longer be raised when calling read_records.
Therefore, catching exceptions should be updated, and error messages might change.
See Migration of Source Zendesk Support as an example.
In case the connector uses custom logic for backoff based on the response from the server, a new method get_error_handler should be implemented.
This method should return instance of ErrorHandler.
In case the connector uses custom logic for backoff time calculation, a new method get_backoff_strategy should be implemented.
This method should return instance(s) of BackoffStrategy.
Version 2.0.0 of the CDK updates the pydantic dependency to from Pydantic v1 to Pydantic v2. It also
updates the airbyte-protocol-models dependency to a version that uses Pydantic V2 models.
The changes to Airbyte CDK itself are backwards-compatible, but some changes are required if the connector:
- uses Pydantic directly, e.g. for its own custom models, or
- uses the
airbyte_protocolmodels directly, orairbyte_cdk.models, which points toairbyte_protocolmodels, or - customizes HashableStreamDescriptor, which inherits from a protocol model and has therefore been updated to use Pydantic V2 models.
Some test assertions may also need updating due to changes to default serialization of the protocol models.
If the connector uses pydantic, the code will need to be updated to reflect the change pydantic dependency version.
The Pydantic migration guide is a great resource for any questions that
might arise around upgrade behavior.
The easiest way to update the code to be compatible without major changes is to update the import statements from
from pydantic to from pydantic.v1, as Pydantic has kept the v1 module for backwards compatibility.
Some potential gotchas:
ValidationErrormust be imported frompydantic.v1.error_wrappersinstead ofpydantic.v1ModelMetaclassmust be imported frompydantic.v1.maininstead ofpydantic.v1resolve_annotationsmust be imported frompydantic.v1.typinginstead ofpydantic.v1
To upgrade all the way to V2 proper, Pydantic also offers a migration tool to automatically update the code to be compatible with Pydantic V2.
It's possible that a connector might make assertions against protocol models without actually
importing them - for example when testing methods which return AirbyteStateBlob or AnyUrl.
To resolve this, either compare directly to a model, or dict() or str() your model accordingly, depending
on if you care most about the serialized output or the model (for a method which returns a model, option 1 is
preferred). For example:
# Before
assert stream_read.slices[1].state[0].stream.stream_state == {"a_timestamp": 123}
# After - Option 1
from airbyte_cdk.models import AirbyteStateBlob
assert stream_read.slices[1].state[0].stream.stream_state == AirbyteStateBlob(a_timestamp=123)
# After - Option 2
assert stream_read.slices[1].state[0].stream.stream_state.dict() == {"a_timestamp": 123}Starting from 1.0.0, CDK classes and functions should be imported directly from airbyte_cdk (example: from airbyte_cdk import HttpStream). Lower-level __init__ files are not considered stable, and will be modified without introducing a major release.
Introducing breaking changes to a class or function exported from the top level __init__.py will require a major version bump and a migration note to help developer upgrade.
Note that the following packages are not part of the top level init because they require extras dependencies, but are still considered stable:
destination.vector_db_basedsource.file_based
The test package is not included in the top level init either. The test package is still evolving and isn't considered stable.
A few classes were deleted from the Airbyte CDK in version 1.0.0:
- AirbyteLogger
- AirbyteSpec
- Authenticators in the
sources.streams.http.authmodule
No connectors should still be using AirbyteLogger directly, but the class is still used in some interfaces. The only required change is to update the type annotation from AirbyteLogger to logging.Logger. For example:
def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, any]:
to
def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, any]:
Don't forget to also update the imports. You can delete from airbyte_cdk import AirbyteLogger and replace it with import logging.
AirbyteSpec isn't used by any connectors in the repository, and I don't expect any custom connectors to use the class either. This should be a no-op.
Replace usage of authenticators in the airbyte_cdk.sources.streams.http.auth module with their sister classes in the airbyte_cdk.sources.streams.http.requests_native_auth module.
If any of your streams reference self.authenticator, you'll also need to update these references to self._session.auth as the authenticator is embedded in the session object.
Here is a pull request that can serve as an example.