@@ -17,6 +17,7 @@ import scala.annotation.tailrec
1717import akka .contrib .pattern .ClusterSingletonManager
1818import akka .contrib .pattern .ClusterSingletonProxy
1919import com .typesafe .config .ConfigFactory
20+ import scala .io .StdIn
2021
2122object ActivePassive {
2223 import ReplicationProtocol ._
@@ -37,6 +38,8 @@ object ActivePassive {
3738 private val toReplicate = Queue .empty[Replicate ]
3839 private var replicating = TreeMap .empty[Int , (Replicate , Int )]
3940
41+ private var rejected = 0
42+
4043 val cluster = Cluster (context.system)
4144
4245 import context .dispatcher
@@ -45,9 +48,10 @@ object ActivePassive {
4548
4649 log.info(" taking over from local replica" )
4750 localReplica ! TakeOver (self)
48-
51+
4952 def receive = {
5053 case InitialState (m, s) =>
54+ log.info(" took over at sequence {}" , s)
5155 theStore = m
5256 seqNr = Iterator from s
5357 context.become(running)
@@ -56,11 +60,12 @@ object ActivePassive {
5660 }
5761
5862 val running : Receive = {
59- case Put (key, value, replyTo) =>
63+ case p @ Put (key, value, replyTo) =>
6064 if (toReplicate.size < MaxOutstanding ) {
6165 toReplicate.enqueue(Replicate (seqNr.next, key, value, replyTo))
6266 replicate()
6367 } else {
68+ rejected += 1
6469 replyTo ! PutRejected (key, value)
6570 }
6671 case Get (key, replyTo) =>
@@ -69,13 +74,17 @@ object ActivePassive {
6974 replicating.valuesIterator foreach {
7075 case (replicate, count) => disseminate(replicate)
7176 }
77+ if (rejected > 0 ) {
78+ log.info(" rejected {} PUT requests" , rejected)
79+ rejected = 0
80+ }
7281 case Replicated (confirm) =>
7382 replicating.get(confirm) match {
7483 case None => // already removed
7584 case Some ((rep, 1 )) =>
7685 replicating -= confirm
7786 theStore += rep.key -> rep.value
78- rep.replyTo ! PutConfirmed (rep.key)
87+ rep.replyTo ! PutConfirmed (rep.key, rep.value )
7988 case Some ((rep, n)) =>
8089 replicating += confirm -> (rep, n - 1 )
8190 }
@@ -106,7 +115,7 @@ object ActivePassive {
106115 private val applied = Queue .empty[Replicate ]
107116 private var waiting = TreeMap .empty[Int , Replicate ]
108117
109- val name = self.path.address .toString
118+ val name = Cluster (context.system).selfAddress .toString.replaceAll( " [:/] " , " _ " )
110119 val cluster = Cluster (context.system)
111120 val random = new Random
112121
@@ -116,26 +125,30 @@ object ActivePassive {
116125 expectedSeq = s + 1
117126 }
118127 log.info(" started at sequence {}" , expectedSeq)
119-
128+
120129 override def postStop (): Unit = {
121130 log.info(" stopped at sequence {}" , expectedSeq)
122131 }
123132
124133 def receive = {
125134 case TakeOver (active) =>
135+ log.info(" active replica starting at sequence {}" , expectedSeq)
126136 active ! InitialState (theStore, expectedSeq)
127137 case Replicate (s, _, _, replyTo) if s < expectedSeq =>
128138 replyTo ! Replicated (s)
129139 case r : Replicate =>
130140 waiting += r.seq -> r
131141 consolidate()
132142 case GetSingle (s, replyTo) =>
143+ log.info(" GetSingle from {}" , replyTo)
133144 if (applied.nonEmpty && applied.head.seq <= s && applied.last.seq >= s)
134145 replyTo ! applied.find(_.seq == s).get
135146 else if (s < expectedSeq) replyTo ! InitialState (theStore, expectedSeq)
136147 case GetFull (replyTo) =>
148+ log.info(" sending full info to {}" , replyTo)
137149 replyTo ! InitialState (theStore, expectedSeq)
138150 case InitialState (m, s) if s > expectedSeq =>
151+ log.info(" received newer state at sequence {} (was at {})" , s, expectedSeq)
139152 theStore = m
140153 expectedSeq = s + 1
141154 persist(name, s, m)
@@ -163,12 +176,12 @@ object ActivePassive {
163176 }
164177 waiting = waiting.drop(prefix)
165178 }
166-
179+
167180 // cap the size of the applied buffer
168181 applied.drop(Math .max(0 , applied.size - 10 ))
169182
170183 // check if we fell behind by too much
171- if (waiting.lastKey - expectedSeq > 10 ) {
184+ if (waiting.nonEmpty && (waiting. lastKey - expectedSeq > 10 ) ) {
172185 val outstanding = (expectedSeq to waiting.lastKey).iterator.filterNot(waiting.contains).toList
173186 if (outstanding.size <= 3 ) outstanding foreach askAround
174187 else askAroundFullState()
@@ -180,17 +193,23 @@ object ActivePassive {
180193 random.shuffle(cluster.state.members.iterator.map(_.address).toSeq).take(n)
181194 }
182195 private def askAround (seq : Int ): Unit = {
196+ log.info(" asking around for sequence number {}" , seq)
183197 getMembers(askAroundCount).foreach(addr => replicaOn(addr) ! GetSingle (seq, self))
184198 }
185199 private def askAroundFullState (): Unit = {
200+ log.info(" asking for full data" )
186201 getMembers(1 ).foreach(addr => replicaOn(addr) ! GetFull (self))
187202 }
188203 private def replicaOn (addr : Address ): ActorSelection =
189204 context.actorSelection(self.path.toStringWithAddress(addr))
190205 }
191-
206+
192207 val commonConfig = ConfigFactory .parseString("""
193208 akka.actor.provider = akka.cluster.ClusterActorRefProvider
209+ akka.remote.netty.tcp {
210+ host = "127.0.0.1"
211+ port = 0
212+ }
194213 akka.cluster {
195214 gossip-interval = 100ms
196215 failure-detector {
@@ -199,51 +218,126 @@ object ActivePassive {
199218 }
200219 }
201220 """ )
202- def roleConfig (name : String ) = ConfigFactory .parseString(s """ akka.cluster.roles = [" $name"] """ )
221+ def roleConfig (name : String , port : Option [Int ]) = {
222+ val roles = ConfigFactory .parseString(s """ akka.cluster.roles = [" $name"] """ )
223+ port match {
224+ case None => roles
225+ case Some (p) =>
226+ ConfigFactory .parseString(s """ akka.remote.netty.tcp.port = $p""" )
227+ .withFallback(roles)
228+ }
229+ }
203230
204- def start (n : Int ): ActorSystem = {
205- val system = ActorSystem (s " node $n " , roleConfig(" backend" ) withFallback commonConfig)
231+ def start (port : Option [ Int ] ): ActorSystem = {
232+ val system = ActorSystem (" ActivePassive " , roleConfig(" backend" , port ) withFallback commonConfig)
206233 val localReplica = system.actorOf(Props (new Passive (3 , 3 .seconds)), " passive" )
207234 val managerProps =
208- ClusterSingletonManager .props(Props (new Active (localReplica, 2 , 10 )), " active" , PoisonPill , Some (" backend" ))
235+ ClusterSingletonManager .props(Props (new Active (localReplica, 2 , 120 )), " active" , PoisonPill ,
236+ role = Some (" backend" ), retryInterval = 150 .millis)
209237 val manager = system.actorOf(managerProps, " activeManager" )
210238 system
211239 }
212240
213241 def main (args : Array [String ]): Unit = {
214- val systems = Array .tabulate (5 )(start(_ ))
242+ val systems = Array .fill (5 )(start(None ))
215243 val seedNode = Cluster (systems(0 )).selfAddress
216244 systems foreach (Cluster (_).join(seedNode))
217245
218- val sys = ActorSystem (" ActivePassive" , commonConfig)
246+ val sys = ActorSystem (" ActivePassive" , ConfigFactory .parseString( " akka.loglevel=INFO " ) withFallback commonConfig)
219247 Cluster (sys).join(seedNode)
248+
249+ awaitMembers(sys, systems.length + 1 )
250+
251+ val proxy = sys.actorOf(ClusterSingletonProxy .props(" /user/activeManager/active" , Some (" backend" )), " proxy" )
252+
253+ val useStorage = sys.actorOf(Props (new UseStorage (proxy)), " useStorage" )
254+ useStorage ! Run (0 )
255+
256+ sys.actorOf(Props (new Actor {
257+ def receive = {
258+ case Run =>
259+ StdIn .readLine()
260+ useStorage ! Stop
261+ }
262+ })) ! Run
263+
264+ Thread .sleep(10000 )
220265
221- while (Cluster (sys).state.members.size < 6 ) {
266+ val rnd = new Random
267+ while (! terminate) {
268+ Thread .sleep(5000 )
269+ val sysidx = rnd.nextInt(systems.length)
270+ val oldsys = systems(sysidx)
271+ val port = Cluster (oldsys).selfAddress.port
272+ oldsys.shutdown()
273+ oldsys.awaitTermination()
274+ val newsys = start(port)
275+ val seed = Cluster (if (sysidx == 0 ) systems(1 ) else systems(0 )).selfAddress
276+ Cluster (newsys).join(seed)
277+ systems(sysidx) = newsys
278+ awaitMembers(sys, systems.length + 1 )
279+ }
280+
281+ Thread .sleep(3000 )
282+
283+ sys.shutdown()
284+ systems foreach (_.shutdown())
285+ }
286+
287+ private def awaitMembers (sys : ActorSystem , count : Int ): Unit = {
288+ while (Cluster (sys).state.members.size < count) {
222289 Thread .sleep(500 )
223290 print('.' )
224291 Console .flush()
225292 }
226293 println(" cluster started" )
227-
228- val path = sys / " user" / " activeManager" / " active"
229- val proxy = sys.actorOf(ClusterSingletonProxy .props(path.toString, Some (" backend" )))
230-
231- sys.actorOf(Props (new UseStorage (proxy)), " useStorage" ) ! Run
232-
233- while (! terminate) {
234-
235- }
236294 }
237-
238- private case object Run
295+
296+ private case class Run ( round : Int )
239297 private case object Stop
240298 @ volatile private var terminate = false
241-
242- private class UseStorage (db : ActorRef ) extends Actor {
299+
300+ private class UseStorage (db : ActorRef ) extends Actor with ActorLogging {
301+ val N = 200
302+ var theStore = Map .empty[String , JsValue ]
303+ val keys = (1 to N ).map(i => f " $i%03d " )
304+ var outstanding = Set .empty[String ]
305+ val rnd = new Random
306+ var lastOutstandingCount = 0
307+
243308 def receive = {
244- case Run =>
245-
246- self ! Run
309+ case Run (0 ) =>
310+ db ! Get (" initial" , self)
311+ case GetResult (" initial" , _) =>
312+ self ! Run (1 )
313+ case Run (round) =>
314+ if (round % 100 == 0 ) log.info(" round {}" , round)
315+ val nowOutstanding = outstanding.size
316+ if (nowOutstanding != lastOutstandingCount) {
317+ lastOutstandingCount = nowOutstanding
318+ log.info(" {} outstanding" , nowOutstanding)
319+ }
320+ for (k <- keys) {
321+ db ! Get (k, self)
322+ if (! outstanding.contains(k) && rnd.nextBoolean()) {
323+ db ! Put (k, JsNumber (round), self)
324+ outstanding += k
325+ }
326+ }
327+ context.system.scheduler.scheduleOnce(100 .millis, self, Run (round + 1 ))(context.dispatcher)
328+ case GetResult (key, value) =>
329+ if (outstanding.contains(key)) {
330+ outstanding -= key
331+ value foreach (theStore += key -> _)
332+ } else if (value != theStore.get(key)) {
333+ log.warning(" returned wrong value for key {}: {} (expected {})" , key, value, theStore.get(key))
334+ context.stop(self)
335+ }
336+ case PutConfirmed (key, value) =>
337+ outstanding -= key
338+ theStore += key -> value
339+ case PutRejected (key, value) =>
340+ outstanding -= key
247341 case Stop => context.stop(self)
248342 }
249343 override def postStop (): Unit = terminate = true
0 commit comments