forked from hull-ships/hull-sql
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpostgres.js
More file actions
150 lines (131 loc) · 2.98 KB
/
postgres.js
File metadata and controls
150 lines (131 loc) · 2.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
/**
* Module dependencies.
*/
import Pg from "pg";
import QueryStream from "pg-query-stream";
import Promise from "bluebird";
import SequelizeUtils from "sequelize/lib/utils";
import moment from "moment";
/**
* PostgreSQL adapter.
*/
/**
* Open a new connection.
*
* Params:
* @connection_string String*
*
* Return:
* @client Instance
*/
export function openConnection(connection_string) {
const client = new Pg.Client(connection_string);
return client;
}
/**
* Close the connection.
*
* Params:
* @client Instance
*/
export function closeConnection(client) {
client.end();
}
/**
* Wrap the user query
* inside a PostgreSQL request.
*
* Params:
* @query String*
* @last_updated_at String
*
* Return:
* @wrappedQuery String
*/
export function wrapQuery(sql, last_updated_at) {
const replacements = { last_updated_at: (last_updated_at || moment(0).toISOString()) };
const query = SequelizeUtils.formatNamedParameters(sql, replacements, "postgres");
return `WITH __qry__ AS (${query}) SELECT * FROM __qry__`;
}
function cancelQuery(client) {
const { processID } = client;
const c2 = new Pg.Client(client.connectionParameters);
c2.connect((cErr) => {
if (cErr) return false;
c2.query(`SELECT pg_cancel_backend(${processID})`, () => {
c2.end();
});
return c2;
});
}
/**
* Run a wrapped query.
*
* Params:
* @client Instance*
* @wrappedQuery String*
* @callback Function*
*
* Return:
* @callback Function
* - @error Object
* - @rows Array
*/
export function runQuery(client, query, options = {}) {
return new Promise((resolve, reject) => {
// Limit the result.
query = `${query} LIMIT 100`;
let timer;
let currentQuery;
if (options.timeout) {
timer = setTimeout(() => {
reject(new Error("Timeout error"));
cancelQuery(client);
}, options.timeout);
}
// Connect the connection.
client.connect((connectionError) => {
if (connectionError) {
connectionError.status = 401;
return reject(connectionError);
}
// Run the query.
currentQuery = client.query(query, (queryError, result) => {
if (timer) clearTimeout(timer);
client.end();
if (queryError) {
queryError.status = 400;
return reject(queryError);
}
return resolve(result);
});
return currentQuery;
});
});
}
/**
* Stream a wrapped query.
*
* Params:
* @client Instance*
* @wrappedQuery String*
* @callback Function*
*
* Return:
* @callback Function
* - @error Object
* - @stream Stream
*/
export function streamQuery(client, query) {
return new Promise((resolve, reject) => {
// After connecting the connection, stream the query.
client.connect((connectionError) => {
if (connectionError) {
return reject(connectionError);
}
const stream = client.query(new QueryStream(query));
resolve(stream);
return stream;
});
});
}