Skip to content

Commit fe93dd9

Browse files
authored
Merge pull request #22 from nkcoder/feature/kafka
[Feature] Publish events to Kafka
2 parents 3403ecf + 8763fe6 commit fe93dd9

File tree

20 files changed

+347
-101
lines changed

20 files changed

+347
-101
lines changed

CLAUDE.md

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ role-based access control (ADMIN/MEMBER), and PostgreSQL persistence with Flyway
2020
- **Java 25** with virtual threads (Project Loom)
2121
- **Spring Boot 4.0** with Spring Framework 7.0
2222
- **Spring Modulith 2.0** for modular architecture
23+
- **Apache Kafka** for event externalization (Spring Modulith integration)
2324
- **PostgreSQL** with Flyway migrations
2425
- **gRPC** alongside REST APIs
2526

@@ -187,7 +188,8 @@ notification ──→ shared ←── user
187188
**Notification Module** (`org.nkcoder.notification`):
188189

189190
- `NotificationService` - Public API for sending notifications
190-
- `application/UserEventListener` - Listens to UserRegisteredEvent
191+
- `application/ApplicationEventListener` - In-process listener for domain events (sends emails)
192+
- `application/KafkaEventListener` - Kafka consumer for externalized events
191193

192194
**Shared Module** (`org.nkcoder.shared`):
193195

@@ -329,34 +331,54 @@ PATCH /api/users/{userId}/password - Reset password (admin only)
329331

330332
### Event-Driven Communication
331333

332-
Modules communicate via domain events using Spring Modulith's event infrastructure:
334+
Modules communicate via domain events using Spring Modulith's event infrastructure with **Kafka externalization**.
335+
336+
**Event Externalization**: Domain events marked with `@Externalized` are automatically published to Kafka topics:
337+
338+
| Event | Kafka Topic | Description |
339+
|-------|-------------|-------------|
340+
| `UserRegisteredEvent` | `user.registered` | Published when user completes registration |
341+
| `OtpRequestedEvent` | `user.otp.requested` | Published when user requests OTP |
333342

334343
**Publishing Events** (in User module):
335344

336345
```java
337346
// In AuthApplicationService after registration
338-
domainEventPublisher.publish(new UserRegisteredEvent(user.getId(),user.
347+
domainEventPublisher.publish(new UserRegisteredEvent(user.getId(), user.getEmail(), user.getName()));
348+
```
339349

340-
getEmail(),user.
350+
**Event Definition with Kafka Externalization**:
341351

342-
getName()));
352+
```java
353+
@Externalized("user.registered") // Kafka topic name
354+
public record UserRegisteredEvent(UUID userId, String email, String userName, LocalDateTime occurredOn)
355+
implements DomainEvent {}
343356
```
344357

345358
**Listening to Events** (in Notification module):
346359

347360
```java
348-
361+
// In-process listener (immediate, same JVM)
349362
@Component
350-
public class UserEventListener {
363+
public class ApplicationEventListener {
351364
@ApplicationModuleListener
352365
public void onUserRegistered(UserRegisteredEvent event) {
353366
notificationService.sendWelcomeEmail(event.email(), event.userName());
354367
}
355368
}
369+
370+
// Kafka consumer (for external/distributed processing)
371+
@Component
372+
public class KafkaEventListener {
373+
@KafkaListener(topics = "user.registered", groupId = "notification-service")
374+
public void onUserRegistered(String message) {
375+
// Decode Base64 and deserialize JSON
376+
}
377+
}
356378
```
357379

358380
**Event Publication Table**: Spring Modulith persists events to `event_publication` table for reliable delivery (
359-
transactional outbox pattern).
381+
transactional outbox pattern). Events are stored before being sent to Kafka, ensuring at-least-once delivery.
360382

361383
### Configuration Management
362384

@@ -378,6 +400,7 @@ JWT_REFRESH_SECRET=<min 64 bytes for HS512>
378400
JWT_ACCESS_EXPIRES_IN=15m
379401
JWT_REFRESH_EXPIRES_IN=7d
380402
CLIENT_URL=http://localhost:3000
403+
SPRING_KAFKA_BOOTSTRAP_SERVERS=kafka:9092
381404
```
382405

383406
**Configuration Binding**:
@@ -564,9 +587,10 @@ class ModulithArchitectureTest {
564587

565588
1. Create event record in `shared/kernel/domain/event/` (if cross-module) or `{module}/domain/event/` (if
566589
module-internal)
567-
2. Inject `DomainEventPublisher` in your service
568-
3. Call `domainEventPublisher.publish(event)` after business logic
569-
4. Create `@ApplicationModuleListener` in consuming module
590+
2. Add `@Externalized("topic-name")` annotation to publish to Kafka
591+
3. Inject `DomainEventPublisher` in your service
592+
4. Call `domainEventPublisher.publish(event)` after business logic
593+
5. Create `@ApplicationModuleListener` in consuming module (in-process) and/or `@KafkaListener` (Kafka consumer)
570594

571595
**Database Schema Change**:
572596

@@ -604,11 +628,20 @@ class ModulithArchitectureTest {
604628
- Cross-module events go in `shared.kernel.domain.event/`
605629
- Use `@ApplicationModuleListener` for reliable event handling (auto-retry, persistence)
606630

631+
**Kafka Integration**:
632+
633+
- Events with `@Externalized` annotation are automatically published to Kafka topics
634+
- Consumer group: `notification-service`
635+
- Messages are Base64-encoded JSON
636+
- Kafka ports: `9092` (internal Docker), `29092` (external/host)
637+
- Topics are auto-created on first publish
638+
607639
**Future Microservice Extraction**:
608640
When ready to extract a module as a microservice:
609641

610-
1. Events become messages (Kafka/RabbitMQ)
642+
1. Events are already externalized to Kafka - no change needed
611643
2. REST/gRPC calls replace direct method calls
612644
3. Module's `infrastructure/` adapters change, domain stays the same
613645
4. Database can be separated per module
646+
5. Kafka consumers in extracted service continue to receive events
614647

Dockerfile

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
# Multi-stage Dockerfile for Spring Boot Application
33
# =============================================================================
44
# Build: docker build -t user-service .
5-
# Run: docker run -p 3001:3001 -p 9090:9090 user-service
5+
# Run: docker run -p 8080:8080 -p 9090:9090 user-service
66
# =============================================================================
77

88
# -----------------------------------------------------------------------------
@@ -14,7 +14,7 @@ WORKDIR /app
1414

1515
# Copy Gradle wrapper and build files first (for layer caching)
1616
COPY gradle/ gradle/
17-
COPY gradlew build.gradle.kts ./
17+
COPY gradlew build.gradle.kts settings.gradle.kts gradle.properties ./
1818

1919
# Make gradlew executable
2020
RUN chmod +x ./gradlew
@@ -49,11 +49,11 @@ RUN chown -R appuser:appgroup /app
4949
USER appuser
5050

5151
# Expose REST and gRPC ports
52-
EXPOSE 3001 9090
52+
EXPOSE 8080 9090
5353

5454
# Health check
5555
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
56-
CMD wget --no-verbose --tries=1 --spider http://localhost:3001/actuator/health || exit 1
56+
CMD wget --no-verbose --tries=1 --spider http://localhost:8080/actuator/health || exit 1
5757

5858
# JVM optimizations for containers
5959
ENV JAVA_OPTS="-XX:+UseContainerSupport \

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ A comprehensive user authentication and management service featuring OAuth2, OTP
1313
| **Passwordless** | One-Time Password (OTP) login flow via email |
1414
| **Governance** | Role-based access control (MEMBER, ADMIN), profile management |
1515
| **Architecture** | **Modular Monolith** (Spring Modulith), DDD, Event-driven communication |
16+
| **Messaging** | **Apache Kafka** for event externalization (Spring Modulith integration)|
1617
| **Performance** | **Java 25 Virtual Threads**, gRPC for high-speed communication |
1718

1819
## Documentation Hub
@@ -28,7 +29,7 @@ A comprehensive user authentication and management service featuring OAuth2, OTP
2829

2930
### Prerequisites
3031
- **Java 25** & Gradle 8+
31-
- PostgreSQL 17 or Docker
32+
- PostgreSQL 17, Apache Kafka, or Docker
3233

3334
### Running Locally
3435
1. Copy `.env.example` to `.env` and configure secrets (JWT, Mail, OAuth2).

auto/docker_logs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
#!/usr/bin/env sh
22

3-
docker compose -f docker-compose-all.yml logs -f --tail 100
3+
docker compose -f docker-compose-all.yml logs -f --tail 50

build.gradle.kts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ dependencies {
4242
implementation("org.springdoc:springdoc-openapi-starter-webmvc-ui:3.0.0")
4343
implementation("org.springframework.modulith:spring-modulith-starter-core")
4444
implementation("org.springframework.modulith:spring-modulith-starter-jpa")
45+
implementation("org.springframework.modulith:spring-modulith-events-kafka")
4546

4647
// Database
4748
implementation("org.springframework.boot:spring-boot-starter-flyway")
@@ -58,6 +59,9 @@ dependencies {
5859
annotationProcessor("org.springframework.boot:spring-boot-configuration-processor")
5960
developmentOnly("org.springframework.boot:spring-boot-docker-compose")
6061

62+
// Messaging
63+
implementation("org.springframework.kafka:spring-kafka")
64+
6165
// Testing
6266
testImplementation("org.springframework.boot:spring-boot-starter-webmvc-test")
6367
testImplementation("org.springframework.boot:spring-boot-starter-webflux-test") // For WebTestClient
@@ -67,7 +71,9 @@ dependencies {
6771
testImplementation("org.junit.jupiter:junit-jupiter:5.13.3")
6872
testImplementation("org.testcontainers:junit-jupiter")
6973
testImplementation("org.testcontainers:postgresql")
74+
testImplementation("org.testcontainers:kafka")
7075
testImplementation("org.springframework.modulith:spring-modulith-starter-test")
76+
testImplementation("org.springframework.kafka:spring-kafka-test")
7177

7278
// gRPC and Protobuf
7379
implementation("io.grpc:grpc-netty-shaded:1.77.0")
@@ -124,7 +130,7 @@ tasks.register("runLocal") {
124130
// JVM optimization for microservices
125131
tasks.named<org.springframework.boot.gradle.tasks.run.BootRun>("bootRun") {
126132
jvmArgs = listOf(
127-
"-Xms256m", "-Xmx512m", "-XX:+UseG1GC", "-XX:+UseStringDeduplication"
133+
"-Xms512m", "-Xmx1024m", "-XX:+UseG1GC", "-XX:+UseStringDeduplication"
128134
)
129135
// Pass environment variables at execution time (not configuration time)
130136
// This ensures .env variables sourced by auto/run are available

docker-compose-all.yml

Lines changed: 68 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@
33
# =============================================================================
44
# Simulates dev/prod environment with all services running in containers.
55
#
6-
# Usage:
7-
# Start: docker compose -f docker-compose-all.yml up -d
8-
# Stop: docker compose -f docker-compose-all.yml down
9-
# Logs: docker compose -f docker-compose-all.yml logs -f user-service
10-
# Rebuild: docker compose -f docker-compose-all.yml up -d --build
11-
#
126
# For production, use external secrets management (Vault, AWS Secrets Manager)
137
# instead of environment variables in this file.
8+
#
9+
# For container communications: App, kafka and PostgreSQL are all running inside Docker.
10+
# Kafka has two listeners: 9092 (internal) and 29092 (external)
11+
# Container app connects via `kafka:9092` (Docker network)
12+
# Host debugging via `localhost:29092`
13+
# KAFKA_LISTENERS: PLAINTEXT://:9092,EXTERNAL://:29092
14+
# KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:29092
1415
# =============================================================================
1516

1617
services:
@@ -23,11 +24,13 @@ services:
2324
dockerfile: Dockerfile
2425
container_name: user-application
2526
ports:
26-
- "3001:3001" # REST API
27+
- "8080:8080" # REST API
2728
- "9090:9090" # gRPC API
2829
depends_on:
2930
postgres:
3031
condition: service_healthy
32+
kafka:
33+
condition: service_healthy
3134
environment:
3235
# Profile: use 'dev' for development simulation, 'prod' for production
3336
- SPRING_PROFILES_ACTIVE=dev
@@ -37,6 +40,9 @@ services:
3740
- DATABASE_USERNAME=app_user
3841
- DATABASE_PASSWORD=${DB_PASSWORD:-changeme_in_production}
3942

43+
# Kafka connection
44+
- SPRING_KAFKA_BOOTSTRAP_SERVERS=kafka:9092
45+
4046
# JWT secrets - MUST be overridden in production!
4147
# Generate with: openssl rand -base64 64
4248
- JWT_ACCESS_SECRET=${JWT_ACCESS_SECRET:-dev-only-access-secret-key-must-be-at-least-64-bytes-long-for-hs512}
@@ -47,8 +53,18 @@ services:
4753

4854
# JVM options for container environment
4955
- JAVA_OPTS=-XX:+UseContainerSupport -XX:MaxRAMPercentage=75.0
56+
57+
# Mail
58+
- MAIL_USERNAME=${MAIL_USERNAME}
59+
- MAIL_PASSWORD=${MAIL_PASSWORD}
60+
61+
# OAuth 2
62+
- GOOGLE_CLIENT_ID=${GOOGLE_CLIENT_ID}
63+
- GOOGLE_CLIENT_SECRET=${GOOGLE_CLIENT_SECRET}
64+
- GITHUB_CLIENT_ID=${GITHUB_CLIENT_ID}
65+
- GITHUB_CLIENT_SECRET=${GITHUB_CLIENT_SECRET}
5066
healthcheck:
51-
test: [ "CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://localhost:3001/actuator/health" ]
67+
test: [ "CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://localhost:8080/actuator/health" ]
5268
interval: 30s
5369
timeout: 10s
5470
retries: 3
@@ -65,6 +81,48 @@ services:
6581
networks:
6682
- app-network
6783

84+
# ---------------------------------------------------------------------------
85+
# Apache Kafka
86+
# ---------------------------------------------------------------------------
87+
kafka:
88+
image: apache/kafka:4.1.1
89+
container_name: user-application-kafka
90+
hostname: kafka
91+
ports:
92+
- "29092:29092" # External port for debugging (remove in production)
93+
environment:
94+
KAFKA_NODE_ID: 1
95+
KAFKA_PROCESS_ROLES: broker,controller
96+
KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:29092
97+
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:29092
98+
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
99+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
100+
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
101+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
102+
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
103+
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
104+
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
105+
KAFKA_LOG_DIRS: /var/lib/kafka/data
106+
volumes:
107+
- kafka_data:/var/lib/kafka/data
108+
healthcheck:
109+
test: ["CMD-SHELL", "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092 > /dev/null 2>&1"]
110+
interval: 10s
111+
timeout: 10s
112+
retries: 5
113+
start_period: 30s
114+
restart: unless-stopped
115+
deploy:
116+
resources:
117+
limits:
118+
cpus: '1'
119+
memory: 1G
120+
reservations:
121+
cpus: '0.25'
122+
memory: 512M
123+
networks:
124+
- app-network
125+
68126
# ---------------------------------------------------------------------------
69127
# PostgreSQL Database
70128
# ---------------------------------------------------------------------------
@@ -102,6 +160,8 @@ services:
102160
volumes:
103161
postgres_data:
104162
name: user-service-postgres-data
163+
kafka_data:
164+
name: user-service-kafka-data
105165

106166
networks:
107167
app-network:

docker-compose.yml

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,12 @@
55
# spring.docker.compose.enabled=true (in application-local.yml)
66
#
77
# This file is auto-detected and managed by Spring Boot.
8-
# The application runs on your host machine, only PostgreSQL runs in Docker.
8+
# The application runs on your host machine, only PostgreSQL and Kafka runs in Docker.
99
#
10-
# Usage:
11-
# ./gradlew bootRun --args='--spring.profiles.active=local'
12-
# (Spring Boot automatically starts/stops this compose file)
13-
#
14-
# Manual control:
15-
# Start: docker compose up -d
16-
# Stop: docker compose down
17-
# Reset: docker compose down -v (removes data volume)
10+
# For Kafka:
11+
# Kafka has one listener on port 29092, host app connects via `localhost:29092`
12+
# AFKA_LISTENERS: PLAINTEXT://:29092
13+
# KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
1814
# =============================================================================
1915

2016
services:
@@ -35,6 +31,36 @@ services:
3531
timeout: 5s
3632
retries: 5
3733

34+
kafka:
35+
image: apache/kafka:4.1.1
36+
container_name: user-service-kafka
37+
hostname: kafka
38+
ports:
39+
- "29092:29092"
40+
environment:
41+
KAFKA_NODE_ID: 1
42+
KAFKA_PROCESS_ROLES: broker,controller
43+
KAFKA_LISTENERS: PLAINTEXT://:29092,CONTROLLER://:9093
44+
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
45+
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
46+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
47+
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
48+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
49+
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
50+
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
51+
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
52+
KAFKA_LOG_DIRS: /var/lib/kafka/data
53+
volumes:
54+
- kafka_data:/var/lib/kafka/data
55+
healthcheck:
56+
test: ["CMD-SHELL", "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:29092 > /dev/null 2>&1"]
57+
interval: 10s
58+
timeout: 10s
59+
retries: 5
60+
start_period: 30s
61+
3862
volumes:
3963
postgres_data:
4064
name: user-service-local-postgres-data
65+
kafka_data:
66+
name: user-service-local-kafka-data

0 commit comments

Comments
 (0)