forked from Abc-Arbitrage/Disruptor-cpp
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRingBuffer.h
More file actions
678 lines (603 loc) · 24.1 KB
/
RingBuffer.h
File metadata and controls
678 lines (603 loc) · 24.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
#pragma once
#include <ostream>
#include <type_traits>
#include "Disruptor/ArgumentException.h"
#include "Disruptor/ArgumentOutOfRangeException.h"
#include "Disruptor/BlockingWaitStrategy.h"
#include "Disruptor/ICursored.h"
#include "Disruptor/IEventSequencer.h"
#include "Disruptor/IEventTranslator.h"
#include "Disruptor/IEventTranslatorVararg.h"
#include "Disruptor/ISequenceBarrier.h"
#include "Disruptor/ISequencer.h"
#include "Disruptor/MultiProducerSequencer.h"
#include "Disruptor/ProducerType.h"
#include "Disruptor/SingleProducerSequencer.h"
#include "Disruptor/Util.h"
namespace Disruptor
{
/**
* Ring based store of reusable entries containing the data representing an event being exchanged between event publisher and IEventProcessors.
*
* \tparam T implementation storing the data for sharing during exchange or parallel coordination of an event.
*/
template <class T>
class RingBuffer : public IEventSequencer< T >, public ICursored, public std::enable_shared_from_this< RingBuffer< T > >
{
static_assert(std::is_class< T >::value, "T should be a class");
static const std::int32_t m_bufferPad = 128 / sizeof(int*);
template <class TTranslators>
struct DeduceTranslatorFromContainer
{
private:
typedef typename std::decay< TTranslators >::type ContainerType;
typedef typename ContainerType::value_type ValueType;
public:
using Type = typename ValueType::element_type;
};
template <class... TItem>
static std::int32_t getGreatestLength(const std::initializer_list< TItem >&... l)
{
const std::size_t lengths[] = { l.size()... };
auto length = std::numeric_limits< std::size_t >::min();
for (auto i = 0u; i < sizeof...(l); ++i)
{
if (lengths[i] > length)
length = lengths[i];
}
return static_cast< std::int32_t >(length);
}
public:
/**
* Construct a RingBuffer with the full option set.
*
* \param eventFactory eventFactory to create entries for filling the RingBuffer
* \param sequencer waiting strategy employed by processorsToTrack waiting on entries becoming available.
*/
RingBuffer(const std::function< T() >& eventFactory, const std::shared_ptr< ISequencer< T > >& sequencer)
{
m_sequencer = sequencer;
m_bufferSize = sequencer->bufferSize();
if (m_bufferSize < 1)
{
DISRUPTOR_THROW_ARGUMENT_EXCEPTION("bufferSize must not be less than 1");
}
if (Util::ceilingNextPowerOfTwo(m_bufferSize) != m_bufferSize)
{
DISRUPTOR_THROW_ARGUMENT_EXCEPTION("bufferSize must be a power of 2");
}
m_indexMask = m_bufferSize - 1;
m_entries.resize(m_bufferSize + 2 * m_bufferPad);
fill(eventFactory);
}
/**
* Construct a RingBuffer with default strategies of: MultiThreadedLowContentionClaimStrategy and BlockingWaitStrategy
*
* \param eventFactory eventFactory to create entries for filling the RingBuffer
* \param bufferSize number of elements to create within the ring buffer.
*/
RingBuffer(const std::function< T() >& eventFactory, std::int32_t bufferSize)
: RingBuffer(eventFactory, std::make_shared< MultiProducerSequencer< T > >(bufferSize, std::make_shared< BlockingWaitStrategy >()))
{
}
void fill(const std::function< T() >& eventFactory)
{
for (std::int32_t i = 0; i < m_bufferSize; ++i)
{
m_entries[m_bufferPad + i] = eventFactory();
}
}
/**
* Create a new multiple producer RingBuffer using the default wait strategy BlockingWaitStrategy
*
* \param factory used to create the events within the ring buffer.
* \param bufferSize number of elements to create within the ring buffer.
* \param waitStrategy used to determine how to wait for new elements to become available.
*
*/
static std::shared_ptr< RingBuffer< T > > createMultiProducer(const std::function< T() >& factory, std::int32_t bufferSize, const std::shared_ptr< IWaitStrategy >& waitStrategy)
{
return std::make_shared< RingBuffer< T > >(factory, std::make_shared< MultiProducerSequencer< T > >(bufferSize, waitStrategy));
}
/**
*
* \param factory
* \param bufferSize
*
*/
static std::shared_ptr< RingBuffer< T > > createMultiProducer(const std::function< T() >& factory, std::int32_t bufferSize)
{
return createMultiProducer(factory, bufferSize, std::make_shared< BlockingWaitStrategy >());
}
/**
* Create a new single producer RingBuffer with the specified wait strategy.
*
* \param factory used to create the events within the ring buffer.
* \param bufferSize number of elements to create within the ring buffer.
* \param waitStrategy used to determine how to wait for new elements to become available.
*
*/
static std::shared_ptr< RingBuffer< T > > createSingleProducer(const std::function< T() >& factory, std::int32_t bufferSize, const std::shared_ptr< IWaitStrategy >& waitStrategy)
{
return std::make_shared< RingBuffer< T > >(factory, std::make_shared< SingleProducerSequencer< T > >(bufferSize, waitStrategy));
}
/**
* Create a new single producer RingBuffer using the default wait strategy BlockingWaitStrategy
*
* \param factory used to create the events within the ring buffer.
* \param bufferSize number of elements to create within the ring buffer.
*
*/
static std::shared_ptr< RingBuffer< T > > createSingleProducer(const std::function< T() >& factory, std::int32_t bufferSize)
{
return createSingleProducer(factory, bufferSize, std::make_shared< BlockingWaitStrategy >());
}
/**
* Create a new Ring Buffer with the specified producer type (SINGLE or MULTI)
*
* \param producerType producer type to use<see cref="ProducerType"/>
* \param factory used to create the events within the ring buffer.
* \param bufferSize number of elements to create within the ring buffer.
* \param waitStrategy used to determine how to wait for new elements to become available.
*
*/
static std::shared_ptr< RingBuffer< T > > create(ProducerType producerType, const std::function< T() >& factory, std::int32_t bufferSize, const std::shared_ptr< IWaitStrategy >& waitStrategy)
{
switch (producerType)
{
case ProducerType::Single:
return createSingleProducer(factory, bufferSize, waitStrategy);
case ProducerType::Multi:
return createMultiProducer(factory, bufferSize, waitStrategy);
default:
DISRUPTOR_THROW_ARGUMENT_OUT_OF_RANGE_EXCEPTION(producerType);
}
}
/**
* Get the event for a given sequence in the RingBuffer.
*
* \param sequence sequence for the event
*/
T& operator[](std::int64_t sequence) const override
{
return m_entries[m_bufferPad + (static_cast< std::int32_t >(sequence) & m_indexMask)];
}
std::int32_t bufferSize() override
{
return m_bufferSize;
}
bool hasAvailableCapacity(std::int32_t requiredCapacity) override
{
return m_sequencer->hasAvailableCapacity(requiredCapacity);
}
std::int64_t next() override
{
return m_sequencer->next();
}
std::int64_t next(std::int32_t n) override
{
return m_sequencer->next(n);
}
std::int64_t tryNext() override
{
return m_sequencer->tryNext();
}
std::int64_t tryNext(std::int32_t n) override
{
return m_sequencer->tryNext(n);
}
/**
* Get the current cursor value for the ring buffer. The actual value received will depend on the type of ISequencer that is being used.
*/
std::int64_t cursor() const override
{
return m_sequencer->cursor();
}
/**
* Get the remaining capacity for this ringBuffer.
*
* \returns The number of slots remaining.
*/
std::int64_t getRemainingCapacity() override
{
return m_sequencer->getRemainingCapacity();
}
void publish(std::int64_t sequence) override
{
m_sequencer->publish(sequence);
}
/**
* Publish the specified sequences. This action marks these particular messages as being available to be read.
*
* \param lo the lowest sequence number to be published
* \param hi the highest sequence number to be published
*/
void publish(std::int64_t lo, std::int64_t hi) override
{
m_sequencer->publish(lo, hi);
}
//deprecated
void resetTo(std::int64_t sequence)
{
m_sequencer->claim(sequence);
m_sequencer->publish(sequence);
}
T& claimAndGetPreallocated(std::int64_t sequence)
{
m_sequencer->claim(sequence);
return (*this)[sequence];
}
bool isPublished(std::int64_t sequence)
{
return m_sequencer->isAvailable(sequence);
}
void addGatingSequences(const std::vector< std::shared_ptr< ISequence > >& gatingSequences)
{
m_sequencer->addGatingSequences(gatingSequences);
}
std::int64_t getMinimumGatingSequence()
{
return m_sequencer->getMinimumSequence();
}
/**
* Remove the specified sequence from this ringBuffer.
*
* \param sequence sequence to be removed.
* \returns true if this sequence was found, false otherwise.
*/
bool removeGatingSequence(const std::shared_ptr< ISequence >& sequence)
{
return m_sequencer->removeGatingSequence(sequence);
}
/**
* Create a new SequenceBarrier to be used by an EventProcessor to track which messages are available to be read from the ring buffer given a list of sequences to track.
*
* \param sequencesToTrack the additional sequences to track
* \returns A sequence barrier that will track the specified sequences.
*/
std::shared_ptr< ISequenceBarrier > newBarrier(const std::vector< std::shared_ptr< ISequence > >& sequencesToTrack = {})
{
return m_sequencer->newBarrier(sequencesToTrack);
}
/**
* Creates an event poller for this ring buffer gated on the supplied sequences.
*
* \param gatingSequences
* \returns A poller that will gate on this ring buffer and the supplied sequences.
*/
std::shared_ptr< EventPoller< T > > newPoller(const std::vector< std::shared_ptr< ISequence > >& gatingSequences = {})
{
return m_sequencer->newPoller(this->shared_from_this(), gatingSequences);
}
template <class TTranslator>
typename std::enable_if
<
std::is_base_of< IEventTranslator< T >, TTranslator >::value
>
::type publishEvent(const std::shared_ptr< TTranslator >& translator)
{
auto sequence = m_sequencer->next();
translateAndPublish(translator, sequence);
}
template <class TTranslator>
typename std::enable_if
<
std::is_base_of< IEventTranslator< T >, TTranslator >::value,
bool
>
::type tryPublishEvent(const std::shared_ptr< TTranslator >& translator)
{
try
{
auto sequence = m_sequencer->tryNext();
translateAndPublish(translator, sequence);
return true;
}
catch (InsufficientCapacityException&)
{
return false;
}
}
template <class TTranslator, class... TArgs>
typename std::enable_if
<
std::is_base_of< IEventTranslatorVararg< T, TArgs... >, TTranslator >::value
>
::type publishEvent(const std::shared_ptr< TTranslator >& translator, const TArgs&... args)
{
auto sequence = m_sequencer->next();
translateAndPublish(translator, sequence, args...);
}
template <class TTranslator, class... TArgs>
typename std::enable_if
<
std::is_base_of< IEventTranslatorVararg< T, TArgs... >, TTranslator >::value,
bool
>
::type tryPublishEvent(const std::shared_ptr< TTranslator >& translator, const TArgs&... args)
{
try
{
auto sequence = m_sequencer->tryNext();
translateAndPublish(translator, sequence, args...);
return true;
}
catch (InsufficientCapacityException&)
{
return false;
}
}
template <class TTranslators, class = decltype(std::declval< TTranslators >().begin())>
typename std::enable_if
<
std::is_base_of
<
IEventTranslator< T >,
typename DeduceTranslatorFromContainer< TTranslators >::Type
>::value
>
::type publishEvents(const TTranslators& translators)
{
publishEvents(translators, 0, static_cast< std::int32_t >(translators.size()));
}
template <class TTranslators, class = decltype(std::declval< TTranslators >().begin())>
typename std::enable_if
<
std::is_base_of
<
IEventTranslator< T >,
typename DeduceTranslatorFromContainer< TTranslators >::Type
>::value
>
::type publishEvents(const TTranslators& translators, std::int32_t batchStartsAt, std::int32_t batchSize)
{
checkBounds(static_cast< std::int32_t >(translators.size()), batchStartsAt, batchSize);
std::int64_t finalSequence = m_sequencer->next(batchSize);
translateAndPublishBatch(translators, batchStartsAt, batchSize, finalSequence);
}
template <class TTranslators, class = decltype(std::declval< TTranslators >().begin())>
typename std::enable_if
<
std::is_base_of
<
IEventTranslator< T >,
typename DeduceTranslatorFromContainer< TTranslators >::Type
>::value,
bool
>
::type tryPublishEvents(const TTranslators& translators)
{
return tryPublishEvents(translators, 0, static_cast< std::int32_t >(translators.size()));
}
template <class TTranslators, class = decltype(std::declval< TTranslators >().begin())>
typename std::enable_if
<
std::is_base_of
<
IEventTranslator< T >,
typename DeduceTranslatorFromContainer< TTranslators >::Type
>::value,
bool
>
::type tryPublishEvents(const TTranslators& translators, std::int32_t batchStartsAt, std::int32_t batchSize)
{
checkBounds(static_cast< std::int32_t >(translators.size()), batchStartsAt, batchSize);
try
{
auto finalSequence = m_sequencer->tryNext(batchSize);
translateAndPublishBatch(translators, batchStartsAt, batchSize, finalSequence);
return true;
}
catch (InsufficientCapacityException&)
{
return false;
}
}
template <class TTranslator, class... TArgs>
typename std::enable_if
<
std::is_base_of
<
IEventTranslatorVararg< T, TArgs... >,
TTranslator
>::value
>
::type publishEvents(const std::shared_ptr< TTranslator >& translator, const std::initializer_list< TArgs >&... args)
{
publishEvents(translator, 0, getGreatestLength(args...), args...);
}
template <class TTranslator, class... TArgs>
typename std::enable_if
<
std::is_base_of
<
IEventTranslatorVararg< T, TArgs... >,
TTranslator
>::value
>
::type publishEvents(const std::shared_ptr< TTranslator >& translator, std::int32_t batchStartsAt, std::int32_t batchSize, const std::initializer_list< TArgs >&... args)
{
checkBounds(getGreatestLength(args...), batchStartsAt, batchSize);
std::int64_t finalSequence = m_sequencer->next(batchSize);
translateAndPublishBatch(translator, batchStartsAt, batchSize, finalSequence, args...);
}
template <class TTranslator, class... TArgs>
typename std::enable_if
<
std::is_base_of
<
IEventTranslatorVararg< T, TArgs... >,
TTranslator
>::value,
bool
>
::type tryPublishEvents(const std::shared_ptr< TTranslator >& translator, const std::initializer_list< TArgs >&... args)
{
return tryPublishEvents(translator, 0, getGreatestLength(args...), args...);
}
template <class TTranslator, class... TArgs>
typename std::enable_if
<
std::is_base_of
<
IEventTranslatorVararg< T, TArgs... >,
TTranslator
>::value,
bool
>
::type tryPublishEvents(const std::shared_ptr< TTranslator >& translator, std::int32_t batchStartsAt, std::int32_t batchSize, const std::initializer_list< TArgs >&... args)
{
checkBounds(getGreatestLength(args...), batchStartsAt, batchSize);
try
{
auto finalSequence = m_sequencer->tryNext(batchSize);
translateAndPublishBatch(translator, batchStartsAt, batchSize, finalSequence, args...);
return true;
}
catch (InsufficientCapacityException&)
{
return false;
}
}
void writeDescriptionTo(std::ostream& stream) const
{
stream << "BufferSize: " << m_bufferSize << ", Sequencer: { ";
m_sequencer->writeDescriptionTo(stream);
stream << " }";
}
private:
void checkBounds(std::int32_t argumentCount, std::int32_t batchStartsAt, std::int32_t batchSize)
{
checkBatchSizing(batchStartsAt, batchSize);
batchOverRuns(argumentCount, batchStartsAt, batchSize);
}
void checkBatchSizing(std::int32_t batchStartsAt, std::int32_t batchSize)
{
if (batchStartsAt < 0 || batchSize < 0)
{
DISRUPTOR_THROW_ARGUMENT_EXCEPTION("Both batchStartsAt and batchSize must be positive but got: batchStartsAt " << batchStartsAt << " and batchSize " << batchSize);
}
if (batchSize > bufferSize())
{
DISRUPTOR_THROW_ARGUMENT_EXCEPTION("The ring buffer cannot accommodate " << batchSize << " it only has space for " << bufferSize() << " entities.");
}
}
static void batchOverRuns(std::int32_t argumentCount, std::int32_t batchStartsAt, std::int32_t batchSize)
{
if (batchStartsAt + batchSize > argumentCount)
{
DISRUPTOR_THROW_ARGUMENT_EXCEPTION("A batchSize of: " << batchSize <<
" with batchStartsAt of: " << batchStartsAt <<
" will overrun the available number of arguments: " << (argumentCount - batchStartsAt));
}
}
template <class TTranslator>
typename std::enable_if
<
std::is_base_of< IEventTranslator< T >, TTranslator >::value
>
::type translateAndPublish(const std::shared_ptr< TTranslator >& translator, std::int64_t sequence)
{
try
{
translator->translateTo((*this)[sequence], sequence);
}
catch (...)
{
}
m_sequencer->publish(sequence);
}
template <class TTranslator, class... TArgs>
typename std::enable_if
<
std::is_base_of< IEventTranslatorVararg< T, TArgs... >, TTranslator >::value
>
::type translateAndPublish(const std::shared_ptr< TTranslator >& translator, std::int64_t sequence, const TArgs&... args)
{
try
{
translator->translateTo((*this)[sequence], sequence, args...);
}
catch (...)
{
}
m_sequencer->publish(sequence);
}
template <class TTranslators, class = decltype(std::declval< TTranslators >().begin())>
typename std::enable_if
<
std::is_base_of
<
IEventTranslator< T >,
typename DeduceTranslatorFromContainer< TTranslators >::Type
>::value
>
::type translateAndPublishBatch(const TTranslators& translators, std::int32_t batchStartsAt, std::int32_t batchSize, std::int64_t finalSequence)
{
std::int64_t initialSequence = finalSequence - (batchSize - 1);
try
{
auto sequence = initialSequence;
auto batchEndsAt = batchStartsAt + batchSize;
auto translatorIt = translators.begin() + batchStartsAt;
for (std::int32_t i = batchStartsAt; i < batchEndsAt; ++i, ++sequence, ++translatorIt)
{
auto& translator = *translatorIt;
translator->translateTo((*this)[sequence], sequence);
}
}
catch (...)
{
}
m_sequencer->publish(initialSequence, finalSequence);
}
template <class TTranslator, class... TArgs>
typename std::enable_if
<
std::is_base_of
<
IEventTranslatorVararg< T, TArgs... >,
TTranslator
>::value
>
::type translateAndPublishBatch(const std::shared_ptr< TTranslator >& translator,
std::int32_t batchStartsAt,
std::int32_t batchSize,
std::int64_t finalSequence,
const std::initializer_list< TArgs >&... args)
{
std::int64_t initialSequence = finalSequence - (batchSize - 1);
try
{
auto sequence = initialSequence;
auto batchEndsAt = batchStartsAt + batchSize;
for (std::int32_t i = batchStartsAt; i < batchEndsAt; i++, sequence++)
{
translator->translateTo((*this)[sequence], sequence, *(args.begin() + i)...);
}
}
catch (...)
{
}
m_sequencer->publish(initialSequence, finalSequence);
}
private:
char padding0[56];
mutable std::vector< T > m_entries;
std::int32_t m_indexMask;
std::int32_t m_bufferSize;
std::shared_ptr< ISequencer< T > > m_sequencer;
char padding1[40];
};
} // namespace Disruptor
namespace std
{
template <class T>
ostream& operator<<(ostream& stream, const Disruptor::RingBuffer< T >& ringBuffer)
{
stream << "RingBuffer: { ";
ringBuffer.writeDescriptionTo(stream);
stream << " }";
return stream;
}
} // namespace std