-
Notifications
You must be signed in to change notification settings - Fork 38
Expand file tree
/
Copy pathhandler.py
More file actions
120 lines (109 loc) · 4.12 KB
/
handler.py
File metadata and controls
120 lines (109 loc) · 4.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import sys
import os
import json
import base64
import traceback
import inspect
import six
class Handler(object):
    """Serve an algorithm over a line-delimited JSON protocol.

    Requests are read from ``sys.stdin``, one JSON document per line. Each
    request is passed to the user-supplied apply function and the result (or
    an error description) is written back as a single JSON line through
    ``write_to_pipe``.
    """

    # Evaluated once; used instead of ``six`` so the class needs only the
    # standard library.
    _PY3 = sys.version_info[0] >= 3

    def __init__(self, apply_func, load_func=None):
        """
        Creates the handler object
        :param apply_func: A required function that can have an arity of 1-2, depending on if loading occurs
        :param load_func: An optional supplier function used if load time events are required, has an arity of 0.
        :raises Exception: if load_func takes parameters, or apply_func does not take 1 or 2 parameters
        """
        self.FIFO_PATH = "/tmp/algoout"
        apply_args = self._positional_args(apply_func)
        if load_func is not None and len(self._positional_args(load_func)) > 0:
            raise Exception("load function must not have parameters")
        if len(apply_args) > 2 or len(apply_args) == 0:
            raise Exception("apply function may have between 1 and 2 parameters, not {}".format(len(apply_args)))
        self.apply_func = apply_func
        self.load_func = load_func
        self.load_result = None

    @staticmethod
    def _positional_args(func):
        """Return the list of positional parameter names of ``func``."""
        # inspect.getargspec was removed in Python 3.11 and rejects annotated
        # functions on earlier 3.x; getfullargspec only exists on Python 3.
        spec_of = getattr(inspect, 'getfullargspec', None)
        if spec_of is None:
            spec_of = inspect.getargspec
        return spec_of(func)[0]

    @classmethod
    def _is_text(cls, value):
        """Return True when ``value`` is a text string on the running Python."""
        if cls._PY3:
            return isinstance(value, str)
        return isinstance(value, basestring)  # noqa: F821 -- Python 2 only

    @staticmethod
    def _format_error(exc):
        """Serialize ``exc`` as a JSON error payload.

        Must be called from inside the ``except`` block handling ``exc`` so
        that ``traceback.format_exc()`` sees the active exception.
        """
        return json.dumps({
            'error': {
                'message': str(exc),
                'stacktrace': traceback.format_exc(),
                # Exceptions may carry their own ``error_type`` attribute.
                'error_type': getattr(exc, 'error_type', 'AlgorithmError')
            }
        })

    def load(self):
        """Run the load function (if any), store its result, and print the
        ``PIPE_INIT_COMPLETE`` marker to stdout."""
        if self.load_func is not None:
            self.load_result = self.load_func()
        print('PIPE_INIT_COMPLETE')
        sys.stdout.flush()

    def format_data(self, request):
        """Extract the apply-function input from a decoded request.

        :param request: dict with ``content_type`` and ``data`` keys
        :return: ``data`` unchanged for ``text``/``json``; decoded bytes for ``binary``
        :raises Exception: if ``content_type`` is not text, json or binary
        """
        content_type = request['content_type']
        if content_type in ('text', 'json'):
            return request['data']
        if content_type == 'binary':
            # JSON cannot carry raw bytes and format_response base64-encodes
            # binary results, so binary input is decoded the same way.
            # NOTE(review): assumes the sender base64-encodes binary payloads
            # -- confirm against the caller.
            return self.wrap_binary_data(base64.b64decode(request['data']))
        raise Exception("Invalid content_type: {}".format(content_type))

    def is_binary(self, arg):
        """Return True when ``arg`` should be reported as binary content."""
        if self._PY3:
            return isinstance(arg, (bytes, bytearray))
        # On Python 2 ``str`` doubles as text, so only bytearray marks binary.
        return isinstance(arg, bytearray)

    def wrap_binary_data(self, data):
        """Wrap raw bytes in the type ``is_binary`` recognises:
        ``bytes`` on Python 3, ``bytearray`` on Python 2."""
        if self._PY3:
            return bytes(data)
        return bytearray(data)

    def format_response(self, response):
        """Serialize an apply result as a JSON response string.

        Binary results are base64-encoded and tagged ``binary``, strings are
        tagged ``text``, everything else is tagged ``json``.

        :param response: the value returned by the apply function
        :return: JSON string with ``result`` and ``metadata.content_type``
        """
        if self.is_binary(response):
            content_type = 'binary'
            response = base64.b64encode(response)
            if not self._is_text(response):
                # b64encode returns bytes on Python 3; JSON needs text.
                response = response.decode('utf-8')
        elif self._is_text(response):
            content_type = 'text'
        else:
            content_type = 'json'
        return json.dumps({
            'result': response,
            'metadata': {
                'content_type': content_type
            }
        })

    def write_to_pipe(self, data_string):
        """Emit one response line.

        On POSIX the line is written to ``FIFO_PATH`` followed by a newline.
        """
        if os.name == "posix":
            with open(self.FIFO_PATH, 'w') as f:
                f.write(data_string)
                f.write('\n')
            sys.stdout.flush()
        if os.name == "nt":
            # NOTE(review): this rebinds sys.stdin to the payload string
            # instead of writing it anywhere; kept as-is -- confirm intent.
            sys.stdin = data_string

    def serve(self):
        """Run the load step, then answer every request line on stdin.

        A failing load writes a single error payload and returns. Otherwise
        each input line yields exactly one response or error payload.
        """
        try:
            self.load()
        except Exception as e:
            return self.write_to_pipe(self._format_error(e))
        for line in sys.stdin:
            try:
                request = json.loads(line)
                formatted_input = self.format_data(request)
                # A falsy load result (None, 0, empty container) is treated
                # as "nothing loaded": apply_func then gets the input only.
                if self.load_result:
                    apply_result = self.apply_func(formatted_input, self.load_result)
                else:
                    apply_result = self.apply_func(formatted_input)
                response_string = self.format_response(apply_result)
            except Exception as e:
                response_string = self._format_error(e)
            # Written after the try/except rather than in a ``finally`` so a
            # KeyboardInterrupt/SystemExit propagates without re-sending a
            # stale response or hitting an unbound ``response_string``.
            self.write_to_pipe(response_string)