From 1ce002fe1a821a0d19e39ea5b1eeda15f6ad766f Mon Sep 17 00:00:00 2001 From: yawzhang Date: Thu, 15 Jan 2026 17:04:03 +0800 Subject: [PATCH] save AI-friendly knowledge This PR aims to preserve knowledge generated during regular debugging and work, making it easier for AI to understand our architecture without having to read the code every time it needs to solve a problem. Let's save them here for supplementation and correction. No need to merge it now, as it is still in a very early and rough stage --- .../knowledges/homestore_checkpoint_system.md | 381 ++++++++++++++++++ docs/knowledges/replication_service_cp.md | 379 +++++++++++++++++ 2 files changed, 760 insertions(+) create mode 100644 docs/knowledges/homestore_checkpoint_system.md create mode 100644 docs/knowledges/replication_service_cp.md diff --git a/docs/knowledges/homestore_checkpoint_system.md b/docs/knowledges/homestore_checkpoint_system.md new file mode 100644 index 000000000..b80e456dd --- /dev/null +++ b/docs/knowledges/homestore_checkpoint_system.md @@ -0,0 +1,381 @@ +# HomeStore Checkpoint (CP) System Architecture + +## Overview +HomeStore's checkpoint system provides transactional consistency by coordinating flush operations across multiple consumers (INDEX_SVC, REPLICATION_SVC, BLK_DATA_SVC, LOG_SVC, SEALER). + +## Key Components + +### 1. CP (Checkpoint) Object +**Location**: `HomeStore/src/include/homestore/checkpoint/cp.hpp` + +**Structure**: +```cpp +class CP { + cp_id_t m_cp_id; + std::atomic m_cp_status; + CPManager* m_cp_mgr; + std::array, (size_t)cp_consumer_t::SENTINEL> m_contexts; + folly::SharedPromise m_comp_promise; + Clock::time_point m_cp_start_time; +}; +``` + +**CP States**: +- `cp_trigger`: CP flush triggered +- `cp_flush_prepare`: Switchover completed, preparing to flush +- `cp_io_ready`: New CP ready for IO operations +- `cp_flushing`: Currently flushing +- `cp_flush_done`: Flush completed +- `cp_cleaning`: Cleanup in progress + +### 2. CPContext Base Class +**Location**: `HomeStore/src/include/homestore/checkpoint/cp_mgr.hpp` + +```cpp +class CPContext { +protected: + CP* m_cp; + folly::Promise m_flush_comp; // Used by collectAll in cp_flush +public: + CPContext(CP* cp) : m_cp{cp} {} + void complete(bool status) { m_flush_comp.setValue(status); } + folly::Future get_future() { return m_flush_comp.getFuture(); } + virtual ~CPContext() = default; +}; +``` + +**Important**: Each consumer creates its own CPContext subclass to hold consumer-specific CP state. + +### 3. CPManager +**Location**: `HomeStore/src/lib/checkpoint/cp_mgr.cpp` + +**Key Members**: +```cpp +class CPManager { + CP* m_cur_cp; // RCU-protected pointer + std::mutex m_trigger_cp_mtx; // Serializes CP trigger operations + std::atomic m_in_flush_phase; // Prevents concurrent flushes + bool m_pending_trigger_cp; // Back-to-back CP flag + folly::SharedPromise m_pending_trigger_cp_comp; // For queued CP + std::vector m_cp_io_fibers; // 2 blocking IO fibers + std::array, SENTINEL> m_cp_cb_table; +}; +``` + +## CP Lifecycle + +### Phase 1: Trigger (do_trigger_cp_flush) +**File**: `cp_mgr.cpp:201-270` + +```cpp +folly::Future CPManager::do_trigger_cp_flush(bool force, bool flush_on_shutdown) { + std::unique_lock lk(m_trigger_cp_mtx); + + // Back-to-back CP handling + if (m_in_flush_phase) { + if (force && (!m_cp_shutdown_initiated || flush_on_shutdown)) { + if (!m_pending_trigger_cp) { + m_pending_trigger_cp = true; + m_pending_trigger_cp_comp = std::move(folly::SharedPromise{}); + } + return m_pending_trigger_cp_comp.getFuture(); + } + return folly::makeFuture(false); + } + m_in_flush_phase = true; + + auto cur_cp = cp_guard(); // Get current CP + auto new_cp = new CP(this); + new_cp->m_cp_id = cur_cp->m_cp_id + 1; + + // Phase 1a: Switchover SEALER first + auto& sealer_cp = m_cp_cb_table[(size_t)cp_consumer_t::SEALER]; + if (sealer_cp) { + new_cp->m_contexts[(size_t)cp_consumer_t::SEALER] = + std::move(sealer_cp->on_switchover_cp(cur_cp.get(), new_cp)); + } + + // Phase 1b: Switchover other consumers + for (size_t svcid = 0; svcid < (size_t)cp_consumer_t::SENTINEL; svcid++) { + if (svcid == (size_t)cp_consumer_t::SEALER) continue; + auto& consumer = m_cp_cb_table[svcid]; + if (consumer) { + new_cp->m_contexts[svcid] = std::move(consumer->on_switchover_cp(cur_cp.get(), new_cp)); + } + } + + // Phase 1c: Setup promise and switch m_cur_cp pointer + if (m_pending_trigger_cp) { + cur_cp->m_comp_promise = std::move(m_pending_trigger_cp_comp); + m_pending_trigger_cp = false; + } else { + cur_cp->m_comp_promise = std::move(folly::SharedPromise{}); + } + ret_fut = cur_cp->m_comp_promise.getFuture(); + + cur_cp->m_cp_status = cp_status_t::cp_flush_prepare; + new_cp->m_cp_status = cp_status_t::cp_io_ready; + + // RCU pointer swap - guarantees all readers see new CP + rcu_xchg_pointer(&m_cur_cp, new_cp); + synchronize_rcu(); // Wait for all RCU readers to finish + + lk.unlock(); + return ret_fut; +} +``` + +**Key Synchronization**: +- `m_trigger_cp_mtx`: Serializes all CP trigger operations +- `m_in_flush_phase`: Set to `true` during entire CP flush, prevents concurrent triggers +- `rcu_xchg_pointer` + `synchronize_rcu()`: Safe pointer switching without locking read path + +### Phase 2: Flush (cp_start_flush) +**File**: `cp_mgr.cpp:272-293` + +```cpp +void CPManager::cp_start_flush(CP* cp) { + std::vector> futs; + cp->m_cp_status = cp_status_t::cp_flushing; + + // Collect futures from all consumers (except SEALER) + for (size_t svcid = 0; svcid < (size_t)cp_consumer_t::SENTINEL; svcid++) { + if (svcid == (size_t)cp_consumer_t::SEALER) continue; + auto& consumer = m_cp_cb_table[svcid]; + bool participated = (cp->m_contexts[svcid] != nullptr); + if (consumer && participated) { + futs.emplace_back(std::move(consumer->cp_flush(cp))); + } + } + + // Wait for all consumers to flush + folly::collectAllUnsafe(futs).thenValue([this, cp](auto) { + // SEALER (REPLICATION_SVC) flushes last synchronously + auto& sealer_cp = m_cp_cb_table[(size_t)cp_consumer_t::SEALER]; + bool participated = (cp->m_contexts[(size_t)cp_consumer_t::SEALER] != nullptr); + if (sealer_cp && participated) { + sealer_cp->cp_flush(cp).wait(); + } + on_cp_flush_done(cp); + }); +} +``` + +**Important**: +- `collectAllUnsafe` holds references to futures from `cp->m_contexts[]->get_future()` +- SEALER flushes last because it updates cp_lsn (other components must flush up to this LSN) + +### Phase 3: Completion (on_cp_flush_done) +**File**: `cp_mgr.cpp:295-331` + +```cpp +void CPManager::on_cp_flush_done(CP* cp) { + cp->m_cp_status = cp_status_t::cp_flush_done; + + iomanager.run_on_forget(pick_blocking_io_fiber(), [this, cp]() { + ++(m_sb->m_last_flushed_cp); + m_sb.write(); + + cleanup_cp(cp); // Call all consumers' cp_cleanup() + + auto promise = std::move(cp->m_comp_promise); + delete cp; // Destructs m_contexts[] in array order: [0]→[1]→[2]→[3] + + bool trigger_back_2_back_cp{false}; + { + std::unique_lock lk(m_trigger_cp_mtx); + m_in_flush_phase = false; // Release flush lock + trigger_back_2_back_cp = m_pending_trigger_cp; + } + + promise.setValue(true); // Notify waiters + + if (trigger_back_2_back_cp) { + trigger_cp_flush(false); // Start queued CP + } + }); +} +``` + +**Critical Details**: +- Runs on `pick_blocking_io_fiber()` - randomly picks 1 of 2 cp_io fibers +- `delete cp` destructs `m_contexts[]` in array index order +- `m_in_flush_phase = false` allows next CP to proceed +- Promise is set AFTER releasing lock to avoid race + +### Phase 4: Cleanup +**File**: `cp_mgr.cpp:333-338` + +```cpp +void CPManager::cleanup_cp(CP* cp) { + cp->m_cp_status = cp_status_t::cp_cleaning; + for (auto& consumer : m_cp_cb_table) { + if (consumer) { consumer->cp_cleanup(cp); } + } +} +``` + +## Back-to-Back CP Mechanism + +**Scenario**: Multiple `trigger_cp_flush(force=true)` calls while CP is flushing + +**Behavior**: +1. First call starts CP=N, sets `m_in_flush_phase = true` +2. Second call (while CP=N flushing) sets `m_pending_trigger_cp = true`, queues CP=N+1 +3. Third call (while still flushing) reuses same `m_pending_trigger_cp_comp` promise +4. When CP=N completes, `on_cp_flush_done` checks `m_pending_trigger_cp` and auto-triggers CP=N+1 +5. Only the LAST queued CP is triggered (earlier ones are merged) + +**Code Flow**: +``` +T1: trigger_cp_flush(force=true) → Start CP=443 +T2: trigger_cp_flush(force=true) → Queue CP=444 (m_pending_trigger_cp = true) +T3: trigger_cp_flush(force=true) → Reuse CP=444 promise (still m_pending_trigger_cp = true) +T4: CP=443 completes → on_cp_flush_done → trigger_cp_flush(false) → Start CP=444 +``` + +## CP IO Fibers + +**Creation**: `cp_mgr.cpp:340-372` + +```cpp +void CPManager::start_cp_thread() { + auto const num_fibers = HS_DYNAMIC_CONFIG(generic.cp_io_fibers); // default: 2 + iomanager.create_reactor("cp_io", iomgr::INTERRUPT_LOOP, num_fibers, [this, ctx](bool is_started) { + if (is_started) { + auto v = iomanager.sync_io_capable_fibers(); + m_cp_io_fibers.insert(m_cp_io_fibers.end(), v.begin(), v.end()); + } + }); +} + +iomgr::io_fiber_t CPManager::pick_blocking_io_fiber() const { + static thread_local std::random_device s_rd{}; + static thread_local std::default_random_engine s_re{s_rd()}; + static auto rand_fiber = std::uniform_int_distribution(0, m_cp_io_fibers.size() - 1); + return m_cp_io_fibers[rand_fiber(s_re)]; // Randomly pick one of 2 fibers +} +``` + +**Key Points**: +- 2 fibers by default (configurable via `generic.cp_io_fibers`) +- Created with `INTERRUPT_LOOP` mode for blocking IO operations +- `pick_blocking_io_fiber()` randomly distributes tasks across fibers +- **Implication**: Multiple CPs' `on_cp_flush_done` can execute concurrently on different fibers + +## RCU (Read-Copy-Update) for m_cur_cp + +**Purpose**: Lock-free read access to current CP pointer + +**Read Path** (`cp_mgr.cpp:160-171`): +```cpp +CP* CPManager::cp_io_enter() { + rcu_read_lock(); + auto cp = get_cur_cp(); // Read m_cur_cp + if (!cp) { + rcu_read_unlock(); + return nullptr; + } + cp_ref(cp); // Increment refcount + rcu_read_unlock(); + return cp; +} +``` + +**Write Path** (`cp_mgr.cpp:260-261`): +```cpp +rcu_xchg_pointer(&m_cur_cp, new_cp); +synchronize_rcu(); // Wait for all readers in RCU critical section to exit +``` + +**Guarantee**: After `synchronize_rcu()` returns, no thread is accessing old CP without holding a reference (via `cp_ref()`). + +## Consumer Order + +### Switchover Order (on_switchover_cp) +1. **SEALER** (REPLICATION_SVC) - First +2. Other consumers: INDEX_SVC, BLK_DATA_SVC, LOG_SVC + +### Flush Order (cp_flush) +1. INDEX_SVC, BLK_DATA_SVC, LOG_SVC - Parallel (via `collectAllUnsafe`) +2. **SEALER** (REPLICATION_SVC) - Last, synchronous + +### Cleanup Order (cp_cleanup) +- All consumers in `m_cp_cb_table` order + +### Destruction Order (CP::~CP) +- `m_contexts[]` destructs in array index order: + - [0] INDEX_SVC + - [1] BLK_DATA_SVC + - [2] LOG_SVC + - [3] REPLICATION_SVC (SEALER) + +## Important Invariants + +1. **No Concurrent Flushes**: `m_trigger_cp_mtx` + `m_in_flush_phase` ensure only one CP flushes at a time +2. **No Concurrent Triggers**: `m_trigger_cp_mtx` serializes all `do_trigger_cp_flush` calls +3. **Safe CP Pointer Switch**: RCU + refcounting ensure old CP not accessed after switchover +4. **Back-to-Back Merging**: Only last queued force-flush CP is triggered +5. **SEALER Last**: REPLICATION_SVC always flushes last to establish cp_lsn watermark + +## Common Issues + +### Issue 1: Returning Wrong Future in cp_flush +**Symptom**: CP completes immediately even if work is pending +**Cause**: Returning `folly::makeFuture(true)` instead of `CPContext::get_future()` +**Impact**: `collectAllUnsafe` completes prematurely, may trigger cleanup while work ongoing + +### Issue 2: Concurrent Access to CPContext +**Symptom**: Use-after-free, memory corruption +**Cause**: `pick_blocking_io_fiber()` schedules cleanup on different fiber than switchover +**Solution**: Ensure synchronization between `on_switchover_cp` and `cp_cleanup` + +### Issue 3: Deadlock in Signal Handler +**Symptom**: Process hangs instead of exiting on crash +**Cause**: Signal handler calls malloc-dependent logging (spdlog) while malloc holds arena lock +**Solution**: Use async-signal-safe functions only in signal handlers + +## Related Configuration + +- `generic.cp_io_fibers`: Number of CP IO fibers (default: 2) +- `generic.repl_dev_cleanup_interval_sec`: Interval for repl_dev GC timer +- Various consumer-specific CP flush intervals + +## Example Timeline (Back-to-Back CP) + +``` +T0: CP=443 working +T1: destroy_pg(1) → trigger_cp_flush(force=true) + → m_in_flush_phase=true, queue CP=444 + +T2: CP=443 flush done + → on_cp_flush_done(443) on Fiber A + → cleanup_cp(443) + → delete CP=443 + → m_in_flush_phase = false + → trigger_cp_flush(false) → Start CP=444 + +T3: do_trigger_cp_flush(444) + → on_switchover_cp(444, 445) // CRITICAL: Happens BEFORE CP=444 starts flushing! + → Creates CP=445 contexts + → RaftReplServiceCPHandler::on_switchover_cp(444, 445) + → Accesses CP=444's m_cp_ctx_map to save state + → cp_start_flush(444) + +T4: destroy_pg(2) → trigger_cp_flush(force=true) + → m_in_flush_phase=true, queue CP=445 + +T5: CP=444 flush done (all consumers skip) + → on_cp_flush_done(444) on Fiber B // May be different fiber! + → cleanup_cp(444) + → delete CP=444 + → m_in_flush_phase = false + → trigger CP=445 + +T6: CP=445 starts, completes, cleanup + → delete CP=445 + → ~m_contexts[0] (INDEX_SVC) + → ~m_contexts[1] (BLK_DATA_SVC) // May corrupt memory here + → ~m_contexts[2] (LOG_SVC) + → ~m_contexts[3] (REPLICATION_SVC) // CRASH: m_cp_ctx_map corrupted +``` diff --git a/docs/knowledges/replication_service_cp.md b/docs/knowledges/replication_service_cp.md new file mode 100644 index 000000000..6c6021cf5 --- /dev/null +++ b/docs/knowledges/replication_service_cp.md @@ -0,0 +1,379 @@ +# Replication Service Checkpoint Implementation + +## Overview +RaftReplService implements checkpoint (CP) callbacks to persist replication state across all ReplDev instances during HomeStore checkpoint operations. + +## Key Classes + +### 1. ReplSvcCPContext +**Location**: `HomeStore/src/lib/replication/service/raft_repl_service.h:132-141` + +```cpp +class ReplSvcCPContext : public CPContext { + std::shared_mutex m_cp_map_mtx; // ⚠️ DECLARED BUT NEVER USED! + std::map> m_cp_ctx_map; // Raw pointer as key + +public: + ReplSvcCPContext(CP* cp) : CPContext(cp){}; + virtual ~ReplSvcCPContext() = default; + + int add_repl_dev_ctx(ReplDev* dev, cshared dev_ctx); + cshared get_repl_dev_ctx(ReplDev* dev); +}; +``` + +**Critical Bug**: `m_cp_map_mtx` is declared but never locked in `add_repl_dev_ctx()` or `get_repl_dev_ctx()`! + +**Implementation** (`raft_repl_service.cpp:760-770`): +```cpp +int ReplSvcCPContext::add_repl_dev_ctx(ReplDev* dev, cshared dev_ctx) { + m_cp_ctx_map.emplace(dev, dev_ctx); // ⚠️ NO LOCK PROTECTION! + return 0; +} + +cshared ReplSvcCPContext::get_repl_dev_ctx(ReplDev* dev) { + if (m_cp_ctx_map.count(dev) == 0) { + return std::make_shared(); + } + return m_cp_ctx_map[dev]; // ⚠️ NO LOCK PROTECTION! +} +``` + +**Data Structure**: `m_cp_ctx_map` is a `std::map` (red-black tree) with: +- **Key**: Raw `ReplDev*` pointer +- **Value**: `shared_ptr` + +### 2. ReplDevCPContext +**Purpose**: Holds per-ReplDev checkpoint state (LSN, etc.) + +```cpp +struct ReplDevCPContext { + repl_lsn_t lsn; // Last LSN to checkpoint + // Other repl_dev specific CP state +}; +``` + +### 3. RaftReplServiceCPHandler +**Location**: `HomeStore/src/lib/replication/service/raft_repl_service.h:143-153` + +Implements the CPCallbacks interface for REPLICATION_SVC consumer. + +## CP Callback Implementations + +### on_switchover_cp() +**Location**: `raft_repl_service.cpp:773-790` + +```cpp +std::unique_ptr RaftReplServiceCPHandler::on_switchover_cp(CP* cur_cp, CP* new_cp) { + // checking if cur_cp == nullptr as on_switchover_cp will be called when registering the cp handler + if (cur_cp != nullptr) { + // Add cp info from all devices to current cp. + // We dont need taking cp_guard as cp_mgr already taken it in do_trigger_cp_flush + auto cur_cp_ctx = s_cast(cur_cp->context(cp_consumer_t::REPLICATION_SVC)); + + repl_service().iterate_repl_devs([cur_cp, cur_cp_ctx](cshared& repl_dev) { + // Collect the LSN of each repl dev and put it into current CP + auto dev_ctx = std::static_pointer_cast(repl_dev)->get_cp_ctx(cur_cp); + cur_cp_ctx->add_repl_dev_ctx(repl_dev.get(), std::move(dev_ctx)); // Line 784 + }); + } + // create new ctx for new_cp + auto ctx = std::make_unique(new_cp); + return ctx; +} +``` + +**Key Points**: +- Called during `do_trigger_cp_flush` while holding `m_trigger_cp_mtx` +- Populates **cur_cp's** m_cp_ctx_map, not new_cp's +- Iterates all repl_devs and collects their CP state +- Comment says "We dont need taking cp_guard" - assumes CPManager lock provides protection + +**Execution Context**: Runs on thread calling `trigger_cp_flush`, protected by `m_trigger_cp_mtx` + +### cp_flush() +**Location**: `raft_repl_service.cpp:792-799` + +```cpp +folly::Future RaftReplServiceCPHandler::cp_flush(CP* cp) { + auto cp_ctx = s_cast(cp->context(cp_consumer_t::REPLICATION_SVC)); + + repl_service().iterate_repl_devs([cp, cp_ctx](cshared& repl_dev) { + auto dev_ctx = cp_ctx->get_repl_dev_ctx(repl_dev.get()); // Line 795 + std::static_pointer_cast(repl_dev)->cp_flush(cp, dev_ctx); + }); + + return folly::makeFuture(true); // ⚠️ BUG: Should return cp_ctx->get_future() +} +``` + +**Known Bug**: Returns immediate future instead of `cp_ctx->get_future()`. However: +- All work is actually completed synchronously before return +- `RaftReplDev::cp_flush()` is synchronous +- Returning `true` is semantically correct for this implementation + +**Why It Works**: Unlike async implementations, this handler completes all flush work before returning, so immediate `true` is safe. + +### cp_cleanup() +**Location**: `raft_repl_service.cpp:801-804` + +```cpp +void RaftReplServiceCPHandler::cp_cleanup(CP* cp) { + repl_service().iterate_repl_devs( + [cp](cshared& repl_dev) { + std::static_pointer_cast(repl_dev)->cp_cleanup(cp); + }); +} +``` + +**Execution Context**: Called from `CPManager::cleanup_cp()` which runs on `pick_blocking_io_fiber()`. + +## Critical Race Condition (Unconfirmed) + +### Hypothetical Scenario +``` +Thread A (do_trigger_cp_flush): + ├─ Acquires m_trigger_cp_mtx + ├─ on_switchover_cp(CP=444, CP=445) + │ └─ cur_cp_ctx->add_repl_dev_ctx(...) // Modifies CP=444's m_cp_ctx_map + ├─ Releases m_trigger_cp_mtx + └─ cp_start_flush(444) + +Thread B (on_cp_flush_done for CP=443): + ├─ run_on_forget(pick_blocking_io_fiber(), [...]) + │ ├─ cleanup_cp(443) + │ ├─ delete CP=443 + │ ├─ Sets m_in_flush_phase = false + │ └─ trigger_cp_flush(false) → Starts CP=444 + │ └─ Acquires m_trigger_cp_mtx + │ └─ on_switchover_cp(444, 445) + │ └─ add_repl_dev_ctx(...) // ⚠️ Concurrent access? +``` + +**Why This May Not Be a Race**: +- `m_trigger_cp_mtx` serializes all `do_trigger_cp_flush` calls +- `on_switchover_cp` only runs while holding this lock +- `cleanup_cp` runs AFTER CP flush completes, when switchover for next CP already done +- **However**: With 2 cp_io fibers, cleanup can run concurrently with next CP's switchover + +### Actual Issue: Memory Corruption During Destruction + +**Observed Evidence** (from GDB): +``` +CP=445 destruction: + ~m_contexts[0] (INDEX_SVC) → OK + ~m_contexts[1] (BLK_DATA_SVC) → Triggers folly::collectAll::Context cleanup + → Corrupts memory (ASan markers appear) + ~m_contexts[2] (LOG_SVC) → OK + ~m_contexts[3] (REPLICATION_SVC) → m_cp_ctx_map already corrupted + → CRASH in ~Promise() +``` + +**Red-Black Tree Corruption**: +- `m_cp_ctx_map` uses `std::map` (red-black tree) +- Tree nodes found with ASan markers: `0x0000610000000000` (heap-left-redzone) +- Parent pointers corrupted: `0x2252feb743377eb5` (invalid) + +## ReplDev Lifecycle vs CP + +### ReplDev Destruction +**Location**: `HomeObject/src/lib/homestore_backend/hs_pg_manager.cpp:589-613` + +```cpp +bool HSHomeObject::pg_destroy(pg_id_t pg_id, bool need_to_pause_pg_state_machine) { + mark_pg_destroyed(pg_id); + destroy_shards(pg_id); + destroy_hs_resources(pg_id); + destroy_pg_index_table(pg_id); + destroy_pg_superblk(pg_id); // Triggers CP flush with force=true + + // ⚠️ NOTE: Does NOT destroy ReplDev or remove from m_rd_map! + return true; +} +``` + +**Key Point**: PG destroy does NOT destroy ReplDev. The ReplDev pointer remains valid and in `m_rd_map`. + +**Implication**: Using `ReplDev*` as map key is safe during CP lifecycle, no dangling pointers. + +### When ReplDev is Actually Destroyed +**Location**: `HomeStore/src/lib/replication/service/raft_repl_service.cpp:699-735` + +```cpp +void RaftReplService::gc_repl_devs() { + std::vector groups_to_leave; + { + std::shared_lock lg(m_rd_map_mtx); + for (auto it = m_rd_map.begin(); it != m_rd_map.end(); ++it) { + auto rdev = std::dynamic_pointer_cast(it->second); + if (rdev->is_destroy_pending() && + (get_elapsed_time_sec(rdev->destroyed_time()) >= + HS_DYNAMIC_CONFIG(generic.repl_dev_cleanup_interval_sec))) { + groups_to_leave.push_back(rdev->group_id()); + } + } + } + + for (const auto& group_id : groups_to_leave) { + m_msg_mgr->leave_group(group_id); + m_repl_app->destroy_repl_dev_listener(group_id); + { + std::unique_lock lg(m_rd_map_mtx); + m_rd_map.erase(group_id); // Finally remove from map + } + } +} +``` + +**Timing**: ReplDev destroyed after configurable delay (default: seconds), long after PG destroy completes. + +## Synchronization Analysis + +### Protection Mechanisms + +1. **m_trigger_cp_mtx**: Serializes CP trigger operations + - Held during entire `on_switchover_cp` execution + - Prevents concurrent switchover calls + +2. **m_in_flush_phase**: Prevents concurrent CP flushes + - Set to `true` at start of `do_trigger_cp_flush` + - Set to `false` in `on_cp_flush_done` + +3. **RCU for m_cur_cp**: Lock-free read access + - `rcu_xchg_pointer` + `synchronize_rcu` for safe pointer swap + - Guarantees no readers accessing old CP without reference + +4. **NOT Protected**: `m_cp_ctx_map` access + - `add_repl_dev_ctx()`: No lock + - `get_repl_dev_ctx()`: No lock + - `m_cp_map_mtx` declared but never used + +### Why It (Mostly) Works + +**Assumption**: `m_trigger_cp_mtx` provides implicit protection because: +- `on_switchover_cp` only modifies **cur_cp's** context, not new_cp's +- `cp_flush` reads **same CP's** context that was populated in switchover +- `cleanup_cp` only called after CP flush completes + +**Single-Threaded Access Pattern**: +``` +do_trigger_cp_flush(N): + [Hold m_trigger_cp_mtx] + ├─ on_switchover_cp(N, N+1) + │ └─ Populate CP=N's m_cp_ctx_map + [Release m_trigger_cp_mtx] + └─ cp_start_flush(N) + └─ cp_flush(N) + └─ Read CP=N's m_cp_ctx_map + +on_cp_flush_done(N): + └─ cleanup_cp(N) + └─ Read CP=N's m_cp_ctx_map +``` + +**Why No Race**: Each CP's context is only accessed by operations on that specific CP, and CP operations are serialized. + +## Potential Issues + +### Issue 1: Unused Mutex +**Code**: `std::shared_mutex m_cp_map_mtx` declared but never locked + +**Risk**: Low - current access pattern is single-threaded per CP + +**Recommendation**: Either remove unused mutex or add proper locking for defense-in-depth + +### Issue 2: Raw Pointer as Map Key +**Code**: `std::map>` + +**Risk**: Low - ReplDev destroyed long after CP completes + +**Benefit**: Avoids shared_ptr overhead for key comparisons + +### Issue 3: Returning Immediate Future +**Code**: `return folly::makeFuture(true)` instead of `cp_ctx->get_future()` + +**Risk**: Low - work completes synchronously before return + +**Confusion**: Other consumers may expect to return the context's future + +## Interaction with Other Services + +### During Baseline Resync + +**Scenario**: Destroy old PG → Create new PG + +``` +Timeline: +T1: destroy_pg(pg_id=1) + ├─ destroy_pg_superblk(1) + │ └─ trigger_cp_flush(force=true) → Queues CP=444 + ├─ ReplDev for pg=1 still exists in m_rd_map + └─ Returns + +T2: CP=443 completes + └─ Auto-triggers CP=444 + +T3: CP=444 switchover + ├─ on_switchover_cp(444, 445) + │ └─ iterate_repl_devs([...]) // Includes ReplDev for pg=1 + │ └─ add_repl_dev_ctx(repl_dev.get(), ...) + └─ cp_start_flush(444) + +T4: destroy_pg(pg_id=2) + └─ trigger_cp_flush(force=true) → Queues CP=445 + +T5: CP=444 completes (all rdevs skip flush) + └─ Auto-triggers CP=445 + +T6: CP=445 runs, completes, destructs + └─ CRASH during m_cp_ctx_map destruction +``` + +**Key**: ReplDevs persist across PG destroy, remain in CP context. + +## Related Configuration + +- `generic.repl_dev_cleanup_interval_sec`: Delay before destroying ReplDev after marking for destruction +- `consensus.flush_durable_commit_interval_ms`: Interval for flushing durable commit LSN +- `consensus.replace_member_sync_check_interval_ms`: Interval for monitoring replace_member status + +## Common Misconceptions + +1. **"ReplDev is destroyed when PG is destroyed"**: FALSE - ReplDev destroyed later by GC thread +2. **"m_cp_map_mtx protects the map"**: FALSE - mutex is never locked +3. **"Returning wrong future causes bug"**: UNCLEAR - work completes synchronously +4. **"Raw pointers cause dangling"**: FALSE - ReplDev lifetime exceeds CP lifetime + +## Debug Tips + +### Check m_cp_ctx_map State +```gdb +# Get CP context +set $cp = (homestore::CP*)0x... +set $ctx = (homestore::ReplSvcCPContext*)$cp->m_contexts[3].get() + +# Examine map +p $ctx->m_cp_ctx_map +p $ctx->m_cp_ctx_map._M_t._M_impl._M_node_count + +# Check red-black tree integrity +set $root = $ctx->m_cp_ctx_map._M_t._M_impl._M_header._M_parent +x/32gx $root +``` + +### Check ReplDev Validity +```gdb +# Check if ReplDev still in map +set $rdev_ptr = (homestore::ReplDev*)0x... +p homestore::repl_service().get_repl_dev(...) + +# Verify refcount +p $rdev_ptr.use_count() +``` + +### Monitor CP Lifecycle +Add breakpoints: +- `RaftReplServiceCPHandler::on_switchover_cp` +- `RaftReplServiceCPHandler::cp_flush` +- `RaftReplServiceCPHandler::cp_cleanup` +- `ReplSvcCPContext::~ReplSvcCPContext`