Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*.sh text eol=lf
*.conf text eol=lf
gradlew text eol=lf
*.api text eol=lf

# These files are text and should be normalized (Convert crlf <=> lf)
*.kt text
Expand Down
3 changes: 3 additions & 0 deletions .idea/compiler.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ springBoot = "3.5.10"
kotlin = "1.9.25" # Keep in sync with Spring - https://docs.spring.io/spring-boot/docs/current/reference/html/dependency-versions.html
kotlinCollectionsImmutable = "0.3.8"
jsonPatch = "1.13"
jmh = "1.37"
jmh-plugin = "0.7.3"
kotlinpoet = "2.2.0"
ktfmt = "0.61"
fory = "0.15.0"

# Caplin Dependencies
datasource = "8.0.10-1695-5ddb3798"
Expand All @@ -28,6 +31,10 @@ vanniktech-maven-publish-plugin = "0.31.0"
kotlin-collections-immutable = { module = "org.jetbrains.kotlinx:kotlinx-collections-immutable", version.ref = "kotlinCollectionsImmutable" }
json-patch = { module = "com.github.java-json-tools:json-patch", version.ref = "jsonPatch" }
kotlinpoet = { module = "com.squareup:kotlinpoet", version.ref = "kotlinpoet" }
fory-core = { module = "org.apache.fory:fory-core", version.ref = "fory" }
fory-kotlin = { module = "org.apache.fory:fory-kotlin", version.ref = "fory" }
jmh-core = { module = "org.openjdk.jmh:jmh-core", version.ref = "jmh" }
jmh-generator = { module = "org.openjdk.jmh:jmh-generator-annprocess", version.ref = "jmh" }

# Caplin Dependencies
datasource = { module = "com.caplin.platform.integration.java:datasource", version.ref = "datasource" }
Expand All @@ -54,4 +61,5 @@ spring-boot-dependencies = { module = "org.springframework.boot:spring-boot-depe
spring-boot-configuration-processor = { module = "org.springframework.boot:spring-boot-configuration-processor", version.ref = "springBoot" }

[plugins]
spring-boot = { id = "org.springframework.boot", version.ref = "springBoot" }
spring-boot = { id = "org.springframework.boot", version.ref = "springBoot" }
jmh = { id = "me.champeau.jmh", version.ref = "jmh-plugin" }
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ class BindTest :
delay(1)

verify { cachedMessageFactory.createContainerMessage("/SUBJECT/1") }
verify { cachedMessageFactory.createGenericMessage("/SUBJECT/1-items/1") }

sharedFlow.subscriptionCount.value shouldBeEqual 0

Expand Down
66 changes: 54 additions & 12 deletions util/api/datasourcex-util.api
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,6 @@ public final class com/caplin/integration/datasourcex/util/ReadWriteLock {
public final fun writeUnlock ()V
}

public final class com/caplin/integration/datasourcex/util/SerializablePersistentMapKt {
public static final fun serializable (Lkotlinx/collections/immutable/PersistentMap;)Lkotlinx/collections/immutable/PersistentMap;
}

public final class com/caplin/integration/datasourcex/util/SerializablePersistentSetKt {
public static final fun serializable (Lkotlinx/collections/immutable/PersistentSet;)Lkotlinx/collections/immutable/PersistentSet;
}

public abstract interface class com/caplin/integration/datasourcex/util/SimpleDataSourceConfig {
public abstract fun getExtraConfig ()Ljava/lang/String;
public abstract fun getLocalLabel ()Ljava/lang/String;
Expand Down Expand Up @@ -190,16 +182,53 @@ public abstract interface class com/caplin/integration/datasourcex/util/flow/Flo
public final class com/caplin/integration/datasourcex/util/flow/FlowMapKt {
public static final fun mutableFlowMapOf ()Lcom/caplin/integration/datasourcex/util/flow/MutableFlowMap;
public static final fun mutableFlowMapOf ([Lkotlin/Pair;)Lcom/caplin/integration/datasourcex/util/flow/MutableFlowMap;
public static final fun runningFoldToMapFlowMapStreamEvent (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun simpleToFlowMapIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun toFlowMapIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun toMutableFlowMap (Ljava/util/Map;)Lcom/caplin/integration/datasourcex/util/flow/MutableFlowMap;
}

public abstract interface class com/caplin/integration/datasourcex/util/flow/FlowMapStreamEvent {
}

public final class com/caplin/integration/datasourcex/util/flow/FlowMapStreamEvent$Cleared : com/caplin/integration/datasourcex/util/flow/FlowMapStreamEvent {
public static final field INSTANCE Lcom/caplin/integration/datasourcex/util/flow/FlowMapStreamEvent$Cleared;
public fun toString ()Ljava/lang/String;
}

public final class com/caplin/integration/datasourcex/util/flow/FlowMapStreamEvent$EventUpdate : com/caplin/integration/datasourcex/util/flow/FlowMapStreamEvent {
public static final synthetic fun box-impl (Lcom/caplin/integration/datasourcex/util/flow/MapEvent$EntryEvent;)Lcom/caplin/integration/datasourcex/util/flow/FlowMapStreamEvent$EventUpdate;
public static fun constructor-impl (Lcom/caplin/integration/datasourcex/util/flow/MapEvent$EntryEvent;)Lcom/caplin/integration/datasourcex/util/flow/MapEvent$EntryEvent;
public fun equals (Ljava/lang/Object;)Z
public static fun equals-impl (Lcom/caplin/integration/datasourcex/util/flow/MapEvent$EntryEvent;Ljava/lang/Object;)Z
public static final fun equals-impl0 (Lcom/caplin/integration/datasourcex/util/flow/MapEvent$EntryEvent;Lcom/caplin/integration/datasourcex/util/flow/MapEvent$EntryEvent;)Z
public final fun getEvent ()Lcom/caplin/integration/datasourcex/util/flow/MapEvent$EntryEvent;
public fun hashCode ()I
public static fun hashCode-impl (Lcom/caplin/integration/datasourcex/util/flow/MapEvent$EntryEvent;)I
public fun toString ()Ljava/lang/String;
public static fun toString-impl (Lcom/caplin/integration/datasourcex/util/flow/MapEvent$EntryEvent;)Ljava/lang/String;
public final synthetic fun unbox-impl ()Lcom/caplin/integration/datasourcex/util/flow/MapEvent$EntryEvent;
}

public final class com/caplin/integration/datasourcex/util/flow/FlowMapStreamEvent$InitialState : com/caplin/integration/datasourcex/util/flow/FlowMapStreamEvent {
public static final synthetic fun box-impl (Ljava/util/Map;)Lcom/caplin/integration/datasourcex/util/flow/FlowMapStreamEvent$InitialState;
public static fun constructor-impl (Ljava/util/Map;)Ljava/util/Map;
public fun equals (Ljava/lang/Object;)Z
public static fun equals-impl (Ljava/util/Map;Ljava/lang/Object;)Z
public static final fun equals-impl0 (Ljava/util/Map;Ljava/util/Map;)Z
public final fun getMap ()Ljava/util/Map;
public fun hashCode ()I
public static fun hashCode-impl (Ljava/util/Map;)I
public fun toString ()Ljava/lang/String;
public static fun toString-impl (Ljava/util/Map;)Ljava/lang/String;
public final synthetic fun unbox-impl ()Ljava/util/Map;
}

public abstract interface class com/caplin/integration/datasourcex/util/flow/LoadingCompletingSharedFlowCache {
public abstract fun get (Ljava/lang/Object;)Lcom/caplin/integration/datasourcex/util/flow/CompletingSharedFlow;
}

public abstract interface class com/caplin/integration/datasourcex/util/flow/MapEvent : java/io/Serializable {
public abstract interface class com/caplin/integration/datasourcex/util/flow/MapEvent {
}

public abstract interface class com/caplin/integration/datasourcex/util/flow/MapEvent$EntryEvent : com/caplin/integration/datasourcex/util/flow/MapEvent {
Expand Down Expand Up @@ -257,6 +286,7 @@ public final class com/caplin/integration/datasourcex/util/flow/MapEventKt {

public abstract interface class com/caplin/integration/datasourcex/util/flow/MapFlow {
public abstract fun asFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public abstract fun asFlowWithState ()Lkotlinx/coroutines/flow/Flow;
public abstract fun valueFlow (Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
}

Expand Down Expand Up @@ -294,7 +324,7 @@ public final class com/caplin/integration/datasourcex/util/flow/RetryKt {
public static synthetic fun retryWithExponentialBackoff$default (Lkotlinx/coroutines/flow/Flow;JJLkotlin/jvm/functions/Function3;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
}

public abstract interface class com/caplin/integration/datasourcex/util/flow/SetEvent : java/io/Serializable {
public abstract interface class com/caplin/integration/datasourcex/util/flow/SetEvent {
}

public abstract interface class com/caplin/integration/datasourcex/util/flow/SetEvent$EntryEvent : com/caplin/integration/datasourcex/util/flow/SetEvent {
Expand Down Expand Up @@ -337,7 +367,7 @@ public final class com/caplin/integration/datasourcex/util/flow/SetEventKt {
public static final fun toEvents (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
}

public abstract interface class com/caplin/integration/datasourcex/util/flow/SimpleMapEvent : java/io/Serializable {
public abstract interface class com/caplin/integration/datasourcex/util/flow/SimpleMapEvent {
}

public abstract interface class com/caplin/integration/datasourcex/util/flow/SimpleMapEvent$EntryEvent : com/caplin/integration/datasourcex/util/flow/SimpleMapEvent {
Expand Down Expand Up @@ -393,7 +423,7 @@ public final class com/caplin/integration/datasourcex/util/flow/TimeoutKt {
public static final fun timeoutFirstOrNull (Lkotlinx/coroutines/flow/Flow;Ljava/time/Duration;)Lkotlinx/coroutines/flow/Flow;
}

public abstract interface class com/caplin/integration/datasourcex/util/flow/ValueOrCompletion : java/io/Serializable {
public abstract interface class com/caplin/integration/datasourcex/util/flow/ValueOrCompletion {
public abstract fun map (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

Expand Down Expand Up @@ -430,3 +460,15 @@ public final class com/caplin/integration/datasourcex/util/flow/ValueOrCompletio
public static final fun materializeUnboxed (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
}

public final class com/caplin/integration/datasourcex/util/serialization/fory/DataSourceModuleKt {
public static final fun registerDataSourceSerializers (Lorg/apache/fory/Fory;)Lorg/apache/fory/Fory;
}

public final class com/caplin/integration/datasourcex/util/serialization/jackson/DataSourceModule : com/fasterxml/jackson/databind/module/SimpleModule {
public static final field INSTANCE Lcom/caplin/integration/datasourcex/util/serialization/jackson/DataSourceModule;
}

public final class com/caplin/integration/datasourcex/util/serialization/jackson/DataSourceModuleKt {
public static final fun registerDataSourceModule (Lcom/fasterxml/jackson/databind/ObjectMapper;)Lcom/fasterxml/jackson/databind/ObjectMapper;
}

15 changes: 14 additions & 1 deletion util/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
plugins { `common-library` }
plugins {
`common-library`
alias(libs.plugins.jmh)
}

description = "Utility classes for DataSource extensions"

Expand All @@ -15,12 +18,22 @@ dependencies {
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation(libs.kotlin.collections.immutable)

compileOnly(libs.fory.core)
compileOnly(libs.fory.kotlin)

testRuntimeOnly("org.slf4j:slf4j-simple")

testImplementation("org.springframework:spring-core") // For testing the RegexPathMatcher
testImplementation(libs.turbine)
testImplementation(libs.kotest.assertions)
testImplementation(libs.kotest.runner)
testImplementation(libs.fory.core)
testImplementation(libs.fory.kotlin)

jmh(libs.jmh.core)
jmh(libs.jmh.generator)
}

jmh { duplicateClassesStrategy.set(DuplicatesStrategy.EXCLUDE) }

dokka { dokkaSourceSets.configureEach { includes.from("README.md") } }
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package com.caplin.integration.datasourcex.util.flow

import com.caplin.integration.datasourcex.util.flow.MapEvent.EntryEvent.Upsert
import com.caplin.integration.datasourcex.util.flow.MapEvent.Populated
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.runBlocking
import org.openjdk.jmh.annotations.*

/**
* Benchmarks for [FlowMap] implementation, focusing on mutation performance, lookup efficiency, and
* Flow-based state reconstruction.
*/
@State(Scope.Benchmark)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(1)
open class FlowMapBenchmark {

private lateinit var flowMap: MutableFlowMap<String, String>

@Setup
fun setup() {
flowMap = mutableFlowMapOf()
}

/**
* Measures the throughput of single [MutableFlowMap.put] operations on an initially empty map.
*/
@Benchmark
fun putSingle() {
flowMap.put("key", "value")
}

/**
* Measures the throughput of a [MutableFlowMap.put] followed by a [MutableFlowMap.remove] on the
* same key, exercising the event emission and state update logic.
*/
@Benchmark
fun putAndRemove() {
flowMap.put("key", "value")
flowMap.remove("key")
}

/**
* Measures the throughput of [MutableFlowMap.putAll] with a small map, which triggers multiple
* events in a single state update.
*/
@Benchmark
fun putAllSmall() {
flowMap.putAll(mapOf("1" to "A", "2" to "B", "3" to "C"))
}

/** State holder for a [FlowMap] pre-populated with 1,000 entries. */
@State(Scope.Benchmark)
open class PopulatedFlowMapState {
lateinit var flowMap: MutableFlowMap<String, Int>

@Setup
fun setup() {
flowMap = mutableFlowMapOf()
repeat(1000) { flowMap.put("key$it", it) }
}
}

/** State holder for measuring mutation throughput with multiple active subscribers. */
@State(Scope.Benchmark)
open class ActiveSubscriberState {
@Param("1", "10", "100") var subscriberCount: Int = 0

lateinit var flowMap: MutableFlowMap<String, String>
lateinit var scope: CoroutineScope

@Setup
fun setup() {
flowMap = mutableFlowMapOf()
// Using Dispatchers.Default for subscribers to simulate real-world processing
scope = CoroutineScope(Dispatchers.Default)
repeat(subscriberCount) { flowMap.asFlow().launchIn(scope) }
}

@TearDown
fun tearDown() {
scope.cancel()
}
}

/**
* Measures the throughput of [MutableFlowMap.put] when there are multiple active subscribers
* collecting from the map. This identifies contention or overhead in the event dispatching logic.
*/
@Benchmark
fun putWithSubscribers(state: ActiveSubscriberState) {
state.flowMap.put("key", "value")
}

/** Measures the throughput of retrieving a value from a large, pre-populated [FlowMap]. */
@Benchmark
fun getFromLargeMap(state: PopulatedFlowMapState): Int? {
return state.flowMap["key500"]
}

/**
* Measures the time taken to collect the initial state of a large [FlowMap] via [FlowMap.asFlow].
*/
@Benchmark
fun asFlowCollection(state: PopulatedFlowMapState) = runBlocking {
state.flowMap
.asFlow()
.takeWhile { it != Populated }
.collect {
// just collect
}
}

/**
* Measures the time taken to collect the initial state of a large [FlowMap] via
* [FlowMap.asFlowWithState]. This avoids emitting individual Upsert events, making it much
* faster.
*/
@Benchmark
fun asFlowWithStateCollection(state: PopulatedFlowMapState) = runBlocking {
state.flowMap.asFlowWithState().take(1).collect {
// just collect
}
}

/**
* Measures the time taken to collect the initial state of a large [FlowMap] via [FlowMap.asFlow]
* when a predicate is applied, exercising the filtering logic within the flow.
*/
@Benchmark
fun asFlowWithPredicateCollection(state: PopulatedFlowMapState) = runBlocking {
state.flowMap
.asFlow { _, value -> value % 2 == 0 }
.takeWhile { it != Populated }
.collect {
// just collect
}
}

/**
* Measures the overhead of reconstructing a [FlowMap] from a stream of events using
* [toFlowMapIn].
*/
@Benchmark
fun toFlowMapInBenchmark() = runBlocking {
val events = flow {
repeat(100) { emit(Upsert("key$it", null, it)) }
emit(Populated)
}
val scope = CoroutineScope(Dispatchers.Default)
events.toFlowMapIn(scope)
scope.cancel()
}
}
Loading
Loading