55const _ = require ( "lodash" ) ;
66
77// Map each record of the stream.
8- const transform = require ( "./utils/transform" ) ;
98const map = require ( "through2-map" ) ;
109
1110/**
@@ -26,7 +25,6 @@ const awsS3 = require("s3-upload-stream")(awsAccount);
2625
2726import * as Adapters from "./adapters" ;
2827
29-
3028/**
3129 * Export the sync agent for the SQL ship.
3230 */
@@ -98,14 +96,13 @@ export default class SyncAgent {
9896 }
9997
10098 streamQuery ( query , options = { } ) {
101- const { last_sync_at } = options ;
99+ const { last_updated_at } = options ;
102100
103101 // Wrap the query.
104- const wrappedQuery = this . adapter . wrapQuery ( query , last_sync_at ) ;
105-
102+ const wrappedQuery = this . adapter . wrapQuery ( query , last_updated_at ) ;
106103 // Run the method for the specific adapter.
107104 return this . adapter . streamQuery ( this . client , wrappedQuery ) . then ( stream => {
108- stream . on ( "error" , err => this . hull . logger . error ( "Query error" , { message : err . toString ( ) } ) ) ;
105+ stream . on ( "error" , err => this . hull . logger . error ( "sync. error" , { message : err . toString ( ) } ) ) ;
109106 return stream ;
110107 } ) ;
111108 }
@@ -127,17 +124,43 @@ export default class SyncAgent {
127124 startSync ( stream , started_sync_at ) {
128125 this . hull . logger . info ( "sync.start" ) ;
129126 let processed = 0 ;
130- const progress = map ( { objectMode : true } , ( user ) => {
127+ let last_updated_at ;
128+
129+
130+ const transform = map ( { objectMode : true } , ( record ) => {
131+ const user = { } ;
131132 processed += 1 ;
132- if ( processed % 100 === 0 ) this . hull . logger . info ( "sync.progress" , { processed, elapsed : new Date ( ) - started_sync_at } ) ;
133+
134+ if ( processed % 1000 === 0 ) {
135+ this . hull . logger . info ( "sync.progress" , { processed, elapsed : new Date ( ) - started_sync_at } ) ;
136+ }
137+
138+ // Add the user id if exists.
139+ if ( record . external_id ) {
140+ user . userId = record . external_id . toString ( ) ;
141+ }
142+
143+ // console.warn("Hello record", { record });
144+
145+ if ( record . updated_at ) {
146+ last_updated_at = last_updated_at || record . updated_at ;
147+ if ( record . updated_at > last_updated_at ) {
148+ last_updated_at = record . updated_at ;
149+ }
150+ }
151+
152+ // Register eveything else inside the "traits" object.
153+ user . traits = _ . omit ( record , "external_id" , "updated_at" ) ;
154+
133155 return `${ JSON . stringify ( user ) } \n` ;
134156 } ) ;
135157
136- return this . uploadStream ( stream . pipe ( transform ( ) ) . pipe ( progress ) , started_sync_at )
158+ return this . uploadStream ( stream . pipe ( transform ) , started_sync_at )
137159 . then ( url => this . startImportJob ( url ) )
138160 . then ( job => {
139161 return this . updateShipSettings ( {
140162 last_sync_at : started_sync_at ,
163+ last_updated_at : last_updated_at || started_sync_at ,
141164 last_job_id : job . id
142165 } ) ;
143166 } )
0 commit comments