@@ -3,13 +3,20 @@ const util = require('util');
33const path = require ( 'path' ) ;
44const fs = require ( 'fs' ) ;
55const mkdirp = require ( 'mkdirp' ) ;
6+ const { promisify } = require ( 'util' ) ;
7+ const LRU = require ( 'lru-cache' ) ;
8+ const redis = require ( 'redis' ) ;
69const { Op } = require ( 'sequelize' ) ;
710const ensureJson = require ( './ensure-json' ) ;
811const writeFile = util . promisify ( fs . writeFile ) ;
912const readFile = util . promisify ( fs . readFile ) ;
1013const unlink = util . promisify ( fs . unlink ) ;
1114const access = util . promisify ( fs . access ) ;
1215
16+ function redisDbKey ( id ) {
17+ return `statement-result/${ id } ` ;
18+ }
19+
1320class Statements {
1421 /**
1522 * @param {import('../sequelize-db') } sequelizeDb
@@ -18,6 +25,35 @@ class Statements {
1825 constructor ( sequelizeDb , config ) {
1926 this . sequelizeDb = sequelizeDb ;
2027 this . config = config ;
28+ this . queryResultStore = config . get ( 'queryResultStore' ) ;
29+
30+ if ( this . queryResultStore === 'redis' ) {
31+ const client = redis . createClient ( config . get ( 'redisUri' ) ) ;
32+ this . redisClient = client ;
33+ this . redisGetAsync = promisify ( client . get ) . bind ( client ) ;
34+ this . redisSetexAsync = promisify ( client . setex ) . bind ( client ) ;
35+ this . redisDelAsync = promisify ( client . del ) . bind ( client ) ;
36+ }
37+
38+ if ( this . queryResultStore === 'memory' ) {
39+ this . memoryCache = new LRU ( { max : 1000 , maxAge : 1000 * 60 * 60 } ) ;
40+ }
41+ }
42+
43+ isFileStore ( ) {
44+ return this . queryResultStore === 'file' ;
45+ }
46+
47+ isDatabaseStore ( ) {
48+ return this . queryResultStore === 'database' ;
49+ }
50+
51+ isRedisStore ( ) {
52+ return this . queryResultStore === 'redis' ;
53+ }
54+
55+ isMemoryStore ( ) {
56+ return this . queryResultStore === 'memory' ;
2157 }
2258
2359 async findOneById ( id ) {
@@ -46,11 +82,15 @@ class Statements {
4682 return items ;
4783 }
4884
85+ /**
86+ * Remove statement by id
87+ * @param {string } id - statement id
88+ */
4989 async removeById ( id ) {
5090 const statement = await this . findOneById ( id ) ;
5191 const { resultsPath } = statement ;
5292
53- if ( resultsPath ) {
93+ if ( this . isFileStore ( ) && resultsPath ) {
5494 const dbPath = this . config . get ( 'dbPath' ) ;
5595 const fullPath = path . join ( dbPath , resultsPath ) ;
5696
@@ -66,6 +106,18 @@ class Statements {
66106 }
67107 }
68108
109+ if ( this . isRedisStore ( ) ) {
110+ await this . redisDelAsync ( redisDbKey ( id ) ) ;
111+ }
112+
113+ if ( this . isMemoryStore ( ) ) {
114+ this . memoryCache . del ( id ) ;
115+ }
116+
117+ if ( this . isDatabaseStore ( ) ) {
118+ await this . sequelizeDb . Cache . destroy ( { where : { id : redisDbKey ( id ) } } ) ;
119+ }
120+
69121 return this . sequelizeDb . Statements . destroy ( { where : { id } } ) ;
70122 }
71123
@@ -90,21 +142,57 @@ class Statements {
90142 }
91143
92144 async updateFinished ( id , queryResult , stopTime , durationMs ) {
93- const dbPath = this . config . get ( 'dbPath' ) ;
145+ const { config } = this ;
146+ const dbPath = config . get ( 'dbPath' ) ;
94147 const rowCount = queryResult . rows . length ;
95148
96149 let resultsPath ;
97150
98151 // If rows returned write results csv
99152 if ( rowCount > 0 ) {
100- const dir = id . slice ( 0 , 3 ) ;
101- await mkdirp ( path . join ( dbPath , 'results' , dir ) ) ;
102- resultsPath = path . join ( 'results' , dir , `${ id } .json` ) ;
103- const fullPath = path . join ( dbPath , resultsPath ) ;
104153 const arrOfArr = queryResult . rows . map ( ( row ) => {
105154 return queryResult . columns . map ( ( col ) => row [ col . name ] ) ;
106155 } ) ;
107- await writeFile ( fullPath , JSON . stringify ( arrOfArr ) ) ;
156+
157+ if ( this . isFileStore ( ) ) {
158+ const dir = id . slice ( 0 , 3 ) ;
159+ await mkdirp ( path . join ( dbPath , 'results' , dir ) ) ;
160+ resultsPath = path . join ( 'results' , dir , `${ id } .json` ) ;
161+ const fullPath = path . join ( dbPath , resultsPath ) ;
162+ await writeFile ( fullPath , JSON . stringify ( arrOfArr ) ) ;
163+ }
164+
165+ if ( this . isRedisStore ( ) ) {
166+ // Redis results can be removed by redis itself
167+ // In the event seconds does not exist or is zero, default to 1 hour
168+ let seconds =
169+ parseInt ( config . get ( 'queryHistoryRetentionTimeInDays' ) , 10 ) * 86400 ;
170+ if ( ! seconds || seconds <= 0 ) {
171+ seconds = 60 * 60 ;
172+ }
173+ await this . redisSetexAsync (
174+ redisDbKey ( id ) ,
175+ seconds ,
176+ JSON . stringify ( arrOfArr )
177+ ) ;
178+ }
179+
180+ if ( this . isDatabaseStore ( ) ) {
181+ const ONE_DAY = 1000 * 60 * 60 * 24 ;
182+ const daysMs =
183+ parseInt ( config . get ( 'queryHistoryRetentionTimeInDays' ) , 10 ) * ONE_DAY ;
184+ const expiryDate = new Date ( Date . now ( ) + daysMs ) ;
185+ await this . sequelizeDb . Cache . create ( {
186+ id : redisDbKey ( id ) ,
187+ data : arrOfArr ,
188+ expiryDate,
189+ name : 'statement results' ,
190+ } ) ;
191+ }
192+
193+ if ( this . isMemoryStore ( ) ) {
194+ this . memoryCache . set ( id , arrOfArr ) ;
195+ }
108196 }
109197
110198 const update = {
@@ -125,29 +213,66 @@ class Statements {
125213 if ( ! statement ) {
126214 throw new Error ( 'Statement not found' ) ;
127215 }
128- const { resultsPath } = statement ;
129216
130- // If no result path the query had no rows.
131- // Return empty array
132- if ( ! resultsPath ) {
217+ const { config } = this ;
218+
219+ if ( this . isFileStore ( ) ) {
220+ const { resultsPath } = statement ;
221+
222+ // If no result path the query had no rows.
223+ // Return empty array
224+ if ( ! resultsPath ) {
225+ return [ ] ;
226+ }
227+
228+ const fullPath = path . join ( config . get ( 'dbPath' ) , resultsPath ) ;
229+
230+ let exists = true ;
231+ try {
232+ await access ( fullPath ) ;
233+ } catch ( error ) {
234+ exists = false ;
235+ }
236+
237+ if ( exists ) {
238+ const fileData = await readFile ( fullPath , 'utf8' ) ;
239+ return JSON . parse ( fileData ) ;
240+ }
241+
133242 return [ ] ;
134243 }
135244
136- const fullPath = path . join ( this . config . get ( 'dbPath' ) , resultsPath ) ;
137-
138- let exists = true ;
139- try {
140- await access ( fullPath ) ;
141- } catch ( error ) {
142- exists = false ;
245+ if ( this . isRedisStore ( ) ) {
246+ const json = await this . redisGetAsync ( redisDbKey ( statement . id ) ) ;
247+ if ( json ) {
248+ const parsed = JSON . parse ( json ) ;
249+ if ( Array . isArray ( parsed ) ) {
250+ return parsed ;
251+ }
252+ }
253+ return [ ] ;
143254 }
144255
145- if ( exists ) {
146- const fileData = await readFile ( fullPath , 'utf8' ) ;
147- return JSON . parse ( fileData ) ;
256+ if ( this . isDatabaseStore ( ) ) {
257+ const doc = await this . sequelizeDb . Cache . findOne ( {
258+ where : { id : redisDbKey ( statement . id ) } ,
259+ } ) ;
260+ if ( doc ) {
261+ const result = ensureJson ( doc . data ) ;
262+ if ( Array . isArray ( result ) ) {
263+ return result ;
264+ }
265+ }
266+ return [ ] ;
148267 }
149268
150- return [ ] ;
269+ if ( this . isMemoryStore ( ) ) {
270+ const result = this . memoryCache . get ( statement . id ) ;
271+ if ( Array . isArray ( result ) ) {
272+ return result ;
273+ }
274+ return [ ] ;
275+ }
151276 }
152277
153278 async removeOldEntries ( ) {
0 commit comments