Skip to content

Commit b0a73dc

Browse files
authored
added hourly partitioning (#665)
1 parent 6f36c3d commit b0a73dc

16 files changed

Lines changed: 360 additions & 55 deletions

File tree

code/common/pubsub.q

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,10 @@ init:{[t]
164164
$[10h~type last val;'last val;val]
165165
};
166166

167+
// converts timestamps to integers which represents hours.
168+
// 2024.09.24D16:23:13.445394106 -> 2024092416
169+
.ps.periodtohour:{{"J"$(ssr[;".";""]string x),string `hh$y} . "dv"$x};
170+
167171
// Striping data in a TorQ Installation
168172
// use mod to stripe into number of segments
169173
.ds.map:{[numseg;sym] sym!(sum each string sym)mod numseg};

code/common/subscriptions.q

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ subscribe:{[tabs;instrs;setschema;replaylog;proc]
130130
:d,{(where 101 = type each x)_x}(`i`icounts`d)!(details[`logfilelist][0;0];details[`rowcounts];details[`date])];
131131
if[tptype~`segmented;
132132
retdic:`logdir`subtables!(details[`logdir];details[`schemalist][;0]);
133-
:retdic,{(where 101 = type each x)_x}`i`icounts`d`tplogdate!details[`logfilelist`rowcounts`date`date];
133+
:retdic,{(where 101 = type each x)_x}`i`icounts`d`tplogdate`currperiod!details[`logfilelist`rowcounts`date`date`currperiod];
134134
]
135135
}
136136

code/processes/rdb.q

Lines changed: 21 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ parvaluesrc:@[value;`parvaluesrc;`log]; //where to source th
3232
//anything else will return a null date which is will be filled by pardefault
3333
subfiltered:@[value;`subfiltered;0b]; //allows subscription filters to be loaded and applied in the rdb
3434

35-
pardefault:@[value;`pardefault;.z.D]; //if the src defined in parvaluesrc returns null, use this default date instead
35+
pardefault:@[value;`pardefault;.proc.cp[]]; //if the src defined in parvaluesrc returns null, use this default date instead
3636
tpcheckcycles:@[value;`tpcheckcycles;0W]; //specify the number of times the process will check for an available tickerplant
3737

3838
/ - if the timer is not enabled, then exit with error
@@ -82,9 +82,9 @@ notifyhdb:{[h;d]
8282
@[h;hdbmessage[d];{.lg.e[`notifyhdb;"failed to send reload message to hdb on handle: ",x]}];
8383
};
8484

85-
endofday:{[date;processdata]
85+
endofday:{[currp;nextp;processdata]
8686
/-add date+1 to the rdbpartition global
87-
rdbpartition,:: date +1;
87+
rdbpartition,:: nextp;
8888
.lg.o[`rdbpartition;"rdbpartition contains - ","," sv string rdbpartition];
8989
/ Need to download sym file to scratch directory if this is Finspace application
9090
if[.finspace.enabled;
@@ -103,51 +103,47 @@ endofday:{[date;processdata]
103103
/-get a list of pairs (tablename;columnname!attributes)
104104
a:{(x;raze exec {(enlist x)!enlist((#);enlist y;x)}'[c;a] from meta x where not null a)}each tables`.;
105105
/-save and wipe the tables
106-
writedown[hdbdir;date];
106+
writedown[hdbdir;currp];
107107
/-creates new changeset if this is a finspace application
108108
if[.finspace.enabled;
109109
changeset:.finspace.createchangeset[.finspace.database];
110110
];
111-
/-reset timeout to original timeout
112-
restoretimeout[];
113111
/-reapply the attributes
114112
/-functional update is equivalent of {update col:`att#col from tab}each tables
115113
(![;();0b;].)each a where 0<count each a[;1];
116-
rmdtfromgetpar[date];
114+
rmdtfromgetpar[currp];
117115
/-invoke any user defined post replay function
118-
.save.postreplay[hdbdir;date];
116+
.save.postreplay[hdbdir;currp];
119117
/-notify all hdbs
120118
hdbs:distinct raze {exec w from .servers.getservers[x;y;()!();1b;0b]}'[`proctype`procname;(hdbtypes;hdbnames)];
121119
$[.finspace.enabled;
122120
.finspace.notifyhdb[;changeset] each .finspace.hdbclusters;
123-
notifyhdb[;date] each hdbs
121+
notifyhdb[;currp] each hdbs
124122
];
125123
if[.finspace.enabled;.os.hdeldir[getenv[`KDBSCRATCH];0b]]
126124
};
127125

128-
reload:{[date]
126+
reload:{[pt]
129127
.lg.o[`reload;"reload command has been called remotely"];
130128
/-get all attributes from all tables before they are wiped
131129
/-get a list of pairs (tablename;columnname!attributes)
132130
a:{(x;raze exec {(enlist x)!enlist((#);enlist y;x)}'[c;a] from meta x where not null a)}each tabs:subtables except ignorelist;
133131
/-drop off the first eodtabcount[tab] for each of the tables
134132
dropfirstnrows each tabs;
135-
rmdtfromgetpar[date];
133+
rmdtfromgetpar[pt];
136134
/-reapply the attributes
137135
/-functional update is equivalent of {update col:`att#col from tab}each tables
138136
(![;();0b;].)each a where 0<count each a[;1];
139137
/-garbage collection if enabled
140138
if[gc;.gc.run[]];
141139
/-reset eodtabcount back to zero for each table (in case this is called more than once)
142140
eodtabcount[tabs]:0;
143-
/-restore original timeout back to rdb
144-
restoretimeout[];
145141
.lg.o[`reload;"Finished reloading RDB"];
146142
};
147143

148144
/-drop date from rdbpartition
149-
rmdtfromgetpar:{[date]
150-
rdbpartition:: rdbpartition except date;
145+
rmdtfromgetpar:{[pt]
146+
rdbpartition:: rdbpartition except pt;
151147
.lg.o[`rdbpartition;"rdbpartition contains - ","," sv string rdbpartition];
152148
}
153149

@@ -167,8 +163,8 @@ subscribe:{[]
167163
/-set the date that was returned by the subscription code i.e. the date for the tickerplant log file
168164
/-and a list of the tables that the process is now subscribing for
169165
subinfo:.sub.subscribe[subscribeto;subscribesyms;schema;replaylog;first s];
170-
/-setting subtables and tplogdate globals
171-
@[`.rdb;;:;]'[`subtables`tplogdate;subinfo`subtables`tplogdate];
166+
/-setting subtables and currperiod globals
167+
@[`.rdb;;:;]'[`subtables`currperiod;subinfo`subtables`currperiod];
172168
/-update metainfo table for the dataaccessapi
173169
if[`dataaccess in key .proc.params;.dataaccess.metainfo:.dataaccess.metainfo upsert .checkinputs.getmetainfo[]];
174170
/-apply subscription filters to replayed data
@@ -177,12 +173,12 @@ subscribe:{[]
177173

178174
setpartition:{[]
179175
part: $[parvaluesrc ~ `log; /-get date from the tickerplant log file
180-
[.lg.o[`setpartition;"setting rdbpartition from date in tickerplant log file name :",string tplogdate];tplogdate];
176+
[.lg.o[`setpartition;"setting rdbpartition from date+hour in tickerplant log file name :",string currperiod];currperiod];
181177
parvaluesrc ~ `tab; /-look at the time column in the biggest table and take the first time value and cast to date (time has be to be timestamp/datetime to get a valid date)
182178
[largesttab: first subtables idesc count each value each subtables;
183-
.lg.o[`setpartition;"setting rdbpartition from largest table (",string[largesttab],")."];.[$;(`date;first largesttab[`time]);0Nd]];
179+
.lg.o[`setpartition;"setting rdbpartition from largest table (",string[largesttab],")."];.[$;(`datetime;first largesttab[`time]);0Nd]];
184180
0Nd]; /-else just return null
185-
rdbpartition:: enlist pardefault ^ part;
181+
rdbpartition:: enlist .ps.periodtohour pardefault ^ part;
186182
.lg.o[`setpartition;"rdbpartition contains - ","," sv string rdbpartition];}
187183

188184
loadsubfilters:{[]
@@ -202,20 +198,20 @@ getpartition:{[] rdbpartition}
202198
notpconnected:{[]
203199
0 = count select from .sub.SUBSCRIPTIONS where proctype in .rdb.tickerplanttypes, active}
204200

205-
/-resets timeout to 0 before EOD writedown
206-
timeoutreset:{.rdb.timeout:system"T";system"T 0"};
207-
restoretimeout:{system["T ", string .rdb.timeout]};
208201
\d .
209202

210203
/- make sure that the process will make a connection to each of the gateways and hdb types
211204
.servers.CONNECTIONS:distinct .servers.CONNECTIONS,.rdb.hdbtypes,.rdb.gatewaytypes
212205

213206
/- adds endofday function to top level namespace
214-
endofday: .rdb.endofday;
207+
endofday:{}
215208

216209
/-set the reload the function
217210
reload:.rdb.reload
218211

212+
/-overwrite endofperiod with endofday
213+
endofperiod:{[currp;nextp;data] .rdb.endofday[.ps.periodtohour currp;.ps.periodtohour nextp;data]};
214+
219215
/-load the sort csv
220216
.sort.getsortcsv[.rdb.sortcsv]
221217

@@ -227,17 +223,12 @@ $[.rdb.connectonstart;
227223
.servers.startupdepcycles[.rdb.tickerplanttypes;.rdb.tpconnsleepintv;.rdb.tpcheckcycles];
228224
.rdb.subscribe[];
229225
];
230-
.rdb.tplogdate:.proc.cd[]; // defines tplogdate for setpartition
226+
.rdb.currperiod:.proc.cp[]; // defines currperiod for setpartition
231227
]
232228

233229
/-set the partition that is held in the rdb (for use by the gateway)
234230
.rdb.setpartition[]
235231

236-
/-change timeout to zero before eod flush
237-
/-GMT offset rounded to nearest 15 mins and added to roll time
238-
.timer.repeat[.eodtime.nextroll-00:01+{00:01*15*"j"$(`minute$x)%15}(.proc.cp[]-.z.p);0W;1D;
239-
(`.rdb.timeoutreset;`);"Set rdb timeout to 0 for EOD writedown"];
240-
241232
/-send a signal to the old rdb and wdb (excluding the most recently started process) that the new rdb is ready for the next period.
242233
.rdb.newrdbready:{[]
243234
if[1<count h:exec w from .servers.SERVERS where proctype=`wdb;

code/processes/segmentedtickerplant.q

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ tablelist:{.stpps.t}
2828

2929
// subscribers who want to replay need this info
3030
subdetails:{[tabs;instruments]
31-
`schemalist`logfilelist`rowcounts`date`logdir!(.ps.subscribe\:[tabs;instruments];.stplg.replaylog[tabs];tabs#.stplg `rowcount;(.eodtime `d);.stplg.kdbtplog)
31+
`schemalist`logfilelist`rowcounts`date`logdir`currperiod!(.ps.subscribe\:[tabs;instruments];.stplg.replaylog[tabs];tabs#.stplg `rowcount;(.eodtime `d);.stplg.kdbtplog;.stplg.currperiod)
3232
}
3333

3434
// Generate table and schema information and set up default table UPD functions

code/processes/wdb.q

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,8 @@ notifyidbs:{[func;params]
179179
};
180180

181181
/- eod - flush remaining data to disk
182-
endofday:{[pt;processdata]
183-
.lg.o[`eod;"end of day message received - ",string pt];
182+
endofday:{[currp;nextp;data]
183+
.lg.o[`eod;"end of day message received - ",string currp];
184184
/- set what type of merge method to be used
185185
mergemethod:mergemode;
186186
/- create a dictionary of tables and merge limits, byte or row count limit depending on settings
@@ -194,16 +194,16 @@ endofday:{[pt;processdata]
194194
];
195195
/ - if save mode is enabled then flush all data to disk
196196
if[saveenabled;
197-
endofdaysave[savedir;pt];
197+
endofdaysave[savedir;currp];
198198
/ - if sort mode enable call endofdaysort within the process,else inform the sort and reload process to do it
199-
$[sortenabled;endofdaysort;informsortandreload] . (savedir;pt;tablist;writedownmode;mergelimits;hdbsettings;mergemethod);
199+
$[sortenabled;endofdaysort;informsortandreload] . (savedir;currp;tablist;writedownmode;mergelimits;hdbsettings;mergemethod);
200200
if[.finspace.enabled;changeset:.finspace.createchangeset[.finspace.database]];
201201
];
202202
.lg.o[`eod;"deleting data from ",$[r:writedownmode in partwritemodes;"partsizes";"tabsizes"]];
203203
$[r;@[`.merge;`partsizes;0#];@[`.wdb;`tabsizes;0#]];
204204
/-notify all finspace hdbs
205205
if[.finspace.enabled;.finspace.notifyhdb[;changeset] each .finspace.hdbclusters];
206-
currentpartition::pt+1;
206+
currentpartition::nextp;
207207
/- in case of default/partbyenum writedown mode we want to initialise the new partition with all the table schemas
208208
/- then notify idb processes of the new db
209209
if[writedownmode in `partbyenum`default;
@@ -507,15 +507,15 @@ subscribe:{[]
507507
/- function to rectify data written to wrong partition
508508
fixpartition:{[subto]
509509
/- check if the tp logdate matches current date
510-
if[not (tplogdate:subto[`tplogdate])~orig:currentpartition;
510+
if[not (tplogperiod:.ps.periodtohour subto[`currperiod])~orig:currentpartition;
511511
.lg.o[`fixpartition;"Current partiton date does not match the ticker plant log date"];
512-
/- set the current partiton date to the log date
513-
currentpartition::tplogdate;
512+
/- set the current partition date to the log date
513+
currentpartition::tplogperiod;
514514
/- move the data that has been written to correct partition
515515
pth1:.os.pth[-1 _ string .Q.par[savedir;orig;`]];
516-
pth2:.os.pth[-1 _ string .Q.par[savedir;tplogdate;`]];
516+
pth2:.os.pth[-1 _ string .Q.par[savedir;tplogperiod;`]];
517517
if[not ()~key hsym `$.os.pthq pth1;
518-
/- delete any data in the current partiton directory
518+
/- delete any data in the current partition directory
519519
clearwdbdata[];
520520
.lg.o[`fixpartition;"Moving data from partition ",(.os.pthq pth1) ," to partition ",.os.pthq pth2];
521521
.[.os.ren;(pth1;pth2);{.lg.e[`fixpartition;"Failed to move data from wdb partition ",x," to wdb partition ",y," : ",z]}[pth1;pth2]]];
@@ -551,7 +551,7 @@ replayupd:{[f;t;d]
551551
savetables[savedir;getpartition[];0b;t]]
552552
}[upd];
553553

554-
/ - if there is data in the wdb directory for the partition remove it before replay
554+
/ - if there is data in the wdb directory for the partition, if there is remove it before replay
555555
/ - is only for wdb processes that are saving data to disk
556556
clearwdbdata:{[]
557557
$[saveenabled and not () ~ key wdbpart:.Q.par[savedir;getpartition[];`];
@@ -601,11 +601,12 @@ getsortparams:{[]
601601
.servers.CONNECTIONS:(distinct .servers.CONNECTIONS,.wdb.hdbtypes,.wdb.rdbtypes,.wdb.gatewaytypes,.wdb.tickerplanttypes,.wdb.sorttypes,.wdb.sortworkertypes,.wdb.idbtypes) except `;
602602

603603
/- adds endofday function to top level namespace
604-
endofday: .wdb.endofday;
604+
//endofday: .wdb.endofday;
605+
endofday: {};
606+
endofperiod: {[currp;nextp;data] .wdb.endofday[.ps.periodtohour currp;.ps.periodtohour nextp;data]};
607+
605608
/- setting the upd and .u.end functions as the .wdb versions
606-
.u.end:{[pt]
607-
.wdb.endofday[.wdb.getpartition[];()!()];
608-
}
609+
.u.end: {};
609610

610611
/- set the replay upd
611612
.lg.o[`init;"setting the log replay upd function"];

code/wdb/writedown.q

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@ maxrows:{[tabname] numrows^numtab[tabname]}; /- ex
1515
partitiontype:@[value;`partitiontype;`date]; /-set type of partition (defaults to `date)
1616

1717

18-
getpartition:@[value;`getpartition; /-function to determine the partition value
19-
{{@[value;`.wdb.currentpartition;
20-
(`date^partitiontype)$.proc.cd[]]}}];
18+
getpartition:{@[value;`.wdb.currentpartition;
19+
.ps.periodtohour .proc.cp[]]}; /- function to determine the partition value
2120

2221
currentpartition:.wdb.getpartition[]; /- Initialise current partiton
2322

config/settings/segmentedtickerplant.q

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ multilog:`tabperiod; // [tabperiod|none|periodic|tabular|custom]
66
multilogperiod:0D01; // Length of period for STP periodic logging modes
77
errmode:1b; // Enable error mode for STP
88
batchmode:`defaultbatch; // [memorybatch|defaultbatch|immediate]
9-
replayperiod:`day // [period|day|prior]
9+
replayperiod:`period // [period|day|prior]
1010
customcsv:hsym first .proc.getconfigfile["stpcustom.csv"]; // Location for custom logging mode csv
1111
kdbtplog:`$getenv`KDBTPLOG;
1212

config/settings/wdb.q

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,9 @@ upd:insert
4949
replay:1b // replay the tickerplant log file
5050
schema:1b // retrieve schema from tickerplant
5151
settimer:0D00:00:10 // timer to check if data needs written to disk
52-
partitiontype:`date // set type of partition (defaults to `date, can be `date, `month or `year)
53-
getpartition:{@[value;`.wdb.currentpartition;(`date^partitiontype)$.proc.cd[]]} // function to determine the partition value
52+
partitiontype:`int // set type of partition (defaults to `date, can be `date, `month or `year)
53+
getpartition:{@[value;`.wdb.currentpartition;
54+
.ps.periodtohour .proc.cp[]]} // function to determine the partition value
5455
reloadorder:`hdb`rdb // order to reload hdbs and rdbs
5556
hdbdir:`:hdb // move wdb database to different location
5657
sortcsv:hsym first .proc.getconfigfile"sort.csv" // location of csv file

0 commit comments

Comments
 (0)