-
Notifications
You must be signed in to change notification settings - Fork 19
Expand file tree
/
Copy pathdelete.go
More file actions
77 lines (69 loc) · 1.45 KB
/
delete.go
File metadata and controls
77 lines (69 loc) · 1.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package workflow
import (
"context"
)
// deleteConsumer hands w.run a worker that receives events from the workflow's
// delete topic and feeds each one to the handler built by runDelete.
//
// The worker is registered under two identifiers produced by makeRole: a role
// that includes the workflow name, and a process name that does not.
func deleteConsumer[Type any, Status StatusType](w *Workflow[Type, Status]) {
	const (
		action = "delete"
		kind   = "consumer"
	)

	consumerRole := makeRole(w.Name(), action, kind)
	process := makeRole(action, kind)

	// worker opens a receiver on the delete topic, consumes from it until
	// consume returns, and closes the receiver on the way out.
	worker := func(ctx context.Context) error {
		receiver, err := w.eventStreamer.NewReceiver(
			ctx,
			DeleteTopic(w.Name()),
			consumerRole,
			WithReceiverPollFrequency(w.defaultOpts.pollingFrequency),
		)
		if err != nil {
			return err
		}
		defer receiver.Close()

		workflowName := w.Name()
		handler := runDelete(
			w.recordStore.Store,
			w.recordStore.Lookup,
			w.customDelete,
		)

		return consume(
			ctx,
			workflowName,
			process,
			receiver,
			handler,
			w.clock,
			0,
			w.defaultOpts.lagAlert,
		)
	}

	w.run(consumerRole, process, worker, w.defaultOpts.errBackOff)
}
// runDelete builds the event handler used to erase a record's stored data.
//
// For each event the handler looks the record up by the event's ForeignID,
// swaps the record's Object for a replacement payload, marks the record as
// RunStateDataDeleted and persists it through updateRecord.
//
// The replacement payload is the fixed JSON document {"result":"deleted"}
// unless customDeleteFn is non-nil, in which case whatever customDeleteFn
// returns for the record is stored instead. Any error from lookup,
// customDeleteFn or updateRecord is returned unchanged.
func runDelete(
	store storeFunc,
	lookup lookupFunc,
	customDeleteFn customDelete,
) func(ctx context.Context, e *Event) error {
	handler := func(ctx context.Context, e *Event) error {
		rec, err := lookup(ctx, e.ForeignID)
		if err != nil {
			return err
		}

		// Choose what overwrites the stored object: the caller-supplied
		// custom delete takes precedence over the default marker.
		var payload []byte
		if customDeleteFn == nil {
			payload = []byte(`{"result":"deleted"}`)
		} else {
			payload, err = customDeleteFn(rec)
			if err != nil {
				return err
			}
		}

		rec.Object = payload
		rec.RunState = RunStateDataDeleted

		// RunStateRequestedDataDeleted is handed to updateRecord alongside the
		// modified record; presumably it is the run state the record is
		// expected to be transitioning from — confirm against updateRecord.
		return updateRecord(
			ctx,
			store,
			rec,
			RunStateRequestedDataDeleted,
			rec.Meta.StatusDescription,
		)
	}

	return handler
}