From a4e47ea055cc761c5b9ce9cf401a52a219a37efd Mon Sep 17 00:00:00 2001 From: Jakob Edding <15202881+jkbe@users.noreply.github.com> Date: Wed, 25 Feb 2026 16:30:31 +0100 Subject: [PATCH 1/8] WIP: Allow preconfiguration of consumer builder and producer builder --- .../kafka/consumer/ConsumerBuilder.java | 2 ++ .../kafka/producer/ProducerBuilder.java | 26 ++++++++++++++----- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerBuilder.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerBuilder.java index 3bbe0b7de..9ba875972 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerBuilder.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerBuilder.java @@ -79,6 +79,8 @@ public Consumer createConsumer(final Deserializer keyDeserialize return new KafkaConsumer<>(this.kafkaProperties, keyDeserializer, valueDeserializer); } + // TODO: + /** * Create {@code Configurator} to configure {@link Serde} and {@link Deserializer} using {@link #kafkaProperties}. 
* diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/producer/ProducerBuilder.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/producer/ProducerBuilder.java index 50430dc2d..6d2c40c73 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/producer/ProducerBuilder.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/producer/ProducerBuilder.java @@ -26,12 +26,12 @@ import com.bakdata.kafka.AppConfiguration; import com.bakdata.kafka.Configurator; +import com.bakdata.kafka.Preconfigured; import java.util.Map; import lombok.NonNull; -import lombok.RequiredArgsConstructor; -import lombok.Value; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serializer; /** @@ -50,9 +50,10 @@ public class ProducerBuilder { /** * Create a new {@link Producer} using {@link #kafkaProperties} - * @return {@link Producer} + * * @param type of keys * @param type of values + * @return {@link Producer} * @see KafkaProducer#KafkaProducer(Map) */ public Producer createProducer() { @@ -61,21 +62,31 @@ public Producer createProducer() { /** * Create a new {@link Producer} using {@link #kafkaProperties} and provided {@link Serializer Serializers} + * * @param keySerializer {@link Serializer} to use for keys * @param valueSerializer {@link Serializer} to use for values - * @return {@link Producer} * @param type of keys * @param type of values + * @return {@link Producer} * @see KafkaProducer#KafkaProducer(Map, Serializer, Serializer) */ public Producer createProducer(final Serializer keySerializer, final Serializer valueSerializer) { - return new KafkaProducer<>(this.kafkaProperties, keySerializer, valueSerializer); + return this.createProducer(Preconfigured.create(keySerializer), Preconfigured.create(valueSerializer)); + } + + // TODO: docs + public Producer createProducer(final 
Preconfigured> keySerializer, + final Preconfigured> valueSerializer) { + final Serializer configuredKeySerializer = keySerializer.configureForKeys(this.kafkaProperties); + final Serializer configuredValueSerializer = valueSerializer.configureForValues(this.kafkaProperties); + + return new KafkaProducer<>(this.kafkaProperties, configuredKeySerializer, configuredValueSerializer); } /** - * Create {@link Configurator} to configure {@link org.apache.kafka.common.serialization.Serde} and - * {@link Serializer} using {@link #kafkaProperties}. + * Create {@link Configurator} to configure {@link Serde} and {@link Serializer} using {@link #kafkaProperties}. + * * @return {@link Configurator} */ public Configurator createConfigurator() { @@ -84,6 +95,7 @@ public Configurator createConfigurator() { /** * Create {@link AppConfiguration} used by this app + * * @return {@link AppConfiguration} */ public AppConfiguration createConfiguration() { From 8e7c6429f3794646e205bf0439eac97b5139a429 Mon Sep 17 00:00:00 2001 From: Jakob Edding <15202881+jkbe@users.noreply.github.com> Date: Wed, 11 Mar 2026 16:54:40 +0100 Subject: [PATCH 2/8] Bump CI to print snapshot versions to job summary --- .github/workflows/build-and-publish.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build-and-publish.yaml b/.github/workflows/build-and-publish.yaml index 3998a12b7..83d6ab12b 100644 --- a/.github/workflows/build-and-publish.yaml +++ b/.github/workflows/build-and-publish.yaml @@ -10,7 +10,7 @@ on: jobs: build-and-publish: name: Java Gradle - uses: bakdata/ci-templates/.github/workflows/java-gradle-library.yaml@1.75.0 + uses: bakdata/ci-templates/.github/workflows/java-gradle-library.yaml@feat/output-publish-artifacts with: java-version: 17 secrets: From 42a75dc99ab5d7b1edefbd2b1e539421dcc494f3 Mon Sep 17 00:00:00 2001 From: Jakob Edding <15202881+jkbe@users.noreply.github.com> Date: Wed, 11 Mar 2026 18:35:21 +0100 Subject: [PATCH 3/8] Add missing imports --- 
.../main/java/com/bakdata/kafka/producer/ProducerBuilder.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/producer/ProducerBuilder.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/producer/ProducerBuilder.java index 6d2c40c73..46260156b 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/producer/ProducerBuilder.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/producer/ProducerBuilder.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -29,6 +29,8 @@ import com.bakdata.kafka.Preconfigured; import java.util.Map; import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.Value; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.serialization.Serde; From 75f3fb1f69d0138fe968664466ce9a97b2761918 Mon Sep 17 00:00:00 2001 From: Jakob Edding <15202881+jkbe@users.noreply.github.com> Date: Fri, 13 Mar 2026 11:33:25 +0100 Subject: [PATCH 4/8] Add tests for preconfigured SerDe in producer builder --- .../kafka/producer/ProducerBuilderTest.java | 215 ++++++++++++++++++ 1 file changed, 215 insertions(+) create mode 100644 streams-bootstrap-core/src/test/java/com/bakdata/kafka/producer/ProducerBuilderTest.java diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/producer/ProducerBuilderTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/producer/ProducerBuilderTest.java new file mode 100644 index 000000000..a043be6a9 --- /dev/null +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/producer/ProducerBuilderTest.java @@ -0,0 +1,215 @@ +/* + * MIT License + * + * Copyright (c) 2026 bakdata + * + * 
Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package com.bakdata.kafka.producer; + +import com.bakdata.kafka.KafkaTest; +import com.bakdata.kafka.Preconfigured; +import com.bakdata.kafka.TestHelper; +import com.bakdata.kafka.TestRecord; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; +import java.util.List; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KeyValue; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(SoftAssertionsExtension.class) +class ProducerBuilderTest extends KafkaTest { + + @InjectSoftAssertions + private SoftAssertions softly; + + private static ConfiguredProducerApp createApp(final ProducerApp app) { + final ProducerTopicConfig topics = ProducerTopicConfig.builder() + .outputTopic("output") + .build(); + return new ConfiguredProducerApp<>(app, new ProducerAppConfiguration(topics)); + } + + private List> readOutputTopic() { + final List> records = this.newTestClient().read() + .withKeyDeserializer(new StringDeserializer()) + .withValueDeserializer(new StringDeserializer()) + .from("output", POLL_TIMEOUT); + + return records.stream() + .map(TestHelper::toKeyValue) + .toList(); + } + + private List> readOutputTopicAvro() { + final List> records = this.newTestClient().read() + .withKeyDeserializer(new SpecificAvroDeserializer()) + .withValueDeserializer(new SpecificAvroDeserializer()) + .from("output", POLL_TIMEOUT); + 
+ return records.stream() + .map(TestHelper::toKeyValue) + .toList(); + } + + @Test + void shouldCreateProducerWithDefaultSerializers() { + final ProducerApp app = new ProducerApp() { + @Override + public ProducerRunnable buildRunnable(final ProducerBuilder builder) { + return () -> { + try (final Producer producer = builder.createProducer()) { + producer.send(new ProducerRecord<>("output", "foo", "bar")); + } + }; + } + + @Override + public SerializerConfig defaultSerializationConfig() { + return new SerializerConfig(StringSerializer.class, StringSerializer.class); + } + }; + + try (final ProducerRunner runner = createApp(app) + .withRuntimeConfiguration(this.createConfig()) + .createRunner()) { + runner.run(); + + final List> output = this.readOutputTopic(); + this.softly.assertThat(output) + .containsExactly(new KeyValue<>("foo", "bar")); + } + } + + @Test + void shouldCreateProducerWithStringSerializers() { + final ProducerApp app = new ProducerApp() { + @Override + public ProducerRunnable buildRunnable(final ProducerBuilder builder) { + return () -> { + try (final Producer producer = builder.createProducer( + new StringSerializer(), new StringSerializer())) { + producer.send(new ProducerRecord<>("output", "foo", "bar")); + } + }; + } + + @Override + public SerializerConfig defaultSerializationConfig() { + // Important: Do not set StringSerializer as default to test that serializers provided in + // createProducer are used + return new SerializerConfig(ByteArraySerializer.class, ByteArraySerializer.class); + } + }; + + try (final ProducerRunner runner = createApp(app) + .withRuntimeConfiguration(this.createConfig()) + .createRunner()) { + runner.run(); + + final List> output = this.readOutputTopic(); + this.softly.assertThat(output) + .containsExactly(new KeyValue<>("foo", "bar")); + } + } + + @Test + void shouldCreateProducerWithAvroSerializersRequiringConfiguredSchemaRegistryUrl() { + final ProducerApp app = new ProducerApp() { + @Override + public 
ProducerRunnable buildRunnable(final ProducerBuilder builder) { + return () -> { + try (final Producer producer = builder.createProducer( + new SpecificAvroSerializer<>(), new SpecificAvroSerializer<>())) { + producer.send(new ProducerRecord<>("output", + TestRecord.newBuilder().setContent("foo").build(), + TestRecord.newBuilder().setContent("bar").build())); + } + }; + } + + @Override + public SerializerConfig defaultSerializationConfig() { + // Important: Do not set SpecificAvroSerializer as default so that schema.registry.url is not yet + // configured + return new SerializerConfig(StringSerializer.class, StringSerializer.class); + } + }; + + try (final ProducerRunner runner = createApp(app) + .withRuntimeConfiguration(this.createConfigWithSchemaRegistry()) + .createRunner()) { + runner.run(); + + final List> output = this.readOutputTopicAvro(); + this.softly.assertThat(output) + .containsExactly(new KeyValue<>( + TestRecord.newBuilder().setContent("foo").build(), + TestRecord.newBuilder().setContent("bar").build())); + } + } + + @Test + void shouldCreateProducerWithPreconfiguredAvroSerializersRequiringConfiguredSchemaRegistryUrl() { + final ProducerApp app = new ProducerApp() { + @Override + public ProducerRunnable buildRunnable(final ProducerBuilder builder) { + return () -> { + try (final Producer producer = builder.createProducer( + Preconfigured.create(new SpecificAvroSerializer<>()), + Preconfigured.create(new SpecificAvroSerializer<>()))) { + producer.send(new ProducerRecord<>("output", + TestRecord.newBuilder().setContent("foo").build(), + TestRecord.newBuilder().setContent("bar").build())); + } + }; + } + + @Override + public SerializerConfig defaultSerializationConfig() { + // Important: Do not set SpecificAvroSerializer as default so that schema.registry.url is not yet + // configured + return new SerializerConfig(StringSerializer.class, StringSerializer.class); + } + }; + + try (final ProducerRunner runner = createApp(app) + 
.withRuntimeConfiguration(this.createConfigWithSchemaRegistry()) + .createRunner()) { + runner.run(); + + final List> output = this.readOutputTopicAvro(); + this.softly.assertThat(output) + .containsExactly(new KeyValue<>( + TestRecord.newBuilder().setContent("foo").build(), + TestRecord.newBuilder().setContent("bar").build())); + } + } +} From 88bf6741be5796c00a9413840af03d8d12d495f0 Mon Sep 17 00:00:00 2001 From: Jakob Edding <15202881+jkbe@users.noreply.github.com> Date: Fri, 13 Mar 2026 11:47:02 +0100 Subject: [PATCH 5/8] Add Javadoc --- .../bakdata/kafka/producer/ProducerBuilder.java | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/producer/ProducerBuilder.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/producer/ProducerBuilder.java index 46260156b..c3f4d8905 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/producer/ProducerBuilder.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/producer/ProducerBuilder.java @@ -63,7 +63,8 @@ public Producer createProducer() { } /** - * Create a new {@link Producer} using {@link #kafkaProperties} and provided {@link Serializer Serializers} + * Create a new {@link Producer} using {@link #kafkaProperties} and provided {@link Serializer Serializers}. The + * serializers will be configured automatically. * * @param keySerializer {@link Serializer} to use for keys * @param valueSerializer {@link Serializer} to use for values @@ -77,7 +78,18 @@ public Producer createProducer(final Serializer keySerializer, return this.createProducer(Preconfigured.create(keySerializer), Preconfigured.create(valueSerializer)); } - // TODO: docs + /** + * Create a new {@link Producer} using {@link #kafkaProperties} and provided {@link Preconfigured} serializers. The + * preconfiguration will be resolved to yield configured {@link Serializer Serializers} that are used to create the + * new {@link Producer}. 
+ * + * @param keySerializer {@link Preconfigured} to use for keys + * @param valueSerializer {@link Preconfigured} to use for values + * @param type of keys + * @param type of values + * @return {@link Producer} + * @see KafkaProducer#KafkaProducer(Map, Serializer, Serializer) + */ public Producer createProducer(final Preconfigured> keySerializer, final Preconfigured> valueSerializer) { final Serializer configuredKeySerializer = keySerializer.configureForKeys(this.kafkaProperties); From ea791895788eb7cf348d816dc847198c138b0771 Mon Sep 17 00:00:00 2001 From: Jakob Edding <15202881+jkbe@users.noreply.github.com> Date: Fri, 13 Mar 2026 14:26:27 +0100 Subject: [PATCH 6/8] Fix hint in and simplify default serialization configs --- .../bakdata/kafka/producer/ProducerBuilderTest.java | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/producer/ProducerBuilderTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/producer/ProducerBuilderTest.java index a043be6a9..6a9ce16fc 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/producer/ProducerBuilderTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/producer/ProducerBuilderTest.java @@ -123,8 +123,6 @@ public ProducerRunnable buildRunnable(final ProducerBuilder builder) { @Override public SerializerConfig defaultSerializationConfig() { - // Important: Do not set StringSerializer as default to test that serializers provided in - // createProducer are used return new SerializerConfig(ByteArraySerializer.class, ByteArraySerializer.class); } }; @@ -157,9 +155,7 @@ public ProducerRunnable buildRunnable(final ProducerBuilder builder) { @Override public SerializerConfig defaultSerializationConfig() { - // Important: Do not set SpecificAvroSerializer as default so that schema.registry.url is not yet - // configured - return new SerializerConfig(StringSerializer.class, StringSerializer.class); + return new 
SerializerConfig(ByteArraySerializer.class, ByteArraySerializer.class); } }; @@ -194,9 +190,7 @@ public ProducerRunnable buildRunnable(final ProducerBuilder builder) { @Override public SerializerConfig defaultSerializationConfig() { - // Important: Do not set SpecificAvroSerializer as default so that schema.registry.url is not yet - // configured - return new SerializerConfig(StringSerializer.class, StringSerializer.class); + return new SerializerConfig(ByteArraySerializer.class, ByteArraySerializer.class); } }; From 4e45060407938133f70cbf0159a0fa97562c5d23 Mon Sep 17 00:00:00 2001 From: Jakob Edding <15202881+jkbe@users.noreply.github.com> Date: Fri, 13 Mar 2026 15:00:54 +0100 Subject: [PATCH 7/8] Apply autoformatting to existing file --- .../com/bakdata/kafka/consumer/ConsumerBuilder.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerBuilder.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerBuilder.java index 9ba875972..b60dbb680 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerBuilder.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerBuilder.java @@ -100,10 +100,9 @@ public AppConfiguration createConfiguration() { } /** - * Subscribes the given {@link Consumer} to all input topics and patterns - * configured in {@link #topics}. - * This includes all topics from {@code getInputTopics()}, {@code getLabeledInputTopics()}, - * {@code getInputPattern()}, and {@code getLabeledInputPatterns()}. + * Subscribes the given {@link Consumer} to all input topics and patterns configured in {@link #topics}. This + * includes all topics from {@code getInputTopics()}, {@code getLabeledInputTopics()}, {@code getInputPattern()}, + * and {@code getLabeledInputPatterns()}. 
* * @param type of keys * @param type of values @@ -125,8 +124,8 @@ public void subscribeToAllTopics(final Consumer consumer) { } /** - * Creates a {@link DefaultConsumerRunnable} using the provided consumer, processor, - * and {@link ConsumerExecutionOptions}. + * Creates a {@link DefaultConsumerRunnable} using the provided consumer, processor, and + * {@link ConsumerExecutionOptions}. * * @param type of keys * @param type of values From 1eac222d96eb65cebb125565c1af220dd8d5d4c2 Mon Sep 17 00:00:00 2001 From: Jakob Edding <15202881+jkbe@users.noreply.github.com> Date: Fri, 13 Mar 2026 18:52:22 +0100 Subject: [PATCH 8/8] Add deserializer preconfiguration to consumer builder --- .../kafka/consumer/ConsumerBuilder.java | 27 +- .../kafka/consumer/ConsumerBuilderTest.java | 255 ++++++++++++++++++ 2 files changed, 279 insertions(+), 3 deletions(-) create mode 100644 streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumer/ConsumerBuilderTest.java diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerBuilder.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerBuilder.java index b60dbb680..4b69a3b82 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerBuilder.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/consumer/ConsumerBuilder.java @@ -26,6 +26,7 @@ import com.bakdata.kafka.AppConfiguration; import com.bakdata.kafka.Configurator; +import com.bakdata.kafka.Preconfigured; import java.util.Map; import lombok.NonNull; import lombok.RequiredArgsConstructor; @@ -65,7 +66,8 @@ public Consumer createConsumer() { } /** - * Create a new {@code Consumer} using {@link #kafkaProperties} and provided {@code Deserializers} + * Create a new {@code Consumer} using {@link #kafkaProperties} and provided {@code Deserializer Deserializers}. The + * deserializers will be configured automatically. 
* * @param keyDeserializer {@code Deserializer} to use for keys * @param valueDeserializer {@code Deserializer} to use for values @@ -76,10 +78,29 @@ */ public Consumer createConsumer(final Deserializer keyDeserializer, final Deserializer valueDeserializer) { - return new KafkaConsumer<>(this.kafkaProperties, keyDeserializer, valueDeserializer); + return this.createConsumer(Preconfigured.create(keyDeserializer), Preconfigured.create(valueDeserializer)); } - // TODO: + + /** + * Create a new {@code Consumer} using {@link #kafkaProperties} and provided {@code Preconfigured} deserializers. + * The preconfiguration will be resolved to yield configured {@link Deserializer Deserializers} that are used to + * create the new {@link Consumer}. + * + * @param keyDeserializer {@code Preconfigured} to use for keys + * @param valueDeserializer {@code Preconfigured} to use for values + * @param type of keys + * @param type of values + * @return {@code Consumer} + * @see KafkaConsumer#KafkaConsumer(Map, Deserializer, Deserializer) + */ + public Consumer createConsumer(final Preconfigured> keyDeserializer, + final Preconfigured> valueDeserializer) { + final Deserializer configuredKeyDeserializer = keyDeserializer.configureForKeys(this.kafkaProperties); + final Deserializer configuredValueDeserializer = valueDeserializer.configureForValues(this.kafkaProperties); + + return new KafkaConsumer<>(this.kafkaProperties, configuredKeyDeserializer, configuredValueDeserializer); + } /** * Create {@code Configurator} to configure {@link Serde} and {@link Deserializer} using {@link #kafkaProperties}. 
diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumer/ConsumerBuilderTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumer/ConsumerBuilderTest.java new file mode 100644 index 000000000..656638907 --- /dev/null +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/consumer/ConsumerBuilderTest.java @@ -0,0 +1,255 @@ +/* + * MIT License + * + * Copyright (c) 2026 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package com.bakdata.kafka.consumer; + +import static java.util.concurrent.CompletableFuture.runAsync; + +import com.bakdata.kafka.DeserializerConfig; +import com.bakdata.kafka.KafkaTest; +import com.bakdata.kafka.KafkaTestClient; +import com.bakdata.kafka.Preconfigured; +import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord; +import com.bakdata.kafka.TestHelper; +import com.bakdata.kafka.TestRecord; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KeyValue; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.testcontainers.shaded.org.awaitility.Awaitility; + +@ExtendWith(SoftAssertionsExtension.class) +class ConsumerBuilderTest extends KafkaTest { + + @InjectSoftAssertions + private SoftAssertions softly; + + private static ConfiguredConsumerApp createApp(final ConsumerApp app) { + final ConsumerTopicConfig topics = ConsumerTopicConfig.builder() + .inputTopics(List.of("input")) + .build(); + return new ConfiguredConsumerApp<>(app, new ConsumerAppConfiguration(topics)); + } + + private void assertRecords(final List> consumedRecords, final KeyValue expected) { + Awaitility.await() + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> { + final List> consumedKeyValues = consumedRecords + .stream() + 
.map(TestHelper::toKeyValue) + .toList(); + this.softly.assertThat(consumedKeyValues) + .containsExactly(expected); + }); + } + + @Test + void shouldCreateConsumerWithDefaultDeserializers() { + final List> consumedRecords = new ArrayList<>(); + + final ConsumerApp app = new ConsumerApp() { + @Override + public ConsumerRunnable buildRunnable(final ConsumerBuilder builder) { + final Consumer consumer = builder.createConsumer(); + builder.subscribeToAllTopics(consumer); + return builder.createDefaultConsumerRunnable(consumer, + records -> records.forEach(consumedRecords::add)); + } + + @Override + public DeserializerConfig defaultSerializationConfig() { + return new DeserializerConfig(StringDeserializer.class, StringDeserializer.class); + } + + @Override + public String getUniqueGroupId(final ConsumerAppConfiguration configuration) { + return "test-group"; + } + }; + + try (final ExecutableConsumerApp executableApp = createApp(app) + .withRuntimeConfiguration(this.createConfig()); + final ConsumerRunner runner = executableApp.createRunner()) { + + runAsync(runner); + awaitActive(executableApp); + + final KafkaTestClient testClient = this.newTestClient(); + testClient.send() + .withKeySerializer(new StringSerializer()) + .withValueSerializer(new StringSerializer()) + .to("input", List.of(new SimpleProducerRecord<>("foo", "bar"))); + awaitProcessing(executableApp); + + this.assertRecords(consumedRecords, new KeyValue<>("foo", "bar")); + } + } + + @Test + void shouldCreateConsumerWithStringDeserializers() { + final List> consumedRecords = new ArrayList<>(); + + final ConsumerApp app = new ConsumerApp() { + @Override + public ConsumerRunnable buildRunnable(final ConsumerBuilder builder) { + final Consumer consumer = builder.createConsumer( + new StringDeserializer(), new StringDeserializer()); + builder.subscribeToAllTopics(consumer); + return builder.createDefaultConsumerRunnable(consumer, + records -> records.forEach(consumedRecords::add)); + } + + @Override + public 
DeserializerConfig defaultSerializationConfig() { + return new DeserializerConfig(ByteArrayDeserializer.class, ByteArrayDeserializer.class); + } + + @Override + public String getUniqueGroupId(final ConsumerAppConfiguration configuration) { + return "test-group"; + } + }; + + try (final ExecutableConsumerApp executableApp = createApp(app) + .withRuntimeConfiguration(this.createConfig()); + final ConsumerRunner runner = executableApp.createRunner()) { + + runAsync(runner); + awaitActive(executableApp); + + final KafkaTestClient testClient = this.newTestClient(); + testClient.send() + .withKeySerializer(new StringSerializer()) + .withValueSerializer(new StringSerializer()) + .to("input", List.of(new SimpleProducerRecord<>("foo", "bar"))); + awaitProcessing(executableApp); + + this.assertRecords(consumedRecords, new KeyValue<>("foo", "bar")); + } + } + + @Test + void shouldCreateConsumerWithAvroDeserializersRequiringConfiguredSchemaRegistryUrl() { + final List> consumedRecords = new ArrayList<>(); + + final ConsumerApp app = new ConsumerApp() { + @Override + public ConsumerRunnable buildRunnable(final ConsumerBuilder builder) { + final Consumer consumer = builder.createConsumer( + new SpecificAvroDeserializer<>(), new SpecificAvroDeserializer<>()); + builder.subscribeToAllTopics(consumer); + return builder.createDefaultConsumerRunnable(consumer, + records -> records.forEach(consumedRecords::add)); + } + + @Override + public DeserializerConfig defaultSerializationConfig() { + return new DeserializerConfig(ByteArrayDeserializer.class, ByteArrayDeserializer.class); + } + + @Override + public String getUniqueGroupId(final ConsumerAppConfiguration configuration) { + return "test-group"; + } + }; + + try (final ExecutableConsumerApp executableApp = createApp(app) + .withRuntimeConfiguration(this.createConfigWithSchemaRegistry()); + final ConsumerRunner runner = executableApp.createRunner()) { + + runAsync(runner); + awaitActive(executableApp); + + final TestRecord key = 
TestRecord.newBuilder().setContent("foo").build(); + final TestRecord value = TestRecord.newBuilder().setContent("bar").build(); + final KafkaTestClient testClient = this.newTestClient(); + testClient.send() + .withKeySerializer(new SpecificAvroSerializer<>()) + .withValueSerializer(new SpecificAvroSerializer<>()) + .to("input", List.of(new SimpleProducerRecord<>(key, value))); + awaitProcessing(executableApp); + + this.assertRecords(consumedRecords, new KeyValue<>(key, value)); + } + } + + @Test + void shouldCreateConsumerWithPreconfiguredAvroDeserializersRequiringConfiguredSchemaRegistryUrl() { + final List> consumedRecords = new ArrayList<>(); + + final ConsumerApp app = new ConsumerApp() { + @Override + public ConsumerRunnable buildRunnable(final ConsumerBuilder builder) { + final Consumer consumer = builder.createConsumer( + Preconfigured.create(new SpecificAvroDeserializer<>()), + Preconfigured.create(new SpecificAvroDeserializer<>())); + builder.subscribeToAllTopics(consumer); + return builder.createDefaultConsumerRunnable(consumer, + records -> records.forEach(consumedRecords::add)); + } + + @Override + public DeserializerConfig defaultSerializationConfig() { + return new DeserializerConfig(ByteArrayDeserializer.class, ByteArrayDeserializer.class); + } + + @Override + public String getUniqueGroupId(final ConsumerAppConfiguration configuration) { + return "test-group"; + } + }; + + try (final ExecutableConsumerApp executableApp = createApp(app) + .withRuntimeConfiguration(this.createConfigWithSchemaRegistry()); + final ConsumerRunner runner = executableApp.createRunner()) { + + runAsync(runner); + awaitActive(executableApp); + + final TestRecord key = TestRecord.newBuilder().setContent("foo").build(); + final TestRecord value = TestRecord.newBuilder().setContent("bar").build(); + final KafkaTestClient testClient = this.newTestClient(); + testClient.send() + .withKeySerializer(new SpecificAvroSerializer<>()) + .withValueSerializer(new 
SpecificAvroSerializer<>()) + .to("input", List.of(new SimpleProducerRecord<>(key, value))); + awaitProcessing(executableApp); + + this.assertRecords(consumedRecords, new KeyValue<>(key, value)); + } + } +}