88# include < fcntl.h>
99# include < netinet/in.h>
1010# include < poll.h>
11+ # include < sys/eventfd.h>
1112# include < sys/socket.h>
1213# include < unistd.h>
1314# include < time.h>
1617#endif
1718
1819static int g_Listener = -1 ;
20+ static int g_UpdateEvent = -1 ;
1921static TConnection *g_Connections;
2022
2123// Connection Handling
@@ -139,7 +141,7 @@ TConnection *AssignConnection(int Socket, uint32 Addr, uint16 Port){
139141 Connection = &g_Connections[ConnectionIndex];
140142 Connection->State = CONNECTION_READING;
141143 Connection->Socket = Socket;
142- Connection->LastActive = g_MonotonicTimeMS ;
144+ Connection->LastActive = GetMonotonicUptimeMS () ;
143145 snprintf (Connection->RemoteAddress ,
144146 sizeof (Connection->RemoteAddress ),
145147 " %d.%d.%d.%d:%d" ,
@@ -213,7 +215,7 @@ void CheckConnectionInput(TConnection *Connection, int Events){
213215 if (Connection->RWPosition >= ReadSize){
214216 if (Connection->RWSize != 0 ){
215217 Connection->State = CONNECTION_REQUEST;
216- Connection->LastActive = g_MonotonicTimeMS ;
218+ Connection->LastActive = GetMonotonicUptimeMS () ;
217219 Connection->Query ->Request = TReadBuffer (Buffer, Connection->RWSize );
218220 break ;
219221 }else if (Connection->RWPosition == 2 ){
@@ -252,8 +254,9 @@ void ProcessQuery(TConnection *Connection){
252254
253255void SendQueryResponse (TConnection *Connection){
254256 ASSERT (Connection->Query != NULL );
255- if (Connection->State != CONNECTION_RESPONSE){
256- LOG_ERR (" Connection %s is not in a RESPONSE state (State: %d)" ,
257+ if (Connection->State != CONNECTION_REQUEST
258+ && Connection->State != CONNECTION_RESPONSE){
259+ LOG_ERR (" Connection %s is not in a REQUEST/RESPONSE state (State: %d)" ,
257260 Connection->RemoteAddress , Connection->State );
258261 CloseConnection (Connection);
259262 return ;
@@ -453,8 +456,16 @@ void CheckConnectionQueryResponse(TConnection *Connection){
453456 }
454457}
455458
456-
457459void CheckConnectionOutput (TConnection *Connection, int Events){
460+ // TODO(fusion): We're only polling `POLLOUT` when the connection is WRITING
461+ // meaning that a writes will be delayed at least one cycle after a response
462+ // is available. This could be solved by adding a `CanWrite` boolean to the
463+ // connection struct that is set to false when `write` returns `EAGAIN`, and
464+ // is used to determine whether we should poll `POLLOUT`. That said, it may
465+ // not even make that big of a difference.
466+ // if(!Connection->CanWrite) { Events |= POLLOUT; }
467+ // if((Events & POLLOUT) != 0) { Connection->CanWrite = true; }
468+ // if(errno == EAGAIN) { Connection->CanWrite = false; }
458469 if ((Events & POLLOUT) == 0 || Connection->Socket == -1 ){
459470 return ;
460471 }
@@ -500,7 +511,7 @@ void CheckConnection(TConnection *Connection, int Events){
500511 }
501512
502513 if (g_Config.MaxConnectionIdleTime > 0 ){
503- int IdleTime = (g_MonotonicTimeMS - Connection->LastActive );
514+ int IdleTime = (GetMonotonicUptimeMS () - Connection->LastActive );
504515 if (IdleTime >= g_Config.MaxConnectionIdleTime ){
505516 LOG_WARN (" Dropping connection %s due to inactivity" ,
506517 Connection->RemoteAddress );
@@ -513,8 +524,37 @@ void CheckConnection(TConnection *Connection, int Events){
513524 }
514525}
515526
516- void ProcessConnections (void ){
517- // NOTE(fusion): Accept new connections.
527+ void WakeConnections (void ){
528+ if (g_UpdateEvent != -1 ){
529+ uint64 One = 1 ;
530+ int Written = (int )write (g_UpdateEvent, &One, sizeof (One));
531+ if (Written != sizeof (One)){
532+ LOG_ERR (" Failed to signal update event: (%d) %s" ,
533+ errno, strerrordesc_np (errno));
534+ }
535+ }
536+ }
537+
538+ static void ConsumeUpdateEvent (int Events){
539+ ASSERT (g_UpdateEvent != -1 );
540+ if ((Events & POLLIN) == 0 ){
541+ return ;
542+ }
543+
544+ uint64 Dummy;
545+ int Read = (int )read (g_UpdateEvent, &Dummy, sizeof (Dummy));
546+ if (Read != sizeof (Dummy)){
547+ LOG_ERR (" Failed to consume update event: (%d) %s" ,
548+ errno, strerrordesc_np (errno));
549+ }
550+ }
551+
552+ static void AcceptConnections (int Events){
553+ ASSERT (g_Listener != -1 );
554+ if ((Events & POLLIN) == 0 ){
555+ return ;
556+ }
557+
518558 while (true ){
519559 uint32 Addr;
520560 uint16 Port;
@@ -530,50 +570,88 @@ void ProcessConnections(void){
530570 close (Socket);
531571 }
532572 }
573+ }
574+
575+ void ProcessConnections (void ){
576+ int NumFds = 0 ;
577+ int MaxFds = g_Config.MaxConnections + 2 ;
578+ pollfd *Fds = (pollfd*)alloca (MaxFds * sizeof (pollfd));
579+ int *ConnectionIndices = (int *)alloca (MaxFds * sizeof (int ));
580+
581+ if (g_UpdateEvent != -1 ){
582+ Fds[NumFds].fd = g_UpdateEvent;
583+ Fds[NumFds].events = POLLIN;
584+ Fds[NumFds].revents = 0 ;
585+ ConnectionIndices[NumFds] = -1 ;
586+ NumFds += 1 ;
587+ }
588+
589+ if (g_Listener != -1 ){
590+ Fds[NumFds].fd = g_Listener;
591+ Fds[NumFds].events = POLLIN;
592+ Fds[NumFds].revents = 0 ;
593+ ConnectionIndices[NumFds] = -1 ;
594+ NumFds += 1 ;
595+ }
533596
534- // NOTE(fusion): Gather active connections.
535- int NumConnections = 0 ;
536- int *ConnectionIndices = (int *)alloca (g_Config.MaxConnections * sizeof (int ));
537- pollfd *ConnectionFds = (pollfd*)alloca (g_Config.MaxConnections * sizeof (pollfd));
538597 for (int i = 0 ; i < g_Config.MaxConnections ; i += 1 ){
539598 if (g_Connections[i].State == CONNECTION_FREE || g_Connections[i].Socket == -1 ){
540599 continue ;
541600 }
542601
543- ConnectionIndices[NumConnections] = i;
544- ConnectionFds[NumConnections].fd = g_Connections[i].Socket ;
545- ConnectionFds[NumConnections].events = POLLIN | POLLOUT;
546- ConnectionFds[NumConnections].revents = 0 ;
547- NumConnections += 1 ;
548- }
549-
550- if (NumConnections <= 0 ){
551- return ;
602+ Fds[NumFds].fd = g_Connections[i].Socket ;
603+ Fds[NumFds].events = POLLIN;
604+ if (g_Connections[i].State == CONNECTION_WRITING){
605+ Fds[NumFds].events |= POLLOUT;
606+ }
607+ Fds[NumFds].revents = 0 ;
608+ ConnectionIndices[NumFds] = i;
609+ NumFds += 1 ;
552610 }
553611
554- // NOTE(fusion): Poll connections.
555- int NumEvents = poll (ConnectionFds, NumConnections, 0 );
612+ ASSERT (NumFds > 0 );
613+ int NumEvents = poll (Fds, NumFds, - 1 );
556614 if (NumEvents == -1 ){
557- LOG_ERR (" Failed to poll connections: (%d) %s" , errno, strerrordesc_np (errno));
615+ if (errno != ETIMEDOUT){
616+ LOG_ERR (" Failed to poll connections: (%d) %s" ,
617+ errno, strerrordesc_np (errno));
618+ }
558619 return ;
559620 }
560621
561622 // NOTE(fusion): Process connections.
562- for (int i = 0 ; i < NumConnections; i += 1 ){
563- TConnection *Connection = &g_Connections[ConnectionIndices[i]];
564- int Events = (int )ConnectionFds[i].revents ;
565- CheckConnectionInput (Connection, Events);
566- CheckConnectionQueryRequest (Connection);
567- CheckConnectionQueryResponse (Connection);
568- CheckConnectionOutput (Connection, Events);
569- CheckConnection (Connection, Events);
623+ for (int i = 0 ; i < NumFds; i += 1 ){
624+ int Index = ConnectionIndices[i];
625+ int Events = (int )Fds[i].revents ;
626+ if (Index >= 0 && Index < g_Config.MaxConnections ){
627+ TConnection *Connection = &g_Connections[Index];
628+ CheckConnectionInput (Connection, Events);
629+ CheckConnectionQueryRequest (Connection);
630+ CheckConnectionQueryResponse (Connection);
631+ CheckConnectionOutput (Connection, Events);
632+ CheckConnection (Connection, Events);
633+ }else if (Index == -1 && Fds[i].fd == g_UpdateEvent){
634+ ConsumeUpdateEvent (Events);
635+ }else if (Index == -1 && Fds[i].fd == g_Listener){
636+ AcceptConnections (Events);
637+ }else {
638+ LOG_ERR (" Unknown connection index %d" , Index);
639+ }
570640 }
571641}
572642
573643bool InitConnections (void ){
644+ ASSERT (g_UpdateEvent == -1 );
574645 ASSERT (g_Listener == -1 );
575646 ASSERT (g_Connections == NULL );
576647
648+ g_UpdateEvent = eventfd (0 , EFD_NONBLOCK);
649+ if (g_UpdateEvent == -1 ){
650+ LOG_ERR (" Failed to create eventfd: (%d) (%s)" ,
651+ errno, strerrordesc_np (errno));
652+ return false ;
653+ }
654+
577655 g_Listener = ListenerBind ((uint16)g_Config.QueryManagerPort );
578656 if (g_Listener == -1 ){
579657 LOG_ERR (" Failed to bind listener" );
@@ -590,6 +668,11 @@ bool InitConnections(void){
590668}
591669
592670void ExitConnections (void ){
671+ if (g_UpdateEvent != -1 ){
672+ close (g_UpdateEvent);
673+ g_UpdateEvent = -1 ;
674+ }
675+
593676 if (g_Listener != -1 ){
594677 close (g_Listener);
595678 g_Listener = -1 ;
0 commit comments