-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathoutputs.go
More file actions
274 lines (234 loc) · 8.06 KB
/
outputs.go
File metadata and controls
274 lines (234 loc) · 8.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
package core
import (
"maps"
"reflect"
"strings"
)
type SetOutputValueOpts struct {
NotExistsIsNoError bool
ForceSet bool
StringTypeHint bool // If true, treat value as string type; otherwise determine from value
}
// HasOutputsInterface is a representation for all outputs of a node.
// The node that implements this interface has outgoing connections.
type HasOutputsInterface interface {
OutputDefsClone() map[OutputId]OutputDefinition
OutputDefByPortId(outputId string) (OutputDefinition, *IndexPortInfo, bool)
SetOutputDefs(outputs map[OutputId]OutputDefinition, opts SetDefsOpts)
OutputValueById(c *ExecutionState, outputId OutputId) (value any, err error)
SetOutputValue(c *ExecutionState, outputId OutputId, value any, opts SetOutputValueOpts) error
AddSubOutput(portId string, groupPortId string, portIndex int) error
IncrementConnectionCounter(outputId OutputId)
SetOwner(host NodeBaseInterface)
}
type Outputs struct {
outputDefs map[OutputId]OutputDefinition
outputConnectionCounter map[OutputId]int64
outputIndexPorts map[string]IndexPortInfo
owner NodeBaseInterface
}
func (n *Outputs) OutputDefByPortId(outputId string) (OutputDefinition, *IndexPortInfo, bool) {
indexPort, ok := n.outputIndexPorts[outputId]
if !ok {
outputDef, ok := n.outputDefs[OutputId(outputId)]
return outputDef, nil, ok
}
outputDef, ok := n.outputDefs[OutputId(indexPort.ArrayPortId)]
if !ok {
// must never happen since `outputIndexPorts` is
// only filled with existing output ports
panic("group output port not found")
}
return outputDef, &indexPort, true
}
func (n *Outputs) SetOwner(owner NodeBaseInterface) {
n.owner = owner
}
func (n *Outputs) IncrementConnectionCounter(outputId OutputId) {
if n.outputConnectionCounter == nil {
n.outputConnectionCounter = make(map[OutputId]int64)
}
n.outputConnectionCounter[outputId]++
}
func (n *Outputs) OutputDefsClone() map[OutputId]OutputDefinition {
return maps.Clone(n.outputDefs)
}
func (n *Outputs) SetOutputDefs(outputDefs map[OutputId]OutputDefinition, opts SetDefsOpts) {
if opts.AssignmentMode == AssignmentMode_Replace {
n.outputDefs = outputDefs
} else {
if n.outputDefs == nil {
n.outputDefs = make(map[OutputId]OutputDefinition)
}
maps.Copy(n.outputDefs, outputDefs)
}
}
func (n *Outputs) AddSubOutput(portId string, groupPortId string, portIndex int) error {
// simple test, proper test should be done by caller by using `IsValidIndexPortId`
if !strings.Contains(portId, "[") {
return CreateErr(nil, nil, "port '%s' is not a sub port", portId)
}
if n.outputIndexPorts == nil {
n.outputIndexPorts = make(map[string]IndexPortInfo)
}
groupOutputDef, exists := n.outputDefs[OutputId(groupPortId)]
if !exists {
return CreateErr(nil, nil, "port '%s' does not exist", groupPortId)
}
if !groupOutputDef.Array {
return CreateErr(nil, nil, "port '%s' is not an array port", groupPortId)
}
n.outputIndexPorts[portId] = IndexPortInfo{
IndexPortId: portId,
ArrayPortId: groupPortId,
Index: portIndex,
}
return nil
}
func (n *Outputs) OutputValueById(c *ExecutionState, outputId OutputId) (any, error) {
// If 'Outputs' belongs to a data node, then this method doesn't seem to be implemented.
// If 'Outputs' belongs to a execution node, then the value hasn't been set yet.
// Reminder, for execution nodes, once they are executed, all outputs have to be populated!
return nil, CreateErr(c, &ErrNoOutputValue{}, "output port '%v' has no value", outputId)
}
// SetOutputValue sets the value of an output to the node.
// The value type must match the output type, otherwise an error
// is returned.
func (n *Outputs) SetOutputValue(ec *ExecutionState, outputId OutputId, value any, opts SetOutputValueOpts) error {
var outputType string
outputDef, outputExists := n.outputDefs[outputId]
if outputExists {
outputType = outputDef.Type
expectedType := outputType
if outputDef.Array {
expectedType = "[]" + expectedType
}
if !isValueValidForOutput(value, expectedType) {
return CreateErr(ec, nil, "output '%s' (%s): expected %v, but got %T", outputDef.Name, outputId, outputDef.Type, value)
}
} else {
if !opts.ForceSet {
// if the output could not be found,
// check if it is a sub port instead
groupPortId, _, isIndexPort := IsValidIndexPortId(string(outputId))
if !isIndexPort {
if opts.NotExistsIsNoError {
return nil
}
return CreateErr(ec, nil, "failed to set a value to an unknown port '%s'", outputId)
}
outputDef, outputExists = n.outputDefs[OutputId(groupPortId)]
if !outputExists {
if opts.NotExistsIsNoError {
return nil
}
// If still nothing found, return an error
return CreateErr(ec, nil, "failed to set a value to an unknown port '%s'", outputId)
}
outputType = outputDef.Type
if !isValueValidForOutput(value, outputType) {
return CreateErr(ec, nil, "output '%s' (%s): expected %v, but got %T", outputDef.Name, outputId, outputDef.Type, value)
}
} else {
// ForceSet without known output definition - use provided type or determine from value
if opts.StringTypeHint {
outputType = "string"
} else {
outputType = determineOutputType(value)
}
}
}
// If the output is not connected, there's no need to keep the value. It can be discarded, unless
// for debug sessions where we always keep the output value, as it will be transmitted to the client for inspection
connectionCounter := n.outputConnectionCounter[outputId]
if connectionCounter == 0 && !ec.IsDebugSession && !opts.ForceSet {
// TODO: (Seb) If the value is a stream, we should close it here
return nil
}
ec.CacheDataOutput(n.owner, string(outputId), value, outputType, Permanent)
return nil
}
func isValueValidForOutput(value any, expectedType string) bool {
if expectedType == "any" || expectedType == "unknown" {
return true
}
// if its not unknown or any but nil then the value is not compatible
if value == nil {
return false
}
valueType := reflect.TypeOf(value)
kind := valueType.Kind()
_, mappingExists := validKindsForExpectedType[expectedType]
if mappingExists {
_, valid := validKindsForExpectedType[expectedType][kind]
return valid
}
switch expectedType {
case "[]string":
return kind == reflect.Slice && valueType.Elem().Kind() == reflect.String
case "[]number":
return kind == reflect.Slice && isNumericType(valueType.Elem())
case "[]bool":
return kind == reflect.Slice && valueType.Elem().Kind() == reflect.Bool
case "git-repo":
return valueType == gitRepository
case "stream":
return kind == reflect.String || valueType == dataStreamFactoryType || valueType == ioPipeReaderFactoryType || (kind == reflect.Slice && valueType.Elem().Kind() == reflect.Uint8)
case "storage-provider":
return valueType.Implements(storageProviderType)
case "credentials":
return valueType.Implements(credentialsType)
}
return valueType.String() == expectedType
}
func isNumericType(valueType reflect.Type) bool {
kind := valueType.Kind()
_, valid := validKindsForExpectedType["number"][kind]
return valid
}
var validKindsForExpectedType = map[string]map[reflect.Kind]struct{}{
"iterable": {
reflect.Slice: {},
reflect.Map: {},
reflect.String: {},
},
"string": {
reflect.String: {},
},
"number": {
reflect.Int: {},
reflect.Int8: {},
reflect.Int16: {},
reflect.Int32: {},
reflect.Int64: {},
reflect.Uint: {},
reflect.Uint8: {},
reflect.Uint16: {},
reflect.Uint32: {},
reflect.Uint64: {},
reflect.Float32: {},
reflect.Float64: {},
},
"bool": {
reflect.Bool: {},
},
}
// determineOutputType returns the primitive type name for a value, or "unknown" if not a primitive.
func determineOutputType(value any) string {
if value == nil {
return "unknown"
}
kind := reflect.TypeOf(value).Kind()
switch kind {
case reflect.String:
return "string"
case reflect.Bool:
return "bool"
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64,
reflect.Float32, reflect.Float64:
return "number"
default:
return "unknown"
}
}