-
Notifications
You must be signed in to change notification settings - Fork 85
Expand file tree
/
Copy pathdataaccess.q
More file actions
231 lines (209 loc) · 10.4 KB
/
dataaccess.q
File metadata and controls
231 lines (209 loc) · 10.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
// switch to the .dataaccess namespace: the definitions that follow are created there
\d .dataaccess
// boolean flag initialised to false; it is not read anywhere in this part of the file
// NOTE(review): presumably consulted by gateway routing code elsewhere - confirm
forceservers:0b;
// aggadjust maps each map-reducible aggregation name to a function which, given
// the column argument(s) of that aggregation, returns the list of
// (aggregation;column) pairs that have to be computed on every process so the
// final value can be rebuilt afterwards.
// A null symbol value means the aggregation needs no adjustment
aggadjust:(!). flip(
    (`avg;      {[col]flip(`sum`count;2#col)});
    (`cor;      {[pair]flip(`wsum`count`sum`sum`sumsq`sumsq;pair@(enlist 0 1;0;0;1;0;1))});
    (`count;    `);
    (`cov;      {[pair]flip(`wsum`count`sum`sum;pair@(enlist 0 1;0;0;1))});
    (`dev;      {[col]flip(`sumsq`count`sum;3#col)});
    (`distinct; `);
    (`first;    `);
    (`last;     `);
    (`max;      `);
    (`min;      `);
    (`prd;      `);
    (`sum;      `);
    (`var;      {[col]flip(`sumsq`count`sum;3#col)});
    (`wavg;     {[pair]flip(`wsum`sum;(enlist(pair 0;pair 1);pair 0))});
    (`wsum;     {[pair]enlist(`wsum;enlist(pair 0;pair 1))}));
// camel converts a name to a string with an upper case first character.
// A symbol list (type 11h) yields one string per symbol; any other argument is
// stringed and capitalised as a single value
camel:{[name]
    capfirst:@[;0;upper];
    $[11h~type name;capfirst each string name;capfirst string name]
    };
// absagg builds the recombination clause for an aggregation X that can simply be
// re-applied to its own partial results, i.e. X(X1,X2)=X(X(X1),X(X2)) for non
// overlapping subsets X1 and X2.
// x is the aggregation name as a string, y the capitalised column name; the
// result is a one item dictionary of column!(aggregation function;column)
absagg:{
    col:`$x,y;
    (enlist col)!enlist(value x;col)
    };
// parse tree builders used by the mapaggregate dictionary.
// avgf: sum of the "sum" column of col divided by the summed "count" column of cnt
avgf:{[col;cnt]
    (%;(sum;`$"sum",col);scx cnt)
    };
// covf: summed wsum of a and b over the summed count of a, minus the product of
// the two averages (both averaged over the count of a)
covf:{[a;b]
    meanprod:(%;swsum[a;b];scx a);
    (-;meanprod;(*;avgf[a;a];avgf[b;a]))
    };
// varf: summed "sumsq" column of col over the summed count of cnt, minus the
// square of the average of col
varf:{[cnt;col]
    meansq:(%;(sum;`$"sumsq",col);scx cnt);
    (-;meansq;(xexp;avgf[col;cnt];2))
    };
// parse tree builders that sum the partial counts and partial wsums in the
// mapaggregate dictionary.
// scx: sum of the column named "count" followed by col
scx:{[col](sum;`$"count",col)};
// swsum: sum of the column named "wsum" followed by a then b
swsum:{[a;b](sum;`$"wsum",a,b)};
// mapaggregate maps each map-reducible aggregation name to a function that
// builds the select clause (a one item dictionary of result column!parse tree)
// used to combine the partial results returned by the processes.
// Single column aggregations receive the capitalised column name as a string,
// two column aggregations (cor, cov, wavg, wsum) receive a pair of such strings
mapaggregate:(!). flip(
    (`avg;   {[col]enlist[`$"avg",col]!enlist avgf[col;col]});
    (`cor;   {[pair]a:pair 0;b:pair 1;enlist[`$"cor",a,b]!enlist(%;covf[a;b];(*;(sqrt;varf[a;a]);(sqrt;varf[a;b])))});
    (`count; {[col]enlist[`$"count",col]!enlist scx col});
    (`cov;   {[pair]a:pair 0;b:pair 1;enlist[`$"cov",a,b]!enlist covf[a;b]});
    (`dev;   {[col]enlist[`$"dev",col]!enlist(sqrt;varf[col;col])});
    (`first; {[col]enlist[`$"first",col]!enlist(*:;`$"first",col)});
    (`last;  {[col]absagg["last";col]});
    (`max;   {[col]absagg["max";col]});
    (`min;   {[col]absagg["min";col]});
    (`prd;   {[col]absagg["prd";col]});
    (`sum;   {[col]absagg["sum";col]});
    (`var;   {[col]enlist[`$"var",col]!enlist varf[col;col]});
    (`wavg;  {[pair]a:pair 0;b:pair 1;enlist[`$"wavg",a,b]!enlist(%;swsum[a;b];(sum;`$"sum",a))});
    (`wsum;  {[pair]a:pair 0;b:pair 1;enlist[`$"wsum",a,b]!enlist swsum[a;b]}));
// go converts one ordering instruction into (sort function;column):
// a first item of `asc selects xasc, anything else selects xdesc
go:{[ord]
    ($[`asc=ord 0;xasc;xdesc];ord 1)
    };
// Full generality dataaccess function in the gateway.
// o is the dictionary of request parameters. The request is logged, checked,
// routed to processes, split per process where required and finally executed
// through the gateway, either synchronously or asynchronously
getdata:{[o]
// Input checking in the gateway
// log the request; the returned request number is handed to the error handler
reqno:.requests.initlogger[o];
// protected evaluation: any error from the input checks goes to .requests.error[reqno]
o:@[.checkinputs.checkinputs;o;.requests.error[reqno]];
// Get the Procs
// only route automatically when the caller did not supply `procs
if[not `procs in key o;o[`procs]:attributesrouting[o;partdict[o]]];
// Get Default process behavior
default:`timeout`postback`sublist`getquery`queryoptimisation`postprocessing!(0Wn;();0W;0b;1b;{:x;});
// Use upserting logic to determine behaviour
// values supplied in o override the defaults
options:default,o;
// translate each ordering instruction to (xasc|xdesc;column) in options only; o is left as is
if[`ordering in key o;options[`ordering]: go each options`ordering];
// adjust the request for the processes being queried; with two or more procs
// adjustqueries returns one query per process
o:adjustqueries[o;partdict o];
// map-reduce is used only when aggregations are present, every aggregation is a
// key of aggadjust and `date is not part of the grouping
options[`mapreduce]:0b;
gr:$[`grouping in key options;options`grouping;`];
if[`aggregations in key options;
if[all key[options`aggregations]in key aggadjust;
options[`mapreduce]:not`date in gr]];
// Execute the queries
// with getquery set, .dataaccess.buildquery is called on the processes instead
// of getdata and its result is returned
if[options`getquery;
$[.gw.call .z.w;
:.gw.syncexec[(`.dataaccess.buildquery;o);options[`procs]];
:.gw.asyncexec[(`.dataaccess.buildquery;o);options[`procs]]]];
// otherwise run getdata on the processes, joining the results with the
// function chosen by autojoin
:$[.gw.call .z.w;
// if sync
.gw.syncexecjt[(`getdata;o);options[`procs];autojoin[options];options[`timeout]];
// if async
.gw.asyncexecjpt[(`getdata;o);options[`procs];autojoin[options];options[`postback];options[`timeout]]];
};
// autojoin picks the function used to join the results of the queried processes:
// - a single process: first, i.e. the table is returned as is
// - no map-reduce adjustment needed: razeresults projected on options
// - otherwise: mapreduceres projected on options
autojoin:{[options]
    $[1=count options`procs;
        first;
      not options`mapreduce;
        razeresults[options;];
        mapreduceres[options;]]
    };
// razeresults joins the per process results with raze and passes the joined
// result to processres, which applies post processing and sublist
razeresults:{[options;res]
    processres[options;raze res]
    };
// processres applies the `postprocessing function of options to the joined
// results and then, unless `sublist is 0W, takes that sublist of the outcome
processres:{[options;res]
    processed:options[`postprocessing]@res;
    limit:options`sublist;
    $[limit<>0W;limit sublist processed;processed]
    };
// function to reduce the partial results of several processes to one table
// options - request options; `aggregations, `grouping, `timebar and the keys read by processres are used
// res     - list of per process results
mapreduceres:{[options;res]
// raze the result sets together
// when every result has type 99h (e.g. a keyed table) each one is unkeyed with
// 0! while joining, otherwise the results are joined as they are
res:$[all 99h=type each res;
(){x,0!y}/res;
(),/res];
aggs:options`aggregations;
// flatten the aggregation dictionary into (aggregation;column argument) pairs
aggs:flip(key[aggs]where count each value aggs;raze aggs);
// distinct may be present as only agg, so apply distinct again
if[all`distinct=first each aggs;:?[res;();1b;()]];
// collecting the appropriate grouping argument for map-reduce aggs
// the by clause maps each grouping column to itself; item 2 of `timebar is
// used as a grouping column and 0b means no grouping at all
gr:$[all`grouping`timebar in key options;
a!a:options[`timebar;2],options`grouping;
`grouping in key options;
a!a:(),options`grouping;
`timebar in key options;
a!a:(),options[`timebar;2];
0b];
// select aggs by gr from res
// each pair is turned into its recombination clause through mapaggregate, using
// the capitalised column name(s) produced by camel
res:?[res;();gr;raze{mapaggregate[x 0;camel x 1]}'[aggs]];
//apply sublist and postprocesing to map reduced results
processres[options;res]
};
// Dynamic routing finds all processes with relevant data
// options  - request dictionary; only `starttime and `endtime are read here
// procdict - dictionary of process type!(min partition;max partition)
// returns the keys of procdict whose partition range overlaps the requested
// range and signals a gateway error when there are none
attributesrouting:{[options;procdict]
    // long partitions (type 7h) are treated as an int partitioned database, so the
    // requested range is compared as longs; otherwise it is compared as dates
    timespan:$[7h~tp:type first value procdict;`long$options[`starttime`endtime];`date$options[`starttime`endtime]];
    // if int partitioned adjust rdb partition range to cover all periods up to
    // days end to facilitate correct grouping of partitions.
    // The membership test has to be parenthesised: q evaluates right to left, so
    // without the parentheses `and` is applied to procdict itself and the 7h~tp
    // check no longer restricts the adjustment to int partitioned data
    if[(`rdb in key procdict)and 7h~tp;
        procdict[`rdb]:(first[procdict `rdb];-1+`long$01D00 + last procdict `rdb)];
    // See if any of the provided partitions are with the requested ones: a process
    // qualifies when its whole range lies inside the request or when either end
    // of the request falls inside its range
    procdict:{[x;timespan] (all x within timespan) or any timespan within x}[;timespan] each procdict;
    // Only return appropriate partitions
    types:(key procdict) where value procdict;
    // If the partitions are out of scope of processes then error
    if[0=count types;
        '`$"gateway error - no info found for that table name and time range. Either table does not exist; attributes are incorect in .gw.servers on gateway, or the date range is outside the ones present"
        ];
    :types;
    };
// Generates a dictionary of servertype!(min partition;max partition) for the
// processes in .gw.servers whose attributes list the requested table
// input - request dictionary; only `tablename is read
partdict:{[input]
tabname:input[`tablename];
// Remove duplicate servertypes from the gw.servers
// keeps the first row of each servertype
servers:select from .gw.servers where i=(first;i)fby servertype;
// extract the procs which have the table defined
servers:select from servers where {[x;tabname]tabname in @[x;`tables]}[;tabname] each attributes;
// Create a dictionary of the attributes against servertypes
procdict:exec servertype!attributes[;`partition] from servers;
// If the response is a dictionary index into the tablename
procdict:@[procdict;key procdict;{[x;tabname]if[99h=type x;:x[tabname]];:x}[;tabname]];
// returns the dictionary as min date/ max date
// asc sorts the dictionary by these (min;max) values
procdict:asc @[procdict;key procdict;{:(min x; max x)}];
// prevents overlap if more than one process contains a specified date
// each-prior: x is the current range and y the previous one; when the start of
// x lies within y the range is moved to begin one after the maximum of y
if[1<count procdict;
procdict:{:$[y~`date$();x;$[within[x 0;(min y;max y)];(1+max[y];x 1);x]]}':[procdict]];
:procdict;
};
// function to adjust the queries being sent to processes to prevent overlap of
// time clause and data being queried on more than one process
// options - request dictionary; `procs, `tablename, `starttime, `endtime and
//           optionally `aggregations and `grouping are read
// part    - dictionary of process type!(min partition;max partition)
// returns options unchanged for fewer than two procs, otherwise a copy of
// options per entry of the partitions dictionary with `starttime and `endtime
// replaced by that entry's range
adjustqueries:{[options;part]
// if only one process then no need to adjust
// NOTE(review): p is assigned here but not referenced again in this function
if[2>count p:options`procs;:options];
// get the tablename
tabname:options[`tablename];
// remove duplicate servertypes from the gw.servers
servers:select from .gw.servers where i=(first;i)fby servertype;
// extract the procs which have the table defined
servers:select from servers where {[x;tabname]tabname in @[x;`tables]}[;tabname] each attributes;
// create a dictionary of the attributes against servertypes
procdict:exec servertype!attributes[;`partition] from servers;
// if the response is a dictionary index into the tablename
procdict:@[procdict;key procdict;{[x;tabname]if[99h=type x;:x[tabname]];:x}[;tabname]];
// create list of all available partitions
possparts:raze value procdict;
//group partitions to relevant process
// each available partition is tested against every range in part with within
partitions:group key[part]where each{within[y;]each value x}[part]'[possparts];
// reduce each group to the (min;max) of its indices and index possparts with them
partitions:possparts{(min x;max x)}'[partitions];
partitions:`timestamp$partitions;
// adjust the times to account for period end time when int partitioned
// the hdb range is ended just before the rdb start and the rdb range is ended at the requested endtime
// NOTE(review): `hdb and `rdb are indexed unconditionally here - presumably both
// are always present when two or more procs are queried; confirm
c:first[partitions`hdb],-1+ first[partitions`rdb];
d:first[partitions`rdb],options `endtime;
partitions:@[@[partitions;`hdb;:;c];`rdb;:;d];
// if start/end time not a date, then adjust dates parameter for the correct types
// NOTE(review): the test below compares against -12h although the comment above
// speaks of dates - confirm which type is intended; a is assigned but not
// referenced again in this function
if[not a:-12h~tp:type start:options`starttime;
// converts partitions dictionary to timestamps/datetimes
// each range becomes (start of first day;one before the start of the day after
// the last), cast with "z"$ when starttime has type -15h
partitions:$[-15h~tp;"z"$;]{(0D+x 0;x[1]+1D-1)}'[partitions];
// convert first and last timestamp to start and end time
partitions:@[partitions;f;:;(start;partitions[f:first key partitions;1])];
partitions:@[partitions;l;:;(partitions[l:last key partitions;0];options`endtime)]];
// adjust map reducable aggregations to get correct components
// only done when more than one range remains and every aggregation is a key of aggadjust
if[(1<count partitions)&`aggregations in key options;
if[all key[o:options`aggregations]in key aggadjust;
aggs:mapreduce[o;$[`grouping in key options;options`grouping;`]];
options:@[options;`aggregations;:;aggs]]];
// create a dictionary of procs and different queries
:{@[@[x;`starttime;:;y 0];`endtime;:;y 1]}[options]'[partitions];
};
// mapreduce rewrites an aggregation dictionary into the component aggregations
// every process must compute so that the results can be combined afterwards.
// aggs - dictionary of aggregation!column argument(s)
// gr   - grouping column(s)
// With a date grouping the aggregations are returned untouched; otherwise every
// aggregation has to be a key of aggadjust or an error is signalled
mapreduce:{[aggs;gr]
    // if there is a date grouping any aggregation is allowed
    if[`date in gr;:aggs];
    // one (aggregation;column argument) pair per requested column
    pairs:flip(key[aggs]where count each value aggs;raze aggs);
    // non map-reducable aggregations cannot be split without a date grouping
    if[not all pairs[;0]in key aggadjust;
        '`$"to perform non-map reducable aggregations automatically over multiple processes there must be a date grouping"];
    // a null adjustment keeps the pair as it is, otherwise the adjustment
    // function expands the column argument into its component pairs
    expand:{[pair]
        adjust:.dataaccess.aggadjust pair 0;
        $[`~adjust;enlist pair;adjust pair 1]};
    parts:distinct raze expand each pairs;
    (first each parts)!last each parts
    };