-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathtest_purepython.py
More file actions
46 lines (37 loc) · 1.41 KB
/
test_purepython.py
File metadata and controls
46 lines (37 loc) · 1.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import unittest
from python_workflow_definition.purepython import load_workflow_json
function_str = """
def get_prod_and_div(x, y):
return {"prod": x * y, "div": x / y}
def get_sum(x, y):
return x + y
def get_square(x):
return x ** 2
"""
workflow_str = """
{
"version": "0.1.0",
"nodes": [
{"id": 0, "type": "function", "value": "workflow.get_prod_and_div"},
{"id": 1, "type": "function", "value": "workflow.get_sum"},
{"id": 2, "type": "function", "value": "workflow.get_square"},
{"id": 3, "type": "input", "value": 1, "name": "x"},
{"id": 4, "type": "input", "value": 2, "name": "y"},
{"id": 5, "type": "output", "name": "result"}
],
"edges": [
{"target": 0, "targetPort": "x", "source": 3, "sourcePort": null},
{"target": 0, "targetPort": "y", "source": 4, "sourcePort": null},
{"target": 1, "targetPort": "x", "source": 0, "sourcePort": "prod"},
{"target": 1, "targetPort": "y", "source": 0, "sourcePort": "div"},
{"target": 2, "targetPort": "x", "source": 1, "sourcePort": null},
{"target": 5, "targetPort": null, "source": 2, "sourcePort": null}
]
}"""
class TestPurePython(unittest.TestCase):
def test_pure_python(self):
with open("workflow.py", "w") as f:
f.write(function_str)
with open("workflow.json", "w") as f:
f.write(workflow_str)
self.assertEqual(load_workflow_json(file_name="workflow.json"), 6.25)