11package com .reactivedesignpatterns .chapter12
22
3- import akka .cluster .ddata .ReplicatedData
3+ import akka .actor ._
4+ import akka .cluster .ddata ._
5+ import scala .concurrent .duration ._
6+ import akka .cluster .Cluster
7+ import com .typesafe .config .ConfigFactory
48
59object MultiMasterCRDT {
10+
11+ private var statusMap = Map .empty[String , Status ]
612
7- sealed trait Status extends ReplicatedData {
13+ final case class Status ( val name : String )( _pred : => Set [ Status ], _succ : => Set [ Status ]) extends ReplicatedData {
814 type T = Status
915 def merge (that : Status ): Status = mergeStatus(this , that)
16+
17+ lazy val predecessors = _pred
18+ lazy val successors = _succ
1019
11- def predecessors : Set [ Status ]
12- def successors : Set [ Status ]
20+ if ( ! statusMap.contains(name)) statusMap += name -> this
21+ private def readResolve : AnyRef = statusMap(name)
1322 }
1423
15- case object New extends Status {
16- lazy val predecessors : Set [Status ] = Set .empty
17- lazy val successors : Set [Status ] = Set (Scheduled )
18- }
19- case object Scheduled extends Status {
20- lazy val predecessors : Set [Status ] = Set (New )
21- lazy val successors : Set [Status ] = Set (Executing , Cancelled )
22- }
23- case object Executing extends Status {
24- lazy val predecessors : Set [Status ] = Set (Scheduled )
25- lazy val successors : Set [Status ] = Set (Aborted )
26- }
27- case object Finished extends Status {
28- lazy val predecessors : Set [Status ] = Set (Executing , Aborted )
29- lazy val successors : Set [Status ] = Set .empty
30- }
31- case object Cancelled extends Status {
32- lazy val predecessors : Set [Status ] = Set (New , Scheduled )
33- lazy val successors : Set [Status ] = Set (Aborted )
34- }
35- case object Aborted extends Status {
36- lazy val predecessors : Set [Status ] = Set (Cancelled , Executing )
37- lazy val successors : Set [Status ] = Set (Finished )
38- }
24+ val New : Status = Status (" new" )(Set .empty, Set (Scheduled , Cancelled ))
25+ val Scheduled : Status = Status (" scheduled" )(Set (New ), Set (Executing , Cancelled ))
26+ val Executing : Status = Status (" executing" )(Set (Scheduled ), Set (Aborted , Finished ))
27+ val Finished : Status = Status (" finished" )(Set (Executing , Aborted ), Set .empty)
28+ val Cancelled : Status = Status (" cancelled" )(Set (New , Scheduled ), Set (Aborted ))
29+ val Aborted : Status = Status (" aborted" )(Set (Cancelled , Executing ), Set (Finished ))
3930
4031 def mergeStatus (left : Status , right : Status ): Status = {
4132 /*
@@ -61,5 +52,131 @@ object MultiMasterCRDT {
6152
6253 innerLoop(right, Set .empty)
6354 }
55+
56+ object StorageComponent extends Key [ORMap [Status ]](" StorageComponent" )
57+
58+ case class Submit (job : String )
59+ case class Cancel (job : String )
60+ case class Execute (job : String )
61+ case class Finish (job : String )
62+ case object PrintStatus
63+
64+ class ClientInterface extends Actor with ActorLogging {
65+ val replicator = DistributedData (context.system).replicator
66+ implicit val cluster = Cluster (context.system)
67+
68+ def receive = {
69+ case Submit (job) =>
70+ log.info(" submitting job {}" , job)
71+ replicator ! Replicator .Update (StorageComponent , ORMap .empty[Status ], Replicator .WriteMajority (5 .seconds), Some (s " submit $job" ))(_ + (job -> New ))
72+ case Cancel (job) =>
73+ log.info(" cancelling job {}" , job)
74+ replicator ! Replicator .Update (StorageComponent , ORMap .empty[Status ], Replicator .WriteMajority (5 .seconds), Some (s " cancel $job" ))(_ + (job -> Cancelled ))
75+ case r : Replicator .UpdateResponse [_] =>
76+ log.info(" received update result: {}" , r)
77+ case PrintStatus =>
78+ replicator ! Replicator .Get (StorageComponent , Replicator .ReadMajority (5 .seconds))
79+ case g : Replicator .GetSuccess [_] =>
80+ log.info(" overall status: {}" , g.get(StorageComponent ))
81+ }
82+ }
83+
84+ class Executor extends Actor with ActorLogging {
85+ val replicator = DistributedData (context.system).replicator
86+ implicit val cluster = Cluster (context.system)
87+
88+ var lastState = Map .empty[String , Status ]
89+
90+ replicator ! Replicator .Subscribe (StorageComponent , self)
91+
92+ def receive = {
93+ case Execute (job) =>
94+ log.info(" executing job {}" , job)
95+ replicator ! Replicator .Update (StorageComponent , ORMap .empty[Status ], Replicator .WriteMajority (5 .seconds), Some (s " submit $job" )) { map =>
96+ if (map.get(job) == Some (New )) map + (job -> Executing )
97+ else map
98+ }
99+ case Finish (job) =>
100+ log.info(" job {} finished" , job)
101+ replicator ! Replicator .Update (StorageComponent , ORMap .empty[Status ], Replicator .WriteMajority (5 .seconds), Some (s " cancel $job" ))(_ + (job -> Finished ))
102+ case r : Replicator .UpdateResponse [_] =>
103+ log.info(" received update result: {}" , r)
104+ case ch : Replicator .Changed [_] =>
105+ val current = ch.get(StorageComponent ).entries
106+ for {
107+ (job, status) <- current.iterator
108+ if (status == Aborted )
109+ if (lastState.get(job) != Some (Aborted ))
110+ } log.info(" aborting job {}" , job)
111+ lastState = current
112+ }
113+ }
114+
115+ val commonConfig = ConfigFactory .parseString("""
116+ akka.actor.provider = akka.cluster.ClusterActorRefProvider
117+ akka.remote.netty.tcp {
118+ host = "127.0.0.1"
119+ port = 0
120+ }
121+ akka.cluster {
122+ gossip-interval = 100ms
123+ failure-detector {
124+ heartbeat-interval = 100ms
125+ acceptable-heartbeat-pause = 500ms
126+ }
127+ distributed-data.gossip-interval = 100ms
128+ }
129+ """ )
64130
65- }
131+ object sleep
132+ implicit object waitConvert extends DurationConversions .Classifier [sleep.type ] {
133+ type R = Unit
134+ def convert (d : FiniteDuration ): Unit = Thread .sleep(d.toMillis)
135+ }
136+
137+ def main (args : Array [String ]): Unit = {
138+ val sys1 = ActorSystem (" MultiMasterCRDT" , commonConfig)
139+ val addr1 = Cluster (sys1).selfAddress
140+ Cluster (sys1).join(addr1)
141+
142+ val sys2 = ActorSystem (" MultiMasterCRDT" , commonConfig)
143+ Cluster (sys2).join(addr1)
144+
145+ awaitMembers(sys1, 2 )
146+
147+ val clientInterface = sys1.actorOf(Props (new ClientInterface ), " clientInterface" )
148+ val executor = sys2.actorOf(Props (new Executor ), " executor" )
149+
150+ clientInterface ! Submit (" alpha" )
151+ clientInterface ! Submit (" beta" )
152+ clientInterface ! Submit (" gamma" )
153+ clientInterface ! Submit (" delta" )
154+ 1 second sleep
155+ executor ! Execute (" alpha" )
156+ executor ! Execute (" gamma" )
157+ clientInterface ! Cancel (" delta" )
158+ 1 second sleep
159+ clientInterface ! Cancel (" alpha" )
160+ clientInterface ! Cancel (" beta" )
161+ executor ! Execute (" beta" )
162+ 1 second sleep
163+ clientInterface ! Cancel (" gamma" )
164+ 1 second sleep
165+ executor ! Finish (" gamma" )
166+ 3 seconds sleep
167+ clientInterface ! PrintStatus
168+ 1 second sleep
169+
170+ sys1.terminate()
171+ sys2.terminate()
172+ }
173+
174+ private def awaitMembers (sys : ActorSystem , count : Int ): Unit = {
175+ while (Cluster (sys).state.members.size < count) {
176+ Thread .sleep(500 )
177+ print('.' )
178+ Console .flush()
179+ }
180+ println(" cluster started" )
181+ }
182+ }
0 commit comments