Skip to content

Commit c17e7d5

Browse files
committed
improve front() / pop_front()
1 parent 7bedf7d commit c17e7d5

1 file changed

Lines changed: 17 additions & 15 deletions

File tree

include/sys++/ActorThread.hpp

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -260,9 +260,11 @@ template <typename Runnable> class ActorThread
260260

261261
ActorQueue() : head(reinterpret_cast<Item*>(new Aligned<Item>)), // dummy transient placeholder
262262
tail(head.load(std::memory_order_relaxed)),
263-
count(0)
263+
count(0),
264+
lastFront(nullptr),
265+
prevFront(tail.load(std::memory_order_relaxed))
264266
{
265-
head.load(std::memory_order_relaxed)->next.store(nullptr, std::memory_order_relaxed);
267+
prevFront->next.store(nullptr, std::memory_order_relaxed);
266268
}
267269

268270
void clear() { while (front()) pop_front(); }
@@ -273,27 +275,26 @@ template <typename Runnable> class ActorThread
273275
::operator delete(head.load(std::memory_order_relaxed)); // also suitable for a dummy instance
274276
}
275277

276-
template <typename Linkable> std::size_t emplace_back(Linkable* item) // e.g. any type derived from 'Linked'
278+
template <typename Linkable> std::size_t push_back(Linkable* item) // e.g. any type derived from 'Linked'
277279
{
278280
item->next.store(nullptr, std::memory_order_relaxed);
279281
Item* back = head.exchange(item, std::memory_order_acq_rel);
280282
back->next.store(item, std::memory_order_release);
281283
return count.fetch_add(1, std::memory_order_release); // amount of queued items minus 1
282284
}
283285

284-
inline Item* front() // returns nullptr if empty
286+
inline Item* front() // returns nullptr if empty (must be always invoked just before pop_front())
285287
{
286-
return tail.load(std::memory_order_relaxed)->next.load(std::memory_order_acquire);
288+
return lastFront = prevFront->next.load(std::memory_order_acquire);
287289
}
288290

289-
inline void pop_front() // also destroys and ultimately deletes the item
291+
void pop_front() // also destroys and ultimately deletes the item
290292
{
291293
count.fetch_sub(1, std::memory_order_release);
292-
Item* top = tail.load(std::memory_order_relaxed);
293-
Item* item = top->next.load(std::memory_order_acquire);
294-
tail.store(item, std::memory_order_release);
295-
item->~Item(); // (atomic destructor is trivial) memory deletion is actually deferred one step behind
296-
::operator delete(top); // delete the *previous* item memory no longer needed
294+
tail.store(lastFront, std::memory_order_release);
295+
lastFront->~Item(); // (atomic destructor is trivial) memory deletion is actually deferred one step behind
296+
::operator delete(prevFront); // delete the *previous* item memory no longer needed
297+
prevFront = lastFront;
297298
}
298299

299300
inline std::size_t size() const { return count.load(std::memory_order_acquire); }
@@ -305,6 +306,8 @@ template <typename Runnable> class ActorThread
305306
std::atomic<Item*> head; // the stored objects hold the linked list pointers (single memory allocation)
306307
std::atomic<Item*> tail;
307308
std::atomic<std::size_t> count;
309+
Item* lastFront;
310+
Item* prevFront;
308311
};
309312

310313
struct ActorParcel : public ActorQueue<ActorParcel>::Linked
@@ -427,7 +430,7 @@ template <typename Runnable> class ActorThread
427430
{
428431
auto& mbox = HighPri? mboxHighPri : mboxNormPri;
429432
if (!dispatching) return; // don't store anything in a frozen queue
430-
bool isIdle = !mbox.emplace_back(new Parcelable(std::forward<Any>(msg)));
433+
bool isIdle = mbox.push_back(new Parcelable(std::forward<Any>(msg))) == 0;
431434
if (HighPri) mboxPaused = false;
432435
if (!isIdle) return;
433436
std::lock_guard<std::mutex> lock(mtx);
@@ -466,9 +469,8 @@ template <typename Runnable> class ActorThread
466469
{
467470
bool hasHigh = !mboxHighPri.empty();
468471
bool hasNorm = !mboxNormPri.empty();
469-
bool isPaused = mboxPaused;
470472

471-
if (!isPaused && (hasHigh || hasNorm)) // consume the messages queue
473+
if (!mboxPaused && (hasHigh || hasNorm)) // consume the messages queue
472474
{
473475
auto& mbox = hasHigh? mboxHighPri : mboxNormPri;
474476
try
@@ -477,7 +479,7 @@ template <typename Runnable> class ActorThread
477479
{
478480
msg->deliverTo(runnable);
479481
mbox.pop_front();
480-
if ((++burst % 16) == 0)
482+
if ((++burst % 64) == 0)
481483
{
482484
if (externalDispatcher) // do not monopolize the CPU on this dispatcher
483485
{

0 commit comments

Comments
 (0)