Skip to content

Commit dbfdd5d

Browse files
committed
feat(consensus): WAL-backed client table with
commit_min/commit_max split
1 parent f35d7b3 commit dbfdd5d

17 files changed

Lines changed: 854 additions & 326 deletions

File tree

core/binary_protocol/src/consensus/message.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ where
5252
fn fragments(&self) -> &[Frozen<MESSAGE_ALIGN>];
5353
}
5454

55-
#[derive(Debug)]
55+
#[derive(Debug, Clone)]
5656
pub struct RequestBacking {
5757
owned: Owned<MESSAGE_ALIGN>,
5858
}
@@ -362,6 +362,18 @@ where
362362
}
363363
}
364364

365+
impl<H> Clone for Message<H, RequestBacking>
366+
where
367+
H: ConsensusHeader,
368+
{
369+
fn clone(&self) -> Self {
370+
Self {
371+
backing: self.backing.clone(),
372+
_marker: PhantomData,
373+
}
374+
}
375+
}
376+
365377
impl<H> Clone for Message<H, ResponseBacking>
366378
where
367379
H: ConsensusHeader,
Lines changed: 86 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -111,14 +111,17 @@ pub struct ClientEntry {
111111
pub reply: Message<ReplyHeader>,
112112
}
113113

114-
/// Result of checking a request against the clients table.
114+
/// Result of checking a request against the client table.
115115
pub enum RequestStatus {
116-
/// Request not seen before proceed with consensus.
116+
/// Request not seen before, proceed with consensus.
117117
New,
118-
/// Request already committed re-send cached reply.
118+
/// Exact request already committed, re-send cached reply.
119119
Duplicate(Message<ReplyHeader>),
120-
/// Request is in the pipeline awaiting commit drop (client should wait).
120+
/// Request is in the pipeline awaiting commit, drop (client should wait).
121121
InProgress,
122+
/// Request number is older than the client's latest committed request.
123+
/// Already handled in a prior commit cycle, drop silently.
124+
Stale,
122125
}
123126

124127
/// VSR client-table: tracks per-client request state for duplicate detection,
@@ -142,11 +145,17 @@ pub enum RequestStatus {
142145
/// The `pending` map is local notification state not replicated, not
143146
/// serialized, not part of the deterministic committed state.
144147
///
145-
/// ## TODO
146-
/// Checkpoint serialization: the slot array is laid out for deterministic
147-
/// encode/decode to disk.
148+
/// ## Known gaps
149+
///
150+
/// - **Message repair**: If a backup never received a prepare (lost message),
151+
/// `commit_journal` stops at the gap. The client table will be missing
152+
/// entries for ops beyond the gap until the message repair protocol is
153+
/// implemented and the missing prepare is retransmitted.
154+
///
155+
/// - **Checkpoint serialization**: The slot array is laid out for deterministic
156+
/// encode/decode to disk, but serialization is not yet implemented.
148157
#[derive(Debug)]
149-
pub struct ClientsTable {
158+
pub struct ClientTable {
150159
/// `None` means the slot is free.
151160
/// Deterministic iteration order for eviction and serialization.
152161
slots: Vec<Option<ClientEntry>>,
@@ -158,7 +167,7 @@ pub struct ClientsTable {
158167
pending: HashMap<ClientRequest, Notify>,
159168
}
160169

161-
impl ClientsTable {
170+
impl ClientTable {
162171
#[must_use]
163172
pub fn new(max_clients: usize) -> Self {
164173
let mut slots = Vec::with_capacity(max_clients);
@@ -173,14 +182,17 @@ impl ClientsTable {
173182
/// Check a request against the table.
174183
///
175184
/// Returns:
176-
/// - [`RequestStatus::New`] — not seen before, proceed with consensus
177-
/// - [`RequestStatus::Duplicate`] — already committed, re-send cached reply
178-
/// - [`RequestStatus::InProgress`] — stale, already pending, or already committed
185+
/// - [`RequestStatus::New`]: not seen before, proceed with consensus
186+
/// - [`RequestStatus::Duplicate`]: already committed, re-send cached reply
187+
/// - [`RequestStatus::InProgress`]: in the pipeline awaiting commit
188+
/// - [`RequestStatus::Stale`]: older than the client's latest committed request
179189
///
180190
/// # Panics
181191
/// Panics if the internal index points to an empty slot (invariant violation).
182192
#[must_use]
183193
pub fn check_request(&self, client_id: u128, request: u64) -> RequestStatus {
194+
assert!(client_id != 0, "client_id 0 is reserved for internal use");
195+
184196
// Check if already pending in the pipeline.
185197
let key = ClientRequest { client_id, request };
186198
if self.pending.contains_key(&key) {
@@ -194,7 +206,7 @@ impl ClientsTable {
194206
let committed_request = entry.reply.header().request;
195207

196208
if request < committed_request {
197-
return RequestStatus::InProgress;
209+
return RequestStatus::Stale;
198210
}
199211
if request == committed_request {
200212
return RequestStatus::Duplicate(entry.reply.clone());
@@ -207,7 +219,7 @@ impl ClientsTable {
207219
///
208220
/// Returns a [`Notify`] the caller can `.notified().await` on. The `Notify`
209221
/// is cloned via `Rc`, so the caller can hold it across `.await` points
210-
/// without borrowing the `ClientsTable`.
222+
/// without borrowing the `ClientTable`.
211223
///
212224
/// Called after `check_request` returns `New`, before submitting the request
213225
/// to the consensus pipeline.
@@ -238,11 +250,30 @@ impl ClientsTable {
238250
/// # Panics
239251
/// Panics if the internal index points to an empty slot (invariant violation).
240252
pub fn commit_reply(&mut self, client_id: u128, reply: Message<ReplyHeader>) {
253+
assert!(client_id != 0, "client_id 0 is reserved for internal use");
254+
assert_eq!(
255+
client_id,
256+
reply.header().client,
257+
"commit_reply: client_id mismatch (arg={client_id}, header={})",
258+
reply.header().client
259+
);
241260
let request = reply.header().request;
242261

243262
if let Some(&slot_idx) = self.index.get(&client_id) {
244-
// Update existing slot in place.
245263
let slot = self.slots[slot_idx].as_mut().expect("index/slot mismatch");
264+
// Monotonicity: both commit (op) and request must not regress.
265+
assert!(
266+
reply.header().commit >= slot.reply.header().commit,
267+
"commit_reply: commit regression for client {client_id}: {} -> {}",
268+
slot.reply.header().commit,
269+
reply.header().commit
270+
);
271+
assert!(
272+
reply.header().request >= slot.reply.header().request,
273+
"commit_reply: request regression for client {client_id}: {} -> {}",
274+
slot.reply.header().request,
275+
reply.header().request
276+
);
246277
slot.reply = reply;
247278
} else {
248279
// Need a free slot. Evict if full.
@@ -267,6 +298,10 @@ impl ClientsTable {
267298
/// Iterates the fixed-size slot array (deterministic order), so all replicas
268299
/// with the same committed state evict the same client. Ties on commit number
269300
/// are broken by slot index (lowest index wins), which is also deterministic.
301+
///
302+
/// **Dedup caveat**: until checkpoint serialization is implemented, eviction
303+
/// breaks at-most-once semantics for the evicted client — a retransmission
304+
/// after eviction will be treated as `New` and re-executed.
270305
fn evict_oldest(&mut self) {
271306
let mut evictee: Option<(usize, u64)> = None; // (slot_idx, commit)
272307

@@ -312,6 +347,21 @@ impl ClientsTable {
312347
pub fn pending_count(&self) -> usize {
313348
self.pending.len()
314349
}
350+
351+
/// Wake and clear all pending waiters (e.g. during view change).
352+
///
353+
/// Stale pending entries from a previous view must not survive into the
354+
/// new view - `check_request` would return `InProgress` for the orphaned
355+
/// keys, silently dropping valid client retries.
356+
///
357+
/// Waiters are notified before removal so that any `.notified().await`
358+
/// callers unblock and can detect the view change (e.g. retry or error).
359+
pub fn clear_pending(&mut self) {
360+
for notify in self.pending.values() {
361+
notify.notify();
362+
}
363+
self.pending.clear();
364+
}
315365
}
316366

317367
#[cfg(test)]
@@ -321,10 +371,11 @@ mod tests {
321371

322372
fn make_reply_for(client: u128, request: u64, commit: u64) -> Message<ReplyHeader> {
323373
let header_size = std::mem::size_of::<ReplyHeader>();
324-
let mut buffer = bytes::BytesMut::zeroed(header_size);
325-
let header =
326-
bytemuck::checked::try_from_bytes_mut::<ReplyHeader>(&mut buffer[..header_size])
327-
.expect("zeroed bytes are valid");
374+
let mut msg = Message::<ReplyHeader>::new(header_size);
375+
let header = bytemuck::checked::try_from_bytes_mut::<ReplyHeader>(
376+
&mut msg.as_mut_slice()[..header_size],
377+
)
378+
.expect("zeroed bytes are valid");
328379
*header = ReplyHeader {
329380
client,
330381
request,
@@ -333,11 +384,11 @@ mod tests {
333384
operation: Operation::SendMessages,
334385
..ReplyHeader::default()
335386
};
336-
Message::<ReplyHeader>::from_bytes(buffer.freeze()).expect("test reply must be valid")
387+
msg
337388
}
338389

339390
fn make_reply(request: u64, commit: u64) -> Message<ReplyHeader> {
340-
make_reply_for(0, request, commit)
391+
make_reply_for(1, request, commit)
341392
}
342393

343394
// Notify tests
@@ -392,17 +443,17 @@ mod tests {
392443
assert!(fut2.as_mut().poll(&mut cx).is_pending());
393444
}
394445

395-
// ClientsTable tests
446+
// ClientTable tests
396447

397448
#[test]
398449
fn check_request_new() {
399-
let table = ClientsTable::new(10);
450+
let table = ClientTable::new(10);
400451
assert!(matches!(table.check_request(1, 1), RequestStatus::New));
401452
}
402453

403454
#[test]
404455
fn check_request_duplicate_after_commit() {
405-
let mut table = ClientsTable::new(10);
456+
let mut table = ClientTable::new(10);
406457
table.commit_reply(1, make_reply(1, 10));
407458

408459
match table.check_request(1, 1) {
@@ -415,18 +466,15 @@ mod tests {
415466

416467
#[test]
417468
fn check_request_stale() {
418-
let mut table = ClientsTable::new(10);
469+
let mut table = ClientTable::new(10);
419470
table.commit_reply(1, make_reply(5, 10));
420471

421-
assert!(matches!(
422-
table.check_request(1, 3),
423-
RequestStatus::InProgress
424-
));
472+
assert!(matches!(table.check_request(1, 3), RequestStatus::Stale));
425473
}
426474

427475
#[test]
428476
fn check_request_in_progress_while_pending() {
429-
let mut table = ClientsTable::new(10);
477+
let mut table = ClientTable::new(10);
430478
let _notify = table.register_pending(1, 1);
431479

432480
assert!(matches!(
@@ -437,7 +485,7 @@ mod tests {
437485

438486
#[test]
439487
fn commit_caches_reply() {
440-
let mut table = ClientsTable::new(10);
488+
let mut table = ClientTable::new(10);
441489
table.commit_reply(1, make_reply(1, 10));
442490

443491
let cached = table.get_reply(1).expect("should have cached reply");
@@ -446,7 +494,7 @@ mod tests {
446494

447495
#[test]
448496
fn commit_updates_existing_entry() {
449-
let mut table = ClientsTable::new(10);
497+
let mut table = ClientTable::new(10);
450498
table.commit_reply(1, make_reply(1, 10));
451499
table.commit_reply(1, make_reply(2, 20));
452500

@@ -457,7 +505,7 @@ mod tests {
457505

458506
#[test]
459507
fn register_and_commit_notifies() {
460-
let mut table = ClientsTable::new(10);
508+
let mut table = ClientTable::new(10);
461509
let notify = table.register_pending(1, 1);
462510

463511
assert_eq!(table.pending_count(), 1);
@@ -475,7 +523,7 @@ mod tests {
475523

476524
#[test]
477525
fn eviction_removes_oldest_commit() {
478-
let mut table = ClientsTable::new(2);
526+
let mut table = ClientTable::new(2);
479527

480528
table.commit_reply(100, make_reply_for(100, 1, 10));
481529
table.commit_reply(200, make_reply_for(200, 1, 20));
@@ -489,7 +537,7 @@ mod tests {
489537

490538
#[test]
491539
fn eviction_is_deterministic_by_slot_index() {
492-
let mut table = ClientsTable::new(2);
540+
let mut table = ClientTable::new(2);
493541

494542
table.commit_reply(100, make_reply_for(100, 1, 10));
495543
table.commit_reply(200, make_reply_for(200, 1, 10));
@@ -502,15 +550,15 @@ mod tests {
502550

503551
#[test]
504552
fn new_request_after_commit_is_new() {
505-
let mut table = ClientsTable::new(10);
553+
let mut table = ClientTable::new(10);
506554
table.commit_reply(1, make_reply(1, 10));
507555

508556
assert!(matches!(table.check_request(1, 2), RequestStatus::New));
509557
}
510558

511559
#[test]
512560
fn slot_reuse_after_eviction() {
513-
let mut table = ClientsTable::new(1);
561+
let mut table = ClientTable::new(1);
514562

515563
table.commit_reply(100, make_reply_for(100, 1, 10));
516564
table.commit_reply(200, make_reply_for(200, 1, 20));
@@ -523,7 +571,7 @@ mod tests {
523571
#[test]
524572
#[should_panic(expected = "already has a pending waiter")]
525573
fn register_pending_twice_panics() {
526-
let mut table = ClientsTable::new(10);
574+
let mut table = ClientTable::new(10);
527575
let _n1 = table.register_pending(1, 1);
528576
let _n2 = table.register_pending(1, 1);
529577
}

0 commit comments

Comments
 (0)