diff --git a/docs/docs/resources/pipeline-components/dependencies/defaults_pipeline_component_dependencies.yaml b/docs/docs/resources/pipeline-components/dependencies/defaults_pipeline_component_dependencies.yaml index 0e3be65f8..f27b30fb8 100644 --- a/docs/docs/resources/pipeline-components/dependencies/defaults_pipeline_component_dependencies.yaml +++ b/docs/docs/resources/pipeline-components/dependencies/defaults_pipeline_component_dependencies.yaml @@ -13,7 +13,6 @@ kafka-connector.yaml: - from_.yaml - to.yaml - config-kafka-connector.yaml -- resetter_values.yaml kafka-sink-connector.yaml: [] kafka-source-connector.yaml: - from_-kafka-source-connector.yaml diff --git a/docs/docs/resources/pipeline-components/dependencies/kpops_structure.yaml b/docs/docs/resources/pipeline-components/dependencies/kpops_structure.yaml index 5f922be36..e08aa777b 100644 --- a/docs/docs/resources/pipeline-components/dependencies/kpops_structure.yaml +++ b/docs/docs/resources/pipeline-components/dependencies/kpops_structure.yaml @@ -35,8 +35,6 @@ kpops_components_fields: - to - config - state - - resetter_namespace - - resetter_values kafka-sink-connector: - name - enabled @@ -45,8 +43,6 @@ kpops_components_fields: - to - config - state - - resetter_namespace - - resetter_values kafka-source-connector: - name - enabled @@ -55,8 +51,6 @@ kpops_components_fields: - to - config - state - - resetter_namespace - - resetter_values - offset_topic kubernetes-app: - name diff --git a/docs/docs/resources/pipeline-components/dependencies/pipeline_component_dependencies.yaml b/docs/docs/resources/pipeline-components/dependencies/pipeline_component_dependencies.yaml index c517d87e3..a15e7b3bc 100644 --- a/docs/docs/resources/pipeline-components/dependencies/pipeline_component_dependencies.yaml +++ b/docs/docs/resources/pipeline-components/dependencies/pipeline_component_dependencies.yaml @@ -23,19 +23,16 @@ kafka-connector.yaml: - from_.yaml - to.yaml - config-kafka-connector.yaml -- resetter_values.yaml kafka-sink-connector.yaml: - prefix.yaml - from_.yaml - to.yaml - config-kafka-connector.yaml -- resetter_values.yaml kafka-source-connector.yaml: - prefix.yaml - from_-kafka-source-connector.yaml - to.yaml - config-kafka-connector.yaml -- resetter_values.yaml - offset_topic-kafka-source-connector.yaml kubernetes-app.yaml: - prefix.yaml diff --git a/docs/docs/resources/pipeline-components/kafka-connector.yaml b/docs/docs/resources/pipeline-components/kafka-connector.yaml index 4a64cdf3a..dc809d45b 100644 --- a/docs/docs/resources/pipeline-components/kafka-connector.yaml +++ b/docs/docs/resources/pipeline-components/kafka-connector.yaml @@ -45,7 +45,3 @@ # Full documentation on connectors: https://kafka.apache.org/documentation/#connectconfigs config: # required tasks.max: 1 - # Overriding Kafka Connect Resetter Helm values. E.g. to override the - # Image Tag etc. - resetter_values: - imageTag: "1.2.3" diff --git a/docs/docs/resources/pipeline-components/kafka-sink-connector.yaml b/docs/docs/resources/pipeline-components/kafka-sink-connector.yaml index 9253db0e0..dae7ef1e4 100644 --- a/docs/docs/resources/pipeline-components/kafka-sink-connector.yaml +++ b/docs/docs/resources/pipeline-components/kafka-sink-connector.yaml @@ -46,7 +46,3 @@ # Full documentation on connectors: https://kafka.apache.org/documentation/#connectconfigs config: # required tasks.max: 1 - # Overriding Kafka Connect Resetter Helm values. E.g. to override the - # Image Tag etc. - resetter_values: - imageTag: "1.2.3" diff --git a/docs/docs/resources/pipeline-components/kafka-source-connector.yaml b/docs/docs/resources/pipeline-components/kafka-source-connector.yaml index 8fa6c51d5..e35ad39db 100644 --- a/docs/docs/resources/pipeline-components/kafka-source-connector.yaml +++ b/docs/docs/resources/pipeline-components/kafka-source-connector.yaml @@ -27,10 +27,6 @@ # Full documentation on connectors: https://kafka.apache.org/documentation/#connectconfigs config: # required tasks.max: 1 - # Overriding Kafka Connect Resetter Helm values. E.g. to override the - # Image Tag etc. - resetter_values: - imageTag: "1.2.3" # offset.storage.topic # https://kafka.apache.org/documentation/#connect_running offset_topic: offset_topic diff --git a/docs/docs/resources/pipeline-components/pipeline.yaml b/docs/docs/resources/pipeline-components/pipeline.yaml index b6ad9a726..566cfae61 100644 --- a/docs/docs/resources/pipeline-components/pipeline.yaml +++ b/docs/docs/resources/pipeline-components/pipeline.yaml @@ -248,10 +248,6 @@ # Full documentation on connectors: https://kafka.apache.org/documentation/#connectconfigs config: # required tasks.max: 1 - # Overriding Kafka Connect Resetter Helm values. E.g. to override the - # Image Tag etc. - resetter_values: - imageTag: "1.2.3" # Kafka source connector - type: kafka-source-connector # required name: kafka-source-connector # required @@ -281,10 +277,6 @@ # Full documentation on connectors: https://kafka.apache.org/documentation/#connectconfigs config: # required tasks.max: 1 - # Overriding Kafka Connect Resetter Helm values. E.g. to override the - # Image Tag etc. - resetter_values: - imageTag: "1.2.3" # offset.storage.topic # https://kafka.apache.org/documentation/#connect_running offset_topic: offset_topic diff --git a/docs/docs/resources/pipeline-defaults/defaults-kafka-connector.yaml b/docs/docs/resources/pipeline-defaults/defaults-kafka-connector.yaml index 9ace5173b..dc06b447d 100644 --- a/docs/docs/resources/pipeline-defaults/defaults-kafka-connector.yaml +++ b/docs/docs/resources/pipeline-defaults/defaults-kafka-connector.yaml @@ -48,7 +48,3 @@ kafka-connector: # Full documentation on connectors: https://kafka.apache.org/documentation/#connectconfigs config: # required tasks.max: 1 - # Overriding Kafka Connect Resetter Helm values. E.g. to override the - # Image Tag etc. - resetter_values: - imageTag: "1.2.3" diff --git a/docs/docs/resources/pipeline-defaults/defaults.yaml b/docs/docs/resources/pipeline-defaults/defaults.yaml index e1e16c56c..a16a0e880 100644 --- a/docs/docs/resources/pipeline-defaults/defaults.yaml +++ b/docs/docs/resources/pipeline-defaults/defaults.yaml @@ -172,10 +172,6 @@ kafka-connector: # Full documentation on connectors: https://kafka.apache.org/documentation/#connectconfigs config: # required tasks.max: 1 - # Overriding Kafka Connect Resetter Helm values. E.g. to override the - # Image Tag etc. - resetter_values: - imageTag: "1.2.3" # Kafka sink connector # # Child of: KafkaConnector diff --git a/docs/docs/schema/defaults.json b/docs/docs/schema/defaults.json index e18ff92d6..27f3b09f1 100644 --- a/docs/docs/schema/defaults.json +++ b/docs/docs/schema/defaults.json @@ -1371,23 +1371,6 @@ "title": "Prefix", "type": "string" }, - "resetter_namespace": { - "anyOf": [ - { - "type": "string" - }, - { - "type": "null" - } - ], - "default": null, - "description": "Kubernetes namespace in which the Kafka Connect resetter shall be deployed", - "title": "Resetter Namespace" - }, - "resetter_values": { - "$ref": "#/$defs/HelmAppValues", - "description": "Overriding Kafka Connect resetter Helm values, e.g. to override the image tag etc." - }, "state": { "anyOf": [ { @@ -1522,23 +1505,6 @@ "title": "Prefix", "type": "string" }, - "resetter_namespace": { - "anyOf": [ - { - "type": "string" - }, - { - "type": "null" - } - ], - "default": null, - "description": "Kubernetes namespace in which the Kafka Connect resetter shall be deployed", - "title": "Resetter Namespace" - }, - "resetter_values": { - "$ref": "#/$defs/HelmAppValues", - "description": "Overriding Kafka Connect resetter Helm values, e.g. to override the image tag etc." - }, "state": { "anyOf": [ { @@ -1628,23 +1594,6 @@ "title": "Prefix", "type": "string" }, - "resetter_namespace": { - "anyOf": [ - { - "type": "string" - }, - { - "type": "null" - } - ], - "default": null, - "description": "Kubernetes namespace in which the Kafka Connect resetter shall be deployed", - "title": "Resetter Namespace" - }, - "resetter_values": { - "$ref": "#/$defs/HelmAppValues", - "description": "Overriding Kafka Connect resetter Helm values, e.g. to override the image tag etc." - }, "state": { "anyOf": [ { diff --git a/docs/docs/schema/pipeline.json b/docs/docs/schema/pipeline.json index 779d3131c..0b3373880 100644 --- a/docs/docs/schema/pipeline.json +++ b/docs/docs/schema/pipeline.json @@ -1327,23 +1327,6 @@ "title": "Prefix", "type": "string" }, - "resetter_namespace": { - "anyOf": [ - { - "type": "string" - }, - { - "type": "null" - } - ], - "default": null, - "description": "Kubernetes namespace in which the Kafka Connect resetter shall be deployed", - "title": "Resetter Namespace" - }, - "resetter_values": { - "$ref": "#/$defs/HelmAppValues", - "description": "Overriding Kafka Connect resetter Helm values, e.g. to override the image tag etc." - }, "state": { "anyOf": [ { @@ -1433,23 +1416,6 @@ "title": "Prefix", "type": "string" }, - "resetter_namespace": { - "anyOf": [ - { - "type": "string" - }, - { - "type": "null" - } - ], - "default": null, - "description": "Kubernetes namespace in which the Kafka Connect resetter shall be deployed", - "title": "Resetter Namespace" - }, - "resetter_values": { - "$ref": "#/$defs/HelmAppValues", - "description": "Overriding Kafka Connect resetter Helm values, e.g. to override the image tag etc." - }, "state": { "anyOf": [ { diff --git a/kpops/component_handlers/kafka_connect/connect_wrapper.py b/kpops/component_handlers/kafka_connect/connect_wrapper.py index 97755b37b..794dd31ee 100644 --- a/kpops/component_handlers/kafka_connect/connect_wrapper.py +++ b/kpops/component_handlers/kafka_connect/connect_wrapper.py @@ -232,3 +232,19 @@ async def delete_connector(self, connector_name: str) -> None: await asyncio.sleep(1) return await self.delete_connector(connector_name) raise KafkaConnectError(response) + + async def reset_offset(self, connector_name: str) -> None: + """Reset the offsets for a connector; the connector must exist, and must be in the STOPPED state. + + API Reference: + https://docs.confluent.io/platform/current/connect/references/restapi.html#delete--connectors-connector-offsets + :param connector_name: Configuration parameters for the connector. + :raises ConnectorNotFoundException: Connector not found + """ + response = await self._client.delete(f"/connectors/{connector_name}/offsets") + if response.status_code == httpx.codes.OK: + log.info(f"Connector {connector_name} offsets reset.") + return + if response.status_code == httpx.codes.NOT_FOUND: + raise ConnectorNotFoundException + raise KafkaConnectError(response) diff --git a/kpops/component_handlers/kafka_connect/kafka_connect_handler.py b/kpops/component_handlers/kafka_connect/kafka_connect_handler.py index de1d1a22f..71fda5781 100644 --- a/kpops/component_handlers/kafka_connect/kafka_connect_handler.py +++ b/kpops/component_handlers/kafka_connect/kafka_connect_handler.py @@ -87,6 +87,24 @@ async def destroy_connector(self, connector_name: str, *, dry_run: bool) -> None f"Connector Destruction: the connector {connector_name} does not exist. Skipping." ) + async def reset_connector(self, connector_name: str, *, dry_run: bool) -> None: + """Reset connector offsets. + + :param connector_name: The connector name. + :param dry_run: Whether the connector reset should be run in dry run mode. + """ + if dry_run: + pass # TODO + else: + try: + await self._connect_wrapper.stop_connector(connector_name) + await self._connect_wrapper.reset_offset(connector_name) + + except ConnectorNotFoundException: + log.warning( + f"Connector reset: the connector {connector_name} does not exist. Skipping." + ) + async def __dry_run_connector_creation( self, connector_config: KafkaConnectorConfig, diff --git a/kpops/components/base_components/kafka_connector.py b/kpops/components/base_components/kafka_connector.py index fd68ffae2..7f9b1ccc0 100644 --- a/kpops/components/base_components/kafka_connector.py +++ b/kpops/components/base_components/kafka_connector.py @@ -2,101 +2,24 @@ import logging from abc import ABC -from functools import cached_property -from typing import Any, Literal, NoReturn, Self +from typing import Any, NoReturn -import pydantic -from pydantic import Field, PrivateAttr, ValidationInfo, field_validator +from pydantic import PrivateAttr, ValidationInfo, field_validator from typing_extensions import override from kpops.component_handlers import get_handlers -from kpops.component_handlers.helm_wrapper.model import ( - HelmRepoConfig, -) from kpops.component_handlers.kafka_connect.model import ( ConnectorNewState, KafkaConnectorConfig, KafkaConnectorType, ) -from kpops.components.base_components.cleaner import Cleaner -from kpops.components.base_components.helm_app import HelmAppValues from kpops.components.base_components.models.from_section import FromTopic from kpops.components.base_components.pipeline_component import PipelineComponent from kpops.components.common.topic import KafkaTopic -from kpops.config import get_config -from kpops.utils.colorify import magentaify -from kpops.utils.pydantic import CamelCaseConfigModel, SkipGenerate log = logging.getLogger("KafkaConnector") -class KafkaConnectorResetterConfig(CamelCaseConfigModel): - brokers: str - connector: str - delete_consumer_group: bool | None = None - offset_topic: str | None = None - - -class KafkaConnectorResetterValues(HelmAppValues): - connector_type: Literal["source", "sink"] - config: KafkaConnectorResetterConfig - - -class KafkaConnectorResetter(Cleaner, ABC): - """Helm app for resetting and cleaning a Kafka Connector. - - :param repo_config: Configuration of the Helm chart repo to be used for - deploying the component, defaults to kafka-connect-resetter Helm repo - :param version: Helm chart version, defaults to "1.0.4" - """ - - from_: None = None # pyright: ignore[reportIncompatibleVariableOverride] - to: None = None # pyright: ignore[reportIncompatibleVariableOverride] - values: KafkaConnectorResetterValues # pyright: ignore[reportIncompatibleVariableOverride] - repo_config: SkipGenerate[HelmRepoConfig] = HelmRepoConfig( # pyright: ignore[reportIncompatibleVariableOverride] - repository_name="bakdata-kafka-connect-resetter", - url="https://bakdata.github.io/kafka-connect-resetter/", - ) - version: str | None = "1.0.4" - - @property - @override - def helm_chart(self) -> str: - return f"{self.repo_config.repository_name}/kafka-connect-resetter" - - @override - async def reset(self, dry_run: bool) -> None: - """Reset connector. - - At first, it deletes the previous cleanup job (connector resetter) - to make sure that there is no running clean job in the cluster. Then it releases a cleanup job. - If retain_clean_jobs config is set to false the cleanup job will be deleted subsequently. - - :param dry_run: If the cleanup should be run in dry run mode or not - """ - log.info( - magentaify( - f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {self.values.config.connector}" - ) - ) - await self.destroy(dry_run) - - log.info( - magentaify( - f"Connector Cleanup: deploy Connect {self.values.connector_type} resetter for {self.values.config.connector}" - ) - ) - await self.deploy(dry_run) - - if not get_config().retain_clean_jobs: - log.info(magentaify("Connector Cleanup: uninstall Kafka Resetter.")) - await self.destroy(dry_run) - - @override - async def clean(self, dry_run: bool) -> None: - await self.reset(dry_run) - - class KafkaConnector(PipelineComponent, ABC): """Base class for all Kafka connectors. @@ -104,15 +27,10 @@ class KafkaConnector(PipelineComponent, ABC): :param config: Connector config :param state: Connector state - :param resetter_namespace: Kubernetes namespace in which the Kafka Connect resetter shall be deployed - :param resetter_values: Overriding Kafka Connect resetter Helm values, e.g. to override the image tag etc., - defaults to empty HelmAppValues """ config: KafkaConnectorConfig state: ConnectorNewState | None = None - resetter_namespace: str | None = None - resetter_values: HelmAppValues = Field(default_factory=HelmAppValues) _connector_type: KafkaConnectorType = PrivateAttr() @field_validator("config", mode="before") @@ -132,35 +50,6 @@ def connector_config_should_have_component_name( config["name"] = component_name return KafkaConnectorConfig.model_validate(config) - @cached_property - def _resetter(self) -> KafkaConnectorResetter: - kwargs: dict[str, Any] = {} - if self.resetter_namespace: - kwargs["namespace"] = self.resetter_namespace - return KafkaConnectorResetter( - **kwargs, - **self.model_dump( - by_alias=True, - exclude={ - "_resetter", - "resetter_values", - "resetter_namespace", - "values", - "config", - "from_", - "to", - }, - ), - values=KafkaConnectorResetterValues( - connector_type=self._connector_type.value, - config=KafkaConnectorResetterConfig( - connector=self.full_name, - brokers=get_config().kafka_brokers, - ), - **self.resetter_values.model_dump(), - ), - ) - @override async def deploy(self, dry_run: bool) -> None: """Deploy Kafka Connector (Source/Sink). Create output topics and register schemas if configured.""" @@ -182,9 +71,17 @@ async def destroy(self, dry_run: bool) -> None: self.full_name, dry_run=dry_run ) + @override + async def reset(self, dry_run: bool) -> None: + """Reset connector offsets without deleting the connector.""" + await get_handlers().connector_handler.reset_connector( + self.full_name, dry_run=dry_run + ) + @override async def clean(self, dry_run: bool) -> None: """Delete Kafka Connector. If schema handler is enabled, then remove schemas. Delete all the output topics.""" + await self.reset(dry_run) await super().clean(dry_run) if self.to: if schema_handler := get_handlers().schema_handler: @@ -205,29 +102,11 @@ class KafkaSourceConnector(KafkaConnector): _connector_type: KafkaConnectorType = PrivateAttr(KafkaConnectorType.SOURCE) - @pydantic.model_validator(mode="after") - def populate_offset_topic(self) -> Self: - if self.offset_topic: - self._resetter.values.config.offset_topic = self.offset_topic - return self - @override def apply_from_inputs(self, name: str, topic: FromTopic) -> NoReturn: msg = "Kafka source connector doesn't support FromSection" raise NotImplementedError(msg) - @override - async def reset(self, dry_run: bool) -> None: - """Reset state. Keep connector.""" - await super().reset(dry_run) - await self._resetter.reset(dry_run) - - @override - async def clean(self, dry_run: bool) -> None: - """Delete connector and reset state.""" - await super().clean(dry_run) - await self._resetter.clean(dry_run) - class KafkaSinkConnector(KafkaConnector): """Kafka sink connector model.""" @@ -250,17 +129,3 @@ def set_input_pattern(self, name: str) -> None: @override def set_error_topic(self, topic: KafkaTopic) -> None: self.config.errors_deadletterqueue_topic_name = topic - - @override - async def reset(self, dry_run: bool) -> None: - """Reset state. Keep consumer group and connector.""" - await super().reset(dry_run) - self._resetter.values.config.delete_consumer_group = False - await self._resetter.reset(dry_run) - - @override - async def clean(self, dry_run: bool) -> None: - """Delete connector and consumer group.""" - await super().clean(dry_run) - self._resetter.values.config.delete_consumer_group = True - await self._resetter.clean(dry_run) diff --git a/tests/components/test_kafka_connector.py b/tests/components/test_kafka_connector.py index 237b18461..9e873ddd1 100644 --- a/tests/components/test_kafka_connector.py +++ b/tests/components/test_kafka_connector.py @@ -19,7 +19,6 @@ "${pipeline.name}-" + "test-connector-with-long-612f3-clean" ) CONNECTOR_CLASS = "com.bakdata.connect.TestConnector" -RESETTER_NAMESPACE = "test-namespace" @pytest.mark.usefixtures("mock_env") @@ -49,7 +48,6 @@ def connector(self, connector_config: KafkaConnectorConfig) -> KafkaConnector: return KafkaConnector( # HACK: not supposed to be instantiated, because ABC name=CONNECTOR_NAME, config=connector_config, - resetter_namespace=RESETTER_NAMESPACE, ) def test_connector_config_name_override(self, connector: KafkaConnector): @@ -58,7 +56,6 @@ def test_connector_config_name_override(self, connector: KafkaConnector): connector = KafkaConnector( name=CONNECTOR_NAME, config={"connector.class": CONNECTOR_CLASS}, # pyright: ignore[reportArgumentType], gets enriched - resetter_namespace=RESETTER_NAMESPACE, ) assert connector.config.name == CONNECTOR_FULL_NAME diff --git a/tests/components/test_kafka_sink_connector.py b/tests/components/test_kafka_sink_connector.py index 4d586c501..2aab3be4b 100644 --- a/tests/components/test_kafka_sink_connector.py +++ b/tests/components/test_kafka_sink_connector.py @@ -1,21 +1,16 @@ -from unittest.mock import ANY, MagicMock, call +from unittest.mock import MagicMock, call import pytest from pytest_mock import MockerFixture from typing_extensions import override from kpops.component_handlers import get_handlers -from kpops.component_handlers.helm_wrapper.model import ( - HelmUpgradeInstallFlags, - RepoAuthFlags, -) from kpops.component_handlers.kafka_connect.model import ( ConnectorNewState, KafkaConnectorConfig, KafkaConnectorType, ) from kpops.components.base_components.kafka_connector import ( - KafkaConnectorResetter, KafkaSinkConnector, ) from kpops.components.base_components.models import TopicName @@ -34,12 +29,8 @@ ) from kpops.utils.colorify import magentaify from tests.components.test_kafka_connector import ( - CONNECTOR_CLEAN_FULL_NAME, - CONNECTOR_CLEAN_HELM_NAMEOVERRIDE, - CONNECTOR_CLEAN_RELEASE_NAME, CONNECTOR_FULL_NAME, CONNECTOR_NAME, - RESETTER_NAMESPACE, TestKafkaConnector, ) @@ -57,7 +48,6 @@ def connector(self, connector_config: KafkaConnectorConfig) -> KafkaSinkConnecto return KafkaSinkConnector( name=CONNECTOR_NAME, config=connector_config, - resetter_namespace=RESETTER_NAMESPACE, to=ToSection( topics={ TopicName("${output_topic_name}"): TopicConfig( @@ -67,41 +57,6 @@ def connector(self, connector_config: KafkaConnectorConfig) -> KafkaSinkConnecto ), ) - def test_resetter(self, connector: KafkaSinkConnector): - resetter = connector._resetter - assert isinstance(resetter, KafkaConnectorResetter) - assert resetter.full_name == CONNECTOR_CLEAN_FULL_NAME - - def test_resetter_release_name(self, connector: KafkaSinkConnector): - assert connector.config.name == CONNECTOR_FULL_NAME - assert connector._resetter.helm_release_name == CONNECTOR_CLEAN_RELEASE_NAME - - def test_resetter_helm_name_override(self, connector: KafkaSinkConnector): - assert ( - connector._resetter.to_helm_values()["nameOverride"] - == CONNECTOR_CLEAN_HELM_NAMEOVERRIDE - ) - assert ( - connector._resetter.to_helm_values()["fullnameOverride"] - == CONNECTOR_CLEAN_HELM_NAMEOVERRIDE - ) - - def test_resetter_inheritance(self, connector: KafkaSinkConnector): - setattr(connector.resetter_values, "testKey", "foo") - resetter = connector._resetter - assert resetter - assert not hasattr(resetter, "_resetter") - - assert not hasattr(resetter, "resetter_namespace") - assert resetter.namespace == connector.resetter_namespace - - assert not hasattr(resetter, "resetter_values") - # check that resetter values are contained in resetter app values - assert ( - connector.resetter_values.model_dump().items() - <= resetter.values.model_dump().items() - ) - def test_connector_config_parsing(self, connector_config: KafkaConnectorConfig): topic_pattern = ".*" connector = KafkaSinkConnector( @@ -112,7 +67,6 @@ def test_connector_config_parsing(self, connector_config: KafkaConnectorConfig): "topics.regex": topic_pattern, } ), - resetter_namespace=RESETTER_NAMESPACE, ) assert connector.config.topics_regex == topic_pattern assert connector.config.model_dump()["topics.regex"] == topic_pattern @@ -125,7 +79,6 @@ def test_from_section_parsing_input_topic( connector = KafkaSinkConnector( name=CONNECTOR_NAME, config=connector_config, - resetter_namespace=RESETTER_NAMESPACE, from_=FromSection( topics={ topic1: FromTopic(type=InputTopicTypes.INPUT), @@ -160,7 +113,6 @@ def test_from_section_parsing_input_pattern( connector = KafkaSinkConnector( name=CONNECTOR_NAME, config=connector_config, - resetter_namespace=RESETTER_NAMESPACE, from_=FromSection( topics={topic_pattern: FromTopic(type=InputTopicTypes.PATTERN)} ), @@ -239,13 +191,11 @@ async def test_reset_when_dry_run_is_true( mocker: MockerFixture, ): mock_destroy = mocker.patch.object(connector, "destroy") - mock_resetter_reset = mocker.spy(connector._resetter, "reset") dry_run = True await connector.reset(dry_run=dry_run) - mock_destroy.assert_called_once_with(dry_run) - mock_resetter_reset.assert_called_once_with(dry_run) - dry_run_handler_mock.print_helm_diff.assert_called_once() + mock_destroy.assert_not_called() + dry_run_handler_mock.print_helm_diff.assert_not_called() async def test_reset_when_dry_run_is_false( self, @@ -261,7 +211,6 @@ async def test_reset_when_dry_run_is_false( mock_clean_connector = mocker.patch.object( get_handlers().connector_handler, "clean_connector" ) - mock_resetter_reset = mocker.spy(connector._resetter, "reset") mock = mocker.MagicMock() mock.attach_mock(mock_destroy, "destroy_connector") @@ -271,53 +220,8 @@ async def test_reset_when_dry_run_is_false( dry_run = False await connector.reset(dry_run=dry_run) - mock_resetter_reset.assert_called_once_with(dry_run) - mock.assert_has_calls( - [ - mocker.call.destroy_connector(dry_run), - mocker.call.helm.add_repo( - "bakdata-kafka-connect-resetter", - "https://bakdata.github.io/kafka-connect-resetter/", - RepoAuthFlags(), - ), - mocker.call.helm.uninstall( - RESETTER_NAMESPACE, - CONNECTOR_CLEAN_RELEASE_NAME, - dry_run, - ), - ANY, # __bool__ - ANY, # __str__ - mocker.call.helm.upgrade_install( - CONNECTOR_CLEAN_RELEASE_NAME, - "bakdata-kafka-connect-resetter/kafka-connect-resetter", - dry_run, - RESETTER_NAMESPACE, - { - "nameOverride": CONNECTOR_CLEAN_HELM_NAMEOVERRIDE, - "fullnameOverride": CONNECTOR_CLEAN_HELM_NAMEOVERRIDE, - "connectorType": CONNECTOR_TYPE, - "config": { - "brokers": "broker:9092", - "connector": CONNECTOR_FULL_NAME, - "deleteConsumerGroup": False, - }, - }, - HelmUpgradeInstallFlags( - version="1.0.4", - wait=True, - wait_for_jobs=True, - ), - ), - mocker.call.helm.uninstall( - RESETTER_NAMESPACE, - CONNECTOR_CLEAN_RELEASE_NAME, - dry_run, - ), - ANY, # __bool__ - ANY, # __str__ - ] - ) + mock.assert_has_calls([]) dry_run_handler_mock.print_helm_diff.assert_not_called() mock_delete_topic.assert_not_called() @@ -379,46 +283,6 @@ async def test_clean_when_dry_run_is_false( mocker.call.mock_delete_topic(topic, dry_run=dry_run) for topic in connector.to.kafka_topics ), - mocker.call.helm.add_repo( - "bakdata-kafka-connect-resetter", - "https://bakdata.github.io/kafka-connect-resetter/", - RepoAuthFlags(), - ), - mocker.call.helm.uninstall( - RESETTER_NAMESPACE, - CONNECTOR_CLEAN_RELEASE_NAME, - dry_run, - ), - ANY, # __bool__ - ANY, # __str__ - mocker.call.helm.upgrade_install( - CONNECTOR_CLEAN_RELEASE_NAME, - "bakdata-kafka-connect-resetter/kafka-connect-resetter", - dry_run, - RESETTER_NAMESPACE, - { - "nameOverride": CONNECTOR_CLEAN_HELM_NAMEOVERRIDE, - "fullnameOverride": CONNECTOR_CLEAN_HELM_NAMEOVERRIDE, - "connectorType": CONNECTOR_TYPE, - "config": { - "brokers": "broker:9092", - "connector": CONNECTOR_FULL_NAME, - "deleteConsumerGroup": True, - }, - }, - HelmUpgradeInstallFlags( - version="1.0.4", - wait=True, - wait_for_jobs=True, - ), - ), - mocker.call.helm.uninstall( - RESETTER_NAMESPACE, - CONNECTOR_CLEAN_RELEASE_NAME, - dry_run, - ), - ANY, # __bool__ - ANY, # __str__ ] dry_run_handler_mock.print_helm_diff.assert_not_called() @@ -430,7 +294,6 @@ async def test_clean_without_to_when_dry_run_is_true( connector = KafkaSinkConnector( name=CONNECTOR_NAME, config=connector_config, - resetter_namespace=RESETTER_NAMESPACE, ) dry_run = True @@ -448,7 +311,6 @@ async def test_clean_without_to_when_dry_run_is_false( connector = KafkaSinkConnector( name=CONNECTOR_NAME, config=connector_config, - resetter_namespace=RESETTER_NAMESPACE, ) mock_destroy = mocker.patch.object(connector, "destroy") @@ -470,51 +332,6 @@ async def test_clean_without_to_when_dry_run_is_false( assert mock.mock_calls == [ mocker.call.destroy_connector(dry_run), - mocker.call.helm.add_repo( - "bakdata-kafka-connect-resetter", - "https://bakdata.github.io/kafka-connect-resetter/", - RepoAuthFlags( - username=None, - password=None, - ca_file=None, - insecure_skip_tls_verify=False, - ), - ), - mocker.call.helm.uninstall( - RESETTER_NAMESPACE, - CONNECTOR_CLEAN_RELEASE_NAME, - dry_run, - ), - ANY, # __bool__ - ANY, # __str__ - mocker.call.helm.upgrade_install( - CONNECTOR_CLEAN_RELEASE_NAME, - "bakdata-kafka-connect-resetter/kafka-connect-resetter", - dry_run, - RESETTER_NAMESPACE, - { - "nameOverride": CONNECTOR_CLEAN_HELM_NAMEOVERRIDE, - "fullnameOverride": CONNECTOR_CLEAN_HELM_NAMEOVERRIDE, - "connectorType": CONNECTOR_TYPE, - "config": { - "brokers": "broker:9092", - "connector": CONNECTOR_FULL_NAME, - "deleteConsumerGroup": True, - }, - }, - HelmUpgradeInstallFlags( - version="1.0.4", - wait=True, - wait_for_jobs=True, - ), - ), - mocker.call.helm.uninstall( - RESETTER_NAMESPACE, - CONNECTOR_CLEAN_RELEASE_NAME, - dry_run, - ), - ANY, # __bool__ - ANY, # __str__ ] dry_run_handler_mock.print_helm_diff.assert_not_called() diff --git a/tests/components/test_kafka_source_connector.py b/tests/components/test_kafka_source_connector.py index af89bcb8b..243cc24d8 100644 --- a/tests/components/test_kafka_source_connector.py +++ b/tests/components/test_kafka_source_connector.py @@ -1,21 +1,16 @@ -from unittest.mock import ANY, MagicMock +from unittest.mock import MagicMock import pytest from pytest_mock import MockerFixture from typing_extensions import override from kpops.component_handlers import get_handlers -from kpops.component_handlers.helm_wrapper.model import ( - HelmUpgradeInstallFlags, - RepoAuthFlags, -) from kpops.component_handlers.kafka_connect.model import ( ConnectorNewState, KafkaConnectorConfig, KafkaConnectorType, ) from kpops.components.base_components.kafka_connector import ( - KafkaConnectorResetter, KafkaSourceConnector, ) from kpops.components.base_components.models import TopicName @@ -28,13 +23,9 @@ ToSection, ) from kpops.components.common.topic import OutputTopicTypes, TopicConfig -from kpops.utils.environment import ENV from tests.components.test_kafka_connector import ( - CONNECTOR_CLEAN_HELM_NAMEOVERRIDE, - CONNECTOR_CLEAN_RELEASE_NAME, CONNECTOR_FULL_NAME, CONNECTOR_NAME, - RESETTER_NAMESPACE, TestKafkaConnector, ) @@ -53,7 +44,6 @@ def connector( return KafkaSourceConnector( name=CONNECTOR_NAME, config=connector_config, - resetter_namespace=RESETTER_NAMESPACE, to=ToSection( topics={ TopicName("${output_topic_name}"): TopicConfig( @@ -64,15 +54,6 @@ def connector( offset_topic=OFFSETS_TOPIC, ) - def test_resetter_release_name(self, connector: KafkaSourceConnector): - assert connector.config.name == CONNECTOR_FULL_NAME - resetter = connector._resetter - assert isinstance(resetter, KafkaConnectorResetter) - assert connector._resetter.helm_release_name == CONNECTOR_CLEAN_RELEASE_NAME - - def test_resetter_offset_topic(self, connector: KafkaSourceConnector): - assert connector._resetter.values.config.offset_topic == OFFSETS_TOPIC - def test_from_section_raises_exception( self, connector_config: KafkaConnectorConfig, @@ -81,7 +62,6 @@ def test_from_section_raises_exception( KafkaSourceConnector( name=CONNECTOR_NAME, config=connector_config, - resetter_namespace=RESETTER_NAMESPACE, from_=FromSection( topics={ TopicName("connector-topic"): FromTopic( @@ -147,7 +127,6 @@ async def test_destroy( connector: KafkaSourceConnector, mocker: MockerFixture, ): - ENV["KPOPS_KAFKA_CONNECT_RESETTER_OFFSET_TOPIC"] = OFFSETS_TOPIC assert get_handlers().connector_handler mock_destroy_connector = mocker.patch.object( @@ -167,13 +146,11 @@ async def test_reset_when_dry_run_is_true( mocker: MockerFixture, ): mock_destroy = mocker.patch.object(connector, "destroy") - mock_resetter_reset = mocker.spy(connector._resetter, "reset") dry_run = True await connector.reset(dry_run=dry_run) - mock_destroy.assert_called_once_with(dry_run) - mock_resetter_reset.assert_called_once_with(dry_run) - dry_run_handler_mock.print_helm_diff.assert_called_once() + mock_destroy.assert_not_called() + dry_run_handler_mock.print_helm_diff.assert_not_called() async def test_reset_when_dry_run_is_false( self, @@ -199,49 +176,7 @@ async def test_reset_when_dry_run_is_false( dry_run = False await connector.reset(dry_run) - assert mock.mock_calls == [ - mocker.call.destroy_connector(dry_run), - mocker.call.helm.add_repo( - "bakdata-kafka-connect-resetter", - "https://bakdata.github.io/kafka-connect-resetter/", - RepoAuthFlags(), - ), - mocker.call.helm.uninstall( - RESETTER_NAMESPACE, - CONNECTOR_CLEAN_RELEASE_NAME, - dry_run, - ), - ANY, # __bool__ - ANY, # __str__ - mocker.call.helm.upgrade_install( - CONNECTOR_CLEAN_RELEASE_NAME, - "bakdata-kafka-connect-resetter/kafka-connect-resetter", - dry_run, - RESETTER_NAMESPACE, - { - "connectorType": CONNECTOR_TYPE, - "config": { - "brokers": "broker:9092", - "connector": CONNECTOR_FULL_NAME, - "offsetTopic": OFFSETS_TOPIC, - }, - "nameOverride": CONNECTOR_CLEAN_HELM_NAMEOVERRIDE, - "fullnameOverride": CONNECTOR_CLEAN_HELM_NAMEOVERRIDE, - }, - HelmUpgradeInstallFlags( - version="1.0.4", - wait=True, - wait_for_jobs=True, - ), - ), - mocker.call.helm.uninstall( - RESETTER_NAMESPACE, - CONNECTOR_CLEAN_RELEASE_NAME, - dry_run, - ), - ANY, # __bool__ - ANY, # __str__ - ] + assert mock.mock_calls == [] mock_delete_topic.assert_not_called() dry_run_handler_mock.print_helm_diff.assert_not_called() @@ -252,7 +187,7 @@ async def test_clean_when_dry_run_is_true( ): await connector.clean(dry_run=True) - dry_run_handler_mock.print_helm_diff.assert_called_once() + dry_run_handler_mock.print_helm_diff.assert_not_called() async def test_clean_when_dry_run_is_false( self, @@ -286,46 +221,6 @@ async def test_clean_when_dry_run_is_false( mocker.call.mock_delete_topic(topic, dry_run=dry_run) for topic in connector.to.kafka_topics ), - mocker.call.helm.add_repo( - "bakdata-kafka-connect-resetter", - "https://bakdata.github.io/kafka-connect-resetter/", - RepoAuthFlags(), - ), - mocker.call.helm.uninstall( - RESETTER_NAMESPACE, - CONNECTOR_CLEAN_RELEASE_NAME, - dry_run, - ), - ANY, # __bool__ - ANY, # __str__ - mocker.call.helm.upgrade_install( - CONNECTOR_CLEAN_RELEASE_NAME, - "bakdata-kafka-connect-resetter/kafka-connect-resetter", - dry_run, - RESETTER_NAMESPACE, - { - "nameOverride": CONNECTOR_CLEAN_HELM_NAMEOVERRIDE, - "fullnameOverride": CONNECTOR_CLEAN_HELM_NAMEOVERRIDE, - "connectorType": CONNECTOR_TYPE, - "config": { - "brokers": "broker:9092", - "connector": CONNECTOR_FULL_NAME, - "offsetTopic": OFFSETS_TOPIC, - }, - }, - HelmUpgradeInstallFlags( - version="1.0.4", - wait=True, - wait_for_jobs=True, - ), - ), - mocker.call.helm.uninstall( - RESETTER_NAMESPACE, - CONNECTOR_CLEAN_RELEASE_NAME, - dry_run, - ), - ANY, # __bool__ - ANY, # __str__ ] dry_run_handler_mock.print_helm_diff.assert_not_called() @@ -340,7 +235,6 @@ async def test_clean_without_to_when_dry_run_is_false( connector = KafkaSourceConnector( name=CONNECTOR_NAME, config=connector_config, - resetter_namespace=RESETTER_NAMESPACE, offset_topic=OFFSETS_TOPIC, ) assert connector.to is None @@ -367,46 +261,6 @@ async def test_clean_without_to_when_dry_run_is_false( assert mock.mock_calls == [ mocker.call.destroy_connector(dry_run), - mocker.call.helm.add_repo( - "bakdata-kafka-connect-resetter", - "https://bakdata.github.io/kafka-connect-resetter/", - RepoAuthFlags(), - ), - mocker.call.helm.uninstall( - RESETTER_NAMESPACE, - CONNECTOR_CLEAN_RELEASE_NAME, - dry_run, - ), - ANY, # __bool__ - ANY, # __str__ - mocker.call.helm.upgrade_install( - CONNECTOR_CLEAN_RELEASE_NAME, - "bakdata-kafka-connect-resetter/kafka-connect-resetter", - dry_run, - RESETTER_NAMESPACE, - { - "nameOverride": CONNECTOR_CLEAN_HELM_NAMEOVERRIDE, - "fullnameOverride": CONNECTOR_CLEAN_HELM_NAMEOVERRIDE, - "connectorType": CONNECTOR_TYPE, - "config": { - "brokers": "broker:9092", - "connector": CONNECTOR_FULL_NAME, - "offsetTopic": OFFSETS_TOPIC, - }, - }, - HelmUpgradeInstallFlags( - version="1.0.4", - wait=True, - wait_for_jobs=True, - ), - ), - mocker.call.helm.uninstall( - RESETTER_NAMESPACE, - CONNECTOR_CLEAN_RELEASE_NAME, - dry_run, - ), - ANY, # __bool__ - ANY, # __str__ ] mock_delete_topic.assert_not_called() @@ -420,7 +274,6 @@ async def test_clean_without_to_when_dry_run_is_true( connector = KafkaSourceConnector( name=CONNECTOR_NAME, config=connector_config, - resetter_namespace=RESETTER_NAMESPACE, offset_topic=OFFSETS_TOPIC, ) assert connector.to is None @@ -429,4 +282,4 @@ async def test_clean_without_to_when_dry_run_is_true( await connector.clean(dry_run=True) - dry_run_handler_mock.print_helm_diff.assert_called_once() + dry_run_handler_mock.print_helm_diff.assert_not_called() diff --git a/tests/pipeline/test_generate.py b/tests/pipeline/test_generate.py index 57d3ddf7a..00c7cd5f1 100644 --- a/tests/pipeline/test_generate.py +++ b/tests/pipeline/test_generate.py @@ -877,32 +877,6 @@ def test_temp_trim_release_name(self): == "in-order-to-have-len-fifty-two-name-should-end--here" ) - def test_substitution_in_inflated_component(self): - pipeline = kpops.generate(RESOURCE_PATH / "resetter_values" / PIPELINE_YAML) - assert isinstance(pipeline.components[1], KafkaSinkConnector) - assert ( - pipeline.components[1]._resetter.values.label == "inflated-connector-name" # pyright: ignore[reportAttributeAccessIssue,reportUnknownMemberType] - ) - assert ( - pipeline.components[1]._resetter.values.imageTag # pyright: ignore[reportAttributeAccessIssue,reportUnknownMemberType] - == "override-default-image-tag" - ) - - def test_substitution_in_resetter(self): - pipeline = kpops.generate( - RESOURCE_PATH - / "resetter_values" - / KpopsFileType.PIPELINE.as_yaml_file(suffix="_connector_only"), - ) - assert isinstance(pipeline.components[0], KafkaSinkConnector) - assert pipeline.components[0].name == "es-sink-connector" - assert pipeline.components[0]._resetter.name == "es-sink-connector" - assert pipeline.components[0]._resetter.values.label == "es-sink-connector" # pyright: ignore[reportAttributeAccessIssue,reportUnknownMemberType] - assert ( - pipeline.components[0]._resetter.values.imageTag # pyright: ignore[reportAttributeAccessIssue,reportUnknownMemberType] - == "override-default-image-tag" - ) - def test_streams_bootstrap(self, snapshot: Snapshot): pipeline = kpops.generate( RESOURCE_PATH / "streams-bootstrap" / PIPELINE_YAML,