Skip to content

Commit e1c2920

Browse files
committed
the remainder of postgres helpers (probably)
This should give us everything we need to implement all the supported queries.
1 parent a0f918b commit e1c2920

3 files changed

Lines changed: 138 additions & 33 deletions

File tree

src/database_postgres.cc

Lines changed: 127 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,30 @@
11
#if DATABASE_POSTGRESQL
22
#include "querymanager.hh"
33
#include "libpq-fe.h"
4+
#include <netinet/ip.h>
45

56
// IMPORTANT(fusion): With PostgreSQL being a distributed database, we cannot
67
// rely on automatic schema upgrades like in the case of SQLite. It must be
78
// managed manually and there must be an agreement on the current version
89
// which is why there is a `SchemaInfo` table.
910
#define POSTGRESQL_SCHEMA_VERSION 1
1011

12+
// IMPORTANT(fusion): PostgreSQL timestamps will count the number of microseconds
13+
// since 2000-01-01 00:00:00, with negative values for timestamps before it. To be
14+
// able to convert between PostgreSQL and UNIX timestamps we need the EPOCH of one
15+
// represented as a timestamp of the other, which is exactly what this is. This is
16+
// the PostgreSQL EPOCH represented as an UNIX timestamp.
17+
#define POSTGRESQL_EPOCH 946684800
18+
19+
// IMPORTANT(fusion): Address families used with INET and CIDR binary format.
20+
// They're taken from `utils/inet.h` which is not included with libpq but should
21+
// be stable across different systems, mostly because AF_INET should be stable
22+
// across different systems.
23+
#define POSTGRESQL_AF_INET (AF_INET + 0)
24+
#define POSTGRESQL_AF_INET6 (AF_INET + 1)
25+
STATIC_ASSERT(POSTGRESQL_AF_INET == 2);
26+
STATIC_ASSERT(POSTGRESQL_AF_INET6 == 3);
27+
1128
// IMPORTANT(fusion): These are the OIDs for a few of built-in data types in
1229
// PostgreSQL. They're taken from `catalog/pg_type_d.h` which is not included
1330
// with libpq but should be STABLE across different versions and are needed
@@ -81,16 +98,20 @@ static T *ParamAlloc(ParamBuffer *Params, int Count){
8198
}
8299

83100
static void ParamBegin(ParamBuffer *Params, int MaxParams, int PreferredFormat){
84-
ASSERT(MaxParams > 0);
85-
86101
// NOTE(fusion): Reset arena.
87102
memset(Params->Arena, 0, sizeof(Params->Arena));
88103
Params->ArenaPos = 0;
89104

90105
// NOTE(fusion): Reset params.
91-
Params->Values = ParamAlloc<const char*>(Params, MaxParams);
92-
Params->Lengths = ParamAlloc<int>(Params, MaxParams);
93-
Params->Formats = ParamAlloc<int>(Params, MaxParams);
106+
if(MaxParams > 0){
107+
Params->Values = ParamAlloc<const char*>(Params, MaxParams);
108+
Params->Lengths = ParamAlloc<int>(Params, MaxParams);
109+
Params->Formats = ParamAlloc<int>(Params, MaxParams);
110+
}else{
111+
Params->Values = NULL;
112+
Params->Lengths = NULL;
113+
Params->Formats = NULL;
114+
}
94115
Params->NumParams = 0;
95116
Params->MaxParams = MaxParams;
96117
Params->PreferredFormat = PreferredFormat;
@@ -136,7 +157,7 @@ static void ParamInteger(ParamBuffer *Params, int Value){
136157
BufferWrite32BE(Data, (uint32)Value);
137158
InsertBinaryParam(Params, Data, 4);
138159
}else{
139-
char Text[16] = {};
160+
char Text[16];
140161
StringBufFormat(Text, "%d", Value);
141162
InsertTextParam(Params, Text);
142163
}
@@ -152,6 +173,41 @@ static void ParamByteA(ParamBuffer *Params, const uint8 *Data, int Length){
152173
InsertBinaryParam(Params, Data, Length);
153174
}
154175

176+
static void ParamIPAddress(ParamBuffer *Params, int IPAddress){
177+
if(Params->PreferredFormat == 1){ // BINARY FORMAT
178+
uint8 Data[8];
179+
Data[0] = POSTGRESQL_AF_INET; // AddressFamily
180+
Data[1] = 32; // MaskBits
181+
Data[2] = 0; // IsCIDR
182+
Data[3] = 4; // AddressSize
183+
BufferWrite32BE(Data + 4, (uint32)IPAddress);
184+
InsertBinaryParam(Params, Data, 8);
185+
}else{
186+
char Text[16];
187+
StringBufFormat(Text, "%d.%d.%d.%d",
188+
((IPAddress >> 24) & 0xFF),
189+
((IPAddress >> 16) & 0xFF),
190+
((IPAddress >> 8) & 0xFF),
191+
((IPAddress >> 0) & 0xFF));
192+
InsertTextParam(Params, Text);
193+
}
194+
}
195+
196+
static void ParamTimestamp(ParamBuffer *Params, int Timestamp){
197+
if(Params->PreferredFormat == 1){ // BINARY FORMAT
198+
// NOTE(fusion): See `POSTGRESQL_EPOCH`.
199+
uint8 Data[8];
200+
int64 PGTimestamp = (int64)(Timestamp - POSTGRESQL_EPOCH) * 1000000;
201+
BufferWrite64BE(Data, (uint64)PGTimestamp);
202+
InsertBinaryParam(Params, Data, 8);
203+
}else{
204+
char Text[32];
205+
struct tm tm = GetGMTime((time_t)Timestamp);
206+
strftime(Text, sizeof(Text), "%Y-%m-%d %H:%M:%S+00", &tm);
207+
InsertTextParam(Params, Text);
208+
}
209+
}
210+
155211
// Result Helpers
156212
//==============================================================================
157213
struct AutoResultClear{
@@ -385,11 +441,11 @@ static int GetResultIPAddress(PGresult *Result, int Row, int Col){
385441
int Size = PQgetlength(Result, Row, Col);
386442
const uint8 *Data = (const uint8*)PQgetvalue(Result, Row, Col);
387443
if(Size >= 4){
388-
int AddressType = (int)Data[0]; // 0x02 for IPV4, 0x03 for IPV6
389-
// Data[1]; // mask bits
390-
// Data[2]; // always ZERO for INET, always ONE for CIDR
444+
int AddressFamily = (int)Data[0];
445+
// Data[1]; // MaskBits
446+
// Data[2]; // IsCIDR
391447
int AddressSize = (int)Data[3];
392-
if(AddressType == 2 && AddressSize == 4 && Size >= 8){
448+
if(AddressFamily == POSTGRESQL_AF_INET && AddressSize == 4 && Size >= 8){
393449
IPAddress = (int)BufferRead32BE(Data + 4);
394450
}else{
395451
LOG_ERR("CIDR/INET column (%d) %s doesn't contain IPV4 address",
@@ -433,18 +489,22 @@ static bool ParseTimestamp(int *Dest, const char *String){
433489
}
434490

435491
// NOTE(fusion): Parse optional timezone.
436-
int TimezoneOffset = 0;
437-
if(Rem[0] == '-' || Rem[0] == '+'){
438-
if(isdigit(Rem[1]) && isdigit(Rem[2])){
439-
TimezoneOffset = (Rem[1] - '0') * 10
440-
+ (Rem[2] - '0');
441-
if(Rem[0] == '+'){
442-
TimezoneOffset = -TimezoneOffset;
443-
}
492+
int GMTOffset = 0;
493+
if((Rem[0] == '-' || Rem[0] == '+') && isdigit(Rem[1]) && isdigit(Rem[2])){
494+
// NOTE(fusion): Hours.
495+
GMTOffset += ((Rem[1] - '0') * 10 + (Rem[2] - '0')) * 3600;
496+
497+
// NOTE(fusion): Optional minutes.
498+
if(isdigit(Rem[3]) && isdigit(Rem[4])){
499+
GMTOffset += ((Rem[3] - '0') * 10 + (Rem[4] - '0')) * 60;
500+
}
501+
502+
if(Rem[0] == '+'){
503+
GMTOffset = -GMTOffset;
444504
}
445505
}
446506

447-
*Dest = (int)timegm(&tm) + TimezoneOffset * 3600;
507+
*Dest = (int)timegm(&tm) + GMTOffset;
448508
return true;
449509
}
450510

@@ -490,12 +550,9 @@ static int GetResultTimestamp(PGresult *Result, int Row, int Col){
490550
case TIMESTAMPOID:
491551
case TIMESTAMPTZOID:{
492552
ASSERT(PQgetlength(Result, Row, Col) == 8);
493-
// IMPORTANT(fusion): The timestamp used by PostgreSQL is the number
494-
// of microseconds since 2000-01-01 00:00:00, with negative values
495-
// for timestamps before it.
496-
constexpr int64 PGEpoch = 946692000; // 2000-01-01 00:00:00
553+
// NOTE(fusion): See `POSTGRESQL_EPOCH`.
497554
int64 PGTimestamp = (int64)BufferRead64BE((const uint8*)PQgetvalue(Result, Row, Col));
498-
int64 Timestamp64 = ((PGTimestamp / 1000000) + PGEpoch);
555+
int64 Timestamp64 = ((PGTimestamp / 1000000) + POSTGRESQL_EPOCH);
499556
if(Timestamp64 < INT_MIN){
500557
Timestamp = INT_MIN;
501558
}else if(Timestamp64 > INT_MAX){
@@ -605,10 +662,10 @@ void DeleteStatementCache(TDatabase *Database){
605662
}
606663
}
607664

608-
// NOTE(fusion): This function would usually be called along with `PQreset`
609-
// or `PQfinish` but it's probably a good idea to close all prepared statements
610-
// if the connection is still going. There is no libpq wrapper but we can
611-
// execute `DEALLOCATE ALL`.
665+
// NOTE(fusion): This function would usually be called along with `PQreset` or
666+
// `PQfinish` but it's probably a good idea to close all prepared statements if
667+
// the connection is still going. There is no libpq wrapper but we can execute
668+
// `DEALLOCATE ALL`.
612669
if(PQstatus(Database->Handle) == CONNECTION_OK){
613670
if(!ExecInternal(Database, "DEALLOCATE ALL")){
614671
LOG_WARN("Failed to close all prepared statements");
@@ -621,11 +678,18 @@ void DeleteStatementCache(TDatabase *Database){
621678
}
622679
}
623680

624-
// IMPORTANT(fusion): Even though it is possible to declare parameter types with
625-
// OIDs, it is simpler to use explicit casts such as `$1::INTEGER` to enforce types.
626-
// It also makes so all relevant information about the query is packed into `Text`
627-
// so we don't need to track anything else to ensure statements with different
628-
// types are kept separate.
681+
// IMPORTANT(fusion): Even though it is possible to declare parameter types
682+
// with OIDs, it is simpler to use explicit casts such as `$1::INTEGER` to
683+
// enforce types. It also makes so all relevant information about the query
684+
// is packed into `Text` so we don't need to track anything else to ensure
685+
// queries with with different types are kept separate.
686+
// Keep in mind that using the same parameter multiple times with different
687+
// explicit type casts will make so only the first one is used when inferring
688+
// the actual parameter type. Others are considered casts from it.
689+
// For example, `SELECT $1::TIMESTAMP, $1::TIMESTAMPTZ` will make so $1 is
690+
// inferred as `TIMESTAMP`, so `$1::TIMESTAMPTZ` will actually be a cast from
691+
// `TIMESTAMP` into `TIMESTAMPTZ`, which will most likely yield unexpected
692+
// results.
629693
const char *PrepareQuery(TDatabase *Database, const char *Text){
630694
ASSERT(Database != NULL);
631695
EnsureStatementCache(Database);
@@ -828,6 +892,36 @@ TDatabase *DatabaseOpen(void){
828892
return NULL;
829893
}
830894

895+
#if 1
896+
{
897+
// TODO(fusion): REMOVE. This was for testing TIMESTAMP input/output, to make
898+
// sure they were consistent across different formats (text/binary).
899+
const char *Stmt = PrepareQuery(Database, "SELECT $1::TIMESTAMP, $2::TIMESTAMPTZ");
900+
ASSERT(Stmt != NULL);
901+
902+
int Timestamp = (int)time(NULL);
903+
LOG("TIMESTAMP: %d", Timestamp);
904+
905+
for(int i = 0; i <= 1; i += 1)
906+
for(int j = 0; j <= 1; j += 1){
907+
LOG("TEXT (%d, %d)", i, j);
908+
ParamBuffer Params;
909+
ParamBegin(&Params, 2, i);
910+
ParamTimestamp(&Params, Timestamp);
911+
ParamTimestamp(&Params, Timestamp);
912+
PGresult *Result = PQexecPrepared(Database->Handle, Stmt, Params.NumParams,
913+
Params.Values, Params.Lengths, Params.Formats, j);
914+
AutoResultClear ResultGuard(Result);
915+
if(PQresultStatus(Result) == PGRES_TUPLES_OK){
916+
LOG("0: %d", GetResultTimestamp(Result, 0, 0));
917+
LOG("1: %d", GetResultTimestamp(Result, 0, 1));
918+
}else{
919+
LOG_ERR("Failed to execute query: %s", PQerrorMessage(Database->Handle));
920+
}
921+
}
922+
}
923+
#endif
924+
831925
return Database;
832926
}
833927

src/querymanager.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,16 @@ struct tm GetLocalTime(time_t t){
7474
return result;
7575
}
7676

77+
struct tm GetGMTime(time_t t){
78+
struct tm result;
79+
#if COMPILER_MSVC
80+
gmtime_s(&result, &t);
81+
#else
82+
gmtime_r(&t, &result);
83+
#endif
84+
return result;
85+
}
86+
7787
int64 GetClockMonotonicMS(void){
7888
#if OS_WINDOWS
7989
LARGE_INTEGER Counter, Frequency;

src/querymanager.hh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ void LogAddVerbose(const char *Prefix, const char *Function,
146146
const char *File, int Line, const char *Format, ...) ATTR_PRINTF(5, 6);
147147

148148
struct tm GetLocalTime(time_t t);
149+
struct tm GetGMTime(time_t t);
149150
int64 GetClockMonotonicMS(void);
150151
int GetMonotonicUptimeMS(void);
151152
void SleepMS(int DurationMS);

0 commit comments

Comments
 (0)