-
Notifications
You must be signed in to change notification settings - Fork 19
Expand file tree
/
Copy pathhook.go
More file actions
82 lines (73 loc) · 1.81 KB
/
hook.go
File metadata and controls
82 lines (73 loc) · 1.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package workflow
import (
"context"
"github.com/luno/workflow/internal/metrics"
)
// RunStateChangeHookFunc defines the function signature for all hooks associated to the run.
type RunStateChangeHookFunc[Type any, Status StatusType] func(ctx context.Context, record *TypedRecord[Type, Status]) error
func runStateChangeHookConsumer[Type any, Status StatusType](
w *Workflow[Type, Status],
runState RunState,
hook RunStateChangeHookFunc[Type, Status],
) {
role := makeRole(
w.Name(),
runState.String(),
"run-state-change-hook",
"consumer",
)
processName := makeRole(runState.String(), "run-state-change-hook", "consumer")
w.run(role, processName, func(ctx context.Context) error {
topic := RunStateChangeTopic(w.Name())
stream, err := w.eventStreamer.NewReceiver(
ctx,
topic,
role,
WithReceiverPollFrequency(w.defaultOpts.pollingFrequency),
)
if err != nil {
return err
}
defer stream.Close()
return consume(
ctx,
w.Name(),
processName,
stream,
runHook(
w.Name(),
processName,
w.recordStore.Lookup,
hook,
),
w.clock,
0,
w.defaultOpts.lagAlert,
filterByRunState(runState),
)
}, w.defaultOpts.errBackOff)
}
func runHook[Type any, Status StatusType](
workflowName string,
processName string,
lookup lookupFunc,
hook RunStateChangeHookFunc[Type, Status],
) func(ctx context.Context, e *Event) error {
return func(ctx context.Context, e *Event) error {
record, err := lookup(ctx, e.ForeignID)
if err != nil {
return err
}
var t Type
err = Unmarshal(record.Object, &t)
if err != nil {
metrics.ProcessSkippedEvents.WithLabelValues(workflowName, processName, "unable to unmarshal object").Inc()
return nil
}
return hook(ctx, &TypedRecord[Type, Status]{
Record: *record,
Status: Status(record.Status),
Object: &t,
})
}
}