Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build-and-publish.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ on:
jobs:
build-and-publish:
name: Java Gradle
uses: bakdata/ci-templates/.github/workflows/java-gradle-library.yaml@1.75.0
uses: bakdata/ci-templates/.github/workflows/java-gradle-library.yaml@feat/output-publish-artifacts
with:
java-version: 17
secrets:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import com.bakdata.kafka.AppConfiguration;
import com.bakdata.kafka.Configurator;
import com.bakdata.kafka.Preconfigured;
import java.util.Map;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -65,7 +66,8 @@ public <K, V> Consumer<K, V> createConsumer() {
}

/**
* Create a new {@code Consumer} using {@link #kafkaProperties} and provided {@code Deserializers}
* Create a new {@code Consumer} using {@link #kafkaProperties} and provided {@code Deserializer Deserializers}. The
* deserializers will be configured automatically.
*
* @param keyDeserializer {@code Deserializer} to use for keys
* @param valueDeserializer {@code Deserializer} to use for values
Expand All @@ -76,7 +78,28 @@ public <K, V> Consumer<K, V> createConsumer() {
*/
public <K, V> Consumer<K, V> createConsumer(final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer) {
return new KafkaConsumer<>(this.kafkaProperties, keyDeserializer, valueDeserializer);
return this.createConsumer(Preconfigured.create(keyDeserializer), Preconfigured.create(valueDeserializer));
}


/**
* Create a new {@code Consumer} using {@link #kafkaProperties} and provided {@code Preconfigured} deserializers.
* The preconfiguration will be resolved to yield configured {@link Deserializer Deserializers} that are used to
* create the new {@link Consumer}.
*
* @param keyDeserializer {@code Preconfigured} to use for keys
* @param valueDeserializer {@code Preconfigured} to use for values
* @param <K> type of keys
* @param <V> type of values
* @return {@code Consumer}
* @see KafkaConsumer#KafkaConsumer(Map, Deserializer, Deserializer)
*/
public <K, V> Consumer<K, V> createConsumer(final Preconfigured<Deserializer<K>> keyDeserializer,
final Preconfigured<Deserializer<V>> valueDeserializer) {
final Deserializer<K> configuredKeyDeserializer = keyDeserializer.configureForKeys(this.kafkaProperties);
final Deserializer<V> configuredValueDeserializer = valueDeserializer.configureForKeys(this.kafkaProperties);

return new KafkaConsumer<>(this.kafkaProperties, configuredKeyDeserializer, configuredValueDeserializer);
}

/**
Expand All @@ -98,10 +121,9 @@ public AppConfiguration<ConsumerTopicConfig> createConfiguration() {
}

/**
* Subscribes the given {@link Consumer} to all input topics and patterns
* configured in {@link #topics}.
* This includes all topics from {@code getInputTopics()}, {@code getLabeledInputTopics()},
* {@code getInputPattern()}, and {@code getLabeledInputPatterns()}.
* Subscribes the given {@link Consumer} to all input topics and patterns configured in {@link #topics}. This
* includes all topics from {@code getInputTopics()}, {@code getLabeledInputTopics()}, {@code getInputPattern()},
* and {@code getLabeledInputPatterns()}.
*
* @param <K> type of keys
* @param <V> type of values
Expand All @@ -123,8 +145,8 @@ public <K, V> void subscribeToAllTopics(final Consumer<K, V> consumer) {
}

/**
* Creates a {@link DefaultConsumerRunnable} using the provided consumer, processor,
* and {@link ConsumerExecutionOptions}.
* Creates a {@link DefaultConsumerRunnable} using the provided consumer, processor, and
* {@link ConsumerExecutionOptions}.
*
* @param <K> type of keys
* @param <V> type of values
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
* Copyright (c) 2026 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand All @@ -26,12 +26,14 @@

import com.bakdata.kafka.AppConfiguration;
import com.bakdata.kafka.Configurator;
import com.bakdata.kafka.Preconfigured;
import java.util.Map;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

/**
Expand All @@ -50,32 +52,55 @@ public class ProducerBuilder {

/**
* Create a new {@link Producer} using {@link #kafkaProperties}
* @return {@link Producer}
*
* @param <K> type of keys
* @param <V> type of values
* @return {@link Producer}
* @see KafkaProducer#KafkaProducer(Map)
*/
public <K, V> Producer<K, V> createProducer() {
return new KafkaProducer<>(this.kafkaProperties);
}

/**
* Create a new {@link Producer} using {@link #kafkaProperties} and provided {@link Serializer Serializers}
* Create a new {@link Producer} using {@link #kafkaProperties} and provided {@link Serializer Serializers}. The
* serializers will be configured automatically.
*
* @param keySerializer {@link Serializer} to use for keys
* @param valueSerializer {@link Serializer} to use for values
* @return {@link Producer}
* @param <K> type of keys
* @param <V> type of values
* @return {@link Producer}
* @see KafkaProducer#KafkaProducer(Map, Serializer, Serializer)
*/
public <K, V> Producer<K, V> createProducer(final Serializer<K> keySerializer,
final Serializer<V> valueSerializer) {
return new KafkaProducer<>(this.kafkaProperties, keySerializer, valueSerializer);
return this.createProducer(Preconfigured.create(keySerializer), Preconfigured.create(valueSerializer));
}

/**
* Create a new {@link Producer} using {@link #kafkaProperties} and provided {@link Preconfigured} serializers. The
* preconfiguration will be resolved to yield configured {@link Serializer Serializers} that are used to create the
* new {@link Producer}.
*
* @param keySerializer {@link Preconfigured} to use for keys
* @param valueSerializer {@link Preconfigured} to use for values
* @param <K> type of keys
* @param <V> type of values
* @return {@link Producer}
* @see KafkaProducer#KafkaProducer(Map, Serializer, Serializer)
*/
public <K, V> Producer<K, V> createProducer(final Preconfigured<Serializer<K>> keySerializer,
final Preconfigured<Serializer<V>> valueSerializer) {
final Serializer<K> configuredKeySerializer = keySerializer.configureForKeys(this.kafkaProperties);
final Serializer<V> configuredValueSerializer = valueSerializer.configureForValues(this.kafkaProperties);

return new KafkaProducer<>(this.kafkaProperties, configuredKeySerializer, configuredValueSerializer);
}

/**
* Create {@link Configurator} to configure {@link org.apache.kafka.common.serialization.Serde} and
* {@link Serializer} using {@link #kafkaProperties}.
* Create {@link Configurator} to configure {@link Serde} and {@link Serializer} using {@link #kafkaProperties}.
*
* @return {@link Configurator}
*/
public Configurator createConfigurator() {
Expand All @@ -84,6 +109,7 @@ public Configurator createConfigurator() {

/**
* Create {@link AppConfiguration} used by this app
*
* @return {@link AppConfiguration}
*/
public AppConfiguration<ProducerTopicConfig> createConfiguration() {
Expand Down
Loading
Loading