-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathoptions.go
More file actions
55 lines (46 loc) · 1.5 KB
/
options.go
File metadata and controls
55 lines (46 loc) · 1.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package reflex
import (
"time"
)
// StreamOptions provide options sent to the event stream source.
// The zero value leaves every option off: no lag and both flags false.
type StreamOptions struct {
// Lag defines the duration after an event is created before it becomes
// eligible for streaming.
Lag time.Duration
// StreamFromHead defines that the initial event be retrieved
// from the head of the events table.
StreamFromHead bool
// StreamToHead defines that ErrHeadReached be returned as soon
// as no more events are available.
StreamToHead bool
}
// StreamOption defines a functional option that configures StreamOptions.
// Each option receives a pointer to the StreamOptions being built and
// modifies it in place.
type StreamOption func(*StreamOptions)
// WithStreamFromHead returns a StreamOption that sets
// StreamOptions.StreamFromHead to true, so that only new events from the
// head of the events table are streamed. Note that this overrides the
// "after" parameter.
func WithStreamFromHead() StreamOption {
	fromHead := func(opts *StreamOptions) {
		opts.StreamFromHead = true
	}
	return fromHead
}
// WithStreamToHead returns a StreamOption that sets
// StreamOptions.StreamToHead to true, requesting that ErrHeadReached be
// returned as soon as no more events are available. This is useful for
// testing or back-fills.
func WithStreamToHead() StreamOption {
	return StreamOption(func(opts *StreamOptions) {
		opts.StreamToHead = true
	})
}
// WithStreamLag returns a StreamOption that stores d in StreamOptions.Lag,
// so that events are streamed only after they are older than that duration.
func WithStreamLag(d time.Duration) StreamOption {
	var applyLag StreamOption = func(opts *StreamOptions) {
		opts.Lag = d
	}
	return applyLag
}
// ResolveOptions converts a list of options to the StreamOptions struct.
//
// It starts from the zero-value StreamOptions and applies each option in the
// order given, so when two options write the same field the later one wins.
// Nil options are skipped rather than invoked, since calling a nil function
// value would panic. The resolved struct is returned by value.
func ResolveOptions(options ...StreamOption) StreamOptions {
	var ret StreamOptions
	for _, opt := range options {
		if opt == nil {
			// Tolerate callers that build option lists conditionally.
			continue
		}
		opt(&ret)
	}
	return ret
}