-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathbaseprocess.go
More file actions
169 lines (146 loc) · 4.81 KB
/
baseprocess.go
File metadata and controls
169 lines (146 loc) · 4.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package flowbase
import "fmt"
// BaseProcess provides a skeleton for processes, such as the main Process
// component, and the custom components in the flowbase/components library.
// It holds the process name, the network the process belongs to, and the
// process' in-ports and out-ports keyed by port name.
type BaseProcess struct {
// name is the process name; it is returned by Name and used as the prefix
// of the messages produced by Fail and Audit.
name string
// workflow is the network the process is connected to; it is returned by
// Network.
workflow *Network
// inPorts maps port names to the in-ports added through InitInPort.
inPorts map[string]*InPort
// outPorts maps port names to the out-ports added through InitOutPort.
outPorts map[string]*OutPort
}
// NewBaseProcess builds a BaseProcess called name that is connected to the
// network net. Both port maps are created empty (non-nil), so ports can be
// registered on the result right away. The BaseProcess is returned by value.
func NewBaseProcess(net *Network, name string) BaseProcess {
	var proc BaseProcess
	proc.name = name
	proc.workflow = net
	proc.inPorts = map[string]*InPort{}
	proc.outPorts = map[string]*OutPort{}
	return proc
}
// Name reports the name stored on the process.
func (p *BaseProcess) Name() string {
	procName := p.name
	return procName
}
// Network reports the network (workflow) that the process is connected to.
func (p *BaseProcess) Network() *Network {
	net := p.workflow
	return net
}
// ------------------------------------------------
// In-port stuff
// ------------------------------------------------
// InPort returns the in-port registered under portName.
//
// If the process has no in-port with that name, the problem is reported
// through p.Failf. Should Failf return, the result is nil.
func (p *BaseProcess) InPort(portName string) *InPort {
	// One comma-ok lookup serves both the existence check and the result,
	// instead of indexing the map twice.
	port, ok := p.inPorts[portName]
	if !ok {
		p.Failf("No such in-port ('%s'). Please check your workflow code!", portName)
	}
	return port
}
// InitInPort creates a new in-port named portName, sets node as the port's
// process, and stores the port on p under that name. If an in-port with the
// same name is already registered, this is first reported through p.Failf.
func (p *BaseProcess) InitInPort(node Node, portName string) {
	_, taken := p.inPorts[portName]
	if taken {
		p.Failf("Such an in-port ('%s') already exists. Please check your workflow code!", portName)
	}

	newPort := NewInPort(portName)
	newPort.process = node
	p.inPorts[portName] = newPort
}
// InPorts returns the in-ports of the process, keyed by port name. The result
// is the process' own map, not a copy.
func (p *BaseProcess) InPorts() map[string]*InPort {
	ports := p.inPorts
	return ports
}
// DeleteInPort removes the in-port registered under portName from the
// process. A name that is not registered is reported through p.Failf before
// the (then no-op) removal.
func (p *BaseProcess) DeleteInPort(portName string) {
	_, known := p.inPorts[portName]
	if !known {
		p.Failf("No such in-port ('%s'). Please check your workflow code!", portName)
	}
	delete(p.inPorts, portName)
}
// ------------------------------------------------
// Out-port stuff
// ------------------------------------------------
// InitOutPort creates a new out-port named portName, sets node as the port's
// process, and stores the port on p under that name. If an out-port with the
// same name is already registered, this is first reported through p.Failf.
func (p *BaseProcess) InitOutPort(node Node, portName string) {
	_, taken := p.outPorts[portName]
	if taken {
		p.Failf("Such an out-port ('%s') already exists. Please check your workflow code!", portName)
	}

	newPort := NewOutPort(portName)
	newPort.process = node
	p.outPorts[portName] = newPort
}
// OutPort returns the out-port registered under portName.
//
// If the process has no out-port with that name, the problem is reported
// through p.Failf. Should Failf return, the result is nil.
func (p *BaseProcess) OutPort(portName string) *OutPort {
	// One comma-ok lookup serves both the existence check and the result,
	// instead of indexing the map twice.
	port, ok := p.outPorts[portName]
	if !ok {
		p.Failf("No such out-port ('%s'). Please check your workflow code!", portName)
	}
	return port
}
// OutPorts returns the out-ports of the process, keyed by port name. The
// result is the process' own map, not a copy.
func (p *BaseProcess) OutPorts() map[string]*OutPort {
	ports := p.outPorts
	return ports
}
// DeleteOutPort removes the out-port registered under portName from the
// process. A name that is not registered is reported through p.Failf before
// the (then no-op) removal.
func (p *BaseProcess) DeleteOutPort(portName string) {
	_, known := p.outPorts[portName]
	if !known {
		p.Failf("No such out-port ('%s'). Please check your workflow code!", portName)
	}
	delete(p.outPorts, portName)
}
// ------------------------------------------------
// Other stuff
// ------------------------------------------------
// Ready reports whether every in-port and every out-port of the process says
// it is ready (per the port's own Ready method). Each port that is not ready
// is reported through p.Failf, and makes the result false.
func (p *BaseProcess) Ready() (isReady bool) {
	allConnected := true

	for name, in := range p.inPorts {
		if in.Ready() {
			continue
		}
		p.Failf("InPort (%s) is not connected - check your workflow code!", name)
		allConnected = false
	}

	for name, out := range p.outPorts {
		if out.Ready() {
			continue
		}
		p.Failf("OutPort (%s) is not connected - check your workflow code!", name)
		allConnected = false
	}

	return allConnected
}
// CloseOutPorts closes all (normal) out-ports, i.e. it calls Close on every
// port in the map returned by p.OutPorts().
func (p *BaseProcess) CloseOutPorts() {
	// The loop variable gets its own name so that it does not shadow the
	// method receiver p.
	for _, outPort := range p.OutPorts() {
		outPort.Close()
	}
}
// Failf formats msg with parts, in the manner of fmt.Sprintf, and hands the
// resulting text to p.Fail, which adds the process name.
func (p *BaseProcess) Failf(msg string, parts ...interface{}) {
	formatted := fmt.Sprintf(msg, parts...)
	p.Fail(formatted)
}
// Fail reports msg through the package-level Failf function, prefixed with
// the name of the process.
func (p *BaseProcess) Fail(msg interface{}) {
	procName := p.Name()
	Failf("[Process:%s] %s", procName, msg)
}
// Auditf formats msg with parts, in the manner of fmt.Sprintf, and hands the
// resulting text to p.Audit, which adds the process name.
func (p *BaseProcess) Auditf(msg string, parts ...interface{}) {
	formatted := fmt.Sprintf(msg, parts...)
	p.Audit(formatted)
}
// Audit writes msg, prefixed with the name of the process and terminated by
// a newline, using the Printf method of the package's Audit value (declared
// outside this file; presumably a logger).
func (p *BaseProcess) Audit(msg interface{}) {
	procName := p.Name()
	Audit.Printf("[Process:%s] %s"+"\n", procName, msg)
}
// receiveOnInPorts receives one packet from the channel of each in-port of
// the process, one port after the other in map iteration order, waiting on
// each receive.
//
// The returned map holds the received packets keyed by in-port name. A port
// whose channel is closed contributes no entry and makes inPortsOpen false;
// the remaining ports are still read. inPortsOpen is true only if no channel
// was found closed.
func (p *BaseProcess) receiveOnInPorts() (ips map[string]*Packet, inPortsOpen bool) {
	received := make(map[string]*Packet)
	allOpen := true

	for portName, port := range p.InPorts() {
		Debug.Printf("[Process %s]: Receieving on inPort (%s) ...", p.name, portName)
		packet, ok := <-port.Chan
		if ok {
			Debug.Printf("[Process %s]: Got ip (%s) ...", p.name, packet.ID())
			received[portName] = packet
		} else {
			// Closed channel: remember it, but keep reading the other ports.
			allOpen = false
		}
	}

	return received, allOpen
}