diff --git a/docs/docs/index.md b/docs/docs/index.md index 91f12c935..e8fea159d 100644 --- a/docs/docs/index.md +++ b/docs/docs/index.md @@ -1 +1,41 @@ -# Coming soon

# What is streams-bootstrap?

`streams-bootstrap` is a Java library that standardizes the development and operation of Kafka-based applications (Kafka Streams and plain Kafka clients).

The framework supports Apache Kafka 4.1 and Java 17. Its modules are published to Maven Central for straightforward integration into existing projects.

## Why use it?

Kafka Streams and the core Kafka clients provide strong primitives for stream processing and messaging, but they do not prescribe:

- How to structure a full application around those primitives
- How to configure applications consistently
- How to deploy and operate these services on Kubernetes
- How to perform repeatable reprocessing and cleanup
- How to handle errors and large messages uniformly

`streams-bootstrap` addresses these aspects by supplying:

1. **Standardized base classes** for Kafka Streams and client applications.
2. **A common CLI/configuration contract** for all Kafka applications.
3. **Helm-based deployment templates** and conventions for Kubernetes.
4. **Built-in reset/clean workflows** for reprocessing and state management.
5. **Consistent error handling** and dead-letter integration.
6. **Testing infrastructure** for local development and CI environments.
7. **Optional blob-storage-backed serialization** for large messages.

## Architecture

The framework uses a modular architecture with a clear separation of concerns.

### Core Modules

- `streams-bootstrap-core`: Core abstractions for application lifecycle, execution, and cleanup
- `streams-bootstrap-cli`: CLI framework based on `picocli`
- `streams-bootstrap-test`: Utilities for testing streams-bootstrap applications
- `streams-bootstrap-large-messages`: Support for handling large Kafka messages
- `streams-bootstrap-cli-test`: Test support for CLI-based applications

diff --git a/docs/docs/user/concepts/common.md b/docs/docs/user/concepts/common.md new file mode 100644 index 000000000..3d150359b --- /dev/null +++ b/docs/docs/user/concepts/common.md @@ -0,0 +1,179 @@

# Common concepts

## Application types

In streams-bootstrap, an application is built from three abstraction layers:

- **App**
- **ConfiguredApp**
- **ExecutableApp**

---

### App

The **App** represents your application logic. Each kind of application has its own `App` interface:

- **StreamsApp** – for Kafka Streams applications
- **ProducerApp** – for producer applications
- **ConsumerApp** – for consumer applications
- **ConsumerProducerApp** – for consumer–producer applications

You implement the appropriate interface to define your application's behavior.

---

### ConfiguredApp

A **ConfiguredApp** pairs an `App` with its configuration. Examples include:

- `ConfiguredStreamsApp`
- `ConfiguredProducerApp`
- `ConfiguredConsumerApp`
- `ConfiguredConsumerProducerApp`

This layer handles Kafka property creation, combining:

- base configuration
- app-specific configuration
- user configuration
- runtime configuration, e.g., brokers and schema registry

---

### ExecutableApp

An **ExecutableApp** is a `ConfiguredApp` with runtime configuration applied, making it ready to execute.
It can create:

- a **Runner** for running the application
- a **CleanUpRunner** for cleanup operations

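To make the layering concrete, here is a minimal sketch of the chain for a Kafka Streams app, given an `AppConfiguration` and a runtime configuration object. Only `ConfiguredStreamsApp`, `StreamsRunner`, and `createRunner()` appear elsewhere in these docs; `ExecutableStreamsApp` and `withRuntimeConfiguration(...)` are assumed names used for illustration, not a verified API.

```java
// Sketch only: wiring the three layers together for a Kafka Streams app.
// ExecutableStreamsApp and withRuntimeConfiguration(...) are assumed names.
final StreamsApp app = new MyStreamsApp();                          // App: your logic
final ConfiguredStreamsApp<StreamsApp> configured =
        new ConfiguredStreamsApp<>(app, appConfiguration);          // App + configuration
final ExecutableStreamsApp<StreamsApp> executable =
        configured.withRuntimeConfiguration(runtimeConfiguration);  // + runtime config
try (final StreamsRunner runner = executable.createRunner()) {      // ready to execute
    runner.run();                                  // or executable.createCleanUpRunner()
}
```
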
---

### Usage Pattern

1. You implement an **App**.
2. The system wraps it in a **ConfiguredApp**, applying the configuration.
3. Runtime configuration is then applied to create an **ExecutableApp**, which can be:

- **run**, or
- **cleaned up**.

---

## Application lifecycle

Applications built with streams-bootstrap follow a defined lifecycle with specific states and transitions.

The lifecycle is managed through the `KafkaApplication` base class and provides several extension points for customization.

| Phase          | Description                                                                | Entry Point                                               |
|----------------|----------------------------------------------------------------------------|-----------------------------------------------------------|
| Initialization | Parse CLI arguments, inject environment variables, configure application  | `startApplication()` or `startApplicationWithoutExit()`  |
| Preparation    | Execute pre-run/pre-clean hooks                                            | `onApplicationStart()`, `prepareRun()`, `prepareClean()`  |
| Execution      | Run main application logic or cleanup operations                           | `run()`, `clean()`, `reset()`                             |
| Shutdown       | Stop runners, close resources, cleanup                                     | `stop()`, `close()`                                       |

### Running an application

Applications built with streams-bootstrap can be started in two primary ways:

- **Via Command Line Interface**: When packaged as a runnable JAR (for example, in a container), the `run` command is the default entrypoint. An example invocation:

  ```bash
  java -jar example-app.jar \
      run \
      --bootstrap-servers kafka:9092 \
      --input-topics input-topic \
      --output-topic output-topic \
      --schema-registry-url http://schema-registry:8081
  ```

- **Programmatically**: You can create a `Runner` from an `ExecutableApp` to run it directly.

```java
// For streams applications
try (StreamsRunner runner = streamsApp.createRunner()) {
    runner.run();
}

// For producer applications
try (Runner runner = producerApp.createRunner()) {
    runner.run();
}
```

### Cleaning an application

A built-in mechanism is provided to clean up all resources associated with an application.

When the cleanup operation is triggered, the following resources are removed:

| Resource Type       | Description                                                | Streams Apps | Producer Apps | Consumer Apps | Consumer-Producer Apps |
|---------------------|------------------------------------------------------------|--------------|---------------|---------------|------------------------|
| Output Topics       | Topics the application produces to                         | ✓            | ✓             | N/A           | ✓                      |
| Intermediate Topics | Topics the application produces to and consumes from       | ✓            | N/A           | N/A           | N/A                    |
| Internal Topics     | Topics for state stores or repartitioning (Kafka Streams)  | ✓            | N/A           | N/A           | N/A                    |
| Consumer Groups     | Consumer group metadata                                    | ✓            | N/A           | ✓             | ✓                      |

Cleanup can be triggered:

- **Via Command Line**: When packaged as a runnable JAR, the `clean` command can be used.

  ```bash
  java -jar example-app.jar \
      clean \
      --bootstrap-servers kafka:9092 \
      --output-topic output-topic
  ```
- **Programmatically**:

```java
// For streams applications
try (StreamsCleanUpRunner cleanUpRunner = streamsApp.createCleanUpRunner()) {
    cleanUpRunner.clean();
}

// For producer applications
try (CleanUpRunner cleanUpRunner = producerApp.createCleanUpRunner()) {
    cleanUpRunner.clean();
}
```

Cleanup operations are idempotent, meaning they can be safely retried without causing additional issues.

## Configuration

Kafka properties are applied in the following order (later values override earlier ones):

1. Base configuration
2. App config from `createKafkaProperties()`
3. Kafka-specific environment variables with the `KAFKA_` prefix
4. Runtime args (`--bootstrap-servers`, `--schema-registry-url`, `--kafka-config`)
5. Serialization config
6. Group ID configuration

Environment variables with the `APP_` prefix (configurable via `ENV_PREFIX`) are automatically parsed.
Environment variables are converted to CLI arguments:

```text
APP_BOOTSTRAP_SERVERS   → --bootstrap-servers
APP_SCHEMA_REGISTRY_URL → --schema-registry-url
APP_OUTPUT_TOPIC        → --output-topic
```

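For example, assuming the default `APP_` prefix described above, the following two invocations are equivalent:

```bash
# Passing the output topic via environment variable vs. CLI flag
APP_OUTPUT_TOPIC=output-topic java -jar example-app.jar run --bootstrap-servers kafka:9092

java -jar example-app.jar run --bootstrap-servers kafka:9092 --output-topic output-topic
```
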
### Common CLI Configuration Options

- `--bootstrap-servers`: Kafka bootstrap servers (required)
- `--schema-registry-url`: URL for the Schema Registry. When this option is provided, schema cleanup is handled as part of the `clean` command
- `--kafka-config`: Key-value Kafka configuration

diff --git a/docs/docs/user/concepts/consumer-producer.md b/docs/docs/user/concepts/consumer-producer.md new file mode 100644 index 000000000..e69de29bb

diff --git a/docs/docs/user/concepts/consumer.md b/docs/docs/user/concepts/consumer.md new file mode 100644 index 000000000..e69de29bb

diff --git a/docs/docs/user/concepts/producer.md b/docs/docs/user/concepts/producer.md new file mode 100644 index 000000000..794462b25 --- /dev/null +++ b/docs/docs/user/concepts/producer.md @@ -0,0 +1,150 @@

# Producer applications

Producer applications generate data and send it to Kafka topics. They can be used to produce messages from various sources, such as databases, files, or real-time events.

streams-bootstrap provides a structured way to build producer applications with consistent configuration handling, command-line support, and lifecycle management.

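Before the lifecycle details, here is a minimal `ProducerApp` sketch, adapted from the local-deployment example later in these docs. The `getTopics().getOutputTopic()` accessor on the builder is an assumption for illustration; imports are omitted as in the other snippets on this page.

```java
// Minimal producer sketch; builder.getTopics().getOutputTopic() is an
// assumed accessor for the configured --output-topic.
public final class MyProducerApp implements ProducerApp {

    @Override
    public ProducerRunnable buildRunnable(final ProducerBuilder builder) {
        return () -> {
            try (final Producer<String, String> producer = builder.createProducer()) {
                producer.send(new ProducerRecord<>(
                        builder.getTopics().getOutputTopic(), "key", "value"));
            }
        };
    }

    @Override
    public SerializerConfig defaultSerializationConfig() {
        return new SerializerConfig(StringSerializer.class, StringSerializer.class);
    }
}
```
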
---

## Application lifecycle

### Running an application

Producer applications are executed using the `ProducerRunner`, which runs the producer logic defined by the application.

Unlike Kafka Streams applications, producer applications typically:

- Run to completion and terminate automatically, or
- Run continuously when implemented as long-lived services

The execution model is fully controlled by the producer implementation and its runnable logic.

---

### Cleaning an application

Producer applications support a dedicated `clean` command.

```bash
java -jar my-producer-app.jar \
    clean \
    --bootstrap-servers localhost:9092 \
    --output-topic my-topic
```

The clean process can perform the following operations:

- Delete output topics
- Delete registered schemas from Schema Registry
- Execute custom cleanup hooks defined by the application

Applications can register custom cleanup logic by overriding `setupCleanUp` (see the lifecycle hooks below).

---

## Configuration

### Topics

Producer applications support output topic configuration, as in the example invocation below:

- `--output-topic`: Default output topic for produced messages
- `--labeled-output-topics`: Named output topics with different message types

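For example (topic names here are hypothetical; the `label=topic` syntax is listed in the CLI table at the end of this page):

```bash
java -jar my-producer-app.jar \
    run \
    --bootstrap-servers localhost:9092 \
    --output-topic events \
    --labeled-output-topics audit=audit-events,metrics=metric-events
```
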
### Kafka properties

#### Base configuration

The following Kafka properties are configured by default for Producer applications in streams-bootstrap:

- `max.in.flight.requests.per.connection = 1`
- `acks = all`
- `compression.type = gzip`

#### Custom Kafka properties

Kafka configuration can be customized by overriding `createKafkaProperties()`:

```java
@Override
public Map<String, Object> createKafkaProperties() {
    return Map.of(
            ProducerConfig.RETRIES_CONFIG, 3,
            ProducerConfig.BATCH_SIZE_CONFIG, 16384,
            ProducerConfig.LINGER_MS_CONFIG, 5
    );
}
```

---

### Lifecycle hooks

Producer applications can register cleanup logic via `setupCleanUp`. This method allows you to attach:

- **Cleanup hooks** – for general cleanup logic not tied to Kafka topics
- **Topic hooks** – for reacting to topic lifecycle events (e.g. deletion)

#### Clean up

Custom cleanup logic that is not tied to Kafka topics can be registered via cleanup hooks:

```java
@Override
public ProducerCleanUpConfiguration setupCleanUp(
        final AppConfiguration configuration) {

    return ProducerApp.super.setupCleanUp(configuration)
            .registerCleanHook(() -> {
                // Custom cleanup logic
            });
}
```

#### Topic hooks

Topic hooks should be used for topic-related cleanup or side effects, such as releasing external resources associated with a topic or logging topic deletions:

```java
@Override
public ProducerCleanUpConfiguration setupCleanUp(
        final AppConfiguration configuration) {

    return ProducerApp.super.setupCleanUp(configuration)
            .registerTopicHook(new TopicHook() {

                @Override
                public void deleted(final String topic) {
                    // Called when a managed topic is deleted
                    System.out.println("Deleted topic: " + topic);
                }

                @Override
                public void close() {
                    // Optional closing of connections/resources
                }
            });
}
```

## Command line interface

Producer applications inherit standard CLI options from `KafkaApplication`. The following CLI options are producer-specific:

| Option                    | Description                                | Default |
|---------------------------|--------------------------------------------|---------|
| `--output-topic`          | Default output topic                       | -       |
| `--labeled-output-topics` | Named output topics (`label1=topic1,...`)  | -       |

---

## Deployment

TODO

diff --git a/docs/docs/user/concepts/streams.md b/docs/docs/user/concepts/streams.md new file mode 100644 index 000000000..c4278611a --- /dev/null +++ b/docs/docs/user/concepts/streams.md @@ -0,0 +1,272 @@

# Streams applications

Streams apps are applications that process data in real time as it flows through Kafka topics.
They can be used to filter, transform, aggregate, or enrich data streams.
Streams apps can also produce new messages to other topics based on the processed data.

---

## Application lifecycle

### Running an application

Kafka Streams applications are started via the `KafkaStreamsApplication` entry point.

---

### Resetting an application

Streams applications support a dedicated `reset` operation that clears processing state while preserving the application definition and configuration. This is useful for reprocessing input data from the beginning.

When a reset is triggered, the following resources are affected:

| Resource         | Action                                     |
|------------------|--------------------------------------------|
| State stores     | Cleared locally, changelog topics deleted  |
| Internal topics  | Deleted (e.g. repartition topics)          |
| Consumer offsets | Reset to earliest for input topics         |
| Output topics    | Preserved                                  |

Triggering a reset via CLI:

```bash
java -jar my-streams-app.jar reset
```

Triggering a reset programmatically:

```java
try (StreamsCleanUpRunner cleanUpRunner = streamsApp.createCleanUpRunner()) {
    cleanUpRunner.reset();
}
```

After a reset, the application can be started again and will reprocess all input data.

---

### Cleaning an application

The `clean` command performs everything that `reset` does and additionally removes the Kafka consumer groups and output topics created by the application.

```bash
java -jar my-streams-app.jar clean
```

---

## Configuration

### Topics

Streams applications support flexible topic configuration:

- `--input-topics`: Comma-separated list of input topics
- `--input-pattern`: Regex pattern for input topics
- `--output-topic`: Default output topic
- `--error-topic`: Topic for error records
- `--labeled-input-topics`: Named input topics with different message types
- `--labeled-input-patterns`: Additional labeled input topic patterns
- `--labeled-output-topics`: Named output topics with different message types

---

### Application ID

- `--application-id`: Unique Kafka Streams application ID

---

### Kafka properties

#### Base configuration

The following Kafka properties are configured by default for Streams applications in streams-bootstrap:

- `processing.guarantee = exactly_once_v2`
- `producer.max.in.flight.requests.per.connection = 1`
- `producer.acks = all`
- `producer.compression.type = gzip`

#### Custom Kafka properties

Kafka configuration can be customized by overriding `createKafkaProperties()`:

```java
@Override
public Map<String, Object> createKafkaProperties() {
    return Map.of(
            StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4,
            StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            LogAndContinueExceptionHandler.class.getName()
    );
}
```

---

### Lifecycle hooks

Streams applications support the following hook types:

- **Cleanup hooks** – for general cleanup logic not tied to Kafka topics
- **Topic hooks** – for reacting to topic lifecycle events (e.g. deletion)
- **Reset hooks** – for logic that should run only during an application reset

#### Clean up

Use cleanup hooks for logic that is not tied to Kafka topics, such as closing external resources or cleaning up temporary state.

```java
@Override
public StreamsCleanUpConfiguration setupCleanUp(
        final AppConfiguration configuration) {

    return StreamsApp.super.setupCleanUp(configuration)
            .registerCleanHook(() -> {
                // Custom cleanup logic
            });
}
```

#### Topic hooks

Topic hooks allow Kafka Streams applications to react to Kafka topic lifecycle events, such as topic deletion during `clean` or `reset` operations.

```java
@Override
public StreamsCleanUpConfiguration setupCleanUp(
        final AppConfiguration configuration) {

    return StreamsApp.super.setupCleanUp(configuration)
            .registerTopicHook(new TopicHook() {
                @Override
                public void deleted(final String topic) {
                    // Called when a managed topic is deleted
                    System.out.println("Deleted topic: " + topic);
                }

                @Override
                public void close() {
                    // Optional closing of connections/resources
                }
            });
}
```

#### Reset hooks

Reset hooks allow Kafka Streams applications to execute custom logic only during a reset operation. They are not invoked during a regular clean.

```java
@Override
public StreamsCleanUpConfiguration setupCleanUp(
        final AppConfiguration configuration) {

    return StreamsApp.super.setupCleanUp(configuration)
            .registerResetHook(() -> {
                // Custom logic executed only during reset
            });
}
```

---

### Execution options

#### On start

The `onStreamsStart` method is a lifecycle hook that is called after Kafka Streams has successfully started. This hook receives a `RunningStreams` parameter that provides access to the running Kafka Streams instance and its configuration.

```java
@Override
public void onStreamsStart(final RunningStreams runningStreams) {
    // Custom startup logic
}
```

##### Application server

A common use case for the `onStreamsStart` hook is to start an embedded application server (e.g., for REST APIs, GraphQL, gRPC).

```java
@Override
public void onStreamsStart(final RunningStreams runningStreams) {
    // Access the application server configuration
    final Optional<HostInfo> applicationServer = runningStreams.getConfig().getApplicationServer();

    applicationServer.ifPresent(hostInfo -> {
        final String host = hostInfo.host();
        final int port = hostInfo.port();

        // Start your application server
        log.info("Starting application server on {}:{}", host, port);
        // startRestServer(host, port);
        // startGrpcServer(host, port);
    });
}
```

#### State listener

TODO

#### Uncaught exception handler

TODO

#### Closing options

TODO

---

## Command line interface

Streams applications inherit standard CLI options from `KafkaStreamsApplication`. The following CLI options are streams-app-specific:

| Option                         | Description                                                                                       | Default        |
|--------------------------------|---------------------------------------------------------------------------------------------------|----------------|
| `--application-id`             | Kafka Streams application ID                                                                       | Auto-generated |
| `--volatile-group-instance-id` | Use a volatile group instance ID. This changes the shutdown behavior of the Kafka Streams instance. | false          |

---

## Deployment

TODO

---

## Kafka Streams extensions

Several extensions are provided that simplify working with Kafka Streams.

### Simple topic access

TODO

### Error handling

TODO

### Serde auto configuration

TODO

---

diff --git a/docs/docs/user/deployment/kubernetes.md b/docs/docs/user/deployment/kubernetes.md new file mode 100644 index 000000000..48ab3db96 --- /dev/null +++ b/docs/docs/user/deployment/kubernetes.md @@ -0,0 +1,154 @@

# Deployment to Kubernetes

`streams-bootstrap` provides support for deploying applications to Kubernetes using Helm charts. The charts cover Kafka Streams, producer, consumer, and consumer–producer applications and offer standardized solutions for autoscaling, monitoring, and state persistence.

---

## Core capabilities

- **Autoscaling** – KEDA-based horizontal scaling driven by Kafka consumer lag
- **Monitoring** – JMX metrics export with Prometheus integration
- **Persistence** – Persistent volumes for Kafka Streams state stores

---

## Helm charts

A set of Helm charts is shipped, tailored to different application types:

| Chart name             | Purpose                                        | Kubernetes workload types      |
|------------------------|------------------------------------------------|--------------------------------|
| `streams-app`          | Deploy Kafka Streams applications              | `Deployment`, `StatefulSet`    |
| `producer-app`         | Deploy Kafka Producer applications             | `Deployment`, `Job`, `CronJob` |
| `consumer-app`         | Deploy Kafka Consumer applications             | `Deployment`, `StatefulSet`    |
| `consumerproducer-app` | Deploy batch / consumer–producer applications  | `Deployment`, `StatefulSet`    |
| `*-cleanup-job`        | Clean Kafka resources before deployment        | `Job`                          |

---

## Chart repository and installation

The Helm charts are published as a Helm repository:

```bash
helm repo add streams-bootstrap https://bakdata.github.io/streams-bootstrap/
helm repo update
```

A Streams application can then be installed with:

```bash
helm install my-app streams-bootstrap/streams-app --values my-values.yaml
```

---

## Deployment patterns

### Streams, consumer and consumer–producer applications

Streams, consumer and consumer–producer applications support both stateless and stateful deployment modes:

- **Deployment**
    - Used for stateless applications or when state is small and can be restored easily
    - Enabled when `statefulSet: false`

- **StatefulSet**
    - Used for stateful Kafka Streams applications with local state stores
    - Enabled when `statefulSet: true`
    - Required when `persistence.enabled: true`
    - If persistence is enabled, each pod receives a dedicated `PersistentVolumeClaim` for RocksDB state

---

### Producer applications

Producer applications support multiple execution modes depending on workload characteristics; a values sketch for the CronJob mode follows below:

- **Deployment**
    - Used for continuous producers
    - Enabled when `deployment: true`
    - Supports horizontal scaling via `replicaCount`

- **Job**
    - Used for one-time runs or backfills
    - Default when `deployment: false` and no `schedule` is provided
    - Supports `restartPolicy`, `backoffLimit`, and `ttlSecondsAfterFinished`

- **CronJob**
    - Used for scheduled, periodic execution
    - Enabled when a cron expression is provided via `schedule`
    - Supports everything a Job supports, plus `suspend`, `successfulJobsHistoryLimit`, and `failedJobsHistoryLimit`

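As an illustration, a scheduled producer could combine the keys above like this (the values themselves are placeholders):

```yaml
# Illustrative values for a scheduled producer: providing schedule selects CronJob mode
deployment: false
schedule: "0 * * * *"          # hourly cron expression
backoffLimit: 3                # Job-level retry budget
ttlSecondsAfterFinished: 600   # remove finished Jobs after 10 minutes
suspend: false                 # set to true to pause scheduling
```
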
---

## Configuration structure

TODO

---

## Autoscaling

Autoscaling is implemented using Kubernetes Event-Driven Autoscaling (KEDA). When enabled, KEDA monitors Kafka consumer lag and adjusts the number of replicas accordingly.

Autoscaling is disabled by default.

### Enabling autoscaling

```yaml
autoscaling:
  enabled: true
  lagThreshold: "1000"
  minReplicas: 0
  maxReplicas: 5
```

When enabled, the chart creates a KEDA `ScaledObject` and omits a fixed `replicaCount` from the workload specification.

### Scaling behavior

For details on the scaling behavior, please refer to the [official KEDA documentation for the Kafka scaler](https://keda.sh/docs/scalers/apache-kafka/).

### Integration with persistence

When persistence is enabled for Streams applications, autoscaling targets a `StatefulSet`. Each replica receives its own `PersistentVolumeClaim`.

---

## Monitoring

Monitoring is based on JMX metrics and Prometheus scraping:

- `jmx.enabled: true` exposes the JMX port in Kubernetes, allowing users to debug applications
- `prometheus.jmx.enabled: true` adds a Prometheus JMX exporter sidecar that exposes metrics on a dedicated `/metrics` endpoint

Collected metrics include consumer lag, processing rates, and RocksDB statistics.

---

## Persistence

Persistence is configured via the `persistence.*` section (for Streams, Consumer and Consumer-Producer applications):

```yaml
persistence:
  enabled: true
  size: 1Gi
  storageClassName: standard
```

When enabled together with `statefulSet: true`, each pod receives a dedicated volume for local state storage. This enables:

- Faster restarts due to warm state
- Improved recovery semantics for stateful topologies

If persistence is disabled, applications behave as stateless deployments and rely on Kafka changelogs for state reconstruction.

diff --git a/docs/docs/user/deployment/local.md b/docs/docs/user/deployment/local.md new file mode 100644 index 000000000..b3d5deffc --- /dev/null +++ b/docs/docs/user/deployment/local.md @@ -0,0 +1,42 @@

# Local deployment

Applications can be run locally for development and testing purposes. This can be done programmatically within your code.

## Programmatic Local Execution

Here is an example of how to run a producer application programmatically. This is useful for simple applications or for testing.

```java
try (final KafkaProducerApplication<ProducerApp> app =
        new SimpleKafkaProducerApplication<>(() -> new ProducerApp() {

            @Override
            public ProducerRunnable buildRunnable(final ProducerBuilder builder) {
                return () -> {
                    try (final Producer<String, String> producer = builder.createProducer()) {
                        // Producer logic
                    }
                };
            }

            @Override
            public SerializerConfig defaultSerializationConfig() {
                return new SerializerConfig(StringSerializer.class, StringSerializer.class);
            }
        })) {

    app.setBootstrapServers("localhost:9092");
    app.setOutputTopic("output-topic");
    app.run();
}
```

## Command Line Execution

You can also run the application from the command line by packaging it as a JAR file.

```bash
java -jar my-producer-app.jar run --bootstrap-servers localhost:9092 --output-topic my-topic
```

diff --git a/docs/docs/user/examples/interactive-queries.md b/docs/docs/user/examples/interactive-queries.md new file mode 100644 index 000000000..a10120417 --- /dev/null +++ b/docs/docs/user/examples/interactive-queries.md @@ -0,0 +1 @@

# Interactive queries

diff --git a/docs/docs/user/examples/word-count.md b/docs/docs/user/examples/word-count.md new file mode 100644 index 000000000..77509ef90 --- /dev/null +++ b/docs/docs/user/examples/word-count.md @@ -0,0 +1 @@

# Word count

diff --git a/docs/docs/user/extensions/large-messages.md b/docs/docs/user/extensions/large-messages.md new file mode 100644 index 000000000..29dbf6506 --- /dev/null +++ b/docs/docs/user/extensions/large-messages.md @@ -0,0 +1,58 @@

# Large messages

## Overview

The Large Messages extension adds support for handling messages that exceed Kafka's size limitations by using external storage mechanisms with automatic cleanup.
It integrates with *streams-bootstrap* to transparently manage:

- large message serialization
- large message deserialization
- blob storage file cleanup

For more details, see the large messages module: [streams-bootstrap-large-messages GitHub repository](https://github.com/bakdata/streams-bootstrap/tree/master/streams-bootstrap-large-messages)

There are two supported ways to enable cleanup for large messages:

- Implement `LargeMessageStreamsApp`
- Register a topic cleanup hook manually

---

### Option 1: Implement `LargeMessageStreamsApp`

Use this option for Kafka Streams applications where large message cleanup should always run together with topic cleanup.

```java
public final class MyStreamsApp implements LargeMessageStreamsApp {

    @Override
    public void buildTopology(final StreamsBuilderX builder) {
        // build topology here
    }
}
```

### Option 2: Register a cleanup hook manually

If cleanup should only happen conditionally or requires custom behavior, a topic hook can be registered explicitly.

```java
private final boolean largeMessageCleanupEnabled;

@Override
public StreamsCleanUpConfiguration setupCleanUp(
        final AppConfiguration configuration) {

    final StreamsCleanUpConfiguration cleanUp =
            StreamsApp.super.setupCleanUp(configuration);

    if (this.largeMessageCleanupEnabled) {
        return LargeMessageAppUtils.registerTopicHook(cleanUp, configuration);
    }

    return cleanUp;
}
```

diff --git a/docs/docs/user/getting-started/quick-start.md b/docs/docs/user/getting-started/quick-start.md new file mode 100644 index 000000000..129c3098c --- /dev/null +++ b/docs/docs/user/getting-started/quick-start.md @@ -0,0 +1,84 @@

# Quick Start

This page shows how to add `streams-bootstrap` to a project and how to create and run a minimal application.

## Prerequisites

- Java 17
- Apache Kafka cluster (brokers reachable from the application)
- `streams-bootstrap-cli` dependency (see [Setup](setup.md) for Gradle/Maven snippets)

## Minimal Kafka Streams Application

Create a subclass of `KafkaStreamsApplication` and implement the required methods.

```java
import com.bakdata.kafka.streams.KafkaStreamsApplication;
import com.bakdata.kafka.streams.SerdeConfig;
import com.bakdata.kafka.streams.StreamsApp;
import com.bakdata.kafka.streams.StreamsTopicConfig;
import com.bakdata.kafka.streams.kstream.KStreamX;
import com.bakdata.kafka.streams.kstream.StreamsBuilderX;
import org.apache.kafka.common.serialization.Serdes.StringSerde;

public class MyStreamsApplication extends KafkaStreamsApplication {

    public static void main(final String[] args) {
        new MyStreamsApplication().startApplication(args);
    }

    @Override
    public StreamsApp createApp() {
        return new StreamsApp() {
            @Override
            public void buildTopology(final StreamsBuilderX builder) {
                final KStreamX<String, String> input = builder.streamInput();
                // topology definition
                input.toOutputTopic();
            }

            @Override
            public String getUniqueAppId(final StreamsTopicConfig topics) {
                return "streams-bootstrap-app-" + topics.getOutputTopic();
            }

            @Override
            public SerdeConfig defaultSerializationConfig() {
                return new SerdeConfig(StringSerde.class, StringSerde.class);
            }
        };
    }
}
```

## Running the Application

### Via Command Line Interface

When packaged as a runnable JAR (for example, in a container), the `run` command is the default entrypoint:

```bash
java -jar my-streams-app.jar \
    run \
    --bootstrap-servers kafka:9092 \
    --input-topics input-topic \
    --output-topic output-topic \
    --schema-registry-url http://schema-registry:8081
```

Additional subcommands such as `clean` and `reset` are available for lifecycle management.

### From the `main` Method

In the `main` method, the application subclass starts up via:

```java
public static void main(final String[] args) {
    new MyStreamsApplication().startApplication(args);
}
```

This delegates configuration loading, lifecycle handling, and shutdown to `streams-bootstrap`.

---

diff --git a/docs/docs/user/getting-started/setup.md b/docs/docs/user/getting-started/setup.md new file mode 100644 index 000000000..1bea5752d --- /dev/null +++ b/docs/docs/user/getting-started/setup.md @@ -0,0 +1,32 @@

# Setup

This page describes how to add the `streams-bootstrap` dependencies to your project.

## Dependencies

### Gradle

```gradle
implementation group: 'com.bakdata.kafka', name: 'streams-bootstrap-cli', version: '7.0.0'
```

With Kotlin DSL:

```gradle
implementation(group = "com.bakdata.kafka", name = "streams-bootstrap-cli", version = "7.0.0")
```

### Maven

```xml
<dependency>
    <groupId>com.bakdata.kafka</groupId>
    <artifactId>streams-bootstrap-cli</artifactId>
    <version>7.0.0</version>
</dependency>
```

For other build tools or versions, refer to the [latest version in MvnRepository](https://mvnrepository.com/artifact/com.bakdata.kafka/streams-bootstrap/latest).

diff --git a/docs/docs/user/monitoring.md b/docs/docs/user/monitoring.md new file mode 100644 index 000000000..94e83e4a3 --- /dev/null +++ b/docs/docs/user/monitoring.md @@ -0,0 +1,337 @@

# Monitoring

Monitoring features are provided for your applications.

- **JMX Metrics Export**: Applications built with `streams-bootstrap` can expose JMX (Java Management Extensions) metrics, which provide insights into the performance and health of the Java application and the Kafka clients.
- **Prometheus Integration**: The Helm charts are configured to work with Prometheus, a popular open-source monitoring and alerting toolkit.
  This allows you to scrape the JMX metrics and visualize them in dashboards (e.g., using Grafana).

## Monitoring and Observability

The Helm charts provide integrated monitoring and observability for Kafka applications using a combination of JMX, Prometheus, Kubernetes probes, and Services. Monitoring can be tailored from lightweight setups for development to full production stacks with Prometheus Operator.

### Monitoring Mechanisms

| Mechanism               | Use Case                             | Key Values               |
|-------------------------|--------------------------------------|--------------------------|
| JMX remote access       | Direct debugging and inspection      | `jmx.enabled`            |
| Prometheus JMX exporter | Production metrics collection        | `prometheus.jmx.enabled` |
| Liveness probes         | Container health checks              | `livenessProbe`          |
| Readiness probes        | Traffic readiness / rollout control  | `readinessProbe`         |

### JMX Configuration

JMX (Java Management Extensions) provides direct access to application metrics and management operations, typically used for development and debugging.

Enable JMX in `values.yaml`:

```yaml
jmx:
  enabled: true
  port: 5555
  host: localhost
```

Parameters:

- `jmx.enabled`: Enable JMX port for remote access (default: `false`).
- `jmx.port`: JMX port number (default: `5555`).
- `jmx.host`: Host binding for the RMI server (default: `localhost`).

When enabled, the chart configures the JVM with flags similar to:

```text
-Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.port=5555
-Dcom.sun.management.jmxremote.local.only=false
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
-Djava.rmi.server.hostname=localhost
```

Accessing JMX metrics from a local client:

```bash
kubectl port-forward <pod-name> 5555:5555
jconsole localhost:5555
```

### Prometheus JMX Exporter

For production monitoring, the Prometheus JMX Exporter runs as a sidecar container that scrapes JMX metrics from the application and exposes them in Prometheus format.

Enable the exporter in `values.yaml`:

```yaml
prometheus:
  jmx:
    enabled: true
    image: bitnami/jmx-exporter
    imageTag: 1.1.0
    imagePullPolicy: Always
    port: 5556
    metricRules:
      - pattern: ".*"
    resources:
      requests:
        cpu: 10m
        memory: 100Mi
      limits:
        cpu: 100m
        memory: 100Mi
```

Key parameters:

- `prometheus.jmx.enabled`: Deploy JMX exporter sidecar (default: `false`).
- `prometheus.jmx.image`: Container image for the exporter (default: `bitnami/jmx-exporter`).
- `prometheus.jmx.imageTag`: Exporter image tag (default: `1.1.0`).
- `prometheus.jmx.port`: HTTP port for metrics endpoint (default: `5556`).
- `prometheus.jmx.metricRules`: JMX metric selection and mapping rules.
- `prometheus.jmx.resources`: Resource requests/limits for the exporter container.

#### Metric Rules

The `metricRules` section configures which JMX beans are exposed and how they are mapped to Prometheus metrics. The default configuration uses `pattern: ".*"` to export all metrics, but production setups should restrict this to relevant Kafka Streams/producer/consumer metrics.

Example rule set:

```yaml
prometheus:
  jmx:
    metricRules:
      - pattern: "kafka.streams<type=(.+), client-id=(.+), (.+)=(.+)><>(.+):"
        name: kafka_streams_$1_$5
        labels:
          client_id: "$2"
          $3: "$4"
      - pattern: "kafka.producer<type=(.+), client-id=(.+)><>(.+):"
        name: kafka_producer_$1_$3
        labels:
          client_id: "$2"
```

A ConfigMap containing the JMX exporter configuration is generated automatically by the chart and mounted into the sidecar container.

### Prometheus Integration

#### Pod annotations

For Prometheus instances that use pod annotations for discovery:

```yaml
podAnnotations:
  prometheus.io/scrape: "true"
  prometheus.io/path: "/metrics"
  prometheus.io/port: "5556"
```

This enables scraping the JMX exporter endpoint exposed on `prometheus.jmx.port`.

#### PodMonitor

For Prometheus Operator setups, a `PodMonitor` custom resource can be deployed.

The `streams-bootstrap` repository provides a reference `PodMonitor` configuration: [monitoring/pod_monitor.yaml](https://github.com/bakdata/streams-bootstrap/blob/1ff01c2f/monitoring/pod_monitor.yaml)

### Health Checks

Kubernetes uses liveness and readiness probes to determine when a pod is healthy and when it is ready to receive traffic.

**Liveness probes** restart containers that become unhealthy:

```yaml
livenessProbe:
  httpGet:
    path: /health
    port: 8080
  initialDelaySeconds: 30
  periodSeconds: 10
  timeoutSeconds: 5
  failureThreshold: 3
```

**Readiness probes** gate traffic until the application is ready:

```yaml
readinessProbe:
  httpGet:
    path: /ready
    port: 8080
  initialDelaySeconds: 10
  periodSeconds: 5
  timeoutSeconds: 3
  successThreshold: 1
  failureThreshold: 3
```

All standard Kubernetes probe types are supported (`httpGet`, `tcpSocket`, `exec`, and `grpc` on recent Kubernetes versions). Probes are configured under the corresponding `livenessProbe` and `readinessProbe` sections in values.

### Service and Port Configuration

Ports and Services control how HTTP APIs and metrics endpoints are exposed:

```yaml
service:
  enabled: true
  type: ClusterIP

ports:
  - containerPort: 8080
    name: http
    protocol: TCP
    servicePort: 80
  - containerPort: 5556
    name: metrics
    protocol: TCP
    servicePort: 5556
```

Port mapping reference:

- `jmx.port` → JMX remote port (default `5555`).
- `prometheus.jmx.port` → JMX exporter metrics port (default `5556`).
- Additional entries in `ports[]` → application-specific ports (e.g. HTTP APIs, custom metrics endpoints).

### Monitoring Configuration Examples

**Full monitoring stack** (JMX exporter, probes, Service, annotations):

```yaml
prometheus:
  jmx:
    enabled: true
    port: 5556
    metricRules:
      - pattern: "kafka.streams<client-id=(.+)><>(.+):"
        name: kafka_streams_$2
        labels:
          client_id: "$1"

readinessProbe:
  httpGet:
    path: /ready
    port: 8080
  initialDelaySeconds: 30
  periodSeconds: 10

livenessProbe:
  httpGet:
    path: /health
    port: 8080
  initialDelaySeconds: 60
  periodSeconds: 30

service:
  enabled: true
  type: ClusterIP

ports:
  - containerPort: 8080
    name: http
    servicePort: 80
  - containerPort: 5556
    name: metrics
    servicePort: 5556

podAnnotations:
  prometheus.io/scrape: "true"
  prometheus.io/path: "/metrics"
  prometheus.io/port: "5556"
```

**Development/debug configuration with JMX only**:

```yaml
jmx:
  enabled: true
  port: 5555
  host: localhost

prometheus:
  jmx:
    enabled: false

livenessProbe:
  tcpSocket:
    port: 5555
  initialDelaySeconds: 30
  periodSeconds: 30
```

**Minimal production configuration** with annotations and resource limits:

```yaml
prometheus:
  jmx:
    enabled: true
    resources:
      requests:
        cpu: 10m
        memory: 100Mi
      limits:
        cpu: 100m
        memory: 100Mi

podAnnotations:
  prometheus.io/scrape: "true"
  prometheus.io/port: "5556"
```

### Application-Specific Considerations

Kafka Streams applications expose metrics under several JMX domains, including `kafka.streams`, `kafka.producer`, and `kafka.consumer`. Commonly monitored metrics include:

- `kafka.streams.state`: Overall application state (running, rebalancing, error).
- `kafka.streams.commit-latency-avg`: Average commit latency.
- `kafka.consumer.records-lag-max`: Maximum records lag per partition.
- `kafka.producer.record-send-rate`: Producer throughput.

Producer and consumer applications (via the `producer-app` and `consumer-app` charts) use the same `prometheus.jmx` structure but may differ in availability patterns (for example, Jobs vs Deployments).

### Troubleshooting

**Common issues**:

- No metrics endpoint:
    - Ensure `prometheus.jmx.enabled: true`.
- Connection refused on JMX port:
    - Ensure `jmx.enabled: true` and the port is exposed.
- Empty metrics response:
    - Review `metricRules` patterns; overly restrictive rules may filter out all metrics.
- High exporter CPU usage:
    - Avoid `pattern: ".*"` in production; use targeted patterns instead.
- Pod not ready:
    - Validate liveness/readiness probe configuration and the corresponding application endpoints.

**Verifying metrics export**:

```bash
kubectl port-forward <pod-name> 5556:5556
curl http://localhost:5556/metrics
```

**Debugging JMX connection**:

```bash
kubectl port-forward <pod-name> 5555:5555
jconsole localhost:5555
```

If the connection fails, verify that JMX is enabled, the port is mapped in `ports`, and the JVM has been started with the correct JMX system properties.

diff --git a/docs/docs/user/testing.md b/docs/docs/user/testing.md new file mode 100644 index 000000000..2608cc125 --- /dev/null +++ b/docs/docs/user/testing.md @@ -0,0 +1,327 @@

# Testing

The `streams-bootstrap` testing tools provide utilities for testing Kafka Streams, Consumer, Producer and Consumer-Producer applications, covering both unit-level and integration-style scenarios.

They abstract common test concerns such as Kafka infrastructure setup, Schema Registry integration, application lifecycle handling, and consumer group verification, and are designed to work with real Kafka clusters as well as schema-aware test environments.

## TestApplicationRunner

`TestApplicationRunner` is a test utility for running, configuring, and verifying Kafka applications in integration and system tests.

It abstracts away repetitive setup such as:

- bootstrap servers
- Schema Registry
- Kafka client configuration
- CLI argument wiring
- lifecycle commands (`run`, `clean`, `reset`)

Typical use cases:

- end-to-end tests
- containerized test environments
- embedded Kafka setups
- CI pipelines

### Typical Usage

```java
TestApplicationRunner runner =
        TestApplicationRunner.create("localhost:9092")
                .withSchemaRegistry()
                .withStateDir(tempDir)
                .withNoStateStoreCaching();
```

All applications executed via this runner automatically inherit this configuration:

**Bootstrap Servers**

- Passed via `--bootstrap-servers`
- Also set directly on the application instance

**Kafka Configuration**

- All provided Kafka properties are injected
- Passed via `--kafka-config key=value`
- Also merged into `app.setKafkaConfig(...)`

**Schema Registry (optional)**

- Passed via `--schema-registry-url`
- Only configured when explicitly enabled

---

### Configuring Kafka for Tests

```java
runner = runner.withKafkaConfig(Map.of("auto.offset.reset", "earliest"));
```

Behavior:

- merged with existing configuration
- immutable after creation
- overrides application defaults

---

#### Kafka Streams–Specific Helpers

##### Configure State Directory

```java
runner = runner.withStateDir(tempDir);
```

Sets:

```
state.dir = <provided directory>
```

Use this to:

- isolate test runs
- avoid state leakage between tests

---

##### Disable State Store Caching

```java
runner = runner.withNoStateStoreCaching();
```

Sets:

```
statestore.cache.max.bytes = 0
```

Useful when:

- asserting exact record counts
- debugging processor behavior
- avoiding cache-related timing issues

---

#### Consumer-Specific Helpers

##### Configure Session Timeout

```java
runner = runner.withSessionTimeout(Duration.ofSeconds(5));
```

Sets:

```
session.timeout.ms = 5000
```

Useful for:

- fast consumer group rebalancing
- deterministic failure testing

---

### Schema Registry Support

#### Enable a Test Schema Registry

```java
runner = runner.withSchemaRegistry();
```

Creates:

- an isolated in-memory Schema Registry
- a random scope to avoid collisions
- transparent integration for the application

---

#### Use a Custom TestSchemaRegistry

```java
TestSchemaRegistry registry = new TestSchemaRegistry();
runner = runner.withSchemaRegistry(registry);
```

Use this when:

- sharing schemas across applications
- inspecting registered schemas during tests

---

### Running Applications

#### CLI

```java
CompletableFuture<Integer> exitCode =
        runner.run(app, "--some-flag");
```

- invokes `startApplicationWithoutExit`
- returns the application exit code

---

#### Runnable

```java
CompletableFuture<Void> execution = runner.run(app);
```

- calls `onApplicationStart()`
- runs the application directly
- suitable for long-running tests

---

### Cleaning and Resetting Applications

#### Clean

```java
runner.clean(app);
```

or

```java
runner.clean(app, "--custom-arg");
```

Used to:

- delete Kafka topics
- clean local state
- execute cleanup hooks

---

#### Reset

Supported for:

- Streams applications
- Consumer applications
- Consumer–Producer applications

```java
runner.reset(streamsApp);
```

---

### Consumer Group Verification

```java
ConsumerGroupVerifier verifier = runner.verify(streamsApp);
```

Allows you to:

- assert consumer group existence
- check stability
- inspect committed offsets

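Putting the pieces together, an integration test might look like the following sketch. Only `create()`, `withSchemaRegistry()`, `withNoStateStoreCaching()`, `run()`, `verify()`, and `clean()` are taken from this page; the produce/assert steps are placeholders.

```java
// Integration test sketch; test input and assertions are placeholders
final TestApplicationRunner runner = TestApplicationRunner.create("localhost:9092")
        .withSchemaRegistry()
        .withNoStateStoreCaching();
try {
    runner.run(app);                                           // start the application
    final ConsumerGroupVerifier verifier = runner.verify(app);
    // produce test input via runner.newTestClient(), await processing,
    // then assert on output records and committed offsets
} finally {
    runner.clean(app);                                         // remove topics and state
}
```
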
---

### Creating Test Clients

```java
KafkaTestClient client = runner.newTestClient();
```

Provides:

- AdminClient access
- Producer/Consumer helpers
- runtime-aware configuration

---

## TestApplicationTopologyFactory

`TestApplicationTopologyFactory` is a test helper for Kafka Streams applications that integrates with Fluent Kafka Streams Tests.

It allows you to:

- derive a `TestTopology` from a real application
- reuse production topology and configuration
- inject test-specific runtime settings

---

### Typical Usage

```java
TestApplicationTopologyFactory factory =
        TestApplicationTopologyFactory.withSchemaRegistry();
```

or without Schema Registry:

```java
TestApplicationTopologyFactory factory = new TestApplicationTopologyFactory();
```

---

### Schema Registry Support

#### Automatic Schema Registry

```java
TestApplicationTopologyFactory factory =
        TestApplicationTopologyFactory.withSchemaRegistry();
```

- random isolated scope
- no cross-test collisions
- safe for parallel execution

---

#### Custom Schema Registry

```java
TestSchemaRegistry registry = new TestSchemaRegistry();
TestApplicationTopologyFactory factory =
        TestApplicationTopologyFactory.withSchemaRegistry(registry);
```

---

### Modifying Kafka Configuration

```java
factory = factory.with(Map.of("commit.interval.ms", 100));
```

- merged into runtime configuration
- applies only to tests
- does not mutate the application

---

### Creating a TestTopology

```java
TestTopology<String, String> topology = factory.createTopology(app);
```

Execution flow:

1. application prepared
2. runtime configuration injected
3. topology extracted
4. `TestTopology` created

---

### JUnit 5 Integration

```java
TestTopologyExtension<String, String> extension = factory.createTopologyExtension(app);
```

---

### Accessing Kafka Properties

```java
Map<String, Object> props = factory.getKafkaProperties(app);
```

---

diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 1b2fed48d..1f1e55a32 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -21,5 +21,26 @@ plugins:
 markdown_extensions:
   - attr_list
 nav:
-  - Index: index.md
+  - User guide:
+      - What is streams-bootstrap: index.md
+      - Changelog: user/changelog.md
+      - Getting Started:
+          - Setup: user/getting-started/setup.md
+          - Quick start: user/getting-started/quick-start.md
+      - Concepts:
+          - Common concepts: user/concepts/common.md
+          - Streams concepts: user/concepts/streams.md
+          - Producer concepts: user/concepts/producer.md
+          - Consumer concepts: user/concepts/consumer.md
+          - Consumer-Producer concepts: user/concepts/consumer-producer.md
+      - Testing: user/testing.md
+      - Monitoring: user/monitoring.md
+      - Extensions:
+          - Large messages: user/extensions/large-messages.md
+      - Deployment:
+          - Local deployment: user/deployment/local.md
+          - Kubernetes: user/deployment/kubernetes.md
+      - Examples:
+          - Word count: user/examples/word-count.md
+          - Interactive queries: user/examples/interactive-queries.md
 - Javadoc: javadoc/index.html