Skip to content

Commit ca74d19

Browse files
authored
Gateway .z.ws handling, always execute join function, removed .gw.placehold (#388)
* Added .z.ws callback to .gw.call dictionary handling * Treat WebSockets as asynchronous connections and always execute join function * Removed placeholder system for gw results * Fix for old dataaccess code
1 parent 54559b1 commit ca74d19

2 files changed

Lines changed: 14 additions & 17 deletions

File tree

code/gateway/dataaccess.q

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ getdata:{[o]
9090
// join results together if from multiple processes
9191
autojoin:{[options]
9292
// if there is only one proc queried output the table
93-
if[1=count options`procs;:{::}];
93+
if[1=count options`procs;:first];
9494
// if there is no need for map reducable adjustment, return razed results
9595
if[not options`mapreduce;:raze];
9696
:mapreduceres[options;];

code/processes/gateway.q

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -104,16 +104,9 @@ queryqueue:([queryid:`u#`long$()] time:`timestamp$(); clienth:`g#`int$(); query:
104104
// client details
105105
clients:([]time:`timestamp$(); clienth:`g#`int$(); user:`symbol$(); ip:`int$(); host:`symbol$())
106106

107-
//Function to generate random placeholder
108-
genrand:{system"S ",string `int$.z.T;rand 0Ng}
109-
110-
//Generate random placeholder
111-
placehold:.gw.genrand[]
112-
113107
// structure to store query results from back end servers
114-
// structure is queryid!(clienthandle;servertype!(handle;results))
115-
// structure is queryid!(clienthandle;(servertype or serverIDs)!(serverID;results))
116-
results:(enlist 0Nj)!enlist(0Ni;(enlist `)!enlist(0Ni;.gw.placehold))
108+
// structure is queryid!(clienthandle;(servertype or serverIDs)!(serverID;results;resultbool))
109+
results:(enlist 0Nj)!enlist(0Ni;(enlist `)!enlist(0Ni;(::);0b))
117110

118111
// server handles - whether the server is currently running a query
119112
servers:([serverid:`u#`int$()]handle:`int$(); servertype:`symbol$(); inuse:`boolean$();active:`boolean$();querycount:`int$();lastquery:`timestamp$();usage:`timespan$();attributes:();disconnecttime:`timestamp$())
@@ -194,14 +187,17 @@ finishquery:{[qid;err;serverh]
194187
getqueue:{select queryid,time,clienth,query,servertype,status:?[null submittime;`pending;`running],submittime from .gw.queryqueue where null returntime}
195188

196189
// manage the result set dictionaries
197-
addemptyresult:{[queryid; clienth; servertypes] results[queryid]:(clienth;servertypes!(count servertypes,:())#enlist(0Ni;.gw.placehold))}
190+
addemptyresult:{[queryid; clienth; servertypes] results[queryid]:(clienth;servertypes!(count servertypes,:())#enlist(0Ni;(::);0b))}
198191
addservertoquery:{[queryid;servertype;serverh] .[`.gw.results;(queryid;1);{.[x;(y 0;0);:;y 1]};(servertype;serverh)]}
199192
deleteresult:{[queryid] .gw.results : (queryid,()) _ .gw.results}
200193

201194
// add a result coming back from a server
202195
addserverresult:{[queryid;results]
203196
serverid:first exec serverid from .gw.servers where active, handle=.z.w;
204-
if[queryid in key .gw.results; .[`.gw.results;(queryid;1;.gw.results[queryid;1;;0]?serverid;1);:;results]];
197+
if[queryid in key .gw.results;
198+
.[`.gw.results;(queryid;1;.gw.results[queryid;1;;0]?serverid;1);:;results];
199+
.[`.gw.results;(queryid;1;.gw.results[queryid;1;;0]?serverid;2);:;1b]
200+
];
205201
setserverstate[.z.w;0b];
206202
runnextquery[];
207203
checkresults[queryid]}
@@ -216,12 +212,11 @@ addservererror:{[queryid;error]
216212
}
217213
// check if all results are in. If so, send the results to the client
218214
checkresults:{[queryid]
219-
if[not any .gw.placehold~/: value (r:.gw.results[queryid])[1;;1];
215+
if[all value (r:.gw.results[queryid])[1;;2];
220216
// get the rest of the detail from the query table
221217
querydetails:queryqueue[queryid];
222218
// apply the join function to the results
223-
// If there only is one result, then just return it - ignore the join function
224-
res:@[{(0b;$[1<count y;$[10h=type x;value(x;y); x @ y];first y])}[querydetails[`join]];value r[1;;1];{(1b;.gw.errorprefix,"failed to apply join function to result sets: ",x)}];
219+
res:@[{(0b;$[10h=type x;value(x;y); x @ y])}[querydetails[`join]];value r[1;;1];{(1b;.gw.errorprefix,"failed to apply join function to result sets: ",x)}];
225220
// send the results back to the client.
226221
sendclientreply[queryid;last res;not res 0];
227222
// finish the query
@@ -262,7 +257,7 @@ removeserverhandle:{[serverh]
262257
// get the list of effected query ids
263258

264259
// 1) queries sent to this server but no reply back yet
265-
qids:where {[res;id] any .gw.placehold~/:res[1;where id=res[1;;0];1]}[;serverid] each results;
260+
qids:where {[res;id] any not res[1;where id=res[1;;0];2]}[;serverid] each results;
266261
// propagate an error back to each client
267262
sendclientreply[;.gw.errorprefix,"backend ",string[servertype]," server handling query closed the connection";0b] each qids;
268263
finishquery[qids;1b;serverh];
@@ -272,7 +267,7 @@ removeserverhandle:{[serverh]
272267
activeServerTypes:distinct exec servertype from .gw.servers where active, handle<>serverh;
273268

274269
qids2:where {[res;id;aIDs;aTypes]
275-
s:where .gw.placehold~/:res[1;;1];
270+
s:where not res[1;;2];
276271
$[11h=type s; not all s in aTypes; not all any each s in\: aIDs]
277272
}[;serverid;activeServerIDs;activeServerTypes] each results _ 0Ni;
278273
sendclientreply[;.gw.errorprefix,"backend ",string[servertype]," server for running query closed the connection";0b] each qids2;
@@ -507,6 +502,8 @@ pgs:{.gw.call,:enlist[x]!enlist y};
507502
.z.po:{x@y;.gw.po[y]}@[value;`.z.po;{{[x]}}];
508503
.z.pg:{.gw.pgs[.z.w;1b];x@y}@[value;`.z.pg;{{[x]}}];
509504
.z.ps:{.gw.pgs[.z.w;0b];x@y}@[value;`.z.ps;{{[x]}}];
505+
// only wrap .z.ws if it is already defined
506+
if[@[{value x;1b};`.z.ws;{0b}];.z.ws:{.gw.pgs[.z.w;0b];x@y}.z.ws];
510507

511508
// START UP
512509
// initialise connections

0 commit comments

Comments
 (0)