Skip to content

Commit 871a69d

Browse files
committed
multi-master replication
- update to Akka 2.4-M2 for DistributedData - add Status CRDT and tests
1 parent f824254 commit 871a69d

File tree

6 files changed

+151
-37
lines changed

6 files changed

+151
-37
lines changed

chapter12/build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
import Build._
22

3-
libraryDependencies ++= Seq(akkaContrib, playJson, sbtIO, akkaTestkit, junit, scalatest)
3+
libraryDependencies ++= Seq(akkaContrib, akkaDData, playJson, sbtIO, akkaTestkit, junit, scalatest)

chapter12/src/main/scala/com/reactivedesignpatterns/chapter12/ActivePassive.scala

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@ import scala.collection.immutable.TreeMap
1414
import scala.concurrent.forkjoin.ThreadLocalRandom
1515
import scala.util.Random
1616
import scala.annotation.tailrec
17-
import akka.contrib.pattern.ClusterSingletonManager
18-
import akka.contrib.pattern.ClusterSingletonProxy
17+
import akka.cluster.singleton.ClusterSingletonManager
18+
import akka.cluster.singleton.ClusterSingletonProxy
19+
import akka.cluster.singleton.ClusterSingletonManagerSettings
20+
import akka.cluster.singleton.ClusterSingletonProxySettings
1921
import com.typesafe.config.ConfigFactory
2022
import scala.io.StdIn
2123

@@ -33,8 +35,8 @@ object ActivePassive {
3335
class Active(localReplica: ActorRef, replicationFactor: Int, maxQueueSize: Int) extends Actor with Stash with ActorLogging {
3436
private val MaxOutstanding = maxQueueSize / 2
3537

36-
private var theStore = Map.empty[String, JsValue]
37-
private var seqNr = Iterator from 0
38+
private var theStore: Map[String, JsValue] = _
39+
private var seqNr: Iterator[Int] = _
3840
private val toReplicate = Queue.empty[Replicate]
3941
private var replicating = TreeMap.empty[Int, (Replicate, Int)]
4042

@@ -146,9 +148,11 @@ object ActivePassive {
146148
case Replicate(s, _, _, replyTo) if s - expectedSeq < 0 =>
147149
replyTo ! Replicated(s)
148150
case r: Replicate if r.seq == expectedSeq =>
151+
val nextStore = theStore + (r.key -> r.value)
152+
persist(name, expectedSeq, nextStore)
149153
r.replyTo ! Replicated(r.seq)
150154
applied.enqueue(r)
151-
context.become(upToDate(theStore + (r.key -> r.value), r.seq + 1))
155+
context.become(upToDate(nextStore, expectedSeq + 1))
152156
case r: Replicate =>
153157
if (r.seq - expectedSeq > maxLag)
154158
fallBehind(expectedSeq, TreeMap(r.seq -> r))
@@ -283,9 +287,12 @@ object ActivePassive {
283287
def start(port: Option[Int]): ActorSystem = {
284288
val system = ActorSystem("ActivePassive", roleConfig("backend", port) withFallback commonConfig)
285289
val localReplica = system.actorOf(Props(new Passive(3, 3.seconds, 100)), "passive")
290+
val settings = ClusterSingletonManagerSettings(system)
291+
.withSingletonName("active")
292+
.withRole("backend")
293+
.withHandOverRetryInterval(150.millis)
286294
val managerProps =
287-
ClusterSingletonManager.props(Props(new Active(localReplica, 2, 120)), "active", PoisonPill,
288-
role = Some("backend"), retryInterval = 150.millis)
295+
ClusterSingletonManager.props(Props(new Active(localReplica, 2, 120)), PoisonPill, settings)
289296
val manager = system.actorOf(managerProps, "activeManager")
290297
system
291298
}
@@ -300,7 +307,10 @@ object ActivePassive {
300307

301308
awaitMembers(sys, systems.length + 1)
302309

303-
val proxy = sys.actorOf(ClusterSingletonProxy.props("/user/activeManager/active", Some("backend")), "proxy")
310+
val proxySettings = ClusterSingletonProxySettings(sys)
311+
.withRole("backend")
312+
.withSingletonName("active")
313+
val proxy = sys.actorOf(ClusterSingletonProxy.props("/user/activeManager", proxySettings), "proxy")
304314

305315
val useStorage = sys.actorOf(Props(new UseStorage(proxy)), "useStorage")
306316
useStorage ! Run(0)
Lines changed: 59 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,65 @@
11
package com.reactivedesignpatterns.chapter12
22

3-
class MultiMasterCRDT {
3+
import akka.cluster.ddata.ReplicatedData
44

5-
import akka.actor._
6-
7-
case class Greet(whom: String)
8-
9-
class Greeter extends Actor {
10-
def receive = {
11-
case Greet(whom) =>
12-
sender() ! s"Hello $whom!"
13-
val delegate = context.actorOf(grumpyProps)
14-
context.become(grumpy(delegate))
15-
}
16-
def grumpy(delegate: ActorRef): Receive = {
17-
case g: Greet => delegate forward g
18-
}
5+
object MultiMasterCRDT {
6+
7+
sealed trait Status extends ReplicatedData {
8+
type T = Status
9+
def merge(that: Status): Status = mergeStatus(this, that)
10+
11+
def predecessors: Set[Status]
12+
def successors: Set[Status]
13+
}
14+
15+
case object New extends Status {
16+
lazy val predecessors: Set[Status] = Set.empty
17+
lazy val successors: Set[Status] = Set(Scheduled)
18+
}
19+
case object Scheduled extends Status {
20+
lazy val predecessors: Set[Status] = Set(New)
21+
lazy val successors: Set[Status] = Set(Executing, Cancelled)
22+
}
23+
case object Executing extends Status {
24+
lazy val predecessors: Set[Status] = Set(Scheduled)
25+
lazy val successors: Set[Status] = Set(Aborted)
26+
}
27+
case object Finished extends Status {
28+
lazy val predecessors: Set[Status] = Set(Executing, Aborted)
29+
lazy val successors: Set[Status] = Set.empty
30+
}
31+
case object Cancelled extends Status {
32+
lazy val predecessors: Set[Status] = Set(New, Scheduled)
33+
lazy val successors: Set[Status] = Set(Aborted)
34+
}
35+
case object Aborted extends Status {
36+
lazy val predecessors: Set[Status] = Set(Cancelled, Executing)
37+
lazy val successors: Set[Status] = Set(Finished)
38+
}
39+
40+
def mergeStatus(left: Status, right: Status): Status = {
41+
/*
42+
* Keep the left Status in hand and determine whether it is a predecessor of
43+
* the candidate, moving on to the candidate’s successor if not successful.
44+
* The list of exclusions is used to avoid performing already determined
45+
* unsuccessful comparisons again.
46+
*/
47+
def innerLoop(candidate: Status, exclude: Set[Status]): Status =
48+
if (isSuccessor(candidate, left, exclude)) {
49+
candidate
50+
} else {
51+
val nextExclude = exclude + candidate
52+
val branches = candidate.successors.map(succ => innerLoop(succ, nextExclude))
53+
branches.reduce((l, r) => if (isSuccessor(l, r, nextExclude)) r else l)
54+
}
55+
def isSuccessor(candidate: Status, fixed: Status, exclude: Set[Status]): Boolean =
56+
if (candidate == fixed) true
57+
else {
58+
val toSearch = candidate.predecessors -- exclude
59+
toSearch.exists(pred => isSuccessor(pred, fixed, exclude))
60+
}
61+
62+
innerLoop(right, Set.empty)
1963
}
20-
21-
val grumpyProps = Props(new Actor {
22-
def receive = {
23-
case Greet(whom) =>
24-
sender() ! s"Go away, $whom!"
25-
}
26-
})
2764

2865
}

chapter12/src/main/scala/com/reactivedesignpatterns/chapter12/MultiMasterRAFT.scala

Lines changed: 0 additions & 5 deletions
This file was deleted.
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package com.reactivedesignpatterns.chapter12
2+
3+
import org.scalatest.WordSpec
4+
import org.scalatest.Matchers
5+
import org.scalactic.ConversionCheckedTripleEquals
6+
7+
class MultiMasterCRDTSpec extends WordSpec with Matchers with ConversionCheckedTripleEquals {
8+
import MultiMasterCRDT._
9+
10+
"A CRDT" must {
11+
val allStatus = List(New, Scheduled, Executing, Finished, Cancelled, Aborted)
12+
13+
"have a correct mergeStatus function" in {
14+
mergeStatus(New, New) should ===(New)
15+
mergeStatus(New, Scheduled) should ===(Scheduled)
16+
mergeStatus(New, Executing) should ===(Executing)
17+
mergeStatus(New, Finished) should ===(Finished)
18+
mergeStatus(New, Cancelled) should ===(Cancelled)
19+
mergeStatus(New, Aborted) should ===(Aborted)
20+
mergeStatus(Scheduled, New) should ===(Scheduled)
21+
mergeStatus(Scheduled, Scheduled) should ===(Scheduled)
22+
mergeStatus(Scheduled, Executing) should ===(Executing)
23+
mergeStatus(Scheduled, Finished) should ===(Finished)
24+
mergeStatus(Scheduled, Cancelled) should ===(Cancelled)
25+
mergeStatus(Scheduled, Aborted) should ===(Aborted)
26+
mergeStatus(Executing, New) should ===(Executing)
27+
mergeStatus(Executing, Scheduled) should ===(Executing)
28+
mergeStatus(Executing, Executing) should ===(Executing)
29+
mergeStatus(Executing, Finished) should ===(Finished)
30+
mergeStatus(Executing, Cancelled) should ===(Aborted)
31+
mergeStatus(Executing, Aborted) should ===(Aborted)
32+
mergeStatus(Finished, New) should ===(Finished)
33+
mergeStatus(Finished, Scheduled) should ===(Finished)
34+
mergeStatus(Finished, Executing) should ===(Finished)
35+
mergeStatus(Finished, Finished) should ===(Finished)
36+
mergeStatus(Finished, Cancelled) should ===(Finished)
37+
mergeStatus(Finished, Aborted) should ===(Finished)
38+
mergeStatus(Cancelled, New) should ===(Cancelled)
39+
mergeStatus(Cancelled, Scheduled) should ===(Cancelled)
40+
mergeStatus(Cancelled, Executing) should ===(Aborted)
41+
mergeStatus(Cancelled, Finished) should ===(Finished)
42+
mergeStatus(Cancelled, Cancelled) should ===(Cancelled)
43+
mergeStatus(Cancelled, Aborted) should ===(Aborted)
44+
mergeStatus(Aborted, New) should ===(Aborted)
45+
mergeStatus(Aborted, Scheduled) should ===(Aborted)
46+
mergeStatus(Aborted, Executing) should ===(Aborted)
47+
mergeStatus(Aborted, Finished) should ===(Finished)
48+
mergeStatus(Aborted, Cancelled) should ===(Aborted)
49+
mergeStatus(Aborted, Aborted) should ===(Aborted)
50+
}
51+
52+
"have a symmetrical mergeStatus function" in {
53+
for {
54+
left <- allStatus
55+
right <- allStatus
56+
} withClue(s"mergeStatus($left, $right): ") {
57+
mergeStatus(left, right) should ===(mergeStatus(right, left))
58+
}
59+
}
60+
61+
"merge Finished always to Finished" in {
62+
for {
63+
other <- allStatus
64+
} withClue(s"mergeStatus(Finished, $other): ") {
65+
mergeStatus(Finished, other) should ===(Finished)
66+
}
67+
}
68+
69+
}
70+
71+
}

project/Build.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
import sbt._
22

33
object Build {
4-
val akkaVersion = "2.3.11"
4+
val akkaVersion = "2.4-M2"
55

66
val akkaActor = "com.typesafe.akka" %% "akka-actor" % akkaVersion
77
val akkaTestkit = "com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test"
88
val akkaContrib = "com.typesafe.akka" %% "akka-contrib" % akkaVersion
9+
val akkaDData = "com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion
910

1011
val sbtIO = "org.scala-sbt" %% "io" % "0.13.8"
1112

0 commit comments

Comments
 (0)