Skip to content

Commit 8ae02e4

Browse files
author
Thomas Kgaevski
committed
cut SQL imports into smaller batch jobs of 10000 users
1 parent d00f4de commit 8ae02e4

7 files changed

Lines changed: 256 additions & 66 deletions

File tree

package.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,14 @@
4242
"hull-ship"
4343
],
4444
"dependencies": {
45+
"JSONStream": "^1.3.1",
4546
"aws-sdk": "^2.7.13",
4647
"babel-cli": "^6.10.1",
4748
"babel-loader": "^6.2.4",
4849
"babel-preset-es2015": "^6.9.0",
4950
"babel-preset-stage-0": "^6.5.0",
5051
"babel-register": "^6.9.0",
52+
"batch-stream": "^0.1.3",
5153
"bluebird": "^3.4.1",
5254
"body-parser": "^1.15.2",
5355
"camelize": "^1.0.0",
@@ -67,6 +69,7 @@
6769
"newrelic": "^1.28.1",
6870
"pg": "^6.1.1",
6971
"pg-query-stream": "^1.0.0",
72+
"promise-streams": "^1.0.1",
7073
"raw-body": "^2.1.7",
7174
"rimraf": "^2.4.3",
7275
"s3-upload-stream": "^1.0.7",

server/adapters/filesystem.js

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/**
2+
* Module dependencies.
3+
*/
4+
5+
import Promise from "bluebird";
6+
import SequelizeUtils from "sequelize/lib/utils";
7+
import moment from "moment";
8+
import fs from "fs";
9+
import JSONStream from "JSONStream";
10+
11+
/**
12+
* File system adapter.
13+
*/
14+
15+
/**
16+
* Open a new connection.
17+
*
18+
* Params:
19+
* @connection_string String*
20+
*
21+
* Return:
22+
* @client undefined (no-op stub: nothing is opened or returned)
23+
*/
24+
25+
export function openConnection(connection_string) {
  // No-op stub: the argument is ignored and nothing is opened,
  // so callers receive `undefined`.
  return undefined;
}
26+
27+
/**
28+
* Close the connection.
29+
*
30+
* Params:
31+
* @client Instance
32+
*/
33+
34+
export function closeConnection(client) {
  // No-op stub: the argument is ignored and there is nothing to release.
  return undefined;
}
35+
36+
/**
 * Build the "query" used by the file system adapter.
 *
 * Nothing is wrapped here: both arguments are ignored and a fixed
 * relative file path is returned instead of a SQL statement.
 *
 * Params:
 *   @sql String (unused)
 *   @last_updated_at String (unused)
 *
 * Return:
 *   @wrappedQuery String - always "data.json"
 */

export function wrapQuery(sql, last_updated_at) {
  return "data.json";
}
51+
52+
function cancelQuery(client) {
  // No-op stub: the argument is ignored and nothing is cancelled.
  return undefined;
}
53+
54+
/**
 * Run a wrapped query.
 *
 * This adapter executes nothing: the query is handed back unchanged,
 * wrapped in an already-resolved promise.
 *
 * Params:
 *   @client Instance (unused)
 *   @query String*
 *   @options Object (unused)
 *
 * Return:
 *   @promise Promise - resolves with @query
 */

export function runQuery(client, query, options = {}) {
  return Promise.resolve(query);
}
73+
74+
/**
 * Stream a wrapped query.
 *
 * The "query" is a file path: the file is opened as a read stream and
 * piped through JSONStream.parse() (called without a path pattern).
 *
 * Params:
 *   @client Instance (unused)
 *   @query String* - path of the JSON file to read
 *
 * Return:
 *   @promise Promise - resolves with the JSONStream parser stream;
 *     rejects if creating the streams throws synchronously
 */

export function streamQuery(client, query) {
  // The executor is kept so that a synchronous throw (e.g. an invalid path
  // argument) still surfaces as a rejection rather than an exception.
  return new Promise((resolve) => {
    const source = fs.createReadStream(query);
    const parsed = source.pipe(JSONStream.parse());

    // pipe() does not forward errors downstream: without this, a read
    // failure (missing file, permissions) never reaches the stream that
    // callers actually hold.
    source.on("error", (err) => parsed.emit("error", err));

    resolve(parsed);
  });
}
95+
96+
/**
 * Write a batch of users to a local file, one JSON document per line.
 *
 * The file is written to `extracts/<shipId>/<timestamp>-<partNumber>.json`,
 * relative to the current working directory. The target directory is
 * created first when it does not exist yet.
 *
 * Params:
 *   @users Array* - objects to serialize with JSON.stringify
 *   @shipId String* - used as the sub-directory name
 *   @partNumber Number* - used in the file name and echoed in the result
 *
 * Return:
 *   @promise Promise - resolves with { url, partNumber, size } where `url`
 *     is a placeholder built on http://fake.url/ and `size` is users.length;
 *     rejects with the underlying fs error
 */
export function upload(users, shipId, partNumber) {
  const data = users.map(user => JSON.stringify(user)).join("\n");
  const directory = `extracts/${shipId}`;
  const filename = `${directory}/${new Date().getTime()}-${partNumber}.json`;

  return new Promise((resolve, reject) => {
    // writeFile does not create missing parent directories, so make sure
    // the per-ship folder exists before writing.
    fs.mkdir(directory, { recursive: true }, (mkdirErr) => {
      // EEXIST is tolerated for runtimes that ignore the `recursive` flag.
      if (mkdirErr && mkdirErr.code !== "EEXIST") {
        return reject(mkdirErr);
      }

      return fs.writeFile(filename, data, (writeErr) => {
        if (writeErr) {
          return reject(writeErr);
        }
        return resolve({ url: `http://fake.url/${filename}`, partNumber, size: users.length });
      });
    });
  });
}

server/adapters/index.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,7 @@
11
import * as postgres from "./postgres";
2+
import * as filesystem from "./filesystem";
3+
import * as s3 from "./s3";
4+
25
export { postgres };
6+
export { filesystem };
7+
export { s3 };

server/adapters/s3.js

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/**
2+
* Configure the streaming to AWS.
3+
*/
4+
5+
import Stream from "stream";
6+
import Aws from "aws-sdk";
7+
8+
/**
 * Upload a batch of users to S3 as newline-delimited JSON.
 *
 * The object is stored in the bucket named by process.env.BUCKET_PATH under
 * `extracts/<shipId>/<timestamp>-<partNumber>.json`, with a private ACL.
 *
 * @param {Array} users - objects to serialize, one JSON document per line
 * @param {string} shipId - used as the key prefix
 * @param {number} partNumber - used in the key and echoed in the result
 * @returns {Promise} resolves with { url, partNumber, size } where `url` is
 *   a signed getObject URL (requested with Expires: 86400) and `size` is
 *   users.length; rejects with the error reported by s3.upload
 */
export function upload(users, shipId, partNumber) {
  // Convert users array to stream. Every chunk is pushed up front, so the
  // stream never has to pull more data: `read` is an explicit no-op rather
  // than the unimplemented default of a bare Readable.
  const stream = new Stream.Readable({ read() {} });
  users.forEach(user => stream.push(`${JSON.stringify(user)}\n`));
  stream.push(null);

  const Body = new Stream.PassThrough();
  const Bucket = process.env.BUCKET_PATH;
  const Key = `extracts/${shipId}/${new Date().getTime()}-${partNumber}.json`;
  const params = {
    Bucket, Key, Body,
    ACL: "private",
    ContentType: "application/json",
  };

  return new Promise((resolve, reject) => {
    const s3 = new Aws.S3();

    s3.upload(params, (err) => {
      if (err) {
        reject(err);
        return;
      }
      resolve({
        url: s3.getSignedUrl("getObject", { Bucket, Key, Expires: 86400 }),
        partNumber,
        size: users.length,
      });
    });

    // Start feeding the request body only once the upload has been set up.
    stream.pipe(Body);
  });
}

server/server.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,10 @@ module.exports = function server(options = {}) {
4242

4343
function checkConfiguration({ agent }, res, next) {
4444
if (!agent.isEnabled()) {
45+
console.error({ status: "ignored" });
4546
res.status(403).json({ status: "ignored" });
4647
} else if (!agent.isConfigured()) {
48+
console.error({ status: "not configured" });
4749
res.status(403).json({ status: "not configured" });
4850
} else {
4951
next();

0 commit comments

Comments
 (0)