Skip to content

Commit cc4873e

Browse files
committed
switch to blocking poll + fix config, signals, time, non-game login
1 parent ddcba8e commit cc4873e

7 files changed

Lines changed: 157 additions & 69 deletions

File tree

config.cfg

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,23 @@
1-
# Database Config
2-
DatabaseFile = "tibia.db"
3-
MaxCachedStatements = 100
4-
51
# HostCache Config
62
MaxCachedHostNames = 100
73
HostNameExpireTime = 30m
84

5+
# Database Config
6+
MaxCachedStatements = 100
7+
DatabaseFile = "tibia.db"
8+
# TODO(fusion): This is not great. Probably switch to database specific options?
9+
#DatabaseHost = "localhost"
10+
#DatabasePort = 5432
11+
#DatabaseUser = "postgres"
12+
#DatabasePassword = ""
13+
#DatabaseName = "tibia"
14+
#DatabaseTLS = true
15+
916
# Connection Config
10-
UpdateRate = 20
1117
QueryManagerPort = 7173
1218
QueryManagerPassword = "a6glaf0c"
19+
QueryWorkerThreads = 1
20+
QueryBufferSize = 1M
21+
QueryMaxAttempts = 3
1322
MaxConnections = 25
1423
MaxConnectionIdleTime = 5m
15-
MaxConnectionPacketSize = 1M

src/connections.cc

Lines changed: 115 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
# include <fcntl.h>
99
# include <netinet/in.h>
1010
# include <poll.h>
11+
# include <sys/eventfd.h>
1112
# include <sys/socket.h>
1213
# include <unistd.h>
1314
# include <time.h>
@@ -16,6 +17,7 @@
1617
#endif
1718

1819
static int g_Listener = -1;
20+
static int g_UpdateEvent = -1;
1921
static TConnection *g_Connections;
2022

2123
// Connection Handling
@@ -139,7 +141,7 @@ TConnection *AssignConnection(int Socket, uint32 Addr, uint16 Port){
139141
Connection = &g_Connections[ConnectionIndex];
140142
Connection->State = CONNECTION_READING;
141143
Connection->Socket = Socket;
142-
Connection->LastActive = g_MonotonicTimeMS;
144+
Connection->LastActive = GetMonotonicUptimeMS();
143145
snprintf(Connection->RemoteAddress,
144146
sizeof(Connection->RemoteAddress),
145147
"%d.%d.%d.%d:%d",
@@ -213,7 +215,7 @@ void CheckConnectionInput(TConnection *Connection, int Events){
213215
if(Connection->RWPosition >= ReadSize){
214216
if(Connection->RWSize != 0){
215217
Connection->State = CONNECTION_REQUEST;
216-
Connection->LastActive = g_MonotonicTimeMS;
218+
Connection->LastActive = GetMonotonicUptimeMS();
217219
Connection->Query->Request = TReadBuffer(Buffer, Connection->RWSize);
218220
break;
219221
}else if(Connection->RWPosition == 2){
@@ -252,8 +254,9 @@ void ProcessQuery(TConnection *Connection){
252254

253255
void SendQueryResponse(TConnection *Connection){
254256
ASSERT(Connection->Query != NULL);
255-
if(Connection->State != CONNECTION_RESPONSE){
256-
LOG_ERR("Connection %s is not in a RESPONSE state (State: %d)",
257+
if(Connection->State != CONNECTION_REQUEST
258+
&& Connection->State != CONNECTION_RESPONSE){
259+
LOG_ERR("Connection %s is not in a REQUEST/RESPONSE state (State: %d)",
257260
Connection->RemoteAddress, Connection->State);
258261
CloseConnection(Connection);
259262
return;
@@ -453,8 +456,16 @@ void CheckConnectionQueryResponse(TConnection *Connection){
453456
}
454457
}
455458

456-
457459
void CheckConnectionOutput(TConnection *Connection, int Events){
460+
// TODO(fusion): We're only polling `POLLOUT` when the connection is WRITING
461+
// meaning that a writes will be delayed at least one cycle after a response
462+
// is available. This could be solved by adding a `CanWrite` boolean to the
463+
// connection struct that is set to false when `write` returns `EAGAIN`, and
464+
// is used to determine whether we should poll `POLLOUT`. That said, it may
465+
// not even make that big of a difference.
466+
// if(!Connection->CanWrite) { Events |= POLLOUT; }
467+
// if((Events & POLLOUT) != 0) { Connection->CanWrite = true; }
468+
// if(errno == EAGAIN) { Connection->CanWrite = false; }
458469
if((Events & POLLOUT) == 0 || Connection->Socket == -1){
459470
return;
460471
}
@@ -500,7 +511,7 @@ void CheckConnection(TConnection *Connection, int Events){
500511
}
501512

502513
if(g_Config.MaxConnectionIdleTime > 0){
503-
int IdleTime = (g_MonotonicTimeMS - Connection->LastActive);
514+
int IdleTime = (GetMonotonicUptimeMS() - Connection->LastActive);
504515
if(IdleTime >= g_Config.MaxConnectionIdleTime){
505516
LOG_WARN("Dropping connection %s due to inactivity",
506517
Connection->RemoteAddress);
@@ -513,8 +524,37 @@ void CheckConnection(TConnection *Connection, int Events){
513524
}
514525
}
515526

516-
void ProcessConnections(void){
517-
// NOTE(fusion): Accept new connections.
527+
void WakeConnections(void){
528+
if(g_UpdateEvent != -1){
529+
uint64 One = 1;
530+
int Written = (int)write(g_UpdateEvent, &One, sizeof(One));
531+
if(Written != sizeof(One)){
532+
LOG_ERR("Failed to signal update event: (%d) %s",
533+
errno, strerrordesc_np(errno));
534+
}
535+
}
536+
}
537+
538+
static void ConsumeUpdateEvent(int Events){
539+
ASSERT(g_UpdateEvent != -1);
540+
if((Events & POLLIN) == 0){
541+
return;
542+
}
543+
544+
uint64 Dummy;
545+
int Read = (int)read(g_UpdateEvent, &Dummy, sizeof(Dummy));
546+
if(Read != sizeof(Dummy)){
547+
LOG_ERR("Failed to consume update event: (%d) %s",
548+
errno, strerrordesc_np(errno));
549+
}
550+
}
551+
552+
static void AcceptConnections(int Events){
553+
ASSERT(g_Listener != -1);
554+
if((Events & POLLIN) == 0){
555+
return;
556+
}
557+
518558
while(true){
519559
uint32 Addr;
520560
uint16 Port;
@@ -530,50 +570,88 @@ void ProcessConnections(void){
530570
close(Socket);
531571
}
532572
}
573+
}
574+
575+
void ProcessConnections(void){
576+
int NumFds = 0;
577+
int MaxFds = g_Config.MaxConnections + 2;
578+
pollfd *Fds = (pollfd*)alloca(MaxFds * sizeof(pollfd));
579+
int *ConnectionIndices = (int*)alloca(MaxFds * sizeof(int));
580+
581+
if(g_UpdateEvent != -1){
582+
Fds[NumFds].fd = g_UpdateEvent;
583+
Fds[NumFds].events = POLLIN;
584+
Fds[NumFds].revents = 0;
585+
ConnectionIndices[NumFds] = -1;
586+
NumFds += 1;
587+
}
588+
589+
if(g_Listener != -1){
590+
Fds[NumFds].fd = g_Listener;
591+
Fds[NumFds].events = POLLIN;
592+
Fds[NumFds].revents = 0;
593+
ConnectionIndices[NumFds] = -1;
594+
NumFds += 1;
595+
}
533596

534-
// NOTE(fusion): Gather active connections.
535-
int NumConnections = 0;
536-
int *ConnectionIndices = (int*)alloca(g_Config.MaxConnections * sizeof(int));
537-
pollfd *ConnectionFds = (pollfd*)alloca(g_Config.MaxConnections * sizeof(pollfd));
538597
for(int i = 0; i < g_Config.MaxConnections; i += 1){
539598
if(g_Connections[i].State == CONNECTION_FREE || g_Connections[i].Socket == -1){
540599
continue;
541600
}
542601

543-
ConnectionIndices[NumConnections] = i;
544-
ConnectionFds[NumConnections].fd = g_Connections[i].Socket;
545-
ConnectionFds[NumConnections].events = POLLIN | POLLOUT;
546-
ConnectionFds[NumConnections].revents = 0;
547-
NumConnections += 1;
548-
}
549-
550-
if(NumConnections <= 0){
551-
return;
602+
Fds[NumFds].fd = g_Connections[i].Socket;
603+
Fds[NumFds].events = POLLIN;
604+
if(g_Connections[i].State == CONNECTION_WRITING){
605+
Fds[NumFds].events |= POLLOUT;
606+
}
607+
Fds[NumFds].revents = 0;
608+
ConnectionIndices[NumFds] = i;
609+
NumFds += 1;
552610
}
553611

554-
// NOTE(fusion): Poll connections.
555-
int NumEvents = poll(ConnectionFds, NumConnections, 0);
612+
ASSERT(NumFds > 0);
613+
int NumEvents = poll(Fds, NumFds, -1);
556614
if(NumEvents == -1){
557-
LOG_ERR("Failed to poll connections: (%d) %s", errno, strerrordesc_np(errno));
615+
if(errno != ETIMEDOUT){
616+
LOG_ERR("Failed to poll connections: (%d) %s",
617+
errno, strerrordesc_np(errno));
618+
}
558619
return;
559620
}
560621

561622
// NOTE(fusion): Process connections.
562-
for(int i = 0; i < NumConnections; i += 1){
563-
TConnection *Connection = &g_Connections[ConnectionIndices[i]];
564-
int Events = (int)ConnectionFds[i].revents;
565-
CheckConnectionInput(Connection, Events);
566-
CheckConnectionQueryRequest(Connection);
567-
CheckConnectionQueryResponse(Connection);
568-
CheckConnectionOutput(Connection, Events);
569-
CheckConnection(Connection, Events);
623+
for(int i = 0; i < NumFds; i += 1){
624+
int Index = ConnectionIndices[i];
625+
int Events = (int)Fds[i].revents;
626+
if(Index >= 0 && Index < g_Config.MaxConnections){
627+
TConnection *Connection = &g_Connections[Index];
628+
CheckConnectionInput(Connection, Events);
629+
CheckConnectionQueryRequest(Connection);
630+
CheckConnectionQueryResponse(Connection);
631+
CheckConnectionOutput(Connection, Events);
632+
CheckConnection(Connection, Events);
633+
}else if(Index == -1 && Fds[i].fd == g_UpdateEvent){
634+
ConsumeUpdateEvent(Events);
635+
}else if(Index == -1 && Fds[i].fd == g_Listener){
636+
AcceptConnections(Events);
637+
}else{
638+
LOG_ERR("Unknown connection index %d", Index);
639+
}
570640
}
571641
}
572642

573643
bool InitConnections(void){
644+
ASSERT(g_UpdateEvent == -1);
574645
ASSERT(g_Listener == -1);
575646
ASSERT(g_Connections == NULL);
576647

648+
g_UpdateEvent = eventfd(0, EFD_NONBLOCK);
649+
if(g_UpdateEvent == -1){
650+
LOG_ERR("Failed to create eventfd: (%d) (%s)",
651+
errno, strerrordesc_np(errno));
652+
return false;
653+
}
654+
577655
g_Listener = ListenerBind((uint16)g_Config.QueryManagerPort);
578656
if(g_Listener == -1){
579657
LOG_ERR("Failed to bind listener");
@@ -590,6 +668,11 @@ bool InitConnections(void){
590668
}
591669

592670
void ExitConnections(void){
671+
if(g_UpdateEvent != -1){
672+
close(g_UpdateEvent);
673+
g_UpdateEvent = -1;
674+
}
675+
593676
if(g_Listener != -1){
594677
close(g_Listener);
595678
g_Listener = -1;

src/database_sqlite.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ static sqlite3_stmt *PrepareQuery(TDatabase *Database, const char *Text){
9090
ASSERT(EntryText != NULL);
9191
if(strcmp(EntryText, Text) == 0){
9292
Stmt = Entry->Stmt;
93-
Entry->LastUsed = g_MonotonicTimeMS;
93+
Entry->LastUsed = GetMonotonicUptimeMS();
9494
break;
9595
}
9696
}
@@ -109,7 +109,7 @@ static sqlite3_stmt *PrepareQuery(TDatabase *Database, const char *Text){
109109
}
110110

111111
Entry->Stmt = Stmt;
112-
Entry->LastUsed = g_MonotonicTimeMS;
112+
Entry->LastUsed = GetMonotonicUptimeMS();
113113
Entry->Hash = Hash;
114114
}else{
115115
if(sqlite3_stmt_busy(Stmt) != 0){

src/hostcache.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,11 @@ bool ResolveHostName(const char *HostName, int *OutAddr){
6262
THostCacheEntry *Entry = NULL;
6363
int LeastRecentlyUsedIndex = 0;
6464
int LeastRecentlyUsedTime = g_CachedHostNames[0].ResolveTime;
65+
int TimeNow = GetMonotonicUptimeMS();
6566
for(int i = 0; i < g_Config.MaxCachedHostNames; i += 1){
6667
THostCacheEntry *Current = &g_CachedHostNames[i];
6768

68-
if((g_MonotonicTimeMS - Current->ResolveTime) >= g_Config.HostNameExpireTime){
69+
if((TimeNow - Current->ResolveTime) >= g_Config.HostNameExpireTime){
6970
memset(Current, 0, sizeof(THostCacheEntry));
7071
}
7172

@@ -89,7 +90,7 @@ bool ResolveHostName(const char *HostName, int *OutAddr){
8990
(int)strlen(HostName), (int)sizeof(Entry->HostName));
9091
}
9192
Entry->Resolved = DoResolveHostName(HostName, &Entry->IPAddress);
92-
Entry->ResolveTime = g_MonotonicTimeMS;
93+
Entry->ResolveTime = TimeNow;
9394
}
9495

9596
if(Entry && Entry->Resolved){

src/query.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ static void *WorkerThread(void *Data){
257257
}
258258

259259
QueryDone(Query);
260+
WakeConnections();
260261
}
261262

262263
LOG("Worker#%d: DONE...", Worker->WorkerID);

0 commit comments

Comments
 (0)