@@ -32,7 +32,7 @@ parvaluesrc:@[value;`parvaluesrc;`log]; //where to source th
3232 //anything else will return a null date which is will be filled by pardefault
3333subfiltered : @ [value ;`subfiltered ;0b ]; //allows subscription filters to be loaded and applied in the rdb
3434
35- pardefault : @ [value ;`pardefault ;.z.D ] ; //if the src defined in parvaluesrc returns null, use this default date instead
35+ pardefault : @ [value ;`pardefault ;.proc.cp []] ; //if the src defined in parvaluesrc returns null, use this default date instead
3636tpcheckcycles : @ [value ;`tpcheckcycles ;0W ]; //specify the number of times the process will check for an available tickerplant
3737
3838/ - if the timer is not enabled, then exit with error
@@ -82,9 +82,9 @@ notifyhdb:{[h;d]
8282 @ [h;hdbmessage [d];{.lg.e [`notifyhdb ;"failed to send reload message to hdb on handle: " , x]}];
8383 };
8484
85- endofday : {[date ;processdata ]
85+ endofday : {[currp ; nextp ;processdata ]
8686 /-add date+1 to the rdbpartition global
87- rdbpartition ,:: date + 1 ;
87+ rdbpartition ,:: nextp ;
8888 .lg.o [`rdbpartition ;"rdbpartition contains - " , "," sv string rdbpartition ];
8989 / Need to download sym file to scratch directory if this is Finspace application
9090 if [.finspace.enabled ;
@@ -103,51 +103,47 @@ endofday:{[date;processdata]
103103 /-get a list of pairs (tablename;columnname!attributes)
104104 a : {(x;raze exec {(enlist x)! enlist ((# );enlist y;x)}' [c;a] from meta x where not null a)}each tables `. ;
105105 /-save and wipe the tables
106- writedown [hdbdir ;date ];
106+ writedown [hdbdir ;currp ];
107107 /-creates new changeset if this is a finspace application
108108 if [.finspace.enabled ;
109109 changeset : .finspace.createchangeset [.finspace.database ];
110110 ];
111- /-reset timeout to original timeout
112- restoretimeout [];
113111 /-reapply the attributes
114112 /-functional update is equivalent of {update col:`att#col from tab}each tables
115113 (! [;();0b ;]. )each a where 0 <count each a[;1 ];
116- rmdtfromgetpar [date ];
114+ rmdtfromgetpar [currp ];
117115 /-invoke any user defined post replay function
118- .save.postreplay [hdbdir ;date ];
116+ .save.postreplay [hdbdir ;currp ];
119117 /-notify all hdbs
120118 hdbs : distinct raze {exec w from .servers.getservers [x;y;()! ();1b ;0b ]}' [`proctype`procname ;(hdbtypes ;hdbnames )];
121119 $ [.finspace.enabled ;
122120 .finspace.notifyhdb [;changeset ] each .finspace.hdbclusters ;
123- notifyhdb [;date ] each hdbs
121+ notifyhdb [;currp ] each hdbs
124122 ];
125123 if [.finspace.enabled ;.os.hdeldir [getenv [`KDBSCRATCH ];0b ]]
126124 };
127125
128- reload : {[date ]
126+ reload : {[pt ]
129127 .lg.o [`reload ;"reload command has been called remotely" ];
130128 /-get all attributes from all tables before they are wiped
131129 /-get a list of pairs (tablename;columnname!attributes)
132130 a : {(x;raze exec {(enlist x)! enlist ((# );enlist y;x)}' [c;a] from meta x where not null a)}each tabs : subtables except ignorelist ;
133131 /-drop off the first eodtabcount[tab] for each of the tables
134132 dropfirstnrows each tabs ;
135- rmdtfromgetpar [date ];
133+ rmdtfromgetpar [pt ];
136134 /-reapply the attributes
137135 /-functional update is equivalent of {update col:`att#col from tab}each tables
138136 (! [;();0b ;]. )each a where 0 <count each a[;1 ];
139137 /-garbage collection if enabled
140138 if [gc ;.gc.run []];
141139 /-reset eodtabcount back to zero for each table (in case this is called more than once)
142140 eodtabcount [tabs ]: 0 ;
143- /-restore original timeout back to rdb
144- restoretimeout [];
145141 .lg.o [`reload ;"Finished reloading RDB" ];
146142 };
147143
148144/-drop date from rdbpartition
149- rmdtfromgetpar : {[date ]
150- rdbpartition :: rdbpartition except date ;
145+ rmdtfromgetpar : {[pt ]
146+ rdbpartition :: rdbpartition except pt ;
151147 .lg.o [`rdbpartition ;"rdbpartition contains - " , "," sv string rdbpartition ];
152148 }
153149
@@ -167,8 +163,8 @@ subscribe:{[]
167163 /-set the date that was returned by the subscription code i.e. the date for the tickerplant log file
168164 /-and a list of the tables that the process is now subscribing for
169165 subinfo : .sub.subscribe [subscribeto ;subscribesyms ;schema ;replaylog ;first s];
170- /-setting subtables and tplogdate globals
171- @ [`.rdb ;;: ;]' [`subtables`tplogdate ;subinfo `subtables`tplogdate ];
166+ /-setting subtables and currperiod globals
167+ @ [`.rdb ;;: ;]' [`subtables`currperiod ;subinfo `subtables`currperiod ];
172168 /-update metainfo table for the dataaccessapi
173169 if [`dataaccess in key .proc.params ;.dataaccess.metainfo: .dataaccess.metainfo upsert .checkinputs.getmetainfo []];
174170 /-apply subscription filters to replayed data
@@ -177,12 +173,12 @@ subscribe:{[]
177173
178174setpartition : {[]
179175 part : $ [parvaluesrc ~ `log ; /-get date from the tickerplant log file
180- [.lg.o [`setpartition ;"setting rdbpartition from date in tickerplant log file name :" , string tplogdate ]; tplogdate ];
176+ [.lg.o [`setpartition ;"setting rdbpartition from date+hour in tickerplant log file name :" , string currperiod ]; currperiod ];
181177 parvaluesrc ~ `tab ; /-look at the time column in the biggest table and take the first time value and cast to date (time has be to be timestamp/datetime to get a valid date)
182178 [largesttab : first subtables idesc count each value each subtables ;
183- .lg.o [`setpartition ;"setting rdbpartition from largest table (" , string [largesttab ], ")." ];. [$ ;(`date ;first largesttab [`time ]);0Nd ]];
179+ .lg.o [`setpartition ;"setting rdbpartition from largest table (" , string [largesttab ], ")." ];. [$ ;(`datetime ;first largesttab [`time ]);0Nd ]];
184180 0Nd ]; /-else just return null
185- rdbpartition :: enlist pardefault ^ part ;
181+ rdbpartition :: enlist .ps.periodtohour pardefault ^ part ;
186182 .lg.o [`setpartition ;"rdbpartition contains - " , "," sv string rdbpartition ];}
187183
188184loadsubfilters : {[]
@@ -202,20 +198,20 @@ getpartition:{[] rdbpartition}
202198notpconnected : {[]
203199 0 = count select from .sub.SUBSCRIPTIONS where proctype in .rdb.tickerplanttypes , active }
204200
205- /-resets timeout to 0 before EOD writedown
206- timeoutreset : {.rdb.timeout: system "T" ;system "T 0" };
207- restoretimeout : {system ["T " , string .rdb.timeout ]};
208201\ d .
209202
210203/- make sure that the process will make a connection to each of the gateways and hdb types
211204.servers.CONNECTIONS: distinct .servers.CONNECTIONS , .rdb.hdbtypes , .rdb.gatewaytypes
212205
213206/- adds endofday function to top level namespace
214- endofday : .rdb.endofday ;
207+ endofday : {}
215208
216209/-set the reload the function
217210reload : .rdb.reload
218211
212+ /-overwrite endofperiod with endofday
213+ endofperiod : {[currp ;nextp ;data ] .rdb.endofday [.ps.periodtohour currp ;.ps.periodtohour nextp ;data ]};
214+
219215/-load the sort csv
220216.sort.getsortcsv [.rdb.sortcsv ]
221217
@@ -227,17 +223,12 @@ $[.rdb.connectonstart;
227223 .servers.startupdepcycles [.rdb.tickerplanttypes ;.rdb.tpconnsleepintv ;.rdb.tpcheckcycles ];
228224 .rdb.subscribe [];
229225 ];
230- .rdb.tplogdate : .proc.cd []; // defines tplogdate for setpartition
226+ .rdb.currperiod : .proc.cp []; // defines currperiod for setpartition
231227 ]
232228
233229/-set the partition that is held in the rdb (for use by the gateway)
234230.rdb.setpartition []
235231
236- /-change timeout to zero before eod flush
237- /-GMT offset rounded to nearest 15 mins and added to roll time
238- .timer.repeat [.eodtime.nextroll -00 : 01 +{00:01 *15 *"j" $ (`minute $ x)%15 }(.proc.cp []-.z.p );0W ;1D;
239- (`.rdb.timeoutreset ;` );"Set rdb timeout to 0 for EOD writedown" ];
240-
241232/-send a signal to the old rdb and wdb (excluding the most recently started process) that the new rdb is ready for the next period.
242233.rdb.newrdbready: {[]
243234 if [1 <count h : exec w from .servers.SERVERS where proctype =`wdb ;
0 commit comments