1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements. See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License. You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 ``ctx`` proxy server implementation.
26 import wsgiref.simple_server
29 from aria import modeling
31 from .. import exceptions
class CtxProxy(object):
    """
    HTTP server that lets external processes operate on a ``ctx`` object.

    Constructing an instance picks a local port, serves a Bottle application on it from a
    background thread and waits until that thread reports that the server exists. Each
    ``POST /`` request body is a JSON document whose ``args`` list is evaluated against
    ``ctx``; the outcome is returned as a JSON document with ``type`` and ``payload`` keys.

    Instances are context managers: leaving the ``with`` block calls :meth:`close`.
    """

    def __init__(self, ctx, ctx_patcher=(lambda *args, **kwargs: None)):
        """
        :param ctx: object that incoming requests are evaluated against
        :param ctx_patcher: callable invoked with ``ctx`` from inside the server thread before
         the server starts; the default does nothing
        """
        self.ctx = ctx
        self._ctx_patcher = ctx_patcher
        self.port = _get_unused_port()
        self.socket_url = 'http://localhost:{0}'.format(self.port)
        self.server = None
        # One-slot queue used by the server thread to signal that the server was created
        self._started = Queue.Queue(1)
        self.thread = self._start_server()
        # Wait for that signal; ``Queue.Empty`` is raised if it does not arrive in 5 seconds
        self._started.get(timeout=5)

    def _start_server(self):
        """
        Starts serving requests from a daemon thread.

        :returns: the started thread
        """

        class BottleServerAdapter(bottle.ServerAdapter):
            """
            Bottle server adapter backed by a ``wsgiref`` server.
            """
            proxy = self

            def close_session(self):
                self.proxy.ctx.model.log._session.remove()

            def run(self, app):

                class Server(wsgiref.simple_server.WSGIServer):
                    allow_reuse_address = True
                    bottle_server = self

                    def handle_error(self, request, client_address):
                        # Overrides the base implementation so request errors are not reported
                        pass

                    def serve_forever(self, poll_interval=0.5):
                        try:
                            wsgiref.simple_server.WSGIServer.serve_forever(self, poll_interval)
                        finally:
                            # Once shutdown is called, we need to close the session.
                            # If the session is not closed properly, it might raise warnings,
                            # or even lock the database.
                            self.bottle_server.close_session()

                class Handler(wsgiref.simple_server.WSGIRequestHandler):
                    def address_string(self):
                        return self.client_address[0]

                    def log_request(*args, **kwargs):  # pylint: disable=no-method-argument
                        # ``self`` is the enclosing adapter here, not the handler instance
                        if not self.quiet:
                            return wsgiref.simple_server.WSGIRequestHandler.log_request(*args,
                                                                                        **kwargs)

                server = wsgiref.simple_server.make_server(
                    host=self.host,
                    port=self.port,
                    app=app,
                    server_class=Server,
                    handler_class=Handler)
                self.proxy.server = server
                # Unblocks the constructor, which is waiting on this queue
                self.proxy._started.put(True)
                server.serve_forever(poll_interval=0.1)

        def serve():
            # Since task is a thread_local object, we need to patch it inside the server thread.
            self._ctx_patcher(self.ctx)

            bottle_app = bottle.Bottle()
            bottle_app.post('/', callback=self._request_handler)
            bottle.run(
                app=bottle_app,
                host='localhost',
                port=self.port,
                quiet=True,
                server=BottleServerAdapter)

        thread = threading.Thread(target=serve)
        thread.daemon = True
        thread.start()
        return thread

    def close(self):
        """
        Shuts the server down and closes its socket; does nothing if no server was created.
        """
        if self.server:
            self.server.shutdown()
            self.server.server_close()

    def _request_handler(self):
        """
        Bottle callback for ``POST /``: processes the request body and wraps the result in a
        JSON response.
        """
        request = bottle.request.body.read()  # pylint: disable=no-member
        response = self._process(request)
        return bottle.LocalResponse(
            body=json.dumps(response, cls=modeling.utils.ModelJSONEncoder),
            status=200,
            headers={'content-type': 'application/json'}
        )

    def _process(self, request):
        """
        Evaluates a raw request against ``ctx`` and describes the outcome as a dict.

        :param request: JSON text of the request
        :returns: dict with a ``type`` key (``result``, ``stop_operation`` or ``error``) and a
         ``payload`` key; for ``error`` the payload holds the exception's type name, message
         and formatted traceback
        """
        try:
            with self.ctx.model.instrument(*self.ctx.INSTRUMENTATION_FIELDS):
                payload = _process_request(self.ctx, request)
                result_type = 'result'
                if isinstance(payload, exceptions.ScriptException):
                    payload = dict(message=str(payload))
                    result_type = 'stop_operation'
                result = {'type': result_type, 'payload': payload}
        except Exception as e:
            # Any failure is reported back to the client instead of propagating
            payload = {
                'type': type(e).__name__,
                'message': str(e),
                'traceback': traceback.format_exc()
            }
            result = {'type': 'error', 'payload': payload}

        return result

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()
class CtxError(RuntimeError):
    """
    Raised when a ``ctx`` operation cannot be applied to the object it targets, e.g. calling a
    non-callable or modifying a key the object does not support.
    """
class CtxParsingError(CtxError):
    """
    Raised when a list of ``ctx`` arguments is malformed, e.g. a misplaced ``=``, an unclosed
    ``[`` or an argument that cannot be resolved.
    """
def _process_request(ctx, request):
    """
    Decodes a JSON request and evaluates its ``args`` entry against ``ctx``.

    :param ctx: object the arguments are evaluated against
    :param request: JSON text of an object with an ``args`` key
    :returns: whatever :func:`_process_arguments` returns for these arguments
    """
    decoded = json.loads(request)
    return _process_arguments(ctx, decoded['args'])
163 def _process_arguments(obj, args):
166 # TODO: should there be a way to escape "=" in case it is needed as real argument?
167 equals_index = args.index('=') # raises ValueError if not found
170 if equals_index is not None:
171 if equals_index == 0:
172 raise CtxParsingError('The "=" argument cannot be first')
173 elif equals_index != len(args) - 2:
174 raise CtxParsingError('The "=" argument must be penultimate')
176 modifying_key = args[-3]
177 modifying_value = args[-1]
182 modifying_value = None
184 # Parse all arguments
186 obj, args = _process_next_operation(obj, args, modifying)
189 if hasattr(obj, '__setitem__'):
190 # Modify item value (dict, list, and similar)
191 if isinstance(obj, (list, tuple)):
192 modifying_key = int(modifying_key)
193 obj[modifying_key] = modifying_value
194 elif hasattr(obj, modifying_key):
195 # Modify object attribute
196 setattr(obj, modifying_key, modifying_value)
198 raise CtxError('Cannot modify `{0}` of `{1!r}`'.format(modifying_key, obj))
203 def _process_next_operation(obj, args, modifying):
209 # TODO: should there be a way to escape "[" and "]" in case they are needed as real
212 closing_index = args.index(']') # raises ValueError if not found
214 raise CtxParsingError('Opening "[" without a closing "]')
215 callable_args = args[:closing_index]
216 args = args[closing_index + 1:]
217 if not callable(obj):
218 raise CtxError('Used "[" and "] on an object that is not callable')
219 return obj(*callable_args), args
222 if isinstance(arg, basestring):
223 if hasattr(obj, arg):
224 return getattr(obj, arg), args
225 token_sugared = arg.replace('-', '_')
226 if hasattr(obj, token_sugared):
227 return getattr(obj, token_sugared), args
229 # Item? (dict, lists, and similar)
230 if hasattr(obj, '__getitem__'):
231 if modifying and (arg not in obj) and hasattr(obj, '__setitem__'):
234 return obj[arg], args
236 raise CtxParsingError('Cannot parse argument: `{0!r}`'.format(arg))
239 def _get_unused_port():
240 sock = socket.socket()
241 sock.bind(('127.0.0.1', 0))
242 _, port = sock.getsockname()