-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathjob.go
More file actions
77 lines (64 loc) · 2.89 KB
/
job.go
File metadata and controls
77 lines (64 loc) · 2.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package pgqueue
import "time"
// TaskInfo is the read-only view of a Task record that handlers receive.
// It carries only the fields a handler needs to make decisions and gives
// no mutation access to the underlying record.
type TaskInfo struct {
	ID    int64
	Queue string
	Try   int16

	RunAt, CreatedAt time.Time
}
// Job is the generic value handed to handlers: a read-only view of the
// task together with its payload decoded into the type P.
type Job[P any] struct {
	// Task is the read-only task view.
	Task TaskInfo

	// Payload is the decoded, typed payload.
	Payload P
}
// Result is what a handler returns to tell the framework what to do next.
type Result struct {
	// action selects the outcome the framework should apply.
	action resultAction

	// err is the optional error carried along with the outcome.
	err error
}
// Err reports the optional error attached to the result. It is intended for
// diagnostics and is nil when the result carries no error.
func (r Result) Err() error {
	return r.err
}
// resultAction enumerates the outcomes a Result can carry.
type resultAction uint8

const (
	// resultDone is the action produced by Done. It is also the zero value.
	resultDone resultAction = iota

	// resultRetry is the action produced by Retry.
	resultRetry

	// resultFail is the action produced by Fail.
	resultFail

	// resultSkip is the action produced by Skip (ticker-only).
	resultSkip
)
// Done reports that the handler completed successfully. A job is marked
// completed; a ticker is rescheduled.
func Done() Result {
	var r Result
	r.action = resultDone
	return r
}
// Retry reports a transient failure, carrying err for diagnostics. The
// framework derives the delay from the spec's RetryDelay and the current
// Try count using linear backoff (try * retryDelay).
func Retry(err error) Result {
	return Result{
		action: resultRetry,
		err:    err,
	}
}
// Fail reports a permanent failure, carrying err for diagnostics. It is
// terminal for both jobs and tickers.
func Fail(err error) Result {
	return Result{
		action: resultFail,
		err:    err,
	}
}
// Skip is for tickers only. It skips the current run, reschedules the ticker
// at Every, and resets the try count. Payload mutations are still persisted,
// since the handler may have advanced a cursor. When a job handler returns
// Skip, it is treated as Fail.
func Skip(err error) Result {
	return Result{
		action: resultSkip,
		err:    err,
	}
}
// JobSpec holds the configuration for a one-shot job handler whose payload
// has type P.
type JobSpec[P any] struct {
	// Queue is the queue name. Required.
	Queue string

	// HashFn produces the dedup key for a payload. A nil HashFn, or a nil
	// return value, means no dedup.
	HashFn func(P) *string

	// PollInterval is how often to poll. Defaults to 5s.
	PollInterval time.Duration

	// MaxRetries of 0 means no retries; a negative value selects the
	// default of 3.
	MaxRetries int16

	// RetryDelay is the base of the linear backoff. Defaults to 30s.
	RetryDelay time.Duration

	// LeaseDuration is the length of the claim lease. Defaults to 5m.
	LeaseDuration time.Duration

	// FinalizeBuffer is the time reserved for finalizing after the handler.
	// Defaults to 10s.
	FinalizeBuffer time.Duration
}
// TickerSpec holds the configuration for a recurring ticker handler whose
// payload has type P.
type TickerSpec[P any] struct {
	// Queue is the queue name. Required.
	Queue string

	// Key is the stable identity used for upsert; it is unique per queue.
	// Required.
	Key string

	// InitialPayload is the payload used for the first-ever creation.
	InitialPayload P

	// Every is the reschedule interval. Required.
	Every time.Duration

	// PollInterval is how often to poll. Defaults to min(Every/2, 30s).
	PollInterval time.Duration

	// MaxRetries of 0 means no retries; a negative value selects the
	// default of 3.
	MaxRetries int16

	// RetryDelay is the base of the linear backoff. Defaults to 30s.
	RetryDelay time.Duration

	// LeaseDuration is the length of the claim lease. Defaults to 5m.
	LeaseDuration time.Duration

	// FinalizeBuffer is the time reserved for finalizing after the handler.
	// Defaults to 10s.
	FinalizeBuffer time.Duration
}