2 # -------------------------------------------------------------------------
3 # Copyright (c) 2015-2017 AT&T Intellectual Property
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 # -------------------------------------------------------------------------
24 from conductor import messaging
25 # from conductor import __file__ as conductor_root
26 from conductor.common.music import messaging as music_messaging
27 from conductor.common.utils import conductor_logging_util as log_util
28 from conductor.data.plugins.inventory_provider import extensions as ip_ext
29 from conductor.data.plugins.service_controller import extensions as sc_ext
30 from conductor.data.plugins.vim_controller import extensions as vc_ext
31 from conductor.i18n import _LE, _LI, _LW
32 from oslo_config import cfg
33 from oslo_log import log
35 # from stevedore import driver
36 # from conductor.solver.resource import region
37 # from conductor.solver.resource import service
39 LOG = log.getLogger(__name__)
47 help='Number of workers for data service. '
48 'Default value is 1.'),
49 cfg.BoolOpt('concurrent',
51 help='Set to True when data will run in active-active '
52 'mode. When set to False, data will flush any abandoned '
53 'messages at startup.'),
54 cfg.FloatOpt('existing_placement_cost',
56 help='Default value is -8000, which is the diameter of the earth. '
57 'The distance cannot larger than this value'),
58 cfg.FloatOpt('cloud_candidate_cost',
60 cfg.FloatOpt('service_candidate_cost',
64 CONF.register_opts(DATA_OPTS, group='data')
67 class DataServiceLauncher(object):
68 """Listener for the data service."""
70 def __init__(self, conf):
74 self.init_extension_managers(conf)
def init_extension_managers(self, conf):
    """Create and initialize the plugin extension managers.

    Builds one ``Manager`` each from the inventory provider, VIM
    controller and service controller extension modules, stores it on
    the instance, and calls its ``initialize()`` method.

    :param conf: configuration object handed unchanged to every
        ``Manager`` constructor
    """
    # (instance attribute, extension module, name string passed to Manager)
    # Order matters: each manager is stored and initialized before the
    # next one is constructed.
    manager_specs = (
        ('ip_ext_manager', ip_ext,
         'conductor.inventory_provider.plugin'),
        ('vc_ext_manager', vc_ext,
         'conductor.vim_controller.plugin'),
        ('sc_ext_manager', sc_ext,
         'conductor.service_controller.plugin'),
    )
    for attr_name, ext_module, plugin_name in manager_specs:
        manager = ext_module.Manager(conf, plugin_name)
        # Store first so the attribute exists even if initialize() raises.
        setattr(self, attr_name, manager)
        manager.initialize()
90 transport = messaging.get_transport(self.conf)
93 target = music_messaging.Target(topic=topic)
94 endpoints = [DataEndpoint(self.ip_ext_manager,
96 self.sc_ext_manager), ]
97 flush = not self.conf.data.concurrent
98 kwargs = {'transport': transport,
100 'endpoints': endpoints,
102 svcmgr = cotyledon.ServiceManager()
103 svcmgr.add(music_messaging.RPCService,
104 workers=self.conf.data.workers,
105 args=(self.conf,), kwargs=kwargs)
109 class DataEndpoint(object):
110 def __init__(self, ip_ext_manager, vc_ext_manager, sc_ext_manager):
112 self.ip_ext_manager = ip_ext_manager
113 self.vc_ext_manager = vc_ext_manager
114 self.sc_ext_manager = sc_ext_manager
115 self.plugin_cache = {}
116 self.triage_data_trans = {
119 'translator_triage': []
122 def get_candidate_location(self, ctx, arg):
123 # candidates should have lat long info already
126 candidate = arg["candidate"]
127 lat = candidate.get('latitude', None)
128 lon = candidate.get('longitude', None)
130 location = (float(lat), float(lon))
133 return {'response': location, 'error': error}
135 def get_candidate_zone(self, ctx, arg):
136 candidate = arg["candidate"]
137 category = arg["category"]
141 if category == 'region':
142 zone = candidate['location_id']
143 elif category == 'complex':
144 zone = candidate['complex_name']
145 elif category == 'country':
146 zone = candidate['country']
151 LOG.error(_LE("Unresolvable zone category {}").format(category))
153 LOG.info(_LI("Candidate zone is {}").format(zone))
154 return {'response': zone, 'error': error}
156 def get_candidates_from_service(self, ctx, arg):
158 candidate_list = arg["candidate_list"]
159 constraint_name = arg["constraint_name"]
160 constraint_type = arg["constraint_type"]
161 controller = arg["controller"]
162 request = arg["request"]
163 request_type = arg["request_type"]
166 filtered_candidates = []
167 # call service and fetch candidates
168 # TODO(jdandrea): Get rid of the SDN-C reference (outside of plugin!)
169 if controller == "SDN-C":
170 service_model = request.get("service_model")
172 results = self.sc_ext_manager.map_method(
175 candidate_list=candidate_list,
176 constraint_name=constraint_name,
177 constraint_type=constraint_type,
178 request_type=request_type
181 if results and len(results) > 0:
182 filtered_candidates = results[0]
185 _LW("No candidates returned by service "
186 "controller: {}; may be a new service "
187 "instantiation.").format(controller))
189 LOG.error(_LE("Unknown service controller: {}").format(controller))
190 # if response from service controller is empty
191 if filtered_candidates is None:
192 if service_model == "ADIOD":
193 LOG.error("No capacity found from SDN-GC for candidates: "
194 "{}".format(candidate_list))
195 return {'response': [], 'error': error}
197 LOG.debug("Filtered candidates: {}".format(filtered_candidates))
198 candidate_list = [c for c in candidate_list
199 if c in filtered_candidates]
200 return {'response': candidate_list, 'error': error}
202 def get_candidate_discard_set(self, value, candidate_list, value_attrib):
208 if "all" in value_dict:
209 value_list = value_dict.get("all")
210 value_condition = "all"
211 elif "any" in value_dict:
212 value_list = value_dict.get("any")
213 value_condition = "any"
218 for candidate in candidate_list:
221 for value in value_list:
222 if candidate.get(value_attrib) == value:
223 c_any = True # include if any one is met
224 elif candidate.get(value_attrib) != value:
225 c_all = False # discard even if one is not met
226 if value_condition == 'any' and not c_any:
227 discard_set.add(candidate.get("candidate_id"))
228 elif value_condition == 'all' and not c_all:
229 discard_set.add(candidate.get("candidate_id"))
232 #(TODO:Larry) merge this function with the "get_candidate_discard_set"
233 def get_candidate_discard_set_by_cloud_region(self, value, candidate_list, value_attrib):
236 cloud_requests = value.get("cloud-requests")
237 service_requests = value.get("service-requests")
239 for candidate in candidate_list:
240 if candidate.get("inventory_type") == "cloud" and \
241 (candidate.get(value_attrib) not in cloud_requests):
242 discard_set.add(candidate.get("candidate_id"))
244 elif candidate.get("inventory_type") == "service" and \
245 (candidate.get(value_attrib) not in service_requests):
246 discard_set.add(candidate.get("candidate_id"))
252 def get_inventory_group_candidates(self, ctx, arg):
253 candidate_list = arg["candidate_list"]
254 resolved_candidate = arg["resolved_candidate"]
257 service_description = 'DHV_VVIG_PAIR'
258 results = self.ip_ext_manager.map_method(
259 'get_inventory_group_pairs',
260 service_description=service_description
262 if not results or len(results) < 1:
264 _LE("Empty inventory group response for service: {}").format(
265 service_description))
269 if not pairs or len(pairs) < 1:
271 _LE("No inventory group candidates found for service: {}, "
272 "inventory provider: {}").format(
273 service_description, self.ip_ext_manager.names()[0]))
277 "Inventory group pairs: {}, service: {}, "
278 "inventory provider: {}".format(
279 pairs, service_description,
280 self.ip_ext_manager.names()[0]))
282 if resolved_candidate.get("candidate_id") == pair[0]:
283 candidate_names.append(pair[1])
284 elif resolved_candidate.get("candidate_id") == pair[1]:
285 candidate_names.append(pair[0])
287 candidate_list = [c for c in candidate_list
288 if c["candidate_id"] in candidate_names]
290 _LI("Inventory group candidates: {}, service: {}, "
291 "inventory provider: {}").format(
292 candidate_list, service_description,
293 self.ip_ext_manager.names()[0]))
294 return {'response': candidate_list, 'error': error}
296 def get_candidates_by_attributes(self, ctx, arg):
297 candidate_list = arg["candidate_list"]
298 # demand_name = arg["demand_name"]
299 properties = arg["properties"]
302 attributes_to_evaluate = properties.get('evaluate')
303 for attrib, value in attributes_to_evaluate.items():
306 if attrib == 'network_roles':
307 role_candidates = dict()
312 if "all" in nrc_dict:
313 role_list = nrc_dict.get("all")
314 role_condition = "all"
315 elif "any" in nrc_dict:
316 role_list = nrc_dict.get("any")
317 role_condition = "any"
319 # if the role_list is empty do nothing
320 if not role_list or role_list == '':
322 _LE("No roles available, "
323 "inventory provider: {}").format(
324 self.ip_ext_manager.names()[0]))
326 for role in role_list:
327 # query inventory provider to check if
328 # the candidate is in role
329 results = self.ip_ext_manager.map_method(
330 'check_network_roles',
333 if not results or len(results) < 1:
335 _LE("Empty response from inventory "
336 "provider {} for network role {}").format(
337 self.ip_ext_manager.names()[0], role))
339 region_ids = results[0]
342 _LE("No candidates from inventory provider {} "
343 "for network role {}").format(
344 self.ip_ext_manager.names()[0], role))
347 "Network role candidates: {}, role: {},"
348 "inventory provider: {}".format(
350 self.ip_ext_manager.names()[0]))
351 role_candidates[role] = region_ids
353 # find candidates that meet conditions
354 for candidate in candidate_list:
355 # perform this check only for cloud candidates
356 if candidate["inventory_type"] != "cloud":
360 for role in role_list:
361 if role not in role_candidates:
364 rc = role_candidates.get(role)
365 if rc and candidate.get("candidate_id") not in rc:
367 # discard even if one role is not met
368 elif rc and candidate.get("candidate_id") in rc:
370 # include if any one role is met
371 if role_condition == 'any' and not c_any:
372 discard_set.add(candidate.get("candidate_id"))
373 elif role_condition == 'all' and not c_all:
374 discard_set.add(candidate.get("candidate_id"))
376 elif attrib == 'replication_role':
378 for candidate in candidate_list:
380 host_id = candidate.get("host_id")
382 results = self.ip_ext_manager.map_method(
383 'check_candidate_role',
387 if not results or len(results) < 1:
389 _LE("Empty response for replication roles {}").format(role))
390 discard_set.add(candidate.get("candidate_id"))
393 # compare results from A&AI with the value in attribute constraint
394 if value and results[0] != value.upper():
395 discard_set.add(candidate.get("candidate_id"))
397 elif attrib == 'complex':
399 self.get_candidate_discard_set(
401 candidate_list=candidate_list,
402 value_attrib="complex_name")
403 discard_set.update(v_discard_set)
404 elif attrib == "country":
406 self.get_candidate_discard_set(
408 candidate_list=candidate_list,
409 value_attrib="country")
410 discard_set.update(v_discard_set)
411 elif attrib == "state":
413 self.get_candidate_discard_set(
415 candidate_list=candidate_list,
416 value_attrib="state")
417 discard_set.update(v_discard_set)
418 elif attrib == "region":
420 self.get_candidate_discard_set(
422 candidate_list=candidate_list,
423 value_attrib="region")
424 discard_set.update(v_discard_set)
425 elif attrib == "cloud-region":
427 self.get_candidate_discard_set_by_cloud_region(
429 candidate_list=candidate_list,
430 value_attrib="location_id")
431 discard_set.update(v_discard_set)
433 # return candidates not in discard set
434 candidate_list[:] = [c for c in candidate_list
435 if c['candidate_id'] not in discard_set]
437 "Available candidates after attribute checks: {}, "
438 "inventory provider: {}".format(
439 candidate_list, self.ip_ext_manager.names()[0]))
440 return {'response': candidate_list, 'error': False}
442 def get_candidates_with_hpa(self, ctx, arg):
444 RPC for getting candidates flavor mapping for matching hpa
446 :param arg: contains input passed from client side for RPC call
447 :return: response candidate_list with matching label to flavor mapping
450 candidate_list = arg["candidate_list"]
453 directives = arg["directives"]
454 attr = directives[0].get("attributes")
455 label_name = attr[0].get("attribute_name")
456 flavorProperties = arg["flavorProperties"]
458 for i in range(len(candidate_list)):
459 # perform this check only for cloud candidates
460 if candidate_list[i]["inventory_type"] != "cloud":
463 # Check if flavor mapping for current label_name already
464 # exists. This is an invalid condition.
465 if candidate_list[i].get("directives") and attr[0].get(
466 "attribute_value") != "":
467 LOG.error(_LE("Flavor mapping for label name {} already"
468 "exists").format(label_name))
471 # RPC call to inventory provider for matching hpa capabilities
472 results = self.ip_ext_manager.map_method(
474 candidate=candidate_list[i],
475 features=flavorProperties
479 if results and len(results) > 0 and results[0] is not None:
480 LOG.debug("Find results {} and results length {}".format(results, len(results)))
481 flavor_info = results[0].get("flavor_map")
482 req_directives = results[0].get("directives")
483 LOG.debug("Get directives {}".format(req_directives))
488 _LW("No flavor mapping returned by "
489 "inventory provider: {} for candidate: {}").format(
490 self.ip_ext_manager.names()[0],
491 candidate_list[i].get("candidate_id")))
493 discard_set.add(candidate_list[i].get("candidate_id"))
495 if not flavor_info.get("flavor-name"):
496 discard_set.add(candidate_list[i].get("candidate_id"))
498 if not candidate_list[i].get("flavor_map"):
499 candidate_list[i]["flavor_map"] = {}
500 # Create flavor mapping for label_name to flavor
501 flavor_name = flavor_info.get("flavor-name")
502 candidate_list[i]["flavor_map"][label_name] = flavor_name
503 # Create directives if not exist already
504 if not candidate_list[i].get("all_directives"):
505 candidate_list[i]["all_directives"] = {}
506 candidate_list[i]["all_directives"]["directives"] = []
507 # Create flavor mapping and merge directives
508 self.merge_directives(candidate_list, i, id, type, directives, req_directives)
509 if not candidate_list[i].get("hpa_score"):
510 candidate_list[i]["hpa_score"] = 0
511 candidate_list[i]["hpa_score"] += flavor_info.get("score")
513 # return candidates not in discard set
514 candidate_list[:] = [c for c in candidate_list
515 if c['candidate_id'] not in discard_set]
517 "Candidates with matching hpa capabilities: {}, "
518 "inventory provider: {}").format(candidate_list,
519 self.ip_ext_manager.names()[0]))
520 return {'response': candidate_list, 'error': error}
522 def merge_directives(self, candidate_list, index, id, type, directives, feature_directives):
524 Merge the flavor_directives with other diectives listed under hpa capabilities in the policy
525 :param candidate_list: all candidates
526 :param index: index number
528 :param type: vfc type
529 :param directives: directives for each vfc
530 :param feature_directives: directives for hpa-features
533 directive= {"id": id,
536 for ele in directives:
537 if "flavor_directives" in ele.get("type"):
543 LOG.error("No flavor directives found in {}".format(id))
544 for item in feature_directives:
545 if item and item not in directives:
546 directives.append(item)
547 directive["directives"] = directives
548 candidate_list[index]["all_directives"]["directives"].append(directive)
550 def get_candidates_with_vim_capacity(self, ctx, arg):
552 RPC for getting candidates with vim capacity
554 :param arg: contains input passed from client side for RPC call
555 :return: response candidate_list with with required vim capacity
558 candidate_list = arg["candidate_list"]
559 vim_request = arg["request"]
562 for candidate in candidate_list:
563 if candidate["inventory_type"] == "cloud":
564 vim_list.add(candidate['vim-id'])
566 vim_request['VIMs'] = list(vim_list)
567 vims_result = self.vc_ext_manager.map_method(
568 'check_vim_capacity',
572 if vims_result and len(vims_result) > 0 and vims_result[0] is not None:
573 vims_set = set(vims_result[0])
574 for candidate in candidate_list:
575 # perform this check only for cloud candidates
576 if candidate["inventory_type"] == "cloud":
577 if candidate['vim-id'] not in vims_set:
578 discard_set.add(candidate.get("candidate_id"))
580 # return candidates not in discard set
581 candidate_list[:] = [c for c in candidate_list
582 if c['candidate_id'] not in discard_set]
586 "Multicloud did not respond properly to request: {}".format(
590 "Candidates with with vim capacity: {}, vim controller: "
591 "{}").format(candidate_list, self.vc_ext_manager.names()[0]))
593 return {'response': candidate_list, 'error': error}
595 def resolve_demands(self, ctx, arg):
597 log_util.setLoggerFilter(LOG, ctx.get('keyspace'), ctx.get('plan_id'))
600 demands = arg.get('demands')
601 plan_info = arg.get('plan_info')
602 triage_translator_data = arg.get('triage_translator_data')
603 resolved_demands = None
604 results = self.ip_ext_manager.map_method(
606 demands, plan_info, triage_translator_data
608 if results and len(results) > 0:
609 resolved_demands = results[0]
610 if self.triage_data_trans['plan_id']== None :
611 self.triage_data_trans['plan_name'] = triage_translator_data['plan_name']
612 self.triage_data_trans['plan_id'] = triage_translator_data['plan_id']
613 self.triage_data_trans['translator_triage'].append(triage_translator_data['dropped_candidates'])
614 elif (not self.triage_data_trans['plan_id'] == triage_translator_data['plan_id']) :
615 self.triage_data_trans = {
618 'translator_triage': []
620 self.triage_data_trans['plan_name'] = triage_translator_data['plan_name']
621 self.triage_data_trans['plan_id'] = triage_translator_data['plan_id']
622 self.triage_data_trans['translator_triage'].append(triage_translator_data['dropped_candidates'])
624 self.triage_data_trans['translator_triage'].append(triage_translator_data['dropped_candidates'])
628 return {'response': {'resolved_demands': resolved_demands,
629 'trans': self.triage_data_trans},
632 def resolve_location(self, ctx, arg):
634 log_util.setLoggerFilter(LOG, ctx.get('keyspace'), ctx.get('plan_id'))
637 resolved_location = None
639 host_name = arg.get('host_name')
640 clli_code = arg.get('clli_code')
643 results = self.ip_ext_manager.map_method(
644 'resolve_host_location',
649 results = self.ip_ext_manager.map_method(
650 'resolve_clli_location',
654 # unknown location response
655 LOG.error(_LE("Unknown location type from the input template."
656 "Expected location types are host_name"
659 if results and len(results) > 0:
660 resolved_location = results[0]
663 return {'response': {'resolved_location': resolved_location},
666 def call_reservation_operation(self, ctx, arg):
668 reserved_candidates = None
669 method = arg["method"]
670 candidate_list = arg["candidate_list"]
671 reservation_name = arg["reservation_name"]
672 reservation_type = arg["reservation_type"]
673 controller = arg["controller"]
674 request = arg["request"]
676 if controller == "SDN-C":
677 results = self.sc_ext_manager.map_method(
678 'call_reservation_operation',
680 candidate_list=candidate_list,
681 reservation_name=reservation_name,
682 reservation_type=reservation_type,
685 if results and len(results) > 0:
686 reserved_candidates = results[0]
688 LOG.error(_LE("Unknown service controller: {}").format(controller))
689 if reserved_candidates is None or not reserved_candidates:
692 _LW("Unable to {} for "
693 "candidate {}.").format(method, reserved_candidates))
694 return {'response': result,
697 LOG.debug("{} for the candidate: "
698 "{}".format(method, reserved_candidates))
699 return {'response': result,
702 # def do_something(self, ctx, arg):
703 # """RPC endpoint for data messages
705 # When another service sends a notification over the message
706 # bus, this method receives it.
708 # LOG.debug("Got a message!")
711 # 'note': 'do_something called!',
714 # return {'response': res, 'error': False}