Skip to content

Commit 710a503

Browse files
author
Lee Chang Ha at saturn
committed
...
1 parent 0f5b8d2 commit 710a503

2 files changed

Lines changed: 71 additions & 1 deletion

File tree

cpp.rx/Rakefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# -*- ruby -*-
22

3-
SRC = "finally"
3+
SRC = "buffer"
44
SRCS = %W{ ./#{SRC}.cpp }
55
TEST_SRCS = %W{ ./test_#{SRC}.cpp }
66
APP = "#{SRC}"

cpp.rx/buffer.cpp

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
#define CATCH_CONFIG_MAIN
2+
3+
#include "catch.hpp"
4+
#include "rxcpp/rx.hpp"
5+
6+
#include <sstream>
7+
8+
using namespace std;
9+
namespace rx = rxcpp;
10+
11+
string get_pid()
12+
{
13+
stringstream ss;
14+
ss << this_thread::get_id();
15+
16+
return ss.str();
17+
}
18+
19+
SCENARIO("buffer count sample")
20+
{
21+
auto on_next = [](vector<int> vs) {
22+
cout << "OnNext";
23+
for (auto v: vs) cout << " " << v;
24+
cout << endl;
25+
};
26+
27+
auto val = rx::observable<>::range(1, 10).buffer(3);
28+
val.subscribe(on_next);
29+
}
30+
31+
SCENARIO("buffer count+skip sample")
32+
{
33+
auto on_next = [](vector<int> vs) {
34+
cout << "OnNext";
35+
for (auto v: vs) cout << " " << v;
36+
cout << endl;
37+
};
38+
39+
auto val = rx::observable<>::range(1, 7).buffer(2, 3);
40+
val.subscribe(on_next);
41+
}
42+
43+
SCENARIO("buffer count+skip+coordination sample")
44+
{
45+
auto period = chrono::milliseconds(4);
46+
auto skip = chrono::milliseconds(6);
47+
auto values = rx::observable<>::interval(chrono::steady_clock::now() + chrono::milliseconds(1), chrono::milliseconds(2))
48+
.map([](long v) {
49+
printf("[thread %s] Interval OnNext: %ld\n", get_pid().c_str(), v);
50+
return v;
51+
})
52+
.take(7)
53+
.buffer_with_time(period, skip, rx::observe_on_new_thread());
54+
55+
auto on_next = [](vector<long> vs) {
56+
printf("[thread %s] OnNext:", get_pid().c_str());
57+
for (auto v: vs) printf(" %ld", v);
58+
printf("\n");
59+
};
60+
61+
auto on_complete = []() {
62+
printf("[thread %s] OnCompleted\n", get_pid().c_str());
63+
};
64+
65+
values.as_blocking()
66+
.subscribe(on_next);
67+
// .subscribe(on_next, on_complete);
68+
}
69+
70+

0 commit comments

Comments
 (0)