diff --git a/persistence-tck/src/main/scala/org/apache/pekko/persistence/CapabilityFlags.scala b/persistence-tck/src/main/scala/org/apache/pekko/persistence/CapabilityFlags.scala index acac5ef412b..cc2f57a00c9 100644 --- a/persistence-tck/src/main/scala/org/apache/pekko/persistence/CapabilityFlags.scala +++ b/persistence-tck/src/main/scala/org/apache/pekko/persistence/CapabilityFlags.scala @@ -86,3 +86,14 @@ trait SnapshotStoreCapabilityFlags extends CapabilityFlags { protected def supportsMetadata: CapabilityFlag } //#snapshot-store-flags + +//#durable-state-store-flags +trait DurableStateStoreCapabilityFlags extends CapabilityFlags { + + /** + * When `true` enables tests which check if the durable state store properly rejects + * a `deleteObject` call when the revision does not match the stored revision. + */ + protected def supportsDeleteWithRevisionCheck: CapabilityFlag +} +//#durable-state-store-flags diff --git a/persistence-tck/src/main/scala/org/apache/pekko/persistence/japi/state/JavaDurableStateStoreSpec.scala b/persistence-tck/src/main/scala/org/apache/pekko/persistence/japi/state/JavaDurableStateStoreSpec.scala new file mode 100644 index 00000000000..4a21a26c457 --- /dev/null +++ b/persistence-tck/src/main/scala/org/apache/pekko/persistence/japi/state/JavaDurableStateStoreSpec.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +/* + * Copyright (C) 2014-2022 Lightbend Inc. + */ + +package org.apache.pekko.persistence.japi.state + +import scala.collection.immutable + +import org.apache.pekko +import pekko.persistence.CapabilityFlag +import pekko.persistence.state.DurableStateStoreSpec + +import org.scalatest.{ Args, ConfigMap, Filter, Status, Suite, TestData } + +import com.typesafe.config.Config + +/** + * JAVA API + * + * This spec aims to verify custom pekko-persistence [[pekko.persistence.state.DurableStateStore]] + * implementations. + * Plugin authors are highly encouraged to include it in their plugin's test suites. + * + * In case your durable state store plugin needs some kind of setup or teardown, override the + * `beforeAll` or `afterAll` methods (don't forget to call `super` in your overridden methods). + * + * @see [[pekko.persistence.state.DurableStateStoreSpec]] + */ +class JavaDurableStateStoreSpec(config: Config) extends DurableStateStoreSpec(config) { + override protected def supportsDeleteWithRevisionCheck: CapabilityFlag = CapabilityFlag.off() + + override def runTests(testName: Option[String], args: Args): Status = + super.runTests(testName, args) + + override def runTest(testName: String, args: Args): Status = + super.runTest(testName, args) + + override def run(testName: Option[String], args: Args): Status = + super.run(testName, args) + + override def testDataFor(testName: String, theConfigMap: ConfigMap): TestData = + super.testDataFor(testName, theConfigMap) + + override def testNames: Set[String] = + super.testNames + + override def tags: Map[String, Set[String]] = + super.tags + + override def rerunner: Option[String] = + super.rerunner + + override def expectedTestCount(filter: Filter): Int = + super.expectedTestCount(filter) + + override def suiteId: String = + super.suiteId + + override def suiteName: String = + super.suiteName + + override def runNestedSuites(args: Args): Status = + super.runNestedSuites(args) + + override def nestedSuites: immutable.IndexedSeq[Suite] = + super.nestedSuites +} diff --git a/persistence-tck/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreSpec.scala b/persistence-tck/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreSpec.scala new file mode 100644 index 00000000000..7242133ef95 --- /dev/null +++ b/persistence-tck/src/main/scala/org/apache/pekko/persistence/state/DurableStateStoreSpec.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +/* + * Copyright (C) 2014-2022 Lightbend Inc. + */ + +package org.apache.pekko.persistence.state + +import scala.concurrent.Await +import scala.concurrent.duration._ + +import org.apache.pekko +import pekko.actor.ActorSystem +import pekko.persistence.CapabilityFlag +import pekko.persistence.DurableStateStoreCapabilityFlags +import pekko.persistence.PluginSpec +import pekko.persistence.scalatest.{ MayVerb, OptionalTests } +import pekko.persistence.state.scaladsl.DurableStateUpdateStore + +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory + +object DurableStateStoreSpec { + val config: Config = ConfigFactory.empty() +} + +/** + * This spec aims to verify custom pekko-persistence [[DurableStateStore]] implementations. + * Plugin authors are highly encouraged to include it in their plugin's test suites. + * + * In case your durable state store plugin needs some kind of setup or teardown, override the `beforeAll` + * or `afterAll` methods (don't forget to call `super` in your overridden methods). + * + * For a Java and JUnit consumable version of the TCK please refer to + * [[pekko.persistence.japi.state.JavaDurableStateStoreSpec]]. + * + * @see [[pekko.persistence.japi.state.JavaDurableStateStoreSpec]] + */ +abstract class DurableStateStoreSpec(config: Config) + extends PluginSpec(config) + with MayVerb + with OptionalTests + with DurableStateStoreCapabilityFlags { + + implicit lazy val system: ActorSystem = + ActorSystem("DurableStateStoreSpec", config.withFallback(DurableStateStoreSpec.config)) + + override protected def supportsDeleteWithRevisionCheck: CapabilityFlag = CapabilityFlag.off() + + /** + * Returns the `DurableStateUpdateStore` under test. By default, this uses the plugin + * configured under `pekko.persistence.state.plugin` in the provided config. + */ + def durableStateStore(): DurableStateUpdateStore[Any] = + DurableStateStoreRegistry(system).durableStateStoreFor[DurableStateUpdateStore[Any]]("") + + private val timeout = 3.seconds + + "A durable state store" must { + "not find a non-existing object" in { + val result = Await.result(durableStateStore().getObject(pid), timeout) + result.value shouldBe None + } + + "persist a state and retrieve it" in { + val value = s"state-${pid}" + Await.result(durableStateStore().upsertObject(pid, 1L, value, "test-tag"), timeout) + val result = Await.result(durableStateStore().getObject(pid), timeout) + result.value shouldBe Some(value) + result.revision shouldBe 1L + } + + "update a state" in { + val store = durableStateStore() + val value1 = s"state-1-${pid}" + val value2 = s"state-2-${pid}" + Await.result(store.upsertObject(pid, 1L, value1, "test-tag"), timeout) + Await.result(store.upsertObject(pid, 2L, value2, "test-tag"), timeout) + val result = Await.result(store.getObject(pid), timeout) + result.value shouldBe Some(value2) + result.revision shouldBe 2L + } + + "delete a state" in { + val store = durableStateStore() + val value = s"state-${pid}" + Await.result(store.upsertObject(pid, 1L, value, "test-tag"), timeout) + Await.result(store.deleteObject(pid, 2L), timeout) + val result = Await.result(store.getObject(pid), timeout) + result.value shouldBe None + } + + "handle different persistence IDs independently" in { + val store = durableStateStore() + val pid2 = pid + "-2" + val value1 = s"state-${pid}" + val value2 = s"state-${pid2}" + Await.result(store.upsertObject(pid, 1L, value1, "test-tag"), timeout) + Await.result(store.upsertObject(pid2, 1L, value2, "test-tag"), timeout) + + val result1 = Await.result(store.getObject(pid), timeout) + val result2 = Await.result(store.getObject(pid2), timeout) + + result1.value shouldBe Some(value1) + result2.value shouldBe Some(value2) + } + } + + "A durable state store optionally".may { + optional(flag = supportsDeleteWithRevisionCheck) { + "fail to delete a state when the revision does not match" in { + val store = durableStateStore() + val value = s"state-${pid}" + Await.result(store.upsertObject(pid, 1L, value, "test-tag"), timeout) + val deleteResult = store.deleteObject(pid, 99L) + intercept[Exception] { + Await.result(deleteResult, timeout) + } + // The original state should still be accessible + val result = Await.result(store.getObject(pid), timeout) + result.value shouldBe Some(value) + result.revision shouldBe 1L + } + } + } +} diff --git a/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreTCKSpec.scala b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreTCKSpec.scala new file mode 100644 index 00000000000..e24d5fbbbc2 --- /dev/null +++ b/persistence-testkit/src/test/scala/org/apache/pekko/persistence/testkit/state/scaladsl/PersistenceTestKitDurableStateStoreTCKSpec.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.persistence.testkit.state.scaladsl + +import org.apache.pekko +import pekko.persistence.CapabilityFlag +import pekko.persistence.state.DurableStateStoreSpec +import pekko.persistence.testkit.PersistenceTestKitDurableStateStorePlugin + +import com.typesafe.config.ConfigFactory + +object PersistenceTestKitDurableStateStoreTCKSpec { + val config = PersistenceTestKitDurableStateStorePlugin.config.withFallback(ConfigFactory.parseString(""" + pekko.loglevel = DEBUG + """)) +} + +class PersistenceTestKitDurableStateStoreTCKSpec + extends DurableStateStoreSpec(PersistenceTestKitDurableStateStoreTCKSpec.config) { + override protected def supportsDeleteWithRevisionCheck: CapabilityFlag = CapabilityFlag.off() +}