Skip to content

Commit 7bedf7d

Browse files
committed
lock free queue
1 parent 7f1395e commit 7bedf7d

1 file changed

Lines changed: 91 additions & 41 deletions

File tree

include/sys++/ActorThread.hpp

Lines changed: 91 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,12 @@
2424
#include <utility>
2525
#include <cstddef>
2626
#include <type_traits>
27-
#include <tuple>
2827
#include <thread>
2928
#include <mutex>
3029
#include <condition_variable>
30+
#include <atomic>
3131
#include <chrono>
3232
#include <stdexcept>
33-
#include <deque>
3433
#include <set>
3534
#include <map>
3635

@@ -89,7 +88,6 @@ template <typename Runnable> class ActorThread
8988

9089
std::size_t pendingMessages() const // amount of undispatched messages in the active object
9190
{
92-
std::lock_guard<std::mutex> ulock(mtx);
9391
return mboxNormPri.size() + mboxHighPri.size();
9492
}
9593

@@ -108,7 +106,6 @@ template <typename Runnable> class ActorThread
108106

109107
bool exiting() const // mostly to allow active objects running intensive jobs to poll for a shutdown request
110108
{
111-
std::lock_guard<std::mutex> ulock(mtx);
112109
return !dispatching;
113110
}
114111

@@ -242,8 +239,8 @@ template <typename Runnable> class ActorThread
242239
void handleActorEvents() // this must be invoked from the external dispatcher as specified above
243240
{
244241
auto status = eventsLoop();
245-
if (std::get<1>(status))
246-
static_cast<Runnable*>(this)->onWaitingTimer(std::get<2>(status));
242+
if (status.first)
243+
static_cast<Runnable*>(this)->onWaitingTimer(status.second);
247244
else
248245
static_cast<Runnable*>(this)->onWaitingTimerCancel();
249246
}
@@ -253,7 +250,64 @@ template <typename Runnable> class ActorThread
253250
ActorThread& operator=(const ActorThread&) = delete;
254251
ActorThread(const ActorThread&) = delete;
255252

256-
struct ActorParcel
253+
template <typename Item> class ActorQueue // FIFO (based on the MPSC queue at https://github.com/mstump/queues)
254+
{
255+
template <typename T> using Aligned = typename std::aligned_storage<sizeof(T),std::alignment_of<T>::value>::type;
256+
257+
public:
258+
259+
struct Linked { std::atomic<Item*> next; };
260+
261+
ActorQueue() : head(reinterpret_cast<Item*>(new Aligned<Item>)), // dummy transient placeholder
262+
tail(head.load(std::memory_order_relaxed)),
263+
count(0)
264+
{
265+
head.load(std::memory_order_relaxed)->next.store(nullptr, std::memory_order_relaxed);
266+
}
267+
268+
void clear() { while (front()) pop_front(); }
269+
270+
~ActorQueue()
271+
{
272+
clear();
273+
::operator delete(head.load(std::memory_order_relaxed)); // also suitable for a dummy instance
274+
}
275+
276+
template <typename Linkable> std::size_t emplace_back(Linkable* item) // e.g. any type derived from 'Linked'
277+
{
278+
item->next.store(nullptr, std::memory_order_relaxed);
279+
Item* back = head.exchange(item, std::memory_order_acq_rel);
280+
back->next.store(item, std::memory_order_release);
281+
return count.fetch_add(1, std::memory_order_release); // amount of queued items minus 1
282+
}
283+
284+
inline Item* front() // returns nullptr if empty
285+
{
286+
return tail.load(std::memory_order_relaxed)->next.load(std::memory_order_acquire);
287+
}
288+
289+
inline void pop_front() // also destroys and ultimately deletes the item
290+
{
291+
count.fetch_sub(1, std::memory_order_release);
292+
Item* top = tail.load(std::memory_order_relaxed);
293+
Item* item = top->next.load(std::memory_order_acquire);
294+
tail.store(item, std::memory_order_release);
295+
item->~Item(); // (atomic destructor is trivial) memory deletion is actually deferred one step behind
296+
::operator delete(top); // delete the *previous* item memory no longer needed
297+
}
298+
299+
inline std::size_t size() const { return count.load(std::memory_order_acquire); }
300+
301+
inline bool empty() const { return !size(); }
302+
303+
private:
304+
305+
std::atomic<Item*> head; // the stored objects hold the linked list pointers (single memory allocation)
306+
std::atomic<Item*> tail;
307+
std::atomic<std::size_t> count;
308+
};
309+
310+
struct ActorParcel : public ActorQueue<ActorParcel>::Linked
257311
{
258312
virtual ~ActorParcel() {}
259313
virtual void deliverTo(Runnable* instance) = 0;
@@ -372,12 +426,11 @@ template <typename Runnable> class ActorThread
372426
template <typename Parcelable, bool HighPri, typename Any> void post(Any&& msg) // runs on the calling thread
373427
{
374428
auto& mbox = HighPri? mboxHighPri : mboxNormPri;
375-
std::lock_guard<std::mutex> lock(mtx);
376429
if (!dispatching) return; // don't store anything in a frozen queue
377-
bool isIdle = mbox.empty();
378-
mbox.emplace_back(new Parcelable(std::forward<Any>(msg)));
430+
bool isIdle = !mbox.emplace_back(new Parcelable(std::forward<Any>(msg)));
379431
if (HighPri) mboxPaused = false;
380432
if (!isIdle) return;
433+
std::lock_guard<std::mutex> lock(mtx);
381434
messageWaiter.notify_one();
382435
static_cast<Runnable*>(this)->onWaitingEvents();
383436
}
@@ -390,10 +443,9 @@ template <typename Runnable> class ActorThread
390443
for (;;)
391444
{
392445
burst = 0;
393-
auto status = eventsLoop();
394-
if (!std::get<0>(status)) break; // dispatching == false
446+
eventsLoop();
447+
if (!dispatching) break;
395448
runnable->onDispatching();
396-
std::lock_guard<std::mutex> lock(mtx);
397449
externalDispatcher = false;
398450
}
399451
runnable->onStop();
@@ -404,13 +456,13 @@ template <typename Runnable> class ActorThread
404456

405457
void retryMbox(const DispatchRetry&) { mboxPaused = false; }
406458

407-
std::tuple<bool, bool, TimerClock::duration> eventsLoop()
459+
std::pair<bool, TimerClock::duration> eventsLoop()
408460
{
409461
bool haveTimerLapse = false;
410462
TimerClock::duration timerLapse;
411463
Runnable* runnable = static_cast<Runnable*>(this);
412-
std::unique_lock<std::mutex> ulock(mtx);
413-
while (dispatching)
464+
bool mustDispatch = true;
465+
while (dispatching && mustDispatch)
414466
{
415467
bool hasHigh = !mboxHighPri.empty();
416468
bool hasNorm = !mboxNormPri.empty();
@@ -419,36 +471,35 @@ template <typename Runnable> class ActorThread
419471
if (!isPaused && (hasHigh || hasNorm)) // consume the messages queue
420472
{
421473
auto& mbox = hasHigh? mboxHighPri : mboxNormPri;
422-
423474
try
424475
{
425-
auto& msg = mbox.front(); // queue iterator valid through insertions (but thread-unsafe call)
426-
ulock.unlock();
427-
msg->deliverTo(runnable);
428-
msg.reset(); // delete the argument before getting the lock (prevent a self-lock
429-
ulock.lock(); // if that object sends a message to this thread from its destructor)
430-
mbox.pop_front();
476+
while (ActorParcel* msg = mbox.front())
477+
{
478+
msg->deliverTo(runnable);
479+
mbox.pop_front();
480+
if ((++burst % 16) == 0)
481+
{
482+
if (externalDispatcher) // do not monopolize the CPU on this dispatcher
483+
{
484+
runnable->onWaitingEvents(); // queue a resume request
485+
mustDispatch = false;
486+
}
487+
break; // keep an eye on the timers
488+
}
489+
}
431490
}
432491
catch (const DispatchRetry& retry)
433492
{
434493
auto event = Channel<const DispatchRetry>([this](const DispatchRetry& dr) { retryMbox(dr); });
435494
timerStart(retry, retry.retryInterval, std::move(event));
436-
ulock.lock();
437495
mboxPaused = true;
438496
}
439-
440-
if (externalDispatcher && ((++burst % 64) == 0)) // do not monopolize the CPU on this dispatcher
441-
{
442-
runnable->onWaitingEvents(); // queue a resume request
443-
break;
444-
}
445497
}
446498

447-
ulock.unlock();
448499
auto firstTimer = timers.cbegin();
449500
if (firstTimer == timers.cend())
450501
{
451-
ulock.lock();
502+
std::unique_lock<std::mutex> ulock(mtx);
452503
if (mboxNormPri.empty() && mboxHighPri.empty() && dispatching)
453504
{
454505
idleWaiter.notify_all();
@@ -463,11 +514,10 @@ template <typename Runnable> class ActorThread
463514
{
464515
auto timerEvent = *firstTimer; // this shared_ptr keeps it alive when self-removed from the set
465516
timerEvent->deliverTo(runnable); // here it could be self-removed (timerStop)
466-
ulock.lock();
467517
}
468518
else // the other timers are scheduled even further
469519
{
470-
ulock.lock();
520+
std::unique_lock<std::mutex> ulock(mtx);
471521
if (dispatching && mboxHighPri.empty() && (mboxNormPri.empty() || mboxPaused))
472522
{
473523
idleWaiter.notify_all();
@@ -482,7 +532,7 @@ template <typename Runnable> class ActorThread
482532
}
483533
}
484534
}
485-
return std::make_tuple(dispatching, haveTimerLapse, timerLapse); // take advantage of the acquired lock
535+
return std::make_pair(haveTimerLapse, timerLapse);
486536
}
487537

488538
template <typename Key> struct ActorPointedKeyComparator
@@ -493,19 +543,19 @@ template <typename Runnable> class ActorThread
493543
}
494544
};
495545

496-
bool dispatching;
497-
bool externalDispatcher;
498-
bool detached;
546+
std::atomic<bool> dispatching;
547+
std::atomic<bool> externalDispatcher;
548+
std::atomic<bool> detached;
499549
mutable std::weak_ptr<Runnable> weak_this;
500550
std::thread runner;
501551
std::thread::id id;
502552
int exitCode;
503553
mutable std::mutex mtx;
504554
std::condition_variable messageWaiter;
505555
std::condition_variable idleWaiter;
506-
std::deque<std::unique_ptr<ActorParcel>> mboxNormPri;
507-
std::deque<std::unique_ptr<ActorParcel>> mboxHighPri;
508-
bool mboxPaused;
556+
ActorQueue<ActorParcel> mboxNormPri;
557+
ActorQueue<ActorParcel> mboxHighPri;
558+
std::atomic<bool> mboxPaused;
509559
uint16_t burst;
510560
std::set<std::shared_ptr<ActorTimer>, ActorPointedKeyComparator<ActorTimer>> timers; // ordered by deadline
511561
};

0 commit comments

Comments
 (0)