-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathbuffer.cpp
More file actions
70 lines (55 loc) · 1.58 KB
/
buffer.cpp
File metadata and controls
70 lines (55 loc) · 1.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#define CATCH_CONFIG_MAIN
#include "catch.hpp"
#include "rxcpp/rx.hpp"
#include <sstream>
using namespace std;
namespace rx = rxcpp;
/// Returns the identifier of the calling thread rendered as text.
///
/// Note that, despite the name, the value is a thread id (obtained from
/// `std::this_thread::get_id()`), not a process id. The textual form is
/// whatever the stream insertion operator of the thread id produces.
std::string get_pid()
{
    std::ostringstream rendered;
    rendered << std::this_thread::get_id();
    return rendered.str();
}
// Sample: count-based buffering.
// The values of range(1, 10) are passed through buffer(3) and every vector
// delivered to the subscriber is printed on its own line.
SCENARIO("buffer count sample")
{
    // Prints "OnNext" followed by each element of the received buffer,
    // every element preceded by a single space, then ends the line.
    auto print_chunk = [](vector<int> chunk) {
        cout << "OnNext";
        for (auto item : chunk) {
            cout << " " << item;
        }
        cout << endl;
    };

    rx::observable<>::range(1, 10)
        .buffer(3)
        .subscribe(print_chunk);
}
// Sample: buffering with both a count and a skip argument.
// The values of range(1, 7) are passed through buffer(2, 3) and every vector
// delivered to the subscriber is printed on its own line.
SCENARIO("buffer count+skip sample")
{
    // Prints "OnNext" followed by each element of the received buffer,
    // every element preceded by a single space, then ends the line.
    auto print_chunk = [](vector<int> chunk) {
        cout << "OnNext";
        for (auto item : chunk) {
            cout << " " << item;
        }
        cout << endl;
    };

    rx::observable<>::range(1, 7)
        .buffer(2, 3)
        .subscribe(print_chunk);
}
// Sample: time-based buffering with an explicit coordination.
// An interval source (called with `now + 1 ms` and `2 ms`) is logged item by
// item, limited to 7 items, and grouped by buffer_with_time(period, skip, ...)
// using rx::observe_on_new_thread(). Every log line is tagged with the id of
// the thread that printed it, so the output shows which thread handled the
// source items and which thread handled the buffers.
SCENARIO("buffer count+skip+coordination sample")
{
    auto period = chrono::milliseconds(4);
    auto skip = chrono::milliseconds(6);

    auto values = rx::observable<>::interval(chrono::steady_clock::now() + chrono::milliseconds(1), chrono::milliseconds(2))
        .map([](long v) {
            // Pass-through step: only logs the item and the current thread id.
            printf("[thread %s] Interval OnNext: %ld\n", get_pid().c_str(), v);
            return v;
        })
        .take(7)
        .buffer_with_time(period, skip, rx::observe_on_new_thread());

    // Prints one line per received buffer: thread id, then the buffered values.
    auto on_next = [](vector<long> vs) {
        printf("[thread %s] OnNext:", get_pid().c_str());
        for (auto v: vs) printf(" %ld", v);
        printf("\n");
    };

    // NOTE(review): as_blocking() presumably keeps the scenario from returning
    // before the stream has finished - confirm against the rxcpp documentation.
    //
    // Only the on-next handler is subscribed. To log completion as well, pass a
    // second callable:
    //     .subscribe(on_next, []() { printf("[thread %s] OnCompleted\n", get_pid().c_str()); });
    values.as_blocking()
        .subscribe(on_next);
}