2 # -------------------------------------------------------------------------
3 # Copyright (c) 2015-2017 AT&T Intellectual Property
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 # -------------------------------------------------------------------------
23 import conductor.common.prometheus_metrics as PC
25 from conductor import messaging
26 # from conductor import __file__ as conductor_root
27 from conductor.common.music import messaging as music_messaging
28 from conductor.common.utils import conductor_logging_util as log_util
29 from conductor.data.plugins.inventory_provider import extensions as ip_ext
30 from conductor.data.plugins.service_controller import extensions as sc_ext
31 from conductor.data.plugins.vim_controller import extensions as vc_ext
32 from conductor.i18n import _LE, _LI, _LW
33 from oslo_config import cfg
34 from oslo_log import log
36 # from stevedore import driver
37 # from conductor.solver.resource import region
38 # from conductor.solver.resource import service
40 LOG = log.getLogger(__name__)
48 help='Number of workers for data service. '
49 'Default value is 1.'),
50 cfg.BoolOpt('concurrent',
52 help='Set to True when data will run in active-active '
53 'mode. When set to False, data will flush any abandoned '
54 'messages at startup.'),
55 cfg.FloatOpt('existing_placement_cost',
57 help='Default value is -8000, which is the diameter of the earth. '
58 'The distance cannot larger than this value'),
59 cfg.FloatOpt('cloud_candidate_cost',
61 cfg.FloatOpt('service_candidate_cost',
65 CONF.register_opts(DATA_OPTS, group='data')
68 class DataServiceLauncher(object):
69 """Listener for the data service."""
71 def __init__(self, conf):
76 # Initialize Prometheus metrics Endpoint
77 # Data service uses index 0
79 self.init_extension_managers(conf)
82 def init_extension_managers(self, conf):
83 """Initialize extension managers."""
84 self.ip_ext_manager = (
85 ip_ext.Manager(conf, 'conductor.inventory_provider.plugin'))
86 self.ip_ext_manager.initialize()
87 self.vc_ext_manager = (
88 vc_ext.Manager(conf, 'conductor.vim_controller.plugin'))
89 self.vc_ext_manager.initialize()
90 self.sc_ext_manager = (
91 sc_ext.Manager(conf, 'conductor.service_controller.plugin'))
92 self.sc_ext_manager.initialize()
95 transport = messaging.get_transport(self.conf)
98 target = music_messaging.Target(topic=topic)
99 endpoints = [DataEndpoint(self.ip_ext_manager,
101 self.sc_ext_manager), ]
102 flush = not self.conf.data.concurrent
103 kwargs = {'transport': transport,
105 'endpoints': endpoints,
107 svcmgr = cotyledon.ServiceManager()
108 svcmgr.add(music_messaging.RPCService,
109 workers=self.conf.data.workers,
110 args=(self.conf,), kwargs=kwargs)
114 class DataEndpoint(object):
115 def __init__(self, ip_ext_manager, vc_ext_manager, sc_ext_manager):
117 self.ip_ext_manager = ip_ext_manager
118 self.vc_ext_manager = vc_ext_manager
119 self.sc_ext_manager = sc_ext_manager
120 self.plugin_cache = {}
121 self.triage_data_trans = {
124 'translator_triage': []
127 def get_candidate_location(self, ctx, arg):
128 # candidates should have lat long info already
131 candidate = arg["candidate"]
132 lat = candidate.get('latitude', None)
133 lon = candidate.get('longitude', None)
135 location = (float(lat), float(lon))
138 return {'response': location, 'error': error}
140 def get_candidate_zone(self, ctx, arg):
141 candidate = arg["candidate"]
142 category = arg["category"]
146 if category == 'region':
147 zone = candidate['location_id']
148 elif category == 'complex':
149 zone = candidate['complex_name']
150 elif category == 'country':
151 zone = candidate['country']
156 LOG.error(_LE("Unresolvable zone category {}").format(category))
158 LOG.info(_LI("Candidate zone is {}").format(zone))
159 return {'response': zone, 'error': error}
161 def get_candidates_from_service(self, ctx, arg):
163 candidate_list = arg["candidate_list"]
164 constraint_name = arg["constraint_name"]
165 constraint_type = arg["constraint_type"]
166 controller = arg["controller"]
167 request = arg["request"]
168 request_type = arg["request_type"]
171 filtered_candidates = []
172 # call service and fetch candidates
173 # TODO(jdandrea): Get rid of the SDN-C reference (outside of plugin!)
174 if controller == "SDN-C":
175 service_model = request.get("service_model")
177 results = self.sc_ext_manager.map_method(
180 candidate_list=candidate_list,
181 constraint_name=constraint_name,
182 constraint_type=constraint_type,
183 request_type=request_type
186 if results and len(results) > 0:
187 filtered_candidates = results[0]
190 _LW("No candidates returned by service "
191 "controller: {}; may be a new service "
192 "instantiation.").format(controller))
194 LOG.error(_LE("Unknown service controller: {}").format(controller))
195 # if response from service controller is empty
196 if filtered_candidates is None:
197 if service_model == "ADIOD":
198 LOG.error("No capacity found from SDN-GC for candidates: "
199 "{}".format(candidate_list))
200 return {'response': [], 'error': error}
202 LOG.debug("Filtered candidates: {}".format(filtered_candidates))
203 candidate_list = [c for c in candidate_list
204 if c in filtered_candidates]
205 return {'response': candidate_list, 'error': error}
207 def get_candidate_discard_set(self, value, candidate_list, value_attrib):
213 if "all" in value_dict:
214 value_list = value_dict.get("all")
215 value_condition = "all"
216 elif "any" in value_dict:
217 value_list = value_dict.get("any")
218 value_condition = "any"
223 for candidate in candidate_list:
226 for value in value_list:
227 if candidate.get(value_attrib) == value:
228 c_any = True # include if any one is met
229 elif candidate.get(value_attrib) != value:
230 c_all = False # discard even if one is not met
231 if value_condition == 'any' and not c_any:
232 discard_set.add(candidate.get("candidate_id"))
233 elif value_condition == 'all' and not c_all:
234 discard_set.add(candidate.get("candidate_id"))
237 #(TODO:Larry) merge this function with the "get_candidate_discard_set"
238 def get_candidate_discard_set_by_cloud_region(self, value, candidate_list, value_attrib):
241 cloud_requests = value.get("cloud-requests")
242 service_requests = value.get("service-requests")
243 vfmodule_requests = value.get("vfmodule-requests")
245 for candidate in candidate_list:
246 if candidate.get("inventory_type") == "cloud" and \
247 (candidate.get(value_attrib) not in cloud_requests):
248 discard_set.add(candidate.get("candidate_id"))
250 elif candidate.get("inventory_type") == "service" and \
251 (candidate.get(value_attrib) not in service_requests):
252 discard_set.add(candidate.get("candidate_id"))
254 elif candidate.get("inventory_type") == "vfmodule" and \
255 (candidate.get(value_attrib) not in vfmodule_requests):
256 discard_set.add(candidate.get("candidate_id"))
261 def get_inventory_group_candidates(self, ctx, arg):
262 candidate_list = arg["candidate_list"]
263 resolved_candidate = arg["resolved_candidate"]
266 service_description = 'DHV_VVIG_PAIR'
267 results = self.ip_ext_manager.map_method(
268 'get_inventory_group_pairs',
269 service_description=service_description
271 if not results or len(results) < 1:
273 _LE("Empty inventory group response for service: {}").format(
274 service_description))
278 if not pairs or len(pairs) < 1:
280 _LE("No inventory group candidates found for service: {}, "
281 "inventory provider: {}").format(
282 service_description, self.ip_ext_manager.names()[0]))
286 "Inventory group pairs: {}, service: {}, "
287 "inventory provider: {}".format(
288 pairs, service_description,
289 self.ip_ext_manager.names()[0]))
291 if resolved_candidate.get("candidate_id") == pair[0]:
292 candidate_names.append(pair[1])
293 elif resolved_candidate.get("candidate_id") == pair[1]:
294 candidate_names.append(pair[0])
296 candidate_list = [c for c in candidate_list
297 if c["candidate_id"] in candidate_names]
299 _LI("Inventory group candidates: {}, service: {}, "
300 "inventory provider: {}").format(
301 candidate_list, service_description,
302 self.ip_ext_manager.names()[0]))
303 return {'response': candidate_list, 'error': error}
305 def get_candidates_by_attributes(self, ctx, arg):
306 candidate_list = arg["candidate_list"]
307 # demand_name = arg["demand_name"]
308 properties = arg["properties"]
311 attributes_to_evaluate = properties.get('evaluate')
312 for attrib, value in attributes_to_evaluate.items():
315 if attrib == 'network_roles':
316 role_candidates = dict()
321 if "all" in nrc_dict:
322 role_list = nrc_dict.get("all")
323 role_condition = "all"
324 elif "any" in nrc_dict:
325 role_list = nrc_dict.get("any")
326 role_condition = "any"
328 # if the role_list is empty do nothing
329 if not role_list or role_list == '':
331 _LE("No roles available, "
332 "inventory provider: {}").format(
333 self.ip_ext_manager.names()[0]))
335 for role in role_list:
336 # query inventory provider to check if
337 # the candidate is in role
338 results = self.ip_ext_manager.map_method(
339 'check_network_roles',
342 if not results or len(results) < 1:
344 _LE("Empty response from inventory "
345 "provider {} for network role {}").format(
346 self.ip_ext_manager.names()[0], role))
348 region_ids = results[0]
351 _LE("No candidates from inventory provider {} "
352 "for network role {}").format(
353 self.ip_ext_manager.names()[0], role))
356 "Network role candidates: {}, role: {},"
357 "inventory provider: {}".format(
359 self.ip_ext_manager.names()[0]))
360 role_candidates[role] = region_ids
362 # find candidates that meet conditions
363 for candidate in candidate_list:
364 # perform this check only for cloud candidates
365 if candidate["inventory_type"] != "cloud":
369 for role in role_list:
370 if role not in role_candidates:
373 rc = role_candidates.get(role)
374 if rc and candidate.get("candidate_id") not in rc:
376 # discard even if one role is not met
377 elif rc and candidate.get("candidate_id") in rc:
379 # include if any one role is met
380 if role_condition == 'any' and not c_any:
381 discard_set.add(candidate.get("candidate_id"))
382 elif role_condition == 'all' and not c_all:
383 discard_set.add(candidate.get("candidate_id"))
385 elif attrib == 'replication_role':
387 for candidate in candidate_list:
389 host_id = candidate.get("host_id")
391 results = self.ip_ext_manager.map_method(
392 'check_candidate_role',
396 if not results or len(results) < 1:
398 _LE("Empty response for replication roles {}").format(role))
399 discard_set.add(candidate.get("candidate_id"))
402 # compare results from A&AI with the value in attribute constraint
403 if value and results[0] != value.upper():
404 discard_set.add(candidate.get("candidate_id"))
406 elif attrib == 'complex':
408 self.get_candidate_discard_set(
410 candidate_list=candidate_list,
411 value_attrib="complex_name")
412 discard_set.update(v_discard_set)
413 elif attrib == "country":
415 self.get_candidate_discard_set(
417 candidate_list=candidate_list,
418 value_attrib="country")
419 discard_set.update(v_discard_set)
420 elif attrib == "state":
422 self.get_candidate_discard_set(
424 candidate_list=candidate_list,
425 value_attrib="state")
426 discard_set.update(v_discard_set)
427 elif attrib == "region":
429 self.get_candidate_discard_set(
431 candidate_list=candidate_list,
432 value_attrib="region")
433 discard_set.update(v_discard_set)
434 elif attrib == "cloud-region":
436 self.get_candidate_discard_set_by_cloud_region(
438 candidate_list=candidate_list,
439 value_attrib="location_id")
440 discard_set.update(v_discard_set)
442 # return candidates not in discard set
443 candidate_list[:] = [c for c in candidate_list
444 if c['candidate_id'] not in discard_set]
446 "Available candidates after attribute checks: {}, "
447 "inventory provider: {}".format(
448 candidate_list, self.ip_ext_manager.names()[0]))
449 return {'response': candidate_list, 'error': False}
451 def get_candidates_with_hpa(self, ctx, arg):
453 RPC for getting candidates flavor mapping for matching hpa
455 :param arg: contains input passed from client side for RPC call
456 :return: response candidate_list with matching label to flavor mapping
459 candidate_list = arg["candidate_list"]
462 directives = arg["directives"]
463 attr = directives[0].get("attributes")
464 label_name = attr[0].get("attribute_name")
465 flavorProperties = arg["flavorProperties"]
467 for i in range(len(candidate_list)):
468 # perform this check only for cloud candidates
469 if candidate_list[i]["inventory_type"] != "cloud":
472 # Check if flavor mapping for current label_name already
473 # exists. This is an invalid condition.
474 if candidate_list[i].get("directives") and attr[0].get(
475 "attribute_value") != "":
476 LOG.error(_LE("Flavor mapping for label name {} already"
477 "exists").format(label_name))
480 # RPC call to inventory provider for matching hpa capabilities
481 results = self.ip_ext_manager.map_method(
483 candidate=candidate_list[i],
484 features=flavorProperties
488 if results and len(results) > 0 and results[0] is not None:
489 LOG.debug("Find results {} and results length {}".format(results, len(results)))
490 flavor_info = results[0].get("flavor_map")
491 req_directives = results[0].get("directives")
492 LOG.debug("Get directives {}".format(req_directives))
497 _LW("No flavor mapping returned by "
498 "inventory provider: {} for candidate: {}").format(
499 self.ip_ext_manager.names()[0],
500 candidate_list[i].get("candidate_id")))
502 # Metrics to Prometheus
503 m_vim_id = candidate_list[i].get("vim-id")
505 discard_set.add(candidate_list[i].get("candidate_id"))
506 PC.HPA_CLOUD_REGION_UNSUCCESSFUL.labels('ONAP', 'N/A',
509 if not flavor_info.get("flavor-name"):
510 discard_set.add(candidate_list[i].get("candidate_id"))
511 PC.HPA_CLOUD_REGION_UNSUCCESSFUL.labels('ONAP', 'N/A',
514 if not candidate_list[i].get("flavor_map"):
515 candidate_list[i]["flavor_map"] = {}
516 # Create flavor mapping for label_name to flavor
517 flavor_name = flavor_info.get("flavor-name")
518 flavor_id = flavor_info.get("flavor-id")
519 candidate_list[i]["flavor_map"][label_name] = flavor_name
520 candidate_list[i]["flavor_map"]["flavorId"] = flavor_id
521 # Create directives if not exist already
522 if not candidate_list[i].get("all_directives"):
523 candidate_list[i]["all_directives"] = {}
524 candidate_list[i]["all_directives"]["directives"] = []
525 # Create flavor mapping and merge directives
526 self.merge_directives(candidate_list, i, id, type, directives, req_directives)
527 if not candidate_list[i].get("hpa_score"):
528 candidate_list[i]["hpa_score"] = 0
529 candidate_list[i]["hpa_score"] += flavor_info.get("score")
531 # Metrics to Prometheus
532 PC.HPA_CLOUD_REGION_SUCCESSFUL.labels('ONAP', 'N/A',
535 # return candidates not in discard set
536 candidate_list[:] = [c for c in candidate_list
537 if c['candidate_id'] not in discard_set]
539 "Candidates with matching hpa capabilities: {}, "
540 "inventory provider: {}").format(candidate_list,
541 self.ip_ext_manager.names()[0]))
542 return {'response': candidate_list, 'error': error}
544 def merge_directives(self, candidate_list, index, id, type, directives, feature_directives):
546 Merge the flavor_directives with other diectives listed under hpa capabilities in the policy
547 :param candidate_list: all candidates
548 :param index: index number
550 :param type: vfc type
551 :param directives: directives for each vfc
552 :param feature_directives: directives for hpa-features
555 directive= {"id": id,
558 flavor_id_attributes = {"attribute_name": "flavorId", "attribute_value": ""}
559 for ele in directives:
560 if "flavor_directives" in ele.get("type"):
562 ele.get("attributes").append(flavor_id_attributes)
567 LOG.error("No flavor directives found in {}".format(id))
568 for item in feature_directives:
569 if item and item not in directives:
570 directives.append(item)
571 directive["directives"] = directives
572 candidate_list[index]["all_directives"]["directives"].append(directive)
574 def get_candidates_with_vim_capacity(self, ctx, arg):
576 RPC for getting candidates with vim capacity
578 :param arg: contains input passed from client side for RPC call
579 :return: response candidate_list with with required vim capacity
582 candidate_list = arg["candidate_list"]
583 vim_request = arg["request"]
586 for candidate in candidate_list:
587 if candidate["inventory_type"] == "cloud":
588 vim_list.add(candidate['vim-id'])
590 vim_request['VIMs'] = list(vim_list)
591 vims_result = self.vc_ext_manager.map_method(
592 'check_vim_capacity',
596 if vims_result and len(vims_result) > 0 and vims_result[0] is not None:
597 vims_set = set(vims_result[0])
598 for candidate in candidate_list:
599 # perform this check only for cloud candidates
600 if candidate["inventory_type"] == "cloud":
601 if candidate['vim-id'] not in vims_set:
602 discard_set.add(candidate.get("candidate_id"))
604 # return candidates not in discard set
605 candidate_list[:] = [c for c in candidate_list
606 if c['candidate_id'] not in discard_set]
610 "Multicloud did not respond properly to request: {}".format(
614 "Candidates with with vim capacity: {}, vim controller: "
615 "{}").format(candidate_list, self.vc_ext_manager.names()[0]))
617 return {'response': candidate_list, 'error': error}
619 def resolve_demands(self, ctx, arg):
621 log_util.setLoggerFilter(LOG, ctx.get('keyspace'), ctx.get('plan_id'))
624 demands = arg.get('demands')
625 plan_info = arg.get('plan_info')
626 triage_translator_data = arg.get('triage_translator_data')
627 resolved_demands = None
628 results = self.ip_ext_manager.map_method(
630 demands, plan_info, triage_translator_data
632 if results and len(results) > 0:
633 resolved_demands = results[0]
634 if self.triage_data_trans['plan_id']== None :
635 self.triage_data_trans['plan_name'] = triage_translator_data['plan_name']
636 self.triage_data_trans['plan_id'] = triage_translator_data['plan_id']
637 self.triage_data_trans['translator_triage'].append(triage_translator_data['dropped_candidates'])
638 elif (not self.triage_data_trans['plan_id'] == triage_translator_data['plan_id']) :
639 self.triage_data_trans = {
642 'translator_triage': []
644 self.triage_data_trans['plan_name'] = triage_translator_data['plan_name']
645 self.triage_data_trans['plan_id'] = triage_translator_data['plan_id']
646 self.triage_data_trans['translator_triage'].append(triage_translator_data['dropped_candidates'])
648 self.triage_data_trans['translator_triage'].append(triage_translator_data['dropped_candidates'])
652 return {'response': {'resolved_demands': resolved_demands,
653 'trans': self.triage_data_trans},
656 def resolve_location(self, ctx, arg):
658 log_util.setLoggerFilter(LOG, ctx.get('keyspace'), ctx.get('plan_id'))
661 resolved_location = None
663 host_name = arg.get('host_name')
664 clli_code = arg.get('clli_code')
667 results = self.ip_ext_manager.map_method(
668 'resolve_host_location',
673 results = self.ip_ext_manager.map_method(
674 'resolve_clli_location',
679 # unknown location response
680 LOG.error(_LE("Unknown location type from the input template."
681 "Expected location types are host_name"
684 if results and len(results) > 0:
685 resolved_location = results[0]
688 return {'response': {'resolved_location': resolved_location},
691 def call_reservation_operation(self, ctx, arg):
693 reserved_candidates = None
694 method = arg["method"]
695 candidate_list = arg["candidate_list"]
696 reservation_name = arg["reservation_name"]
697 reservation_type = arg["reservation_type"]
698 controller = arg["controller"]
699 request = arg["request"]
701 if controller == "SDN-C":
702 results = self.sc_ext_manager.map_method(
703 'call_reservation_operation',
705 candidate_list=candidate_list,
706 reservation_name=reservation_name,
707 reservation_type=reservation_type,
710 if results and len(results) > 0:
711 reserved_candidates = results[0]
713 LOG.error(_LE("Unknown service controller: {}").format(controller))
714 if reserved_candidates is None or not reserved_candidates:
717 _LW("Unable to {} for "
718 "candidate {}.").format(method, reserved_candidates))
719 return {'response': result,
722 LOG.debug("{} for the candidate: "
723 "{}".format(method, reserved_candidates))
724 return {'response': result,
727 # def do_something(self, ctx, arg):
728 # """RPC endpoint for data messages
730 # When another service sends a notification over the message
731 # bus, this method receives it.
733 # LOG.debug("Got a message!")
736 # 'note': 'do_something called!',
739 # return {'response': res, 'error': False}