-
Notifications
You must be signed in to change notification settings - Fork 19
Expand file tree
/
Copy patheventstreamer.go
More file actions
66 lines (55 loc) · 2.26 KB
/
eventstreamer.go
File metadata and controls
66 lines (55 loc) · 2.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package workflow
import (
"context"
"time"
)
// EventStreamer defines the event streaming adapter interface / API. All implementations should be
// tested with adaptertest.TestEventStreamer to ensure the behaviour is compatible with workflow.
type EventStreamer interface {
// NewSender takes a topic and returns an EventSender, or an error.
NewSender(ctx context.Context, topic string) (EventSender, error)
// NewReceiver takes a topic, a name, and zero or more ReceiverOption values and returns an
// EventReceiver, or an error.
// NOTE(review): name presumably identifies the consumer whose position is tracked — confirm
// against the adapter implementations.
NewReceiver(ctx context.Context, topic string, name string, opts ...ReceiverOption) (EventReceiver, error)
}
// EventSender defines the common interface that the EventStreamer adapter must implement for allowing the workflow
// to send events to the event streamer.
type EventSender interface {
// Send submits one event described by a foreign ID, an integer status type, and a set of
// headers keyed by Header. It returns an error if the send fails.
Send(ctx context.Context, foreignID string, statusType int, headers map[Header]string) error
// Close reports any error encountered while closing the sender.
// NOTE(review): which resources are released is adapter-specific — confirm per implementation.
Close() error
}
// EventReceiver defines the common interface that the EventStreamer adapter must implement for allowing the workflow
// to receive events.
type EventReceiver interface {
// Recv returns an Event together with the Ack function for that event, or an error.
// NOTE(review): presumably blocks until an event is available or ctx is done — confirm
// against the adapter implementations.
Recv(ctx context.Context) (*Event, Ack, error)
// Close reports any error encountered while closing the receiver.
Close() error
}
// Ack is used for the event streamer to update its cursor of what messages have
// been consumed. If Ack is not called then the event streamer, depending on implementation,
// will likely not keep track of which records / events have been consumed.
// An Ack is returned alongside each event by EventReceiver.Recv.
type Ack func() error
// Header is the key type of the headers map passed to EventSender.Send.
type Header string
// Header keys declared by this package. Each constant's value is the string form of the key.
// NOTE(review): the exact meaning of each header's value is set by the code that populates the
// map, which is not visible here — confirm before relying on a particular format.
const (
HeaderWorkflowName Header = "workflow_name"
HeaderForeignID Header = "foreign_id"
HeaderTopic Header = "topic"
HeaderRunID Header = "run_id"
HeaderRunState Header = "run_state"
HeaderRecordVersion Header = "record_version"
HeaderConnectorData Header = "connector_data"
)
// ReceiverOptions holds the settings that ReceiverOption functions modify.
type ReceiverOptions struct {
// PollFrequency is set by WithReceiverPollFrequency.
// NOTE(review): presumably the interval at which a receiver polls for new events — how it is
// honoured is up to the adapter; confirm per implementation.
PollFrequency time.Duration
// StreamFromLatest is set to true by the StreamFromLatest option.
StreamFromLatest bool
}
// ReceiverOption is a function that modifies a ReceiverOptions value. Options are passed
// variadically to EventStreamer.NewReceiver.
type ReceiverOption func(*ReceiverOptions)
// WithReceiverPollFrequency returns a ReceiverOption that, when applied, stores d in the
// PollFrequency field of the ReceiverOptions it is given.
func WithReceiverPollFrequency(d time.Duration) ReceiverOption {
	var setPollFrequency ReceiverOption = func(ro *ReceiverOptions) {
		ro.PollFrequency = d
	}
	return setPollFrequency
}
// StreamFromLatest returns a ReceiverOption that sets the StreamFromLatest field to true. It asks
// the event streamer to begin from the most recent event when there is no committed/stored offset
// (a cursor on some event streaming platforms). If the consumer has received events before, the
// option should have no effect and consumption should resume from where it previously left off.
func StreamFromLatest() ReceiverOption {
	var enableLatest ReceiverOption = func(ro *ReceiverOptions) {
		ro.StreamFromLatest = true
	}
	return enableLatest
}