2424#include < utility>
2525#include < cstddef>
2626#include < type_traits>
27- #include < tuple>
2827#include < thread>
2928#include < mutex>
3029#include < condition_variable>
30+ #include < atomic>
3131#include < chrono>
3232#include < stdexcept>
33- #include < deque>
3433#include < set>
3534#include < map>
3635
@@ -89,7 +88,6 @@ template <typename Runnable> class ActorThread
8988
9089 std::size_t pendingMessages () const // amount of undispatched messages in the active object
9190 {
92- std::lock_guard<std::mutex> ulock (mtx);
9391 return mboxNormPri.size () + mboxHighPri.size ();
9492 }
9593
@@ -108,7 +106,6 @@ template <typename Runnable> class ActorThread
108106
109107 bool exiting () const // mostly to allow active objects running intensive jobs to poll for a shutdown request
110108 {
111- std::lock_guard<std::mutex> ulock (mtx);
112109 return !dispatching;
113110 }
114111
@@ -242,8 +239,8 @@ template <typename Runnable> class ActorThread
242239 void handleActorEvents () // this must be invoked from the external dispatcher as specified above
243240 {
244241 auto status = eventsLoop ();
245- if (std::get< 1 >( status) )
246- static_cast <Runnable*>(this )->onWaitingTimer (std::get< 2 >( status) );
242+ if (status. first )
243+ static_cast <Runnable*>(this )->onWaitingTimer (status. second );
247244 else
248245 static_cast <Runnable*>(this )->onWaitingTimerCancel ();
249246 }
@@ -253,7 +250,64 @@ template <typename Runnable> class ActorThread
253250 ActorThread& operator =(const ActorThread&) = delete ;
254251 ActorThread (const ActorThread&) = delete ;
255252
256- struct ActorParcel
253+ template <typename Item> class ActorQueue // FIFO (based on the MPSC queue at https://github.com/mstump/queues)
254+ {
255+ template <typename T> using Aligned = typename std::aligned_storage<sizeof (T),std::alignment_of<T>::value>::type;
256+
257+ public:
258+
259+ struct Linked { std::atomic<Item*> next; };
260+
261+ ActorQueue () : head(reinterpret_cast <Item*>(new Aligned<Item>)), // dummy transient placeholder
262+ tail (head.load(std::memory_order_relaxed)),
263+ count(0 )
264+ {
265+ head.load (std::memory_order_relaxed)->next .store (nullptr , std::memory_order_relaxed);
266+ }
267+
268+ void clear () { while (front ()) pop_front (); }
269+
270+ ~ActorQueue ()
271+ {
272+ clear ();
273+ ::operator delete (head.load (std::memory_order_relaxed)); // also suitable for a dummy instance
274+ }
275+
276+ template <typename Linkable> std::size_t emplace_back (Linkable* item) // e.g. any type derived from 'Linked'
277+ {
278+ item->next .store (nullptr , std::memory_order_relaxed);
279+ Item* back = head.exchange (item, std::memory_order_acq_rel);
280+ back->next .store (item, std::memory_order_release);
281+ return count.fetch_add (1 , std::memory_order_release); // amount of queued items minus 1
282+ }
283+
284+ inline Item* front () // returns nullptr if empty
285+ {
286+ return tail.load (std::memory_order_relaxed)->next .load (std::memory_order_acquire);
287+ }
288+
289+ inline void pop_front () // also destroys and ultimately deletes the item
290+ {
291+ count.fetch_sub (1 , std::memory_order_release);
292+ Item* top = tail.load (std::memory_order_relaxed);
293+ Item* item = top->next .load (std::memory_order_acquire);
294+ tail.store (item, std::memory_order_release);
295+ item->~Item (); // (atomic destructor is trivial) memory deletion is actually deferred one step behind
296+ ::operator delete (top); // delete the *previous* item memory no longer needed
297+ }
298+
299+ inline std::size_t size () const { return count.load (std::memory_order_acquire); }
300+
301+ inline bool empty () const { return !size (); }
302+
303+ private:
304+
305+ std::atomic<Item*> head; // the stored objects hold the linked list pointers (single memory allocation)
306+ std::atomic<Item*> tail;
307+ std::atomic<std::size_t > count;
308+ };
309+
310+ struct ActorParcel : public ActorQueue <ActorParcel>::Linked
257311 {
258312 virtual ~ActorParcel () {}
259313 virtual void deliverTo (Runnable* instance) = 0;
@@ -372,12 +426,11 @@ template <typename Runnable> class ActorThread
372426 template <typename Parcelable, bool HighPri, typename Any> void post (Any&& msg) // runs on the calling thread
373427 {
374428 auto & mbox = HighPri? mboxHighPri : mboxNormPri;
375- std::lock_guard<std::mutex> lock (mtx);
376429 if (!dispatching) return ; // don't store anything in a frozen queue
377- bool isIdle = mbox.empty ();
378- mbox.emplace_back (new Parcelable (std::forward<Any>(msg)));
430+ bool isIdle = !mbox.emplace_back (new Parcelable (std::forward<Any>(msg)));
379431 if (HighPri) mboxPaused = false ;
380432 if (!isIdle) return ;
433+ std::lock_guard<std::mutex> lock (mtx);
381434 messageWaiter.notify_one ();
382435 static_cast <Runnable*>(this )->onWaitingEvents ();
383436 }
@@ -390,10 +443,9 @@ template <typename Runnable> class ActorThread
390443 for (;;)
391444 {
392445 burst = 0 ;
393- auto status = eventsLoop ();
394- if (!std::get< 0 >(status)) break ; // dispatching == false
446+ eventsLoop ();
447+ if (!dispatching) break ;
395448 runnable->onDispatching ();
396- std::lock_guard<std::mutex> lock (mtx);
397449 externalDispatcher = false ;
398450 }
399451 runnable->onStop ();
@@ -404,13 +456,13 @@ template <typename Runnable> class ActorThread
404456
405457 void retryMbox (const DispatchRetry&) { mboxPaused = false ; }
406458
407- std::tuple< bool , bool , TimerClock::duration> eventsLoop ()
459+ std::pair< bool , TimerClock::duration> eventsLoop ()
408460 {
409461 bool haveTimerLapse = false ;
410462 TimerClock::duration timerLapse;
411463 Runnable* runnable = static_cast <Runnable*>(this );
412- std::unique_lock<std::mutex> ulock (mtx) ;
413- while (dispatching)
464+ bool mustDispatch = true ;
465+ while (dispatching && mustDispatch )
414466 {
415467 bool hasHigh = !mboxHighPri.empty ();
416468 bool hasNorm = !mboxNormPri.empty ();
@@ -419,36 +471,35 @@ template <typename Runnable> class ActorThread
419471 if (!isPaused && (hasHigh || hasNorm)) // consume the messages queue
420472 {
421473 auto & mbox = hasHigh? mboxHighPri : mboxNormPri;
422-
423474 try
424475 {
425- auto & msg = mbox.front (); // queue iterator valid through insertions (but thread-unsafe call)
426- ulock.unlock ();
427- msg->deliverTo (runnable);
428- msg.reset (); // delete the argument before getting the lock (prevent a self-lock
429- ulock.lock (); // if that object sends a message to this thread from its destructor)
430- mbox.pop_front ();
476+ while (ActorParcel* msg = mbox.front ())
477+ {
478+ msg->deliverTo (runnable);
479+ mbox.pop_front ();
480+ if ((++burst % 16 ) == 0 )
481+ {
482+ if (externalDispatcher) // do not monopolize the CPU on this dispatcher
483+ {
484+ runnable->onWaitingEvents (); // queue a resume request
485+ mustDispatch = false ;
486+ }
487+ break ; // keep an eye on the timers
488+ }
489+ }
431490 }
432491 catch (const DispatchRetry& retry)
433492 {
434493 auto event = Channel<const DispatchRetry>([this ](const DispatchRetry& dr) { retryMbox (dr); });
435494 timerStart (retry, retry.retryInterval , std::move (event));
436- ulock.lock ();
437495 mboxPaused = true ;
438496 }
439-
440- if (externalDispatcher && ((++burst % 64 ) == 0 )) // do not monopolize the CPU on this dispatcher
441- {
442- runnable->onWaitingEvents (); // queue a resume request
443- break ;
444- }
445497 }
446498
447- ulock.unlock ();
448499 auto firstTimer = timers.cbegin ();
449500 if (firstTimer == timers.cend ())
450501 {
451- ulock. lock ( );
502+ std::unique_lock<std::mutex> ulock (mtx );
452503 if (mboxNormPri.empty () && mboxHighPri.empty () && dispatching)
453504 {
454505 idleWaiter.notify_all ();
@@ -463,11 +514,10 @@ template <typename Runnable> class ActorThread
463514 {
464515 auto timerEvent = *firstTimer; // this shared_ptr keeps it alive when self-removed from the set
465516 timerEvent->deliverTo (runnable); // here it could be self-removed (timerStop)
466- ulock.lock ();
467517 }
468518 else // the other timers are scheduled even further
469519 {
470- ulock. lock ( );
520+ std::unique_lock<std::mutex> ulock (mtx );
471521 if (dispatching && mboxHighPri.empty () && (mboxNormPri.empty () || mboxPaused))
472522 {
473523 idleWaiter.notify_all ();
@@ -482,7 +532,7 @@ template <typename Runnable> class ActorThread
482532 }
483533 }
484534 }
485- return std::make_tuple (dispatching, haveTimerLapse, timerLapse); // take advantage of the acquired lock
535+ return std::make_pair ( haveTimerLapse, timerLapse);
486536 }
487537
488538 template <typename Key> struct ActorPointedKeyComparator
@@ -493,19 +543,19 @@ template <typename Runnable> class ActorThread
493543 }
494544 };
495545
496- bool dispatching;
497- bool externalDispatcher;
498- bool detached;
546+ std::atomic< bool > dispatching;
547+ std::atomic< bool > externalDispatcher;
548+ std::atomic< bool > detached;
499549 mutable std::weak_ptr<Runnable> weak_this;
500550 std::thread runner;
501551 std::thread::id id;
502552 int exitCode;
503553 mutable std::mutex mtx;
504554 std::condition_variable messageWaiter;
505555 std::condition_variable idleWaiter;
506- std::deque<std::unique_ptr< ActorParcel> > mboxNormPri;
507- std::deque<std::unique_ptr< ActorParcel> > mboxHighPri;
508- bool mboxPaused;
556+ ActorQueue< ActorParcel> mboxNormPri;
557+ ActorQueue< ActorParcel> mboxHighPri;
558+ std::atomic< bool > mboxPaused;
509559 uint16_t burst;
510560 std::set<std::shared_ptr<ActorTimer>, ActorPointedKeyComparator<ActorTimer>> timers; // ordered by deadline
511561};
0 commit comments