@@ -109,36 +109,51 @@ object ActivePassive {
109109 private case class GetFull (replyTo : ActorRef )
110110 private case object DoConsolidate
111111
112- class Passive (askAroundCount : Int , askAroundInterval : FiniteDuration ) extends Actor with ActorLogging {
113- private var theStore : Map [String , JsValue ] = _
114- private var expectedSeq : Int = 0
112+ class Passive (askAroundCount : Int , askAroundInterval : FiniteDuration , maxLag : Int ) extends Actor with ActorLogging {
115113 private val applied = Queue .empty[Replicate ]
116- private var waiting = TreeMap .empty[Int , Replicate ]
114+ private var awaitingInitialState = Option .empty[ActorRef ]
117115
118116 val name = Cluster (context.system).selfAddress.toString.replaceAll(" [:/]" , " _" )
119117 val cluster = Cluster (context.system)
120118 val random = new Random
121119
122- readPersisted(name) match {
120+ private var tickTask = Option .empty[Cancellable ]
121+ def scheduleTick () = {
122+ tickTask foreach (_.cancel())
123+ tickTask = Some (context.system.scheduler.scheduleOnce(askAroundInterval, self, DoConsolidate )(context.dispatcher))
124+ }
125+
126+ def receive = readPersisted(name) match {
123127 case Database (s, kv) =>
124- theStore = kv
125- expectedSeq = s + 1
128+ log.info( " started at sequence {} " , s)
129+ upToDate(kv, s + 1 )
126130 }
127- log.info(" started at sequence {}" , expectedSeq)
128131
129132 override def postStop (): Unit = {
130- log.info(" stopped at sequence {} " , expectedSeq )
133+ log.info(" stopped" )
131134 }
132135
133- def receive = {
136+ def caughtUp (theStore : Map [String , JsValue ], expectedSeq : Int ): Unit = {
137+ awaitingInitialState foreach (_ ! InitialState (theStore, expectedSeq))
138+ awaitingInitialState = None
139+ context.become(upToDate(theStore, expectedSeq))
140+ }
141+
142+ def upToDate (theStore : Map [String , JsValue ], expectedSeq : Int ): Receive = {
134143 case TakeOver (active) =>
135144 log.info(" active replica starting at sequence {}" , expectedSeq)
136145 active ! InitialState (theStore, expectedSeq)
137- case Replicate (s, _, _, replyTo) if s < expectedSeq =>
146+ case Replicate (s, _, _, replyTo) if s - expectedSeq < 0 =>
138147 replyTo ! Replicated (s)
148+ case r : Replicate if r.seq == expectedSeq =>
149+ r.replyTo ! Replicated (r.seq)
150+ applied.enqueue(r)
151+ context.become(upToDate(theStore + (r.key -> r.value), r.seq + 1 ))
139152 case r : Replicate =>
140- waiting += r.seq -> r
141- consolidate()
153+ if (r.seq - expectedSeq > maxLag)
154+ fallBehind(expectedSeq, TreeMap (r.seq -> r))
155+ else
156+ missingSomeUpdates(theStore, expectedSeq, Set .empty, TreeMap (r.seq -> r))
142157 case GetSingle (s, replyTo) =>
143158 log.info(" GetSingle from {}" , replyTo)
144159 if (applied.nonEmpty && applied.head.seq <= s && applied.last.seq >= s)
@@ -147,46 +162,83 @@ object ActivePassive {
147162 case GetFull (replyTo) =>
148163 log.info(" sending full info to {}" , replyTo)
149164 replyTo ! InitialState (theStore, expectedSeq)
150- case InitialState (m, s) if s > expectedSeq =>
151- log.info(" received newer state at sequence {} (was at {})" , s, expectedSeq)
152- theStore = m
153- expectedSeq = s + 1
154- persist(name, s, m)
155- waiting.to(s).valuesIterator foreach (r => r.replyTo ! Replicated (r.seq))
156- waiting = waiting.from(expectedSeq)
157- consolidate()
158- case DoConsolidate =>
159- // this is scheduled when asking around for state so that eventually we
160- // will get all updates
161- consolidate()
162165 }
163166
164- private def consolidate (): Unit = {
167+ def missingSomeUpdates (theStore : Map [String , JsValue ], expectedSeq : Int , prevOutstanding : Set [Int ], waiting : TreeMap [Int , Replicate ]): Unit = {
168+ val askFor = (expectedSeq to waiting.lastKey).iterator
169+ .filterNot(seq => waiting.contains(seq) || prevOutstanding.contains(seq)).toList
170+ askFor foreach askAround
171+ if (prevOutstanding.isEmpty) scheduleTick()
172+ val outstanding = prevOutstanding ++ askFor
173+ context.become {
174+ case Replicate (s, _, _, replyTo) if s < expectedSeq =>
175+ replyTo ! Replicated (s)
176+ case r : Replicate =>
177+ consolidate(theStore, expectedSeq, outstanding - r.seq, waiting + (r.seq -> r))
178+ case TakeOver (active) =>
179+ log.info(" delaying active replica takeOver, at seq {} while highest is {}" , expectedSeq, waiting.lastKey)
180+ awaitingInitialState = Some (active)
181+ case GetSingle (s, replyTo) =>
182+ log.info(" GetSingle from {}" , replyTo)
183+ if (applied.nonEmpty && applied.head.seq <= s && applied.last.seq >= s)
184+ replyTo ! applied.find(_.seq == s).get
185+ else if (s < expectedSeq) replyTo ! InitialState (theStore, expectedSeq)
186+ case GetFull (replyTo) =>
187+ log.info(" sending full info to {}" , replyTo)
188+ replyTo ! InitialState (theStore, expectedSeq)
189+ case DoConsolidate =>
190+ outstanding foreach askAround
191+ scheduleTick()
192+ }
193+ }
194+
195+ def fallBehind (expectedSeq : Int , _waiting : TreeMap [Int , Replicate ]): Unit = {
196+ askAroundFullState()
197+ scheduleTick()
198+ var waiting = _waiting
199+ context.become {
200+ case Replicate (s, _, _, replyTo) if s < expectedSeq =>
201+ replyTo ! Replicated (s)
202+ case r : Replicate =>
203+ waiting += (r.seq -> r)
204+ case TakeOver (active) =>
205+ log.info(" delaying active replica takeOver, at seq {} while highest is {}" , expectedSeq, waiting.lastKey)
206+ awaitingInitialState = Some (active)
207+ case InitialState (m, s) if s > expectedSeq =>
208+ log.info(" received newer state at sequence {} (was at {})" , s, expectedSeq)
209+ persist(name, s, m)
210+ waiting.to(s).valuesIterator foreach (r => r.replyTo ! Replicated (r.seq))
211+ val nextWaiting = waiting.from(expectedSeq)
212+ consolidate(m, s + 1 , Set .empty, nextWaiting)
213+ case DoConsolidate =>
214+ askAroundFullState()
215+ scheduleTick()
216+ }
217+ }
218+
219+ private val matches = (p : (Int , Int )) => p._1 == p._2
220+
221+ private def consolidate (theStore : Map [String , JsValue ], expectedSeq : Int , askedFor : Set [Int ], waiting : TreeMap [Int , Replicate ]): Unit = {
165222 // calculate applicable prefix length
166- def matches (p : (Int , Int )) = p._1 == p._2
167223 val prefix = waiting.keysIterator.zip(Iterator from expectedSeq).takeWhile(matches).size
168224
169- if (prefix > 0 ) {
170- waiting.valuesIterator.take(prefix) foreach { replicate =>
171- theStore += replicate.key -> replicate.value
172- expectedSeq = replicate.seq + 1
173- persist(name, replicate.seq, theStore)
174- replicate.replyTo ! Replicated (replicate.seq)
175- applied.enqueue(replicate)
176- }
177- waiting = waiting.drop(prefix)
225+ val nextStore = waiting.valuesIterator.take(prefix).foldLeft(theStore) { (store, replicate) =>
226+ persist(name, replicate.seq, theStore)
227+ replicate.replyTo ! Replicated (replicate.seq)
228+ applied.enqueue(replicate)
229+ store + (replicate.key -> replicate.value)
178230 }
231+ val nextWaiting = waiting.drop(prefix)
232+ val nextExpectedSeq = expectedSeq + prefix
179233
180234 // cap the size of the applied buffer
181- applied.drop(Math .max(0 , applied.size - 10 ))
182-
183- // check if we fell behind by too much
184- if (waiting.nonEmpty && (waiting.lastKey - expectedSeq > 10 )) {
185- val outstanding = (expectedSeq to waiting.lastKey).iterator.filterNot(waiting.contains).toList
186- if (outstanding.size <= 3 ) outstanding foreach askAround
187- else askAroundFullState()
188- context.system.scheduler.scheduleOnce(askAroundInterval, self, DoConsolidate )(context.dispatcher)
189- }
235+ applied.drop(Math .max(0 , applied.size - maxLag))
236+
237+ if (nextWaiting.nonEmpty) {
238+ // check if we fell behind by too much
239+ if (nextWaiting.lastKey - nextExpectedSeq > maxLag) fallBehind(nextExpectedSeq, nextWaiting)
240+ else missingSomeUpdates(nextStore, nextExpectedSeq, askedFor, nextWaiting)
241+ } else caughtUp(nextStore, nextExpectedSeq)
190242 }
191243
192244 private def getMembers (n : Int ): Seq [Address ] = {
@@ -230,7 +282,7 @@ object ActivePassive {
230282
231283 def start (port : Option [Int ]): ActorSystem = {
232284 val system = ActorSystem (" ActivePassive" , roleConfig(" backend" , port) withFallback commonConfig)
233- val localReplica = system.actorOf(Props (new Passive (3 , 3 .seconds)), " passive" )
285+ val localReplica = system.actorOf(Props (new Passive (3 , 3 .seconds, 100 )), " passive" )
234286 val managerProps =
235287 ClusterSingletonManager .props(Props (new Active (localReplica, 2 , 120 )), " active" , PoisonPill ,
236288 role = Some (" backend" ), retryInterval = 150 .millis)
@@ -262,7 +314,7 @@ object ActivePassive {
262314 })) ! Run
263315
264316 Thread .sleep(10000 )
265-
317+
266318 val rnd = new Random
267319 while (! terminate) {
268320 Thread .sleep(5000 )
@@ -283,7 +335,7 @@ object ActivePassive {
283335 sys.shutdown()
284336 systems foreach (_.shutdown())
285337 }
286-
338+
287339 private def awaitMembers (sys : ActorSystem , count : Int ): Unit = {
288340 while (Cluster (sys).state.members.size < count) {
289341 Thread .sleep(500 )
0 commit comments