2 # -------------------------------------------------------------------------
3 # Copyright (c) 2015-2017 AT&T Intellectual Property
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 # -------------------------------------------------------------------------
24 from oslo_config import cfg
25 from oslo_log import log
26 # from stevedore import driver
28 # from conductor import __file__ as conductor_root
29 from conductor.common.music import messaging as music_messaging
30 from conductor.data.plugins.inventory_provider import extensions as ip_ext
31 from conductor.data.plugins.service_controller import extensions as sc_ext
32 from conductor.i18n import _LE, _LI, _LW
33 from conductor import messaging
34 from conductor.common.utils import conductor_logging_util as log_util
35 # from conductor.solver.resource import region
36 # from conductor.solver.resource import service
# Module-level logger for this module, obtained from oslo_log.
38 LOG = log.getLogger(__name__)
# NOTE(review): what follows is the visible part of the option list that is
# registered below as DATA_OPTS under the 'data' group. The list opener, the
# declaration of the first option and every `default=` line are not visible
# in this view -- confirm them against the full file before editing.
# Help text of the worker-count option (read elsewhere in this file as
# conf.data.workers).
46 help='Number of workers for data service. '
47 'Default value is 1.'),
# 'concurrent' (read elsewhere in this file as conf.data.concurrent): per its
# help text, True means active-active mode and False means abandoned messages
# are flushed at startup.
48 cfg.BoolOpt('concurrent',
50 help='Set to True when data will run in active-active '
51 'mode. When set to False, data will flush any abandoned '
52 'messages at startup.'),
# Float-valued cost options; their defaults are not visible here.
# NOTE(review): the help text below reads "cannot larger than" -- probably
# meant "cannot be larger than"; it is a runtime string, so left unchanged.
53 cfg.FloatOpt('existing_placement_cost',
55 help='Default value is -8000, which is the diameter of the earth. '
56 'The distance cannot larger than this value'),
57 cfg.FloatOpt('cloud_candidate_cost',
59 cfg.FloatOpt('service_candidate_cost',
# Register the options so they are reachable as CONF.data.<name>.
63 CONF.register_opts(DATA_OPTS, group='data')
# Launcher for the data service: builds the inventory-provider and
# service-controller extension managers, then registers an RPC service that
# exposes a DataEndpoint backed by those managers.
66 class DataServiceLauncher(object):
67 """Listener for the data service."""
# conf: the configuration object; it is handed to both extension managers.
# NOTE(review): the line storing it as self.conf is not visible here, but
# self.conf is read further down -- confirm.
69 def __init__(self, conf):
73 self.init_extension_managers(conf)
# Creates and initializes one manager per plugin namespace and keeps them on
# the instance as ip_ext_manager / sc_ext_manager.
75 def init_extension_managers(self, conf):
76 """Initialize extension managers."""
77 self.ip_ext_manager = (
78 ip_ext.Manager(conf, 'conductor.inventory_provider.plugin'))
79 self.ip_ext_manager.initialize()
80 self.sc_ext_manager = (
81 sc_ext.Manager(conf, 'conductor.service_controller.plugin'))
82 self.sc_ext_manager.initialize()
# NOTE(review): the `def` line of the method that starts the service is not
# visible in this view; the statements below belong to it.
# Obtain the messaging transport for this configuration.
85 transport = messaging.get_transport(self.conf)
# NOTE(review): `topic` is assigned on a line not visible here.
88 target = music_messaging.Target(topic=topic)
# A single endpoint object serves all data RPC calls.
89 endpoints = [DataEndpoint(self.ip_ext_manager,
90 self.sc_ext_manager), ]
# Flush is requested exactly when the 'concurrent' option is off.
91 flush = not self.conf.data.concurrent
# Keyword arguments forwarded to the RPC service.
# NOTE(review): some dict entries are not visible here.
92 kwargs = {'transport': transport,
94 'endpoints': endpoints,
# Register the RPC service with cotyledon using the configured worker count.
96 svcmgr = cotyledon.ServiceManager()
97 svcmgr.add(music_messaging.RPCService,
98 workers=self.conf.data.workers,
99 args=(self.conf,), kwargs=kwargs)
# RPC endpoint object for the data service. Its methods take (ctx, arg) and
# return a dict with 'response' and 'error' keys.
103 class DataEndpoint(object):
# ip_ext_manager: manager used for inventory-provider plugin calls.
# sc_ext_manager: manager used for service-controller plugin calls.
104 def __init__(self, ip_ext_manager, sc_ext_manager):
106 self.ip_ext_manager = ip_ext_manager
107 self.sc_ext_manager = sc_ext_manager
# Starts empty; no visible method in this view reads or fills it.
108 self.plugin_cache = {}
def get_candidate_location(self, ctx, arg):
    """Return the (latitude, longitude) pair stored on a candidate.

    :param ctx: RPC context (unused here).
    :param arg: dict with a ``"candidate"`` entry; the candidate is a
        dict that may carry ``'latitude'`` and ``'longitude'``.
    :returns: ``{'response': (lat, lon), 'error': False}`` with both
        values converted to ``float``, or
        ``{'response': None, 'error': True}`` when either coordinate is
        missing or cannot be converted to a number.
    """
    # candidates should have lat long info already
    candidate = arg["candidate"]
    lat = candidate.get('latitude')
    lon = candidate.get('longitude')

    location = None
    # Compare against None rather than truthiness so that a coordinate of
    # 0 (equator / prime meridian) is still treated as a valid value.
    if lat is not None and lon is not None:
        try:
            location = (float(lat), float(lon))
        except (TypeError, ValueError):
            # Non-numeric content (e.g. an empty string) counts as
            # "no location" instead of crashing the RPC call.
            location = None

    return {'response': location, 'error': location is None}
def get_candidate_zone(self, ctx, arg):
    """Return the zone identifier of a candidate for a zone category.

    :param ctx: RPC context (unused here).
    :param arg: dict with ``"candidate"`` (a dict) and ``"category"``.
        Supported categories and the candidate key each one reads:
        ``'region'`` -> ``'location_id'``, ``'complex'`` ->
        ``'complex_name'``, ``'country'`` -> ``'country'``.
    :returns: ``{'response': zone, 'error': False}`` for a supported
        category, otherwise ``{'response': None, 'error': True}``.
    :raises KeyError: if the candidate lacks the key for the category.
    """
    candidate = arg["candidate"]
    category = arg["category"]

    zone = None
    error = False

    if category == 'region':
        zone = candidate['location_id']
    elif category == 'complex':
        zone = candidate['complex_name']
    elif category == 'country':
        zone = candidate['country']
    else:
        error = True

    if error:
        LOG.error(_LE("Unresolvable zone category {}").format(category))
    else:
        LOG.info(_LI("Candidate zone is {}").format(zone))
    return {'response': zone, 'error': error}
# RPC endpoint: asks the service-controller plugin(s) to filter a candidate
# list and returns the candidates that survive.
# arg must provide: candidate_list, constraint_name, constraint_type,
# controller, request, request_type.
# Returns {'response': <list>, 'error': error}.
# NOTE(review): `error` is assigned on a line not visible in this view.
144 def get_candidates_from_service(self, ctx, arg):
146 candidate_list = arg["candidate_list"]
147 constraint_name = arg["constraint_name"]
148 constraint_type = arg["constraint_type"]
149 controller = arg["controller"]
150 request = arg["request"]
151 request_type = arg["request_type"]
# Starts as an empty list (not None), so the "is None" check further down
# only fires when a plugin actually returned None.
154 filtered_candidates = []
155 # call service and fetch candidates
156 # TODO(jdandrea): Get rid of the SDN-C reference (outside of plugin!)
157 if controller == "SDN-C":
# service_model is only bound inside this branch.
158 service_model = request.get("service_model")
# NOTE(review): the plugin method name and the leading arguments of this
# call are not visible in this view.
160 results = self.sc_ext_manager.map_method(
163 candidate_list=candidate_list,
164 constraint_name=constraint_name,
165 constraint_type=constraint_type,
166 request_type=request_type
# Only the first element of the results is used.
169 if results and len(results) > 0:
170 filtered_candidates = results[0]
# Warning text for an empty plugin result -- presumably the else branch of
# the check above; the log call head is not visible.
173 _LW("No candidates returned by service "
174 "controller: {}; may be a new service "
175 "instantiation.").format(controller))
# Presumably the else branch for any controller other than "SDN-C".
177 LOG.error(_LE("Unknown service controller: {}").format(controller))
178 # if response from service controller is empty
179 if filtered_candidates is None:
180 if service_model == "ADIOD":
181 LOG.error("No capacity found from SDN-GC for candidates: "
182 "{}".format(candidate_list))
183 return {'response': [], 'error': error}
# Keep only the input candidates that also appear in the filtered result,
# preserving the input order.
185 LOG.debug("Filtered candidates: {}".format(filtered_candidates))
186 candidate_list = [c for c in candidate_list
187 if c in filtered_candidates]
188 return {'response': candidate_list, 'error': error}
def get_candidate_discard_set(self, value, candidate_list, value_attrib):
    """Collect the ids of candidates that fail an ``all``/``any`` constraint.

    :param value: constraint dict holding either an ``"all"`` or an
        ``"any"`` key mapped to a list of accepted values. ``"all"`` wins
        when both are present. A falsy ``value``, a dict with neither
        key, or an empty list yields an empty discard set.
    :param candidate_list: iterable of candidate dicts.
    :param value_attrib: candidate key whose value is compared.
    :returns: set of ``candidate_id`` values to discard. With ``"any"`` a
        candidate is discarded when its attribute equals none of the
        listed values; with ``"all"`` it is discarded when its attribute
        differs from at least one listed value.
    """
    discard_set = set()
    if not value:
        return discard_set

    # "all" takes precedence over "any" when both keys are supplied.
    if "all" in value:
        value_condition = "all"
    elif "any" in value:
        value_condition = "any"
    else:
        # Neither keyword present: nothing to evaluate.
        return discard_set

    value_list = value.get(value_condition)
    if not value_list:
        return discard_set

    for candidate in candidate_list:
        actual = candidate.get(value_attrib)
        if value_condition == "any":
            keep = any(actual == wanted for wanted in value_list)
        else:
            keep = all(actual == wanted for wanted in value_list)
        if not keep:
            discard_set.add(candidate.get("candidate_id"))
    return discard_set
220 #(TODO:Larry) merge this function with the "get_candidate_discard_set"
def get_candidate_discard_set_by_cloud_region(self, value, candidate_list, value_attrib):
    """Collect the ids of candidates whose attribute is not requested.

    :param value: dict that may hold ``"cloud-requests"`` and
        ``"service-requests"``, each a collection of accepted attribute
        values for candidates of that inventory type. A missing or empty
        entry is treated as "nothing accepted" for that type.
    :param candidate_list: iterable of candidate dicts; each is matched on
        its ``"inventory_type"`` (``"cloud"`` or ``"service"``).
        Candidates of any other inventory type are never discarded.
    :param value_attrib: candidate key whose value is looked up in the
        matching request collection.
    :returns: set of ``candidate_id`` values to discard.
    """
    discard_set = set()

    # Fall back to an empty tuple so a missing key cannot make the
    # membership test below raise TypeError on None.
    accepted_by_type = {
        "cloud": value.get("cloud-requests") or (),
        "service": value.get("service-requests") or (),
    }

    for candidate in candidate_list:
        inventory_type = candidate.get("inventory_type")
        if inventory_type != "cloud" and inventory_type != "service":
            continue
        if candidate.get(value_attrib) not in accepted_by_type[inventory_type]:
            discard_set.add(candidate.get("candidate_id"))

    return discard_set
def get_inventory_group_candidates(self, ctx, arg):
    """Return the candidates paired with an already-resolved candidate.

    Asks the inventory provider for the inventory group pairs of the
    ``'DHV_VVIG_PAIR'`` service, finds every pair that contains the
    resolved candidate's id, and keeps the candidates whose id is the
    other member of such a pair.

    :param ctx: RPC context (unused here).
    :param arg: dict with ``"candidate_list"`` (list of candidate dicts,
        each with a ``"candidate_id"``) and ``"resolved_candidate"``
        (a dict).
    :returns: ``{'response': <filtered candidate list>, 'error': bool}``;
        ``error`` is True when the provider returned nothing or returned
        no pairs, in which case the response list is empty.
    """
    candidate_list = arg["candidate_list"]
    resolved_candidate = arg["resolved_candidate"]
    candidate_names = []
    error = False

    service_description = 'DHV_VVIG_PAIR'
    results = self.ip_ext_manager.map_method(
        'get_inventory_group_pairs',
        service_description=service_description
    )
    if not results or len(results) < 1:
        LOG.error(
            _LE("Empty inventory group response for service: {}").format(
                service_description))
        error = True
    else:
        # Only the first provider's answer is considered.
        pairs = results[0]
        if not pairs or len(pairs) < 1:
            LOG.error(
                _LE("No inventory group candidates found for service: {}, "
                    "inventory provider: {}").format(
                    service_description, self.ip_ext_manager.names()[0]))
            error = True
        else:
            LOG.debug(
                "Inventory group pairs: {}, service: {}, "
                "inventory provider: {}".format(
                    pairs, service_description,
                    self.ip_ext_manager.names()[0]))
            # Collect the partner id from each pair that contains the
            # resolved candidate, whichever side it sits on.
            for pair in pairs:
                if resolved_candidate.get("candidate_id") == pair[0]:
                    candidate_names.append(pair[1])
                elif resolved_candidate.get("candidate_id") == pair[1]:
                    candidate_names.append(pair[0])

    candidate_list = [c for c in candidate_list
                      if c["candidate_id"] in candidate_names]
    LOG.info(
        _LI("Inventory group candidates: {}, service: {}, "
            "inventory provider: {}").format(
            candidate_list, service_description,
            self.ip_ext_manager.names()[0]))
    return {'response': candidate_list, 'error': error}
# RPC endpoint: filters candidate_list by the attribute constraints found in
# properties['evaluate'] and returns the candidates that were not discarded.
# arg must provide: candidate_list, properties.
# Always returns {'response': candidate_list, 'error': False}; the list is
# filtered in place via slice assignment.
# NOTE(review): discard_set, nrc_dict, c_any/c_all and several call arguments
# are assigned on lines not visible in this view.
284 def get_candidates_by_attributes(self, ctx, arg):
285 candidate_list = arg["candidate_list"]
286 # demand_name = arg["demand_name"]
287 properties = arg["properties"]
# Maps attribute name -> constraint value; each known attribute name is
# handled by one branch of the chain below.
290 attributes_to_evaluate = properties.get('evaluate')
291 for attrib, value in attributes_to_evaluate.items():
# --- network_roles: keep cloud candidates that satisfy all/any listed roles.
294 if attrib == 'network_roles':
# role -> value returned by the inventory provider for that role.
295 role_candidates = dict()
# "all" is checked first, so it wins when both keys are present.
300 if "all" in nrc_dict:
301 role_list = nrc_dict.get("all")
302 role_condition = "all"
303 elif "any" in nrc_dict:
304 role_list = nrc_dict.get("any")
305 role_condition = "any"
307 # if the role_list is empty do nothing
308 if not role_list or role_list == '':
310 _LE("No roles available, "
311 "inventory provider: {}").format(
312 self.ip_ext_manager.names()[0]))
314 for role in role_list:
315 # query inventory provider to check if
316 # the candidate is in role
317 results = self.ip_ext_manager.map_method(
318 'check_network_roles',
321 if not results or len(results) < 1:
323 _LE("Empty response from inventory "
324 "provider {} for network role {}").format(
325 self.ip_ext_manager.names()[0], role))
# Only the first element of the results is used.
327 region_ids = results[0]
330 _LE("No candidates from inventory provider {} "
331 "for network role {}").format(
332 self.ip_ext_manager.names()[0], role))
# NOTE(review): the two string pieces below join without a space
# ("role: {},inventory provider: {}").
335 "Network role candidates: {}, role: {},"
336 "inventory provider: {}".format(
338 self.ip_ext_manager.names()[0]))
339 role_candidates[role] = region_ids
341 # find candidates that meet conditions
342 for candidate in candidate_list:
343 # perform this check only for cloud candidates
344 if candidate["inventory_type"] != "cloud":
348 for role in role_list:
349 if role not in role_candidates:
# rc is whatever was stored for this role; a falsy rc satisfies neither
# of the two tests below.
352 rc = role_candidates.get(role)
353 if rc and candidate.get("candidate_id") not in rc:
355 # discard even if one role is not met
356 elif rc and candidate.get("candidate_id") in rc:
358 # include if any one role is met
359 if role_condition == 'any' and not c_any:
360 discard_set.add(candidate.get("candidate_id"))
361 elif role_condition == 'all' and not c_all:
362 discard_set.add(candidate.get("candidate_id"))
# --- replication_role: compare each candidate's role with the constraint.
364 elif attrib == 'replication_role':
366 for candidate in candidate_list:
368 host_id = candidate.get("host_id")
370 results = self.ip_ext_manager.map_method(
371 'check_candidate_role',
375 if not results or len(results) < 1:
# NOTE(review): `role` is not assigned anywhere in the visible part of this
# branch; it may be stale from the network_roles branch or unbound.
377 _LE("Empty response for replication roles {}").format(role))
378 discard_set.add(candidate.get("candidate_id"))
381 # compare results from A&AI with the value in attribute constraint
# The constraint value is upper-cased before comparison; the provider result
# is compared as returned.
382 if value and results[0] != value.upper():
383 discard_set.add(candidate.get("candidate_id"))
# --- complex / country / state / region: delegate to
# get_candidate_discard_set with the candidate key to compare, then merge
# the returned ids into discard_set.
385 elif attrib == 'complex':
387 self.get_candidate_discard_set(
389 candidate_list=candidate_list,
390 value_attrib="complex_name")
391 discard_set.update(v_discard_set)
392 elif attrib == "country":
394 self.get_candidate_discard_set(
396 candidate_list=candidate_list,
397 value_attrib="country")
398 discard_set.update(v_discard_set)
399 elif attrib == "state":
401 self.get_candidate_discard_set(
403 candidate_list=candidate_list,
404 value_attrib="state")
405 discard_set.update(v_discard_set)
406 elif attrib == "region":
408 self.get_candidate_discard_set(
410 candidate_list=candidate_list,
411 value_attrib="region")
412 discard_set.update(v_discard_set)
# --- cloud-region: delegate to the cloud-region variant, matching on
# 'location_id'.
413 elif attrib == "cloud-region":
415 self.get_candidate_discard_set_by_cloud_region(
417 candidate_list=candidate_list,
418 value_attrib="location_id")
419 discard_set.update(v_discard_set)
421 # return candidates not in discard set
# Slice assignment mutates the caller's list object instead of rebinding.
422 candidate_list[:] = [c for c in candidate_list
423 if c['candidate_id'] not in discard_set]
425 "Available candidates after attribute checks: {}, "
426 "inventory provider: {}".format(
427 candidate_list, self.ip_ext_manager.names()[0]))
428 return {'response': candidate_list, 'error': False}
# RPC endpoint: resolves the demands in arg via the inventory-provider
# plugin(s) and returns them under response['resolved_demands'].
# resolved_demands stays None when the plugins return nothing.
# NOTE(review): the plugin method name, its arguments and the 'error' part of
# the returned dict are not visible in this view.
430 def resolve_demands(self, ctx, arg):
# Tag subsequent log records with the keyspace and plan id from the context.
432 log_util.setLoggerFilter(LOG, ctx.get('keyspace'), ctx.get('plan_id'))
435 demands = arg.get('demands')
436 resolved_demands = None
437 results = self.ip_ext_manager.map_method(
# Only the first element of the results is used.
441 if results and len(results) > 0:
442 resolved_demands = results[0]
446 return {'response': {'resolved_demands': resolved_demands},
# RPC endpoint: resolves a location from either a host name or a CLLI code
# via the inventory-provider plugin(s) and returns it under
# response['resolved_location'] (None when nothing was resolved).
# NOTE(review): the branch conditions, the call arguments and the 'error'
# part of the returned dict are not visible in this view.
449 def resolve_location(self, ctx, arg):
# Tag subsequent log records with the keyspace and plan id from the context.
451 log_util.setLoggerFilter(LOG, ctx.get('keyspace'), ctx.get('plan_id'))
454 resolved_location = None
# Both inputs are optional; a missing key yields None.
456 host_name = arg.get('host_name')
457 clli_code = arg.get('clli_code')
# Presumably used when a host name is supplied -- confirm.
460 results = self.ip_ext_manager.map_method(
461 'resolve_host_location',
# Presumably used when a CLLI code is supplied -- confirm.
466 results = self.ip_ext_manager.map_method(
467 'resolve_clli_location',
471 # unknown location response
# NOTE(review): the first two string pieces join without a space
# ("template.Expected").
472 LOG.error(_LE("Unknown location type from the input template."
473 "Expected location types are host_name"
# Only the first element of the results is used.
476 if results and len(results) > 0:
477 resolved_location = results[0]
480 return {'response': {'resolved_location': resolved_location},
# RPC endpoint: forwards a reservation operation (named by arg["method"]) to
# the service-controller plugin(s) for the given candidates.
# arg must provide: method, candidate_list, reservation_name,
# reservation_type, controller, request.
# NOTE(review): `result` is returned below but assigned on lines not visible
# in this view, as are the 'error' parts of the returned dicts.
483 def call_reservation_operation(self, ctx, arg):
485 reserved_candidates = None
486 method = arg["method"]
487 candidate_list = arg["candidate_list"]
488 reservation_name = arg["reservation_name"]
489 reservation_type = arg["reservation_type"]
490 controller = arg["controller"]
491 request = arg["request"]
# Only the "SDN-C" controller is dispatched to a plugin.
493 if controller == "SDN-C":
494 results = self.sc_ext_manager.map_method(
495 'call_reservation_operation',
497 candidate_list=candidate_list,
498 reservation_name=reservation_name,
499 reservation_type=reservation_type,
# Only the first element of the results is used.
502 if results and len(results) > 0:
503 reserved_candidates = results[0]
# Presumably the else branch for any controller other than "SDN-C".
505 LOG.error(_LE("Unknown service controller: {}").format(controller))
# NOTE(review): `not reserved_candidates` already covers None, so the
# explicit None test is redundant.
506 if reserved_candidates is None or not reserved_candidates:
509 _LW("Unable to {} for "
510 "candidate {}.").format(method, reserved_candidates))
511 return {'response': result,
# Path taken when the plugin returned a non-empty result.
514 LOG.debug("{} for the candidate: "
515 "{}".format(method, reserved_candidates))
516 return {'response': result,
519 # def do_something(self, ctx, arg):
520 # """RPC endpoint for data messages
522 # When another service sends a notification over the message
523 # bus, this method receives it.
525 # LOG.debug("Got a message!")
528 # 'note': 'do_something called!',
531 # return {'response': res, 'error': False}