Skip to content

Commit b834275

Browse files
committed
Expose new LocalCluster interface and use in example
1 parent 0118731 commit b834275

4 files changed

Lines changed: 81 additions & 136 deletions

File tree

build.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ int main(int argc, char **argv) {
99
char *CXX = strcpy(calloc(1024, 1), or_else(getenv("CXX"), "g++"));
1010
char *EXEC_SUFFIX = strcpy(calloc(1024, 1), maybe(getenv("EXEC_SUFFIX")));
1111

12-
char *EXAMPLE_FILES[] = {"LoadBalancer", "Http3Server", "Broadcast", "HelloWorld", "Crc32", "ServerName",
12+
char *EXAMPLE_FILES[] = {"HelloWorldThreaded", "Http3Server", "Broadcast", "HelloWorld", "Crc32", "ServerName",
1313
"EchoServer", "BroadcastingEchoServer", "UpgradeSync", "UpgradeAsync", "ParameterRoutes"};
1414

1515
strcat(CXXFLAGS, " -march=native -O3 -Wpedantic -Wall -Wextra -Wsign-conversion -Wconversion -std=c++20 -Isrc -IuSockets/src");

examples/HelloWorldThreaded.cpp

Lines changed: 18 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,24 @@
11
#include "App.h"
2-
#include <thread>
3-
#include <algorithm>
4-
#include <mutex>
5-
6-
/* Note that SSL is disabled unless you build with WITH_OPENSSL=1 */
7-
const int SSL = 1;
8-
std::mutex stdoutMutex;
2+
#include "LocalCluster.h"
93

104
int main() {
11-
/* Overly simple hello world app, using multiple threads */
12-
std::vector<std::thread *> threads(std::thread::hardware_concurrency());
13-
14-
std::transform(threads.begin(), threads.end(), threads.begin(), [](std::thread */*t*/) {
15-
return new std::thread([]() {
16-
17-
uWS::SSLApp({
18-
.key_file_name = "misc/key.pem",
19-
.cert_file_name = "misc/cert.pem",
20-
.passphrase = "1234"
21-
}).get("/*", [](auto *res, auto * /*req*/) {
22-
res->end("Hello world!");
23-
}).listen(3000, [](auto *listen_socket) {
24-
stdoutMutex.lock();
25-
if (listen_socket) {
26-
/* Note that us_listen_socket_t is castable to us_socket_t */
27-
std::cout << "Thread " << std::this_thread::get_id() << " listening on port " << us_socket_local_port(SSL, (struct us_socket_t *) listen_socket) << std::endl;
28-
} else {
29-
std::cout << "Thread " << std::this_thread::get_id() << " failed to listen on port 3000" << std::endl;
30-
}
31-
stdoutMutex.unlock();
32-
}).run();
33-
5+
/* Note that SSL is disabled unless you build with WITH_OPENSSL=1 */
6+
uWS::LocalCluster({
7+
.key_file_name = "misc/key.pem",
8+
.cert_file_name = "misc/cert.pem",
9+
.passphrase = "1234"
10+
},
11+
[](uWS::SSLApp &app) {
12+
/* Here this App instance is defined */
13+
app.get("/*", [](auto *res, auto * /*req*/) {
14+
res->end("Hello world!");
15+
}).listen(3000, [](auto *listen_socket) {
16+
if (listen_socket) {
17+
/* Note that us_listen_socket_t is castable to us_socket_t */
18+
std::cout << "Thread " << std::this_thread::get_id() << " listening on port " << us_socket_local_port(true, (struct us_socket_t *) listen_socket) << std::endl;
19+
} else {
20+
std::cout << "Thread " << std::this_thread::get_id() << " failed to listen on port 3000" << std::endl;
21+
}
3422
});
3523
});
36-
37-
std::for_each(threads.begin(), threads.end(), [](std::thread *t) {
38-
t->join();
39-
});
4024
}

examples/LoadBalancer.cpp

Lines changed: 0 additions & 101 deletions
This file was deleted.

src/LocalCluster.h

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/* This header is highly experimental and needs refactorings but will do for now */
2+
3+
#include <thread>
4+
#include <algorithm>
5+
#include <mutex>
6+
7+
unsigned int roundRobin = 0;
8+
unsigned int hardwareConcurrency = std::thread::hardware_concurrency();
9+
std::vector<std::thread *> threads(hardwareConcurrency);
10+
std::vector<uWS::SSLApp *> apps;
11+
std::mutex m;
12+
13+
namespace uWS {
14+
struct LocalCluster {
15+
16+
//std::vector<std::thread *> threads = std::thread::hardware_concurrency();
17+
//std::vector<uWS::SSLApp *> apps;
18+
//std::mutex m;
19+
20+
21+
static void loadBalancer() {
22+
static std::atomic<unsigned int> roundRobin = 0; // atomic fetch_add
23+
}
24+
25+
LocalCluster(SocketContextOptions options = {}, std::function<void(uWS::SSLApp &)> cb = nullptr) {
26+
std::transform(threads.begin(), threads.end(), threads.begin(), [options, &cb](std::thread *) {
27+
28+
return new std::thread([options, &cb]() {
29+
30+
// lock this
31+
m.lock();
32+
apps.emplace_back(new uWS::SSLApp(options));
33+
uWS::SSLApp *app = apps.back();
34+
35+
cb(*app);
36+
37+
app->preOpen([](LIBUS_SOCKET_DESCRIPTOR fd) {
38+
39+
/* Distribute this socket in round robin fashion */
40+
//std::cout << "About to load balance " << fd << " to " << roundRobin << std::endl;
41+
42+
auto receivingApp = apps[roundRobin];
43+
apps[roundRobin]->getLoop()->defer([fd, receivingApp]() {
44+
receivingApp->adoptSocket(fd);
45+
});
46+
47+
roundRobin = (roundRobin + 1) % hardwareConcurrency;
48+
return -1;
49+
});
50+
m.unlock();
51+
app->run();
52+
std::cout << "Fallthrough!" << std::endl;
53+
delete app;
54+
});
55+
});
56+
57+
std::for_each(threads.begin(), threads.end(), [](std::thread *t) {
58+
t->join();
59+
});
60+
}
61+
};
62+
}

0 commit comments

Comments
 (0)