From 1690d26b0d4834c73ee028af2f56e53e1983e4b6 Mon Sep 17 00:00:00 2001 From: Vladislav Shpilevoy Date: Sat, 3 May 2025 19:53:32 +0200 Subject: [PATCH 1/6] sch: remove 'retry' from the scheduler The idea previously was that a worker thread can't leave the scheduler role until the scheduler is stopped or there are tasks to do. But actually nothing prevents from leaving the scheduler, entering it again, and then sleeping on waiting for tasks. It allows to simplify the state machine of the scheduler a bit. Gives a guarantee that the scheduler-role will consume the front signal only once per scheduling iteration. This guarantee in turn is going to be quite handy in the future potential API improvement for making it possible to run the scheduler in user's threads. Part of #40 --- src/mg/sch/TaskScheduler.cpp | 10 +++------- tla/TaskScheduler.tla | 2 +- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/src/mg/sch/TaskScheduler.cpp b/src/mg/sch/TaskScheduler.cpp index 934c835e..dbd73795 100644 --- a/src/mg/sch/TaskScheduler.cpp +++ b/src/mg/sch/TaskScheduler.cpp @@ -89,7 +89,6 @@ namespace sch { uint32_t maxBatch = mySchedBatchSize; TaskScheduleResult result = TASK_SCHEDULE_DONE; - retry: // ------------------------------------------------------- // Handle waiting tasks. They are older than the ones in // the front queue, so must be handled first. @@ -211,11 +210,9 @@ namespace sch { if (myQueueReady.Count() == 0 && myQueuePending.IsEmpty()) { - // No ready tasks means the other workers already - // sleep on ready-signal. Or are going to start - // sleeping any moment. So the sched can't quit. It - // must retry until either a front task appears, or - // one of the waiting tasks' deadline is expired. + // No ready tasks means the other workers already sleep on ready-signal. Or + // are going to start sleeping any moment. So the sched can't quit. It must + // try to wait until something new happens which would require processing. if (myQueueWaiting.Count() > 0) { deadline = myQueueWaiting.GetTop()->myDeadline; @@ -238,7 +235,6 @@ namespace sch { result = TASK_SCHEDULE_FINISHED; goto end; } - goto retry; } end: diff --git a/tla/TaskScheduler.tla b/tla/TaskScheduler.tla index e90e84a9..77d2275f 100644 --- a/tla/TaskScheduler.tla +++ b/tla/TaskScheduler.tla @@ -567,7 +567,7 @@ SchedWaitFrontSignal(wid) == /\ Assert(IsSchedulerTaken, "Is in scheduler") \* --- /\ IsFrontSignaled' = FALSE - /\ WorkerThreads' = ArrSetState(wid, "sched_check_waiting", WorkerThreads) + /\ WorkerThreads' = ArrSetState(wid, "sched_exit", WorkerThreads) /\ UNCHANGED<> \* Have tasks in the ready-queue. Exit the scheduler and go help other workers From 4c1ea838da0b8bdda46bfad5f00cb0fb00da4e7f Mon Sep 17 00:00:00 2001 From: Vladislav Shpilevoy Date: Mon, 19 May 2025 23:21:40 +0200 Subject: [PATCH 2/6] box: introduce bitwise atomic operations Atomic "and" + "or" are going to be quite handy in the next commits. They could be implemented manually, but it is much easier to just take the STL version. Needed for #40 --- src/mg/box/Atomic.h | 52 +++++++++++++++++++++ test/box/UnitTestAtomic.cpp | 90 +++++++++++++++++++++++++++++++++++++ 2 files changed, 142 insertions(+) diff --git a/src/mg/box/Atomic.h b/src/mg/box/Atomic.h index 5a8b6ff2..aa8db5f3 100644 --- a/src/mg/box/Atomic.h +++ b/src/mg/box/Atomic.h @@ -176,6 +176,58 @@ namespace box { T DecrementFetch() { return myValue.fetch_sub(1, std::memory_order_seq_cst) - 1; } + T FetchBitOrRelaxed( + const T& aValue) + { return myValue.fetch_or(aValue, std::memory_order_relaxed); } + T FetchBitOrAcquire( + const T& aValue) + { return myValue.fetch_or(aValue, std::memory_order_acquire); } + T FetchBitOrRelease( + const T& aValue) + { return myValue.fetch_or(aValue, std::memory_order_release); } + T FetchBitOr( + const T& aValue) + { return myValue.fetch_or(aValue, std::memory_order_seq_cst); } + + void BitOrRelaxed( + const T& aValue) + { FetchBitOrRelaxed(aValue); } + void BitOrAcquire( + const T& aValue) + { FetchBitOrAcquire(aValue); } + void BitOrRelease( + const T& aValue) + { FetchBitOrRelease(aValue); } + void BitOr( + const T& aValue) + { FetchBitOr(aValue); } + + T FetchBitAndRelaxed( + const T& aValue) + { return myValue.fetch_and(aValue, std::memory_order_relaxed); } + T FetchBitAndAcquire( + const T& aValue) + { return myValue.fetch_and(aValue, std::memory_order_acquire); } + T FetchBitAndRelease( + const T& aValue) + { return myValue.fetch_and(aValue, std::memory_order_release); } + T FetchBitAnd( + const T& aValue) + { return myValue.fetch_and(aValue, std::memory_order_seq_cst); } + + void BitAndRelaxed( + const T& aValue) + { FetchBitAndRelaxed(aValue); } + void BitAndAcquire( + const T& aValue) + { FetchBitAndAcquire(aValue); } + void BitAndRelease( + const T& aValue) + { FetchBitAndRelease(aValue); } + void BitAnd( + const T& aValue) + { FetchBitAnd(aValue); } + bool CmpExchgWeakRelaxed( T& aExpected, const T& aValue); diff --git a/test/box/UnitTestAtomic.cpp b/test/box/UnitTestAtomic.cpp index 6f02ba0f..47e04f58 100644 --- a/test/box/UnitTestAtomic.cpp +++ b/test/box/UnitTestAtomic.cpp @@ -2,6 +2,8 @@ #include "UnitTest.h" +#include + namespace mg { namespace unittests { namespace box { @@ -198,6 +200,90 @@ namespace box { TEST_CHECK(value.LoadRelaxed() == 119); } + static void + UnitTestAtomicFetchBitOr() + { + mg::box::AtomicU32 value; + using Func = std::function; + Func funcs[] = { + std::bind(&mg::box::AtomicU32::FetchBitOrRelaxed, &value, + std::placeholders::_1), + std::bind(&mg::box::AtomicU32::FetchBitOrAcquire, &value, + std::placeholders::_1), + std::bind(&mg::box::AtomicU32::FetchBitOrRelease, &value, + std::placeholders::_1), + std::bind(&mg::box::AtomicU32::FetchBitOr, &value, + std::placeholders::_1), + }; + for (const Func& f : funcs) + { + value.StoreRelaxed(0b0011); + TEST_CHECK(f(0b1000) == 0b0011); + TEST_CHECK(value.LoadRelaxed() == 0b1011); + } + } + + static void + UnitTestAtomicBitOr() + { + mg::box::AtomicU32 value; + using Func = std::function; + Func funcs[] = { + std::bind(&mg::box::AtomicU32::BitOrRelaxed, &value, std::placeholders::_1), + std::bind(&mg::box::AtomicU32::BitOrAcquire, &value, std::placeholders::_1), + std::bind(&mg::box::AtomicU32::BitOrRelease, &value, std::placeholders::_1), + std::bind(&mg::box::AtomicU32::BitOr, &value, std::placeholders::_1), + }; + for (const Func& f : funcs) + { + value.StoreRelaxed(0b0011); + f(0b1000); + TEST_CHECK(value.LoadRelaxed() == 0b1011); + } + } + + static void + UnitTestAtomicFetchBitAnd() + { + mg::box::AtomicU32 value; + using Func = std::function; + Func funcs[] = { + std::bind(&mg::box::AtomicU32::FetchBitAndRelaxed, &value, + std::placeholders::_1), + std::bind(&mg::box::AtomicU32::FetchBitAndAcquire, &value, + std::placeholders::_1), + std::bind(&mg::box::AtomicU32::FetchBitAndRelease, &value, + std::placeholders::_1), + std::bind(&mg::box::AtomicU32::FetchBitAnd, &value, + std::placeholders::_1), + }; + for (const Func& f : funcs) + { + value.StoreRelaxed(0b0011); + TEST_CHECK(f(0b1010) == 0b0011); + TEST_CHECK(value.LoadRelaxed() == 0b0010); + } + } + + static void + UnitTestAtomicBitAnd() + { + mg::box::AtomicU32 value; + using Func = std::function; + Func funcs[] = { + std::bind(&mg::box::AtomicU32::BitAndRelaxed, &value, std::placeholders::_1), + std::bind(&mg::box::AtomicU32::BitAndAcquire, &value, std::placeholders::_1), + std::bind(&mg::box::AtomicU32::BitAndRelease, &value, std::placeholders::_1), + std::bind(&mg::box::AtomicU32::BitAnd, &value, std::placeholders::_1), + }; + for (const Func& f : funcs) + { + value.StoreRelaxed(0b0011); + f(0b1010); + TEST_CHECK(value.LoadRelaxed() == 0b0010); + } + } + static void UnitTestAtomicCmpExchgWeak() { @@ -375,6 +461,10 @@ namespace box { UnitTestAtomicDecrement(); UnitTestAtomicFetchDecrement(); UnitTestAtomicDecrementFetch(); + UnitTestAtomicFetchBitOr(); + UnitTestAtomicBitOr(); + UnitTestAtomicFetchBitAnd(); + UnitTestAtomicBitAnd(); UnitTestAtomicCmpExchgWeak(); UnitTestAtomicCmpExchgStrong(); UnitTestAtomicBool(); From e15f91d572bff897dea47c95fbf117860bc9ce4b Mon Sep 17 00:00:00 2001 From: Vladislav Shpilevoy Date: Mon, 19 May 2025 23:23:03 +0200 Subject: [PATCH 3/6] box: introduce InterruptibleMutex The new class is a special type of mutex which allows to take the lock, go to sleep on some arbitrary condition (a condition variable, a epoll_wait() call, a Signal.ReceiveBlocking(), or something similar), and then get woken up when other threads want to take the ownership. This is an abstraction of the TaskScheduler's usecase. The scheduler workers are going to be taking the lock and sleeping when have to wait for new tasks or for a deadline. And other threads would forcefully take the ownership when need to do some maintenance work like stop/start of the scheduler or getting something out of the scheduler-role context. Currently there isn't much of the second type of work, but this will change in scope of #40. Part of #40 --- src/mg/box/CMakeLists.txt | 1 + src/mg/box/InterruptibleMutex.cpp | 127 ++++++++ src/mg/box/InterruptibleMutex.h | 68 +++++ test/CMakeLists.txt | 1 + test/box/UnitTestInterruptibleMutex.cpp | 241 +++++++++++++++ test/main.cpp | 2 + tla/InterruptibleMutex.cfg | 18 ++ tla/InterruptibleMutex.tla | 383 ++++++++++++++++++++++++ 8 files changed, 841 insertions(+) create mode 100644 src/mg/box/InterruptibleMutex.cpp create mode 100644 src/mg/box/InterruptibleMutex.h create mode 100644 test/box/UnitTestInterruptibleMutex.cpp create mode 100644 tla/InterruptibleMutex.cfg create mode 100644 tla/InterruptibleMutex.tla diff --git a/src/mg/box/CMakeLists.txt b/src/mg/box/CMakeLists.txt index 41f7cab3..f202adce 100644 --- a/src/mg/box/CMakeLists.txt +++ b/src/mg/box/CMakeLists.txt @@ -5,6 +5,7 @@ set(mgbox_src ConditionVariable.cpp Coro.cpp Error.cpp + InterruptibleMutex.cpp IOVec.cpp Log.cpp MultiConsumerQueueBase.cpp diff --git a/src/mg/box/InterruptibleMutex.cpp b/src/mg/box/InterruptibleMutex.cpp new file mode 100644 index 00000000..434238b2 --- /dev/null +++ b/src/mg/box/InterruptibleMutex.cpp @@ -0,0 +1,127 @@ +#include "InterruptibleMutex.h" + +#include "mg/box/Thread.h" + +namespace mg { +namespace box { +namespace { + enum InterruptibleMutexFlags + { + INTERRUPTIBLE_MUTEX_TAKEN = 0x1, + INTERRUPTIBLE_MUTEX_HAS_WAITERS = 0x2, + }; +} + InterruptibleMutex::InterruptibleMutex() + : myState(0) + { + } + + InterruptibleMutex::~InterruptibleMutex() + { + MG_BOX_ASSERT(myState.LoadRelaxed() == 0); + } + + void + InterruptibleMutex::Lock( + const InterruptibleMutexWakeupCallback& aWakeup) + { + if (TryLock()) + return; + + // The mutex is owned by somebody. Try to take it over. + myMutex.Lock(); + // Pessimistically this thread might have to wait, so lets add the corresponding + // flag. Use 'acquire' to sync all the writes done by the mutex's current owner. + // In case the owner is already leaving. + uint8_t expected = myState.FetchBitOrAcquire( + INTERRUPTIBLE_MUTEX_HAS_WAITERS); + if ((expected & INTERRUPTIBLE_MUTEX_TAKEN) == 0) + { + // Could happen than between trying to lock and taking the internal mutex the + // original owner has already left. If it actually left the ownership + // entirely, it means there were no waiters. + MG_BOX_ASSERT(myWaiters.IsEmpty()); + // And no other thread could take the lock, because it would see the 'waiters' + // flag and would also try to take the internal mutex, which is owner by this + // thread atm. Hence it is safe to just take the ownership. + // 'Relaxed' should be enough, because acquire was already done just above. + expected = myState.ExchangeRelaxed(INTERRUPTIBLE_MUTEX_TAKEN); + // The waiters flag was set by this thread, but not anymore. New waiters, if + // any are waiting on the internal mutex, will just set this flag again if + // needed. + MG_BOX_ASSERT(expected == INTERRUPTIBLE_MUTEX_HAS_WAITERS); + myMutex.Unlock(); + return; + } + // Some other thread owns the lock and this thread has announced its presence via + // the waiters-flag. Now time to wait until the owner will hand the ownership + // over. + InterruptibleMutexWaiter self; + if (myWaiters.IsEmpty()) + { + // Only the first waiter wakes the owner up. Makes no sense to wake it + // multiple times. + aWakeup(); + } + myWaiters.Append(&self); + myMutex.Unlock(); + + // Just wait for the ownership to be handed over. + self.mySignal.ReceiveBlocking(); + // Use 'acquire' to sync all the writes done by the previous owner. + MG_BOX_ASSERT((myState.LoadAcquire() & INTERRUPTIBLE_MUTEX_TAKEN) != 0); + } + + bool + InterruptibleMutex::TryLock() + { + // It is important that the lock can't be taken if there are already waiters. No + // thread can take the ownership out of order. Otherwise this might cause + // starvation of the waiters. + // Use 'acquire' to sync all the writes done by the previous owner. + uint8_t expected = 0; + return myState.CmpExchgStrongAcquire(expected, INTERRUPTIBLE_MUTEX_TAKEN); + } + + void + InterruptibleMutex::Unlock() + { + // The happy-path is when there are no waiters. Then just unlock it and leave. + // Use 'release' to provide a sync point for the next owners to sync all the + // writes done by this thread. + uint8_t expected = INTERRUPTIBLE_MUTEX_TAKEN; + if (myState.CmpExchgStrongRelease(expected, 0)) + return; + + // So there were some waiters spotted. Try to hand the ownership directly to the + // first one. + MG_BOX_ASSERT(expected == + (INTERRUPTIBLE_MUTEX_TAKEN | INTERRUPTIBLE_MUTEX_HAS_WAITERS)); + + myMutex.Lock(); + MG_BOX_ASSERT(!myWaiters.IsEmpty()); + // It is important to firstly pop the waiter, and only then remove the + // waiters-flag (if there was only one waiter). Otherwise the following might + // have happened: + // - Thread-1 takes the lock. + // - Thread-2 becomes a waiter and adds the waiter-flag. + // - Thread-1 frees the lock and (wrongly) signals the thread-2 that the ownership + // is given. + // - Thread-2 wakes up and tries to free the lock too. + // - Thread-2 sees that the waiters-flag is set, but there are no waiters. Broken + // assumption. + InterruptibleMutexWaiter* first = myWaiters.PopFirst(); + // Even when nothing to do - still use 'release'. To give the next owner a way to + // sync all the writes done by this thread. + if (myWaiters.IsEmpty()) + expected = myState.FetchBitAndRelease(~INTERRUPTIBLE_MUTEX_HAS_WAITERS); + else + expected = myState.FetchBitAndRelease(-1); + MG_BOX_ASSERT((expected & INTERRUPTIBLE_MUTEX_TAKEN) != 0); + MG_BOX_ASSERT((expected & INTERRUPTIBLE_MUTEX_HAS_WAITERS) != 0); + first->mySignal.Send(); + myMutex.Unlock(); + } + +} +} diff --git a/src/mg/box/InterruptibleMutex.h b/src/mg/box/InterruptibleMutex.h new file mode 100644 index 00000000..48383183 --- /dev/null +++ b/src/mg/box/InterruptibleMutex.h @@ -0,0 +1,68 @@ +#pragma once + +#include "mg/box/DoublyList.h" +#include "mg/box/Signal.h" + +#include + +namespace mg { +namespace box { + + struct InterruptibleMutexWaiter + { + mg::box::Signal mySignal; + InterruptibleMutexWaiter* myNext; + InterruptibleMutexWaiter* myPrev; + }; + + using InterruptibleMutexWakeupCallback = std::function; + + // The interruptible mutex allows one thread to take the mutex, get blocked + // on something else while holding it, and the other threads can wake it up + // (interrupt) to forcefully take the mutex for some other work. + // + // The guarantees are the following: + // - There is no spin-locking when TryLock() is used. + // - Some threads use TryLock() + can sleep while holding the lock, + // then other threads via Lock() can take the ownership over to do some + // work given that they won't be sleeping on the same condition. + // + // An example: + // + // - Thread-workers (any number): + // if (TryLock()) + // { + // WaitOn(Signal); + // Unlock(); + // } + // + // - Thread-others (any number): + // Lock([](){ Signal.Send(); }); + // DoSomething(); // But don't sleep on the same Signal. + // Unlock(). + // + // Then the 'other' threads are guaranteed to get the lock. And nothing will + // ever deadlock. + // + class InterruptibleMutex + { + public: + InterruptibleMutex(); + ~InterruptibleMutex(); + + void Lock( + const InterruptibleMutexWakeupCallback& aWakeup); + bool TryLock(); + void Unlock(); + + private: + InterruptibleMutex( + const InterruptibleMutex&) = delete; + + mg::box::Mutex myMutex; + mg::box::DoublyList myWaiters; + mg::box::AtomicU8 myState; + }; + +} +} diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 57853010..146fb647 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -21,6 +21,7 @@ add_executable(test box/UnitTestDoublyList.cpp box/UnitTestError.cpp box/UnitTestForwardList.cpp + box/UnitTestInterruptibleMutex.cpp box/UnitTestIOVec.cpp box/UnitTestLog.cpp box/UnitTestMultiConsumerQueue.cpp diff --git a/test/box/UnitTestInterruptibleMutex.cpp b/test/box/UnitTestInterruptibleMutex.cpp new file mode 100644 index 00000000..74ab9317 --- /dev/null +++ b/test/box/UnitTestInterruptibleMutex.cpp @@ -0,0 +1,241 @@ +#include "mg/box/InterruptibleMutex.h" + +#include "mg/box/ThreadFunc.h" + +#include "UnitTest.h" + +namespace mg { +namespace unittests { +namespace box { + + static void + UnreachableWakeup() + { + TEST_CHECK(!"Reachable"); + } + + static void + UnitTestInterruptibleMutexBasic() + { + mg::box::InterruptibleMutex mutex; + mutex.Lock(UnreachableWakeup); + mutex.Unlock(); + mutex.Lock(UnreachableWakeup); + mutex.Unlock(); + } + + void + UnitTestInterruptibleMutexWakeupOne() + { + mg::box::InterruptibleMutex mutex; + mg::box::Signal signal; + // + // One thread takes the mutex and sleeps on something. + // + mg::box::Signal isBlockedOnSignal; + mg::box::ThreadFunc* otherThread = new mg::box::ThreadFunc("mgtst", + [&]() { + TEST_CHECK(mutex.TryLock()); + isBlockedOnSignal.Send(); + signal.ReceiveBlocking(); + mutex.Unlock(); + }); + otherThread->Start(); + isBlockedOnSignal.ReceiveBlocking(); + // + // Another thread wakes it up to take the mutex over. + // + mutex.Lock([&]() { signal.Send(); }); + delete otherThread; + mutex.Unlock(); + } + + void + UnitTestInterruptibleMutexWakeupTwo() + { + mg::box::InterruptibleMutex mutex; + mg::box::Signal mainSignal; + // + // One thread takes the mutex and does something, not yet waiting on the main + // signal. + // + mg::box::Signal isStarted; + mg::box::Signal getTheMainSignal; + mg::box::ThreadFunc* thread1 = new mg::box::ThreadFunc("mgtst1", + [&]() { + TEST_CHECK(mutex.TryLock()); + isStarted.Send(); + getTheMainSignal.ReceiveBlocking(); + mainSignal.ReceiveBlocking(); + mutex.Unlock(); + }); + thread1->Start(); + isStarted.ReceiveBlocking(); + // + // Another thread tries to enter the mutex, but can't atm. Even the wakeup won't + // help, because the first thread doesn't sleep on the main signal yet. + // + mg::box::ThreadFunc* thread2 = new mg::box::ThreadFunc("mgtst2", + [&]() { + mutex.Lock([&]() { + isStarted.Send(); + mainSignal.Send(); + }); + mutex.Unlock(); + }); + thread2->Start(); + isStarted.ReceiveBlocking(); + // + // Third thread enters eventually. + // + getTheMainSignal.Send(); + // No need to signal anything. The second thread must leave the lock + // without waiting on anything. + mutex.Lock([]() {}); + delete thread1; + delete thread2; + mutex.Unlock(); + } + + static void + UnitTestInterruptibleMutexStress() + { + mg::box::InterruptibleMutex mutex; + mg::box::Signal mainSignal; + mg::box::AtomicU32 counter(0); + constexpr uint32_t targetCount = 2000; + mg::box::InterruptibleMutexWakeupCallback wakeupCallback = [&]() { + mainSignal.Send(); + }; + + mg::box::ThreadFunc mainThread("mgtstmai", [&]() { + uint64_t yield = 0; + while (true) + { + if (!mutex.TryLock()) + { + if (++yield % 10000000 == 0) + mg::box::Sleep(1); + continue; + } + mainSignal.ReceiveBlocking(); + uint32_t old = counter.IncrementFetchRelaxed(); + mutex.Unlock(); + if (old >= targetCount) + break; + } + }); + + const uint32_t threadCount = 3; + std::vector threads; + threads.reserve(threadCount); + mg::box::AtomicBool isStopped(false); + for (uint32_t i = 0; i < threadCount; ++i) + { + threads.push_back(new mg::box::ThreadFunc("mgtst", [&]() { + uint64_t yield = 0; + while (!isStopped.LoadRelaxed()) + { + mutex.Lock(wakeupCallback); + mutex.Unlock(); + if (++yield % 1000 == 0) + mg::box::Sleep(1); + } + })); + } + for (mg::box::ThreadFunc* f : threads) + f->Start(); + mainThread.Start(); + uint64_t printDeadline = 0; + while (mainThread.IsRunning()) + { + uint64_t now = mg::box::GetMilliseconds(); + if (now >= printDeadline) + { + printDeadline = now + 1000; + uint32_t counterNow = counter.LoadRelaxed(); + int percent; + if (counterNow >= targetCount) + percent = 100; + else + percent = counterNow * 100 / targetCount; + Report("Progress: %d (= %u)", percent, counterNow); + } + mg::box::Sleep(10); + } + Report("Progress: 100"); + isStopped.StoreRelaxed(true); + for (mg::box::ThreadFunc* f : threads) + delete f; + } + + static void + UnitTestInterruptibleMutexStressMany() + { + mg::box::InterruptibleMutex mutex; + uint32_t counter = 0; + mg::box::AtomicBool isLocked(false); + mg::box::AtomicU32 trueCounter(0); + uint32_t wakeups = 0; + bool isInWakeup = false; + mg::box::InterruptibleMutexWakeupCallback wakeupCallback = [&]() { + TEST_CHECK(!isInWakeup); + isInWakeup = true; + ++wakeups; + TEST_CHECK(isInWakeup); + isInWakeup = false; + }; + + const uint32_t threadCount = 10; + std::vector threads; + threads.reserve(threadCount); + for (uint32_t i = 0; i < threadCount; ++i) + { + threads.push_back(new mg::box::ThreadFunc("mgtst", [&]() { + uint64_t deadline = mg::box::GetMilliseconds() + 2000; + uint64_t yield = 0; + uint32_t localCounter = 0; + while (mg::box::GetMilliseconds() < deadline) + { + ++localCounter; + + mutex.Lock(wakeupCallback); + TEST_CHECK(!isLocked.ExchangeRelaxed(true)); + counter++; + TEST_CHECK(isLocked.ExchangeRelaxed(false)); + mutex.Unlock(); + if (++yield % 1000 == 0) + mg::box::Sleep(1); + } + trueCounter.AddRelaxed(localCounter); + })); + } + for (mg::box::ThreadFunc* f : threads) + f->Start(); + for (mg::box::ThreadFunc* f : threads) + delete f; + // Flush the caches. Make sure all the operations done inside the mutex-lock are + // complete and visible in this thread. + TEST_CHECK(mutex.TryLock()); + mutex.Unlock(); + + TEST_CHECK(counter == trueCounter.LoadRelaxed()); + TEST_CHECK(wakeups <= counter); + Report("wakeups = %u", wakeups); + } + + void + UnitTestInterruptibleMutex() + { + TestSuiteGuard suite("InterruptibleMutex"); + + UnitTestInterruptibleMutexBasic(); + UnitTestInterruptibleMutexWakeupOne(); + UnitTestInterruptibleMutexWakeupTwo(); + UnitTestInterruptibleMutexStress(); + UnitTestInterruptibleMutexStressMany(); + } + +} +} +} diff --git a/test/main.cpp b/test/main.cpp index 488bbbef..85558bdf 100644 --- a/test/main.cpp +++ b/test/main.cpp @@ -42,6 +42,7 @@ namespace box { void UnitTestDoublyList(); void UnitTestError(); void UnitTestForwardList(); + void UnitTestInterruptibleMutex(); void UnitTestIOVec(); void UnitTestLog(); void UnitTestMultiConsumerQueue(); @@ -106,6 +107,7 @@ main( MG_RUN_TEST(box, UnitTestDoublyList); MG_RUN_TEST(box, UnitTestError); MG_RUN_TEST(box, UnitTestForwardList); + MG_RUN_TEST(box, UnitTestInterruptibleMutex); MG_RUN_TEST(box, UnitTestIOVec); MG_RUN_TEST(box, UnitTestLog); MG_RUN_TEST(box, UnitTestMultiConsumerQueue); diff --git a/tla/InterruptibleMutex.cfg b/tla/InterruptibleMutex.cfg new file mode 100644 index 00000000..6438550a --- /dev/null +++ b/tla/InterruptibleMutex.cfg @@ -0,0 +1,18 @@ +INVARIANT TotalInvariant + +CONSTANTS + +TargetCount = 5 +NULL = NULL + +wid1 = wid1 +wid2 = wid2 +wid3 = wid3 +WorkerThreadIDs = {wid1, wid2, wid3} +MainThreadID = wid1 + +SYMMETRY Perms + +SPECIFICATION Spec + +CHECK_DEADLOCK TRUE diff --git a/tla/InterruptibleMutex.tla b/tla/InterruptibleMutex.tla new file mode 100644 index 00000000..ba572125 --- /dev/null +++ b/tla/InterruptibleMutex.tla @@ -0,0 +1,383 @@ +-------------------------- MODULE InterruptibleMutex --------------------------- +\* +\* Interruptible mutex. It is a construct similar to how a normal mutex + a +\* condition variable are often used. Except that the interruptible mutex is +\* more generic. It abstracts the "condition variable" into something outside of +\* the mutex itself, and which can be "woken up" (like via a callback). +\* +\* The main usecase is the TaskScheduler, where the interruptible mutex is going +\* to protect the scheduler-role, while the scheduler is sleeping on some +\* condition variable / signal / epoll / whatever else. +\* +EXTENDS TLC, Integers, Sequences + +-------------------------------------------------------------------------------- +\* +\* Constant values. +\* + +CONSTANT WorkerThreadIDs +\* One thread is "main". It won't do blocking locks. The logic is that it +\* represents actually one or more such threads, that they try to take the lock +\* periodically, and when one of them does, it goes to sleep on a condition. +\* +\* Allowing all the threads to take the lock in a blocking way eventually leads +\* to a situation when the signal is empty, one thread sleeps on a condition, +\* and all the others are waiting. This could be fixed if "wakeup" would be done +\* by each new waiter, but now it is only done on the first waiter entering the +\* queue and for the use-case (TaskScheduler) this is enough. Because its +\* workers will never take a blocking lock + sleep on the tasks. +CONSTANT MainThreadID +CONSTANT NULL + +\* Worker threads are symmetrical. Meaning that changing their places in +\* different runs won't change the result. +Perms == Permutations(WorkerThreadIDs) + +-------------------------------------------------------------------------------- +\* +\* Variables. +\* + +VARIABLE MutexState +VARIABLE MutexOwnerID +\* The interruptible mutex's internals are protected with a regular mutex. It +\* helps to synchronize certain things in case of contention. But otherwise when +\* there is no contention, the internals aren't touched. +VARIABLE InternalOwnerID +\* A list of threads waiting for the current owner of the mutex to give them the +\* ownership. +VARIABLE Waiters +VARIABLE WorkerThreads +\* Some abstract condition on which a thread can "sleep" and by which it can be +\* "woken up". +VARIABLE IsConditionSignaled + +vars == <> + +-------------------------------------------------------------------------------- +\* +\* Helper functions. +\* + +\* In TLA+ there is no convenient way to change multiple fields of a struct, +\* especially if it is in an array of other structs. Functions here help to make +\* it look sane. For that the order of arguments is inverted - the object to +\* change goes last. Thanks to that, multiple mutations can be written as +\* +\* SetField(key1, val1, +\* SetField(key2, val2, +\* SetField(key3, val3, +\* Object))) +\* +\* Almost like normal assignment. This is additionally simplified by the fact +\* that there are no struct types. The same setter can be used for any struct, +\* only member name matters. + +SetState(v, b) == [b EXCEPT !.state = v] +SetIsTaken(v, b) == [b EXCEPT !.is_taken = v] +SetHasWaiters(v, b) == [b EXCEPT !.has_waiters = v] +SetOldMutexState(v, b) == [b EXCEPT !.old_mutex_state = v] + +\* The same but for struct arrays. + +ArrLen(s) == Len(s) +ArrFirst(s) == Head(s) +ArrIsEmpty(s) == Len(s) = 0 +ArrPopHead(s) == Tail(s) +ArrSet(i, v, s) == [s EXCEPT ![i] = v] +ArrAppend(v, s) == Append(s, v) +ArrSetState(i, v, s) == ArrSet(i, SetState(v, s[i]), s) +ArrSetOldMutexState(i, v, s) == ArrSet(i, SetOldMutexState(v, s[i]), s) + +\* Constructors. + +\* It is basically a few flags which can be checked and set atomically. An +\* std::atomic_uint8 in C++. +MutexStateNew == [ + \* The mutex is owned by some thread right now. + is_taken |-> FALSE, + \* The mutex has at least one pending waiter. It means that when unlock + \* happens, the ownership must be transferred directly to the first waiter. + \* And that new attempts to try-lock the mutex must fail. + has_waiters |-> FALSE +] + +WorkerThreadNew == [ + state |-> "idle", + \* An "on stack" variable for some cases where it couldn't be used in a single + \* atomic step. + old_mutex_state |-> MutexStateNew +] + +Init == + /\ MutexState = MutexStateNew + /\ MutexOwnerID = NULL + /\ InternalOwnerID = NULL + /\ Waiters = << >> + /\ WorkerThreads = [wid \in WorkerThreadIDs |-> WorkerThreadNew] + /\ IsConditionSignaled = FALSE + +\* In the steps below the splitting is done by actions which are visible to +\* other participants. For example, blocking lock firstly takes the internal +\* mutex, and then does changes to the interruptible mutex state. Both those +\* steps are observable in other threads. Hence done as separate steps in TLA. +\* +\* Actions, which do not change globally visible data or are atomic, can be done +\* in one step. For example, an atomic increment can be done as a single step. +\* Without spitting read and write. + +-------------------------------------------------------------------------------- +\* Locking. + +\* Try to take a lock the non-blocking way. +ThreadTryLock(wid) == + LET w == WorkerThreads[wid] IN + /\ w.state = "idle" + \* Is only possible when completely free. Note that even if the mutex isn't + \* taken, but has waiters, it is still not possible to just steal it with + \* try-lock at this point. Must either wait until all waiters are gone, or + \* must become one of them. + /\ ~MutexState.is_taken /\ ~MutexState.has_waiters + \* --- + /\ WorkerThreads' = ArrSetState(wid, "lock_done", WorkerThreads) + /\ MutexState' = SetIsTaken(TRUE, MutexState) + /\ UNCHANGED<> + +\* Blocking lock. +ThreadBlockingLock(wid) == + LET w == WorkerThreads[wid] IN + /\ w.state = "idle" + \* Main thread can't do the blocking locks. Because if it would, it might + \* lead to a deadlock when the main thread becomes a waiter, then another + \* waiter comes, then the main thread takes the lock and goes to sleep. Then + \* nobody would call the wakeup, because the waiter list wasn't empty when + \* non-main threads stood into it. This is fixable, but isn't not worth + \* complicating the algorithm and doesn't block the mutex's main usecase in + \* TaskScheduler. + /\ wid # MainThreadID + \* Locking always tries to take the lock the fast-way. Can only do the + \* blocking lock when the fast path didn't work. + /\ MutexState.is_taken \/ MutexState.has_waiters + \* The slow path (blocking lock) requires to take the internal lock first. + /\ InternalOwnerID = NULL + \* --- + /\ InternalOwnerID' = wid + /\ WorkerThreads' = ArrSetState(wid, "lock_start", WorkerThreads) + /\ UNCHANGED<> + +ThreadLockStart(wid) == + LET w == WorkerThreads[wid] IN + /\ w.state = "lock_start" + /\ Assert(InternalOwnerID = wid, "internal mutex is taken") + \* --- + \* Add the waiters-flag so if the current lock-owner leaves now, it would know + \* there are waiters and would have to transfer the ownership, not just drop + \* it. + /\ MutexState' = SetHasWaiters(TRUE, MutexState) + /\ WorkerThreads' = ArrSetState(wid, "lock_check_old_state", + ArrSetOldMutexState(wid, MutexState, + WorkerThreads)) + /\ UNCHANGED<> + +ThreadLockCheckOldState(wid) == + LET w == WorkerThreads[wid] IN + /\ w.state = "lock_check_old_state" + /\ Assert(InternalOwnerID = wid, "internal mutex is taken") + \* --- + /\ IF ~w.old_mutex_state.is_taken THEN + \* Could happen that the owner dropped the lock before this thread managed + \* to set the waiters-flag. Then the lock is free for taking right away. + /\ Assert(MutexState.has_waiters, "has waiters") + /\ Assert(~MutexState.is_taken, "not taken") + /\ MutexState' = SetIsTaken(TRUE, MutexStateNew) + /\ WorkerThreads' = ArrSetState(wid, "lock_exit_without_waiting", + WorkerThreads) + ELSE + \* The owner was still there when the waiters-flag was added. Have to wait + \* then. + /\ WorkerThreads' = ArrSetState(wid, "lock_stand_into_queue", + WorkerThreads) + /\ UNCHANGED<> + /\ UNCHANGED<> + +ThreadLockExitWithoutWaiting(wid) == + LET w == WorkerThreads[wid] IN + /\ w.state = "lock_exit_without_waiting" + /\ Assert(InternalOwnerID = wid, "internal mutex is taken") + \* --- + /\ InternalOwnerID' = NULL + /\ WorkerThreads' = ArrSetState(wid, "lock_done", WorkerThreads) + /\ UNCHANGED<> + +ThreadLockStandIntoQueue(wid) == + LET w == WorkerThreads[wid] IN + /\ w.state = "lock_stand_into_queue" + /\ Assert(InternalOwnerID = wid, "internal mutex is taken") + \* --- + /\ IF ArrIsEmpty(Waiters) THEN + \* Wake the owner up only once. When the first waiter enters the queue. It + \* won't wakeup faster if other threads signal it more than once anyway. + /\ IsConditionSignaled' = TRUE + ELSE + /\ UNCHANGED<> + /\ Waiters' = ArrAppend(wid, Waiters) + /\ WorkerThreads' = ArrSetState(wid, "lock_stood_into_queue", WorkerThreads) + /\ UNCHANGED<> + +ThreadLockStoodIntoQueue(wid) == + LET w == WorkerThreads[wid] IN + /\ w.state = "lock_stood_into_queue" + /\ Assert(InternalOwnerID = wid, "internal mutex is taken") + \* --- + /\ InternalOwnerID' = NULL + /\ WorkerThreads' = ArrSetState(wid, "lock_wait", WorkerThreads) + /\ UNCHANGED<> + +ThreadLockTake(wid) == + /\ \/ ThreadTryLock(wid) + \/ ThreadBlockingLock(wid) + \/ ThreadLockStart(wid) + \/ ThreadLockCheckOldState(wid) + \/ ThreadLockExitWithoutWaiting(wid) + \/ ThreadLockStandIntoQueue(wid) + \/ ThreadLockStoodIntoQueue(wid) + +-------------------------------------------------------------------------------- +\* The lock is taken. Do stuff while holding the lock. + +ThreadLockedSleep(wid) == + \* Only the main thread is allowed to sleep while holding the lock. All the + \* other threads must do their stuff and unlock quickly. + /\ wid = MainThreadID + /\ WorkerThreads' = ArrSetState(wid, "locked_sleep", WorkerThreads) + +ThreadLockedLeave(wid) == + /\ WorkerThreads' = ArrSetState(wid, "unlock_start", WorkerThreads) + +ThreadLockUse(wid) == + LET w == WorkerThreads[wid] IN + /\ \/ /\ w.state = "lock_done" + /\ Assert(MutexOwnerID = NULL, "no owner") + /\ MutexOwnerID' = wid + /\ \/ ThreadLockedSleep(wid) + \/ ThreadLockedLeave(wid) + /\ UNCHANGED<> + \/ /\ w.state = "locked_sleep" + /\ Assert(MutexOwnerID = wid, "this is owner") + \* Wait for a wakeup. Only then can consume it and proceed. + /\ IsConditionSignaled + /\ IsConditionSignaled' = FALSE + /\ WorkerThreads' = ArrSetState(wid, "unlock_start", WorkerThreads) + /\ UNCHANGED<> + /\ Assert(MutexState.is_taken, "state is taken") + /\ UNCHANGED<> + +-------------------------------------------------------------------------------- +\* Unlocking steps. + +ThreadUnlockStart(wid) == + LET w == WorkerThreads[wid] IN + /\ w.state = "unlock_start" + \* --- + /\ Assert(MutexOwnerID = wid, "this is owner") + /\ Assert(MutexState.is_taken, "state is taken") + /\ MutexOwnerID' = NULL + /\ IF MutexState = SetIsTaken(TRUE, MutexStateNew) THEN + \* Taken, no waiters. Then just unlock and leave. + /\ MutexState' = MutexStateNew + /\ WorkerThreads' = ArrSetState(wid, "idle", WorkerThreads) + ELSE + \* There is a waiter. Need to transfer the ownership then. + /\ Assert(MutexState.is_taken, "is taken") + /\ Assert(MutexState.has_waiters, "has waiters") + /\ WorkerThreads' = ArrSetState(wid, "unlock_take_internal", WorkerThreads) + /\ UNCHANGED<> + /\ UNCHANGED<> + +ThreadUnlockTakeInternal(wid) == + LET w == WorkerThreads[wid] IN + /\ w.state = "unlock_take_internal" + /\ InternalOwnerID = NULL + /\ Assert(MutexState.is_taken, "state is taken") + \* --- + /\ InternalOwnerID' = wid + /\ WorkerThreads' = ArrSetState(wid, "unlock_check_waiters_flag", + WorkerThreads) + /\ UNCHANGED<> + +ThreadUnlockCheckWaitersFlag(wid) == + LET w == WorkerThreads[wid] IN + /\ w.state = "unlock_check_waiters_flag" + /\ Assert(InternalOwnerID = wid, "is internally locked") + /\ Assert(MutexState.is_taken, "state is taken") + /\ Assert(~ArrIsEmpty(Waiters), "has waiters") + \* --- + \* Remove the waiters-flag if there is just one waiter. It is going to be + \* popped now. + /\ IF ArrLen(Waiters) = 1 THEN + /\ MutexState' = SetHasWaiters(FALSE, MutexState) + ELSE + /\ UNCHANGED<> + /\ WorkerThreads' = ArrSetState(wid, "unlock_wakeup_waiter", WorkerThreads) + /\ UNCHANGED<> + +ThreadUnlockWakeupWaiter(wid) == + LET w == WorkerThreads[wid] IN + /\ w.state = "unlock_wakeup_waiter" + /\ Assert(InternalOwnerID = wid, "is internally locked") + /\ Assert(MutexState.is_taken, "state is taken") + /\ Assert(~ArrIsEmpty(Waiters), "has waiters") + \* --- + /\ Waiters' = ArrPopHead(Waiters) + /\ WorkerThreads' = ArrSetState(wid, "unlock_end", + ArrSetState(ArrFirst(Waiters), "lock_done", + WorkerThreads)) + /\ UNCHANGED<> + +ThreadUnlockEnd(wid) == + LET w == WorkerThreads[wid] IN + /\ w.state = "unlock_end" + /\ Assert(InternalOwnerID = wid, "is internally locked") + \* --- + /\ InternalOwnerID' = NULL + /\ WorkerThreads' = ArrSetState(wid, "idle", WorkerThreads) + /\ UNCHANGED<> + +ThreadUnlock(wid) == + /\ \/ ThreadUnlockStart(wid) + \/ ThreadUnlockTakeInternal(wid) + \/ ThreadUnlockCheckWaitersFlag(wid) + \/ ThreadUnlockWakeupWaiter(wid) + \/ ThreadUnlockEnd(wid) + /\ UNCHANGED<> + +-------------------------------------------------------------------------------- +\* +\* Actions. +\* + +Next == + \/ \E wid \in WorkerThreadIDs: + \/ ThreadLockTake(wid) + \/ ThreadLockUse(wid) + \/ ThreadUnlock(wid) + +-------------------------------------------------------------------------------- +\* +\* Invariants. +\* + +\* All the invariants are checked via the assertions. +TotalInvariant == TRUE + +Spec == + /\ Init + /\ [][Next]_vars + /\ \A i \in WorkerThreadIDs: + /\ SF_vars(ThreadLockTake(i)) + /\ WF_vars(ThreadLockUse(i)) + /\ SF_vars(ThreadUnlock(i)) + +================================================================================ From b3b6e28992c5fa34e53c24b21469015a599e8e1d Mon Sep 17 00:00:00 2001 From: Vladislav Shpilevoy Date: Mon, 19 May 2025 23:38:12 +0200 Subject: [PATCH 4/6] sch: use InterruptibleMutex in it It replaces the previously used atomic flag and does basically the same at this point. But in the future the lock's usage is going to be extended to do more stuff in the scheduler. Part of #40 --- src/mg/box/CMakeLists.txt | 1 + src/mg/sch/TaskScheduler.cpp | 19 +++++++++++++++---- src/mg/sch/TaskScheduler.h | 5 ++++- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/src/mg/box/CMakeLists.txt b/src/mg/box/CMakeLists.txt index f202adce..b8779f8b 100644 --- a/src/mg/box/CMakeLists.txt +++ b/src/mg/box/CMakeLists.txt @@ -51,6 +51,7 @@ set(install_headers DoublyList.h Error.h ForwardList.h + InterruptibleMutex.h IOVec.h Log.h MultiConsumerQueue.h diff --git a/src/mg/sch/TaskScheduler.cpp b/src/mg/sch/TaskScheduler.cpp index dbd73795..decf577d 100644 --- a/src/mg/sch/TaskScheduler.cpp +++ b/src/mg/sch/TaskScheduler.cpp @@ -15,7 +15,6 @@ namespace sch { : myQueueReady(aSubQueueSize) , myExecBatchSize(aSubQueueSize) , mySchedBatchSize(aSubQueueSize * aThreadCount) - , myIsSchedulerWorking(0) , myIsStopped(0) { myThreads.resize(aThreadCount); @@ -68,10 +67,16 @@ namespace sch { mySignalFront.Send(); } + inline bool + TaskScheduler::PrivSchedulerTryLock() + { + return mySchedulerMutex.TryLock(); + } + TaskScheduleResult TaskScheduler::PrivSchedule() { - if (myIsSchedulerWorking.ExchangeAcqRel(true)) + if (!PrivSchedulerTryLock()) return TASK_SCHEDULE_BUSY; // Task status operations can all be relaxed inside the @@ -238,7 +243,14 @@ namespace sch { } end: - myIsSchedulerWorking.StoreRelease(false); + PrivSchedulerUnlock(); + return result; + } + + inline void + TaskScheduler::PrivSchedulerUnlock() + { + mySchedulerMutex.Unlock(); // The signal is absolutely vital to have exactly here. // If the signal would not be emitted here, all the // workers could block on ready tasks in their loops. @@ -268,7 +280,6 @@ namespace sch { // and other workers will sleep on waiting for ready // tasks. PrivSignalReady(); - return result; } bool diff --git a/src/mg/sch/TaskScheduler.h b/src/mg/sch/TaskScheduler.h index 8c7935e0..3206b6bd 100644 --- a/src/mg/sch/TaskScheduler.h +++ b/src/mg/sch/TaskScheduler.h @@ -2,6 +2,7 @@ #include "mg/box/BinaryHeap.h" #include "mg/box/ForwardList.h" +#include "mg/box/InterruptibleMutex.h" #include "mg/box/MultiConsumerQueue.h" #include "mg/box/MultiProducerQueueIntrusive.h" #include "mg/box/Signal.h" @@ -120,7 +121,9 @@ namespace sch { void PrivPost( Task* aTask); + bool PrivSchedulerTryLock(); TaskScheduleResult PrivSchedule(); + void PrivSchedulerUnlock(); bool PrivExecute( Task* aTask); @@ -172,7 +175,7 @@ namespace sch { // // The worker thread, doing the scheduling right now, is // called 'sched-thread' throughout the code. - mg::box::AtomicBool myIsSchedulerWorking; + mg::box::InterruptibleMutex mySchedulerMutex; mg::box::AtomicBool myIsStopped; static thread_local TaskScheduler* ourCurrent; From 87fbfa55983701d672d5dc2375f9c88e6006596f Mon Sep 17 00:00:00 2001 From: Vladislav Shpilevoy Date: Tue, 20 May 2025 00:23:23 +0200 Subject: [PATCH 5/6] sch: introduce Start/IsEmpty/WaitEmpty/Stop() Those methods repeat the ones from IOCore. It is the next step towards allowing the user to run the scheduler in user-threads. Part of #40 --- .../BenchTaskSchedulerTemplate.hpp | 3 +- .../BenchTaskSchedulerTrivial.cpp | 22 ++- examples/iocore_03_pipeline/main.cpp | 2 +- examples/scheduler_01_simple_task/main.cpp | 2 +- examples/scheduler_02_coroutine_task/main.cpp | 2 +- examples/scheduler_03_multistep_task/main.cpp | 2 +- .../scheduler_04_interacting_tasks/main.cpp | 2 +- src/mg/sch/TaskScheduler.cpp | 150 +++++++++++++----- src/mg/sch/TaskScheduler.h | 50 +++--- test/sch/UnitTestTaskScheduler.cpp | 94 ++++++++--- 10 files changed, 228 insertions(+), 101 deletions(-) diff --git a/bench/taskscheduler/BenchTaskSchedulerTemplate.hpp b/bench/taskscheduler/BenchTaskSchedulerTemplate.hpp index 3bc06639..ad50a4ba 100644 --- a/bench/taskscheduler/BenchTaskSchedulerTemplate.hpp +++ b/bench/taskscheduler/BenchTaskSchedulerTemplate.hpp @@ -365,7 +365,8 @@ namespace bench { uint32_t aTaskCount, uint32_t aExecuteCount) { - TaskScheduler sched("bench", aThreadCount, 5000); + TaskScheduler sched("bench", 5000); + sched.Start(aThreadCount); sched.Reserve(aTaskCount); BenchTaskCtl ctl(aTaskCount, aExecuteCount, &sched); ctl.Warmup(); diff --git a/bench/taskscheduler/BenchTaskSchedulerTrivial.cpp b/bench/taskscheduler/BenchTaskSchedulerTrivial.cpp index 7792da10..ddab5b13 100644 --- a/bench/taskscheduler/BenchTaskSchedulerTrivial.cpp +++ b/bench/taskscheduler/BenchTaskSchedulerTrivial.cpp @@ -57,11 +57,13 @@ namespace bench { public: TaskScheduler( const char* aName, - uint32_t aThreadCount, uint32_t aSubQueueSize); ~TaskScheduler(); + void Start( + uint32_t aThreadCount); + void Post( Task* aTask); @@ -85,6 +87,7 @@ namespace bench { bool myIsStopped; TaskList myQueue; std::vector myWorkers; + const std::string myName; friend TaskSchedulerThread; }; @@ -146,13 +149,10 @@ namespace bench { TaskScheduler::TaskScheduler( const char* aName, - uint32_t aThreadCount, uint32_t /*aSubQueueSize*/) - : myIsStopped(false) + : myIsStopped(true) + , myName(aName) { - myWorkers.resize(aThreadCount); - for (TaskSchedulerThread*& w : myWorkers) - w = new TaskSchedulerThread(aName, this); } TaskScheduler::~TaskScheduler() @@ -165,6 +165,16 @@ namespace bench { delete w; } + void + TaskScheduler::Start( + uint32_t aThreadCount) + { + myIsStopped = false; + myWorkers.resize(aThreadCount); + for (TaskSchedulerThread*& w : myWorkers) + w = new TaskSchedulerThread(myName.c_str(), this); + } + void TaskScheduler::Post( Task* aTask) diff --git a/examples/iocore_03_pipeline/main.cpp b/examples/iocore_03_pipeline/main.cpp index 07828db8..65bd0b84 100644 --- a/examples/iocore_03_pipeline/main.cpp +++ b/examples/iocore_03_pipeline/main.cpp @@ -418,9 +418,9 @@ ServeRequests( // the requests, create MyRequest for each, and submit them to the scheduler. But in // this case it is simplified to just a couple of hardcoded MyRequests. mg::sch::TaskScheduler scheduler("tst", - 1, // Thread count. 5 // Subqueue size. ); + scheduler.Start(1); MG_LOG_INFO("ServeRequests", "got a couple of complex requests"); new MyRequest(1, aClient, scheduler); new MyRequest(2, aClient, scheduler); diff --git a/examples/scheduler_01_simple_task/main.cpp b/examples/scheduler_01_simple_task/main.cpp index 964601fe..0d2b58e1 100644 --- a/examples/scheduler_01_simple_task/main.cpp +++ b/examples/scheduler_01_simple_task/main.cpp @@ -11,9 +11,9 @@ int main() { mg::sch::TaskScheduler sched("tst", - 1, // Thread count. 5 // Subqueue size. ); + sched.Start(1); sched.Post(new mg::sch::Task([&](mg::sch::Task *self) { std::cout << "Executed in scheduler!\n"; delete self; diff --git a/examples/scheduler_02_coroutine_task/main.cpp b/examples/scheduler_02_coroutine_task/main.cpp index 5df2a508..c26ac1a0 100644 --- a/examples/scheduler_02_coroutine_task/main.cpp +++ b/examples/scheduler_02_coroutine_task/main.cpp @@ -48,9 +48,9 @@ main() // Normally one would allocate tasks on the heap and make them delete themselves when // they are finished. mg::sch::TaskScheduler scheduler("tst", - 1, // Thread count. 5 // Subqueue size. ); + scheduler.Start(1); scheduler.Post(&task); return 0; } diff --git a/examples/scheduler_03_multistep_task/main.cpp b/examples/scheduler_03_multistep_task/main.cpp index c0e6e80d..09b12e50 100644 --- a/examples/scheduler_03_multistep_task/main.cpp +++ b/examples/scheduler_03_multistep_task/main.cpp @@ -80,9 +80,9 @@ main() // Normally one would allocate tasks on the heap and make them delete themselves when // they are finished. mg::sch::TaskScheduler scheduler("tst", - 1, // Thread count. 5 // Subqueue size. ); + scheduler.Start(1); scheduler.Post(&task); return 0; } diff --git a/examples/scheduler_04_interacting_tasks/main.cpp b/examples/scheduler_04_interacting_tasks/main.cpp index ab48ec57..25bf3363 100644 --- a/examples/scheduler_04_interacting_tasks/main.cpp +++ b/examples/scheduler_04_interacting_tasks/main.cpp @@ -63,9 +63,9 @@ main() // Normally one would allocate tasks on the heap and make them delete themselves when // they are finished. mg::sch::TaskScheduler scheduler("tst", - 1, // Thread count. 5 // Subqueue size. ); + scheduler.Start(1); task.SetCallback([]( mg::sch::Task& aSelf) -> mg::box::Coro { diff --git a/src/mg/sch/TaskScheduler.cpp b/src/mg/sch/TaskScheduler.cpp index decf577d..e19518d4 100644 --- a/src/mg/sch/TaskScheduler.cpp +++ b/src/mg/sch/TaskScheduler.cpp @@ -10,36 +10,102 @@ namespace sch { TaskScheduler::TaskScheduler( const char* aName, - uint32_t aThreadCount, uint32_t aSubQueueSize) - : myQueueReady(aSubQueueSize) - , myExecBatchSize(aSubQueueSize) - , mySchedBatchSize(aSubQueueSize * aThreadCount) - , myIsStopped(0) + : myExecBatchSize(aSubQueueSize) + , mySchedBatchSize(myExecBatchSize) + , myQueueReady(aSubQueueSize) + , myName(aName) { + } + + TaskScheduler::~TaskScheduler() + { + MG_BOX_ASSERT(WaitEmpty()); + Stop(); + MG_BOX_ASSERT(myThreads.empty()); + MG_BOX_ASSERT(myQueuePending.IsEmpty()); + MG_BOX_ASSERT(myQueueFront.PopAllFastReversed() == nullptr); + MG_BOX_ASSERT(myQueueWaiting.Count() == 0); + MG_BOX_ASSERT(myQueueReady.Count() == 0); + } + + void + TaskScheduler::Start( + uint32_t aThreadCount) + { + PrivSchedulerLock(); + mySchedBatchSize = myExecBatchSize * aThreadCount; + MG_BOX_ASSERT(myThreads.empty()); myThreads.resize(aThreadCount); for (TaskSchedulerThread*& t : myThreads) { - t = new TaskSchedulerThread(aName, this); + t = new TaskSchedulerThread(myName.c_str(), this); t->Start(); } + PrivSchedulerUnlock(); } - TaskScheduler::~TaskScheduler() + bool + TaskScheduler::IsEmpty() + { + PrivSchedulerLock(); + for (TaskSchedulerThread* worker : myThreads) + { + // It is important to check the worker states. Even if all the queues are + // empty, it doesn't mean there are no tasks and won't be new ones. Because + // the currently running tasks might produce new tasks, and then the scheduler + // isn't empty. + if (worker->GetState() != TASK_SCHEDULER_WORKER_STATE_IDLE) + { + PrivSchedulerUnlock(); + return false; + } + } + bool isEmpty = + myQueueFront.IsEmpty() && + myQueueWaiting.Count() == 0 && + myQueuePending.IsEmpty() && + myQueueReady.Count() == 0; + PrivSchedulerUnlock(); + return isEmpty; + } + + bool + TaskScheduler::WaitEmpty( + mg::box::TimeLimit aTimeLimit) { - myIsStopped.StoreRelease(true); + mg::box::TimePoint deadline = aTimeLimit.ToPointFromNow(); + while (!IsEmpty()) + { + // Polling is slow and stupid, but 'wait empty' is needed in such situations + // where these milliseconds won't matter at all. And having it done via + // polling allows to keep the place trivially simple. + if (mg::box::GetMilliseconds() > deadline.myValue) + return false; + mg::box::Sleep(10); + } + return true; + } + + void + TaskScheduler::Stop() + { + PrivSchedulerLock(); + if (myThreads.empty()) + { + PrivSchedulerUnlock(); + return; + } + mySchedBatchSize = myExecBatchSize; for (TaskSchedulerThread* t : myThreads) t->Stop(); - // It is enough to wake the sched-thread. It will wakeup - // another worker, and they will wakeup each other like - // domino. - mySignalFront.Send(); + PrivSignalReady(); + // Yes, keep holding the lock while stopping the threads. They don't need to enter + // the scheduler-role anyway. for (TaskSchedulerThread* t : myThreads) t->StopAndDelete(); - MG_BOX_ASSERT(myQueuePending.IsEmpty()); - MG_BOX_ASSERT(myQueueFront.PopAllFastReversed() == nullptr); - MG_BOX_ASSERT(myQueueWaiting.Count() == 0); - MG_BOX_ASSERT(myQueueReady.Count() == 0); + myThreads.clear(); + PrivSchedulerUnlock(); } void @@ -67,18 +133,23 @@ namespace sch { mySignalFront.Send(); } + inline void + TaskScheduler::PrivSchedulerLock() + { + mySchedulerMutex.Lock([this]() { mySignalFront.Send(); }); + } + inline bool TaskScheduler::PrivSchedulerTryLock() { return mySchedulerMutex.TryLock(); } - TaskScheduleResult + bool TaskScheduler::PrivSchedule() { if (!PrivSchedulerTryLock()) - return TASK_SCHEDULE_BUSY; - + return false; // Task status operations can all be relaxed inside the // scheduler. Syncing writes and reads between producers and // workers anyway happens via acquire-release of the front @@ -92,7 +163,6 @@ namespace sch { uint64_t timestamp = mg::box::GetMilliseconds(); uint32_t batch; uint32_t maxBatch = mySchedBatchSize; - TaskScheduleResult result = TASK_SCHEDULE_DONE; // ------------------------------------------------------- // Handle waiting tasks. They are older than the ones in @@ -228,23 +298,14 @@ namespace sch { mg::box::TimeDuration(deadline - timestamp)); } } - else if (!PrivIsStopped()) - { - mySignalFront.ReceiveBlocking(); - } else { - // **Only** report scheduling is fully finished when there is actually - // nothing to do. That is, no tasks anywhere at all. Not in the front - // queue, not in the waiting queue, not in the ready queue. - result = TASK_SCHEDULE_FINISHED; - goto end; + mySignalFront.ReceiveBlocking(); } } - end: PrivSchedulerUnlock(); - return result; + return true; } inline void @@ -299,58 +360,59 @@ namespace sch { return true; } - void + inline void TaskScheduler::PrivWaitReady() { mySignalReady.ReceiveBlocking(); } - void + inline void TaskScheduler::PrivSignalReady() { mySignalReady.Send(); } - bool - TaskScheduler::PrivIsStopped() - { - return myIsStopped.LoadAcquire(); - } - TaskSchedulerThread::TaskSchedulerThread( const char* aSchedulerName, TaskScheduler* aScheduler) : Thread(mg::box::StringFormat( "mgsch.wrk%s", aSchedulerName).c_str()) , myScheduler(aScheduler) + , myState(TASK_SCHEDULER_WORKER_STATE_IDLE) , myExecuteCount(0) , myScheduleCount(0) { myConsumer.Attach(&myScheduler->myQueueReady); } + inline TaskSchedulerWorkerState + TaskSchedulerThread::GetState() const + { + return myState.LoadRelaxed(); + } + void TaskSchedulerThread::Run() { TaskScheduler::ourCurrent = myScheduler; uint64_t maxBatch = myScheduler->myExecBatchSize; uint64_t batch; - TaskScheduleResult result = TASK_SCHEDULE_DONE; - while (result != TASK_SCHEDULE_FINISHED) + while (!StopRequested()) { + myState.StoreRelaxed(TASK_SCHEDULER_WORKER_STATE_RUNNING); do { - result = myScheduler->PrivSchedule(); - if (result != TASK_SCHEDULE_BUSY) + if (myScheduler->PrivSchedule()) myScheduleCount.IncrementRelaxed(); batch = 0; while (myScheduler->PrivExecute(myConsumer.Pop()) && ++batch < maxBatch); myExecuteCount.AddRelaxed(batch); } while (batch == maxBatch); MG_DEV_ASSERT(batch < maxBatch); - + myState.StoreRelaxed(TASK_SCHEDULER_WORKER_STATE_IDLE); myScheduler->PrivWaitReady(); } + myState.StoreRelaxed(TASK_SCHEDULER_WORKER_STATE_IDLE); myScheduler->PrivSignalReady(); MG_BOX_ASSERT(TaskScheduler::ourCurrent == myScheduler); TaskScheduler::ourCurrent = nullptr; diff --git a/src/mg/sch/TaskScheduler.h b/src/mg/sch/TaskScheduler.h index 3206b6bd..838b801e 100644 --- a/src/mg/sch/TaskScheduler.h +++ b/src/mg/sch/TaskScheduler.h @@ -50,17 +50,6 @@ namespace sch { class TaskSchedulerThread; - enum TaskScheduleResult - { - // Scheduling isn't done, another thread is doing it right now. - TASK_SCHEDULE_BUSY, - // Scheduling is done successfully. - TASK_SCHEDULE_DONE, - // Done successfully and was the last one. The scheduler is being stopped right - // now. - TASK_SCHEDULE_FINISHED, - }; - // Scheduler for asynchronous execution of tasks. Can be used // for tons of one-shot short-living tasks, as well as for // long-living periodic tasks with deadlines. @@ -78,11 +67,17 @@ namespace sch { // thousands (1-5) usually is fine. TaskScheduler( const char* aName, - uint32_t aThreadCount, uint32_t aSubQueueSize); ~TaskScheduler(); + void Start( + uint32_t aThreadCount); + bool IsEmpty(); + bool WaitEmpty( + mg::box::TimeLimit aTimeLimit = mg::box::theTimeDurationInf); + void Stop(); + // Ensure the scheduler can fit the given number of tasks // in its internal queues without making any additional // memory allocations. @@ -121,8 +116,9 @@ namespace sch { void PrivPost( Task* aTask); + void PrivSchedulerLock(); bool PrivSchedulerTryLock(); - TaskScheduleResult PrivSchedule(); + bool PrivSchedule(); void PrivSchedulerUnlock(); bool PrivExecute( @@ -142,14 +138,11 @@ namespace sch { // Front queue is very hot, it is updated by all threads - // external ones and workers. Should not invalidate the // cache of the less intensively updated fields. It is - // separated from the fields below by thread list, which - // is almost never updated or read. So can be used as a - // barrier. - std::vector myThreads; + // separated from the fields below by a padding. + MG_UNUSED_MEMBER char myFalseSharingProtection1[MG_CACHE_LINE_SIZE]; TaskSchedulerQueuePending myQueuePending; TaskSchedulerQueueWaiting myQueueWaiting; - TaskSchedulerQueueReady myQueueReady; mg::box::Signal mySignalReady; // Threads try to execute not just all ready tasks in a row - periodically they // try to take care of the scheduling too. It helps to prevent the front-queue @@ -160,7 +153,12 @@ namespace sch { // the bottleneck when the scheduling takes too long time while the other threads // are idle and the ready-queue is empty. For example, processing of a million of // front queue tasks might take ~100-200ms. - const uint32_t mySchedBatchSize; + uint32_t mySchedBatchSize; + + // The ready-queue is being used by multiple threads. Lets make sure they won't + // invalidate the scheduler-role's data. + MG_UNUSED_MEMBER char myFalseSharingProtection2[MG_CACHE_LINE_SIZE]; + TaskSchedulerQueueReady myQueueReady; // The pending and waiting tasks must be dispatched // somehow to be moved to the ready queue. For that there @@ -175,8 +173,11 @@ namespace sch { // // The worker thread, doing the scheduling right now, is // called 'sched-thread' throughout the code. + MG_UNUSED_MEMBER char myFalseSharingProtection3[MG_CACHE_LINE_SIZE]; mg::box::InterruptibleMutex mySchedulerMutex; - mg::box::AtomicBool myIsStopped; + + std::vector myThreads; + const std::string myName; static thread_local TaskScheduler* ourCurrent; @@ -184,6 +185,12 @@ namespace sch { friend class TaskSchedulerThread; }; + enum TaskSchedulerWorkerState + { + TASK_SCHEDULER_WORKER_STATE_RUNNING, + TASK_SCHEDULER_WORKER_STATE_IDLE, + }; + class TaskSchedulerThread : public mg::box::Thread { @@ -196,10 +203,13 @@ namespace sch { uint64_t StatPopScheduleCount(); + TaskSchedulerWorkerState GetState() const; + private: void Run() override; TaskScheduler* myScheduler; + mg::box::Atomic myState; TaskSchedulerQueueReadyConsumer myConsumer; mg::box::AtomicU64 myExecuteCount; mg::box::AtomicU64 myScheduleCount; diff --git a/test/sch/UnitTestTaskScheduler.cpp b/test/sch/UnitTestTaskScheduler.cpp index 2609f676..2323c4e6 100644 --- a/test/sch/UnitTestTaskScheduler.cpp +++ b/test/sch/UnitTestTaskScheduler.cpp @@ -31,7 +31,8 @@ namespace sch { { TestCaseGuard guard("Basic"); - mg::sch::TaskScheduler sched("tst", 1, 5); + mg::sch::TaskScheduler sched("tst", 5); + sched.Start(1); mg::sch::TaskCallback cb; mg::sch::Task* tp; mg::box::AtomicU32 progress; @@ -124,12 +125,31 @@ namespace sch { doneCount.IncrementRelaxed(); }); { - mg::sch::TaskScheduler sched("tst", 1, 5); + mg::sch::TaskScheduler sched("tst", 5); + sched.Start(1); sched.Post(&task1); } TEST_CHECK(doneCount.LoadRelaxed() == 2); } + static void + UnitTestTaskSchedulerDestroyWithWaiting() + { + TestCaseGuard guard("Destroy with waiting"); + + mg::box::AtomicU32 doneCount(0); + mg::sch::Task task([&](mg::sch::Task*) { + doneCount.IncrementRelaxed(); + }); + task.SetDelay(50); + { + mg::sch::TaskScheduler sched("tst", 5); + sched.Start(1); + sched.Post(&task); + } + TEST_CHECK(doneCount.LoadRelaxed() == 1); + } + static void UnitTestTaskSchedulerOrder() { @@ -138,7 +158,8 @@ namespace sch { // Order is never guaranteed in a multi-thread system. But // at least it should be correct when the thread is just // one. - mg::sch::TaskScheduler sched("tst", 1, 5); + mg::sch::TaskScheduler sched("tst", 5); + sched.Start(1); mg::sch::TaskCallback cb; mg::sch::Task t1; mg::sch::Task t2; @@ -181,7 +202,8 @@ namespace sch { // Ensure all the workers wakeup each other if necessary. // The test is called 'domino', because the worker threads // are supposed to wake each other on demand. - mg::sch::TaskScheduler sched("tst", 3, 5); + mg::sch::TaskScheduler sched("tst", 5); + sched.Start(3); mg::sch::TaskCallback cb; mg::sch::Task t1; mg::sch::Task t2; @@ -216,7 +238,8 @@ namespace sch { { TestCaseGuard guard("Wakeup"); - mg::sch::TaskScheduler sched("tst", 1, 5); + mg::sch::TaskScheduler sched("tst", 5); + sched.Start(1); mg::sch::TaskCallback cb; mg::sch::Task t1; mg::box::AtomicU32 progress(false); @@ -313,7 +336,8 @@ namespace sch { { TestCaseGuard guard("Expiration"); - mg::sch::TaskScheduler sched("tst", 1, 100); + mg::sch::TaskScheduler sched("tst", 100); + sched.Start(1); mg::sch::Task t1; mg::box::AtomicU32 progress; @@ -430,8 +454,10 @@ namespace sch { { TestCaseGuard guard("Reschedule"); - mg::sch::TaskScheduler sched1("tst1", 2, 100); - mg::sch::TaskScheduler sched2("tst2", 2, 100); + mg::sch::TaskScheduler sched1("tst1", 100); + sched1.Start(2); + mg::sch::TaskScheduler sched2("tst2", 100); + sched2.Start(2); mg::sch::TaskCallback cb; mg::sch::Task t1; mg::sch::Task t2; @@ -517,7 +543,8 @@ namespace sch { { TestCaseGuard guard("Signal"); - mg::sch::TaskScheduler sched("tst", 1, 2); + mg::sch::TaskScheduler sched("tst", 2); + sched.Start(1); // Signal works during execution. mg::box::AtomicU32 progress(0); @@ -613,7 +640,8 @@ namespace sch { { #if MG_CORO_IS_ENABLED TestCaseGuard guard("Coroutine basic"); - mg::sch::TaskScheduler sched("tst", 2, 100); + mg::sch::TaskScheduler sched("tst", 100); + sched.Start(2); mg::box::Signal s; mg::sch::Task t; // @@ -671,7 +699,8 @@ namespace sch { { #if MG_CORO_IS_ENABLED TestCaseGuard guard("Coroutine AsyncReceiveSignal()"); - mg::sch::TaskScheduler sched("tst", 2, 100); + mg::sch::TaskScheduler sched("tst", 100); + sched.Start(2); mg::box::Signal s; mg::box::AtomicBool isGuardDone(false); mg::sch::Task t; @@ -752,7 +781,8 @@ namespace sch { { #if MG_CORO_IS_ENABLED TestCaseGuard guard("Coroutine AsyncExitDelete()"); - mg::sch::TaskScheduler sched("tst", 2, 100); + mg::sch::TaskScheduler sched("tst", 100); + sched.Start(2); mg::box::Signal s; mg::sch::Task* t = new mg::sch::Task(); @@ -779,7 +809,8 @@ namespace sch { { #if MG_CORO_IS_ENABLED TestCaseGuard guard("Coroutine AsyncExitExec()"); - mg::sch::TaskScheduler sched("tst", 2, 100); + mg::sch::TaskScheduler sched("tst", 100); + sched.Start(2); mg::box::Signal s; mg::sch::Task t; t.SetCallback([]( @@ -840,7 +871,8 @@ namespace sch { { #if MG_CORO_IS_ENABLED TestCaseGuard guard("Coroutine nested"); - mg::sch::TaskScheduler sched("tst", 2, 100); + mg::sch::TaskScheduler sched("tst", 100); + sched.Start(2); mg::box::Signal s; mg::sch::Task t; t.SetCallback([]( @@ -940,8 +972,10 @@ namespace sch { { #if MG_CORO_IS_ENABLED TestCaseGuard guard("Coroutine different schedulers"); - mg::sch::TaskScheduler sched1("tst1", 1, 100); - mg::sch::TaskScheduler sched2("tst2", 1, 100); + mg::sch::TaskScheduler sched1("tst1", 100); + sched1.Start(1); + mg::sch::TaskScheduler sched2("tst2", 100); + sched2.Start(1); mg::box::Signal s; mg::sch::Task t; t.SetCallback([]( @@ -995,7 +1029,8 @@ namespace sch { { #if MG_CORO_IS_ENABLED TestCaseGuard guard("Coroutine stress"); - mg::sch::TaskScheduler sched("tst", 5, 100); + mg::sch::TaskScheduler sched("tst", 100); + sched.Start(5); mg::box::Signal s; const uint32_t taskCount = 1000; const uint32_t signalCount = 10; @@ -1291,7 +1326,8 @@ namespace sch { Report("Batch test: %u threads, %u tasks, %u executes", aThreadCount, aTaskCount, aExecuteCount); - mg::sch::TaskScheduler sched("tst", aThreadCount, 5000); + mg::sch::TaskScheduler sched("tst", 5000); + sched.Start(aThreadCount); UTTSchedulerTaskCtx ctx(aTaskCount, aExecuteCount, &sched); ctx.CreateHeavy(); @@ -1312,7 +1348,8 @@ namespace sch { // fast is the scheduler itself, almost not affected by // the task bodies. Report("Micro test: %u threads, %u tasks", aThreadCount, aTaskCount); - mg::sch::TaskScheduler sched("tst", aThreadCount, 5000); + mg::sch::TaskScheduler sched("tst", 5000); + sched.Start(aThreadCount); UTTSchedulerTaskCtx ctx(aTaskCount, 1, &sched); ctx.CreateMicro(); @@ -1337,7 +1374,8 @@ namespace sch { // it is -1 virtual call compared to automatic one-shot // tasks. Report("Micro new test: %u threads, %u tasks", aThreadCount, aTaskCount); - mg::sch::TaskScheduler sched("tst", aThreadCount, 5000); + mg::sch::TaskScheduler sched("tst", 5000); + sched.Start(aThreadCount); mg::box::AtomicU64 executeCount(0); mg::sch::TaskCallback cb( [&](mg::sch::Task* aTask) { @@ -1367,7 +1405,8 @@ namespace sch { // Checkout speed of one-shot tasks, which are allocated // automatically inside of the scheduler. Report("Micro one shot test: %u threads, %u tasks", aThreadCount, aTaskCount); - mg::sch::TaskScheduler sched("tst", aThreadCount, 5000); + mg::sch::TaskScheduler sched("tst", 5000); + sched.Start(aThreadCount); mg::box::AtomicU64 executeCount(0); mg::sch::TaskCallbackOneShot cb([&](void) -> void { executeCount.IncrementRelaxed(); @@ -1396,7 +1435,8 @@ namespace sch { // steps, not in a single sleep-less loop. Report("Portions test: %u threads, %u tasks, %u executes", aThreadCount, aTaskCount, aExecuteCount); - mg::sch::TaskScheduler sched("tst", aThreadCount, 5000); + mg::sch::TaskScheduler sched("tst", 5000); + sched.Start(aThreadCount); UTTSchedulerTaskCtx ctx(aTaskCount, aExecuteCount, &sched); ctx.CreateHeavy(); @@ -1427,7 +1467,8 @@ namespace sch { uint32_t tasksPer50ms = aTaskCount / aDuration * 50; Report("Mild load test: %u threads, %u tasks, %u executes, %u per 50ms", aThreadCount, aTaskCount, aExecuteCount, tasksPer50ms); - mg::sch::TaskScheduler sched("tst", aThreadCount, 5000); + mg::sch::TaskScheduler sched("tst", 5000); + sched.Start(aThreadCount); UTTSchedulerTaskCtx ctx(aTaskCount, aExecuteCount, &sched); ctx.CreateHeavy(); @@ -1455,7 +1496,8 @@ namespace sch { // logarithmic time to remove each of them from the // waiting tasks queue's root, which is a binary heap. Report("Timeouts test: %u tasks", aTaskCount); - mg::sch::TaskScheduler sched("tst", 1, 5000); + mg::sch::TaskScheduler sched("tst", 5000); + sched.Start(1); UTTSchedulerTaskCtx ctx(aTaskCount, 1, &sched); ctx.CreateMicro(); @@ -1506,7 +1548,8 @@ namespace sch { // Ensure the tasks never stuck when signals are used. Report("Signal stress test: %u threads, %u tasks, %u executes", aThreadCount, aTaskCount, aExecuteCount); - mg::sch::TaskScheduler sched("tst", aThreadCount, 5000); + mg::sch::TaskScheduler sched("tst", 5000); + sched.Start(aThreadCount); UTTSchedulerTaskCtx ctx(aTaskCount, aExecuteCount, &sched); ctx.CreateSignaled(); @@ -1530,6 +1573,7 @@ namespace sch { UnitTestTaskSchedulerBasic(); UnitTestTaskSchedulerDestroyWithFront(); + UnitTestTaskSchedulerDestroyWithWaiting(); UnitTestTaskSchedulerOrder(); UnitTestTaskSchedulerDomino(); UnitTestTaskSchedulerWakeup(); From 3c2a04f6f70898aa03ca59d77ac79919460991f3 Mon Sep 17 00:00:00 2001 From: Vladislav Shpilevoy Date: Tue, 20 May 2025 22:12:23 +0200 Subject: [PATCH 6/6] aio: use proper barriers in socket closure code The socket closure consists of 2 steps: - Set the close guard. If wasn't already set, the set CLOSING state. - When see CLOSING state, ensure the close-guard is set and do the closure. The close guard and CLOSING state were both set using the relaxed memory order. It could lead to races on ARM. One thread could set close-guard and then CLOSING state. Another thread could see the CLOSING state, but not see the close-guard being set yet. Lets fix that by making those threads sync on the state (similar to how signals work in TaskScheduler). The first thread would release-write. And the second thread would acquire-read. Then the close-guard is properly synced between them. Closes #46 --- src/mg/aio/IOTask.cpp | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/mg/aio/IOTask.cpp b/src/mg/aio/IOTask.cpp index 4a17b718..c9d13c60 100755 --- a/src/mg/aio/IOTask.cpp +++ b/src/mg/aio/IOTask.cpp @@ -101,7 +101,10 @@ namespace aio { // immediately noticed by the scheduler (if the task was already in the front // queue) and would be actually closed + deleted in some worker thread even before // this function ends. - IOTaskStatus oldStatus = myStatus.ExchangeRelaxed(IOTASK_STATUS_CLOSING); + // + // Use 'release' to give the other threads a way to sync the writes done before + // closing, such as the close-guard setting. + IOTaskStatus oldStatus = myStatus.ExchangeRelease(IOTASK_STATUS_CLOSING); MG_BOX_ASSERT_F( oldStatus == IOTASK_STATUS_PENDING || oldStatus == IOTASK_STATUS_READY || @@ -260,10 +263,12 @@ namespace aio { void IOTask::PrivCloseDo() { + // Use 'acquire' to sync all the writes done by the thread which has initiated the + // closing. Such as the close-guard setting. + MG_BOX_ASSERT(myStatus.LoadAcquire() == IOTASK_STATUS_CLOSING); MG_BOX_ASSERT(myCloseGuard.LoadRelaxed()); MG_BOX_ASSERT(!myIsClosed); MG_BOX_ASSERT(myNext == nullptr); - MG_BOX_ASSERT(myStatus.LoadRelaxed() == IOTASK_STATUS_CLOSING); // Closed flag is ok to update non-atomically. Close is done in the scheduler, so // the task is not executed in any worker now and they can't see this flag before // the task's socket is finally removed from the kernel.