From ab1c2b50aa19c8afcc9058447d76ab533a7143f6 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 3 Apr 2026 09:36:59 +0000 Subject: [PATCH 1/3] Add DurableStateStore TCK to persistence-tck module Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko/sessions/77803e79-3631-4111-98a8-4a1cf6f7beca Co-authored-by: pjfanning <11783444+pjfanning@users.noreply.github.com> --- .../pekko/persistence/CapabilityFlags.scala | 11 ++ .../state/JavaDurableStateStoreSpec.scala | 72 ++++++++++ .../state/DurableStateStoreSpec.scala | 129 ++++++++++++++++++ ...tenceTestKitDurableStateStoreTCKSpec.scala | 28 ++++ 4 files changed, 240 insertions(+) create mode 100644 persistence-tck/src/main/scala/org/apache/pekko/persistence/japi/state/JavaDurableStateStoreSpec.scala create mode 100644 persistence-tck/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreSpec.scala create mode 100644 persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreTCKSpec.scala diff --git a/persistence-tck/src/main/scala/org/apache/pekko/persistence/CapabilityFlags.scala b/persistence-tck/src/main/scala/org/apache/pekko/persistence/CapabilityFlags.scala index acac5ef412b..cc2f57a00c9 100644 --- a/persistence-tck/src/main/scala/org/apache/pekko/persistence/CapabilityFlags.scala +++ b/persistence-tck/src/main/scala/org/apache/pekko/persistence/CapabilityFlags.scala @@ -86,3 +86,14 @@ trait SnapshotStoreCapabilityFlags extends CapabilityFlags { protected def supportsMetadata: CapabilityFlag } //#snapshot-store-flags + +//#durable-state-store-flags +trait DurableStateStoreCapabilityFlags extends CapabilityFlags { + + /** + * When `true` enables tests which check if the durable state store properly rejects + * a `deleteObject` call when the revision does not match the stored revision. + */ + protected def supportsDeleteWithRevisionCheck: CapabilityFlag +} +//#durable-state-store-flags diff --git a/persistence-tck/src/main/scala/org/apache/pekko/persistence/japi/state/JavaDurableStateStoreSpec.scala b/persistence-tck/src/main/scala/org/apache/pekko/persistence/japi/state/JavaDurableStateStoreSpec.scala new file mode 100644 index 00000000000..8223355ac03 --- /dev/null +++ b/persistence-tck/src/main/scala/org/apache/pekko/persistence/japi/state/JavaDurableStateStoreSpec.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +package org.apache.pekko.persistence.japi.state + +import scala.collection.immutable + +import org.apache.pekko +import pekko.persistence.CapabilityFlag +import pekko.persistence.state.DurableStateStoreSpec + +import org.scalatest.{ Args, ConfigMap, Filter, Status, Suite, TestData } + +import com.typesafe.config.Config + +/** + * JAVA API + * + * This spec aims to verify custom pekko-persistence [[pekko.persistence.state.DurableStateStore]] + * implementations. + * Plugin authors are highly encouraged to include it in their plugin's test suites. + * + * In case your durable state store plugin needs some kind of setup or teardown, override the + * `beforeAll` or `afterAll` methods (don't forget to call `super` in your overridden methods). + * + * @see [[pekko.persistence.state.DurableStateStoreSpec]] + */ +class JavaDurableStateStoreSpec(config: Config) extends DurableStateStoreSpec(config) { + override protected def supportsDeleteWithRevisionCheck: CapabilityFlag = CapabilityFlag.off() + + override def runTests(testName: Option[String], args: Args): Status = + super.runTests(testName, args) + + override def runTest(testName: String, args: Args): Status = + super.runTest(testName, args) + + override def run(testName: Option[String], args: Args): Status = + super.run(testName, args) + + override def testDataFor(testName: String, theConfigMap: ConfigMap): TestData = + super.testDataFor(testName, theConfigMap) + + override def testNames: Set[String] = + super.testNames + + override def tags: Map[String, Set[String]] = + super.tags + + override def rerunner: Option[String] = + super.rerunner + + override def expectedTestCount(filter: Filter): Int = + super.expectedTestCount(filter) + + override def suiteId: String = + super.suiteId + + override def suiteName: String = + super.suiteName + + override def runNestedSuites(args: Args): Status = + super.runNestedSuites(args) + + override def nestedSuites: immutable.IndexedSeq[Suite] = + super.nestedSuites +} diff --git a/persistence-tck/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreSpec.scala b/persistence-tck/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreSpec.scala new file mode 100644 index 00000000000..1fe32eee1a6 --- /dev/null +++ b/persistence-tck/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreSpec.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +package org.apache.pekko.persistence.state + +import scala.concurrent.Await +import scala.concurrent.duration._ + +import org.apache.pekko +import pekko.actor.ActorSystem +import pekko.persistence.CapabilityFlag +import pekko.persistence.DurableStateStoreCapabilityFlags +import pekko.persistence.PluginSpec +import pekko.persistence.scalatest.{ MayVerb, OptionalTests } +import pekko.persistence.state.scaladsl.DurableStateUpdateStore + +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory + +object DurableStateStoreSpec { + val config: Config = ConfigFactory.empty() +} + +/** + * This spec aims to verify custom pekko-persistence [[DurableStateStore]] implementations. + * Plugin authors are highly encouraged to include it in their plugin's test suites. + * + * In case your durable state store plugin needs some kind of setup or teardown, override the `beforeAll` + * or `afterAll` methods (don't forget to call `super` in your overridden methods). + * + * For a Java and JUnit consumable version of the TCK please refer to + * [[pekko.persistence.japi.state.JavaDurableStateStoreSpec]]. + * + * @see [[pekko.persistence.japi.state.JavaDurableStateStoreSpec]] + */ +abstract class DurableStateStoreSpec(config: Config) + extends PluginSpec(config) + with MayVerb + with OptionalTests + with DurableStateStoreCapabilityFlags { + + implicit lazy val system: ActorSystem = + ActorSystem("DurableStateStoreSpec", config.withFallback(DurableStateStoreSpec.config)) + + override protected def supportsDeleteWithRevisionCheck: CapabilityFlag = CapabilityFlag.off() + + /** + * Returns the `DurableStateUpdateStore` under test. By default, this uses the plugin + * configured under `pekko.persistence.state.plugin` in the provided config. + */ + def durableStateStore(): DurableStateUpdateStore[Any] = + DurableStateStoreRegistry(system).durableStateStoreFor[DurableStateUpdateStore[Any]]("") + + private val timeout = 3.seconds + + "A durable state store" must { + "not find a non-existing object" in { + val result = Await.result(durableStateStore().getObject(pid), timeout) + result.value shouldBe None + } + + "persist a state and retrieve it" in { + val value = s"state-${pid}" + Await.result(durableStateStore().upsertObject(pid, 1L, value, ""), timeout) + val result = Await.result(durableStateStore().getObject(pid), timeout) + result.value shouldBe Some(value) + result.revision shouldBe 1L + } + + "update a state" in { + val store = durableStateStore() + val value1 = s"state-1-${pid}" + val value2 = s"state-2-${pid}" + Await.result(store.upsertObject(pid, 1L, value1, ""), timeout) + Await.result(store.upsertObject(pid, 2L, value2, ""), timeout) + val result = Await.result(store.getObject(pid), timeout) + result.value shouldBe Some(value2) + result.revision shouldBe 2L + } + + "delete a state" in { + val store = durableStateStore() + val value = s"state-${pid}" + Await.result(store.upsertObject(pid, 1L, value, ""), timeout) + Await.result(store.deleteObject(pid, 2L), timeout) + val result = Await.result(store.getObject(pid), timeout) + result.value shouldBe None + } + + "handle different persistence IDs independently" in { + val store = durableStateStore() + val pid2 = pid + "-2" + val value1 = s"state-${pid}" + val value2 = s"state-${pid2}" + Await.result(store.upsertObject(pid, 1L, value1, ""), timeout) + Await.result(store.upsertObject(pid2, 1L, value2, ""), timeout) + + val result1 = Await.result(store.getObject(pid), timeout) + val result2 = Await.result(store.getObject(pid2), timeout) + + result1.value shouldBe Some(value1) + result2.value shouldBe Some(value2) + } + } + + "A durable state store optionally".may { + optional(flag = supportsDeleteWithRevisionCheck) { + "fail to delete a state when the revision does not match" in { + val store = durableStateStore() + val value = s"state-${pid}" + Await.result(store.upsertObject(pid, 1L, value, ""), timeout) + val deleteResult = store.deleteObject(pid, 99L) + intercept[Exception] { + Await.result(deleteResult, timeout) + } + // The original state should still be accessible + val result = Await.result(store.getObject(pid), timeout) + result.value shouldBe Some(value) + result.revision shouldBe 1L + } + } + } +} diff --git a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreTCKSpec.scala b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreTCKSpec.scala new file mode 100644 index 00000000000..f8fd06f6fed --- /dev/null +++ b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreTCKSpec.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +package org.apache.pekko.persistence.testkit.state.scaladsl + +import org.apache.pekko +import pekko.persistence.CapabilityFlag +import pekko.persistence.state.DurableStateStoreSpec +import pekko.persistence.testkit.PersistenceTestKitDurableStateStorePlugin + +import com.typesafe.config.ConfigFactory + +object PersistenceTestKitDurableStateStoreTCKSpec { + val config = PersistenceTestKitDurableStateStorePlugin.config.withFallback(ConfigFactory.parseString(""" + pekko.loglevel = DEBUG + """)) +} + +class PersistenceTestKitDurableStateStoreTCKSpec + extends DurableStateStoreSpec(PersistenceTestKitDurableStateStoreTCKSpec.config) { + override protected def supportsDeleteWithRevisionCheck: CapabilityFlag = CapabilityFlag.off() +} From 0e299a4679b3fbfbfbff156514905b4080d3b775 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Fri, 3 Apr 2026 12:52:27 +0200 Subject: [PATCH 2/3] license headers --- .../japi/state/JavaDurableStateStoreSpec.scala | 4 ++++ .../persistence/state/DurableStateStoreSpec.scala | 4 ++++ ...ersistenceTestKitDurableStateStoreTCKSpec.scala | 14 +++++++++++--- 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/persistence-tck/src/main/scala/org/apache/pekko/persistence/japi/state/JavaDurableStateStoreSpec.scala b/persistence-tck/src/main/scala/org/apache/pekko/persistence/japi/state/JavaDurableStateStoreSpec.scala index 8223355ac03..4a21a26c457 100644 --- a/persistence-tck/src/main/scala/org/apache/pekko/persistence/japi/state/JavaDurableStateStoreSpec.scala +++ b/persistence-tck/src/main/scala/org/apache/pekko/persistence/japi/state/JavaDurableStateStoreSpec.scala @@ -7,6 +7,10 @@ * This file is part of the Apache Pekko project, which was derived from Akka. */ +/* + * Copyright (C) 2014-2022 Lightbend Inc. + */ + package org.apache.pekko.persistence.japi.state import scala.collection.immutable diff --git a/persistence-tck/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreSpec.scala b/persistence-tck/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreSpec.scala index 1fe32eee1a6..a9d5c907175 100644 --- a/persistence-tck/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreSpec.scala +++ b/persistence-tck/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreSpec.scala @@ -7,6 +7,10 @@ * This file is part of the Apache Pekko project, which was derived from Akka. */ +/* + * Copyright (C) 2014-2022 Lightbend Inc. + */ + package org.apache.pekko.persistence.state import scala.concurrent.Await diff --git a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreTCKSpec.scala b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreTCKSpec.scala index f8fd06f6fed..e24d5fbbbc2 100644 --- a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreTCKSpec.scala +++ b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreTCKSpec.scala @@ -1,10 +1,18 @@ /* * Licensed to the Apache Software Foundation (ASF) under one or more - * license agreements; and to You under the Apache License, version 2.0: + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * https://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * This file is part of the Apache Pekko project, which was derived from Akka. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.pekko.persistence.testkit.state.scaladsl From 0989b360368e681779b8b55b2885328d766369ef Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Sun, 5 Apr 2026 21:47:01 +0200 Subject: [PATCH 3/3] use non-empty tags in tests due to issues with Oracle JDBC tests --- .../persistence/state/DurableStateStoreSpec.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/persistence-tck/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreSpec.scala b/persistence-tck/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreSpec.scala index a9d5c907175..7242133ef95 100644 --- a/persistence-tck/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreSpec.scala +++ b/persistence-tck/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreSpec.scala @@ -71,7 +71,7 @@ abstract class DurableStateStoreSpec(config: Config) "persist a state and retrieve it" in { val value = s"state-${pid}" - Await.result(durableStateStore().upsertObject(pid, 1L, value, ""), timeout) + Await.result(durableStateStore().upsertObject(pid, 1L, value, "test-tag"), timeout) val result = Await.result(durableStateStore().getObject(pid), timeout) result.value shouldBe Some(value) result.revision shouldBe 1L @@ -81,8 +81,8 @@ abstract class DurableStateStoreSpec(config: Config) val store = durableStateStore() val value1 = s"state-1-${pid}" val value2 = s"state-2-${pid}" - Await.result(store.upsertObject(pid, 1L, value1, ""), timeout) - Await.result(store.upsertObject(pid, 2L, value2, ""), timeout) + Await.result(store.upsertObject(pid, 1L, value1, "test-tag"), timeout) + Await.result(store.upsertObject(pid, 2L, value2, "test-tag"), timeout) val result = Await.result(store.getObject(pid), timeout) result.value shouldBe Some(value2) result.revision shouldBe 2L @@ -91,7 +91,7 @@ abstract class DurableStateStoreSpec(config: Config) "delete a state" in { val store = durableStateStore() val value = s"state-${pid}" - Await.result(store.upsertObject(pid, 1L, value, ""), timeout) + Await.result(store.upsertObject(pid, 1L, value, "test-tag"), timeout) Await.result(store.deleteObject(pid, 2L), timeout) val result = Await.result(store.getObject(pid), timeout) result.value shouldBe None @@ -102,8 +102,8 @@ abstract class DurableStateStoreSpec(config: Config) val pid2 = pid + "-2" val value1 = s"state-${pid}" val value2 = s"state-${pid2}" - Await.result(store.upsertObject(pid, 1L, value1, ""), timeout) - Await.result(store.upsertObject(pid2, 1L, value2, ""), timeout) + Await.result(store.upsertObject(pid, 1L, value1, "test-tag"), timeout) + Await.result(store.upsertObject(pid2, 1L, value2, "test-tag"), timeout) val result1 = Await.result(store.getObject(pid), timeout) val result2 = Await.result(store.getObject(pid2), timeout) @@ -118,7 +118,7 @@ abstract class DurableStateStoreSpec(config: Config) "fail to delete a state when the revision does not match" in { val store = durableStateStore() val value = s"state-${pid}" - Await.result(store.upsertObject(pid, 1L, value, ""), timeout) + Await.result(store.upsertObject(pid, 1L, value, "test-tag"), timeout) val deleteResult = store.deleteObject(pid, 99L) intercept[Exception] { Await.result(deleteResult, timeout)