Skip to content

Commit a199143

Browse files
committed
Use last_updated_at instead of last_sync_at for incremental sync
1 parent 22044b4 commit a199143

5 files changed

Lines changed: 1091 additions & 68 deletions

File tree

manifest.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,12 @@
4747
"type": "string",
4848
"format": "hidden"
4949
},
50+
{
51+
"name": "last_updated_at",
52+
"title": "Last updated_at",
53+
"type": "string",
54+
"format": "hidden"
55+
},
5056
{
5157
"name": "last_job_id",
5258
"title": "Last job ID",

server/adapters/postgres.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,19 +41,19 @@ export function closeConnection(client) {
4141
*
4242
* Params:
4343
* @query String*
44-
* @last_sync_at String
44+
* @last_updated_at String
4545
*
4646
* Return:
4747
* @wrappedQuery String
4848
*/
4949

50-
export function wrapQuery(query, last_sync_at) {
50+
export function wrapQuery(query, last_updated_at) {
5151
// Wrap the query.
5252
const wrappedQuery = `WITH __qry__ AS (${query}) SELECT * FROM __qry__`;
5353

5454
// Add a condition if needed.
55-
if (query.match(/updated_at/) && last_sync_at) {
56-
return `${wrappedQuery} WHERE updated_at >= '${last_sync_at}'`;
55+
if (query.match(/updated_at/) && last_updated_at) {
56+
return `${wrappedQuery} WHERE updated_at > '${last_updated_at}'`;
5757
}
5858
return wrappedQuery;
5959
}

server/sync-agent.js

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
const _ = require("lodash");
66

77
// Map each record of the stream.
8-
const transform = require("./utils/transform");
98
const map = require("through2-map");
109

1110
/**
@@ -26,7 +25,6 @@ const awsS3 = require("s3-upload-stream")(awsAccount);
2625

2726
import * as Adapters from "./adapters";
2827

29-
3028
/**
3129
* Export the sync agent for the SQL ship.
3230
*/
@@ -98,14 +96,13 @@ export default class SyncAgent {
9896
}
9997

10098
streamQuery(query, options = {}) {
101-
const { last_sync_at } = options;
99+
const { last_updated_at } = options;
102100

103101
// Wrap the query.
104-
const wrappedQuery = this.adapter.wrapQuery(query, last_sync_at);
105-
102+
const wrappedQuery = this.adapter.wrapQuery(query, last_updated_at);
106103
// Run the method for the specific adapter.
107104
return this.adapter.streamQuery(this.client, wrappedQuery).then(stream => {
108-
stream.on("error", err => this.hull.logger.error("Query error", { message: err.toString() }));
105+
stream.on("error", err => this.hull.logger.error("sync.error", { message: err.toString() }));
109106
return stream;
110107
});
111108
}
@@ -127,17 +124,43 @@ export default class SyncAgent {
127124
startSync(stream, started_sync_at) {
128125
this.hull.logger.info("sync.start");
129126
let processed = 0;
130-
const progress = map({ objectMode: true }, (user) => {
127+
let last_updated_at;
128+
129+
130+
const transform = map({ objectMode: true }, (record) => {
131+
const user = {};
131132
processed += 1;
132-
if (processed % 100 === 0) this.hull.logger.info("sync.progress", { processed, elapsed: new Date() - started_sync_at });
133+
134+
if (processed % 1000 === 0) {
135+
this.hull.logger.info("sync.progress", { processed, elapsed: new Date() - started_sync_at });
136+
}
137+
138+
// Add the user id if exists.
139+
if (record.external_id) {
140+
user.userId = record.external_id.toString();
141+
}
142+
143+
// console.warn("Hello record", { record });
144+
145+
if (record.updated_at) {
146+
last_updated_at = last_updated_at || record.updated_at;
147+
if (record.updated_at > last_updated_at) {
148+
last_updated_at = record.updated_at;
149+
}
150+
}
151+
152+
// Register eveything else inside the "traits" object.
153+
user.traits = _.omit(record, "external_id", "updated_at");
154+
133155
return `${JSON.stringify(user)}\n`;
134156
});
135157

136-
return this.uploadStream(stream.pipe(transform()).pipe(progress), started_sync_at)
158+
return this.uploadStream(stream.pipe(transform), started_sync_at)
137159
.then(url => this.startImportJob(url))
138160
.then(job => {
139161
return this.updateShipSettings({
140162
last_sync_at: started_sync_at,
163+
last_updated_at: last_updated_at || started_sync_at,
141164
last_job_id: job.id
142165
});
143166
})

server/utils/transform.js

Lines changed: 0 additions & 26 deletions
This file was deleted.

0 commit comments

Comments
 (0)