2 # -------------------------------------------------------------------------
3 # Copyright (c) 2015-2017 AT&T Intellectual Property
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 # -------------------------------------------------------------------------
23 from oslo_config import cfg
24 from oslo_log import log
26 from conductor.common.models import plan
27 from conductor.common.music import api
28 from conductor.common.music import messaging as music_messaging
29 from conductor.common.music.model import base
30 from conductor.i18n import _LE, _LI
31 from conductor import messaging
32 from conductor import service
33 from conductor.solver.optimizer import optimizer
34 from conductor.solver.request import parser
35 from conductor.solver.utils import constraint_engine_interface as cei
38 # To use oslo.log in services:
40 # 0. Note that conductor.service.prepare_service() bootstraps this.
41 # It's set up within conductor.cmd.SERVICENAME.
42 # 1. Add "from oslo_log import log"
43 # 2. Also add "LOG = log.getLogger(__name__)"
44 # 3. For i18n support, import appropriate shortcuts as well:
45 # "from i18n import _, _LC, _LE, _LI, _LW # noqa"
46 # (that's for primary, critical, error, info, warning)
47 # 4. Use LOG.info, LOG.warning, LOG.error, LOG.critical, LOG.debug, e.g.:
48 # "LOG.info(_LI("Something happened with {}").format(thingie))"
49 # 5. Do NOT put translation wrappers around any LOG.debug text.
50 # 6. Be liberal with logging, especially in the absence of unit tests!
51 # 7. Calls to print() are verboten within the service proper.
52 # Logging can be redirected! (In a CLI-side script, print() is fine.)
54 # Usage: http://docs.openstack.org/developer/oslo.i18n/usage.html
# Module-level logger for this service (oslo.log; see banner above).
LOG = log.getLogger(__name__)
58 # To use oslo.config in services:
60 # 0. Note that conductor.service.prepare_service() bootstraps this.
61 # It's set up within conductor.cmd.SERVICENAME.
62 # 1. Add "from oslo_config import cfg"
63 # 2. Also add "CONF = cfg.CONF"
64 # 3. Set a list of locally used options (SOLVER_OPTS is fine).
65 # Choose key names thoughtfully. Be technology-agnostic, avoid TLAs, etc.
66 # 4. Register, e.g. "CONF.register_opts(SOLVER_OPTS, group='solver')"
67 # 5. Add file reference to opts.py (may need to use itertools.chain())
68 # 6. Run tox -e genconfig to build a new config template.
69 # 7. If you want to load an entire config from a CLI you can do this:
70 # "conf = service.prepare_service([], config_files=[CONFIG_FILE])"
71 # 8. You can even use oslo_config from a CLI and override values on the fly,
72 # e.g., "CONF.set_override('hostnames', ['music2'], 'music_api')"
73 # (leave the third arg out to use the DEFAULT group).
74 # 9. Loading a config from a CLI is optional. So long as all the options
75 # have defaults (or you override them as needed), it should all work.
77 # Docs: http://docs.openstack.org/developer/oslo.config/
85 help='Number of workers for solver service. '
86 'Default value is 1.'),
87 cfg.IntOpt('solver_timeout',
90 help='The timeout value for solver service. '
91 'Default value is 480 seconds.'),
92 cfg.BoolOpt('concurrent',
94 help='Set to True when solver will run in active-active '
95 'mode. When set to False, solver will restart any '
96 'orphaned solving requests at startup.'),
100 help='Timeout for detecting a VM is down, and other VMs can pick the plan up. '
101 'This value should be larger than solver_timeout'
102 'Default value is 10 minutes. (integer value)'),
103 cfg.IntOpt('max_solver_counter',
108 CONF.register_opts(SOLVER_OPTS, group='solver')
110 # Pull in service opts. We use them here.
112 CONF.register_opts(OPTS)
class SolverServiceLauncher(object):
    """Launcher for the solver service."""

    def __init__(self, conf):
        # NOTE(review): lines appear elided here in this copy of the
        # source; presumably `self.conf = conf` is assigned before
        # self.conf is read below — confirm against the full file.

        # Set up Music access.
        self.music = api.API()
        self.music.keyspace_create(keyspace=conf.keyspace)

        # Dynamically create a plan class for the specified keyspace
        self.Plan = base.create_dynamic_model(
            keyspace=conf.keyspace, baseclass=plan.Plan, classname="Plan")

        # NOTE(review): the lines below configure and presumably start the
        # cotyledon worker manager; they look like the body of a run()
        # method whose header (and trailing svcmgr.run()) is elided from
        # this copy of the source — confirm.
        kwargs = {'plan_class': self.Plan}
        svcmgr = cotyledon.ServiceManager()
        svcmgr.add(SolverService,
                   workers=self.conf.solver.workers,
                   args=(self.conf,), kwargs=kwargs)
class SolverService(cotyledon.Service):
    """Solver service."""

    # This will appear in 'ps xaf'
    name = "Conductor Solver"
def __init__(self, worker_id, conf, **kwargs):
    """Initialize the worker; real setup is delegated to _init()."""
    LOG.debug("%s" % self.__class__.__name__)
    super(SolverService, self).__init__(worker_id)
    self._init(conf, **kwargs)
def _init(self, conf, **kwargs):
    """Set up the necessary ingredients."""
    # NOTE(review): lines appear elided here in this copy of the source;
    # presumably `self.conf = conf` is stored before self.conf is read
    # at the bottom of this method — confirm against the full file.

    # Plan model class injected by the launcher (see plan_class kwarg).
    self.Plan = kwargs.get('plan_class')

    # Set up the RPC service(s) we want to talk to.
    self.data_service = self.setup_rpc(conf, "data")

    # Set up the cei and optimizer
    self.cei = cei.ConstraintEngineInterface(self.data_service)
    # self.optimizer = optimizer.Optimizer(conf)

    # Set up Music access.
    self.music = api.API()

    # Conditions used for conditional (compare-and-set) plan-row updates,
    # so concurrent solver instances do not pick up the same plan.
    # NOTE(review): the closing braces of these three dict literals
    # appear elided from this copy of the source.
    self.solver_owner_condition = {
        "solver_owner": socket.gethostname()
    self.translated_status_condition = {
        "status": self.Plan.TRANSLATED
    self.solving_status_condition = {
        "status": self.Plan.SOLVING

    # In active-passive (non-concurrent) mode, requeue any plans that
    # were left mid-solve by a previous run.
    if not self.conf.solver.concurrent:
        self._reset_solving_status()
def _gracefully_stop(self):
    """Gracefully stop working on things"""
    # NOTE(review): method body appears elided from this copy of the
    # source.
def current_time_seconds(self):
    """Return the current wall-clock time in whole seconds.

    The original docstring claimed milliseconds, but time.time()
    returns seconds and no *1000 scaling is applied here; values that
    actually are in milliseconds go through millisec_to_sec() instead.
    """
    return int(round(time.time()))
def _reset_solving_status(self):
    """Reset plans being solved so they are solved again.

    Use this only when the solver service is not running concurrently.
    """
    plans = self.Plan.query.all()
    for the_plan in plans:
        if the_plan.status == self.Plan.SOLVING:
            the_plan.status = self.Plan.TRANSLATED
            # Use only in active-passive mode, so don't have to be atomic
            # NOTE(review): the persisting call (the_plan.update()) that
            # should follow here appears elided from this copy of the
            # source.
# NOTE(review): the method header belonging to this docstring appears
# elided from this copy of the source (a restart hook, per the text).
"""Prepare to restart the service"""

def millisec_to_sec(self, millisec):
    """Convert milliseconds to seconds"""
    # NOTE(review): the return statement (the ms->s conversion itself)
    # appears elided from this copy of the source.
def setup_rpc(self, conf, topic):
    """Set up the RPC Client"""
    # TODO(jdandrea): Put this pattern inside music_messaging?
    transport = messaging.get_transport(conf=conf)
    target = music_messaging.Target(topic=topic)
    client = music_messaging.RPCClient(conf=conf,
    # NOTE(review): the remaining RPCClient constructor arguments
    # (presumably transport and target) and the `return client` appear
    # elided from this copy of the source.
# NOTE(review): this fragment is the body of the service's run() method;
# the `def run(self):` header, the `while self.running:` control loop,
# the `for p in plans:` loop header, a `try:` opener, several `else:`/
# `continue`/`break` lines, and the openers/closers of some dict
# literals appear elided from this copy of the source. Indentation
# below is reconstructed and should be confirmed against the full file.
LOG.debug("%s" % self.__class__.__name__)
# TODO(snarayanan): This is really meant to be a control loop
# As long as self.running is true, we process another request.

# Delay time (Seconds) for MUSIC requests.
time.sleep(self.conf.delay_time)

# plans = Plan.query().all()
# Find the first plan with a status of TRANSLATED.
# Change its status to SOLVING.
# Then, read the "translated" field as "template".
requests_to_solve = dict()
plans = self.Plan.query.all()
found_translated_template = False

# NOTE(review): `for p in plans:` loop header elided here.
if p.status == self.Plan.TRANSLATED:
    json_template = p.translation
    found_translated_template = True
# A plan stuck in SOLVING longer than the solver timeout is put back
# to TRANSLATED so another worker can retry it.
elif p.status == self.Plan.SOLVING and \
        (self.current_time_seconds() - self.millisec_to_sec(
            p.updated)) > self.conf.solver.timeout:
    p.status = self.Plan.TRANSLATED
    p.update(condition=self.solving_status_condition)

# A TRANSLATED plan without a translation payload is an error.
if found_translated_template and not json_template:
    message = _LE("Plan {} status is translated, yet "
                  "the translation wasn't found").format(p.id)
    p.status = self.Plan.ERROR
    p.update(condition=self.translated_status_condition)
# Give up once the plan has been attempted max_solver_counter times.
elif found_translated_template and p and p.solver_counter >= self.conf.solver.max_solver_counter:
    message = _LE("Tried {} times. Plan {} is unable to solve") \
        .format(self.conf.solver.max_solver_counter, p.id)
    p.status = self.Plan.ERROR
    p.update(condition=self.translated_status_condition)
elif not json_template:
    # NOTE(review): branch body (presumably a continue) elided.

# Claim the plan before solving; the conditional update acts as a
# compare-and-set against other solver instances.
p.status = self.Plan.SOLVING
p.solver_counter += 1
p.solver_owner = socket.gethostname()
_is_updated = p.update(condition=self.translated_status_condition)

# other VMs have updated the status and start solving the plan
if 'FAILURE' in _is_updated:
    # NOTE(review): branch body (presumably a continue) elided.

LOG.info(_LI("Plan {} with request id {} is solving by machine {}. Tried to solve it for {} times.").
         format(p.id, p.name, p.solver_owner, p.solver_counter))
# Sentinel meaning "not yet successfully persisted"; the retry loops
# below spin until a p.update() result no longer contains it.
_is_success = 'FAILURE | Could not acquire lock'
# NOTE(review): `try:` opener elided here.
request = parser.Parser()
request.cei = self.cei
request.parse_template(json_template)
request.assgin_constraints_to_demands()
requests_to_solve[p.id] = request
opt = optimizer.Optimizer(self.conf, _requests=requests_to_solve, _begin_time=self.millisec_to_sec(p.updated))
solution = opt.get_solution()

except Exception as err:
    message = _LE("Plan {} status encountered a "
                  "parsing error: {}").format(p.id, err.message)
    p.status = self.Plan.ERROR
    # Retry until the Music lock is acquired and the update sticks.
    while 'FAILURE | Could not acquire lock' in _is_success:
        _is_success = p.update(condition=self.solver_owner_condition)

# No solution: either we ran out of time or the search failed.
if not solution or not solution.decisions:
    if (int(round(time.time())) - self.millisec_to_sec(p.updated)) > self.conf.solver.solver_timeout:
        message = _LI("Plan {} is timed out, exceed the expected "
                      "time {} seconds").format(p.id, self.conf.solver.timeout)
    # NOTE(review): `else:` line elided here.
    message = _LI("Plan {} search failed, no "
                  "recommendations found by machine {}").format(p.id, p.solver_owner)
    # Update the plan status
    p.status = self.Plan.NOT_FOUND
    while 'FAILURE | Could not acquire lock' in _is_success:
        _is_success = p.update(condition=self.solver_owner_condition)

# Assemble recommendation result JSON
for demand_name in solution.decisions:
    resource = solution.decisions[demand_name]
    # Solver strings are lowercase 'true'/'false'; an existing placement
    # means no rehome is needed.
    is_rehome = "false" if resource.get("existing_placement") == 'true' else "true"
    location_id = "" if resource.get("cloud_region_version") == '2.5' else resource.get("location_id")
    # NOTE(review): the `rec = {` opener appears elided here.
    # FIXME(shankar) A&AI must not be hardcoded here.
    # Also, account for more than one Inventory Provider.
    "inventory_provider": "aai",
    "service_resource_id":
        resource.get("service_resource_id"),
    # NOTE(review): `"candidate": {` opener appears elided here.
    "candidate_id": resource.get("candidate_id"),
    "inventory_type": resource.get("inventory_type"),
    "cloud_owner": resource.get("cloud_owner"),
    "location_type": resource.get("location_type"),
    "location_id": location_id,
    "is_rehome": is_rehome,
    "vim-id": resource.get("vim-id"),
    # NOTE(review): `"attributes": {` opener appears elided here.
    "physical-location-id":
        resource.get("physical_location_id"),
    "cloud_owner": resource.get("cloud_owner"),
    'aic_version': resource.get("cloud_region_version")},

    # For service-type candidates, surface the hosting host_id too.
    if rec["candidate"]["inventory_type"] == "service":
        rec["attributes"]["host_id"] = resource.get("host_id")
        rec["candidate"]["host_id"] = resource.get("host_id")

    # TODO(snarayanan): Add total value to recommendations?
    # msg = "--- total value of decision = {}"
    # LOG.debug(msg.format(_best_path.total_value))
    # msg = "--- total cost of decision = {}"
    # LOG.debug(msg.format(_best_path.total_cost))
    recommendations.append({demand_name: rec})

# Update the plan with the solution
# NOTE(review): `p.solution = {` opener appears elided here.
"recommendations": recommendations
p.status = self.Plan.SOLVED
while 'FAILURE | Could not acquire lock' in _is_success:
    _is_success = p.update(condition=self.solver_owner_condition)
LOG.info(_LI("Plan {} search complete, solution with {} "
             "recommendations found by machine {}").
         format(p.id, len(recommendations), p.solver_owner))
LOG.debug("Plan {} detailed solution: {}".
          format(p.id, p.solution))
LOG.info("Plan name: {}".
# NOTE(review): the format(...) continuation of the line above appears
# elided from this copy of the source.

# Check status, update plan with response, SOLVED or ERROR
# NOTE(review): method headers appear elided from this copy of the
# source; the first three lines read as the body of a terminate() hook,
# and the final line as the start of a subsequent lifecycle hook.
LOG.debug("%s" % self.__class__.__name__)
# Stop in-flight solving work before shutting the worker down.
self._gracefully_stop()
super(SolverService, self).terminate()

LOG.debug("%s" % self.__class__.__name__)