diff --git a/bench/taskscheduler/BenchTaskSchedulerTemplate.hpp b/bench/taskscheduler/BenchTaskSchedulerTemplate.hpp
index 3bc06639..ad50a4ba 100644
--- a/bench/taskscheduler/BenchTaskSchedulerTemplate.hpp
+++ b/bench/taskscheduler/BenchTaskSchedulerTemplate.hpp
@@ -365,7 +365,8 @@ namespace bench {
 		uint32_t aTaskCount,
 		uint32_t aExecuteCount)
 	{
-		TaskScheduler sched("bench", aThreadCount, 5000);
+		TaskScheduler sched("bench", 5000);
+		sched.Start(aThreadCount);
 		sched.Reserve(aTaskCount);
 		BenchTaskCtl ctl(aTaskCount, aExecuteCount, &sched);
 		ctl.Warmup();
diff --git a/bench/taskscheduler/BenchTaskSchedulerTrivial.cpp b/bench/taskscheduler/BenchTaskSchedulerTrivial.cpp
index 7792da10..ddab5b13 100644
--- a/bench/taskscheduler/BenchTaskSchedulerTrivial.cpp
+++ b/bench/taskscheduler/BenchTaskSchedulerTrivial.cpp
@@ -57,11 +57,13 @@ namespace bench {
 	public:
 		TaskScheduler(
 			const char* aName,
-			uint32_t aThreadCount,
 			uint32_t aSubQueueSize);
 		~TaskScheduler();
 
+		void Start(
+			uint32_t aThreadCount);
+
 		void Post(
 			Task* aTask);
 
@@ -85,6 +87,7 @@ namespace bench {
 		bool myIsStopped;
 		TaskList myQueue;
 		std::vector<TaskSchedulerThread*> myWorkers;
+		const std::string myName;
 
 		friend TaskSchedulerThread;
 	};
@@ -146,13 +149,10 @@ namespace bench {
 	TaskScheduler::TaskScheduler(
 		const char* aName,
-		uint32_t aThreadCount,
 		uint32_t /*aSubQueueSize*/)
-		: myIsStopped(false)
+		: myIsStopped(true)
+		, myName(aName)
 	{
-		myWorkers.resize(aThreadCount);
-		for (TaskSchedulerThread*& w : myWorkers)
-			w = new TaskSchedulerThread(aName, this);
 	}
 
 	TaskScheduler::~TaskScheduler()
@@ -165,6 +165,16 @@ namespace bench {
 			delete w;
 	}
 
+	void
+	TaskScheduler::Start(
+		uint32_t aThreadCount)
+	{
+		myIsStopped = false;
+		myWorkers.resize(aThreadCount);
+		for (TaskSchedulerThread*& w : myWorkers)
+			w = new TaskSchedulerThread(myName.c_str(), this);
+	}
+
 	void
 	TaskScheduler::Post(
 		Task* aTask)
diff --git a/examples/iocore_03_pipeline/main.cpp b/examples/iocore_03_pipeline/main.cpp
index 07828db8..65bd0b84 100644
--- a/examples/iocore_03_pipeline/main.cpp
+++ b/examples/iocore_03_pipeline/main.cpp
@@ -418,9 +418,9 @@ ServeRequests(
 	// the requests, create MyRequest for each, and submit them to the scheduler. But in
 	// this case it is simplified to just a couple of hardcoded MyRequests.
 	mg::sch::TaskScheduler scheduler("tst",
-		1, // Thread count.
 		5 // Subqueue size.
 	);
+	scheduler.Start(1);
 	MG_LOG_INFO("ServeRequests", "got a couple of complex requests");
 	new MyRequest(1, aClient, scheduler);
 	new MyRequest(2, aClient, scheduler);
diff --git a/examples/scheduler_01_simple_task/main.cpp b/examples/scheduler_01_simple_task/main.cpp
index 964601fe..0d2b58e1 100644
--- a/examples/scheduler_01_simple_task/main.cpp
+++ b/examples/scheduler_01_simple_task/main.cpp
@@ -11,9 +11,9 @@ int
 main()
 {
 	mg::sch::TaskScheduler sched("tst",
-		1, // Thread count.
 		5 // Subqueue size.
 	);
+	sched.Start(1);
 	sched.Post(new mg::sch::Task([&](mg::sch::Task *self) {
 		std::cout << "Executed in scheduler!\n";
 		delete self;
diff --git a/examples/scheduler_02_coroutine_task/main.cpp b/examples/scheduler_02_coroutine_task/main.cpp
index 5df2a508..c26ac1a0 100644
--- a/examples/scheduler_02_coroutine_task/main.cpp
+++ b/examples/scheduler_02_coroutine_task/main.cpp
@@ -48,9 +48,9 @@ main()
 	// Normally one would allocate tasks on the heap and make them delete themselves when
 	// they are finished.
 	mg::sch::TaskScheduler scheduler("tst",
-		1, // Thread count.
 		5 // Subqueue size.
); + scheduler.Start(1); scheduler.Post(&task); return 0; } diff --git a/examples/scheduler_03_multistep_task/main.cpp b/examples/scheduler_03_multistep_task/main.cpp index c0e6e80d..09b12e50 100644 --- a/examples/scheduler_03_multistep_task/main.cpp +++ b/examples/scheduler_03_multistep_task/main.cpp @@ -80,9 +80,9 @@ main() // Normally one would allocate tasks on the heap and make them delete themselves when // they are finished. mg::sch::TaskScheduler scheduler("tst", - 1, // Thread count. 5 // Subqueue size. ); + scheduler.Start(1); scheduler.Post(&task); return 0; } diff --git a/examples/scheduler_04_interacting_tasks/main.cpp b/examples/scheduler_04_interacting_tasks/main.cpp index ab48ec57..25bf3363 100644 --- a/examples/scheduler_04_interacting_tasks/main.cpp +++ b/examples/scheduler_04_interacting_tasks/main.cpp @@ -63,9 +63,9 @@ main() // Normally one would allocate tasks on the heap and make them delete themselves when // they are finished. mg::sch::TaskScheduler scheduler("tst", - 1, // Thread count. 5 // Subqueue size. ); + scheduler.Start(1); task.SetCallback([]( mg::sch::Task& aSelf) -> mg::box::Coro { diff --git a/src/mg/aio/IOTask.cpp b/src/mg/aio/IOTask.cpp index 4a17b718..c9d13c60 100755 --- a/src/mg/aio/IOTask.cpp +++ b/src/mg/aio/IOTask.cpp @@ -101,7 +101,10 @@ namespace aio { // immediately noticed by the scheduler (if the task was already in the front // queue) and would be actually closed + deleted in some worker thread even before // this function ends. - IOTaskStatus oldStatus = myStatus.ExchangeRelaxed(IOTASK_STATUS_CLOSING); + // + // Use 'release' to give the other threads a way to sync the writes done before + // closing, such as the close-guard setting. + IOTaskStatus oldStatus = myStatus.ExchangeRelease(IOTASK_STATUS_CLOSING); MG_BOX_ASSERT_F( oldStatus == IOTASK_STATUS_PENDING || oldStatus == IOTASK_STATUS_READY || @@ -260,10 +263,12 @@ namespace aio { void IOTask::PrivCloseDo() { + // Use 'acquire' to sync all the writes done by the thread which has initiated the + // closing. Such as the close-guard setting. + MG_BOX_ASSERT(myStatus.LoadAcquire() == IOTASK_STATUS_CLOSING); MG_BOX_ASSERT(myCloseGuard.LoadRelaxed()); MG_BOX_ASSERT(!myIsClosed); MG_BOX_ASSERT(myNext == nullptr); - MG_BOX_ASSERT(myStatus.LoadRelaxed() == IOTASK_STATUS_CLOSING); // Closed flag is ok to update non-atomically. Close is done in the scheduler, so // the task is not executed in any worker now and they can't see this flag before // the task's socket is finally removed from the kernel. 
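A reviewer's aside: the release/acquire pairing the two IOTask.cpp comments above describe can be illustrated standalone with plain `std::atomic`, outside the repo's `mg::box` wrappers. This is only a sketch of the ordering contract; the names `status`, `closeGuard`, `initiator`, and `closer` are illustrative, not from the codebase:

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Illustrative stand-ins for the task status and close-guard.
enum Status { READY, CLOSING };
std::atomic<int> status{READY};
std::atomic<bool> closeGuard{false};

void initiator()
{
	// Writes done before the release-exchange...
	closeGuard.store(true, std::memory_order_relaxed);
	// ...become visible to any thread that later observes CLOSING
	// with an acquire-load.
	status.exchange(CLOSING, std::memory_order_release);
}

void closer()
{
	// Spin until the closing state is observed with 'acquire'...
	while (status.load(std::memory_order_acquire) != CLOSING)
		std::this_thread::yield();
	// ...which guarantees the guard write is visible here.
	assert(closeGuard.load(std::memory_order_relaxed));
}

int main()
{
	std::thread t1(initiator), t2(closer);
	t1.join();
	t2.join();
	return 0;
}
```

The point is the pairing: a relaxed exchange would not give the closer any guarantee about the guard write, which is exactly why the diff upgrades it to release.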
diff --git a/src/mg/box/Atomic.h b/src/mg/box/Atomic.h
index 5a8b6ff2..aa8db5f3 100644
--- a/src/mg/box/Atomic.h
+++ b/src/mg/box/Atomic.h
@@ -176,6 +176,58 @@ namespace box {
 		T DecrementFetch() { return myValue.fetch_sub(1, std::memory_order_seq_cst) - 1; }
 
+		T FetchBitOrRelaxed(
+			const T& aValue)
+		{ return myValue.fetch_or(aValue, std::memory_order_relaxed); }
+		T FetchBitOrAcquire(
+			const T& aValue)
+		{ return myValue.fetch_or(aValue, std::memory_order_acquire); }
+		T FetchBitOrRelease(
+			const T& aValue)
+		{ return myValue.fetch_or(aValue, std::memory_order_release); }
+		T FetchBitOr(
+			const T& aValue)
+		{ return myValue.fetch_or(aValue, std::memory_order_seq_cst); }
+
+		void BitOrRelaxed(
+			const T& aValue)
+		{ FetchBitOrRelaxed(aValue); }
+		void BitOrAcquire(
+			const T& aValue)
+		{ FetchBitOrAcquire(aValue); }
+		void BitOrRelease(
+			const T& aValue)
+		{ FetchBitOrRelease(aValue); }
+		void BitOr(
+			const T& aValue)
+		{ FetchBitOr(aValue); }
+
+		T FetchBitAndRelaxed(
+			const T& aValue)
+		{ return myValue.fetch_and(aValue, std::memory_order_relaxed); }
+		T FetchBitAndAcquire(
+			const T& aValue)
+		{ return myValue.fetch_and(aValue, std::memory_order_acquire); }
+		T FetchBitAndRelease(
+			const T& aValue)
+		{ return myValue.fetch_and(aValue, std::memory_order_release); }
+		T FetchBitAnd(
+			const T& aValue)
+		{ return myValue.fetch_and(aValue, std::memory_order_seq_cst); }
+
+		void BitAndRelaxed(
+			const T& aValue)
+		{ FetchBitAndRelaxed(aValue); }
+		void BitAndAcquire(
+			const T& aValue)
+		{ FetchBitAndAcquire(aValue); }
+		void BitAndRelease(
+			const T& aValue)
+		{ FetchBitAndRelease(aValue); }
+		void BitAnd(
+			const T& aValue)
+		{ FetchBitAnd(aValue); }
+
 		bool CmpExchgWeakRelaxed(
 			T& aExpected,
 			const T& aValue);
diff --git a/src/mg/box/CMakeLists.txt b/src/mg/box/CMakeLists.txt
index 41f7cab3..b8779f8b 100644
--- a/src/mg/box/CMakeLists.txt
+++ b/src/mg/box/CMakeLists.txt
@@ -5,6 +5,7 @@ set(mgbox_src
 	ConditionVariable.cpp
 	Coro.cpp
 	Error.cpp
+	InterruptibleMutex.cpp
 	IOVec.cpp
 	Log.cpp
 	MultiConsumerQueueBase.cpp
@@ -50,6 +51,7 @@ set(install_headers
 	DoublyList.h
 	Error.h
 	ForwardList.h
+	InterruptibleMutex.h
 	IOVec.h
 	Log.h
 	MultiConsumerQueue.h
diff --git a/src/mg/box/InterruptibleMutex.cpp b/src/mg/box/InterruptibleMutex.cpp
new file mode 100644
index 00000000..434238b2
--- /dev/null
+++ b/src/mg/box/InterruptibleMutex.cpp
@@ -0,0 +1,127 @@
+#include "InterruptibleMutex.h"
+
+#include "mg/box/Thread.h"
+
+namespace mg {
+namespace box {
+namespace {
+
+	enum InterruptibleMutexFlags
+	{
+		INTERRUPTIBLE_MUTEX_TAKEN = 0x1,
+		INTERRUPTIBLE_MUTEX_HAS_WAITERS = 0x2,
+	};
+
+}
+
+	InterruptibleMutex::InterruptibleMutex()
+		: myState(0)
+	{
+	}
+
+	InterruptibleMutex::~InterruptibleMutex()
+	{
+		MG_BOX_ASSERT(myState.LoadRelaxed() == 0);
+	}
+
+	void
+	InterruptibleMutex::Lock(
+		const InterruptibleMutexWakeupCallback& aWakeup)
+	{
+		if (TryLock())
+			return;
+
+		// The mutex is owned by somebody. Try to take it over.
+		myMutex.Lock();
+		// Pessimistically this thread might have to wait, so let's add the corresponding
+		// flag. Use 'acquire' to sync all the writes done by the mutex's current owner.
+		// In case the owner is already leaving.
+		uint8_t expected = myState.FetchBitOrAcquire(
+			INTERRUPTIBLE_MUTEX_HAS_WAITERS);
+		if ((expected & INTERRUPTIBLE_MUTEX_TAKEN) == 0)
+		{
+			// Could happen that between trying to lock and taking the internal mutex the
+			// original owner has already left. If it actually left the ownership
+			// entirely, it means there were no waiters.
+			MG_BOX_ASSERT(myWaiters.IsEmpty());
+			// And no other thread could take the lock, because it would see the 'waiters'
+			// flag and would also try to take the internal mutex, which is owned by this
+			// thread at the moment. Hence it is safe to just take the ownership.
+			// 'Relaxed' should be enough, because acquire was already done just above.
+			expected = myState.ExchangeRelaxed(INTERRUPTIBLE_MUTEX_TAKEN);
+			// The waiters flag was set by this thread, but not anymore. New waiters, if
+			// any are waiting on the internal mutex, will just set this flag again if
+			// needed.
+			MG_BOX_ASSERT(expected == INTERRUPTIBLE_MUTEX_HAS_WAITERS);
+			myMutex.Unlock();
+			return;
+		}
+		// Some other thread owns the lock and this thread has announced its presence via
+		// the waiters-flag. Now it is time to wait until the owner hands the ownership
+		// over.
+		InterruptibleMutexWaiter self;
+		if (myWaiters.IsEmpty())
+		{
+			// Only the first waiter wakes the owner up. Makes no sense to wake it
+			// multiple times.
+			aWakeup();
+		}
+		myWaiters.Append(&self);
+		myMutex.Unlock();
+
+		// Just wait for the ownership to be handed over.
+		self.mySignal.ReceiveBlocking();
+		// Use 'acquire' to sync all the writes done by the previous owner.
+		MG_BOX_ASSERT((myState.LoadAcquire() & INTERRUPTIBLE_MUTEX_TAKEN) != 0);
+	}
+
+	bool
+	InterruptibleMutex::TryLock()
+	{
+		// It is important that the lock can't be taken if there are already waiters. No
+		// thread can take the ownership out of order. Otherwise this might cause
+		// starvation of the waiters.
+		// Use 'acquire' to sync all the writes done by the previous owner.
+		uint8_t expected = 0;
+		return myState.CmpExchgStrongAcquire(expected, INTERRUPTIBLE_MUTEX_TAKEN);
+	}
+
+	void
+	InterruptibleMutex::Unlock()
+	{
+		// The happy-path is when there are no waiters. Then just unlock it and leave.
+		// Use 'release' to provide a sync point for the next owners to sync all the
+		// writes done by this thread.
+		uint8_t expected = INTERRUPTIBLE_MUTEX_TAKEN;
+		if (myState.CmpExchgStrongRelease(expected, 0))
+			return;
+
+		// So there were some waiters spotted. Try to hand the ownership directly to the
+		// first one.
+		MG_BOX_ASSERT(expected ==
+			(INTERRUPTIBLE_MUTEX_TAKEN | INTERRUPTIBLE_MUTEX_HAS_WAITERS));
+
+		myMutex.Lock();
+		MG_BOX_ASSERT(!myWaiters.IsEmpty());
+		// It is important to first pop the waiter, and only then remove the
+		// waiters-flag (if there was only one waiter). Otherwise the following might
+		// happen:
+		// - Thread-1 takes the lock.
+		// - Thread-2 becomes a waiter and adds the waiter-flag.
+		// - Thread-1 frees the lock and (wrongly) signals thread-2 that the ownership
+		//   is given.
+		// - Thread-2 wakes up and tries to free the lock too.
+		// - Thread-2 sees that the waiters-flag is set, but there are no waiters. Broken
+		//   assumption.
+		InterruptibleMutexWaiter* first = myWaiters.PopFirst();
+		// Even when there is nothing to do, still use 'release'. To give the next owner
+		// a way to sync all the writes done by this thread.
+		if (myWaiters.IsEmpty())
+			expected = myState.FetchBitAndRelease(~INTERRUPTIBLE_MUTEX_HAS_WAITERS);
+		else
+			expected = myState.FetchBitAndRelease(-1);
+		MG_BOX_ASSERT((expected & INTERRUPTIBLE_MUTEX_TAKEN) != 0);
+		MG_BOX_ASSERT((expected & INTERRUPTIBLE_MUTEX_HAS_WAITERS) != 0);
+		first->mySignal.Send();
+		myMutex.Unlock();
+	}
+
+}
+}
diff --git a/src/mg/box/InterruptibleMutex.h b/src/mg/box/InterruptibleMutex.h
new file mode 100644
index 00000000..48383183
--- /dev/null
+++ b/src/mg/box/InterruptibleMutex.h
@@ -0,0 +1,68 @@
+#pragma once
+
+#include "mg/box/DoublyList.h"
+#include "mg/box/Signal.h"
+
+#include <functional>
+
+namespace mg {
+namespace box {
+
+	struct InterruptibleMutexWaiter
+	{
+		mg::box::Signal mySignal;
+		InterruptibleMutexWaiter* myNext;
+		InterruptibleMutexWaiter* myPrev;
+	};
+
+	using InterruptibleMutexWakeupCallback = std::function<void()>;
+
+	// The interruptible mutex allows one thread to take the mutex, get blocked
+	// on something else while holding it, and the other threads can wake it up
+	// (interrupt) to forcefully take the mutex for some other work.
+	//
+	// The guarantees are the following:
+	// - There is no spin-locking when TryLock() is used.
+	// - Some threads use TryLock() + can sleep while holding the lock,
+	//   then other threads via Lock() can take the ownership over to do some
+	//   work given that they won't be sleeping on the same condition.
+	//
+	// An example:
+	//
+	// - Thread-workers (any number):
+	//     if (TryLock())
+	//     {
+	//         WaitOn(Signal);
+	//         Unlock();
+	//     }
+	//
+	// - Thread-others (any number):
+	//     Lock([](){ Signal.Send(); });
+	//     DoSomething(); // But don't sleep on the same Signal.
+	//     Unlock().
+	//
+	// Then the 'other' threads are guaranteed to get the lock. And nothing will
+	// ever deadlock.
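A reviewer's aside: the worker/interruptor pattern sketched in the comment above, rendered as a compilable sketch. It assumes `mg::box::Signal`'s `Send()`/`ReceiveBlocking()` exactly as used by the unit tests later in this diff; the names `theMutex`, `theSignal`, `WorkerThread`, and `OtherThread` are illustrative:

```cpp
#include "mg/box/InterruptibleMutex.h"
#include "mg/box/Signal.h"

static mg::box::InterruptibleMutex theMutex;
static mg::box::Signal theSignal;

// A worker: takes the lock opportunistically and sleeps while holding it.
void
WorkerThread()
{
	if (theMutex.TryLock())
	{
		theSignal.ReceiveBlocking();
		theMutex.Unlock();
	}
}

// An interruptor: the wakeup callback pokes the sleeping owner, so the
// ownership gets handed over instead of deadlocking.
void
OtherThread()
{
	theMutex.Lock([]() { theSignal.Send(); });
	// Do some work here, but never sleep on theSignal while owning the lock.
	theMutex.Unlock();
}
```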
+	//
+	class InterruptibleMutex
+	{
+	public:
+		InterruptibleMutex();
+		~InterruptibleMutex();
+
+		void Lock(
+			const InterruptibleMutexWakeupCallback& aWakeup);
+		bool TryLock();
+		void Unlock();
+
+	private:
+		InterruptibleMutex(
+			const InterruptibleMutex&) = delete;
+
+		mg::box::Mutex myMutex;
+		mg::box::DoublyList<InterruptibleMutexWaiter> myWaiters;
+		mg::box::AtomicU8 myState;
+	};
+
+}
+}
diff --git a/src/mg/sch/TaskScheduler.cpp b/src/mg/sch/TaskScheduler.cpp
index 934c835e..e19518d4 100644
--- a/src/mg/sch/TaskScheduler.cpp
+++ b/src/mg/sch/TaskScheduler.cpp
@@ -10,37 +10,102 @@ namespace sch {
 	TaskScheduler::TaskScheduler(
 		const char* aName,
-		uint32_t aThreadCount,
 		uint32_t aSubQueueSize)
-		: myQueueReady(aSubQueueSize)
-		, myExecBatchSize(aSubQueueSize)
-		, mySchedBatchSize(aSubQueueSize * aThreadCount)
-		, myIsSchedulerWorking(0)
-		, myIsStopped(0)
+		: myExecBatchSize(aSubQueueSize)
+		, mySchedBatchSize(myExecBatchSize)
+		, myQueueReady(aSubQueueSize)
+		, myName(aName)
 	{
+	}
+
+	TaskScheduler::~TaskScheduler()
+	{
+		MG_BOX_ASSERT(WaitEmpty());
+		Stop();
+		MG_BOX_ASSERT(myThreads.empty());
+		MG_BOX_ASSERT(myQueuePending.IsEmpty());
+		MG_BOX_ASSERT(myQueueFront.PopAllFastReversed() == nullptr);
+		MG_BOX_ASSERT(myQueueWaiting.Count() == 0);
+		MG_BOX_ASSERT(myQueueReady.Count() == 0);
+	}
+
+	void
+	TaskScheduler::Start(
+		uint32_t aThreadCount)
+	{
+		PrivSchedulerLock();
+		mySchedBatchSize = myExecBatchSize * aThreadCount;
+		MG_BOX_ASSERT(myThreads.empty());
 		myThreads.resize(aThreadCount);
 		for (TaskSchedulerThread*& t : myThreads)
 		{
-			t = new TaskSchedulerThread(aName, this);
+			t = new TaskSchedulerThread(myName.c_str(), this);
 			t->Start();
 		}
+		PrivSchedulerUnlock();
 	}
 
-	TaskScheduler::~TaskScheduler()
+	bool
+	TaskScheduler::IsEmpty()
+	{
+		PrivSchedulerLock();
+		for (TaskSchedulerThread* worker : myThreads)
+		{
+			// It is important to check the worker states. Even if all the queues are
+			// empty, it doesn't mean there are no tasks or that new ones won't appear.
+			// The currently running tasks might produce new tasks, and then the
+			// scheduler isn't empty.
+			if (worker->GetState() != TASK_SCHEDULER_WORKER_STATE_IDLE)
+			{
+				PrivSchedulerUnlock();
+				return false;
+			}
+		}
+		bool isEmpty =
+			myQueueFront.IsEmpty() &&
+			myQueueWaiting.Count() == 0 &&
+			myQueuePending.IsEmpty() &&
+			myQueueReady.Count() == 0;
+		PrivSchedulerUnlock();
+		return isEmpty;
+	}
+
+	bool
+	TaskScheduler::WaitEmpty(
+		mg::box::TimeLimit aTimeLimit)
+	{
+		mg::box::TimePoint deadline = aTimeLimit.ToPointFromNow();
+		while (!IsEmpty())
+		{
+			// Polling is slow and stupid, but 'wait empty' is needed in such situations
+			// where these milliseconds won't matter at all. And having it done via
+			// polling keeps the place trivially simple.
+			if (mg::box::GetMilliseconds() > deadline.myValue)
+				return false;
+			mg::box::Sleep(10);
+		}
+		return true;
+	}
+
+	void
+	TaskScheduler::Stop()
 	{
-		myIsStopped.StoreRelease(true);
+		PrivSchedulerLock();
+		if (myThreads.empty())
+		{
+			PrivSchedulerUnlock();
+			return;
+		}
+		mySchedBatchSize = myExecBatchSize;
 		for (TaskSchedulerThread* t : myThreads)
 			t->Stop();
-		// It is enough to wake the sched-thread. It will wakeup
-		// another worker, and they will wakeup each other like
-		// domino.
-		mySignalFront.Send();
+		PrivSignalReady();
+		// Yes, keep holding the lock while stopping the threads. They don't need to
+		// enter the scheduler-role anyway.
for (TaskSchedulerThread* t : myThreads) t->StopAndDelete(); - MG_BOX_ASSERT(myQueuePending.IsEmpty()); - MG_BOX_ASSERT(myQueueFront.PopAllFastReversed() == nullptr); - MG_BOX_ASSERT(myQueueWaiting.Count() == 0); - MG_BOX_ASSERT(myQueueReady.Count() == 0); + myThreads.clear(); + PrivSchedulerUnlock(); } void @@ -68,12 +133,23 @@ namespace sch { mySignalFront.Send(); } - TaskScheduleResult - TaskScheduler::PrivSchedule() + inline void + TaskScheduler::PrivSchedulerLock() + { + mySchedulerMutex.Lock([this]() { mySignalFront.Send(); }); + } + + inline bool + TaskScheduler::PrivSchedulerTryLock() { - if (myIsSchedulerWorking.ExchangeAcqRel(true)) - return TASK_SCHEDULE_BUSY; + return mySchedulerMutex.TryLock(); + } + bool + TaskScheduler::PrivSchedule() + { + if (!PrivSchedulerTryLock()) + return false; // Task status operations can all be relaxed inside the // scheduler. Syncing writes and reads between producers and // workers anyway happens via acquire-release of the front @@ -87,9 +163,7 @@ namespace sch { uint64_t timestamp = mg::box::GetMilliseconds(); uint32_t batch; uint32_t maxBatch = mySchedBatchSize; - TaskScheduleResult result = TASK_SCHEDULE_DONE; - retry: // ------------------------------------------------------- // Handle waiting tasks. They are older than the ones in // the front queue, so must be handled first. @@ -211,11 +285,9 @@ namespace sch { if (myQueueReady.Count() == 0 && myQueuePending.IsEmpty()) { - // No ready tasks means the other workers already - // sleep on ready-signal. Or are going to start - // sleeping any moment. So the sched can't quit. It - // must retry until either a front task appears, or - // one of the waiting tasks' deadline is expired. + // No ready tasks means the other workers already sleep on ready-signal. Or + // are going to start sleeping any moment. So the sched can't quit. It must + // try to wait until something new happens which would require processing. if (myQueueWaiting.Count() > 0) { deadline = myQueueWaiting.GetTop()->myDeadline; @@ -226,23 +298,20 @@ namespace sch { mg::box::TimeDuration(deadline - timestamp)); } } - else if (!PrivIsStopped()) - { - mySignalFront.ReceiveBlocking(); - } else { - // **Only** report scheduling is fully finished when there is actually - // nothing to do. That is, no tasks anywhere at all. Not in the front - // queue, not in the waiting queue, not in the ready queue. - result = TASK_SCHEDULE_FINISHED; - goto end; + mySignalFront.ReceiveBlocking(); } - goto retry; } - end: - myIsSchedulerWorking.StoreRelease(false); + PrivSchedulerUnlock(); + return true; + } + + inline void + TaskScheduler::PrivSchedulerUnlock() + { + mySchedulerMutex.Unlock(); // The signal is absolutely vital to have exactly here. // If the signal would not be emitted here, all the // workers could block on ready tasks in their loops. @@ -272,7 +341,6 @@ namespace sch { // and other workers will sleep on waiting for ready // tasks. 
 		PrivSignalReady();
-		return result;
 	}
 
 	bool
@@ -292,58 +360,59 @@ namespace sch {
 		return true;
 	}
 
-	void
+	inline void
 	TaskScheduler::PrivWaitReady()
 	{
 		mySignalReady.ReceiveBlocking();
 	}
 
-	void
+	inline void
 	TaskScheduler::PrivSignalReady()
 	{
 		mySignalReady.Send();
 	}
 
-	bool
-	TaskScheduler::PrivIsStopped()
-	{
-		return myIsStopped.LoadAcquire();
-	}
-
 	TaskSchedulerThread::TaskSchedulerThread(
 		const char* aSchedulerName,
 		TaskScheduler* aScheduler)
 		: Thread(mg::box::StringFormat(
 			"mgsch.wrk%s", aSchedulerName).c_str())
 		, myScheduler(aScheduler)
+		, myState(TASK_SCHEDULER_WORKER_STATE_IDLE)
 		, myExecuteCount(0)
 		, myScheduleCount(0)
 	{
 		myConsumer.Attach(&myScheduler->myQueueReady);
 	}
 
+	inline TaskSchedulerWorkerState
+	TaskSchedulerThread::GetState() const
+	{
+		return myState.LoadRelaxed();
+	}
+
 	void
 	TaskSchedulerThread::Run()
 	{
 		TaskScheduler::ourCurrent = myScheduler;
 		uint64_t maxBatch = myScheduler->myExecBatchSize;
 		uint64_t batch;
-		TaskScheduleResult result = TASK_SCHEDULE_DONE;
-		while (result != TASK_SCHEDULE_FINISHED)
+		while (!StopRequested())
 		{
+			myState.StoreRelaxed(TASK_SCHEDULER_WORKER_STATE_RUNNING);
 			do
 			{
-				result = myScheduler->PrivSchedule();
-				if (result != TASK_SCHEDULE_BUSY)
+				if (myScheduler->PrivSchedule())
 					myScheduleCount.IncrementRelaxed();
 				batch = 0;
 				while (myScheduler->PrivExecute(myConsumer.Pop()) && ++batch < maxBatch);
 				myExecuteCount.AddRelaxed(batch);
 			} while (batch == maxBatch);
 			MG_DEV_ASSERT(batch < maxBatch);
-
+			myState.StoreRelaxed(TASK_SCHEDULER_WORKER_STATE_IDLE);
 			myScheduler->PrivWaitReady();
 		}
+		myState.StoreRelaxed(TASK_SCHEDULER_WORKER_STATE_IDLE);
 		myScheduler->PrivSignalReady();
 		MG_BOX_ASSERT(TaskScheduler::ourCurrent == myScheduler);
 		TaskScheduler::ourCurrent = nullptr;
diff --git a/src/mg/sch/TaskScheduler.h b/src/mg/sch/TaskScheduler.h
index 8c7935e0..838b801e 100644
--- a/src/mg/sch/TaskScheduler.h
+++ b/src/mg/sch/TaskScheduler.h
@@ -2,6 +2,7 @@
 
 #include "mg/box/BinaryHeap.h"
 #include "mg/box/ForwardList.h"
+#include "mg/box/InterruptibleMutex.h"
 #include "mg/box/MultiConsumerQueue.h"
 #include "mg/box/MultiProducerQueueIntrusive.h"
 #include "mg/box/Signal.h"
@@ -49,17 +50,6 @@ namespace sch {
 
 	class TaskSchedulerThread;
 
-	enum TaskScheduleResult
-	{
-		// Scheduling isn't done, another thread is doing it right now.
-		TASK_SCHEDULE_BUSY,
-		// Scheduling is done successfully.
-		TASK_SCHEDULE_DONE,
-		// Done successfully and was the last one. The scheduler is being stopped right
-		// now.
-		TASK_SCHEDULE_FINISHED,
-	};
-
 	// Scheduler for asynchronous execution of tasks. Can be used
 	// for tons of one-shot short-living tasks, as well as for
 	// long-living periodic tasks with deadlines.
@@ -77,11 +67,17 @@ namespace sch {
 		// thousands (1-5) usually is fine.
 		TaskScheduler(
 			const char* aName,
-			uint32_t aThreadCount,
 			uint32_t aSubQueueSize);
 		~TaskScheduler();
 
+		void Start(
+			uint32_t aThreadCount);
+		bool IsEmpty();
+		bool WaitEmpty(
+			mg::box::TimeLimit aTimeLimit = mg::box::theTimeDurationInf);
+		void Stop();
+
 		// Ensure the scheduler can fit the given number of tasks
 		// in its internal queues without making any additional
 		// memory allocations.
@@ -120,7 +116,10 @@ namespace sch {
 		void PrivPost(
 			Task* aTask);
 
-		TaskScheduleResult PrivSchedule();
+		void PrivSchedulerLock();
+		bool PrivSchedulerTryLock();
+		bool PrivSchedule();
+		void PrivSchedulerUnlock();
 		bool PrivExecute(
 			Task* aTask);
 
@@ -139,14 +138,11 @@ namespace sch {
 		// Front queue is very hot, it is updated by all threads -
 		// external ones and workers. Should not invalidate the
 		// cache of the less intensively updated fields. It is
-		// separated from the fields below by thread list, which
-		// is almost never updated or read. So can be used as a
-		// barrier.
-		std::vector<TaskSchedulerThread*> myThreads;
+		// separated from the fields below by padding.
+		MG_UNUSED_MEMBER char myFalseSharingProtection1[MG_CACHE_LINE_SIZE];
 		TaskSchedulerQueuePending myQueuePending;
 		TaskSchedulerQueueWaiting myQueueWaiting;
-		TaskSchedulerQueueReady myQueueReady;
 		mg::box::Signal mySignalReady;
 		// Threads try to execute not just all ready tasks in a row - periodically they
 		// try to take care of the scheduling too. It helps to prevent the front-queue
@@ -157,7 +153,12 @@ namespace sch {
 		// the bottleneck when the scheduling takes too long time while the other threads
 		// are idle and the ready-queue is empty. For example, processing of a million of
 		// front queue tasks might take ~100-200ms.
-		const uint32_t mySchedBatchSize;
+		uint32_t mySchedBatchSize;
+
+		// The ready-queue is being used by multiple threads. Let's make sure they won't
+		// invalidate the scheduler-role's data.
+		MG_UNUSED_MEMBER char myFalseSharingProtection2[MG_CACHE_LINE_SIZE];
+		TaskSchedulerQueueReady myQueueReady;
 
 		// The pending and waiting tasks must be dispatched
 		// somehow to be moved to the ready queue. For that there
@@ -172,8 +173,11 @@ namespace sch {
 		//
 		// The worker thread, doing the scheduling right now, is
 		// called 'sched-thread' throughout the code.
-		mg::box::AtomicBool myIsSchedulerWorking;
-		mg::box::AtomicBool myIsStopped;
+		MG_UNUSED_MEMBER char myFalseSharingProtection3[MG_CACHE_LINE_SIZE];
+		mg::box::InterruptibleMutex mySchedulerMutex;
+
+		std::vector<TaskSchedulerThread*> myThreads;
+		const std::string myName;
 
 		static thread_local TaskScheduler* ourCurrent;
 
@@ -181,6 +185,12 @@ namespace sch {
 		friend class TaskSchedulerThread;
 	};
 
+	enum TaskSchedulerWorkerState
+	{
+		TASK_SCHEDULER_WORKER_STATE_RUNNING,
+		TASK_SCHEDULER_WORKER_STATE_IDLE,
+	};
+
 	class TaskSchedulerThread
 		: public mg::box::Thread
 	{
@@ -193,10 +203,13 @@ namespace sch {
 
 		uint64_t StatPopScheduleCount();
 
+		TaskSchedulerWorkerState GetState() const;
+
 	private:
 		void Run() override;
 
 		TaskScheduler* myScheduler;
+		mg::box::Atomic<TaskSchedulerWorkerState> myState;
 		TaskSchedulerQueueReadyConsumer myConsumer;
 		mg::box::AtomicU64 myExecuteCount;
 		mg::box::AtomicU64 myScheduleCount;
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 57853010..146fb647 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -21,6 +21,7 @@ add_executable(test
 	box/UnitTestDoublyList.cpp
 	box/UnitTestError.cpp
 	box/UnitTestForwardList.cpp
+	box/UnitTestInterruptibleMutex.cpp
 	box/UnitTestIOVec.cpp
 	box/UnitTestLog.cpp
 	box/UnitTestMultiConsumerQueue.cpp
diff --git a/test/box/UnitTestAtomic.cpp b/test/box/UnitTestAtomic.cpp
index 6f02ba0f..47e04f58 100644
--- a/test/box/UnitTestAtomic.cpp
+++ b/test/box/UnitTestAtomic.cpp
@@ -2,6 +2,8 @@
 
 #include "UnitTest.h"
 
+#include <functional>
+
 namespace mg {
 namespace unittests {
 namespace box {
@@ -198,6 +200,90 @@ namespace box {
 		TEST_CHECK(value.LoadRelaxed() == 119);
 	}
 
+	static void
+	UnitTestAtomicFetchBitOr()
+	{
+		mg::box::AtomicU32 value;
+		using Func = std::function<uint32_t(uint32_t)>;
+		Func funcs[] = {
+			std::bind(&mg::box::AtomicU32::FetchBitOrRelaxed, &value,
+				std::placeholders::_1),
+			std::bind(&mg::box::AtomicU32::FetchBitOrAcquire, &value,
+				std::placeholders::_1),
+			std::bind(&mg::box::AtomicU32::FetchBitOrRelease, &value,
+				std::placeholders::_1),
+			std::bind(&mg::box::AtomicU32::FetchBitOr, &value,
+				std::placeholders::_1),
+		};
+		for (const Func& f : funcs)
+		{
+			value.StoreRelaxed(0b0011);
+			TEST_CHECK(f(0b1000) == 0b0011);
+			TEST_CHECK(value.LoadRelaxed() == 0b1011);
+		}
+	}
+
+	static void
+	UnitTestAtomicBitOr()
+	{
+		mg::box::AtomicU32 value;
+		using Func = std::function<void(uint32_t)>;
+		Func funcs[] = {
+			std::bind(&mg::box::AtomicU32::BitOrRelaxed, &value, std::placeholders::_1),
+			std::bind(&mg::box::AtomicU32::BitOrAcquire, &value, std::placeholders::_1),
+			std::bind(&mg::box::AtomicU32::BitOrRelease, &value, std::placeholders::_1),
+			std::bind(&mg::box::AtomicU32::BitOr, &value, std::placeholders::_1),
+		};
+		for (const Func& f : funcs)
+		{
+			value.StoreRelaxed(0b0011);
+			f(0b1000);
+			TEST_CHECK(value.LoadRelaxed() == 0b1011);
+		}
+	}
+
+	static void
+	UnitTestAtomicFetchBitAnd()
+	{
+		mg::box::AtomicU32 value;
+		using Func = std::function<uint32_t(uint32_t)>;
+		Func funcs[] = {
+			std::bind(&mg::box::AtomicU32::FetchBitAndRelaxed, &value,
+				std::placeholders::_1),
+			std::bind(&mg::box::AtomicU32::FetchBitAndAcquire, &value,
+				std::placeholders::_1),
+			std::bind(&mg::box::AtomicU32::FetchBitAndRelease, &value,
+				std::placeholders::_1),
+			std::bind(&mg::box::AtomicU32::FetchBitAnd, &value,
+				std::placeholders::_1),
+		};
+		for (const Func& f : funcs)
+		{
+			value.StoreRelaxed(0b0011);
+			TEST_CHECK(f(0b1010) == 0b0011);
+			TEST_CHECK(value.LoadRelaxed() == 0b0010);
+		}
+	}
+
+	static void
+	UnitTestAtomicBitAnd()
+	{
+		mg::box::AtomicU32 value;
+		using Func = std::function<void(uint32_t)>;
+		Func funcs[] = {
+			std::bind(&mg::box::AtomicU32::BitAndRelaxed, &value, std::placeholders::_1),
+			std::bind(&mg::box::AtomicU32::BitAndAcquire, &value, std::placeholders::_1),
+			std::bind(&mg::box::AtomicU32::BitAndRelease, &value, std::placeholders::_1),
+			std::bind(&mg::box::AtomicU32::BitAnd, &value, std::placeholders::_1),
+		};
+		for (const Func& f : funcs)
+		{
+			value.StoreRelaxed(0b0011);
+			f(0b1010);
+			TEST_CHECK(value.LoadRelaxed() == 0b0010);
+		}
+	}
+
 	static void
 	UnitTestAtomicCmpExchgWeak()
 	{
@@ -375,6 +461,10 @@ namespace box {
 		UnitTestAtomicDecrement();
 		UnitTestAtomicFetchDecrement();
 		UnitTestAtomicDecrementFetch();
+		UnitTestAtomicFetchBitOr();
+		UnitTestAtomicBitOr();
+		UnitTestAtomicFetchBitAnd();
+		UnitTestAtomicBitAnd();
 		UnitTestAtomicCmpExchgWeak();
 		UnitTestAtomicCmpExchgStrong();
 		UnitTestAtomicBool();
diff --git a/test/box/UnitTestInterruptibleMutex.cpp b/test/box/UnitTestInterruptibleMutex.cpp
new file mode 100644
index 00000000..74ab9317
--- /dev/null
+++ b/test/box/UnitTestInterruptibleMutex.cpp
@@ -0,0 +1,241 @@
+#include "mg/box/InterruptibleMutex.h"
+
+#include "mg/box/ThreadFunc.h"
+
+#include "UnitTest.h"
+
+namespace mg {
+namespace unittests {
+namespace box {
+
+	static void
+	UnreachableWakeup()
+	{
+		TEST_CHECK(!"Reachable");
+	}
+
+	static void
+	UnitTestInterruptibleMutexBasic()
+	{
+		mg::box::InterruptibleMutex mutex;
+		mutex.Lock(UnreachableWakeup);
+		mutex.Unlock();
+		mutex.Lock(UnreachableWakeup);
+		mutex.Unlock();
+	}
+
+	void
+	UnitTestInterruptibleMutexWakeupOne()
+	{
+		mg::box::InterruptibleMutex mutex;
+		mg::box::Signal signal;
+		//
+		// One thread takes the mutex and sleeps on something.
+		//
+		mg::box::Signal isBlockedOnSignal;
+		mg::box::ThreadFunc* otherThread = new mg::box::ThreadFunc("mgtst",
+			[&]() {
+				TEST_CHECK(mutex.TryLock());
+				isBlockedOnSignal.Send();
+				signal.ReceiveBlocking();
+				mutex.Unlock();
+			});
+		otherThread->Start();
+		isBlockedOnSignal.ReceiveBlocking();
+		//
+		// Another thread wakes it up to take the mutex over.
+		//
+		mutex.Lock([&]() { signal.Send(); });
+		delete otherThread;
+		mutex.Unlock();
+	}
+
+	void
+	UnitTestInterruptibleMutexWakeupTwo()
+	{
+		mg::box::InterruptibleMutex mutex;
+		mg::box::Signal mainSignal;
+		//
+		// One thread takes the mutex and does something, not yet waiting on the main
+		// signal.
+		//
+		mg::box::Signal isStarted;
+		mg::box::Signal getTheMainSignal;
+		mg::box::ThreadFunc* thread1 = new mg::box::ThreadFunc("mgtst1",
+			[&]() {
+				TEST_CHECK(mutex.TryLock());
+				isStarted.Send();
+				getTheMainSignal.ReceiveBlocking();
+				mainSignal.ReceiveBlocking();
+				mutex.Unlock();
+			});
+		thread1->Start();
+		isStarted.ReceiveBlocking();
+		//
+		// Another thread tries to enter the mutex, but can't atm. Even the wakeup won't
+		// help, because the first thread doesn't sleep on the main signal yet.
+		//
+		mg::box::ThreadFunc* thread2 = new mg::box::ThreadFunc("mgtst2",
+			[&]() {
+				mutex.Lock([&]() {
+					isStarted.Send();
+					mainSignal.Send();
+				});
+				mutex.Unlock();
+			});
+		thread2->Start();
+		isStarted.ReceiveBlocking();
+		//
+		// Third thread enters eventually.
+		//
+		getTheMainSignal.Send();
+		// No need to signal anything. The second thread must leave the lock
+		// without waiting on anything.
+		mutex.Lock([]() {});
+		delete thread1;
+		delete thread2;
+		mutex.Unlock();
+	}
+
+	static void
+	UnitTestInterruptibleMutexStress()
+	{
+		mg::box::InterruptibleMutex mutex;
+		mg::box::Signal mainSignal;
+		mg::box::AtomicU32 counter(0);
+		constexpr uint32_t targetCount = 2000;
+		mg::box::InterruptibleMutexWakeupCallback wakeupCallback = [&]() {
+			mainSignal.Send();
+		};
+
+		mg::box::ThreadFunc mainThread("mgtstmai", [&]() {
+			uint64_t yield = 0;
+			while (true)
+			{
+				if (!mutex.TryLock())
+				{
+					if (++yield % 10000000 == 0)
+						mg::box::Sleep(1);
+					continue;
+				}
+				mainSignal.ReceiveBlocking();
+				uint32_t old = counter.IncrementFetchRelaxed();
+				mutex.Unlock();
+				if (old >= targetCount)
+					break;
+			}
+		});
+
+		const uint32_t threadCount = 3;
+		std::vector<mg::box::ThreadFunc*> threads;
+		threads.reserve(threadCount);
+		mg::box::AtomicBool isStopped(false);
+		for (uint32_t i = 0; i < threadCount; ++i)
+		{
+			threads.push_back(new mg::box::ThreadFunc("mgtst", [&]() {
+				uint64_t yield = 0;
+				while (!isStopped.LoadRelaxed())
+				{
+					mutex.Lock(wakeupCallback);
+					mutex.Unlock();
+					if (++yield % 1000 == 0)
+						mg::box::Sleep(1);
+				}
+			}));
+		}
+		for (mg::box::ThreadFunc* f : threads)
+			f->Start();
+		mainThread.Start();
+		uint64_t printDeadline = 0;
+		while (mainThread.IsRunning())
+		{
+			uint64_t now = mg::box::GetMilliseconds();
+			if (now >= printDeadline)
+			{
+				printDeadline = now + 1000;
+				uint32_t counterNow = counter.LoadRelaxed();
+				int percent;
+				if (counterNow >= targetCount)
+					percent = 100;
+				else
+					percent = counterNow * 100 / targetCount;
+				Report("Progress: %d (= %u)", percent, counterNow);
+			}
+			mg::box::Sleep(10);
+		}
+		Report("Progress: 100");
+		isStopped.StoreRelaxed(true);
+		for (mg::box::ThreadFunc* f : threads)
+			delete f;
+	}
+
+	static void
+	UnitTestInterruptibleMutexStressMany()
+	{
+		mg::box::InterruptibleMutex mutex;
+		uint32_t counter = 0;
+		mg::box::AtomicBool isLocked(false);
+		mg::box::AtomicU32 trueCounter(0);
+		uint32_t wakeups = 0;
+		bool isInWakeup = false;
+		mg::box::InterruptibleMutexWakeupCallback wakeupCallback = [&]() {
+			TEST_CHECK(!isInWakeup);
+			isInWakeup = true;
+			++wakeups;
+			TEST_CHECK(isInWakeup);
+			isInWakeup = false;
+		};
+
+		const uint32_t threadCount = 10;
+		std::vector<mg::box::ThreadFunc*> threads;
+		threads.reserve(threadCount);
+		for (uint32_t i = 0; i < threadCount; ++i)
+		{
+			threads.push_back(new
mg::box::ThreadFunc("mgtst", [&]() { + uint64_t deadline = mg::box::GetMilliseconds() + 2000; + uint64_t yield = 0; + uint32_t localCounter = 0; + while (mg::box::GetMilliseconds() < deadline) + { + ++localCounter; + + mutex.Lock(wakeupCallback); + TEST_CHECK(!isLocked.ExchangeRelaxed(true)); + counter++; + TEST_CHECK(isLocked.ExchangeRelaxed(false)); + mutex.Unlock(); + if (++yield % 1000 == 0) + mg::box::Sleep(1); + } + trueCounter.AddRelaxed(localCounter); + })); + } + for (mg::box::ThreadFunc* f : threads) + f->Start(); + for (mg::box::ThreadFunc* f : threads) + delete f; + // Flush the caches. Make sure all the operations done inside the mutex-lock are + // complete and visible in this thread. + TEST_CHECK(mutex.TryLock()); + mutex.Unlock(); + + TEST_CHECK(counter == trueCounter.LoadRelaxed()); + TEST_CHECK(wakeups <= counter); + Report("wakeups = %u", wakeups); + } + + void + UnitTestInterruptibleMutex() + { + TestSuiteGuard suite("InterruptibleMutex"); + + UnitTestInterruptibleMutexBasic(); + UnitTestInterruptibleMutexWakeupOne(); + UnitTestInterruptibleMutexWakeupTwo(); + UnitTestInterruptibleMutexStress(); + UnitTestInterruptibleMutexStressMany(); + } + +} +} +} diff --git a/test/main.cpp b/test/main.cpp index 488bbbef..85558bdf 100644 --- a/test/main.cpp +++ b/test/main.cpp @@ -42,6 +42,7 @@ namespace box { void UnitTestDoublyList(); void UnitTestError(); void UnitTestForwardList(); + void UnitTestInterruptibleMutex(); void UnitTestIOVec(); void UnitTestLog(); void UnitTestMultiConsumerQueue(); @@ -106,6 +107,7 @@ main( MG_RUN_TEST(box, UnitTestDoublyList); MG_RUN_TEST(box, UnitTestError); MG_RUN_TEST(box, UnitTestForwardList); + MG_RUN_TEST(box, UnitTestInterruptibleMutex); MG_RUN_TEST(box, UnitTestIOVec); MG_RUN_TEST(box, UnitTestLog); MG_RUN_TEST(box, UnitTestMultiConsumerQueue); diff --git a/test/sch/UnitTestTaskScheduler.cpp b/test/sch/UnitTestTaskScheduler.cpp index 2609f676..2323c4e6 100644 --- a/test/sch/UnitTestTaskScheduler.cpp +++ b/test/sch/UnitTestTaskScheduler.cpp @@ -31,7 +31,8 @@ namespace sch { { TestCaseGuard guard("Basic"); - mg::sch::TaskScheduler sched("tst", 1, 5); + mg::sch::TaskScheduler sched("tst", 5); + sched.Start(1); mg::sch::TaskCallback cb; mg::sch::Task* tp; mg::box::AtomicU32 progress; @@ -124,12 +125,31 @@ namespace sch { doneCount.IncrementRelaxed(); }); { - mg::sch::TaskScheduler sched("tst", 1, 5); + mg::sch::TaskScheduler sched("tst", 5); + sched.Start(1); sched.Post(&task1); } TEST_CHECK(doneCount.LoadRelaxed() == 2); } + static void + UnitTestTaskSchedulerDestroyWithWaiting() + { + TestCaseGuard guard("Destroy with waiting"); + + mg::box::AtomicU32 doneCount(0); + mg::sch::Task task([&](mg::sch::Task*) { + doneCount.IncrementRelaxed(); + }); + task.SetDelay(50); + { + mg::sch::TaskScheduler sched("tst", 5); + sched.Start(1); + sched.Post(&task); + } + TEST_CHECK(doneCount.LoadRelaxed() == 1); + } + static void UnitTestTaskSchedulerOrder() { @@ -138,7 +158,8 @@ namespace sch { // Order is never guaranteed in a multi-thread system. But // at least it should be correct when the thread is just // one. - mg::sch::TaskScheduler sched("tst", 1, 5); + mg::sch::TaskScheduler sched("tst", 5); + sched.Start(1); mg::sch::TaskCallback cb; mg::sch::Task t1; mg::sch::Task t2; @@ -181,7 +202,8 @@ namespace sch { // Ensure all the workers wakeup each other if necessary. // The test is called 'domino', because the worker threads // are supposed to wake each other on demand. 
- mg::sch::TaskScheduler sched("tst", 3, 5); + mg::sch::TaskScheduler sched("tst", 5); + sched.Start(3); mg::sch::TaskCallback cb; mg::sch::Task t1; mg::sch::Task t2; @@ -216,7 +238,8 @@ namespace sch { { TestCaseGuard guard("Wakeup"); - mg::sch::TaskScheduler sched("tst", 1, 5); + mg::sch::TaskScheduler sched("tst", 5); + sched.Start(1); mg::sch::TaskCallback cb; mg::sch::Task t1; mg::box::AtomicU32 progress(false); @@ -313,7 +336,8 @@ namespace sch { { TestCaseGuard guard("Expiration"); - mg::sch::TaskScheduler sched("tst", 1, 100); + mg::sch::TaskScheduler sched("tst", 100); + sched.Start(1); mg::sch::Task t1; mg::box::AtomicU32 progress; @@ -430,8 +454,10 @@ namespace sch { { TestCaseGuard guard("Reschedule"); - mg::sch::TaskScheduler sched1("tst1", 2, 100); - mg::sch::TaskScheduler sched2("tst2", 2, 100); + mg::sch::TaskScheduler sched1("tst1", 100); + sched1.Start(2); + mg::sch::TaskScheduler sched2("tst2", 100); + sched2.Start(2); mg::sch::TaskCallback cb; mg::sch::Task t1; mg::sch::Task t2; @@ -517,7 +543,8 @@ namespace sch { { TestCaseGuard guard("Signal"); - mg::sch::TaskScheduler sched("tst", 1, 2); + mg::sch::TaskScheduler sched("tst", 2); + sched.Start(1); // Signal works during execution. mg::box::AtomicU32 progress(0); @@ -613,7 +640,8 @@ namespace sch { { #if MG_CORO_IS_ENABLED TestCaseGuard guard("Coroutine basic"); - mg::sch::TaskScheduler sched("tst", 2, 100); + mg::sch::TaskScheduler sched("tst", 100); + sched.Start(2); mg::box::Signal s; mg::sch::Task t; // @@ -671,7 +699,8 @@ namespace sch { { #if MG_CORO_IS_ENABLED TestCaseGuard guard("Coroutine AsyncReceiveSignal()"); - mg::sch::TaskScheduler sched("tst", 2, 100); + mg::sch::TaskScheduler sched("tst", 100); + sched.Start(2); mg::box::Signal s; mg::box::AtomicBool isGuardDone(false); mg::sch::Task t; @@ -752,7 +781,8 @@ namespace sch { { #if MG_CORO_IS_ENABLED TestCaseGuard guard("Coroutine AsyncExitDelete()"); - mg::sch::TaskScheduler sched("tst", 2, 100); + mg::sch::TaskScheduler sched("tst", 100); + sched.Start(2); mg::box::Signal s; mg::sch::Task* t = new mg::sch::Task(); @@ -779,7 +809,8 @@ namespace sch { { #if MG_CORO_IS_ENABLED TestCaseGuard guard("Coroutine AsyncExitExec()"); - mg::sch::TaskScheduler sched("tst", 2, 100); + mg::sch::TaskScheduler sched("tst", 100); + sched.Start(2); mg::box::Signal s; mg::sch::Task t; t.SetCallback([]( @@ -840,7 +871,8 @@ namespace sch { { #if MG_CORO_IS_ENABLED TestCaseGuard guard("Coroutine nested"); - mg::sch::TaskScheduler sched("tst", 2, 100); + mg::sch::TaskScheduler sched("tst", 100); + sched.Start(2); mg::box::Signal s; mg::sch::Task t; t.SetCallback([]( @@ -940,8 +972,10 @@ namespace sch { { #if MG_CORO_IS_ENABLED TestCaseGuard guard("Coroutine different schedulers"); - mg::sch::TaskScheduler sched1("tst1", 1, 100); - mg::sch::TaskScheduler sched2("tst2", 1, 100); + mg::sch::TaskScheduler sched1("tst1", 100); + sched1.Start(1); + mg::sch::TaskScheduler sched2("tst2", 100); + sched2.Start(1); mg::box::Signal s; mg::sch::Task t; t.SetCallback([]( @@ -995,7 +1029,8 @@ namespace sch { { #if MG_CORO_IS_ENABLED TestCaseGuard guard("Coroutine stress"); - mg::sch::TaskScheduler sched("tst", 5, 100); + mg::sch::TaskScheduler sched("tst", 100); + sched.Start(5); mg::box::Signal s; const uint32_t taskCount = 1000; const uint32_t signalCount = 10; @@ -1291,7 +1326,8 @@ namespace sch { Report("Batch test: %u threads, %u tasks, %u executes", aThreadCount, aTaskCount, aExecuteCount); - mg::sch::TaskScheduler sched("tst", aThreadCount, 5000); + mg::sch::TaskScheduler sched("tst", 
5000); + sched.Start(aThreadCount); UTTSchedulerTaskCtx ctx(aTaskCount, aExecuteCount, &sched); ctx.CreateHeavy(); @@ -1312,7 +1348,8 @@ namespace sch { // fast is the scheduler itself, almost not affected by // the task bodies. Report("Micro test: %u threads, %u tasks", aThreadCount, aTaskCount); - mg::sch::TaskScheduler sched("tst", aThreadCount, 5000); + mg::sch::TaskScheduler sched("tst", 5000); + sched.Start(aThreadCount); UTTSchedulerTaskCtx ctx(aTaskCount, 1, &sched); ctx.CreateMicro(); @@ -1337,7 +1374,8 @@ namespace sch { // it is -1 virtual call compared to automatic one-shot // tasks. Report("Micro new test: %u threads, %u tasks", aThreadCount, aTaskCount); - mg::sch::TaskScheduler sched("tst", aThreadCount, 5000); + mg::sch::TaskScheduler sched("tst", 5000); + sched.Start(aThreadCount); mg::box::AtomicU64 executeCount(0); mg::sch::TaskCallback cb( [&](mg::sch::Task* aTask) { @@ -1367,7 +1405,8 @@ namespace sch { // Checkout speed of one-shot tasks, which are allocated // automatically inside of the scheduler. Report("Micro one shot test: %u threads, %u tasks", aThreadCount, aTaskCount); - mg::sch::TaskScheduler sched("tst", aThreadCount, 5000); + mg::sch::TaskScheduler sched("tst", 5000); + sched.Start(aThreadCount); mg::box::AtomicU64 executeCount(0); mg::sch::TaskCallbackOneShot cb([&](void) -> void { executeCount.IncrementRelaxed(); @@ -1396,7 +1435,8 @@ namespace sch { // steps, not in a single sleep-less loop. Report("Portions test: %u threads, %u tasks, %u executes", aThreadCount, aTaskCount, aExecuteCount); - mg::sch::TaskScheduler sched("tst", aThreadCount, 5000); + mg::sch::TaskScheduler sched("tst", 5000); + sched.Start(aThreadCount); UTTSchedulerTaskCtx ctx(aTaskCount, aExecuteCount, &sched); ctx.CreateHeavy(); @@ -1427,7 +1467,8 @@ namespace sch { uint32_t tasksPer50ms = aTaskCount / aDuration * 50; Report("Mild load test: %u threads, %u tasks, %u executes, %u per 50ms", aThreadCount, aTaskCount, aExecuteCount, tasksPer50ms); - mg::sch::TaskScheduler sched("tst", aThreadCount, 5000); + mg::sch::TaskScheduler sched("tst", 5000); + sched.Start(aThreadCount); UTTSchedulerTaskCtx ctx(aTaskCount, aExecuteCount, &sched); ctx.CreateHeavy(); @@ -1455,7 +1496,8 @@ namespace sch { // logarithmic time to remove each of them from the // waiting tasks queue's root, which is a binary heap. Report("Timeouts test: %u tasks", aTaskCount); - mg::sch::TaskScheduler sched("tst", 1, 5000); + mg::sch::TaskScheduler sched("tst", 5000); + sched.Start(1); UTTSchedulerTaskCtx ctx(aTaskCount, 1, &sched); ctx.CreateMicro(); @@ -1506,7 +1548,8 @@ namespace sch { // Ensure the tasks never stuck when signals are used. 
Report("Signal stress test: %u threads, %u tasks, %u executes", aThreadCount, aTaskCount, aExecuteCount); - mg::sch::TaskScheduler sched("tst", aThreadCount, 5000); + mg::sch::TaskScheduler sched("tst", 5000); + sched.Start(aThreadCount); UTTSchedulerTaskCtx ctx(aTaskCount, aExecuteCount, &sched); ctx.CreateSignaled(); @@ -1530,6 +1573,7 @@ namespace sch { UnitTestTaskSchedulerBasic(); UnitTestTaskSchedulerDestroyWithFront(); + UnitTestTaskSchedulerDestroyWithWaiting(); UnitTestTaskSchedulerOrder(); UnitTestTaskSchedulerDomino(); UnitTestTaskSchedulerWakeup(); diff --git a/tla/InterruptibleMutex.cfg b/tla/InterruptibleMutex.cfg new file mode 100644 index 00000000..6438550a --- /dev/null +++ b/tla/InterruptibleMutex.cfg @@ -0,0 +1,18 @@ +INVARIANT TotalInvariant + +CONSTANTS + +TargetCount = 5 +NULL = NULL + +wid1 = wid1 +wid2 = wid2 +wid3 = wid3 +WorkerThreadIDs = {wid1, wid2, wid3} +MainThreadID = wid1 + +SYMMETRY Perms + +SPECIFICATION Spec + +CHECK_DEADLOCK TRUE diff --git a/tla/InterruptibleMutex.tla b/tla/InterruptibleMutex.tla new file mode 100644 index 00000000..ba572125 --- /dev/null +++ b/tla/InterruptibleMutex.tla @@ -0,0 +1,383 @@ +-------------------------- MODULE InterruptibleMutex --------------------------- +\* +\* Interruptible mutex. It is a construct similar to how a normal mutex + a +\* condition variable are often used. Except that the interruptible mutex is +\* more generic. It abstracts the "condition variable" into something outside of +\* the mutex itself, and which can be "woken up" (like via a callback). +\* +\* The main usecase is the TaskScheduler, where the interruptible mutex is going +\* to protect the scheduler-role, while the scheduler is sleeping on some +\* condition variable / signal / epoll / whatever else. +\* +EXTENDS TLC, Integers, Sequences + +-------------------------------------------------------------------------------- +\* +\* Constant values. +\* + +CONSTANT WorkerThreadIDs +\* One thread is "main". It won't do blocking locks. The logic is that it +\* represents actually one or more such threads, that they try to take the lock +\* periodically, and when one of them does, it goes to sleep on a condition. +\* +\* Allowing all the threads to take the lock in a blocking way eventually leads +\* to a situation when the signal is empty, one thread sleeps on a condition, +\* and all the others are waiting. This could be fixed if "wakeup" would be done +\* by each new waiter, but now it is only done on the first waiter entering the +\* queue and for the use-case (TaskScheduler) this is enough. Because its +\* workers will never take a blocking lock + sleep on the tasks. +CONSTANT MainThreadID +CONSTANT NULL + +\* Worker threads are symmetrical. Meaning that changing their places in +\* different runs won't change the result. +Perms == Permutations(WorkerThreadIDs) + +-------------------------------------------------------------------------------- +\* +\* Variables. +\* + +VARIABLE MutexState +VARIABLE MutexOwnerID +\* The interruptible mutex's internals are protected with a regular mutex. It +\* helps to synchronize certain things in case of contention. But otherwise when +\* there is no contention, the internals aren't touched. +VARIABLE InternalOwnerID +\* A list of threads waiting for the current owner of the mutex to give them the +\* ownership. +VARIABLE Waiters +VARIABLE WorkerThreads +\* Some abstract condition on which a thread can "sleep" and by which it can be +\* "woken up". 
+VARIABLE IsConditionSignaled
+
+vars == <<MutexState, MutexOwnerID, InternalOwnerID, Waiters, WorkerThreads,
+          IsConditionSignaled>>
+
+--------------------------------------------------------------------------------
+\*
+\* Helper functions.
+\*
+
+\* In TLA+ there is no convenient way to change multiple fields of a struct,
+\* especially if it is in an array of other structs. Functions here help to make
+\* it look sane. For that the order of arguments is inverted - the object to
+\* change goes last. Thanks to that, multiple mutations can be written as
+\*
+\*   SetField(key1, val1,
+\*   SetField(key2, val2,
+\*   SetField(key3, val3,
+\*   Object)))
+\*
+\* Almost like normal assignment. This is additionally simplified by the fact
+\* that there are no struct types. The same setter can be used for any struct,
+\* only member name matters.
+
+SetState(v, b) == [b EXCEPT !.state = v]
+SetIsTaken(v, b) == [b EXCEPT !.is_taken = v]
+SetHasWaiters(v, b) == [b EXCEPT !.has_waiters = v]
+SetOldMutexState(v, b) == [b EXCEPT !.old_mutex_state = v]
+
+\* The same but for struct arrays.
+
+ArrLen(s) == Len(s)
+ArrFirst(s) == Head(s)
+ArrIsEmpty(s) == Len(s) = 0
+ArrPopHead(s) == Tail(s)
+ArrSet(i, v, s) == [s EXCEPT ![i] = v]
+ArrAppend(v, s) == Append(s, v)
+ArrSetState(i, v, s) == ArrSet(i, SetState(v, s[i]), s)
+ArrSetOldMutexState(i, v, s) == ArrSet(i, SetOldMutexState(v, s[i]), s)
+
+\* Constructors.
+
+\* It is basically a few flags which can be checked and set atomically. An
+\* std::atomic_uint8 in C++.
+MutexStateNew == [
+    \* The mutex is owned by some thread right now.
+    is_taken |-> FALSE,
+    \* The mutex has at least one pending waiter. It means that when unlock
+    \* happens, the ownership must be transferred directly to the first waiter.
+    \* And that new attempts to try-lock the mutex must fail.
+    has_waiters |-> FALSE
+]
+
+WorkerThreadNew == [
+    state |-> "idle",
+    \* An "on stack" variable for some cases where it couldn't be used in a single
+    \* atomic step.
+    old_mutex_state |-> MutexStateNew
+]
+
+Init ==
+    /\ MutexState = MutexStateNew
+    /\ MutexOwnerID = NULL
+    /\ InternalOwnerID = NULL
+    /\ Waiters = << >>
+    /\ WorkerThreads = [wid \in WorkerThreadIDs |-> WorkerThreadNew]
+    /\ IsConditionSignaled = FALSE
+
+\* In the steps below the splitting is done by actions which are visible to
+\* other participants. For example, blocking lock first takes the internal
+\* mutex, and then does changes to the interruptible mutex state. Both those
+\* steps are observable in other threads. Hence done as separate steps in TLA.
+\*
+\* Actions, which do not change globally visible data or are atomic, can be done
+\* in one step. For example, an atomic increment can be done as a single step.
+\* Without splitting read and write.
+
+--------------------------------------------------------------------------------
+\* Locking.
+
+\* Try to take a lock the non-blocking way.
+ThreadTryLock(wid) ==
+    LET w == WorkerThreads[wid] IN
+    /\ w.state = "idle"
+    \* Is only possible when completely free. Note that even if the mutex isn't
+    \* taken, but has waiters, it is still not possible to just steal it with
+    \* try-lock at this point. Must either wait until all waiters are gone, or
+    \* must become one of them.
+    /\ ~MutexState.is_taken /\ ~MutexState.has_waiters
+    \* ---
+    /\ WorkerThreads' = ArrSetState(wid, "lock_done", WorkerThreads)
+    /\ MutexState' = SetIsTaken(TRUE, MutexState)
+    /\ UNCHANGED<<MutexOwnerID, InternalOwnerID, Waiters, IsConditionSignaled>>
+
+\* Blocking lock.
+ThreadBlockingLock(wid) ==
+    LET w == WorkerThreads[wid] IN
+    /\ w.state = "idle"
+    \* Main thread can't do the blocking locks. Because if it would, it might
+    \* lead to a deadlock when the main thread becomes a waiter, then another
+    \* waiter comes, then the main thread takes the lock and goes to sleep. Then
+    \* nobody would call the wakeup, because the waiter list wasn't empty when
+    \* non-main threads stood into it. This is fixable, but is not worth
+    \* complicating the algorithm and doesn't block the mutex's main usecase in
+    \* TaskScheduler.
+    /\ wid # MainThreadID
+    \* Locking always tries to take the lock the fast-way. Can only do the
+    \* blocking lock when the fast path didn't work.
+    /\ MutexState.is_taken \/ MutexState.has_waiters
+    \* The slow path (blocking lock) requires to take the internal lock first.
+    /\ InternalOwnerID = NULL
+    \* ---
+    /\ InternalOwnerID' = wid
+    /\ WorkerThreads' = ArrSetState(wid, "lock_start", WorkerThreads)
+    /\ UNCHANGED<<MutexState, MutexOwnerID, Waiters, IsConditionSignaled>>
+
+ThreadLockStart(wid) ==
+    LET w == WorkerThreads[wid] IN
+    /\ w.state = "lock_start"
+    /\ Assert(InternalOwnerID = wid, "internal mutex is taken")
+    \* ---
+    \* Add the waiters-flag so if the current lock-owner leaves now, it would know
+    \* there are waiters and would have to transfer the ownership, not just drop
+    \* it.
+    /\ MutexState' = SetHasWaiters(TRUE, MutexState)
+    /\ WorkerThreads' = ArrSetState(wid, "lock_check_old_state",
+                        ArrSetOldMutexState(wid, MutexState,
+                        WorkerThreads))
+    /\ UNCHANGED<<MutexOwnerID, InternalOwnerID, Waiters, IsConditionSignaled>>
+
+ThreadLockCheckOldState(wid) ==
+    LET w == WorkerThreads[wid] IN
+    /\ w.state = "lock_check_old_state"
+    /\ Assert(InternalOwnerID = wid, "internal mutex is taken")
+    \* ---
+    /\ IF ~w.old_mutex_state.is_taken THEN
+        \* Could happen that the owner dropped the lock before this thread managed
+        \* to set the waiters-flag. Then the lock is free for taking right away.
+        /\ Assert(MutexState.has_waiters, "has waiters")
+        /\ Assert(~MutexState.is_taken, "not taken")
+        /\ MutexState' = SetIsTaken(TRUE, MutexStateNew)
+        /\ WorkerThreads' = ArrSetState(wid, "lock_exit_without_waiting",
+                            WorkerThreads)
+       ELSE
+        \* The owner was still there when the waiters-flag was added. Have to wait
+        \* then.
+        /\ WorkerThreads' = ArrSetState(wid, "lock_stand_into_queue",
+                            WorkerThreads)
+        /\ UNCHANGED<<MutexState>>
+    /\ UNCHANGED<<MutexOwnerID, InternalOwnerID, Waiters, IsConditionSignaled>>
+
+ThreadLockExitWithoutWaiting(wid) ==
+    LET w == WorkerThreads[wid] IN
+    /\ w.state = "lock_exit_without_waiting"
+    /\ Assert(InternalOwnerID = wid, "internal mutex is taken")
+    \* ---
+    /\ InternalOwnerID' = NULL
+    /\ WorkerThreads' = ArrSetState(wid, "lock_done", WorkerThreads)
+    /\ UNCHANGED<<MutexState, MutexOwnerID, Waiters, IsConditionSignaled>>
+
+ThreadLockStandIntoQueue(wid) ==
+    LET w == WorkerThreads[wid] IN
+    /\ w.state = "lock_stand_into_queue"
+    /\ Assert(InternalOwnerID = wid, "internal mutex is taken")
+    \* ---
+    /\ IF ArrIsEmpty(Waiters) THEN
+        \* Wake the owner up only once. When the first waiter enters the queue. It
+        \* won't wakeup faster if other threads signal it more than once anyway.
+        /\ IsConditionSignaled' = TRUE
+       ELSE
+        /\ UNCHANGED<<IsConditionSignaled>>
+    /\ Waiters' = ArrAppend(wid, Waiters)
+    /\ WorkerThreads' = ArrSetState(wid, "lock_stood_into_queue", WorkerThreads)
+    /\ UNCHANGED<<MutexState, MutexOwnerID, InternalOwnerID>>
+
+ThreadLockStoodIntoQueue(wid) ==
+    LET w == WorkerThreads[wid] IN
+    /\ w.state = "lock_stood_into_queue"
+    /\ Assert(InternalOwnerID = wid, "internal mutex is taken")
+    \* ---
+    /\ InternalOwnerID' = NULL
+    /\ WorkerThreads' = ArrSetState(wid, "lock_wait", WorkerThreads)
+    /\ UNCHANGED<<MutexState, MutexOwnerID, Waiters, IsConditionSignaled>>
+
+ThreadLockTake(wid) ==
+    /\ \/ ThreadTryLock(wid)
+       \/ ThreadBlockingLock(wid)
+       \/ ThreadLockStart(wid)
+       \/ ThreadLockCheckOldState(wid)
+       \/ ThreadLockExitWithoutWaiting(wid)
+       \/ ThreadLockStandIntoQueue(wid)
+       \/ ThreadLockStoodIntoQueue(wid)
+
+--------------------------------------------------------------------------------
+\* The lock is taken. Do stuff while holding the lock.
+
+ThreadLockedSleep(wid) ==
+    \* Only the main thread is allowed to sleep while holding the lock. All the
+    \* other threads must do their stuff and unlock quickly.
+    /\ wid = MainThreadID
+    /\ WorkerThreads' = ArrSetState(wid, "locked_sleep", WorkerThreads)
+
+ThreadLockedLeave(wid) ==
+    /\ WorkerThreads' = ArrSetState(wid, "unlock_start", WorkerThreads)
+
+ThreadLockUse(wid) ==
+    LET w == WorkerThreads[wid] IN
+    /\ \/ /\ w.state = "lock_done"
+          /\ Assert(MutexOwnerID = NULL, "no owner")
+          /\ MutexOwnerID' = wid
+          /\ \/ ThreadLockedSleep(wid)
+             \/ ThreadLockedLeave(wid)
+          /\ UNCHANGED<<IsConditionSignaled>>
+       \/ /\ w.state = "locked_sleep"
+          /\ Assert(MutexOwnerID = wid, "this is owner")
+          \* Wait for a wakeup. Only then can consume it and proceed.
+          /\ IsConditionSignaled
+          /\ IsConditionSignaled' = FALSE
+          /\ WorkerThreads' = ArrSetState(wid, "unlock_start", WorkerThreads)
+          /\ UNCHANGED<<MutexOwnerID>>
+    /\ Assert(MutexState.is_taken, "state is taken")
+    /\ UNCHANGED<<MutexState, InternalOwnerID, Waiters>>
+
+--------------------------------------------------------------------------------
+\* Unlocking steps.
+
+ThreadUnlockStart(wid) ==
+    LET w == WorkerThreads[wid] IN
+    /\ w.state = "unlock_start"
+    \* ---
+    /\ Assert(MutexOwnerID = wid, "this is owner")
+    /\ Assert(MutexState.is_taken, "state is taken")
+    /\ MutexOwnerID' = NULL
+    /\ IF MutexState = SetIsTaken(TRUE, MutexStateNew) THEN
+        \* Taken, no waiters. Then just unlock and leave.
+        /\ MutexState' = MutexStateNew
+        /\ WorkerThreads' = ArrSetState(wid, "idle", WorkerThreads)
+       ELSE
+        \* There is a waiter. Need to transfer the ownership then.
+        /\ Assert(MutexState.is_taken, "is taken")
+        /\ Assert(MutexState.has_waiters, "has waiters")
+        /\ WorkerThreads' = ArrSetState(wid, "unlock_take_internal", WorkerThreads)
+        /\ UNCHANGED<<MutexState>>
+    /\ UNCHANGED<<InternalOwnerID, Waiters>>
+
+ThreadUnlockTakeInternal(wid) ==
+    LET w == WorkerThreads[wid] IN
+    /\ w.state = "unlock_take_internal"
+    /\ InternalOwnerID = NULL
+    /\ Assert(MutexState.is_taken, "state is taken")
+    \* ---
+    /\ InternalOwnerID' = wid
+    /\ WorkerThreads' = ArrSetState(wid, "unlock_check_waiters_flag",
+                        WorkerThreads)
+    /\ UNCHANGED<<MutexState, MutexOwnerID, Waiters>>
+
+ThreadUnlockCheckWaitersFlag(wid) ==
+    LET w == WorkerThreads[wid] IN
+    /\ w.state = "unlock_check_waiters_flag"
+    /\ Assert(InternalOwnerID = wid, "is internally locked")
+    /\ Assert(MutexState.is_taken, "state is taken")
+    /\ Assert(~ArrIsEmpty(Waiters), "has waiters")
+    \* ---
+    \* Remove the waiters-flag if there is just one waiter. It is going to be
+    \* popped now.
+    /\ IF ArrLen(Waiters) = 1 THEN
+        /\ MutexState' = SetHasWaiters(FALSE, MutexState)
+       ELSE
+        /\ UNCHANGED<<MutexState>>
+    /\ WorkerThreads' = ArrSetState(wid, "unlock_wakeup_waiter", WorkerThreads)
+    /\ UNCHANGED<<MutexOwnerID, InternalOwnerID, Waiters>>
+
+ThreadUnlockWakeupWaiter(wid) ==
+    LET w == WorkerThreads[wid] IN
+    /\ w.state = "unlock_wakeup_waiter"
+    /\ Assert(InternalOwnerID = wid, "is internally locked")
+    /\ Assert(MutexState.is_taken, "state is taken")
+    /\ Assert(~ArrIsEmpty(Waiters), "has waiters")
+    \* ---
+    /\ Waiters' = ArrPopHead(Waiters)
+    /\ WorkerThreads' = ArrSetState(wid, "unlock_end",
+                        ArrSetState(ArrFirst(Waiters), "lock_done",
+                        WorkerThreads))
+    /\ UNCHANGED<<MutexState, MutexOwnerID, InternalOwnerID>>
+
+ThreadUnlockEnd(wid) ==
+    LET w == WorkerThreads[wid] IN
+    /\ w.state = "unlock_end"
+    /\ Assert(InternalOwnerID = wid, "is internally locked")
+    \* ---
+    /\ InternalOwnerID' = NULL
+    /\ WorkerThreads' = ArrSetState(wid, "idle", WorkerThreads)
+    /\ UNCHANGED<<MutexState, MutexOwnerID, Waiters>>
+
+ThreadUnlock(wid) ==
+    /\ \/ ThreadUnlockStart(wid)
+       \/ ThreadUnlockTakeInternal(wid)
+       \/ ThreadUnlockCheckWaitersFlag(wid)
+       \/ ThreadUnlockWakeupWaiter(wid)
+       \/ ThreadUnlockEnd(wid)
+    /\ UNCHANGED<<IsConditionSignaled>>
+
+--------------------------------------------------------------------------------
+\*
+\* Actions.
+\*
+
+Next ==
+    \/ \E wid \in WorkerThreadIDs:
+        \/ ThreadLockTake(wid)
+        \/ ThreadLockUse(wid)
+        \/ ThreadUnlock(wid)
+
+--------------------------------------------------------------------------------
+\*
+\* Invariants.
+\*
+
+\* All the invariants are checked via the assertions.
+TotalInvariant == TRUE
+
+Spec ==
+    /\ Init
+    /\ [][Next]_vars
+    /\ \A i \in WorkerThreadIDs:
+        /\ SF_vars(ThreadLockTake(i))
+        /\ WF_vars(ThreadLockUse(i))
+        /\ SF_vars(ThreadUnlock(i))
+
+================================================================================
diff --git a/tla/TaskScheduler.tla b/tla/TaskScheduler.tla
index e90e84a9..77d2275f 100644
--- a/tla/TaskScheduler.tla
+++ b/tla/TaskScheduler.tla
@@ -567,7 +567,7 @@ SchedWaitFrontSignal(wid) ==
     /\ Assert(IsSchedulerTaken, "Is in scheduler")
     \* ---
     /\ IsFrontSignaled' = FALSE
-    /\ WorkerThreads' = ArrSetState(wid, "sched_check_waiting", WorkerThreads)
+    /\ WorkerThreads' = ArrSetState(wid, "sched_exit", WorkerThreads)
     /\ UNCHANGED<>
 
 \* Have tasks in the ready-queue. Exit the scheduler and go help other workers
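Closing note for reviewers: the new two-phase TaskScheduler lifecycle introduced by this diff, as exercised by the updated examples and tests. This is a minimal usage sketch assembled from the patch above, not new code from it:

```cpp
#include "mg/sch/TaskScheduler.h"

int
main()
{
	// Construction no longer spawns threads; only the subqueue size is given.
	mg::sch::TaskScheduler sched("tst",
		5 // Subqueue size.
	);
	// Workers are started explicitly now.
	sched.Start(1);
	sched.Post(new mg::sch::Task([](mg::sch::Task* self) {
		delete self;
	}));
	// Poll until all queues are drained and all workers are idle; returns
	// false only if the given time limit expires (infinite by default).
	sched.WaitEmpty();
	// Stop() joins and deletes the worker threads. The destructor also
	// asserts WaitEmpty() and calls Stop() on its own, so this is optional.
	sched.Stop();
	return 0;
}
```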