-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathlistener.cpp
More file actions
111 lines (89 loc) · 2.7 KB
/
listener.cpp
File metadata and controls
111 lines (89 loc) · 2.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#include "expresscpp/impl/listener.hpp"
#include "expresscpp/console.hpp"
namespace expresscpp {
namespace net = boost::asio;
/// Builds a listener for the given address/port and prepares the acceptor via Init().
///
/// @param address         Textual IP address, parsed with boost::asio::ip::make_address.
///                        The overload without an error_code is used, so (per Boost.Asio)
///                        a malformed address is reported by exception, not via
///                        error_callback.
/// @param port            TCP port the acceptor is bound to.
/// @param express_cpp     Application object handed to each Session; asserted non-null.
/// @param error_callback  Forwarded to Init(), which invokes it with the error code of
///                        the first acceptor setup step that fails.
Listener::Listener(const std::string& address, const uint16_t port, ExpressCpp* express_cpp,
                   ready_fn_cb_error_code_t error_callback)
    // NOTE(review): io_threads(threads_) relies on threads_ being initialised before
    // io_threads (declaration order in the header). If io_threads is a
    // std::vector<std::thread>, this also pre-fills it with threads_ default-constructed
    // threads that launch_threads() later appends to -- confirm that is intended.
    : express_cpp_(express_cpp), io_threads(threads_), acceptor_(ioc_), strand_(ioc_.get_executor()), socket_(ioc_) {
  assert(express_cpp_ != nullptr);

  // Parse the address and assemble the endpoint in one step, then set up the acceptor.
  Init(net::ip::tcp::endpoint{net::ip::make_address(address), port}, error_callback);
}
/// Logs the teardown, then shuts the listener down through Stop().
Listener::~Listener() {
  Console::Debug("destroying listener");
  this->Stop();
}
/// Prepares the acceptor: open, enable address reuse, bind, listen.
///
/// The steps run in that order and the sequence stops at the first one that reports an
/// error. On failure, error_callback is invoked with the error code and a trace line of
/// the form "<step>:<message>" is logged. Nothing is reported on success.
///
/// @param endpoint        Address/port to bind; its protocol is used to open the acceptor.
/// @param error_callback  Called with the error code of the failing step.
void Listener::Init(boost::asio::ip::tcp::endpoint endpoint, ready_fn_cb_error_code_t error_callback) {
  boost::beast::error_code ec;

  // Shared failure path for every step: notify the caller first, then trace.
  // Returns true when the step that just ran left an error in `ec`.
  const auto failed = [&ec, &error_callback](const char* step) {
    if (!ec) {
      return false;
    }
    error_callback(ec);
    Console::Trace(fmt::format("{}:{}", step, ec.message()));
    return true;
  };

  // Open the acceptor for the endpoint's protocol.
  acceptor_.open(endpoint.protocol(), ec);
  if (failed("open")) {
    return;
  }

  // Allow address reuse.
  acceptor_.set_option(net::socket_base::reuse_address(true), ec);
  if (failed("set_option")) {
    return;
  }

  // Bind to the server address.
  acceptor_.bind(endpoint, ec);
  if (failed("bind")) {
    return;
  }

  // Start listening for connections.
  acceptor_.listen(net::socket_base::max_listen_connections, ec);
  if (failed("listen")) {
    return;
  }
}
void Listener::run() {
do_accept();
}
/// Spawns threads_ worker threads that each run the io_context, then marks the
/// listener as listening so that Stop() will actually tear it down.
void Listener::launch_threads() {
  io_threads.reserve(threads_);

  // One ioc_.run() call per requested thread.
  auto remaining = threads_;
  while (remaining > 0) {
    io_threads.emplace_back([this] { ioc_.run(); });
    --remaining;
  }

  listening_ = true;
}
void Listener::Stop() {
if (!listening_) {
return;
}
ioc_.stop();
acceptor_.close();
for (auto& t : io_threads) {
if (t.joinable()) {
t.join();
}
}
Console::Debug("stopping listener");
listening_ = false;
}
void Listener::do_accept() {
std::scoped_lock<std::mutex> lock(mutex_);
// The new connection gets its own strand
acceptor_.async_accept(
socket_, boost::asio::bind_executor(strand_, std::bind(&Listener::on_accept, this, std::placeholders::_1)));
}
/// Completion handler for the accept queued by do_accept().
///
/// On success the accepted socket is moved into a new Session, which is started
/// immediately. On failure the error is traced. In both cases another accept is
/// queued, except when the accept was cancelled or the acceptor is no longer open:
/// re-arming then would fail again immediately and spin in a tight error loop.
///
/// @param ec  Result of the asynchronous accept.
void Listener::on_accept(boost::beast::error_code ec) {
  if (ec) {
    Console::Trace(fmt::format("{}:{}", "accept", ec.message()));

    // The acceptor has been cancelled/closed: stop the accept loop instead of
    // re-queuing an operation that can only fail again.
    if (ec == net::error::operation_aborted || !acceptor_.is_open()) {
      return;
    }
  } else {
    // Create the session and run it; the session takes ownership of the socket.
    std::make_shared<Session>(std::move(socket_), express_cpp_)->run();
  }

  // Accept another connection.
  do_accept();
}
} // namespace expresscpp