|
| 1 | +"""Executable Python script for a proxy service to dockerSkeleton. |
| 2 | +
|
| 3 | +Provides a proxy service (using Flask, a Python web microframework) |
| 4 | +that implements the required /init and /run routes to interact with |
| 5 | +the OpenWhisk invoker service. |
| 6 | +
|
| 7 | +The implementation of these routes is encapsulated in a class named |
| 8 | +ActionRunner which provides a basic framework for receiving code |
| 9 | +from an invoker, preparing it for execution, and then running the |
| 10 | +code when required. |
| 11 | +
|
| 12 | +/* |
| 13 | + * Licensed to the Apache Software Foundation (ASF) under one or more |
| 14 | + * contributor license agreements. See the NOTICE file distributed with |
| 15 | + * this work for additional information regarding copyright ownership. |
| 16 | + * The ASF licenses this file to You under the Apache License, Version 2.0 |
| 17 | + * (the "License"); you may not use this file except in compliance with |
| 18 | + * the License. You may obtain a copy of the License at |
| 19 | + * |
| 20 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 21 | + * |
| 22 | + * Unless required by applicable law or agreed to in writing, software |
| 23 | + * distributed under the License is distributed on an "AS IS" BASIS, |
| 24 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 25 | + * See the License for the specific language governing permissions and |
| 26 | + * limitations under the License. |
| 27 | + */ |
| 28 | +""" |
| 29 | + |
| 30 | +import sys |
| 31 | +import os |
| 32 | +import json |
| 33 | +import subprocess |
| 34 | +import codecs |
| 35 | +import flask |
| 36 | +from gevent.wsgi import WSGIServer |
| 37 | +import zipfile |
| 38 | +import io |
| 39 | +import base64 |
| 40 | + |
| 41 | + |
| 42 | +class ActionRunner: |
| 43 | + """ActionRunner.""" |
| 44 | + LOG_SENTINEL = 'XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX' |
| 45 | + |
| 46 | + # initializes the runner |
| 47 | + # @param source the path where the source code will be located (if any) |
| 48 | + # @param binary the path where the binary will be located (may be the |
| 49 | + # same as source code path) |
| 50 | + def __init__(self, source=None, binary=None): |
| 51 | + defaultBinary = '/action/exec' |
| 52 | + self.source = source if source else defaultBinary |
| 53 | + self.binary = binary if binary else defaultBinary |
| 54 | + |
| 55 | + def preinit(self): |
| 56 | + return |
| 57 | + |
| 58 | + # extracts from the JSON object message a 'code' property and |
| 59 | + # writes it to the <source> path. The source code may have an |
| 60 | + # an optional <epilogue>. The source code is subsequently built |
| 61 | + # to produce the <binary> that is executed during <run>. |
| 62 | + # @param message is a JSON object, should contain 'code' |
| 63 | + # @return True iff binary exists and is executable |
| 64 | + def init(self, message): |
| 65 | + def prep(): |
| 66 | + self.preinit() |
| 67 | + if 'code' in message and message['code'] is not None: |
| 68 | + binary = message['binary'] if 'binary' in message else False |
| 69 | + if not binary: |
| 70 | + return self.initCodeFromString(message) |
| 71 | + else: |
| 72 | + return self.initCodeFromZip(message) |
| 73 | + else: |
| 74 | + return False |
| 75 | + |
| 76 | + if prep(): |
| 77 | + try: |
| 78 | + # write source epilogue if any |
| 79 | + # the message is passed along as it may contain other |
| 80 | + # fields relevant to a specific container. |
| 81 | + if self.epilogue(message) is False: |
| 82 | + return False |
| 83 | + # build the source |
| 84 | + if self.build(message) is False: |
| 85 | + return False |
| 86 | + except Exception: |
| 87 | + return False |
| 88 | + # verify the binary exists and is executable |
| 89 | + return self.verify() |
| 90 | + |
| 91 | + # optionally appends source to the loaded code during <init> |
| 92 | + def epilogue(self, init_arguments): |
| 93 | + return |
| 94 | + |
| 95 | + # optionally builds the source code loaded during <init> into an executable |
| 96 | + def build(self, init_arguments): |
| 97 | + return |
| 98 | + |
| 99 | + # @return True iff binary exists and is executable, False otherwise |
| 100 | + def verify(self): |
| 101 | + return (os.path.isfile(self.binary) and |
| 102 | + os.access(self.binary, os.X_OK)) |
| 103 | + |
| 104 | + # constructs an environment for the action to run in |
| 105 | + # @param message is a JSON object received from invoker (should |
| 106 | + # contain 'value' and 'api_key' and other metadata) |
| 107 | + # @return an environment dictionary for the action process |
| 108 | + def env(self, message): |
| 109 | + # make sure to include all the env vars passed in by the invoker |
| 110 | + env = os.environ |
| 111 | + for p in ['api_key', 'namespace', 'action_name', 'activation_id', 'deadline']: |
| 112 | + if p in message: |
| 113 | + env['__OW_%s' % p.upper()] = message[p] |
| 114 | + return env |
| 115 | + |
| 116 | + # runs the action, called iff self.verify() is True. |
| 117 | + # @param args is a JSON object representing the input to the action |
| 118 | + # @param env is the environment for the action to run in (defined edge |
| 119 | + # host, auth key) |
| 120 | + # return JSON object result of running the action or an error dictionary |
| 121 | + # if action failed |
| 122 | + def run(self, args, env): |
| 123 | + def error(msg): |
| 124 | + # fall through (exception and else case are handled the same way) |
| 125 | + sys.stdout.write('%s\n' % msg) |
| 126 | + return (502, {'error': 'The action did not return a dictionary.'}) |
| 127 | + |
| 128 | + try: |
| 129 | + input = json.dumps(args) |
| 130 | + p = subprocess.Popen( |
| 131 | + [self.binary, input], |
| 132 | + stdout=subprocess.PIPE, |
| 133 | + stderr=subprocess.PIPE, |
| 134 | + env=env) |
| 135 | + except Exception as e: |
| 136 | + return error(e) |
| 137 | + |
| 138 | + # run the process and wait until it completes. |
| 139 | + # stdout/stderr will always be set because we passed PIPEs to Popen |
| 140 | + (o, e) = p.communicate() |
| 141 | + |
| 142 | + # stdout/stderr may be either text or bytes, depending on Python |
| 143 | + # version, so if bytes, decode to text. Note that in Python 2 |
| 144 | + # a string will match both types; so also skip decoding in that case |
| 145 | + if isinstance(o, bytes) and not isinstance(o, str): |
| 146 | + o = o.decode('utf-8') |
| 147 | + if isinstance(e, bytes) and not isinstance(e, str): |
| 148 | + e = e.decode('utf-8') |
| 149 | + |
| 150 | + # get the last line of stdout, even if empty |
| 151 | + lastNewLine = o.rfind('\n', 0, len(o)-1) |
| 152 | + if lastNewLine != -1: |
| 153 | + # this is the result string to JSON parse |
| 154 | + lastLine = o[lastNewLine+1:].strip() |
| 155 | + # emit the rest as logs to stdout (including last new line) |
| 156 | + sys.stdout.write(o[:lastNewLine+1]) |
| 157 | + else: |
| 158 | + # either o is empty or it is the result string |
| 159 | + lastLine = o.strip() |
| 160 | + |
| 161 | + if e: |
| 162 | + sys.stderr.write(e) |
| 163 | + |
| 164 | + try: |
| 165 | + json_output = json.loads(lastLine) |
| 166 | + if isinstance(json_output, dict): |
| 167 | + return (200, json_output) |
| 168 | + else: |
| 169 | + return error(lastLine) |
| 170 | + except Exception: |
| 171 | + return error(lastLine) |
| 172 | + |
| 173 | + # initialize code from inlined string |
| 174 | + def initCodeFromString(self, message): |
| 175 | + with codecs.open(self.source, 'w', 'utf-8') as fp: |
| 176 | + fp.write(message['code']) |
| 177 | + return True |
| 178 | + |
| 179 | + # initialize code from base64 encoded archive |
| 180 | + def initCodeFromZip(self, message): |
| 181 | + try: |
| 182 | + bytes = base64.b64decode(message['code']) |
| 183 | + bytes = io.BytesIO(bytes) |
| 184 | + archive = zipfile.ZipFile(bytes) |
| 185 | + archive.extractall(os.path.dirname(self.source)) |
| 186 | + archive.close() |
| 187 | + return True |
| 188 | + except Exception as e: |
| 189 | + print('err', str(e)) |
| 190 | + return False |
| 191 | + |
| 192 | +proxy = flask.Flask(__name__) |
| 193 | +proxy.debug = False |
| 194 | +runner = None |
| 195 | + |
| 196 | + |
| 197 | +def setRunner(r): |
| 198 | + global runner |
| 199 | + runner = r |
| 200 | + |
| 201 | + |
| 202 | +@proxy.route('/init', methods=['POST']) |
| 203 | +def init(): |
| 204 | + message = flask.request.get_json(force=True, silent=True) |
| 205 | + if message and not isinstance(message, dict): |
| 206 | + flask.abort(404) |
| 207 | + else: |
| 208 | + value = message.get('value', {}) if message else {} |
| 209 | + |
| 210 | + if not isinstance(value, dict): |
| 211 | + flask.abort(404) |
| 212 | + |
| 213 | + try: |
| 214 | + status = runner.init(value) |
| 215 | + except Exception as e: |
| 216 | + status = False |
| 217 | + |
| 218 | + if status is True: |
| 219 | + return ('OK', 200) |
| 220 | + else: |
| 221 | + response = flask.jsonify({'error': 'The action failed to generate or locate a binary. See logs for details.'}) |
| 222 | + response.status_code = 502 |
| 223 | + return complete(response) |
| 224 | + |
| 225 | + |
| 226 | +@proxy.route('/run', methods=['POST']) |
| 227 | +def run(): |
| 228 | + def error(): |
| 229 | + response = flask.jsonify({'error': 'The action did not receive a dictionary as an argument.'}) |
| 230 | + response.status_code = 404 |
| 231 | + return complete(response) |
| 232 | + |
| 233 | + message = flask.request.get_json(force=True, silent=True) |
| 234 | + if message and not isinstance(message, dict): |
| 235 | + return error() |
| 236 | + else: |
| 237 | + args = message.get('value', {}) if message else {} |
| 238 | + if not isinstance(args, dict): |
| 239 | + return error() |
| 240 | + |
| 241 | + if runner.verify(): |
| 242 | + try: |
| 243 | + code, result = runner.run(args, runner.env(message or {})) |
| 244 | + response = flask.jsonify(result) |
| 245 | + response.status_code = code |
| 246 | + except Exception as e: |
| 247 | + response = flask.jsonify({'error': 'Internal error. {}'.format(e)}) |
| 248 | + response.status_code = 500 |
| 249 | + else: |
| 250 | + response = flask.jsonify({'error': 'The action failed to locate a binary. See logs for details.'}) |
| 251 | + response.status_code = 502 |
| 252 | + return complete(response) |
| 253 | + |
| 254 | + |
| 255 | +def complete(response): |
| 256 | + # Add sentinel to stdout/stderr |
| 257 | + sys.stdout.write('%s\n' % ActionRunner.LOG_SENTINEL) |
| 258 | + sys.stdout.flush() |
| 259 | + sys.stderr.write('%s\n' % ActionRunner.LOG_SENTINEL) |
| 260 | + sys.stderr.flush() |
| 261 | + return response |
| 262 | + |
| 263 | + |
| 264 | +def main(): |
| 265 | + port = int(os.getenv('FLASK_PROXY_PORT', 8080)) |
| 266 | + server = WSGIServer(('', port), proxy, log=None) |
| 267 | + server.serve_forever() |
| 268 | + |
| 269 | +if __name__ == '__main__': |
| 270 | + setRunner(ActionRunner()) |
| 271 | + main() |
0 commit comments