#
# -------------------------------------------------------------------------
#   Copyright (c) 2015-2017 AT&T Intellectual Property
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
#
# -------------------------------------------------------------------------
#
import cotyledon
from oslo_config import cfg
from oslo_log import log

import conductor.common.prometheus_metrics as PC
from conductor import messaging
# from conductor import __file__ as conductor_root
from conductor.common.music import messaging as music_messaging
from conductor.common.utils import conductor_logging_util as log_util
from conductor.data.plugins.inventory_provider import extensions as ip_ext
from conductor.data.plugins.service_controller import extensions as sc_ext
from conductor.data.plugins.vim_controller import extensions as vc_ext
from conductor.i18n import _LE, _LI, _LW

# from stevedore import driver
# from conductor.solver.resource import region
# from conductor.solver.resource import service
LOG = log.getLogger(__name__)

CONF = cfg.CONF

# Options of the [data] configuration group.
# NOTE(review): the listing this was restored from had lost the ``default=``
# and ``min=`` lines. Defaults stated in a help text are taken from there;
# the remaining ones are marked "confirm".
DATA_OPTS = [
    cfg.IntOpt('workers',
               default=1,
               min=1,  # NOTE(review): lower bound inferred - confirm
               help='Number of workers for data service. '
                    'Default value is 1.'),
    cfg.BoolOpt('concurrent',
                default=False,  # NOTE(review): inferred - confirm
                help='Set to True when data will run in active-active '
                     'mode. When set to False, data will flush any abandoned '
                     'messages at startup.'),
    cfg.FloatOpt('existing_placement_cost',
                 default=-8000.0,
                 help='Default value is -8000, which is the diameter of the earth. '
                      'The distance cannot larger than this value'),
    cfg.FloatOpt('cloud_candidate_cost',
                 default=2.0),  # NOTE(review): inferred - confirm
    cfg.FloatOpt('service_candidate_cost',
                 default=1.0),  # NOTE(review): inferred - confirm
]

CONF.register_opts(DATA_OPTS, group='data')
class DataServiceLauncher(object):
    """Listener for the data service."""

    def __init__(self, conf):
        """Keep the configuration and load the plugin extension managers.

        :param conf: configuration object; ``conf.data.workers`` and
            ``conf.data.concurrent`` are read when the service is started
        """
        self.conf = conf

        # Initialize Prometheus metrics Endpoint
        # Data service uses index 0
        # NOTE(review): the call itself was missing from the listing; the
        # name below is inferred from the two comments above - confirm.
        PC._init_metrics(0)
        self.init_extension_managers(conf)

    def init_extension_managers(self, conf):
        """Initialize extension managers."""
        self.ip_ext_manager = (
            ip_ext.Manager(conf, 'conductor.inventory_provider.plugin'))
        self.ip_ext_manager.initialize()
        self.vc_ext_manager = (
            vc_ext.Manager(conf, 'conductor.vim_controller.plugin'))
        self.vc_ext_manager.initialize()
        self.sc_ext_manager = (
            sc_ext.Manager(conf, 'conductor.service_controller.plugin'))
        self.sc_ext_manager.initialize()

    # NOTE(review): the ``def`` line, the transport guard and the topic value
    # were missing from the listing and are inferred - confirm.
    def run(self):
        """Register the RPC service with cotyledon and run it."""
        transport = messaging.get_transport(self.conf)
        if not transport:
            return

        topic = "data"
        target = music_messaging.Target(topic=topic)
        endpoints = [DataEndpoint(self.ip_ext_manager,
                                  self.vc_ext_manager,
                                  self.sc_ext_manager), ]
        # In active-active mode abandoned messages must not be flushed,
        # another instance may still be working on them.
        flush = not self.conf.data.concurrent
        kwargs = {'transport': transport,
                  'target': target,
                  'endpoints': endpoints,
                  'flush': flush, }
        svcmgr = cotyledon.ServiceManager()
        svcmgr.add(music_messaging.RPCService,
                   workers=self.conf.data.workers,
                   args=(self.conf,), kwargs=kwargs)
        svcmgr.run()
class DataEndpoint(object):
    """RPC endpoint of the data service.

    The RPC methods (those taking ``ctx`` and ``arg``) receive their input in
    the ``arg`` dict and answer with ``{'response': ..., 'error': bool}``.
    The work is delegated to the inventory-provider, VIM-controller and
    service-controller extension managers given to the constructor.

    NOTE(review): restored from a listing that had lost its indentation and
    its shortest lines (``else``/``continue``, closing brackets, a few
    keyword arguments). Everything marked "confirm" below was inferred.
    """

    # "evaluate" attributes that are compared directly against a single
    # candidate field by get_candidate_discard_set().
    _SIMPLE_ATTRIBUTE_FIELDS = {
        'complex': 'complex_name',
        'country': 'country',
        'state': 'state',
        'region': 'region',
    }

    def __init__(self, ip_ext_manager, vc_ext_manager, sc_ext_manager):
        """Store the extension managers and start with empty triage data.

        :param ip_ext_manager: inventory provider extension manager
        :param vc_ext_manager: VIM controller extension manager
        :param sc_ext_manager: service controller extension manager
        """
        self.ip_ext_manager = ip_ext_manager
        self.vc_ext_manager = vc_ext_manager
        self.sc_ext_manager = sc_ext_manager
        self.plugin_cache = {}
        self.triage_data_trans = self._new_triage_data()

    @staticmethod
    def _new_triage_data():
        """Return an empty translator-triage record."""
        return {
            'plan_id': None,
            'plan_name': None,
            'translator_triage': []
        }

    def get_candidate_location(self, ctx, arg):
        """Return the ``(latitude, longitude)`` of ``arg["candidate"]``.

        :param ctx: RPC context (unused)
        :param arg: dict with a ``candidate`` dict
        :return: response is a tuple of floats, or ``None`` with
            ``error=True`` when either coordinate is missing
        """
        # candidates should have lat long info already
        candidate = arg["candidate"]
        lat = candidate.get('latitude', None)
        lon = candidate.get('longitude', None)
        if lat and lon:
            return {'response': (float(lat), float(lon)), 'error': False}
        return {'response': None, 'error': True}

    def get_candidate_zone(self, ctx, arg):
        """Return the zone of a candidate for a zone category.

        :param ctx: RPC context (unused)
        :param arg: dict with ``candidate`` and ``category``; the category
            is one of ``region``, ``complex`` or ``country``
        :return: response is the zone value; ``error=True`` and a ``None``
            response for any other category
        """
        candidate = arg["candidate"]
        category = arg["category"]
        zone = None
        error = False

        if category == 'region':
            zone = candidate['location_id']
        elif category == 'complex':
            zone = candidate['complex_name']
        elif category == 'country':
            zone = candidate['country']
        else:
            error = True

        if error:
            LOG.error(_LE("Unresolvable zone category {}").format(category))
        else:
            LOG.info(_LI("Candidate zone is {}").format(zone))
        return {'response': zone, 'error': error}

    def get_candidates_from_service(self, ctx, arg):
        """Keep only the candidates accepted by the service controller.

        :param ctx: RPC context (unused)
        :param arg: dict with ``candidate_list``, ``constraint_name``,
            ``constraint_type``, ``controller``, ``request`` and
            ``request_type``
        :return: response is the sub-list of ``candidate_list`` that the
            controller returned; an empty list when it returned ``None``
        """
        candidate_list = arg["candidate_list"]
        constraint_name = arg["constraint_name"]
        constraint_type = arg["constraint_type"]
        controller = arg["controller"]
        request = arg["request"]
        request_type = arg["request_type"]

        error = False
        filtered_candidates = []
        # Bound up front so the check further down never meets an unbound
        # name, whichever controller branch ran.
        service_model = None
        # call service and fetch candidates
        # TODO(jdandrea): Get rid of the SDN-C reference (outside of plugin!)
        if controller == "SDN-C":
            service_model = request.get("service_model")

            results = self.sc_ext_manager.map_method(
                # NOTE(review): method name and the ``request`` keyword were
                # missing from the listing - confirm.
                'filter_candidates',
                request=request,
                candidate_list=candidate_list,
                constraint_name=constraint_name,
                constraint_type=constraint_type,
                request_type=request_type
            )

            if results and len(results) > 0:
                filtered_candidates = results[0]
            else:
                LOG.warning(
                    _LW("No candidates returned by service "
                        "controller: {}; may be a new service "
                        "instantiation.").format(controller))
        else:
            LOG.error(_LE("Unknown service controller: {}").format(controller))

        # if response from service controller is empty
        if filtered_candidates is None:
            if service_model == "ADIOD":
                LOG.error("No capacity found from SDN-GC for candidates: "
                          "{}".format(candidate_list))
            return {'response': [], 'error': error}

        LOG.debug("Filtered candidates: {}".format(filtered_candidates))
        candidate_list = [c for c in candidate_list
                          if c in filtered_candidates]
        return {'response': candidate_list, 'error': error}

    def get_candidate_discard_set(self, value, candidate_list, value_attrib):
        """Collect the ids of candidates failing an ``all``/``any`` rule.

        :param value: dict with an ``all`` or an ``any`` list of accepted
            values (``all`` wins when both are present)
        :param candidate_list: candidate dicts to check
        :param value_attrib: candidate key compared against the values
        :return: set of ``candidate_id`` values to discard; empty when
            ``value`` carries no usable list
        """
        discard_set = set()
        if not value:
            return discard_set

        if "all" in value:
            value_list, value_condition = value.get("all"), "all"
        elif "any" in value:
            value_list, value_condition = value.get("any"), "any"
        else:
            value_list, value_condition = None, ''
        if not value_list:
            return discard_set

        for candidate in candidate_list:
            actual = candidate.get(value_attrib)
            c_any = any(actual == wanted for wanted in value_list)
            c_all = all(actual == wanted for wanted in value_list)
            if value_condition == 'any' and not c_any:
                discard_set.add(candidate.get("candidate_id"))
            elif value_condition == 'all' and not c_all:
                discard_set.add(candidate.get("candidate_id"))
        return discard_set

    # TODO(Larry): merge this function with "get_candidate_discard_set"
    def get_candidate_discard_set_by_cloud_region(self, value, candidate_list, value_attrib):
        """Collect the ids of candidates outside the requested regions.

        :param value: dict with the ``cloud-requests`` and
            ``service-requests`` collections of accepted values
        :param candidate_list: candidate dicts to check
        :param value_attrib: candidate key compared against the collections
        :return: set of ``candidate_id`` values to discard
        """
        discard_set = set()

        # A missing collection accepts nothing; ``or []`` keeps the
        # membership tests below from failing on None.
        accepted = {
            "cloud": value.get("cloud-requests") or [],
            "service": value.get("service-requests") or [],
        }

        for candidate in candidate_list:
            inventory_type = candidate.get("inventory_type")
            if inventory_type not in accepted:
                continue
            if candidate.get(value_attrib) not in accepted[inventory_type]:
                discard_set.add(candidate.get("candidate_id"))

        return discard_set

    def get_inventory_group_candidates(self, ctx, arg):
        """Return the candidates paired with an already resolved candidate.

        :param ctx: RPC context (unused)
        :param arg: dict with ``candidate_list`` and ``resolved_candidate``
        :return: response is the sub-list of ``candidate_list`` whose id is
            the partner of the resolved candidate in an inventory group
            pair; ``error=True`` when the provider returned no pairs
        """
        candidate_list = arg["candidate_list"]
        resolved_candidate = arg["resolved_candidate"]
        candidate_names = []
        error = False
        service_description = 'DHV_VVIG_PAIR'
        results = self.ip_ext_manager.map_method(
            'get_inventory_group_pairs',
            service_description=service_description
        )
        if not results or len(results) < 1:
            LOG.error(
                _LE("Empty inventory group response for service: {}").format(
                    service_description))
            error = True
        else:
            pairs = results[0]
            if not pairs or len(pairs) < 1:
                LOG.error(
                    _LE("No inventory group candidates found for service: {}, "
                        "inventory provider: {}").format(
                        service_description, self.ip_ext_manager.names()[0]))
                error = True
            else:
                LOG.debug(
                    "Inventory group pairs: {}, service: {}, "
                    "inventory provider: {}".format(
                        pairs, service_description,
                        self.ip_ext_manager.names()[0]))
                resolved_id = resolved_candidate.get("candidate_id")
                for pair in pairs:
                    # the partner is whichever side is not the resolved one
                    if resolved_id == pair[0]:
                        candidate_names.append(pair[1])
                    elif resolved_id == pair[1]:
                        candidate_names.append(pair[0])

        candidate_list = [c for c in candidate_list
                          if c["candidate_id"] in candidate_names]
        LOG.info(
            _LI("Inventory group candidates: {}, service: {}, "
                "inventory provider: {}").format(
                candidate_list, service_description,
                self.ip_ext_manager.names()[0]))
        return {'response': candidate_list, 'error': error}

    def get_candidates_by_attributes(self, ctx, arg):
        """Drop the candidates that fail the ``evaluate`` attribute checks.

        :param ctx: RPC context (unused)
        :param arg: dict with ``candidate_list`` and ``properties``;
            ``properties['evaluate']`` maps attribute names to constraints
        :return: response is ``candidate_list``, filtered in place; the
            error flag is always ``False``
        """
        candidate_list = arg["candidate_list"]
        # demand_name = arg["demand_name"]
        properties = arg["properties"]
        discard_set = set()

        attributes_to_evaluate = properties.get('evaluate') or {}
        for attrib, value in attributes_to_evaluate.items():
            # NOTE(review): skipping empty values is inferred - confirm.
            if value == '':
                continue

            if attrib == 'network_roles':
                discard_set.update(
                    self._discard_by_network_roles(value, candidate_list))
            elif attrib == 'replication_role':
                discard_set.update(
                    self._discard_by_replication_role(value, candidate_list))
            elif attrib in self._SIMPLE_ATTRIBUTE_FIELDS:
                discard_set.update(
                    self.get_candidate_discard_set(
                        value=value,
                        candidate_list=candidate_list,
                        value_attrib=self._SIMPLE_ATTRIBUTE_FIELDS[attrib]))
            elif attrib == "cloud-region":
                discard_set.update(
                    self.get_candidate_discard_set_by_cloud_region(
                        value=value,
                        candidate_list=candidate_list,
                        value_attrib="location_id"))

        # return candidates not in discard set
        candidate_list[:] = [c for c in candidate_list
                             if c['candidate_id'] not in discard_set]
        LOG.info(
            "Available candidates after attribute checks: {}, "
            "inventory provider: {}".format(
                candidate_list, self.ip_ext_manager.names()[0]))
        return {'response': candidate_list, 'error': False}

    def _discard_by_network_roles(self, value, candidate_list):
        """Return ids of cloud candidates failing a network-role rule."""
        discard_set = set()
        if not value:
            return discard_set

        if "all" in value:
            role_list, role_condition = value.get("all"), "all"
        elif "any" in value:
            role_list, role_condition = value.get("any"), "any"
        else:
            role_list, role_condition = [], ''

        # if the role_list is empty do nothing
        if not role_list:
            LOG.error(
                _LE("No roles available, "
                    "inventory provider: {}").format(
                    self.ip_ext_manager.names()[0]))
            return discard_set

        role_candidates = dict()
        for role in role_list:
            # query inventory provider to check if
            # the candidate is in role
            results = self.ip_ext_manager.map_method(
                'check_network_roles',
                # NOTE(review): keyword name inferred - confirm.
                network_role_id=role
            )
            if not results or len(results) < 1:
                LOG.error(
                    _LE("Empty response from inventory "
                        "provider {} for network role {}").format(
                        self.ip_ext_manager.names()[0], role))
                continue
            region_ids = results[0]
            if not region_ids:
                LOG.error(
                    _LE("No candidates from inventory provider {} "
                        "for network role {}").format(
                        self.ip_ext_manager.names()[0], role))
                continue
            LOG.debug(
                "Network role candidates: {}, role: {},"
                "inventory provider: {}".format(
                    region_ids, role,
                    self.ip_ext_manager.names()[0]))
            role_candidates[role] = region_ids

        # find candidates that meet conditions
        for candidate in candidate_list:
            # perform this check only for cloud candidates
            if candidate["inventory_type"] != "cloud":
                continue
            candidate_id = candidate.get("candidate_id")
            c_any = False
            c_all = True
            for role in role_list:
                # only non-empty answers are stored, so a missing role
                # counts as "not met"
                region_ids = role_candidates.get(role)
                if region_ids and candidate_id in region_ids:
                    c_any = True  # include if any one role is met
                else:
                    c_all = False  # discard even if one role is not met
            if role_condition == 'any' and not c_any:
                discard_set.add(candidate_id)
            elif role_condition == 'all' and not c_all:
                discard_set.add(candidate_id)
        return discard_set

    def _discard_by_replication_role(self, value, candidate_list):
        """Return ids of candidates whose replication role differs."""
        discard_set = set()
        for candidate in candidate_list:
            host_id = candidate.get("host_id")
            # NOTE(review): the guard on host_id is inferred - confirm.
            if not host_id:
                continue

            results = self.ip_ext_manager.map_method(
                'check_candidate_role',
                # NOTE(review): keyword name inferred - confirm.
                host_id=host_id
            )
            if not results or len(results) < 1:
                # The message used to be formatted with ``role``, a name
                # that does not exist in this branch.
                LOG.error(
                    _LE("Empty response for replication roles {}").format(
                        value))
                discard_set.add(candidate.get("candidate_id"))
                continue

            # compare results from A&AI with the value in attribute constraint
            if value and results[0] != value.upper():
                discard_set.add(candidate.get("candidate_id"))
        return discard_set

    def get_candidates_with_hpa(self, ctx, arg):
        """RPC for getting candidates flavor mapping for matching hpa.

        :param ctx: context
        :param arg: contains input passed from client side for RPC call:
            ``candidate_list``, ``id``, ``type``, ``directives`` and
            ``flavorProperties``
        :return: response candidate_list with matching label to flavor
            mapping; cloud candidates without a flavor are removed
        """
        error = False
        candidate_list = arg["candidate_list"]
        # NOTE(review): the "id"/"type" keys are inferred - confirm.
        vfc_id = arg["id"]
        vfc_type = arg["type"]
        directives = arg["directives"]
        attr = directives[0].get("attributes")
        label_name = attr[0].get("attribute_name")
        flavor_properties = arg["flavorProperties"]
        discard_set = set()

        for index, candidate in enumerate(candidate_list):
            # perform this check only for cloud candidates
            if candidate["inventory_type"] != "cloud":
                continue

            # Check if flavor mapping for current label_name already
            # exists. This is an invalid condition.
            if (candidate.get("directives")
                    and attr[0].get("attribute_value") != ""):
                LOG.error(_LE("Flavor mapping for label name {} already "
                              "exists").format(label_name))
                continue

            # RPC call to inventory provider for matching hpa capabilities
            results = self.ip_ext_manager.map_method(
                # NOTE(review): method name inferred - confirm.
                'match_hpa',
                candidate=candidate,
                features=flavor_properties
            )

            flavor_info = None
            req_directives = None
            if results and len(results) > 0 and results[0] is not None:
                LOG.debug("Find results {} and results length {}".format(results, len(results)))
                flavor_info = results[0].get("flavor_map")
                req_directives = results[0].get("directives")
                LOG.debug("Get directives {}".format(req_directives))
            else:
                LOG.warning(
                    _LW("No flavor mapping returned by "
                        "inventory provider: {} for candidate: {}").format(
                        self.ip_ext_manager.names()[0],
                        candidate.get("candidate_id")))

            # Metrics to Prometheus
            # NOTE(review): the trailing ``.inc()`` calls are inferred.
            m_vim_id = candidate.get("vim-id")
            if not flavor_info or not flavor_info.get("flavor-name"):
                discard_set.add(candidate.get("candidate_id"))
                PC.HPA_CLOUD_REGION_UNSUCCESSFUL.labels('ONAP', 'N/A',
                                                        m_vim_id).inc()
                continue

            if not candidate.get("flavor_map"):
                candidate["flavor_map"] = {}
            # Create flavor mapping for label_name to flavor
            flavor_name = flavor_info.get("flavor-name")
            flavor_id = flavor_info.get("flavor-id")
            candidate["flavor_map"][label_name] = flavor_name
            candidate["flavor_map"]["flavorId"] = flavor_id
            # Create directives if not exist already
            if not candidate.get("all_directives"):
                candidate["all_directives"] = {}
                candidate["all_directives"]["directives"] = []
            # Create flavor mapping and merge directives
            self.merge_directives(candidate_list, index, vfc_id, vfc_type,
                                  directives, req_directives)
            if not candidate.get("hpa_score"):
                candidate["hpa_score"] = 0
            candidate["hpa_score"] += flavor_info.get("score")

            # Metrics to Prometheus
            PC.HPA_CLOUD_REGION_SUCCESSFUL.labels('ONAP', 'N/A',
                                                  m_vim_id).inc()

        # return candidates not in discard set
        candidate_list[:] = [c for c in candidate_list
                             if c['candidate_id'] not in discard_set]
        LOG.info(_LI(
            "Candidates with matching hpa capabilities: {}, "
            "inventory provider: {}").format(candidate_list,
                                             self.ip_ext_manager.names()[0]))
        return {'response': candidate_list, 'error': error}

    def merge_directives(self, candidate_list, index, id, type, directives, feature_directives):
        """Merge flavor directives with the hpa-feature directives.

        The merged entry is appended to
        ``candidate_list[index]["all_directives"]["directives"]``.
        ``directives`` is modified in place and the same list object is
        stored in the entry.

        :param candidate_list: all candidates
        :param index: index of the candidate to update
        :param id: vfc id
        :param type: vfc type
        :param directives: directives for each vfc
        :param feature_directives: directives for hpa-features; may be
            ``None`` when the provider returned none
        :return: None
        """
        directive = {"id": id,
                     "type": type,
                     "directives": ""}
        flavor_id_attributes = {"attribute_name": "flavorId", "attribute_value": ""}

        # Starts False so that an empty ``directives`` list is reported
        # below instead of leaving the flag unbound.
        found = False
        for ele in directives:
            if "flavor_directives" in ele.get("type"):
                found = True
                ele.get("attributes").append(flavor_id_attributes)
                break
        if not found:
            LOG.error("No flavor directives found in {}".format(id))

        for item in feature_directives or []:
            if item and item not in directives:
                directives.append(item)
        directive["directives"] = directives
        candidate_list[index]["all_directives"]["directives"].append(directive)

    def get_candidates_with_vim_capacity(self, ctx, arg):
        """RPC for getting candidates with vim capacity.

        :param ctx: context
        :param arg: contains input passed from client side for RPC call:
            ``candidate_list`` and ``request``
        :return: response candidate_list with the required vim capacity;
            ``error=True`` and the unfiltered list when the VIM controller
            gave no usable answer
        """
        error = False
        candidate_list = arg["candidate_list"]
        vim_request = arg["request"]
        discard_set = set()

        vim_list = {candidate['vim-id'] for candidate in candidate_list
                    if candidate["inventory_type"] == "cloud"}
        vim_request['VIMs'] = list(vim_list)
        vims_result = self.vc_ext_manager.map_method(
            'check_vim_capacity',
            vim_request
        )

        if vims_result and len(vims_result) > 0 and vims_result[0] is not None:
            vims_set = set(vims_result[0])
            for candidate in candidate_list:
                # perform this check only for cloud candidates
                if (candidate["inventory_type"] == "cloud"
                        and candidate['vim-id'] not in vims_set):
                    discard_set.add(candidate.get("candidate_id"))

            # return candidates not in discard set
            candidate_list[:] = [c for c in candidate_list
                                 if c['candidate_id'] not in discard_set]
        else:
            error = True
            LOG.warning(
                "Multicloud did not respond properly to request: {}".format(
                    vim_request))

        LOG.info(_LI(
            "Candidates with with vim capacity: {}, vim controller: "
            "{}").format(candidate_list, self.vc_ext_manager.names()[0]))

        return {'response': candidate_list, 'error': error}

    def resolve_demands(self, ctx, arg):
        """Resolve demands through the inventory provider.

        :param ctx: RPC context; ``keyspace`` and ``plan_id`` tag the logs
        :param arg: dict with ``demands``, ``plan_info`` and
            ``triage_translator_data``
        :return: response holds ``resolved_demands`` and the accumulated
            triage data under ``trans``; ``error=True`` when the provider
            returned nothing
        """
        log_util.setLoggerFilter(LOG, ctx.get('keyspace'), ctx.get('plan_id'))

        error = False
        demands = arg.get('demands')
        plan_info = arg.get('plan_info')
        triage_translator_data = arg.get('triage_translator_data')
        resolved_demands = None
        results = self.ip_ext_manager.map_method(
            # NOTE(review): method name inferred - confirm.
            'resolve_demands',
            demands, plan_info, triage_translator_data
        )
        if results and len(results) > 0:
            resolved_demands = results[0]

            plan_id = triage_translator_data['plan_id']
            trans = self.triage_data_trans
            if trans['plan_id'] is not None and trans['plan_id'] != plan_id:
                # a different plan: forget what was collected before
                trans = self.triage_data_trans = self._new_triage_data()
            if trans['plan_id'] is None:
                trans['plan_name'] = triage_translator_data['plan_name']
                trans['plan_id'] = plan_id
            trans['translator_triage'].append(
                triage_translator_data['dropped_candidates'])
        else:
            error = True

        return {'response': {'resolved_demands': resolved_demands,
                             'trans': self.triage_data_trans},
                'error': error}

    def resolve_location(self, ctx, arg):
        """Resolve a location from a host name or a CLLI code.

        :param ctx: RPC context; ``keyspace`` and ``plan_id`` tag the logs
        :param arg: dict with ``host_name`` or ``clli_code``; the host name
            wins when both are given
        :return: response holds ``resolved_location``; ``error=True`` when
            nothing could be resolved
        """
        log_util.setLoggerFilter(LOG, ctx.get('keyspace'), ctx.get('plan_id'))

        error = False
        resolved_location = None
        # Bound up front: without either key no provider call is made and
        # the check below used to hit an unbound name.
        results = None

        host_name = arg.get('host_name')
        clli_code = arg.get('clli_code')

        if host_name:
            results = self.ip_ext_manager.map_method(
                'resolve_host_location',
                host_name
            )
        elif clli_code:
            results = self.ip_ext_manager.map_method(
                'resolve_clli_location',
                clli_code
            )
        else:
            # unknown location response
            # NOTE(review): the tail of this message was missing from the
            # listing and is inferred - confirm.
            LOG.error(_LE("Unknown location type from the input template. "
                          "Expected location types are host_name "
                          "or clli_code."))

        if results and len(results) > 0:
            resolved_location = results[0]
        else:
            error = True
        return {'response': {'resolved_location': resolved_location},
                'error': error}

    def call_reservation_operation(self, ctx, arg):
        """Run a reservation operation through the service controller.

        :param ctx: RPC context (unused)
        :param arg: dict with ``method``, ``candidate_list``,
            ``reservation_name``, ``reservation_type``, ``controller`` and
            ``request``
        :return: response is ``True`` when the controller returned
            candidates, ``False`` otherwise
        """
        reserved_candidates = None
        method = arg["method"]
        candidate_list = arg["candidate_list"]
        reservation_name = arg["reservation_name"]
        reservation_type = arg["reservation_type"]
        controller = arg["controller"]
        request = arg["request"]

        if controller == "SDN-C":
            results = self.sc_ext_manager.map_method(
                'call_reservation_operation',
                # NOTE(review): the ``method`` and ``request`` keywords were
                # missing from the listing - confirm.
                method=method,
                candidate_list=candidate_list,
                reservation_name=reservation_name,
                reservation_type=reservation_type,
                request=request
            )
            if results and len(results) > 0:
                reserved_candidates = results[0]
        else:
            LOG.error(_LE("Unknown service controller: {}").format(controller))

        result = bool(reserved_candidates)
        if result:
            LOG.debug("{} for the candidate: "
                      "{}".format(method, reserved_candidates))
        else:
            LOG.warning(
                _LW("Unable to {} for "
                    "candidate {}.").format(method, reserved_candidates))
        # NOTE(review): the 'error' value was missing from the listing; it
        # is taken to be the negation of the response - confirm.
        return {'response': result,
                'error': not result}
    # def do_something(self, ctx, arg):
    #     """RPC endpoint for data messages
    #
    #     When another service sends a notification over the message
    #     bus, this method receives it.
    #     """
    #     LOG.debug("Got a message!")
    #
    #     res = {
    #         'note': 'do_something called!',
    #     }
    #     return {'response': res, 'error': False}