-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathnetwork.go
More file actions
395 lines (345 loc) · 12.5 KB
/
network.go
File metadata and controls
395 lines (345 loc) · 12.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
package flowbase
import (
"fmt"
"os"
"regexp"
"runtime"
"sort"
"strings"
"sync"
"time"
)
// ----------------------------------------------------------------------------
// Network
// ----------------------------------------------------------------------------
// Network is the centerpiece of the functionality in FlowBase, and is a
// container for a pipeline of processes making up a workflow. It has various
// methods for coordinating the execution of the pipeline as a whole, such as
// keeping track of the maximum number of concurrent tasks, as well as helper
// methods for creating new processes that automatically get plugged in to the
// workflow on creation.
type Network struct {
// name identifies the network; it is returned by Name() and used as a prefix
// in log messages.
name string
// procs holds the processes added via AddProc, keyed by process name.
procs map[string]Node
// concurrentTasks is a buffered channel whose capacity is the maximum number
// of concurrent tasks; IncConcurrentTasks sends to it and DecConcurrentTasks
// receives from it.
concurrentTasks chan struct{}
// concurrentTasksMx is held by IncConcurrentTasks while it fills its slots.
concurrentTasksMx sync.Mutex
// sink is the sink process that disconnected out-ports get connected to
// before a run.
sink *Sink
// driver is the process that runProcs runs in the calling goroutine; the
// constructor initializes it to the default sink.
driver Node
// logFile is the log file path recorded by the file-logging constructors.
logFile string
// PlotConf configures how DotGraph renders the network.
PlotConf NetworkPlotConf
}
// NetworkPlotConf contains configuration for plotting the workflow as a graph
// with graphviz.
type NetworkPlotConf struct {
// EdgeLabels controls whether DotGraph labels each edge with the names of the
// out-port and in-port it connects.
EdgeLabels bool
}
// Node is an interface for processes to be handled by Network.
type Node interface {
// Name returns the name of the process; Network uses it as the key in its
// process map.
Name() string
// InPorts returns the in-ports of the process as a map with string keys.
InPorts() map[string]*InPort
// OutPorts returns the out-ports of the process as a map with string keys.
OutPorts() map[string]*OutPort
// Ready reports whether the process is ready to run; Network refuses to
// start unless every process to be run reports true.
Ready() bool
// Run executes the process. Network calls it in a new goroutine for ordinary
// processes and in the calling goroutine for the driver process.
Run()
// Fail reports a failure described by the given value.
Fail(interface{})
// Failf reports a failure described by a format string and its arguments.
Failf(string, ...interface{})
}
// ----------------------------------------------------------------------------
// Factory function(s)
// ----------------------------------------------------------------------------
// NewNetworkWithFileLogging returns a new Network that allows at most
// maxConcurrentTasks concurrent tasks, and passes a generated log file path to
// InitLogAuditToFile. The path lives under log/ and is built from the current
// time and a normalized form of name (lower-cased, with every character other
// than a-z, 0-9 and _ replaced by a dash).
func NewNetworkWithFileLogging(name string, maxConcurrentTasks int) *Network {
	nw := NewNetworkWithMaxTasks(name, maxConcurrentTasks)

	// Derive a file-name-safe variant of the network name.
	disallowed := regexp.MustCompile("[^a-z0-9_]")
	safeName := disallowed.ReplaceAllString(strings.ToLower(name), "-")
	timestamp := time.Now().Format("20060102-150405")

	nw.logFile = "log/flowbase-" + timestamp + "-" + safeName + ".log"
	InitLogAuditToFile(nw.logFile)
	return nw
}
// NewNetworkWithCustomLogFile returns a new Network that allows at most
// maxConcurrentTasks concurrent tasks, records logFile as its log file path
// and passes that path to InitLogAuditToFile.
func NewNetworkWithCustomLogFile(name string, maxConcurrentTasks int, logFile string) *Network {
	nw := NewNetworkWithMaxTasks(name, maxConcurrentTasks)
	nw.logFile = logFile
	InitLogAuditToFile(nw.logFile)
	return nw
}
// NewNetwork returns a new Network whose maximum number of concurrent tasks
// equals the value reported by runtime.NumCPU. It calls InitLogError before
// creating the network.
func NewNetwork(name string) *Network {
	InitLogError()
	maxTasks := runtime.NumCPU()
	return NewNetworkWithMaxTasks(name, maxTasks)
}
// NewNetworkWithMaxTasks returns a new, empty Network named name whose
// concurrent-task channel has capacity maxConcurrentTasks. Edge labels are
// enabled for plotting, and a default sink named name+"_default_sink" is
// created and installed as both the sink and the driver.
func NewNetworkWithMaxTasks(name string, maxConcurrentTasks int) *Network {
	nw := new(Network)
	nw.name = name
	nw.procs = make(map[string]Node)
	nw.concurrentTasks = make(chan struct{}, maxConcurrentTasks)
	nw.PlotConf.EdgeLabels = true

	// The default sink doubles as the driver until another one is selected.
	defaultSink := NewSink(nw, name+"_default_sink")
	nw.sink = defaultSink
	nw.driver = defaultSink
	return nw
}
// ----------------------------------------------------------------------------
// Main API methods
// ----------------------------------------------------------------------------
// Name returns the name given to the network when it was created.
func (net *Network) Name() string {
return net.name
}
// Proc returns the process registered under procName. If there is no such
// process, the problem is reported through Failf.
func (net *Network) Proc(procName string) Node {
	proc, found := net.procs[procName]
	if !found {
		net.Failf("No process named (%s)", procName)
	}
	return proc
}
// ProcsSorted returns the processes of the network as a slice, ordered by
// process name.
func (net *Network) ProcsSorted() []Node {
	all := net.Procs()

	// Collect and order the names first, since map iteration order is random.
	names := make([]string, 0, len(all))
	for name := range all {
		names = append(names, name)
	}
	sort.Strings(names)

	sorted := make([]Node, len(names))
	for i, name := range names {
		sorted[i] = net.Proc(name)
	}
	return sorted
}
// Procs returns the map of all processes in the network, keyed by their
// names. The returned map is the network's own map, not a copy.
func (net *Network) Procs() map[string]Node {
return net.procs
}
// AddProc registers node in the network under the name it reports, so that it
// is run when the network runs. If a process is already registered under that
// name, the clash is reported through Failf.
func (net *Network) AddProc(node Node) {
	name := node.Name()
	if existing := net.procs[name]; existing != nil {
		net.Failf("A process with name (%s) already exists in the workflow! Use a more unique name!", name)
	}
	net.procs[name] = node
}
// AddProcs registers each of the given processes in the network, in argument
// order, by calling AddProc on it.
func (net *Network) AddProcs(procs ...Node) {
	for i := range procs {
		net.AddProc(procs[i])
	}
}
// Sink returns the sink process currently set on the network.
func (net *Network) Sink() *Sink {
return net.sink
}
// SetSink replaces the sink of the network with the provided sink process. If
// the current sink already reports Ready, the attempt is reported through
// Fail.
//
// NOTE(review): only net.sink is replaced here; net.driver keeps whatever
// value it had before (the default sink right after construction). Confirm
// that this is intended.
func (net *Network) SetSink(sink *Sink) {
	if current := net.sink; current.Ready() {
		net.Fail("Trying to replace a sink which is already connected. Are you combining SetSink() with ConnectFinalOutPort()? That is not allowed!")
	}
	net.sink = sink
}
// IncConcurrentTasks increases the counter for how many concurrent tasks are
// currently running in the network by slots. Each slot is a send on the
// concurrent-task channel, so the call blocks while that channel is full.
func (net *Network) IncConcurrentTasks(slots int) {
	// Hold the lock for the whole acquisition, so that multiple processes
	// don't end up with partially "filled slots".
	net.concurrentTasksMx.Lock()
	defer net.concurrentTasksMx.Unlock()

	for acquired := 0; acquired < slots; acquired++ {
		net.concurrentTasks <- struct{}{}
		Debug.Println("Increased concurrent tasks")
	}
}
// DecConcurrentTasks decreases the counter for how many concurrent tasks are
// currently running in the network by slots. Each slot is a receive from the
// concurrent-task channel.
func (net *Network) DecConcurrentTasks(slots int) {
	for remaining := slots; remaining > 0; remaining-- {
		<-net.concurrentTasks
		Debug.Println("Decreased concurrent tasks")
	}
}
// PlotGraph writes the network structure, in the DOT format produced by
// DotGraph, to a file at filePath. createDirs is called on filePath before the
// file is created. A failure to create the file is handed to CheckWithMsg;
// failures to write or close it are reported through Failf.
func (net *Network) PlotGraph(filePath string) {
	dot := net.DotGraph()
	createDirs(filePath)
	dotFile, err := os.Create(filePath)
	CheckWithMsg(err, "Could not create dot file "+filePath)

	_, errDot := dotFile.WriteString(dot)
	// Always close the file, even when the write failed, so the descriptor is
	// released before the path is handed to other programs (see PlotGraphPDF).
	errClose := dotFile.Close()
	if errDot != nil {
		net.Failf("Could not write to DOT-file %s: %s", filePath, errDot)
	}
	if errClose != nil {
		net.Failf("Could not close DOT-file %s: %s", filePath, errClose)
	}
}
// PlotGraphPDF writes the network structure to a dot file at filePath, and
// then runs the graphviz dot command through ExecCmd to produce a PDF file at
// filePath + ".pdf" (requires graphviz, with the dot command, installed on the
// system).
//
// NOTE(review): filePath is inserted into the command line unquoted; how
// ExecCmd splits the command is not visible here, so paths containing spaces
// or shell metacharacters may not work as expected.
func (net *Network) PlotGraphPDF(filePath string) {
	net.PlotGraph(filePath)
	dotCmd := fmt.Sprintf("dot -Tpdf %s -o %s.pdf", filePath, filePath)
	ExecCmd(dotCmd)
}
// DotGraph generates a graph description in DOT format
// (See https://en.wikipedia.org/wiki/DOT_%28graph_description_language%29)
// If Network.PlotConf.EdgeLabels is set to true, each edge gets a label
// containing the out-port and in-port it connects, with everything up to and
// including the last dot stripped from the port names.
//
// Nodes are listed in process-name order, followed by all edges. Edges are
// ordered by process name, then out-port name, then remote port name, so that
// the same network always yields the same output.
func (net *Network) DotGraph() (dot string) {
	var nodes, edges strings.Builder

	fmt.Fprintf(&nodes, `digraph "%s" {`+"\n", net.Name())
	nodes.WriteString(` rankdir=LR;` + "\n")
	nodes.WriteString(` graph [fontname="Arial",fontsize=13,color="#384A52",fontcolor="#384A52"];` + "\n")
	nodes.WriteString(` node [fontname="Arial",fontsize=11,color="#384A52",fontcolor="#384A52",fillcolor="#EFF2F5",shape=box,style=filled];` + "\n")
	nodes.WriteString(` edge [fontname="Arial",fontsize=9, color="#384A52",fontcolor="#384A52"];` + "\n")

	// Strips a leading "<anything>." prefix from port names used in labels.
	remToDotPtn := regexp.MustCompile(`^.*\.`)

	for _, p := range net.ProcsSorted() {
		fmt.Fprintf(&nodes, ` "%s" [shape=box];`+"\n", p.Name())

		// Map iteration order is random, so walk the out-ports by sorted name.
		outPorts := p.OutPorts()
		opNames := make([]string, 0, len(outPorts))
		for opName := range outPorts {
			opNames = append(opNames, opName)
		}
		sort.Strings(opNames)

		for _, opName := range opNames {
			op := outPorts[opName]

			// Same for the remote ports of each out-port.
			rpNames := make([]string, 0, len(op.RemotePorts))
			for rpName := range op.RemotePorts {
				rpNames = append(rpNames, rpName)
			}
			sort.Strings(rpNames)

			for _, rpName := range rpNames {
				rp := op.RemotePorts[rpName]
				if net.PlotConf.EdgeLabels {
					fmt.Fprintf(&edges, ` "%s" -> "%s" [taillabel="%s", headlabel="%s"];`+"\n", op.Process().Name(), rp.Process().Name(), remToDotPtn.ReplaceAllString(opName, ""), remToDotPtn.ReplaceAllString(rpName, ""))
				} else {
					fmt.Fprintf(&edges, ` "%s" -> "%s";`+"\n", op.Process().Name(), rp.Process().Name())
				}
			}
		}
	}

	return nodes.String() + edges.String() + "}\n"
}
// ----------------------------------------------------------------------------
// Run methods
// ----------------------------------------------------------------------------
// Run runs all the processes that have been added to the network, by passing
// the network's own process map to runProcs.
func (net *Network) Run() {
net.runProcs(net.procs)
}
// RunTo runs all processes upstream of, and including, the processes whose
// names are provided as arguments. Each name is resolved with Proc.
func (net *Network) RunTo(finalProcNames ...string) {
	finalProcs := make([]Node, len(finalProcNames))
	for i, name := range finalProcNames {
		finalProcs[i] = net.Proc(name)
	}
	net.RunToProcs(finalProcs...)
}
// RunToRegex runs all processes upstream of, and including, the processes
// whose names match any of the provided regexp patterns. Patterns are compiled
// with regexp.MustCompile, which panics on an invalid pattern.
func (net *Network) RunToRegex(procNamePatterns ...string) {
	matched := make([]Node, 0)
	for _, pattern := range procNamePatterns {
		re := regexp.MustCompile(pattern)
		for name, proc := range net.Procs() {
			if !re.MatchString(name) {
				continue
			}
			matched = append(matched, proc)
		}
	}
	net.RunToProcs(matched...)
}
// RunToProcs runs all processes upstream of, and including, the process
// structs provided as arguments.
func (net *Network) RunToProcs(finalProcs ...Node) {
	selected := make(map[string]Node)
	for _, final := range finalProcs {
		// Everything feeding into the final process, plus the process itself.
		for name, upstream := range upstreamProcsForProc(final) {
			selected[name] = upstream
		}
		selected[final.Name()] = final
	}
	net.runProcs(selected)
}
// ----------------------------------------------------------------------------
// Helper methods for running the workflow
// ----------------------------------------------------------------------------
// runProcs runs a specified set of processes only. It first rewires dead-end
// connections for that set, then checks that the set is ready to run
// (reporting through Fail if not), starts every process in procs in its own
// goroutine, and finally runs the driver process in the calling goroutine,
// with audit messages before and after.
func (net *Network) runProcs(procs map[string]Node) {
	net.reconnectDeadEndConnections(procs)
	if !net.readyToRun(procs) {
		net.Fail("Network not ready to run, due to previously reported errors, so exiting.")
	}
	for _, node := range procs {
		// The network name is passed as an argument rather than concatenated
		// into the format string, so that a '%' in the name cannot be
		// misinterpreted as a formatting verb.
		Debug.Printf("%s: Starting process (%s) in new go-routine", net.name, node.Name())
		go node.Run()
	}
	Debug.Printf("%s: Starting driver process (%s) in main go-routine", net.name, net.driver.Name())
	net.Auditf("Starting workflow (Writing log to %s)", net.logFile)
	net.driver.Run()
	net.Auditf("Finished workflow (Log written to %s)", net.logFile)
}
// readyToRun reports whether the given set of processes can be started: the
// set must be non-empty, the network must have a sink, and every process must
// report Ready. The first problem found is logged via Error and false is
// returned.
func (net *Network) readyToRun(procs map[string]Node) bool {
	switch {
	case len(procs) == 0:
		Error.Println(net.name + ": The workflow is empty. Did you forget to add the processes to it?")
		return false
	case net.sink == nil:
		Error.Println(net.name + ": sink is nil!")
		return false
	}
	for _, proc := range procs {
		if proc.Ready() {
			continue
		}
		Error.Println(net.name + ": Not everything connected. Network shutting down.")
		return false
	}
	return true
}
// reconnectDeadEndConnections disconnects connections to processes which are
// not in the set of processes to be run, and, if an out-port for a process
// supposed to be run gets disconnected, its out-port(s) will be connected to
// the sink instead, to make sure it is properly executed.
//
// It also selects the driver: a process in procs without any out-ports
// becomes net.driver, and finding more than one such process is reported
// through Failf.
func (net *Network) reconnectDeadEndConnections(procs map[string]Node) {
foundNewDriverProc := false
for _, node := range procs {
// OutPorts
for _, opt := range node.OutPorts() {
for iptName, ipt := range opt.RemotePorts {
// If the remote port has no process, or its process is not among the
// ones to run, disconnect it from this out-port.
if ipt.Process() == nil {
Debug.Printf("Disconnecting in-port (%s) from out-port (%s)", ipt.Name(), opt.Name())
opt.Disconnect(iptName)
} else if _, ok := procs[ipt.Process().Name()]; !ok {
Debug.Printf("Disconnecting in-port (%s) from out-port (%s)", ipt.Name(), opt.Name())
opt.Disconnect(iptName)
}
}
// An out-port that is not ready after the pruning above is routed to the
// network's sink.
if !opt.Ready() {
Debug.Printf("Connecting disconnected out-port (%s) of process (%s) to workflow sink", opt.Name(), opt.Process().Name())
net.sink.From(opt)
}
}
// A process without out-ports is used as the driver; at most one such
// process is accepted.
if len(node.OutPorts()) == 0 {
if foundNewDriverProc {
net.Failf("Found more than one process without out-ports. Cannot use both as drivers (One of them being '%s'). Adapt your workflow accordingly.", node.Name())
}
foundNewDriverProc = true
net.driver = node
}
}
if foundNewDriverProc && len(procs) > 1 { // Allow for a workflow with a single process
// A process can't both be the driver and be included in the main procs
// map, so if we have an alternative driver, it should not be in the main
// procs map
// NOTE(review): this deletes from net.procs, not from the procs argument.
// When procs is a separate map (as built by RunToProcs), the driver stays
// in procs — confirm this is intended.
delete(net.procs, net.driver.Name())
}
}
// upstreamProcsForProc returns all processes that node is connected to,
// either directly or indirectly, via the remote ports of its in-ports. The
// result is keyed by process name and does not include node itself, unless
// node is reachable from its own in-ports.
//
// Each process is expanded only once, so shared ancestors are not traversed
// again for every path that leads to them, and a cyclic connection cannot
// cause unbounded recursion.
func upstreamProcsForProc(node Node) map[string]Node {
	procs := map[string]Node{}

	var collect func(n Node)
	collect = func(n Node) {
		for _, inp := range n.InPorts() {
			for _, rpt := range inp.RemotePorts {
				upstream := rpt.Process()
				name := upstream.Name()
				if _, seen := procs[name]; seen {
					// Already collected, together with everything upstream of it.
					continue
				}
				procs[name] = upstream
				collect(upstream)
			}
		}
	}
	collect(node)

	return procs
}
// mergeWFMaps copies every entry of b into a, overwriting entries in a that
// have the same key, and returns a. Note that a is modified in place.
func mergeWFMaps(a map[string]Node, b map[string]Node) map[string]Node {
	for name := range b {
		a[name] = b[name]
	}
	return a
}
// Auditf formats msg with parts and writes the result to the Audit logger,
// prefixed with "[Network:<name>]".
func (net *Network) Auditf(msg string, parts ...interface{}) {
	formatted := fmt.Sprintf(msg, parts...)
	Audit.Printf("[Network:%s] %s\n", net.Name(), formatted)
}
// Failf formats msg with parts and reports the result as a failure of this
// network through Fail.
func (net *Network) Failf(msg string, parts ...interface{}) {
	formatted := fmt.Sprintf(msg, parts...)
	net.Fail(formatted)
}
// Fail reports msg as a failure of this network by passing it, prefixed with
// "[Network:<name>]", to the package-level Failf function. msg is formatted
// with the %s verb.
func (net *Network) Fail(msg interface{}) {
Failf("[Network:%s] %s", net.Name(), msg)
}