diff --git a/src/bthread/rwlock.cpp b/src/bthread/rwlock.cpp
index e635668373..e28f5ccb8b 100644
--- a/src/bthread/rwlock.cpp
+++ b/src/bthread/rwlock.cpp
@@ -15,350 +15,508 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <memory>
 #include "bvar/collector.h"
+#include "butil/memory/scope_guard.h"
 #include "bthread/rwlock.h"
+#include "bthread/mutex.h"
 #include "bthread/butex.h"
 
 namespace bthread {
 
-// A `bthread_rwlock_t' is a reader/writer mutual exclusion lock,
-// which is a bthread implementation of golang RWMutex.
-// The lock can be held by an arbitrary number of readers or a single writer.
-// For details, see https://github.com/golang/go/blob/master/src/sync/rwmutex.go
-
-// Define in bthread/mutex.cpp
+// Defined in bthread/mutex.cpp; reused here so that bthread_rwlock_t
+// participates in the global ContentionProfiler just like bthread_mutex_t
+// and bthread_sem_t.
 class ContentionProfiler;
 extern ContentionProfiler* g_cp;
 extern bvar::CollectorSpeedLimit g_cp_sl;
-extern bool is_contention_site_valid(const bthread_contention_site_t& cs);
-extern void make_contention_site_invalid(bthread_contention_site_t* cs);
 extern void submit_contention(const bthread_contention_site_t& csite, int64_t now_ns);
 
-// It is enough for readers. If the reader exceeds this value,
-// need to use `int64_t' instead of `int'.
-const int RWLockMaxReaders = 1 << 30;
-
-// For reading.
-static int rwlock_rdlock_impl(bthread_rwlock_t* __restrict rwlock,
-                              const struct timespec* __restrict abstime) {
-    int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
-        ->fetch_add(1, butil::memory_order_acquire) + 1;
-    // Fast path.
-    if (reader_count >= 0) {
-        CHECK_LT(reader_count, RWLockMaxReaders);
-        return 0;
-    }
-
-    // Slow path.
+// Lazily arm sampling on first contention. Caller must declare
+// `size_t sampling_range' and `int64_t start_ns' in scope:
+//   start_ns == 0  -> not yet decided
+//   start_ns == -1 -> decided NOT to sample (profiler off / not selected)
+//   start_ns > 0   -> sampling armed; value is the wall-clock start time
+#define BTHREAD_RWLOCK_MAYBE_START_SAMPLING                                \
+    do {                                                                   \
+        if (start_ns == 0) {                                               \
+            if (BAIDU_UNLIKELY(g_cp != NULL)) {                            \
+                sampling_range = bvar::is_collectable(&g_cp_sl);           \
+                start_ns = bvar::is_sampling_range_valid(sampling_range) ? \
+                    butil::cpuwide_time_ns() : -1;                         \
+            } else {                                                       \
+                start_ns = -1;                                             \
+            }                                                              \
+        }                                                                  \
+    } while (0)
 
-    // Don't sample when contention profiler is off.
-    if (NULL == bthread::g_cp) {
-        return bthread_sem_timedwait(&rwlock->reader_sema, abstime);
-    }
-    // Ask Collector if this (contended) locking should be sampled.
-    const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
-    if (!bvar::is_sampling_range_valid(sampling_range)) { // Don't sample.
-        return bthread_sem_timedwait(&rwlock->reader_sema, abstime);
+// Submit one contention sample if sampling was armed for this attempt.
+// `start_ns > 0' is the convention used everywhere in this file to indicate
+// that BTHREAD_RWLOCK_MAYBE_START_SAMPLING actually decided to sample.
+// No-op otherwise. Force-inlined so the uncontended fast path stays cheap.
+static BUTIL_FORCE_INLINE void submit_contention_if_sampled(
+    int64_t start_ns, size_t sampling_range) {
+    if (BAIDU_UNLIKELY(start_ns > 0)) {
+        const int64_t end_ns = butil::cpuwide_time_ns();
+        const bthread_contention_site_t csite{end_ns - start_ns, sampling_range};
+        submit_contention(csite, end_ns);
     }
-
-    // Sample.
-    const int64_t start_ns = butil::cpuwide_time_ns();
-    int rc = bthread_sem_timedwait(&rwlock->reader_sema, abstime);
-    const int64_t end_ns = butil::cpuwide_time_ns();
-    const bthread_contention_site_t csite{end_ns - start_ns, sampling_range};
-    // Submit `csite' for each reader immediately after
-    // owning rdlock to avoid the contention of `csite'.
-    bthread::submit_contention(csite, end_ns);
-
-    return rc;
-}
-
-static inline int rwlock_rdlock(bthread_rwlock_t* rwlock) {
-    return rwlock_rdlock_impl(rwlock, NULL);
 }
 
-static inline int rwlock_timedrdlock(bthread_rwlock_t* __restrict rwlock,
-                                     const struct timespec* __restrict abstime) {
-    return rwlock_rdlock_impl(rwlock, abstime);
-}
+// bthread RWLock
+// writer-priority implementation overview
+// Three synchronization fields are used:
+//
+// * `lock_word' (32-bit butex):
+//   bit 31  : 1 if the write lock is held, 0 otherwise.
+//   bit 0~30: number of readers currently holding the read lock.
+//   Mutually exclusive: when bit 31 is set, the lower 31 bits are 0.
+//
+// * `writer_wait_count' (32-bit butex):
+//   Number of writers that have entered wrlock() but not yet finished
+//   (i.e. currently waiting for the mutex / waiting for lock_word==0 /
+//   holding the write lock). Each writer accounts for itself: it is
+//   incremented at the very beginning of wrlock() and decremented at
+//   the very end of unwrlock()/cleanup().
+//   Readers consult this field to implement writer-priority: if any
+//   writer is "in flight", new readers yield by waiting on it.
+//
+// * `writer_queue_mutex' (bthread_mutex_t):
+//   Serializes writers so that at most one writer races for `lock_word'
+//   at any time. Other writers queue up on this mutex.
+//
+// Wakeup channels:
+// * Readers waiting on writers -> wait on writer_wait_count, woken by unwrlock/cleanup
+// * Writers waiting on readers -> wait on lock_word, woken by unrdlock
+// * Writers waiting on writers -> wait on writer_queue_mutex
+
+static int rwlock_rdlock(bthread_rwlock_t* rwlock, bool try_lock,
+                         const struct timespec* abstime) {
+    auto lock_word = (butil::atomic<unsigned>*)rwlock->lock_word;
+    auto writer_wait_count = (butil::atomic<unsigned>*)rwlock->writer_wait_count;
+
+    // Sampling state for the contention profiler (lazily armed on first
+    // contention so that the uncontended fast path stays cheap):
+    //   start_ns == 0  -> not yet decided
+    //   start_ns == -1 -> decided NOT to sample
+    //   start_ns > 0   -> sampling armed; submit on exit
+    // Each reader samples independently and submits once on its own way out;
+    // there is no shared per-lock contention site for readers to fight over
+    // (the old writer_csite field was removed along with the semaphore-based
+    // design), so submitting per-reader never contends.
+    size_t sampling_range = bvar::INVALID_SAMPLING_RANGE;
+    int64_t start_ns = 0;
+    int rc = 0;
 
-// Returns 0 if the lock was acquired, otherwise errno.
-static inline int rwlock_tryrdlock(bthread_rwlock_t* rwlock) {
     while (true) {
-        int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
-            ->load(butil::memory_order_relaxed);
-        if (reader_count < 0) {
-            // Failed to acquire the read lock because there is a writer.
-            return EBUSY;
-        }
-        if (((butil::atomic<int>*)&rwlock->reader_count)
-                ->compare_exchange_weak(reader_count, reader_count + 1,
-                                        butil::memory_order_acquire,
-                                        butil::memory_order_relaxed)) {
-            return 0;
+        // Writer-priority: if any writer is in flight, yield to it.
+        // `relaxed' is sufficient here because:
+        // - There is no data published via writer_wait_count;
+        //   data visibility is established via the acquire-CAS on
+        //   `lock_word' below paired with the release-CAS in unwrlock().
+        // - butex_wait() will re-check the expected value before sleeping,
+        //   so we cannot lose a wakeup even if `w' is slightly stale.
+        unsigned w = writer_wait_count->load(butil::memory_order_relaxed);
+        if (w > 0) {
+            if (try_lock) {
+                // Don't sample tryrdlock failures: they are by design a
+                // non-blocking probe, not a contention event.
+                return EBUSY;
+            }
+            // We are about to block on writer_wait_count; arm sampling
+            // before parking so the wait time is included in the report.
+            BTHREAD_RWLOCK_MAYBE_START_SAMPLING;
+            if (butex_wait(writer_wait_count, w, abstime) < 0 &&
+                errno != EWOULDBLOCK && errno != EINTR) {
+                rc = errno;
+                break;
+            }
+            continue;
+        }
-    }
-}
 
-static inline int rwlock_unrdlock(bthread_rwlock_t* rwlock) {
-    int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
-        ->fetch_add(-1, butil::memory_order_relaxed) - 1;
-    // Fast path.
-    if (reader_count >= 0) {
-        return 0;
+        // No writer in flight: try to add ourselves to the reader count.
+        // 2^31 - 1 readers should be enough for any realistic workload.
+        unsigned l = lock_word->load(butil::memory_order_relaxed);
+        if ((l >> 31) == 0) {
+            // Refuse to increment when the reader count has saturated
+            // the low 31 bits. Otherwise `l + 1' would flip bit 31 and
+            // we would corrupt lock_word into "writer held" state.
+            // POSIX-style: report EAGAIN ("max read locks exceeded").
+            if (BAIDU_UNLIKELY(l == 0x7FFFFFFFu)) {
+                LOG(ERROR) << "Too many readers on bthread_rwlock_t=" << rwlock;
+                rc = EAGAIN;
+                break;
+            }
+            // Acquire on success synchronizes-with the release-CAS in
+            // unwrlock(), so any data written by the previous writer is
+            // visible to us before we start reading.
+            if (lock_word->compare_exchange_weak(l, l + 1,
+                                                 butil::memory_order_acquire,
+                                                 butil::memory_order_relaxed)) {
+                rc = 0;
+                break;
+            }
+            // CAS failed (likely another reader bumped `l'): retry.
+        } else if (try_lock) {
+            // Write lock is currently held.
+            return EBUSY;
+        } else {
+            // The write lock is held, but our snapshot of writer_wait_count
+            // above was taken before the holder's fetch_add (a very narrow
+            // window inside wrlock). Arm sampling now so the spin until we
+            // observe writer_wait_count >= 1 is also accounted for.
+            BTHREAD_RWLOCK_MAYBE_START_SAMPLING;
+        }
+        // Otherwise (write lock held but not try_lock): spin once more.
+        // The next iteration will observe writer_wait_count >= 1 (writers
+        // self-account in writer_wait_count for the entire wrlock lifetime),
+        // and we will block on it instead of busy spinning.
     }
 
-    // Slow path.
-    if (BAIDU_UNLIKELY(reader_count + 1 == 0 || reader_count + 1 == -RWLockMaxReaders)) {
-        CHECK(false) << "rwlock_unrdlock of unlocked rwlock";
-        return EINVAL;
-    }
+    // Submit one contention sample for this reader (success or failure).
+    submit_contention_if_sampled(start_ns, sampling_range);
+    return rc;
+}
 
-    // A writer is pending.
-    int reader_wait = ((butil::atomic<int>*)&rwlock->reader_wait)
-        ->fetch_add(-1, butil::memory_order_relaxed) - 1;
-    if (reader_wait != 0) {
+static int rwlock_unrdlock(bthread_rwlock_t* rwlock) {
+    auto lock_word = (butil::atomic<unsigned>*)rwlock->lock_word;
+    while (true) {
+        unsigned l = lock_word->load(butil::memory_order_relaxed);
+        // Misuse detection: the caller must currently hold a read lock.
+        // l == 0         -> no lock is held (double unlock?)
+        // (l >> 31) != 0 -> write lock is held, not read lock
+        if (l == 0 || (l >> 31) != 0) {
+            LOG(ERROR) << "Invalid unrdlock on bthread_rwlock_t=" << rwlock
+                       << ", lock_word=" << l;
+            return EINVAL;
+        }
+        // Release on success publishes any reads/writes done while holding
+        // the read lock to the next acquirer (typically a writer's
+        // acquire-CAS in wrlock()).
+        if (!lock_word->compare_exchange_weak(l, l - 1,
+                                              butil::memory_order_release,
+                                              butil::memory_order_relaxed)) {
+            continue;
+        }
+        // We were the last reader (lock_word transitioned 1 -> 0). Wake the
+        // single writer (if any) that may be sleeping on `lock_word' inside
+        // wrlock(). At most one writer can be there because writers are
+        // serialized by writer_queue_mutex.
+        // No-op if nobody is waiting; butex_wake() short-circuits cheaply.
+        if (l == 1) {
+            butex_wake(lock_word);
+        }
         return 0;
     }
 }
 
-    // The last reader unblocks the writer.
-
-    if (NULL == bthread::g_cp) {
-        bthread_sem_post(&rwlock->writer_sema);
-        return 0;
+// Roll back the side effects of a failed wrlock attempt:
+// - Release writer_queue_mutex if we managed to acquire it.
+// - Decrement our share of writer_wait_count.
+// - If we were the last in-flight writer, wake all readers that have
+//   been parked by writer-priority (w == 1 means writer_wait_count is now 0).
+// Called on EBUSY (try_lock failed), ETIMEDOUT, or any other fatal wait error.
+static BUTIL_FORCE_INLINE void rwlock_wrlock_cleanup(bthread_rwlock_t* rwlock,
+                                                     bool writer_queue_locked) {
+    if (writer_queue_locked) {
+        bthread_mutex_unlock(&rwlock->writer_queue_mutex);
     }
-    // Ask Collector if this (contended) locking should be sampled.
-    const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
-    if (!sampling_range) { // Don't sample
-        bthread_sem_post(&rwlock->writer_sema);
-        return 0;
+    auto writer_wait_count = (butil::atomic<unsigned>*)rwlock->writer_wait_count;
+    // Withdraw our writer-priority "vote" so readers can make progress.
+    auto w = writer_wait_count->fetch_sub(1, butil::memory_order_relaxed);
+    // w is the value BEFORE the subtraction, so w == 1 means we were the
+    // last writer in flight; wake every reader parked on writer_wait_count.
+    if (w == 1) {
+        butex_wake_all(writer_wait_count);
     }
-
-    // Sampling.
-    const int64_t start_ns = butil::cpuwide_time_ns();
-    bthread_sem_post(&rwlock->writer_sema);
-    const int64_t end_ns = butil::cpuwide_time_ns();
-    const bthread_contention_site_t csite{end_ns - start_ns, sampling_range};
-    // Submit `csite' for each reader immediately after
-    // releasing rdlock to avoid the contention of `csite'.
-    bthread::submit_contention(csite, end_ns);
-    return 0;
 }
 
-#define DO_CSITE_IF_NEED                                                       \
-    do {                                                                       \
-        /* Don't sample when contention profiler is off. */                    \
-        if (NULL != bthread::g_cp) {                                           \
-            /* Ask Collector if this (contended) locking should be sampled. */ \
-            sampling_range = bvar::is_collectable(&bthread::g_cp_sl);          \
-            start_ns = bvar::is_sampling_range_valid(sampling_range) ?         \
-                butil::cpuwide_time_ns() : -1;                                 \
-        } else {                                                               \
-            start_ns = -1;                                                     \
-        }                                                                      \
-    } while (0)
-
-#define SUBMIT_CSITE_IF_NEED                                                          \
-    do {                                                                              \
-        if (ETIMEDOUT == rc && start_ns > 0) {                                        \
-            /* Failed to lock due to ETIMEDOUT, submit the elapse directly. */        \
-            const int64_t end_ns = butil::cpuwide_time_ns();                          \
-            const bthread_contention_site_t csite{end_ns - start_ns, sampling_range}; \
-            bthread::submit_contention(csite, end_ns);                                \
-        }                                                                             \
-    } while (0)
-
-// For writing.
-static inline int rwlock_wrlock_impl(bthread_rwlock_t* __restrict rwlock,
-                                     const struct timespec* __restrict abstime) {
-    // First, resolve competition with other writers.
-    int rc = bthread_mutex_trylock(&rwlock->write_queue_mutex);
+static int rwlock_wrlock(bthread_rwlock_t* rwlock, bool try_lock,
+                         const struct timespec* abstime) {
+    auto writer_wait_count = (butil::atomic<unsigned>*)rwlock->writer_wait_count;
+    // Step 1: announce ourselves before doing anything else, so that
+    // concurrent readers immediately observe writer-priority and back off.
+    // This MUST happen before we try to acquire writer_queue_mutex,
+    // otherwise a flood of readers could starve us indefinitely.
+    // 2^31 in-flight writers should be enough for any realistic workload.
+    writer_wait_count->fetch_add(1, butil::memory_order_relaxed);
+
+    // Sampling state for the contention profiler. Both wrlock() and
+    // unwrlock() sample independently: wrlock() submits its own wait time
+    // on the way out (success or failure); unwrlock() samples its own
+    // CAS-spin / mutex_unlock / butex_wake_all latency separately. The two
+    // operations are not forced to share a single sample (the old design
+    // funneled both through the now-removed writer_csite field).
     size_t sampling_range = bvar::INVALID_SAMPLING_RANGE;
-    // -1: don't sample.
-    // 0: default value.
-    // > 0: Start time of sampling.
     int64_t start_ns = 0;
-    if (0 != rc) {
-        DO_CSITE_IF_NEED;
-        rc = bthread_mutex_timedlock(&rwlock->write_queue_mutex, abstime);
+
+    // Step 2: serialize with other writers. At most one writer holds
+    // `writer_queue_mutex' at a time and races for `lock_word'.
+    int rc = bthread_mutex_trylock(&rwlock->writer_queue_mutex);
+    if (0 != rc) {
+        if (try_lock) {
+            // Fail to acquire the wrlock. Don't sample trywrlock failures:
+            // they are by design a non-blocking probe, not a contention event.
+            rwlock_wrlock_cleanup(rwlock, false);
+            return rc;
+        }
+        // We are about to block on writer_queue_mutex; arm sampling.
+        // Note: the inner mutex itself has csite disabled (see init), so
+        // its blocking time is only counted once -- here, by the rwlock.
+        BTHREAD_RWLOCK_MAYBE_START_SAMPLING;
+        rc = bthread_mutex_timedlock(&rwlock->writer_queue_mutex, abstime);
         if (0 != rc) {
-            SUBMIT_CSITE_IF_NEED;
+            // Fail to acquire the wrlock. Submit the elapsed wait time
+            // directly (no unwrlock() will run for this writer).
+            submit_contention_if_sampled(start_ns, sampling_range);
+            rwlock_wrlock_cleanup(rwlock, false);
             return rc;
         }
     }
 
-    // Announce to readers there is a pending writer.
-    int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
-        ->fetch_add(-RWLockMaxReaders, butil::memory_order_release);
-    // Wait for active readers.
-    if (reader_count != 0 &&
-        ((butil::atomic<int>*)&rwlock->reader_wait)
-            ->fetch_add(reader_count) + reader_count != 0) {
-        rc = bthread_sem_trywait(&rwlock->writer_sema);
-        if (0 != rc) {
-            if (0 == start_ns) {
-                DO_CSITE_IF_NEED;
+    // Step 3: with `writer_queue_mutex' held, wait for all readers to drain
+    // and then claim the write bit of `lock_word'.
+    auto lock_word = (butil::atomic<unsigned>*)rwlock->lock_word;
+    while (true) {
+        unsigned l = lock_word->load(butil::memory_order_relaxed);
+        if (l != 0) {
+            // Readers still hold the lock. Park on `lock_word' until the last
+            // reader releases (unrdlock will butex_wake on transition 1->0).
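+            // Illustrative walk-through of the butex protocol used below (a
+            // sketch added for clarity, not extra logic): with l == 3, the
+            // call butex_wait(lock_word, 3, abstime) parks only if lock_word
+            // still equals 3 at the moment of parking. Intermediate
+            // unrdlocks (3 -> 2 -> 1) do not wake us; only the last reader's
+            // butex_wake() on the 1 -> 0 transition does, after which the
+            // loop re-reads lock_word and attempts the CAS. If lock_word
+            // already changed before we parked, butex_wait returns
+            // EWOULDBLOCK at once and we simply retry.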
+            if (try_lock) {
+                errno = EBUSY;
+                break;
+            }
-
-            rc = bthread_sem_timedwait(&rwlock->writer_sema, abstime);
-            if (0 != rc) {
-                SUBMIT_CSITE_IF_NEED;
-                bthread_mutex_unlock(&rwlock->write_queue_mutex);
-                return rc;
+            // Arm sampling before parking so the wait-for-readers time is
+            // counted (in case the writer_queue_mutex acquisition above was
+            // uncontended).
+            BTHREAD_RWLOCK_MAYBE_START_SAMPLING;
+            // Use the freshly read `l' as expected; if lock_word changes
+            // before we sleep, butex_wait returns EWOULDBLOCK and we retry.
+            if (butex_wait(lock_word, l, abstime) < 0 &&
+                errno != EWOULDBLOCK && errno != EINTR) {
+                break;
             }
+            continue;
         }
+        // Acquire on success synchronizes-with release-CAS in
+        // unrdlock()/unwrlock(): we will see all data published by the
+        // previous reader/writer before we start writing.
+        if (lock_word->compare_exchange_weak(l, 1u << 31,
+                                             butil::memory_order_acquire,
+                                             butil::memory_order_relaxed)) {
+            // Submit the writer's wait sample immediately on success.
+            // unwrlock() will sample its own latency separately.
+            submit_contention_if_sampled(start_ns, sampling_range);
+            return 0;
+        }
+        // CAS may spuriously fail (weak); retry without sleeping.
     }
-    if (start_ns > 0) {
-        rwlock->writer_csite.duration_ns = butil::cpuwide_time_ns() - start_ns;
-        rwlock->writer_csite.sampling_range = sampling_range;
-    }
-    rwlock->wlock_flag = true;
-    return 0;
-}
-#undef DO_CSITE_IF_NEED
-#undef SUBMIT_CSITE_IF_NEED
 
-static inline int rwlock_wrlock(bthread_rwlock_t* rwlock) {
-    return rwlock_wrlock_impl(rwlock, NULL);
+    // Failure path: snapshot errno before cleanup, because
+    // bthread_mutex_unlock / butex_wake_all inside cleanup may invoke
+    // syscalls or yield and clobber errno on this thread.
+    int saved_errno = errno;
+    // Submit the elapsed wait directly; we never reached unwrlock().
+    submit_contention_if_sampled(start_ns, sampling_range);
+    rwlock_wrlock_cleanup(rwlock, true);
+    return saved_errno;
 }
 
-static inline int rwlock_timedwrlock(bthread_rwlock_t* __restrict rwlock,
-                                     const struct timespec* __restrict abstime) {
-    return rwlock_wrlock_impl(rwlock, abstime);
-}
-
-static inline int rwlock_trywrlock(bthread_rwlock_t* rwlock) {
-    int rc = bthread_mutex_trylock(&rwlock->write_queue_mutex);
-    if (0 != rc) {
-        return rc;
-    }
-
-    int expected = 0;
-    if (!((butil::atomic<int>*)&rwlock->reader_count)
-            ->compare_exchange_strong(expected, -RWLockMaxReaders,
-                                      butil::memory_order_acquire,
-                                      butil::memory_order_relaxed)) {
-        // Failed to acquire the write lock because there are active readers.
-        bthread_mutex_unlock(&rwlock->write_queue_mutex);
-        return EBUSY;
-    }
-    rwlock->wlock_flag = true;
-
-    return 0;
-}
+static int rwlock_unwrlock(bthread_rwlock_t* rwlock) {
+    auto lock_word = (butil::atomic<unsigned>*)rwlock->lock_word;
+    auto writer_wait_count = (butil::atomic<unsigned>*)rwlock->writer_wait_count;
 
-static inline void rwlock_unwrlock_slow(bthread_rwlock_t* rwlock, int reader_count) {
-    bthread_sem_post_n(&rwlock->reader_sema, reader_count);
-    // Allow other writers to proceed.
-    bthread_mutex_unlock(&rwlock->write_queue_mutex);
-}
-
-static inline int rwlock_unwrlock(bthread_rwlock_t* rwlock) {
-    rwlock->wlock_flag = false;
+    // Sampling state for the contention profiler.
+    // unwrlock() samples independently of wrlock(): although the
+    // release-CAS itself cannot fail due to writer-writer contention
+    // (writers are serialized by writer_queue_mutex), the body still does
+    // mutex_unlock(), butex_wake_all() and may spuriously spin on the weak
+    // CAS, all of which contribute to the critical-section tail latency.
+    size_t sampling_range = bvar::INVALID_SAMPLING_RANGE;
+    int64_t start_ns = 0;
+    BTHREAD_RWLOCK_MAYBE_START_SAMPLING;
 
-    // Announce to readers there is no active writer.
-    int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)->fetch_add(
-        RWLockMaxReaders, butil::memory_order_release) + RWLockMaxReaders;
-    if (BAIDU_UNLIKELY(reader_count >= RWLockMaxReaders)) {
-        CHECK(false) << "rwlock_unwlock of unlocked rwlock";
-        return EINVAL;
-    }
+    while (true) {
+        unsigned l = lock_word->load(butil::memory_order_relaxed);
+        // Misuse detection: we must currently hold the write lock.
+        if (BAIDU_UNLIKELY(l != (1u << 31))) {
+            LOG(ERROR) << "Invalid unwrlock on bthread_rwlock_t=" << rwlock
+                       << ", lock_word=" << l;
+            return EINVAL;
+        }
+        // Release-CAS publishes all writes performed under the write lock
+        // to the next acquirer (a reader's acquire-CAS or another writer's
+        // acquire-CAS). The CAS itself cannot fail due to contention since
+        // writers are serialized by writer_queue_mutex; weak failure here is
+        // only a spurious CAS failure -- just retry.
+        if (!lock_word->compare_exchange_weak(l, 0,
+                                              butil::memory_order_release,
+                                              butil::memory_order_relaxed)) {
+            continue;
+        }
 
-    bool is_valid = bthread::is_contention_site_valid(rwlock->writer_csite);
-    if (BAIDU_UNLIKELY(is_valid)) {
-        bthread_contention_site_t saved_csite = rwlock->writer_csite;
-        bthread::make_contention_site_invalid(&rwlock->writer_csite);
+        // ---- Order of the next two operations is INTENTIONAL ----
+        //
+        // We deliberately:
+        //   (1) unlock writer_queue_mutex FIRST, then
+        //   (2) fetch_sub(writer_wait_count) and conditionally wake readers.
+        //
+        // Rationale (writer-priority semantics):
+        // * Any writer queued on writer_queue_mutex has already
+        //   fetch_add'ed its share into writer_wait_count back in wrlock()
+        //   (before it even tried to lock the mutex). So when it wakes
+        //   up here and we later fetch_sub, the counter still reflects
+        //   "there is at least one more writer in flight": w_old >= 2,
+        //   which means w != 1, which means we will NOT wake readers.
+        //   Readers must keep yielding to the next writer -- exactly the
+        //   writer-priority invariant.
+        // * Only when we are truly the last writer in flight (our fetch_sub
+        //   returns w_old == 1, i.e. writer_wait_count is now 0) do we
+        //   wake_all readers parked on writer_wait_count.
+        //
+        // Subtle but harmless effect:
+        //   Between (1) and (2) there is a small window in which our own
+        //   "ghost share" is still counted in writer_wait_count even though
+        //   we have effectively left. New readers entering rdlock() during
+        //   this window will see writer_wait_count >= 1 and park on it; they
+        //   will be woken either by step (2) below (if no successor writer
+        //   appeared) or by the successor writer's eventual unwrlock.
+        //   No wakeup is ever lost: butex_wait re-checks the expected
+        //   value before truly sleeping, and any successor writer will
+        //   itself execute this same wake logic on its way out.
+        //
+        // Reversing the order (fetch_sub before unlock mutex) would break
+        // strict writer-priority because woken readers could grab the
+        // read lock before a successor writer queued on the mutex even
+        // gets a chance to CAS lock_word.
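+        //
+        // A concrete interleaving sketch of the above (added for clarity;
+        // writers A and B, reader R; B is already queued on
+        // writer_queue_mutex, so writer_wait_count == 2 while A runs this
+        // unlock path):
+        //   A: mutex_unlock()              -> B wakes inside wrlock()
+        //   R: rdlock() loads w == 2       -> parks on writer_wait_count
+        //   A: fetch_sub() returns 2       -> w != 1, readers NOT woken
+        //   B: CAS lock_word 0 -> 1u << 31 -> B now holds the write lock
+        //   B: unwrlock(): fetch_sub() returns 1 -> butex_wake_all(readers)
+        // R never jumps the queue ahead of B, which is exactly the
+        // writer-priority guarantee argued above.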
+        bthread_mutex_unlock(&rwlock->writer_queue_mutex);
+        unsigned w = writer_wait_count->fetch_sub(1, butil::memory_order_relaxed);
+        if (w == 1) {
+            butex_wake_all(writer_wait_count);
+        }
 
-        const int64_t unlock_start_ns = butil::cpuwide_time_ns();
-        rwlock_unwrlock_slow(rwlock, reader_count);
-        const int64_t unlock_end_ns = butil::cpuwide_time_ns();
-        saved_csite.duration_ns += unlock_end_ns - unlock_start_ns;
-        bthread::submit_contention(saved_csite, unlock_end_ns);
-    } else {
-        rwlock_unwrlock_slow(rwlock, reader_count);
+        // Submit our own unwrlock-side sample (CAS spin + mutex_unlock +
+        // butex_wake_all). This is independent of the wrlock-side sample.
+        submit_contention_if_sampled(start_ns, sampling_range);
+        return 0;
     }
-
-    return 0;
 }
 
-static inline int rwlock_unlock(bthread_rwlock_t* rwlock) {
-    if (rwlock->wlock_flag) {
+// Generic unlock entry that dispatches to unwrlock/unrdlock by inspecting
+// `lock_word'. This is safe ONLY because the caller must already hold one of
+// the two locks: while holding a read lock the high bit of `lock_word' cannot
+// flip on, and while holding the write lock the low bits cannot be set.
+// Therefore a relaxed load is sufficient to make the dispatch decision.
+static int rwlock_unlock(bthread_rwlock_t* rwlock) {
+    auto lock_word = (butil::atomic<unsigned>*)rwlock->lock_word;
+    unsigned l = lock_word->load(butil::memory_order_relaxed);
+    if ((l >> 31) != 0) {
         return rwlock_unwrlock(rwlock);
     } else {
         return rwlock_unrdlock(rwlock);
     }
 }
 
-} // namespace bthread
-
-__BEGIN_DECLS
-
-int bthread_rwlock_init(bthread_rwlock_t* __restrict rwlock,
-                        const bthread_rwlockattr_t* __restrict) {
-    int rc = bthread_sem_init(&rwlock->reader_sema, 0);
-    if (BAIDU_UNLIKELY(0 != rc)) {
-        return rc;
+// Deleter that turns butex_create_checked()'s raw pointer into something
+// std::unique_ptr can clean up automatically. Using RAII here lets the
+// init-error paths just `return rc' without manually unwinding partial
+// allocations; ownership is `release()'d only on the all-success path.
+struct ButexDeleter {
+    void operator()(void* butex) const {
+        if (butex != NULL) {
+            butex_destroy(butex);
+        }
     }
-    bthread_sem_disable_csite(&rwlock->reader_sema);
-    rc = bthread_sem_init(&rwlock->writer_sema, 0);
-    if (BAIDU_UNLIKELY(0 != rc)) {
-        bthread_sem_destroy(&rwlock->reader_sema);
-        return rc;
+};
+
+static int rwlock_init(bthread_rwlock_t* rwlock) {
+    std::unique_ptr<unsigned, ButexDeleter> writer_wait_count(
+        butex_create_checked<unsigned>());
+    if (writer_wait_count == NULL) {
+        LOG(ERROR) << "Fail to create writer_wait_count butex: out of memory";
+        return ENOMEM;
     }
-    bthread_sem_disable_csite(&rwlock->writer_sema);
-
-    rwlock->reader_count = 0;
-    rwlock->reader_wait = 0;
-    rwlock->wlock_flag = false;
+    std::unique_ptr<unsigned, ButexDeleter> lock_word(
+        butex_create_checked<unsigned>());
+    if (lock_word == NULL) {
+        LOG(ERROR) << "Fail to create lock_word butex: out of memory";
+        return ENOMEM;
+    }
+    *writer_wait_count = 0;
+    *lock_word = 0;
 
     bthread_mutexattr_t attr;
     bthread_mutexattr_init(&attr);
+    BRPC_SCOPE_EXIT { bthread_mutexattr_destroy(&attr); };
+    // Disable csite on the inner queue mutex so the writer's wait time is
+    // accounted exactly once -- by the rwlock layer, not double-counted via
+    // the inner mutex.
     bthread_mutexattr_disable_csite(&attr);
-    rc = bthread_mutex_init(&rwlock->write_queue_mutex, &attr);
-    if (BAIDU_UNLIKELY(0 != rc)) {
-        bthread_sem_destroy(&rwlock->reader_sema);
-        bthread_sem_destroy(&rwlock->writer_sema);
+    const int rc = bthread_mutex_init(&rwlock->writer_queue_mutex, &attr);
+    if (rc != 0) {
+        LOG(ERROR) << "Fail to init writer_queue_mutex, rc=" << rc;
         return rc;
     }
-    bthread_mutexattr_destroy(&attr);
-
-    bthread::make_contention_site_invalid(&rwlock->writer_csite);
+    // All resources successfully created; transfer butex ownership to
+    // rwlock. From here on, bthread_rwlock_destroy() is responsible for
+    // releasing them.
+    rwlock->writer_wait_count = writer_wait_count.release();
+    rwlock->lock_word = lock_word.release();
     return 0;
 }
 
+static int rwlock_destroy(bthread_rwlock_t* rwlock) {
+    // Destroy the inner mutex first; bthread_mutex_init() allocates an
+    // internal butex which would otherwise leak. Pointers are nulled to
+    // surface accidental double-destroy / use-after-destroy bugs early.
+    int rc = bthread_mutex_destroy(&rwlock->writer_queue_mutex);
+    if (rc != 0) {
+        LOG(ERROR) << "Fail to destroy writer_queue_mutex, rc=" << rc;
+    }
+    if (rwlock->writer_wait_count != NULL) {
+        butex_destroy(rwlock->writer_wait_count);
+        rwlock->writer_wait_count = NULL;
+    }
+    if (rwlock->lock_word != NULL) {
+        butex_destroy(rwlock->lock_word);
+        rwlock->lock_word = NULL;
+    }
+    return rc;
+}
+
+} // namespace bthread
+
+__BEGIN_DECLS
+
+int bthread_rwlock_init(bthread_rwlock_t* __restrict rwlock,
+                        const bthread_rwlockattr_t* __restrict) {
+    return bthread::rwlock_init(rwlock);
+}
+
 int bthread_rwlock_destroy(bthread_rwlock_t* rwlock) {
-    bthread_sem_destroy(&rwlock->reader_sema);
-    bthread_sem_destroy(&rwlock->writer_sema);
-    bthread_mutex_destroy(&rwlock->write_queue_mutex);
-    return 0;
+    return bthread::rwlock_destroy(rwlock);
 }
 
 int bthread_rwlock_rdlock(bthread_rwlock_t* rwlock) {
-    return bthread::rwlock_rdlock(rwlock);
+    return bthread::rwlock_rdlock(rwlock, false, NULL);
 }
 
 int bthread_rwlock_tryrdlock(bthread_rwlock_t* rwlock) {
-    return bthread::rwlock_tryrdlock(rwlock);
+    return bthread::rwlock_rdlock(rwlock, true, NULL);
 }
 
 int bthread_rwlock_timedrdlock(bthread_rwlock_t* __restrict rwlock,
                                const struct timespec* __restrict abstime) {
-    return bthread::rwlock_timedrdlock(rwlock, abstime);
+    return bthread::rwlock_rdlock(rwlock, false, abstime);
 }
 
 int bthread_rwlock_wrlock(bthread_rwlock_t* rwlock) {
-    return bthread::rwlock_wrlock(rwlock);
+    return bthread::rwlock_wrlock(rwlock, false, NULL);
 }
 
 int bthread_rwlock_trywrlock(bthread_rwlock_t* rwlock) {
-    return bthread::rwlock_trywrlock(rwlock);
+    return bthread::rwlock_wrlock(rwlock, true, NULL);
 }
 
 int bthread_rwlock_timedwrlock(bthread_rwlock_t* __restrict rwlock,
                                const struct timespec* __restrict abstime) {
-    return bthread::rwlock_timedwrlock(rwlock, abstime);
+    return bthread::rwlock_wrlock(rwlock, false, abstime);
 }
 
 int bthread_rwlock_unlock(bthread_rwlock_t* rwlock) {
diff --git a/src/bthread/types.h b/src/bthread/types.h
index 86148c938b..d46de1e835 100644
--- a/src/bthread/types.h
+++ b/src/bthread/types.h
@@ -225,16 +225,26 @@ typedef struct bthread_sem_t {
 typedef struct bthread_rwlock_t {
 #if defined(__cplusplus)
     bthread_rwlock_t()
-        : reader_count(0), reader_wait(0), wlock_flag(false), writer_csite{} {}
+        : writer_wait_count(NULL), lock_word(NULL) {}
     DISALLOW_COPY_AND_ASSIGN(bthread_rwlock_t);
 #endif
-    bthread_sem_t reader_sema; // Semaphore for readers to wait for completing writers.
-    bthread_sem_t writer_sema; // Semaphore for writers to wait for completing readers.
-    int reader_count;          // Number of pending readers.
-    int reader_wait;           // Number of departing readers.
-    bool wlock_flag;           // Flag used to indicate that a write lock has been held.
-    bthread_mutex_t write_queue_mutex; // Held if there are pending writers.
-    bthread_contention_site_t writer_csite;
+    // Number of writers currently in flight (used as a butex):
+    // writers waiting on writer_queue_mutex, writers waiting for
+    // lock_word == 0, and the writer currently holding the write lock
+    // are all counted here. Each writer accounts for itself: incremented
+    // at the very beginning of wrlock() and decremented at the very end
+    // of unwrlock()/cleanup(). Readers consult this field to honor
+    // writer-priority: any non-zero value parks new readers.
+    unsigned* writer_wait_count;
+    // Serializes writers so that at most one writer at a time races for
+    // lock_word. Other writers queue up on this mutex.
+    bthread_mutex_t writer_queue_mutex;
+    // Bit-packed atomic lock word (used as a butex):
+    //   bit 31  : 1 if the write lock is held, 0 otherwise.
+    //   bit 0~30: number of readers currently holding the read lock.
+    //   0       : unlocked.
+    // The high bit and the low 31 bits are mutually exclusive.
+    unsigned* lock_word;
 } bthread_rwlock_t;
 
 typedef struct {
diff --git a/test/bthread_rwlock_unittest.cpp b/test/bthread_rwlock_unittest.cpp
index 2da226cb2f..9a88051c1a 100644
--- a/test/bthread_rwlock_unittest.cpp
+++ b/test/bthread_rwlock_unittest.cpp
@@ -17,6 +17,7 @@
 #include <gtest/gtest.h>
 #include "gperftools_helper.h"
+#include "butil/atomicops.h"
 #include <bthread/bthread.h>
 
 namespace {
@@ -286,6 +287,253 @@ TEST(RWLockTest, mix_thread_types) {
     ASSERT_EQ(0, bthread_rwlock_destroy(&rw));
 }
 
+// Tests below verify the writer-priority semantics and the cleanup path
+// guarded by the design notes in bthread/rwlock.cpp.
+struct WriterPriorityArgs {
+    bthread_rwlock_t* rw;
+    butil::atomic<int>* order;
+    int my_order;  // sequence number captured inside the critical section
+    int hold_us;
+};
+
+void* wp_writer_fn(void* arg) {
+    auto* a = (WriterPriorityArgs*)arg;
+    EXPECT_EQ(0, bthread_rwlock_wrlock(a->rw));
+    a->my_order = a->order->fetch_add(1, butil::memory_order_relaxed);
+    bthread_usleep(a->hold_us);
+    EXPECT_EQ(0, bthread_rwlock_unlock(a->rw));
+    return NULL;
+}
+
+void* wp_reader_fn(void* arg) {
+    auto* a = (WriterPriorityArgs*)arg;
+    EXPECT_EQ(0, bthread_rwlock_rdlock(a->rw));
+    a->my_order = a->order->fetch_add(1, butil::memory_order_relaxed);
+    bthread_usleep(a->hold_us);
+    EXPECT_EQ(0, bthread_rwlock_unlock(a->rw));
+    return NULL;
+}
+
+// Verifies the writer-priority invariant guarded by the order
+// "unlock writer_queue_mutex BEFORE fetch_sub(writer_wait_count)" in
+// rwlock_unwrlock(): once a writer is queued, any new reader arriving
+// later MUST yield to that writer.
+TEST(RWLockTest, writer_priority) {
+    bthread_setconcurrency(8);
+    bthread_rwlock_t rw;
+    ASSERT_EQ(0, bthread_rwlock_init(&rw, NULL));
+
+    // (1) Main thread holds the read lock first.
+    ASSERT_EQ(0, bthread_rwlock_rdlock(&rw));
+
+    butil::atomic<int> order(0);
+    WriterPriorityArgs warg {&rw, &order, -1, 5000};
+    WriterPriorityArgs r2arg {&rw, &order, -1, 0};
+
+    // (2) Start a writer; it should park inside wrlock() because the read
+    //     lock is held. Sleep long enough for it to fetch_add into
+    //     writer_wait_count and reach the butex_wait on `lock_word'.
+    bthread_t wth;
+    ASSERT_EQ(0, bthread_start_urgent(&wth, NULL, wp_writer_fn, &warg));
+    bthread_usleep(50 * 1000);
+
+    // (3) Now spawn a fresh reader. By writer-priority it MUST observe
+    //     writer_wait_count > 0 and park on it (NOT join the active read
+    //     lock).
+    bthread_t r2th;
+    ASSERT_EQ(0, bthread_start_urgent(&r2th, NULL, wp_reader_fn, &r2arg));
+    bthread_usleep(50 * 1000);
+
+    // (4) Release the original read lock. The writer should win the race
+    //     and complete BEFORE the queued reader.
+    ASSERT_EQ(0, bthread_rwlock_unlock(&rw));
+
+    bthread_join(wth, NULL);
+    bthread_join(r2th, NULL);
+
+    EXPECT_GE(warg.my_order, 0);
+    EXPECT_GE(r2arg.my_order, 0);
+    EXPECT_LT(warg.my_order, r2arg.my_order)
+        << "Writer-priority violated: writer entered with order="
+        << warg.my_order << " but late reader entered with order="
+        << r2arg.my_order;
+
+    ASSERT_EQ(0, bthread_rwlock_destroy(&rw));
+}
+
+void* wp_timed_wrlock_short(void* arg) {
+    auto* rw = (bthread_rwlock_t*)arg;
+    timespec ts = butil::milliseconds_from_now(50);
+    EXPECT_EQ(ETIMEDOUT, bthread_rwlock_timedwrlock(rw, &ts));
+    return NULL;
+}
+
+// Verifies the cleanup path of rwlock_wrlock_cleanup(): after multiple
+// writers fail with ETIMEDOUT, writer_wait_count must be back to 0 so
+// that subsequent readers are not blocked by leftover "ghost shares".
+TEST(RWLockTest, wrlock_failure_does_not_leak_writer_count) {
+    bthread_setconcurrency(8);
+    bthread_rwlock_t rw;
+    ASSERT_EQ(0, bthread_rwlock_init(&rw, NULL));
+
+    // Hold the read lock so every wrlock attempt must block on `lock_word'.
+    ASSERT_EQ(0, bthread_rwlock_rdlock(&rw));
+
+    const int N = 8;
+    bthread_t wth[N];
+    for (int i = 0; i < N; ++i) {
+        ASSERT_EQ(0, bthread_start_urgent(&wth[i], NULL, wp_timed_wrlock_short, &rw));
+    }
+    // Wait for all timed wrlock attempts to time out and run cleanup.
+    for (int i = 0; i < N; ++i) {
+        bthread_join(wth[i], NULL);
+    }
+
+    // Release the read lock; from this point on no writer is in flight,
+    // so a new reader MUST acquire the lock immediately.
+    ASSERT_EQ(0, bthread_rwlock_unlock(&rw));
+
+    timespec ts = butil::milliseconds_from_now(500);
+    butil::Timer t;
+    t.start();
+    ASSERT_EQ(0, bthread_rwlock_timedrdlock(&rw, &ts));
+    t.stop();
+    EXPECT_LT(t.m_elapsed(), 100)
+        << "Reader was blocked for " << t.m_elapsed() << "ms; "
+        << "writer_wait_count was likely leaked by the cleanup path.";
+
+    ASSERT_EQ(0, bthread_rwlock_unlock(&rw));
+    ASSERT_EQ(0, bthread_rwlock_destroy(&rw));
+}
+
+struct DataConsistencyArgs {
+    bthread_rwlock_t* rw;
+    int64_t* shared;       // protected by rw
+    int64_t local_inc;     // writer: number of increments this thread did
+    int64_t observed_max;  // reader: max value observed
+    bool is_writer;
+};
+
+void* dc_worker(void* arg) {
+    auto* a = (DataConsistencyArgs*)arg;
+    while (!g_stopped) {
+        if (a->is_writer) {
+            EXPECT_EQ(0, bthread_rwlock_wrlock(a->rw));
+            ++(*a->shared);
+            ++a->local_inc;
+            EXPECT_EQ(0, bthread_rwlock_unlock(a->rw));
+        } else {
+            EXPECT_EQ(0, bthread_rwlock_rdlock(a->rw));
+            int64_t v = *a->shared;
+            if (v > a->observed_max) {
+                a->observed_max = v;
+            }
+            EXPECT_EQ(0, bthread_rwlock_unlock(a->rw));
+        }
+    }
+    return NULL;
+}
+
+// Verifies the release/acquire memory ordering pair on `lock_word'.
+// If the CAS in unwrlock()/unrdlock() weren't release-ordered, or the
+// CAS in rdlock()/wrlock() weren't acquire-ordered, writes done inside
+// the critical section could appear lost or inconsistent to other
+// threads, causing the final counter to disagree with total writer ops.
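+// Sketch of the happens-before chain the assertions rely on (one writer,
+// one reader; an illustration of the comments in rwlock.cpp, not a spec):
+//   writer: ++*shared;                                            (A)
+//           unwrlock(): release-CAS lock_word (1u << 31) -> 0     (B)
+//   reader: rdlock():   acquire-CAS lock_word 0 -> 1              (C)
+//           v = *shared;                                          (D)
+// B synchronizes-with C, so A happens-before D: every read sees a fully
+// updated counter, hence observed_max can never exceed the final total.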
+TEST(RWLockTest, data_consistency) {
+    bthread_rwlock_t rw;
+    ASSERT_EQ(0, bthread_rwlock_init(&rw, NULL));
+
+    g_stopped = false;
+    const int W = 4;
+    const int R = 8;
+    bthread_setconcurrency(W + R + 4);
+
+    int64_t shared = 0;
+    std::vector<DataConsistencyArgs> args(W + R);
+    std::vector<bthread_t> threads(W + R);
+    for (int i = 0; i < W + R; ++i) {
+        args[i].rw = &rw;
+        args[i].shared = &shared;
+        args[i].local_inc = 0;
+        args[i].observed_max = -1;
+        args[i].is_writer = (i < W);
+        ASSERT_EQ(0, bthread_start_urgent(&threads[i], NULL, dc_worker, &args[i]));
+    }
+
+    bthread_usleep(500 * 1000);
+    g_stopped = true;
+
+    int64_t total_inc = 0;
+    for (int i = 0; i < W + R; ++i) {
+        bthread_join(threads[i], NULL);
+        if (args[i].is_writer) {
+            total_inc += args[i].local_inc;
+        }
+    }
+
+    // No lost updates: every writer's increment is reflected in `shared'.
+    EXPECT_EQ(total_inc, shared)
+        << "Lost updates: total writer ops=" << total_inc
+        << " but shared counter=" << shared;
+    // No reader saw a value greater than the final counter.
+    for (int i = W; i < W + R; ++i) {
+        EXPECT_LE(args[i].observed_max, shared)
+            << "Reader " << i << " observed_max=" << args[i].observed_max
+            << " > final shared=" << shared;
+    }
+
+    ASSERT_EQ(0, bthread_rwlock_destroy(&rw));
+}
+
+void* ws_reader_loop(void* arg) {
+    auto* rw = (bthread_rwlock_t*)arg;
+    while (!g_stopped) {
+        EXPECT_EQ(0, bthread_rwlock_rdlock(rw));
+        // Hold the read lock briefly to keep the lock continuously busy.
+        bthread_usleep(100);
+        EXPECT_EQ(0, bthread_rwlock_unlock(rw));
+    }
+    return NULL;
+}
+
+// Verifies that under a continuous read load, a writer can still acquire
+// the lock in bounded time. This is the end-to-end guarantee of the
+// writer-priority strategy: any reader arriving AFTER the writer entered
+// wrlock() must yield, ensuring the writer never starves.
+TEST(RWLockTest, no_writer_starvation) {
+    bthread_rwlock_t rw;
+    ASSERT_EQ(0, bthread_rwlock_init(&rw, NULL));
+
+    g_stopped = false;
+    const int R = 16;
+    bthread_setconcurrency(R + 4);
+    bthread_t rth[R];
+    for (int i = 0; i < R; ++i) {
+        ASSERT_EQ(0, bthread_start_urgent(&rth[i], NULL, ws_reader_loop, &rw));
+    }
+
+    // Let the readers ramp up and saturate the lock.
+    bthread_usleep(50 * 1000);
+
+    // A single writer must succeed within a generous budget.
+    butil::Timer t;
+    t.start();
+    ASSERT_EQ(0, bthread_rwlock_wrlock(&rw));
+    t.stop();
+
+    EXPECT_LT(t.m_elapsed(), 1000)
+        << "Writer starved for " << t.m_elapsed() << "ms under "
+        << R << " concurrent readers; writer-priority is broken.";
+
+    ASSERT_EQ(0, bthread_rwlock_unlock(&rw));
+
+    g_stopped = true;
+    for (int i = 0; i < R; ++i) {
+        bthread_join(rth[i], NULL);
+    }
+    ASSERT_EQ(0, bthread_rwlock_destroy(&rw));
+}
+
 struct BAIDU_CACHELINE_ALIGNMENT PerfArgs {
     bthread_rwlock_t* rw;
     int64_t counter;
@@ -386,13 +634,14 @@ void PerfTest(uint32_t writer_ratio, ThreadId* /*dummy*/, int thread_num,
         << " writer_ratio=" << writer_ratio
         << " reader_num=" << reader_num
         << " read_count=" << read_count
-        << " read_average_time=" << (read_count == 0 ? 0 : read_wait_time / (double)read_count)
+        << " read_average_time=" << (read_count == 0 ? 0 : read_wait_time / (double)read_count) << "ns"
         << " writer_num=" << writer_num
        << " write_count=" << write_count
-        << " write_average_time=" << (write_count == 0 ? 0 : write_wait_time / (double)write_count);
+        << " write_average_time=" << (write_count == 0 ?
+            0 : write_wait_time / (double)write_count) << "ns";
 }
 
 TEST(RWLockTest, performance) {
+    bthread_setconcurrency(16);
     const int thread_num = 12;
     PerfTest(0, (pthread_t*)NULL, thread_num, pthread_create, pthread_join);
     PerfTest(0, (bthread_t*)NULL, thread_num, bthread_start_background, bthread_join);