2 # -------------------------------------------------------------------------
3 # Copyright (c) 2015-2017 AT&T Intellectual Property
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 # -------------------------------------------------------------------------
26 from oslo_config import cfg
27 from oslo_log import log
28 from oslo_messaging._drivers import common as rpc_common
30 from conductor.common.music.messaging import message
31 from conductor.common.music.model import base
32 from conductor.i18n import _LE, _LI # pylint: disable=W0212
34 LOG = log.getLogger(__name__)
38 MESSAGING_SERVER_OPTS = [
39 cfg.StrOpt('keyspace',
40 default='conductor_rpc',
41 help='Music keyspace for messages'),
42 cfg.IntOpt('check_interval',
45 help='Wait interval while checking for a message response. '
46 'Default value is 1 second.'),
50 help='Overall message response timeout. '
51 'Default value is 10 seconds.'),
55 help='Number of workers for messaging service. '
56 'Default value is 1.'),
57 cfg.IntOpt('polling_interval',
60 help='Time between checking for new messages. '
61 'Default value is 1.'),
64 help='Log debug messages. '
65 'Default value is False.'),
68 CONF.register_opts(MESSAGING_SERVER_OPTS, group='messaging_server')
70 # Some class/method descriptions taken from this Oslo Messaging
71 # RPC API Tutorial/Demo: https://www.youtube.com/watch?v=Bf4gkeoBzvA
73 RPCSVRNAME = "Music-RPC Server"
77 """Returns a messaging target.
79 A target encapsulates all the information to identify where a message
80 should be sent or what messages a server is listening for.
85 def __init__(self, topic):
86 """Set the topic and topic class"""
89 # Because this is Music-specific, the server is
90 # built into the API class, stored as the transport.
91 # Thus, unlike oslo.messaging, there is no server
92 # specified for a target. There also isn't an
93 # exchange, namespace, or version at the moment.
95 # Dynamically create a message class for this topic.
96 self._topic_class = base.create_dynamic_model(
97 keyspace=CONF.messaging_server.keyspace,
98 baseclass=message.Message, classname=self.topic)
100 if not self._topic_class:
101 raise RuntimeError("Error setting the topic class "
102 "for the messaging layer.")
def topic_class(self):
    """Return the message model class created for this target's topic."""
    topic_cls = self._topic_class
    return topic_cls
115 class RPCClient(object):
116 """Returns an RPC client using Music as a transport.
118 The RPC client is responsible for sending method invocations
119 to remote servers via a messaging transport.
121 A method invocation consists of a request context dictionary,
122 a method name, and a dictionary of arguments. A cast() invocation
123 just sends the request and returns immediately. A call() invocation
124 waits for the server to send a return value.
127 def __init__(self, conf, transport, target):
128 """Set the transport and target"""
130 self.transport = transport
132 self.RPC = self.target.topic_class
134 # introduced as a quick means to cache messages
135 # with the aim of preventing unnecessary communication
136 # across conductor components.
137 # self.message_cache = dict()
139 def __check_rpc_status(self, rpc_id, rpc_method):
140 """Check status for a given message id"""
141 # Wait check_interval seconds before proceeding
142 check_interval = self.conf.messaging_server.check_interval
143 time.sleep(check_interval)
144 if self.conf.messaging_server.debug:
145 LOG.debug("Checking status for message {} method {} on "
146 "topic {}".format(rpc_id, rpc_method, self.target.topic))
147 rpc = self.RPC.query.one(rpc_id)
150 def cast(self, ctxt, method, args):
151 """Asynchronous Call"""
152 rpc = self.RPC(action=self.RPC.CAST,
153 ctxt=ctxt, method=method, args=args)
157 topic = self.target.topic
159 _LI("Message {} on topic {} enqueued").format(rpc_id, topic))
160 if self.conf.messaging_server.debug:
161 LOG.debug("Casting method {} with args {}".format(method, args))
165 def call(self, ctxt, method, args):
166 """Synchronous Call"""
167 # # check if the call has a message saved in cache
168 # # key: string concatenation of ctxt + method + args
169 # # value: rpc response object
171 # for k, v in ctxt.items():
173 # key += '#' + str(v) + '#'
174 # key += '|' + str(method) + '|'
175 # for k, v in args.items():
177 # key += '#' + str(v) + '#'
179 # # check if the method has been called before
181 # if key in self.message_cache:
182 # LOG.debug("Retrieved method {} with args "
183 # "{} from cache".format(method, args))
184 # return self.message_cache[key]
186 rpc_start_time = time.time()
188 rpc = self.RPC(action=self.RPC.CALL,
189 ctxt=ctxt, method=method, args=args)
191 # TODO(jdandrea): Do something if the assert fails.
195 topic = self.target.topic
197 _LI("Message {} on topic {} enqueued.").format(rpc_id, topic))
198 if self.conf.messaging_server.debug:
199 LOG.debug("Calling method {} with args {}".format(method, args))
201 # Check message status within a thread
202 executor = futurist.ThreadPoolExecutor()
203 started_at = time.time()
204 while (time.time() - started_at) <= \
205 self.conf.messaging_server.timeout:
206 fut = executor.submit(self.__check_rpc_status, rpc_id, method)
208 if rpc and rpc.finished:
209 if self.conf.messaging_server.debug:
210 LOG.debug("Message {} method {} response received".
211 format(rpc_id, method))
215 # Get response, delete message, and return response
216 if not rpc or not rpc.finished:
217 LOG.error(_LE("Message {} on topic {} timed out at {} seconds").
218 format(rpc_id, topic,
219 self.conf.messaging_server.timeout))
221 LOG.error(_LE("Message {} on topic {} returned an error").
222 format(rpc_id, topic))
223 response = rpc.response
224 failure = rpc.failure
225 rpc.delete() # TODO(jdandrea): Put a TTL on the msg instead?
226 # self.message_cache[key] = response
228 LOG.debug("Elapsed time: {0:.3f} sec".format(
229 time.time() - rpc_start_time)
231 # If there's a failure, raise it as an exception
233 if failure is not None and failure != '':
234 # TODO(jdandrea): Do we need to populate allowed(_remote_exmods)?
235 raise rpc_common.deserialize_remote_exception(failure, allowed)
239 class RPCService(cotyledon.Service):
240 """Listener for the RPC service.
242 An RPC Service exposes a number of endpoints, each of which contains
243 a set of methods which may be invoked remotely by clients over a
244 given transport. To create an RPC server, you supply a transport,
245 target, and a list of endpoints.
247 Start the server with server.run()
250 # This will appear in 'ps xaf'
253 def __init__(self, worker_id, conf, **kwargs):
255 super(RPCService, self).__init__(worker_id)
256 if conf.messaging_server.debug:
257 LOG.debug("%s" % self.__class__.__name__)
258 self._init(conf, **kwargs)
261 def _init(self, conf, **kwargs):
262 """Prepare to process requests"""
264 self.rpc_listener = None
265 self.transport = kwargs.pop('transport')
266 self.target = kwargs.pop('target')
267 self.endpoints = kwargs.pop('endpoints')
268 self.flush = kwargs.pop('flush')
270 self.RPC = self.target.topic_class
271 self.name = "{}, topic({})".format(RPCSVRNAME, self.target.topic)
274 self._flush_enqueued()
276 def _flush_enqueued(self):
277 """Flush all messages with an enqueued status.
279 Use this only when the parent service is not running concurrently.
282 msgs = self.RPC.query.all()
287 def _log_error_and_update_msg(self, msg, error_msg):
294 msg.status = message.Message.ERROR
297 def __check_for_messages(self):
298 """Wait for the polling interval, then do the real message check."""
300 # Wait for at least polling_interval seconds
301 polling_interval = self.conf.messaging_server.polling_interval
302 time.sleep(polling_interval)
303 if self.conf.messaging_server.debug:
304 LOG.debug("Topic {}: Checking for new messages".format(
309 # FIXME(jdandrea): Better name for this, please, kthx.
311 """Look for a new RPC call and serve it"""
312 # Get all the messages in queue
313 msgs = self.RPC.query.all()
315 # Find the first msg marked as enqueued.
319 # RPC methods must not start/end with an underscore.
320 if msg.method.startswith('_') or msg.method.endswith('_'):
# Build the rejection message for a method name with a leading or
# trailing underscore. Adjacent string literals are joined verbatim,
# so the first fragment must end with a space.
error_msg = _LE("Method {} must not start or end "
                "with underscores").format(msg.method)
323 self._log_error_and_update_msg(msg, error_msg)
326 # The first endpoint that supports the method wins.
328 for endpoint in self.endpoints:
329 if msg.method not in dir(endpoint):
331 endpoint_method = getattr(endpoint, msg.method)
332 if callable(endpoint_method):
333 method = endpoint_method
if self.conf.messaging_server.debug:
    # Report the class of the endpoint that owns the method.
    # The previous method.__str__.__name__ always evaluated to
    # "__str__", so the log never identified the endpoint.
    LOG.debug("Message {} method {} is "
              "handled by endpoint {}".
              format(msg.id, msg.method,
                     endpoint.__class__.__name__))
341 error_msg = _LE("Message {} method {} unsupported "
342 "in endpoints.").format(msg.id, msg.method)
343 self._log_error_and_update_msg(msg, error_msg)
346 # Each endpoint method's signature must be exactly (self, ctx, arg).
347 if inspect.getargspec(method).args != ['self', 'ctx', 'arg']:
348 error_msg = _LE("Method {} must take three args: "
349 "self, ctx, arg").format(msg.method)
350 self._log_error_and_update_msg(msg, error_msg)
353 LOG.info(_LI("Message {} method {} received").format(
355 if self.conf.messaging_server.debug:
357 _LI("Message {} method {} context: {}, args: {}").format(
358 msg.id, msg.method, msg.ctxt, msg.args))
362 # Methods return an opaque dictionary
363 result = method(msg.ctxt, msg.args)
365 # FIXME(jdandrea): Remove response/error and make it opaque.
366 # That means this would just be assigned result outright.
367 msg.response = result.get('response', result)
369 # Current sys.exc_info() content can be overridden
370 # by another exception raised by a log handler during
371 # LOG.exception(). So keep a copy and delete it later.
372 failure = sys.exc_info()
374 # Do not log details about the failure here. It will
375 # be returned later upstream.
376 LOG.exception(_LE('Exception during message handling'))
380 msg.status = message.Message.COMPLETED
383 rpc_common.serialize_remote_exception(failure)
384 msg.status = message.Message.ERROR
385 LOG.info(_LI("Message {} method {}, status: {}").format(
386 msg.id, msg.method, msg.status))
387 if self.conf.messaging_server.debug:
388 LOG.debug("Message {} method {}, response: {}".format(
389 msg.id, msg.method, msg.response))
392 LOG.exception(_LE("Can not send reply for message {} "
394 format(msg.id, msg.method))
396 # Remove circular object reference between the current
397 # stack frame and the traceback in exc_info.
400 def _gracefully_stop(self):
401 """Gracefully stop working on things"""
405 """Prepare to restart the RPC Server"""
410 # The server listens for messages and calls the
411 # appropriate methods. It also deletes messages once
413 if self.conf.messaging_server.debug:
414 LOG.debug("%s" % self.__class__.__name__)
416 # Listen for messages within a thread
417 executor = futurist.ThreadPoolExecutor()
419 fut = executor.submit(self.__check_for_messages)
425 if self.conf.messaging_server.debug:
426 LOG.debug("%s" % self.__class__.__name__)
428 self._gracefully_stop()
429 super(RPCService, self).terminate()
433 if self.conf.messaging_server.debug:
434 LOG.debug("%s" % self.__class__.__name__)