vFW and vDNS support added to azure-plugin
[multicloud/azure.git] / azure / aria / aria-extension-cloudify / src / aria / aria / orchestrator / execution_plugin / ctx_proxy / server.py
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/execution_plugin/ctx_proxy/server.py
new file mode 100644 (file)
index 0000000..91b95d9
--- /dev/null
@@ -0,0 +1,244 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+``ctx`` proxy server implementation.
+"""
+
+import json
+import socket
+import Queue
+import StringIO
+import threading
+import traceback
+import wsgiref.simple_server
+
+import bottle
+from aria import modeling
+
+from .. import exceptions
+
+
+class CtxProxy(object):
+
+    def __init__(self, ctx, ctx_patcher=(lambda *args, **kwargs: None)):
+        self.ctx = ctx
+        self._ctx_patcher = ctx_patcher
+        self.port = _get_unused_port()
+        self.socket_url = 'http://localhost:{0}'.format(self.port)
+        self.server = None
+        self._started = Queue.Queue(1)
+        self.thread = self._start_server()
+        self._started.get(timeout=5)
+
+    def _start_server(self):
+
+        class BottleServerAdapter(bottle.ServerAdapter):
+            proxy = self
+
+            def close_session(self):
+                self.proxy.ctx.model.log._session.remove()
+
+            def run(self, app):
+
+                class Server(wsgiref.simple_server.WSGIServer):
+                    allow_reuse_address = True
+                    bottle_server = self
+
+                    def handle_error(self, request, client_address):
+                        pass
+
+                    def serve_forever(self, poll_interval=0.5):
+                        try:
+                            wsgiref.simple_server.WSGIServer.serve_forever(self, poll_interval)
+                        finally:
+                            # Once shutdown is called, we need to close the session.
+                            # If the session is not closed properly, it might raise warnings,
+                            # or even lock the database.
+                            self.bottle_server.close_session()
+
+                class Handler(wsgiref.simple_server.WSGIRequestHandler):
+                    def address_string(self):
+                        return self.client_address[0]
+
+                    def log_request(*args, **kwargs):  # pylint: disable=no-method-argument
+                        if not self.quiet:
+                            return wsgiref.simple_server.WSGIRequestHandler.log_request(*args,
+                                                                                        **kwargs)
+                server = wsgiref.simple_server.make_server(
+                    host=self.host,
+                    port=self.port,
+                    app=app,
+                    server_class=Server,
+                    handler_class=Handler)
+                self.proxy.server = server
+                self.proxy._started.put(True)
+                server.serve_forever(poll_interval=0.1)
+
+        def serve():
+            # Since task is a thread_local object, we need to patch it inside the server thread.
+            self._ctx_patcher(self.ctx)
+
+            bottle_app = bottle.Bottle()
+            bottle_app.post('/', callback=self._request_handler)
+            bottle.run(
+                app=bottle_app,
+                host='localhost',
+                port=self.port,
+                quiet=True,
+                server=BottleServerAdapter)
+        thread = threading.Thread(target=serve)
+        thread.daemon = True
+        thread.start()
+        return thread
+
+    def close(self):
+        if self.server:
+            self.server.shutdown()
+            self.server.server_close()
+
+    def _request_handler(self):
+        request = bottle.request.body.read()  # pylint: disable=no-member
+        response = self._process(request)
+        return bottle.LocalResponse(
+            body=json.dumps(response, cls=modeling.utils.ModelJSONEncoder),
+            status=200,
+            headers={'content-type': 'application/json'}
+        )
+
+    def _process(self, request):
+        try:
+            with self.ctx.model.instrument(*self.ctx.INSTRUMENTATION_FIELDS):
+                payload = _process_request(self.ctx, request)
+                result_type = 'result'
+                if isinstance(payload, exceptions.ScriptException):
+                    payload = dict(message=str(payload))
+                    result_type = 'stop_operation'
+                result = {'type': result_type, 'payload': payload}
+        except Exception as e:
+            traceback_out = StringIO.StringIO()
+            traceback.print_exc(file=traceback_out)
+            payload = {
+                'type': type(e).__name__,
+                'message': str(e),
+                'traceback': traceback_out.getvalue()
+            }
+            result = {'type': 'error', 'payload': payload}
+
+        return result
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args, **kwargs):
+        self.close()
+
+
+class CtxError(RuntimeError):
+    pass
+
+
+class CtxParsingError(CtxError):
+    pass
+
+
+def _process_request(ctx, request):
+    request = json.loads(request)
+    args = request['args']
+    return _process_arguments(ctx, args)
+
+
+def _process_arguments(obj, args):
+    # Modifying?
+    try:
+        # TODO: should there be a way to escape "=" in case it is needed as real argument?
+        equals_index = args.index('=') # raises ValueError if not found
+    except ValueError:
+        equals_index = None
+    if equals_index is not None:
+        if equals_index == 0:
+            raise CtxParsingError('The "=" argument cannot be first')
+        elif equals_index != len(args) - 2:
+            raise CtxParsingError('The "=" argument must be penultimate')
+        modifying = True
+        modifying_key = args[-3]
+        modifying_value = args[-1]
+        args = args[:-3]
+    else:
+        modifying = False
+        modifying_key = None
+        modifying_value = None
+
+    # Parse all arguments
+    while len(args) > 0:
+        obj, args = _process_next_operation(obj, args, modifying)
+
+    if modifying:
+        if hasattr(obj, '__setitem__'):
+            # Modify item value (dict, list, and similar)
+            if isinstance(obj, (list, tuple)):
+                modifying_key = int(modifying_key)
+            obj[modifying_key] = modifying_value
+        elif hasattr(obj, modifying_key):
+            # Modify object attribute
+            setattr(obj, modifying_key, modifying_value)
+        else:
+            raise CtxError('Cannot modify `{0}` of `{1!r}`'.format(modifying_key, obj))
+
+    return obj
+
+
+def _process_next_operation(obj, args, modifying):
+    args = list(args)
+    arg = args.pop(0)
+
+    # Call?
+    if arg == '[':
+        # TODO: should there be a way to escape "[" and "]" in case they are needed as real
+        # arguments?
+        try:
+            closing_index = args.index(']') # raises ValueError if not found
+        except ValueError:
+            raise CtxParsingError('Opening "[" without a closing "]')
+        callable_args = args[:closing_index]
+        args = args[closing_index + 1:]
+        if not callable(obj):
+            raise CtxError('Used "[" and "] on an object that is not callable')
+        return obj(*callable_args), args
+
+    # Attribute?
+    if isinstance(arg, basestring):
+        if hasattr(obj, arg):
+            return getattr(obj, arg), args
+        token_sugared = arg.replace('-', '_')
+        if hasattr(obj, token_sugared):
+            return getattr(obj, token_sugared), args
+
+    # Item? (dict, lists, and similar)
+    if hasattr(obj, '__getitem__'):
+        if modifying and (arg not in obj) and hasattr(obj, '__setitem__'):
+            # Create nested dict
+            obj[arg] = {}
+        return obj[arg], args
+
+    raise CtxParsingError('Cannot parse argument: `{0!r}`'.format(arg))
+
+
+def _get_unused_port():
+    sock = socket.socket()
+    sock.bind(('127.0.0.1', 0))
+    _, port = sock.getsockname()
+    sock.close()
+    return port