@@ -7,6 +7,7 @@ import akka.actor._
77import play .api .libs .json .JsValue
88import scala .collection .immutable .TreeMap
99import scala .concurrent .duration ._
10+ import scala .annotation .tailrec
1011
1112object ActiveActive {
1213 import ReplicationProtocol ._
@@ -50,7 +51,6 @@ object ActiveActive {
5051 def deadline : Deadline
5152 def missing : Set [ActorRef ]
5253 def add (res : SeqResult ): ReplyState
53- def isKnown : Boolean
5454 def isFinished : Boolean = missing.isEmpty
5555 }
5656 private case class Unknown (deadline : Deadline , replies : Set [SeqResult ], missing : Set [ActorRef ]) extends ReplyState {
@@ -66,88 +66,94 @@ object ActiveActive {
6666 } else Unknown (deadline, nextReplies, missing - res.replica)
6767 } else Unknown (deadline, nextReplies, missing - res.replica)
6868 }
69- override def isKnown = false
7069 }
7170 private case class Known (deadline : Deadline , reply : SeqResult , wrong : Set [ActorRef ], missing : Set [ActorRef ]) extends ReplyState {
7271 override def add (res : SeqResult ): ReplyState = {
7372 val nextWrong = if (res.res == reply.res) wrong else wrong + res.replica
7473 Known (deadline, reply, nextWrong, missing - res.replica)
7574 }
76- override def isKnown = true
7775 }
7876
7977 class Coordinator (N : Int ) extends Actor {
80- private var replicas = (1 to N ).map(_ => context.actorOf( Replica .props )).toSet
78+ private var replicas = (1 to N ).map(_ => newReplica( )).toSet
8179 private val seqNr = Iterator from 0
8280 private var replies = TreeMap .empty[Int , ReplyState ]
8381 private var nextReply = 0
8482
83+ override def supervisorStrategy = SupervisorStrategy .stoppingStrategy
84+
85+ private def newReplica (): ActorRef =
86+ context.watch(context.actorOf(Replica .props))
87+
88+ // schedule timeout messages for quiescent periods
8589 context.setReceiveTimeout(1 .second)
8690
87- def receive = {
91+ def receive = ( {
8892 case cmd : Command =>
8993 val c = SeqCommand (seqNr.next, cmd, self)
9094 replicas foreach (_ ! c)
9195 replies += c.seq -> Unknown (5 seconds fromNow, Set .empty, replicas)
92- doTimeouts()
9396 case res : SeqResult if replies.contains(res.seq) && replicas.contains(res.replica) =>
9497 val prevState = replies(res.seq)
9598 val nextState = prevState.add(res)
96- // potentially send reply if quorum of replies has been received now
97- nextState match {
98- case Known (seq, reply, _, _) if ! prevState.isKnown && seq == nextReply =>
99- reply.replyTo ! reply.res
100- nextReply += 1
101- case _ =>
102- }
103- // clean up state
104- if (nextState.isFinished) {
105- dispose(nextState)
106- replies -= res.seq
107- } else {
108- replies += res.seq -> nextState
109- doTimeouts()
110- }
111- case ReceiveTimeout => doTimeouts()
99+ replies += res.seq -> nextState
100+ case Terminated (ref) =>
101+ replaceReplica(ref, terminate = false )
102+ case ReceiveTimeout =>
103+ }: Receive ) andThen { _ =>
104+ doTimeouts()
105+ sendReplies()
106+ evictFinished()
112107 }
113108
114109 private def doTimeouts (): Unit = {
115110 val now = Deadline .now
116111 val expired = replies.iterator.takeWhile(_._2.deadline <= now)
117- expired.map(_._2).foreach(dispose)
112+ for ((seq, state) <- expired) {
113+ state match {
114+ case Unknown (deadline, received, missing) =>
115+ // did not reach consensus on this one, pick simple majority
116+ val counts = received.groupBy(_.res)
117+ val biggest = counts.iterator.map(_._2.size).max
118+ val winners = counts.collectFirst {
119+ case (res, win) if win.size == biggest => win
120+ }.get
121+ val losers = (received -- winners).map(_.replica)
122+ // don’t wait for further responses
123+ replies += seq -> Known (deadline, winners.head, losers, Set .empty)
124+ case Known (deadline, reply, wrong, missing) =>
125+ // don’t wait for further responses
126+ replies += seq -> Known (deadline, reply, wrong, Set .empty)
127+ }
128+ }
118129 }
119130
120- /**
121- * The given reply state has been removed from the replies map and is now
122- * being disposed of. This means that we need to act upon wrong replies
123- * from replicas.
124- *
125- * If there are replicas for which no reply has been recorded yet, we
126- * ignore them. If they reply incorrectly later they will be replaced then.
127- * GC pauses are tolerated: do not kick out replicas just for being slow.
128- */
129- private def dispose (state : ReplyState ): Unit =
130- state match {
131- case Unknown (_, replies, _) =>
132- // did not reach consensus on this one, pick simple majority
133- val counts = replies.toList.groupBy(_.res)
134- val biggest = counts.iterator.map(_._2.size).max
135- val winners = counts.collectFirst {
136- case (res, win) if win.size == biggest => win
137- }.get
138- val losers = replicas -- winners.iterator.map(_.replica).toSet
139- losers foreach replaceReplica
140- case Known (_, _, wrong, _) =>
141- wrong foreach replaceReplica
131+ @ tailrec private def sendReplies (): Unit =
132+ replies.get(nextReply) match {
133+ case Some (k @ Known (_, reply, _, _)) =>
134+ reply.replyTo ! reply.res
135+ nextReply += 1
136+ sendReplies()
137+ case _ =>
142138 }
143139
144- private def replaceReplica (r : ActorRef ): Unit = {
145- replicas -= r
146- r ! PoisonPill
147- val newReplica = context.actorOf(Replica .props)
148- replicas.head ! SendInitialData (newReplica)
149- replicas += newReplica
150- }
140+ @ tailrec private def evictFinished (): Unit =
141+ replies.headOption match {
142+ case Some ((seq, k @ Known (_, _, wrong, _))) if k.isFinished =>
143+ wrong foreach (replaceReplica(_, terminate = true ))
144+ replies -= seq
145+ evictFinished()
146+ case _ =>
147+ }
148+
149+ private def replaceReplica (r : ActorRef , terminate : Boolean ): Unit =
150+ if (replicas contains r) {
151+ replicas -= r
152+ if (terminate) r ! PoisonPill
153+ val replica = newReplica()
154+ replicas.head ! SendInitialData (replica)
155+ replicas += replica
156+ }
151157 }
152158
153159}
0 commit comments