Added all common modules in conductor directory
[optf/has.git] / conductor / conductor / common / music / messaging / component.py
1 #
2 # -------------------------------------------------------------------------
3 #   Copyright (c) 2015-2017 AT&T Intellectual Property
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #       http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 #
17 # -------------------------------------------------------------------------
18 #
19
20 import inspect
21 import sys
22 import time
23
24 import cotyledon
25 import futurist
26 from oslo_config import cfg
27 from oslo_log import log
28 from oslo_messaging._drivers import common as rpc_common
29
30 from conductor.common.music.messaging import message
31 from conductor.common.music.model import base
32 from conductor.i18n import _LE, _LI  # pylint: disable=W0212
33
34 LOG = log.getLogger(__name__)
35
36 CONF = cfg.CONF
37
38 MESSAGING_SERVER_OPTS = [
39     cfg.StrOpt('keyspace',
40                default='conductor_rpc',
41                help='Music keyspace for messages'),
42     cfg.IntOpt('check_interval',
43                default=1,
44                min=1,
45                help='Wait interval while checking for a message response. '
46                     'Default value is 1 second.'),
47     cfg.IntOpt('timeout',
48                default=10,
49                min=1,
50                help='Overall message response timeout. '
51                     'Default value is 10 seconds.'),
52     cfg.IntOpt('workers',
53                default=1,
54                min=1,
55                help='Number of workers for messaging service. '
56                     'Default value is 1.'),
57     cfg.IntOpt('polling_interval',
58                default=1,
59                min=1,
60                help='Time between checking for new messages. '
61                     'Default value is 1.'),
62     cfg.BoolOpt('debug',
63                 default=False,
64                 help='Log debug messages. '
65                      'Default value is False.'),
66 ]
67
68 CONF.register_opts(MESSAGING_SERVER_OPTS, group='messaging_server')
69
70 # Some class/method descriptions taken from this Oslo Messaging
71 # RPC API Tutorial/Demo: https://www.youtube.com/watch?v=Bf4gkeoBzvA
72
73 RPCSVRNAME = "Music-RPC Server"
74
75
76 class Target(object):
77     """Returns a messaging target.
78
79     A target encapsulates all the information to identify where a message
80     should be sent or what messages a server is listening for.
81     """
82     _topic = None
83     _topic_class = None
84
85     def __init__(self, topic):
86         """Set the topic and topic class"""
87         self._topic = topic
88
89         # Because this is Music-specific, the server is
90         # built-in to the API class, stored as the transport.
91         # Thus, unlike oslo.messaging, there is no server
92         # specified for a target. There also isn't an
93         # exchange, namespace, or version at the moment.
94
95         # Dynamically create a message class for this topic.
96         self._topic_class = base.create_dynamic_model(
97             keyspace=CONF.messaging_server.keyspace,
98             baseclass=message.Message, classname=self.topic)
99
100         if not self._topic_class:
101             raise RuntimeError("Error setting the topic class "
102                                "for the messaging layer.")
103
104     @property
105     def topic(self):
106         """Topic Property"""
107         return self._topic
108
109     @property
110     def topic_class(self):
111         """Topic Class Property"""
112         return self._topic_class
113
114
115 class RPCClient(object):
116     """Returns an RPC client using Music as a transport.
117
118     The RPC client is responsible for sending method invocations
119     to remote servers via a messaging transport.
120
121     A method invocation consists of a request context dictionary
122     a method name, and a dictionary of arguments. A cast() invocation
123     just sends the request and returns immediately. A call() invocation
124     waits for the server to send a return value.
125     """
126
127     def __init__(self, conf, transport, target):
128         """Set the transport and target"""
129         self.conf = conf
130         self.transport = transport
131         self.target = target
132         self.RPC = self.target.topic_class
133
134         # introduced as a quick means to cache messages
135         # with the aim of preventing unnecessary communication
136         # across conductor components.
137         # self.message_cache = dict()
138
139     def __check_rpc_status(self, rpc_id, rpc_method):
140         """Check status for a given message id"""
141         # Wait check_interval seconds before proceeding
142         check_interval = self.conf.messaging_server.check_interval
143         time.sleep(check_interval)
144         if self.conf.messaging_server.debug:
145             LOG.debug("Checking status for message {} method {} on "
146                       "topic {}".format(rpc_id, rpc_method, self.target.topic))
147         rpc = self.RPC.query.one(rpc_id)
148         return rpc
149
150     def cast(self, ctxt, method, args):
151         """Asynchronous Call"""
152         rpc = self.RPC(action=self.RPC.CAST,
153                        ctxt=ctxt, method=method, args=args)
154         assert(rpc.enqueued)
155
156         rpc_id = rpc.id
157         topic = self.target.topic
158         LOG.info(
159             _LI("Message {} on topic {} enqueued").format(rpc_id, topic))
160         if self.conf.messaging_server.debug:
161             LOG.debug("Casting method {} with args {}".format(method, args))
162
163         return rpc_id
164
165     def call(self, ctxt, method, args):
166         """Synchronous Call"""
167         # # check if the call has a message saved in cache
168         # # key: string concatenation of ctxt + method + args
169         # # value: rpc response object
170         # key = ""
171         # for k, v in ctxt.items():
172         #     key += str(k)
173         #     key += '#' + str(v) + '#'
174         # key += '|' + str(method) + '|'
175         # for k, v in args.items():
176         #     key += str(k)
177         #     key += '#' + str(v) + '#'
178         #
179         # # check if the method has been called before
180         # # and cached
181         # if key in self.message_cache:
182         #     LOG.debug("Retrieved method {} with args "
183         #               "{} from cache".format(method, args))
184         #     return self.message_cache[key]
185
186         rpc_start_time = time.time()
187
188         rpc = self.RPC(action=self.RPC.CALL,
189                        ctxt=ctxt, method=method, args=args)
190
191         # TODO(jdandrea): Do something if the assert fails.
192         assert(rpc.enqueued)
193
194         rpc_id = rpc.id
195         topic = self.target.topic
196         LOG.info(
197             _LI("Message {} on topic {} enqueued.").format(rpc_id, topic))
198         if self.conf.messaging_server.debug:
199             LOG.debug("Calling method {} with args {}".format(method, args))
200
201         # Check message status within a thread
202         executor = futurist.ThreadPoolExecutor()
203         started_at = time.time()
204         while (time.time() - started_at) <= \
205                 self.conf.messaging_server.timeout:
206             fut = executor.submit(self.__check_rpc_status, rpc_id, method)
207             rpc = fut.result()
208             if rpc and rpc.finished:
209                 if self.conf.messaging_server.debug:
210                     LOG.debug("Message {} method {} response received".
211                               format(rpc_id, method))
212                 break
213         executor.shutdown()
214
215         # Get response, delete message, and return response
216         if not rpc or not rpc.finished:
217             LOG.error(_LE("Message {} on topic {} timed out at {} seconds").
218                       format(rpc_id, topic,
219                              self.conf.messaging_server.timeout))
220         elif not rpc.ok:
221             LOG.error(_LE("Message {} on topic {} returned an error").
222                       format(rpc_id, topic))
223         response = rpc.response
224         failure = rpc.failure
225         rpc.delete()  # TODO(jdandrea): Put a TTL on the msg instead?
226         # self.message_cache[key] = response
227
228         LOG.debug("Elapsed time: {0:.3f} sec".format(
229             time.time() - rpc_start_time)
230         )
231         # If there's a failure, raise it as an exception
232         allowed = []
233         if failure is not None and failure != '':
234             # TODO(jdandrea): Do we need to populate allowed(_remote_exmods)?
235             raise rpc_common.deserialize_remote_exception(failure, allowed)
236         return response
237
238
239 class RPCService(cotyledon.Service):
240     """Listener for the RPC service.
241
242     An RPC Service exposes a number of endpoints, each of which contain
243     a set of methods which may be invoked remotely by clients over a
244     given transport. To create an RPC server, you supply a transport,
245     target, and a list of endpoints.
246
247     Start the server with server.run()
248     """
249
250     # This will appear in 'ps xaf'
251     name = RPCSVRNAME
252
253     def __init__(self, worker_id, conf, **kwargs):
254         """Initializer"""
255         super(RPCService, self).__init__(worker_id)
256         if conf.messaging_server.debug:
257             LOG.debug("%s" % self.__class__.__name__)
258         self._init(conf, **kwargs)
259         self.running = True
260
261     def _init(self, conf, **kwargs):
262         """Prepare to process requests"""
263         self.conf = conf
264         self.rpc_listener = None
265         self.transport = kwargs.pop('transport')
266         self.target = kwargs.pop('target')
267         self.endpoints = kwargs.pop('endpoints')
268         self.flush = kwargs.pop('flush')
269         self.kwargs = kwargs
270         self.RPC = self.target.topic_class
271         self.name = "{}, topic({})".format(RPCSVRNAME, self.target.topic)
272
273         if self.flush:
274             self._flush_enqueued()
275
276     def _flush_enqueued(self):
277         """Flush all messages with an enqueued status.
278
279         Use this only when the parent service is not running concurrently.
280         """
281
282         msgs = self.RPC.query.all()
283         for msg in msgs:
284             if msg.enqueued:
285                 msg.delete()
286
287     def _log_error_and_update_msg(self, msg, error_msg):
288         LOG.error(error_msg)
289         msg.response = {
290             'error': {
291                 'message': error_msg
292             }
293         }
294         msg.status = message.Message.ERROR
295         msg.update()
296
297     def __check_for_messages(self):
298         """Wait for the polling interval, then do the real message check."""
299
300         # Wait for at least poll_interval sec
301         polling_interval = self.conf.messaging_server.polling_interval
302         time.sleep(polling_interval)
303         if self.conf.messaging_server.debug:
304             LOG.debug("Topic {}: Checking for new messages".format(
305                 self.target.topic))
306         self._do()
307         return True
308
309     # FIXME(jdandrea): Better name for this, please, kthx.
310     def _do(self):
311         """Look for a new RPC call and serve it"""
312         # Get all the messages in queue
313         msgs = self.RPC.query.all()
314         for msg in msgs:
315             # Find the first msg marked as enqueued.
316             if not msg.enqueued:
317                 continue
318
319             # RPC methods must not start/end with an underscore.
320             if msg.method.startswith('_') or msg.method.endswith('_'):
321                 error_msg = _LE("Method {} must not start or end"
322                                 "with underscores").format(msg.method)
323                 self._log_error_and_update_msg(msg, error_msg)
324                 return
325
326             # The first endpoint that supports the method wins.
327             method = None
328             for endpoint in self.endpoints:
329                 if msg.method not in dir(endpoint):
330                     continue
331                 endpoint_method = getattr(endpoint, msg.method)
332                 if callable(endpoint_method):
333                     method = endpoint_method
334                     if self.conf.messaging_server.debug:
335                         LOG.debug("Message {} method {} is "
336                                   "handled by endpoint {}".
337                                   format(msg.id, msg.method,
338                                          method.__str__.__name__))
339                     break
340             if not method:
341                 error_msg = _LE("Message {} method {} unsupported "
342                                 "in endpoints.").format(msg.id, msg.method)
343                 self._log_error_and_update_msg(msg, error_msg)
344                 return
345
346             # All methods must take a ctxt and args param.
347             if inspect.getargspec(method).args != ['self', 'ctx', 'arg']:
348                 error_msg = _LE("Method {} must take three args: "
349                                 "self, ctx, arg").format(msg.method)
350                 self._log_error_and_update_msg(msg, error_msg)
351                 return
352
353             LOG.info(_LI("Message {} method {} received").format(
354                 msg.id, msg.method))
355             if self.conf.messaging_server.debug:
356                 LOG.debug(
357                     _LI("Message {} method {} context: {}, args: {}").format(
358                         msg.id, msg.method, msg.ctxt, msg.args))
359
360             failure = None
361             try:
362                 # Methods return an opaque dictionary
363                 result = method(msg.ctxt, msg.args)
364
365                 # FIXME(jdandrea): Remove response/error and make it opaque.
366                 # That means this would just be assigned result outright.
367                 msg.response = result.get('response', result)
368             except Exception:
369                 # Current sys.exc_info() content can be overridden
370                 # by another exception raised by a log handler during
371                 # LOG.exception(). So keep a copy and delete it later.
372                 failure = sys.exc_info()
373
374                 # Do not log details about the failure here. It will
375                 # be returned later upstream.
376                 LOG.exception(_LE('Exception during message handling'))
377
378             try:
379                 if failure is None:
380                     msg.status = message.Message.COMPLETED
381                 else:
382                     msg.failure = \
383                         rpc_common.serialize_remote_exception(failure)
384                     msg.status = message.Message.ERROR
385                 LOG.info(_LI("Message {} method {}, status: {}").format(
386                     msg.id, msg.method, msg.status))
387                 if self.conf.messaging_server.debug:
388                     LOG.debug("Message {} method {}, response: {}".format(
389                         msg.id, msg.method, msg.response))
390                 msg.update()
391             except Exception:
392                 LOG.exception(_LE("Can not send reply for message {} "
393                                   "method {}").
394                               format(msg.id, msg.method))
395             finally:
396                 # Remove circular object reference between the current
397                 # stack frame and the traceback in exc_info.
398                 del failure
399
400     def _gracefully_stop(self):
401         """Gracefully stop working on things"""
402         pass
403
404     def _restart(self):
405         """Prepare to restart the RPC Server"""
406         pass
407
408     def run(self):
409         """Run"""
410         # The server listens for messages and calls the
411         # appropriate methods. It also deletes messages once
412         # processed.
413         if self.conf.messaging_server.debug:
414             LOG.debug("%s" % self.__class__.__name__)
415
416         # Listen for messages within a thread
417         executor = futurist.ThreadPoolExecutor()
418         while self.running:
419             fut = executor.submit(self.__check_for_messages)
420             fut.result()
421         executor.shutdown()
422
423     def terminate(self):
424         """Terminate"""
425         if self.conf.messaging_server.debug:
426             LOG.debug("%s" % self.__class__.__name__)
427         self.running = False
428         self._gracefully_stop()
429         super(RPCService, self).terminate()
430
431     def reload(self):
432         """Reload"""
433         if self.conf.messaging_server.debug:
434             LOG.debug("%s" % self.__class__.__name__)
435         self._restart()