-
Notifications
You must be signed in to change notification settings - Fork 19
Expand file tree
/
Copy pathrunstate.go
More file actions
158 lines (140 loc) · 4.85 KB
/
runstate.go
File metadata and controls
158 lines (140 loc) · 4.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package workflow
import (
"context"
"fmt"
"strconv"
)
type RunState int
const (
RunStateUnknown RunState = 0
RunStateInitiated RunState = 1
RunStateRunning RunState = 2
RunStatePaused RunState = 3
RunStateCancelled RunState = 4
RunStateCompleted RunState = 5
RunStateDataDeleted RunState = 6
RunStateRequestedDataDeleted RunState = 7
runStateSentinel RunState = 8
)
func (rs RunState) String() string {
switch rs {
case RunStateUnknown:
return "Unknown"
case RunStateInitiated:
return "Initiated"
case RunStateRunning:
return "Running"
case RunStatePaused:
return "Paused"
case RunStateCancelled:
return "Cancelled"
case RunStateCompleted:
return "Completed"
case RunStateDataDeleted:
return "Data Deleted"
case RunStateRequestedDataDeleted:
return "Requested Data Deleted"
default:
return "RunState(" + strconv.FormatInt(int64(rs), 10) + ")"
}
}
func (rs RunState) Valid() bool {
return rs > RunStateUnknown && rs < runStateSentinel
}
func (rs RunState) Finished() bool {
switch rs {
case RunStateCompleted, RunStateCancelled, RunStateRequestedDataDeleted, RunStateDataDeleted:
return true
default:
return false
}
}
// Stopped is the type of status that requires consumers to ignore the workflow run as it is in a stopped state. Only
// paused workflow runs can be resumed and must be done so via the workflow API or the Run methods. All cancelled
// workflow runs are cancelled permanently and cannot be undone whereas Pausing can be resumed.
func (rs RunState) Stopped() bool {
switch rs {
case RunStatePaused, RunStateCancelled, RunStateRequestedDataDeleted, RunStateDataDeleted:
return true
default:
return false
}
}
// RunStateController allows the interaction with a specific workflow run.
type RunStateController interface {
// Pause will take the workflow run specified and move it into a temporary state where it will no longer be processed.
// A paused workflow run can be resumed by calling Resume. ErrUnableToPause is returned when a workflow is not in a
// state to be paused.
Pause(ctx context.Context, reason string) error
// Cancel can be called after Pause has been called. A paused run of the workflow can be indefinitely cancelled.
// Once cancelled, DeleteData can be called and will move the run into an indefinite state of DataDeleted.
// ErrUnableToCancel is returned when the workflow record is not in a state to be cancelled.
Cancel(ctx context.Context, reason string) error
// Resume can be called on a workflow run that has been paused. ErrUnableToResume is returned when the workflow
// run is not in a state to be resumed.
Resume(ctx context.Context) error
// DeleteData can be called after a workflow run has been completed or cancelled. DeleteData should be used to
// comply with the right to be forgotten such as complying with GDPR. ErrUnableToDelete is returned when the
// workflow run is not in a state to be deleted.
DeleteData(ctx context.Context, reason string) error
}
func NewRunStateController(store storeFunc, wr *Record) RunStateController {
return &runStateControllerImpl{
record: wr,
store: store,
}
}
type customDelete func(wr *Record) ([]byte, error)
type runStateControllerImpl struct {
record *Record
store storeFunc
}
func (rsc *runStateControllerImpl) Pause(ctx context.Context, reason string) error {
return rsc.update(ctx, RunStatePaused, reason)
}
func (rsc *runStateControllerImpl) Resume(ctx context.Context) error {
return rsc.update(ctx, RunStateRunning, "")
}
func (rsc *runStateControllerImpl) Cancel(ctx context.Context, reason string) error {
return rsc.update(ctx, RunStateCancelled, reason)
}
func (rsc *runStateControllerImpl) DeleteData(ctx context.Context, reason string) error {
return rsc.update(ctx, RunStateRequestedDataDeleted, reason)
}
func (rsc *runStateControllerImpl) update(ctx context.Context, rs RunState, reason string) error {
valid, ok := runStateTransitions[rsc.record.RunState]
if !ok || !valid[rs] {
return fmt.Errorf("invalid RunState: from %s | to %s", rsc.record.RunState, rs)
}
previousRunState := rsc.record.RunState
rsc.record.RunState = rs
rsc.record.Meta.RunStateReason = reason
return updateRecord(ctx, rsc.store, rsc.record, previousRunState, rsc.record.Meta.StatusDescription)
}
var runStateTransitions = map[RunState]map[RunState]bool{
RunStateInitiated: {
RunStateRunning: true,
RunStatePaused: true,
},
RunStateRunning: {
RunStateCompleted: true,
RunStatePaused: true,
RunStateCancelled: true,
},
RunStatePaused: {
RunStateRunning: true,
RunStateCancelled: true,
},
RunStateCompleted: {
RunStateRequestedDataDeleted: true,
},
RunStateCancelled: {
RunStateRequestedDataDeleted: true,
},
RunStateRequestedDataDeleted: {
RunStateDataDeleted: true,
},
RunStateDataDeleted: {
RunStateRequestedDataDeleted: true,
},
}