forked from sourcegraph/sourcegraph-public-snapshot
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcleanup.go
More file actions
275 lines (243 loc) · 7.19 KB
/
cleanup.go
File metadata and controls
275 lines (243 loc) · 7.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
package codegraph
import (
"context"
"time"
"github.com/keegancsmith/sqlf"
"github.com/lib/pq"
"go.opentelemetry.io/otel/attribute"
"github.com/sourcegraph/sourcegraph/internal/database/basestore"
"github.com/sourcegraph/sourcegraph/internal/observation"
)
func (s *store) IDsWithMeta(ctx context.Context, ids []int) (_ []int, err error) {
ctx, _, endObservation := s.operations.idsWithMeta.With(ctx, &err, observation.Args{Attrs: []attribute.KeyValue{
attribute.Int("numIDs", len(ids)),
attribute.IntSlice("ids", ids),
}})
defer endObservation(1, observation.Args{})
return basestore.ScanInts(s.db.Query(ctx, sqlf.Sprintf(
idsWithMetaQuery,
pq.Array(ids),
)))
}
const idsWithMetaQuery = `
SELECT m.upload_id
FROM codeintel_scip_metadata m
WHERE m.upload_id = ANY(%s)
`
func (s *store) ReconcileCandidates(ctx context.Context, batchSize int) (_ []int, err error) {
ctx, _, endObservation := s.operations.reconcileCandidates.With(ctx, &err, observation.Args{Attrs: []attribute.KeyValue{
attribute.Int("batchSize", batchSize),
}})
defer endObservation(1, observation.Args{})
return s.ReconcileCandidatesWithTime(ctx, batchSize, time.Now().UTC())
}
func (s *store) ReconcileCandidatesWithTime(ctx context.Context, batchSize int, now time.Time) (_ []int, err error) {
return basestore.ScanInts(s.db.Query(ctx, sqlf.Sprintf(reconcileQuery, batchSize, batchSize, now, now)))
}
const reconcileQuery = `
WITH
unscanned_candidates AS (
SELECT m.upload_id
FROM codeintel_scip_metadata m
WHERE NOT EXISTS (SELECT 1 FROM codeintel_last_reconcile lr WHERE lr.dump_id = m.upload_id)
ORDER BY m.upload_id
),
scanned_candidates AS (
SELECT lr.dump_id AS upload_id
FROM codeintel_last_reconcile lr
ORDER BY lr.last_reconcile_at, lr.dump_id
),
ordered_candidates AS (
(
SELECT upload_id FROM unscanned_candidates
LIMIT %s
) UNION ALL (
SELECT upload_id FROM scanned_candidates
)
LIMIT %s
)
INSERT INTO codeintel_last_reconcile
SELECT upload_id, %s FROM ordered_candidates
ON CONFLICT (dump_id) DO UPDATE
SET last_reconcile_at = %s
RETURNING dump_id
`
func (s *store) DeleteLsifDataByUploadIds(ctx context.Context, bundleIDs ...int) (err error) {
ctx, _, endObservation := s.operations.deleteLsifDataByUploadIds.With(ctx, &err, observation.Args{Attrs: []attribute.KeyValue{
attribute.Int("numBundleIDs", len(bundleIDs)),
attribute.IntSlice("bundleIDs", bundleIDs),
}})
defer endObservation(1, observation.Args{})
if len(bundleIDs) == 0 {
return nil
}
return s.withTransaction(ctx, func(tx *store) error {
if err := tx.db.Exec(ctx, sqlf.Sprintf(deleteSCIPDocumentLookupQuery, pq.Array(bundleIDs))); err != nil {
return err
}
if err := tx.db.Exec(ctx, sqlf.Sprintf(deleteSCIPMetadataQuery, pq.Array(bundleIDs))); err != nil {
return err
}
if err := tx.db.Exec(ctx, sqlf.Sprintf(deleteSCIPSymbolNamesQuery, pq.Array(bundleIDs), pq.Array(bundleIDs))); err != nil {
return err
}
if err := tx.db.Exec(ctx, sqlf.Sprintf(deleteSCIPDocumentLookupSchemaVersionsQuery, pq.Array(bundleIDs))); err != nil {
return err
}
if err := tx.db.Exec(ctx, sqlf.Sprintf(deleteSCIPSymbolsSchemaVersionsQuery, pq.Array(bundleIDs))); err != nil {
return err
}
if err := s.db.Exec(ctx, sqlf.Sprintf(deleteLastReconcileQuery, pq.Array(bundleIDs))); err != nil {
return err
}
return nil
})
}
const deleteSCIPMetadataQuery = `
WITH
locked_metadata AS (
SELECT id
FROM codeintel_scip_metadata
WHERE upload_id = ANY(%s)
ORDER BY id
FOR UPDATE
)
DELETE FROM codeintel_scip_metadata
WHERE id IN (SELECT id FROM locked_metadata)
`
const deleteSCIPDocumentLookupQuery = `
WITH
locked_document_lookup AS (
SELECT id
FROM codeintel_scip_document_lookup
WHERE upload_id = ANY(%s)
ORDER BY id
FOR UPDATE
)
DELETE FROM codeintel_scip_document_lookup
WHERE id IN (SELECT id FROM locked_document_lookup)
`
const deleteSCIPSymbolNamesQuery = `
WITH
locked_symbol_names AS (
SELECT id
FROM codeintel_scip_symbol_names
WHERE upload_id = ANY(%s)
ORDER BY id
FOR UPDATE
)
DELETE FROM codeintel_scip_symbol_names
WHERE upload_id = ANY(%s) AND id IN (SELECT id FROM locked_symbol_names)
`
const deleteSCIPDocumentLookupSchemaVersionsQuery = `
DELETE FROM codeintel_scip_document_lookup_schema_versions WHERE upload_id = ANY(%s)
`
const deleteSCIPSymbolsSchemaVersionsQuery = `
DELETE FROM codeintel_scip_symbols_schema_versions WHERE upload_id = ANY(%s)
`
const deleteLastReconcileQuery = `
WITH locked_rows AS (
SELECT dump_id
FROM codeintel_last_reconcile
WHERE dump_id = ANY(%s)
ORDER BY dump_id
FOR UPDATE
)
DELETE FROM codeintel_last_reconcile
WHERE dump_id IN (SELECT dump_id FROM locked_rows)
`
func (s *store) DeleteAbandonedSchemaVersionsRecords(ctx context.Context) (_ int, err error) {
tx, err := s.db.Transact(ctx)
if err != nil {
return 0, err
}
defer func() { err = tx.Done(err) }()
count1, _, err := basestore.ScanFirstInt(tx.Query(ctx, sqlf.Sprintf(deleteAbandonedSymbolsSchemaVersionsQuery)))
if err != nil {
return 0, err
}
count2, _, err := basestore.ScanFirstInt(tx.Query(ctx, sqlf.Sprintf(deleteAbandonedDocumentLookupSchemaVersionsQuery)))
if err != nil {
return 0, err
}
return count1 + count2, nil
}
const deleteAbandonedSymbolsSchemaVersionsQuery = `
WITH del AS (
DELETE FROM codeintel_scip_symbols_schema_versions sv
WHERE NOT EXISTS (
SELECT 1
FROM codeintel_scip_metadata m
WHERE m.upload_id = sv.upload_id
)
RETURNING 1
)
SELECT COUNT(*) FROM del
`
const deleteAbandonedDocumentLookupSchemaVersionsQuery = `
WITH del AS (
DELETE FROM codeintel_scip_document_lookup_schema_versions sv
WHERE NOT EXISTS (
SELECT 1
FROM codeintel_scip_metadata m
WHERE m.upload_id = sv.upload_id
)
RETURNING 1
)
SELECT COUNT(*) FROM del
`
func (s *store) DeleteUnreferencedDocuments(ctx context.Context, batchSize int, maxAge time.Duration, now time.Time) (_, _ int, err error) {
ctx, _, endObservation := s.operations.idsWithMeta.With(ctx, &err, observation.Args{Attrs: []attribute.KeyValue{
attribute.Stringer("maxAge", maxAge),
}})
defer endObservation(1, observation.Args{})
rows, err := s.db.Query(ctx, sqlf.Sprintf(
deleteUnreferencedDocumentsQuery,
now,
maxAge/time.Second,
batchSize,
))
if err != nil {
return 0, 0, err
}
defer func() { err = basestore.CloseRows(rows, err) }()
var c1, c2 int
for rows.Next() {
if err := rows.Scan(&c1, &c2); err != nil {
return 0, 0, err
}
}
return c1, c2, nil
}
const deleteUnreferencedDocumentsQuery = `
WITH
candidates AS (
SELECT id, document_id
FROM codeintel_scip_documents_dereference_logs log
WHERE %s - log.last_removal_time > (%s * interval '1 second')
ORDER BY last_removal_time DESC, document_id
LIMIT %s
FOR UPDATE SKIP LOCKED
),
locked_documents AS (
SELECT sd.id
FROM candidates d
JOIN codeintel_scip_documents sd ON sd.id = d.document_id
WHERE NOT EXISTS (SELECT 1 FROM codeintel_scip_document_lookup sdl WHERE sdl.document_id = sd.id)
ORDER BY sd.id
FOR UPDATE OF sd
),
deleted_documents AS (
DELETE FROM codeintel_scip_documents
WHERE id IN (SELECT id FROM locked_documents)
RETURNING id
),
deleted_candidates AS (
DELETE FROM codeintel_scip_documents_dereference_logs
WHERE id IN (SELECT id FROM candidates)
RETURNING id
)
SELECT
(SELECT COUNT(*) FROM candidates),
(SELECT COUNT(*) FROM deleted_documents)
`