-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathchannel.h
More file actions
143 lines (117 loc) · 3.96 KB
/
channel.h
File metadata and controls
143 lines (117 loc) · 3.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
/*
Realization of channel concept from GoLang.
Allows to create a queue of data and read it asynchroniously
ext::Channel<int> channel;
std::thread([&]()
{
for (auto val : channel) {
...
}
});
channel.add(1);
channel.add(10);
channel.close();
*/
#pragma once
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <optional>
#include <mutex>
#include <queue>
#include <ext/core/defines.h>
#include <ext/core/noncopyable.h>
namespace ext {
template <typename T>
class Channel : ::ext::NonCopyable {
private:
mutable std::mutex m_queue_mutex;
mutable std::condition_variable m_queue_not_full;
mutable std::condition_variable m_queue_not_empty;
std::queue<T> m_queue;
const size_t m_max_size;
std::atomic<bool> m_closed = false;
class ChannelIterator {
public:
using value_type = T;
private:
Channel* m_channel;
std::optional<value_type> m_value;
constexpr ChannelIterator(Channel* channel, std::optional<value_type>&& value = std::nullopt)
: m_channel(channel)
, m_value(std::move(value))
{}
friend class Channel;
public:
ChannelIterator(const ChannelIterator& other) = delete;
constexpr explicit ChannelIterator(ChannelIterator&& other) noexcept
: m_channel(other.m_channel)
, m_value(std::move(other.m_value))
{
other.m_value = std::nullopt;
}
constexpr bool operator==(const ChannelIterator& other) const {
return m_channel == other.m_channel && !m_value.has_value() && !other.m_value.has_value();
}
constexpr bool operator!=(const ChannelIterator& other) const {
return !operator==(other);
}
constexpr const value_type& operator*() const { return m_value.value(); }
constexpr value_type& operator*() { return m_value.value(); }
constexpr value_type* operator->() { return &m_value.value(); }
constexpr const value_type* operator->() const { return &m_value.value(); }
ChannelIterator& operator++() EXT_THROWS(std::bad_function_call) {
if (!m_value.has_value()) {
throw std::bad_function_call();
}
m_value = m_channel->get();
return *this;
}
};
public:
using iterator = ChannelIterator;
Channel(size_t size = 1)
: m_max_size(size)
{}
template <typename ...Args>
void add(Args&& ...args) EXT_THROWS(std::bad_function_call) {
std::unique_lock<std::mutex> lock(m_queue_mutex);
m_queue_not_full.wait(lock, [&]() {
return (m_queue.size() < m_max_size) || m_closed;
});
std::lock_guard<std::mutex> guard(*lock.release(), std::adopt_lock);
if (m_closed) {
throw std::bad_function_call();
}
m_queue.emplace(std::forward<Args>(args)...);
m_queue_not_empty.notify_one();
}
[[nodiscard]] std::optional<T> get() noexcept
{
std::unique_lock<std::mutex> lock(m_queue_mutex);
m_queue_not_empty.wait(lock, [this] { return !m_queue.empty() || m_closed; });
std::lock_guard<std::mutex> guard(*lock.release(), std::adopt_lock);
if (m_closed && m_queue.empty()) {
return std::nullopt;
}
const auto result = std::move(m_queue.front());
m_queue.pop();
m_queue_not_full.notify_one();
return result;
}
void close() {
std::lock_guard<std::mutex> lock(m_queue_mutex);
m_closed = true;
m_queue_not_full.notify_all();
m_queue_not_empty.notify_all();
}
void reset() {
std::lock_guard<std::mutex> lock(m_queue_mutex);
m_closed = false;
m_queue = {};
}
[[nodiscard]] iterator begin() { return ChannelIterator(this, get()); }
[[nodiscard]] iterator end() { return ChannelIterator(this, std::nullopt); }
};
} // namespace dadrian