optf/has.git: conductor/conductor/solver/service.py
#
# -------------------------------------------------------------------------
#   Copyright (c) 2015-2017 AT&T Intellectual Property
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
#
# -------------------------------------------------------------------------
#

import socket
import time

import cotyledon
from oslo_config import cfg
from oslo_log import log

from conductor.common.models import plan
from conductor.common.music import api
from conductor.common.music import messaging as music_messaging
from conductor.common.music.model import base
from conductor.i18n import _LE, _LI
from conductor import messaging
from conductor import service
from conductor.solver.optimizer import optimizer
from conductor.solver.request import parser
from conductor.solver.utils import constraint_engine_interface as cei


# To use oslo.log in services:
#
# 0. Note that conductor.service.prepare_service() bootstraps this.
#    It's set up within conductor.cmd.SERVICENAME.
# 1. Add "from oslo_log import log"
# 2. Also add "LOG = log.getLogger(__name__)"
# 3. For i18n support, import appropriate shortcuts as well:
#    "from i18n import _, _LC, _LE, _LI, _LW  # noqa"
#    (that's for primary, critical, error, info, warning)
# 4. Use LOG.info, LOG.warning, LOG.error, LOG.critical, LOG.debug, e.g.:
#    "LOG.info(_LI("Something happened with {}").format(thingie))"
# 5. Do NOT put translation wrappers around any LOG.debug text.
# 6. Be liberal with logging, especially in the absence of unit tests!
# 7. Calls to print() are verboten within the service proper.
#    Logging can be redirected! (In a CLI-side script, print() is fine.)
#
# Usage: http://docs.openstack.org/developer/oslo.i18n/usage.html

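# A minimal sketch of the pattern steps 1-5 above describe, kept in comment
# form so nothing runs at import time; "plan_id" and "translation" are
# hypothetical names used only for illustration:
#
#     LOG.info(_LI("Solving plan {}").format(plan_id))   # translated message
#     LOG.error(_LE("Plan {} failed").format(plan_id))   # translated message
#     LOG.debug("raw translation: %s", translation)      # no i18n wrapper
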
LOG = log.getLogger(__name__)

# To use oslo.config in services:
#
# 0. Note that conductor.service.prepare_service() bootstraps this.
#    It's set up within conductor.cmd.SERVICENAME.
# 1. Add "from oslo_config import cfg"
# 2. Also add "CONF = cfg.CONF"
# 3. Set a list of locally used options (SOLVER_OPTS is fine).
#    Choose key names thoughtfully. Be technology-agnostic, avoid TLAs, etc.
# 4. Register, e.g. "CONF.register_opts(SOLVER_OPTS, group='solver')"
# 5. Add file reference to opts.py (may need to use itertools.chain())
# 6. Run tox -e genconfig to build a new config template.
# 7. If you want to load an entire config from a CLI you can do this:
#    "conf = service.prepare_service([], config_files=[CONFIG_FILE])"
# 8. You can even use oslo_config from a CLI and override values on the fly,
#    e.g., "CONF.set_override('hostnames', ['music2'], 'music_api')"
#    (leave the third arg out to use the DEFAULT group).
# 9. Loading a config from a CLI is optional. So long as all the options
#    have defaults (or you override them as needed), it should all work.
#
# Docs: http://docs.openstack.org/developer/oslo.config/

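# A minimal CLI-side sketch of steps 7 and 8 above, kept in comment form so
# it is not executed here; CONFIG_FILE is a hypothetical path used only for
# illustration:
#
#     conf = service.prepare_service([], config_files=[CONFIG_FILE])
#     conf.set_override('hostnames', ['music2'], 'music_api')
#     print(conf.solver.workers)  # solver options registered below
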
CONF = cfg.CONF

SOLVER_OPTS = [
    cfg.IntOpt('workers',
               default=1,
               min=1,
               help='Number of workers for solver service. '
                    'Default value is 1.'),
    cfg.IntOpt('solver_timeout',
               default=480,
               min=1,
               help='The timeout value for solver service. '
                    'Default value is 480 seconds.'),
    cfg.BoolOpt('concurrent',
                default=False,
                help='Set to True when solver will run in active-active '
                     'mode. When set to False, solver will restart any '
                     'orphaned solving requests at startup.'),
    cfg.IntOpt('timeout',
               default=600,
               min=1,
               help='Timeout (in seconds) for detecting that the VM solving '
                    'a plan is down, so that other VMs can pick the plan '
                    'up. This value should be larger than solver_timeout. '
                    'Default value is 600 seconds (10 minutes).'),
    cfg.IntOpt('max_solver_counter',
               default=1,
               min=1,
               help='Maximum number of times the solver will pick up the '
                    'same plan before marking the plan as error. '
                    'Default value is 1.'),
]

CONF.register_opts(SOLVER_OPTS, group='solver')

# Pull in service opts. We use them here.
OPTS = service.OPTS
CONF.register_opts(OPTS)


class SolverServiceLauncher(object):
    """Launcher for the solver service."""

    def __init__(self, conf):
        self.conf = conf

        # Set up Music access.
        self.music = api.API()
        self.music.keyspace_create(keyspace=conf.keyspace)

        # Dynamically create a plan class for the specified keyspace
        self.Plan = base.create_dynamic_model(
            keyspace=conf.keyspace, baseclass=plan.Plan, classname="Plan")

        if not self.Plan:
            # A bare "raise" here has no active exception to re-raise,
            # so raise an explicit error instead.
            raise RuntimeError(
                "Unable to create Plan model for keyspace "
                "{}".format(conf.keyspace))

    def run(self):
        kwargs = {'plan_class': self.Plan}
        svcmgr = cotyledon.ServiceManager()
        svcmgr.add(SolverService,
                   workers=self.conf.solver.workers,
                   args=(self.conf,), kwargs=kwargs)
        svcmgr.run()


class SolverService(cotyledon.Service):
    """Solver service."""

    # This will appear in 'ps xaf'
    name = "Conductor Solver"

    def __init__(self, worker_id, conf, **kwargs):
        """Initializer"""
        LOG.debug("%s" % self.__class__.__name__)
        super(SolverService, self).__init__(worker_id)
        self._init(conf, **kwargs)
        self.running = True

    def _init(self, conf, **kwargs):
        """Set up the necessary ingredients."""
        self.conf = conf
        self.kwargs = kwargs

        self.Plan = kwargs.get('plan_class')

        # Set up the RPC service(s) we want to talk to.
        self.data_service = self.setup_rpc(conf, "data")

        # Set up the cei and optimizer
        self.cei = cei.ConstraintEngineInterface(self.data_service)
        # self.optimizer = optimizer.Optimizer(conf)

        # Set up Music access.
        self.music = api.API()

        self.solver_owner_condition = {
            "solver_owner": socket.gethostname()
        }
        self.translated_status_condition = {
            "status": self.Plan.TRANSLATED
        }
        self.solving_status_condition = {
            "status": self.Plan.SOLVING
        }

        if not self.conf.solver.concurrent:
            self._reset_solving_status()

    def _gracefully_stop(self):
        """Gracefully stop working on things"""
        pass

    def current_time_seconds(self):
        """Current time in seconds."""
        return int(round(time.time()))

    def _reset_solving_status(self):
        """Reset plans being solved so they are solved again.

        Use this only when the solver service is not running concurrently.
        """
        plans = self.Plan.query.all()
        for the_plan in plans:
            if the_plan.status == self.Plan.SOLVING:
                the_plan.status = self.Plan.TRANSLATED
                # Used only in active-passive mode, so this update
                # does not have to be atomic.
                the_plan.update()

    def _restart(self):
        """Prepare to restart the service"""
        pass

    def millisec_to_sec(self, millisec):
        """Convert milliseconds to seconds"""
        return millisec / 1000

    def setup_rpc(self, conf, topic):
        """Set up the RPC Client"""
        # TODO(jdandrea): Put this pattern inside music_messaging?
        transport = messaging.get_transport(conf=conf)
        target = music_messaging.Target(topic=topic)
        client = music_messaging.RPCClient(conf=conf,
                                           transport=transport,
                                           target=target)
        return client

    def run(self):
        """Run"""
        LOG.debug("%s" % self.__class__.__name__)
        # TODO(snarayanan): This is really meant to be a control loop
        # As long as self.running is true, we process another request.
        while self.running:
            # Delay time (seconds) for MUSIC requests.
            time.sleep(self.conf.delay_time)
            # plans = Plan.query().all()
            # Find the first plan with a status of TRANSLATED.
            # Change its status to SOLVING.
            # Then, read the "translated" field as "template".
            json_template = None
            p = None
            requests_to_solve = dict()
            plans = self.Plan.query.all()
            found_translated_template = False
            for p in plans:
                if p.status == self.Plan.TRANSLATED:
                    json_template = p.translation
                    found_translated_template = True
                    break
                elif (p.status == self.Plan.SOLVING and
                      (self.current_time_seconds() -
                       self.millisec_to_sec(p.updated)) >
                      self.conf.solver.timeout):
                    p.status = self.Plan.TRANSLATED
                    p.update(condition=self.solving_status_condition)
                    break
            if found_translated_template and not json_template:
                message = _LE("Plan {} status is translated, yet "
                              "the translation wasn't found").format(p.id)
                LOG.error(message)
                p.status = self.Plan.ERROR
                p.message = message
                p.update(condition=self.translated_status_condition)
                continue
            elif (found_translated_template and p and
                  p.solver_counter >= self.conf.solver.max_solver_counter):
                message = _LE("Tried {} times; plan {} could not be "
                              "solved").format(
                    self.conf.solver.max_solver_counter, p.id)
                LOG.error(message)
                p.status = self.Plan.ERROR
                p.message = message
                p.update(condition=self.translated_status_condition)
                continue
            elif not json_template:
                continue

            p.status = self.Plan.SOLVING

            p.solver_counter += 1
            p.solver_owner = socket.gethostname()

            _is_updated = p.update(condition=self.translated_status_condition)
            # Another VM has already updated the status and started
            # solving this plan.
            if 'FAILURE' in _is_updated:
                continue

            LOG.info(_LI("Plan {} with request id {} is being solved by "
                         "machine {}. Solve attempt number: {}.").
                     format(p.id, p.name, p.solver_owner, p.solver_counter))

            _is_success = 'FAILURE | Could not acquire lock'

            request = parser.Parser()
            request.cei = self.cei
            try:
                request.parse_template(json_template)
                request.assgin_constraints_to_demands()
                requests_to_solve[p.id] = request
                opt = optimizer.Optimizer(
                    self.conf, _requests=requests_to_solve,
                    _begin_time=self.millisec_to_sec(p.updated))
                solution = opt.get_solution()

            except Exception as err:
                message = _LE("Plan {} encountered a "
                              "parsing error: {}").format(p.id, err)
                LOG.error(message)
                p.status = self.Plan.ERROR
                p.message = message
                while 'FAILURE | Could not acquire lock' in _is_success:
                    _is_success = p.update(
                        condition=self.solver_owner_condition)
                continue

            recommendations = []
            if not solution or not solution.decisions:
                if ((self.current_time_seconds() -
                     self.millisec_to_sec(p.updated)) >
                        self.conf.solver.solver_timeout):
                    message = _LI("Plan {} timed out, exceeding the expected "
                                  "time of {} seconds").format(
                        p.id, self.conf.solver.solver_timeout)
                else:
                    message = _LI("Plan {} search failed, no recommendations "
                                  "found by machine {}").format(
                        p.id, p.solver_owner)
                LOG.info(message)
                # Update the plan status
                p.status = self.Plan.NOT_FOUND
                p.message = message
                while 'FAILURE | Could not acquire lock' in _is_success:
                    _is_success = p.update(
                        condition=self.solver_owner_condition)
            else:
                # Assemble recommendation result JSON
                for demand_name in solution.decisions:
                    resource = solution.decisions[demand_name]
                    is_rehome = ("false" if resource.get(
                        "existing_placement") == 'true' else "true")
                    location_id = (
                        "" if resource.get("cloud_region_version") == '2.5'
                        else resource.get("location_id"))

                    rec = {
                        # FIXME(shankar) A&AI must not be hardcoded here.
                        # Also, account for more than one Inventory Provider.
                        "inventory_provider": "aai",
                        "service_resource_id":
                            resource.get("service_resource_id"),
                        "candidate": {
                            "candidate_id": resource.get("candidate_id"),
                            "inventory_type": resource.get("inventory_type"),
                            "cloud_owner": resource.get("cloud_owner"),
                            "location_type": resource.get("location_type"),
                            "location_id": location_id,
                            "is_rehome": is_rehome,
                            "vim-id": resource.get("vim-id"),
                        },
                        "attributes": {
                            "physical-location-id":
                                resource.get("physical_location_id"),
                            "cloud_owner": resource.get("cloud_owner"),
                            "aic_version":
                                resource.get("cloud_region_version"),
                        },
                    }
                    if rec["candidate"]["inventory_type"] == "service":
                        rec["attributes"]["host_id"] = resource.get("host_id")
                        rec["candidate"]["host_id"] = resource.get("host_id")

                    # TODO(snarayanan): Add total value to recommendations?
                    # msg = "--- total value of decision = {}"
                    # LOG.debug(msg.format(_best_path.total_value))
                    # msg = "--- total cost of decision = {}"
                    # LOG.debug(msg.format(_best_path.total_cost))

                    recommendations.append({demand_name: rec})

                # Update the plan with the solution
                p.solution = {
                    "recommendations": recommendations
                }
                p.status = self.Plan.SOLVED
                while 'FAILURE | Could not acquire lock' in _is_success:
                    _is_success = p.update(
                        condition=self.solver_owner_condition)
            LOG.info(_LI("Plan {} search complete, solution with {} "
                         "recommendations found by machine {}").
                     format(p.id, len(recommendations), p.solver_owner))
            LOG.debug("Plan {} detailed solution: {}".
                      format(p.id, p.solution))
            LOG.info("Plan name: {}".format(p.name))

            # Check status, update plan with response, SOLVED or ERROR

    def terminate(self):
        """Terminate"""
        LOG.debug("%s" % self.__class__.__name__)
        self.running = False
        self._gracefully_stop()
        super(SolverService, self).terminate()

    def reload(self):
        """Reload"""
        LOG.debug("%s" % self.__class__.__name__)
        self._restart()