1- import { Connection , ConnectionSyncStatus , PrismaClient , Prisma } from "@sourcebot/db" ;
1+ import { Connection , ConnectionSyncStatus , PrismaClient , Prisma , Repo } from "@sourcebot/db" ;
22import { Job , Queue , Worker } from 'bullmq' ;
33import { Settings , WithRequired } from "./types.js" ;
44import { ConnectionConfig } from "@sourcebot/schemas/v3/connection.type" ;
55import { createLogger } from "./logger.js" ;
66import os from 'os' ;
77import { Redis } from 'ioredis' ;
88import { RepoData , compileGithubConfig , compileGitlabConfig , compileGiteaConfig , compileGerritConfig } from "./repoCompileUtils.js" ;
9- import { CONFIG_REPO_UPSERT_TIMEOUT_MS } from "./environment.js" ;
109
1110interface IConnectionManager {
1211 scheduleConnectionSync : ( connection : Connection ) => Promise < void > ;
@@ -23,15 +22,18 @@ type JobPayload = {
2322} ;
2423
2524export class ConnectionManager implements IConnectionManager {
26- private queue = new Queue < JobPayload > ( QUEUE_NAME ) ;
2725 private worker : Worker ;
26+ private queue : Queue < JobPayload > ;
2827 private logger = createLogger ( 'ConnectionManager' ) ;
2928
3029 constructor (
3130 private db : PrismaClient ,
3231 private settings : Settings ,
3332 redis : Redis ,
3433 ) {
34+ this . queue = new Queue < JobPayload > ( QUEUE_NAME , {
35+ connection : redis ,
36+ } ) ;
3537 const numCores = os . cpus ( ) . length ;
3638 this . worker = new Worker ( QUEUE_NAME , this . runSyncJob . bind ( this ) , {
3739 connection : redis ,
@@ -113,6 +115,7 @@ export class ConnectionManager implements IConnectionManager {
113115 // appear in the repoData array above, and so the RepoToConnection record won't be re-created.
114116 // Repos that have no RepoToConnection records are considered orphaned and can be deleted.
115117 await this . db . $transaction ( async ( tx ) => {
118+ const deleteStart = performance . now ( ) ;
116119 await tx . connection . update ( {
117120 where : {
118121 id : job . data . connectionId ,
@@ -123,21 +126,124 @@ export class ConnectionManager implements IConnectionManager {
123126 }
124127 }
125128 } ) ;
129+ const deleteDuration = performance . now ( ) - deleteStart ;
130+ this . logger . info ( `Deleted all RepoToConnection records for connection ${ job . data . connectionId } in ${ deleteDuration } ms` ) ;
126131
127- await Promise . all ( repoData . map ( ( repo ) => {
128- return tx . repo . upsert ( {
129- where : {
130- external_id_external_codeHostUrl : {
131- external_id : repo . external_id ,
132- external_codeHostUrl : repo . external_codeHostUrl ,
133- } ,
132+ const existingRepos : Repo [ ] = await tx . repo . findMany ( {
133+ where : {
134+ external_id : {
135+ in : repoData . map ( repo => repo . external_id ) ,
134136 } ,
135- create : repo ,
136- update : repo as Prisma . RepoUpdateInput ,
137+ external_codeHostUrl : {
138+ in : repoData . map ( repo => repo . external_codeHostUrl ) ,
139+ } ,
140+ } ,
141+ } ) ;
142+ const existingRepoKeys = existingRepos . map ( repo => `${ repo . external_id } -${ repo . external_codeHostUrl } ` ) ;
143+
144+ const existingRepoData = repoData . filter ( repo => existingRepoKeys . includes ( `${ repo . external_id } -${ repo . external_codeHostUrl } ` ) ) ;
145+ const [ toCreate , toUpdate ] = repoData . reduce < [ Prisma . RepoCreateManyInput [ ] , Prisma . RepoUpdateManyMutationInput [ ] ] > ( ( [ toCreate , toUpdate ] , repo ) => {
146+ const existingRepo = existingRepoData . find ( ( r : RepoData ) => r . external_id === repo . external_id && r . external_codeHostUrl === repo . external_codeHostUrl ) ;
147+ if ( existingRepo ) {
148+ // @note : make sure to reflect any changes here in the raw sql update below
149+ const updateRepo : Prisma . RepoUpdateManyMutationInput = {
150+ name : repo . name ,
151+ cloneUrl : repo . cloneUrl ,
152+ imageUrl : repo . imageUrl ,
153+ isFork : repo . isFork ,
154+ isArchived : repo . isArchived ,
155+ metadata : repo . metadata ,
156+ external_id : repo . external_id ,
157+ external_codeHostType : repo . external_codeHostType ,
158+ external_codeHostUrl : repo . external_codeHostUrl ,
159+ }
160+ toUpdate . push ( updateRepo ) ;
161+ } else {
162+ const createRepo : Prisma . RepoCreateManyInput = {
163+ name : repo . name ,
164+ cloneUrl : repo . cloneUrl ,
165+ imageUrl : repo . imageUrl ,
166+ isFork : repo . isFork ,
167+ isArchived : repo . isArchived ,
168+ metadata : repo . metadata ,
169+ orgId : job . data . orgId ,
170+ external_id : repo . external_id ,
171+ external_codeHostType : repo . external_codeHostType ,
172+ external_codeHostUrl : repo . external_codeHostUrl ,
173+ }
174+ toCreate . push ( createRepo ) ;
175+ }
176+ return [ toCreate , toUpdate ] ;
177+ } , [ [ ] , [ ] ] ) ;
178+
179+ if ( toCreate . length > 0 ) {
180+ const createStart = performance . now ( ) ;
181+ const createdRepos = await tx . repo . createManyAndReturn ( {
182+ data : toCreate ,
137183 } ) ;
138- } ) ) ;
139184
140- } , { timeout : parseInt ( CONFIG_REPO_UPSERT_TIMEOUT_MS ) } ) ;
185+ await tx . repoToConnection . createMany ( {
186+ data : createdRepos . map ( repo => ( {
187+ repoId : repo . id ,
188+ connectionId : job . data . connectionId ,
189+ } ) ) ,
190+ } ) ;
191+
192+ const createDuration = performance . now ( ) - createStart ;
193+ this . logger . info ( `Created ${ toCreate . length } repos in ${ createDuration } ms` ) ;
194+ }
195+
196+ if ( toUpdate . length > 0 ) {
197+ const updateStart = performance . now ( ) ;
198+
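199+ // Build the VALUES list for the update query. NOTE(review): each field below is interpolated into the SQL text unescaped, so a value containing a single quote will break the statement - prefer bound parameters.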
200+ const updateValues = toUpdate . map ( repo => `(
201+ '${ repo . name } ',
202+ '${ repo . cloneUrl } ',
203+ ${ repo . imageUrl ? `'${ repo . imageUrl } '` : 'NULL' } ,
204+ ${ repo . isFork } ,
205+ ${ repo . isArchived } ,
206+ '${ JSON . stringify ( repo . metadata ) } '::jsonb,
207+ '${ repo . external_id } ',
208+ '${ repo . external_codeHostType } ',
209+ '${ repo . external_codeHostUrl } '
210+ )` ) . join ( ',' ) ;
211+
212+ // Update repos and get their IDs in one query
213+ const updateSql = `
214+ WITH updated AS (
215+ UPDATE "Repo" r
216+ SET
217+ name = v.name,
218+ "cloneUrl" = v.clone_url,
219+ "imageUrl" = v.image_url,
220+ "isFork" = v.is_fork,
221+ "isArchived" = v.is_archived,
222+ metadata = v.metadata,
223+ "updatedAt" = NOW()
224+ FROM (
225+ VALUES ${ updateValues }
226+ ) AS v(name, clone_url, image_url, is_fork, is_archived, metadata, external_id, external_code_host_type, external_code_host_url)
227+ WHERE r.external_id = v.external_id
228+ AND r."external_codeHostUrl" = v.external_code_host_url
229+ RETURNING r.id
230+ )
231+ SELECT id FROM updated
232+ ` ;
233+ const updatedRepoIds = await tx . $queryRawUnsafe < { id : number } [ ] > ( updateSql ) ;
234+
235+ // Insert repo-connection mappings
236+ const createConnectionSql = `
237+ INSERT INTO "RepoToConnection" ("repoId", "connectionId", "addedAt")
238+ SELECT id, ${ job . data . connectionId } , NOW()
239+ FROM unnest(ARRAY[${ updatedRepoIds . map ( r => r . id ) . join ( ',' ) } ]) AS id
240+ ` ;
241+ await tx . $executeRawUnsafe ( createConnectionSql ) ;
242+
243+ const updateDuration = performance . now ( ) - updateStart ;
244+ this . logger . info ( `Updated ${ toUpdate . length } repos in ${ updateDuration } ms` ) ;
245+ }
246+ } ) ;
141247 }
142248
143249
0 commit comments