@@ -32,6 +32,7 @@ object ActiveActive {
3232
3333 def initialized : Receive = {
3434 case SeqCommand (seq, cmd, replyTo) =>
35+ // tracking of sequence numbers and resends is elided here
3536 cmd match {
3637 case Put (key, value, r) =>
3738 map += key -> value
@@ -53,18 +54,19 @@ object ActiveActive {
5354 def add (res : SeqResult ): ReplyState
5455 def isFinished : Boolean = missing.isEmpty
5556 }
56- private case class Unknown (deadline : Deadline , replies : Set [SeqResult ], missing : Set [ActorRef ]) extends ReplyState {
57+ private case class Unknown (deadline : Deadline , replies : Set [SeqResult ], missing : Set [ActorRef ], quorum : Int ) extends ReplyState {
5758 override def add (res : SeqResult ): ReplyState = {
58- val quorum = (missing.size + 1 ) / 2
5959 val nextReplies = replies + res
60+ val nextMissing = missing - res.replica
6061 if (nextReplies.size >= quorum) {
6162 val answer = replies.toSeq.groupBy(_.res).collectFirst { case (k, s) if s.size >= quorum => s.head }
6263 if (answer.isDefined) {
6364 val right = answer.get
6465 val wrong = replies.collect { case SeqResult (_, res, replica, _) if res != right => replica }
65- Known (deadline, right, wrong, missing - res.replica)
66- } else Unknown (deadline, nextReplies, missing - res.replica)
67- } else Unknown (deadline, nextReplies, missing - res.replica)
66+ Known (deadline, right, wrong, nextMissing)
67+ } else if (nextMissing.isEmpty) Known .fromUnknown(deadline, nextReplies)
68+ else Unknown (deadline, nextReplies, nextMissing, quorum)
69+ } else Unknown (deadline, nextReplies, nextMissing, quorum)
6870 }
6971 }
7072 private case class Known (deadline : Deadline , reply : SeqResult , wrong : Set [ActorRef ], missing : Set [ActorRef ]) extends ReplyState {
@@ -73,6 +75,18 @@ object ActiveActive {
7375 Known (deadline, reply, nextWrong, missing - res.replica)
7476 }
7577 }
78+ private object Known {
79+ def fromUnknown (deadline : Deadline , replies : Set [SeqResult ]): Known = {
80+ // did not reach consensus on this one, pick simple majority
81+ val counts = replies.groupBy(_.res)
82+ val biggest = counts.iterator.map(_._2.size).max
83+ val winners = counts.collectFirst {
84+ case (res, win) if win.size == biggest => win
85+ }.get
86+ val losers = (replies -- winners).map(_.replica)
87+ Known (deadline, winners.head, losers, Set .empty)
88+ }
89+ }
7690
7791 class Coordinator (N : Int ) extends Actor {
7892 private var replicas = (1 to N ).map(_ => newReplica()).toSet
@@ -81,7 +95,7 @@ object ActiveActive {
8195 private var nextReply = 0
8296
8397 override def supervisorStrategy = SupervisorStrategy .stoppingStrategy
84-
98+
8599 private def newReplica (): ActorRef =
86100 context.watch(context.actorOf(Replica .props))
87101
@@ -92,7 +106,7 @@ object ActiveActive {
92106 case cmd : Command =>
93107 val c = SeqCommand (seqNr.next, cmd, self)
94108 replicas foreach (_ ! c)
95- replies += c.seq -> Unknown (5 seconds fromNow, Set .empty, replicas)
109+ replies += c.seq -> Unknown (5 seconds fromNow, Set .empty, replicas, (replicas.size + 1 ) / 2 )
96110 case res : SeqResult if replies.contains(res.seq) && replicas.contains(res.replica) =>
97111 val prevState = replies(res.seq)
98112 val nextState = prevState.add(res)
@@ -111,18 +125,10 @@ object ActiveActive {
111125 val expired = replies.iterator.takeWhile(_._2.deadline <= now)
112126 for ((seq, state) <- expired) {
113127 state match {
114- case Unknown (deadline, received, missing) =>
115- // did not reach consensus on this one, pick simple majority
116- val counts = received.groupBy(_.res)
117- val biggest = counts.iterator.map(_._2.size).max
118- val winners = counts.collectFirst {
119- case (res, win) if win.size == biggest => win
120- }.get
121- val losers = (received -- winners).map(_.replica)
122- // don’t wait for further responses
123- replies += seq -> Known (deadline, winners.head, losers, Set .empty)
128+ case Unknown (deadline, received, _, _) =>
129+ val forced = Known .fromUnknown(deadline, received)
130+ replies += seq -> forced
124131 case Known (deadline, reply, wrong, missing) =>
125- // don’t wait for further responses
126132 replies += seq -> Known (deadline, reply, wrong, Set .empty)
127133 }
128134 }
0 commit comments