Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,14 @@ trait SnapshotStoreCapabilityFlags extends CapabilityFlags {
protected def supportsMetadata: CapabilityFlag
}
//#snapshot-store-flags

//#durable-state-store-flags
trait DurableStateStoreCapabilityFlags extends CapabilityFlags {

/**
* When `true` enables tests which check if the durable state store properly rejects
* a `deleteObject` call when the revision does not match the stored revision.
*/
protected def supportsDeleteWithRevisionCheck: CapabilityFlag
}
//#durable-state-store-flags
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

/*
* Copyright (C) 2014-2022 Lightbend Inc. <https://www.lightbend.com>
*/

package org.apache.pekko.persistence.japi.state

import scala.collection.immutable

import org.apache.pekko
import pekko.persistence.CapabilityFlag
import pekko.persistence.state.DurableStateStoreSpec

import org.scalatest.{ Args, ConfigMap, Filter, Status, Suite, TestData }

import com.typesafe.config.Config

/**
* JAVA API
*
* This spec aims to verify custom pekko-persistence [[pekko.persistence.state.DurableStateStore]]
* implementations.
* Plugin authors are highly encouraged to include it in their plugin's test suites.
*
* In case your durable state store plugin needs some kind of setup or teardown, override the
* `beforeAll` or `afterAll` methods (don't forget to call `super` in your overridden methods).
*
* @see [[pekko.persistence.state.DurableStateStoreSpec]]
*/
class JavaDurableStateStoreSpec(config: Config) extends DurableStateStoreSpec(config) {
override protected def supportsDeleteWithRevisionCheck: CapabilityFlag = CapabilityFlag.off()

override def runTests(testName: Option[String], args: Args): Status =
super.runTests(testName, args)

override def runTest(testName: String, args: Args): Status =
super.runTest(testName, args)

override def run(testName: Option[String], args: Args): Status =
super.run(testName, args)

override def testDataFor(testName: String, theConfigMap: ConfigMap): TestData =
super.testDataFor(testName, theConfigMap)

override def testNames: Set[String] =
super.testNames

override def tags: Map[String, Set[String]] =
super.tags

override def rerunner: Option[String] =
super.rerunner

override def expectedTestCount(filter: Filter): Int =
super.expectedTestCount(filter)

override def suiteId: String =
super.suiteId

override def suiteName: String =
super.suiteName

override def runNestedSuites(args: Args): Status =
super.runNestedSuites(args)

override def nestedSuites: immutable.IndexedSeq[Suite] =
super.nestedSuites
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

/*
* Copyright (C) 2014-2022 Lightbend Inc. <https://www.lightbend.com>
*/

package org.apache.pekko.persistence.state

import scala.concurrent.Await
import scala.concurrent.duration._

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.persistence.CapabilityFlag
import pekko.persistence.DurableStateStoreCapabilityFlags
import pekko.persistence.PluginSpec
import pekko.persistence.scalatest.{ MayVerb, OptionalTests }
import pekko.persistence.state.scaladsl.DurableStateUpdateStore

import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory

object DurableStateStoreSpec {
val config: Config = ConfigFactory.empty()
}

/**
* This spec aims to verify custom pekko-persistence [[DurableStateStore]] implementations.
* Plugin authors are highly encouraged to include it in their plugin's test suites.
*
* In case your durable state store plugin needs some kind of setup or teardown, override the `beforeAll`
* or `afterAll` methods (don't forget to call `super` in your overridden methods).
*
* For a Java and JUnit consumable version of the TCK please refer to
* [[pekko.persistence.japi.state.JavaDurableStateStoreSpec]].
*
* @see [[pekko.persistence.japi.state.JavaDurableStateStoreSpec]]
*/
abstract class DurableStateStoreSpec(config: Config)
extends PluginSpec(config)
with MayVerb
with OptionalTests
with DurableStateStoreCapabilityFlags {

implicit lazy val system: ActorSystem =
ActorSystem("DurableStateStoreSpec", config.withFallback(DurableStateStoreSpec.config))

override protected def supportsDeleteWithRevisionCheck: CapabilityFlag = CapabilityFlag.off()

/**
* Returns the `DurableStateUpdateStore` under test. By default, this uses the plugin
* configured under `pekko.persistence.state.plugin` in the provided config.
*/
def durableStateStore(): DurableStateUpdateStore[Any] =
DurableStateStoreRegistry(system).durableStateStoreFor[DurableStateUpdateStore[Any]]("")

private val timeout = 3.seconds

"A durable state store" must {
"not find a non-existing object" in {
val result = Await.result(durableStateStore().getObject(pid), timeout)
result.value shouldBe None
}

"persist a state and retrieve it" in {
val value = s"state-${pid}"
Await.result(durableStateStore().upsertObject(pid, 1L, value, ""), timeout)
val result = Await.result(durableStateStore().getObject(pid), timeout)
result.value shouldBe Some(value)
result.revision shouldBe 1L
}

"update a state" in {
val store = durableStateStore()
val value1 = s"state-1-${pid}"
val value2 = s"state-2-${pid}"
Await.result(store.upsertObject(pid, 1L, value1, ""), timeout)
Await.result(store.upsertObject(pid, 2L, value2, ""), timeout)
val result = Await.result(store.getObject(pid), timeout)
result.value shouldBe Some(value2)
result.revision shouldBe 2L
}

"delete a state" in {
val store = durableStateStore()
val value = s"state-${pid}"
Await.result(store.upsertObject(pid, 1L, value, ""), timeout)
Await.result(store.deleteObject(pid, 2L), timeout)
val result = Await.result(store.getObject(pid), timeout)
result.value shouldBe None
}

"handle different persistence IDs independently" in {
val store = durableStateStore()
val pid2 = pid + "-2"
val value1 = s"state-${pid}"
val value2 = s"state-${pid2}"
Await.result(store.upsertObject(pid, 1L, value1, ""), timeout)
Await.result(store.upsertObject(pid2, 1L, value2, ""), timeout)

val result1 = Await.result(store.getObject(pid), timeout)
val result2 = Await.result(store.getObject(pid2), timeout)

result1.value shouldBe Some(value1)
result2.value shouldBe Some(value2)
}
}

"A durable state store optionally".may {
optional(flag = supportsDeleteWithRevisionCheck) {
"fail to delete a state when the revision does not match" in {
val store = durableStateStore()
val value = s"state-${pid}"
Await.result(store.upsertObject(pid, 1L, value, ""), timeout)
val deleteResult = store.deleteObject(pid, 99L)
intercept[Exception] {
Await.result(deleteResult, timeout)
}
// The original state should still be accessible
val result = Await.result(store.getObject(pid), timeout)
result.value shouldBe Some(value)
result.revision shouldBe 1L
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.persistence.testkit.state.scaladsl

import org.apache.pekko
import pekko.persistence.CapabilityFlag
import pekko.persistence.state.DurableStateStoreSpec
import pekko.persistence.testkit.PersistenceTestKitDurableStateStorePlugin

import com.typesafe.config.ConfigFactory

object PersistenceTestKitDurableStateStoreTCKSpec {
val config = PersistenceTestKitDurableStateStorePlugin.config.withFallback(ConfigFactory.parseString("""
pekko.loglevel = DEBUG
"""))
}

class PersistenceTestKitDurableStateStoreTCKSpec
extends DurableStateStoreSpec(PersistenceTestKitDurableStateStoreTCKSpec.config) {
override protected def supportsDeleteWithRevisionCheck: CapabilityFlag = CapabilityFlag.off()
}
Loading