forked from Abc-Arbitrage/Disruptor-cpp
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathBatchingTests.cpp
More file actions
123 lines (94 loc) · 3.34 KB
/
BatchingTests.cpp
File metadata and controls
123 lines (94 loc) · 3.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#include "stdafx.h"
#include <boost/mpl/vector.hpp>
#include "Disruptor/Disruptor.h"
#include "Disruptor/RoundRobinThreadAffinedTaskScheduler.h"
#include "Disruptor/SleepingWaitStrategy.h"
#include "LongEvent.h"
namespace Disruptor
{
namespace Tests
{
class ParallelEventHandler : public IEventHandler< LongEvent >
{
public:
ParallelEventHandler(std::int64_t mask, std::int64_t ordinal)
: processed(0)
{
m_mask = mask;
m_ordinal = ordinal;
}
void onEvent(LongEvent& event, std::int64_t sequence, bool endOfBatch) override
{
if ((sequence & m_mask) == m_ordinal)
{
eventCount++;
tempValue = event.value;
}
if (endOfBatch || ++batchCount >= m_batchSize)
{
publishedValue = tempValue;
batchCount = 0;
}
else
{
std::this_thread::yield();
}
processed = sequence;
}
std::int64_t eventCount = 0;
std::int64_t batchCount = 0;
std::int64_t publishedValue = 0;
std::int64_t tempValue = 0;
std::atomic< std::int64_t > processed;
private:
std::int64_t m_mask = 0;
std::int64_t m_ordinal = 0;
static const std::int32_t m_batchSize = 10;
};
class EventTranslator : public IEventTranslator< LongEvent >
{
public:
void translateTo(LongEvent& eventData, std::int64_t sequence) override
{
eventData.value = sequence;
}
};
} // namespace Tests
} // namespace Disruptor
using namespace Disruptor;
using namespace ::Disruptor::Tests;
BOOST_AUTO_TEST_SUITE(BatchingTests)
typedef boost::mpl::vector
<
std::integral_constant< ProducerType, ProducerType::Single >,
std::integral_constant< ProducerType, ProducerType::Multi >
>
ProducerTypes;
BOOST_AUTO_TEST_CASE_TEMPLATE(ShouldBatch, TProducerType, ProducerTypes)
{
auto scheduler = std::make_shared< RoundRobinThreadAffinedTaskScheduler >();
scheduler->start(std::thread::hardware_concurrency());
auto d = std::make_shared< disruptor< LongEvent > >([] { return LongEvent(); }, 2048, scheduler, TProducerType::value, std::make_shared< SleepingWaitStrategy >());
auto handler1 = std::make_shared< ParallelEventHandler >(1, 0);
auto handler2 = std::make_shared< ParallelEventHandler >(1, 1);
d->handleEventsWith({ handler1, handler2 });
auto buffer = d->start();
auto translator = std::make_shared< EventTranslator >();
const std::int32_t eventCount = 10000;
for (auto i = 0; i < eventCount; ++i)
{
buffer->publishEvent(translator);
}
while (handler1->processed != eventCount - 1 ||
handler2->processed != eventCount - 1)
{
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
BOOST_CHECK_EQUAL(handler1->publishedValue, static_cast< std::int64_t >(eventCount) - 2);
BOOST_CHECK_EQUAL(handler1->eventCount, static_cast< std::int64_t >(eventCount) / 2);
BOOST_CHECK_EQUAL(handler2->publishedValue, static_cast< std::int64_t >(eventCount) - 1);
BOOST_CHECK_EQUAL(handler2->eventCount, static_cast< std::int64_t >(eventCount) / 2);
d->shutdown();
scheduler->stop();
}
BOOST_AUTO_TEST_SUITE_END()