vFW and vDNS support added to azure-plugin
[multicloud/azure.git] / azure / aria / aria-extension-cloudify / src / aria / aria / orchestrator / execution_plugin / ctx_proxy / server.py
1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements.  See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License.  You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """
17 ``ctx`` proxy server implementation.
18 """
19
20 import json
21 import socket
22 import Queue
23 import StringIO
24 import threading
25 import traceback
26 import wsgiref.simple_server
27
28 import bottle
29 from aria import modeling
30
31 from .. import exceptions
32
33
34 class CtxProxy(object):
35
36     def __init__(self, ctx, ctx_patcher=(lambda *args, **kwargs: None)):
37         self.ctx = ctx
38         self._ctx_patcher = ctx_patcher
39         self.port = _get_unused_port()
40         self.socket_url = 'http://localhost:{0}'.format(self.port)
41         self.server = None
42         self._started = Queue.Queue(1)
43         self.thread = self._start_server()
44         self._started.get(timeout=5)
45
46     def _start_server(self):
47
48         class BottleServerAdapter(bottle.ServerAdapter):
49             proxy = self
50
51             def close_session(self):
52                 self.proxy.ctx.model.log._session.remove()
53
54             def run(self, app):
55
56                 class Server(wsgiref.simple_server.WSGIServer):
57                     allow_reuse_address = True
58                     bottle_server = self
59
60                     def handle_error(self, request, client_address):
61                         pass
62
63                     def serve_forever(self, poll_interval=0.5):
64                         try:
65                             wsgiref.simple_server.WSGIServer.serve_forever(self, poll_interval)
66                         finally:
67                             # Once shutdown is called, we need to close the session.
68                             # If the session is not closed properly, it might raise warnings,
69                             # or even lock the database.
70                             self.bottle_server.close_session()
71
72                 class Handler(wsgiref.simple_server.WSGIRequestHandler):
73                     def address_string(self):
74                         return self.client_address[0]
75
76                     def log_request(*args, **kwargs):  # pylint: disable=no-method-argument
77                         if not self.quiet:
78                             return wsgiref.simple_server.WSGIRequestHandler.log_request(*args,
79                                                                                         **kwargs)
80                 server = wsgiref.simple_server.make_server(
81                     host=self.host,
82                     port=self.port,
83                     app=app,
84                     server_class=Server,
85                     handler_class=Handler)
86                 self.proxy.server = server
87                 self.proxy._started.put(True)
88                 server.serve_forever(poll_interval=0.1)
89
90         def serve():
91             # Since task is a thread_local object, we need to patch it inside the server thread.
92             self._ctx_patcher(self.ctx)
93
94             bottle_app = bottle.Bottle()
95             bottle_app.post('/', callback=self._request_handler)
96             bottle.run(
97                 app=bottle_app,
98                 host='localhost',
99                 port=self.port,
100                 quiet=True,
101                 server=BottleServerAdapter)
102         thread = threading.Thread(target=serve)
103         thread.daemon = True
104         thread.start()
105         return thread
106
107     def close(self):
108         if self.server:
109             self.server.shutdown()
110             self.server.server_close()
111
112     def _request_handler(self):
113         request = bottle.request.body.read()  # pylint: disable=no-member
114         response = self._process(request)
115         return bottle.LocalResponse(
116             body=json.dumps(response, cls=modeling.utils.ModelJSONEncoder),
117             status=200,
118             headers={'content-type': 'application/json'}
119         )
120
121     def _process(self, request):
122         try:
123             with self.ctx.model.instrument(*self.ctx.INSTRUMENTATION_FIELDS):
124                 payload = _process_request(self.ctx, request)
125                 result_type = 'result'
126                 if isinstance(payload, exceptions.ScriptException):
127                     payload = dict(message=str(payload))
128                     result_type = 'stop_operation'
129                 result = {'type': result_type, 'payload': payload}
130         except Exception as e:
131             traceback_out = StringIO.StringIO()
132             traceback.print_exc(file=traceback_out)
133             payload = {
134                 'type': type(e).__name__,
135                 'message': str(e),
136                 'traceback': traceback_out.getvalue()
137             }
138             result = {'type': 'error', 'payload': payload}
139
140         return result
141
142     def __enter__(self):
143         return self
144
145     def __exit__(self, *args, **kwargs):
146         self.close()
147
148
149 class CtxError(RuntimeError):
150     pass
151
152
153 class CtxParsingError(CtxError):
154     pass
155
156
157 def _process_request(ctx, request):
158     request = json.loads(request)
159     args = request['args']
160     return _process_arguments(ctx, args)
161
162
163 def _process_arguments(obj, args):
164     # Modifying?
165     try:
166         # TODO: should there be a way to escape "=" in case it is needed as real argument?
167         equals_index = args.index('=') # raises ValueError if not found
168     except ValueError:
169         equals_index = None
170     if equals_index is not None:
171         if equals_index == 0:
172             raise CtxParsingError('The "=" argument cannot be first')
173         elif equals_index != len(args) - 2:
174             raise CtxParsingError('The "=" argument must be penultimate')
175         modifying = True
176         modifying_key = args[-3]
177         modifying_value = args[-1]
178         args = args[:-3]
179     else:
180         modifying = False
181         modifying_key = None
182         modifying_value = None
183
184     # Parse all arguments
185     while len(args) > 0:
186         obj, args = _process_next_operation(obj, args, modifying)
187
188     if modifying:
189         if hasattr(obj, '__setitem__'):
190             # Modify item value (dict, list, and similar)
191             if isinstance(obj, (list, tuple)):
192                 modifying_key = int(modifying_key)
193             obj[modifying_key] = modifying_value
194         elif hasattr(obj, modifying_key):
195             # Modify object attribute
196             setattr(obj, modifying_key, modifying_value)
197         else:
198             raise CtxError('Cannot modify `{0}` of `{1!r}`'.format(modifying_key, obj))
199
200     return obj
201
202
203 def _process_next_operation(obj, args, modifying):
204     args = list(args)
205     arg = args.pop(0)
206
207     # Call?
208     if arg == '[':
209         # TODO: should there be a way to escape "[" and "]" in case they are needed as real
210         # arguments?
211         try:
212             closing_index = args.index(']') # raises ValueError if not found
213         except ValueError:
214             raise CtxParsingError('Opening "[" without a closing "]')
215         callable_args = args[:closing_index]
216         args = args[closing_index + 1:]
217         if not callable(obj):
218             raise CtxError('Used "[" and "] on an object that is not callable')
219         return obj(*callable_args), args
220
221     # Attribute?
222     if isinstance(arg, basestring):
223         if hasattr(obj, arg):
224             return getattr(obj, arg), args
225         token_sugared = arg.replace('-', '_')
226         if hasattr(obj, token_sugared):
227             return getattr(obj, token_sugared), args
228
229     # Item? (dict, lists, and similar)
230     if hasattr(obj, '__getitem__'):
231         if modifying and (arg not in obj) and hasattr(obj, '__setitem__'):
232             # Create nested dict
233             obj[arg] = {}
234         return obj[arg], args
235
236     raise CtxParsingError('Cannot parse argument: `{0!r}`'.format(arg))
237
238
239 def _get_unused_port():
240     sock = socket.socket()
241     sock.bind(('127.0.0.1', 0))
242     _, port = sock.getsockname()
243     sock.close()
244     return port