2 # -------------------------------------------------------------------------
3 # Copyright (c) 2015-2017 AT&T Intellectual Property
4 # Copyright (C) 2020 Wipro Limited.
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
18 # -------------------------------------------------------------------------
24 import conductor.common.prometheus_metrics as PC
26 from conductor import messaging
27 # from conductor import __file__ as conductor_root
28 from conductor.common.music import messaging as music_messaging
29 from conductor.common.utils import conductor_logging_util as log_util
30 from conductor.data.plugins.inventory_provider.base import InventoryProviderBase as Base
31 from conductor.data.plugins.inventory_provider import extensions as ip_ext
32 from conductor.data.plugins.service_controller import extensions as sc_ext
33 from conductor.data.plugins.vim_controller import extensions as vc_ext
34 from conductor.i18n import _LE, _LI, _LW
35 from oslo_config import cfg
36 from oslo_log import log
38 # from stevedore import driver
39 # from conductor.solver.resource import region
40 # from conductor.solver.resource import service
42 LOG = log.getLogger(__name__)
50 help='Number of workers for data service. '
51 'Default value is 1.'),
52 cfg.BoolOpt('concurrent',
54 help='Set to True when data will run in active-active '
55 'mode. When set to False, data will flush any abandoned '
56 'messages at startup.'),
57 cfg.FloatOpt('existing_placement_cost',
59 help='Default value is -8000, which is the diameter of the earth.The distance cannot larger than '
61 cfg.FloatOpt('cloud_candidate_cost',
63 cfg.FloatOpt('service_candidate_cost',
65 cfg.FloatOpt('nssi_candidate_cost',
67 cfg.FloatOpt('nsi_candidate_cost',
69 cfg.FloatOpt('nst_candidate_cost',
71 cfg.FloatOpt('nsst_candidate_cost',
75 CONF.register_opts(DATA_OPTS, group='data')
78 class DataServiceLauncher(object):
79 """Listener for the data service."""
81 def __init__(self, conf):
86 # Initialize Prometheus metrics Endpoint
87 # Data service uses index 0
89 self.init_extension_managers(conf)
def init_extension_managers(self, conf):
    """Create and initialize the plugin extension managers.

    One manager is built per plugin namespace (inventory provider,
    VIM controller, service controller). Each manager is stored on
    the instance and then initialized before the next one is created.

    :param conf: configuration object handed to every manager.
    """
    manager_specs = (
        ('ip_ext_manager', ip_ext, 'conductor.inventory_provider.plugin'),
        ('vc_ext_manager', vc_ext, 'conductor.vim_controller.plugin'),
        ('sc_ext_manager', sc_ext, 'conductor.service_controller.plugin'),
    )
    for attr_name, ext_module, namespace in manager_specs:
        manager = ext_module.Manager(conf, namespace)
        # Bind the attribute first, then initialize, as the callers expect
        # the manager to be reachable on the instance.
        setattr(self, attr_name, manager)
        manager.initialize()
101 transport = messaging.get_transport(self.conf)
104 target = music_messaging.Target(topic=topic)
105 endpoints = [DataEndpoint(self.ip_ext_manager,
107 self.sc_ext_manager), ]
108 flush = not self.conf.data.concurrent
109 kwargs = {'transport': transport,
111 'endpoints': endpoints,
113 svcmgr = cotyledon.ServiceManager()
114 svcmgr.add(music_messaging.RPCService,
115 workers=self.conf.data.workers,
116 args=(self.conf,), kwargs=kwargs)
120 class DataEndpoint(object):
121 def __init__(self, ip_ext_manager, vc_ext_manager, sc_ext_manager):
123 self.ip_ext_manager = ip_ext_manager
124 self.vc_ext_manager = vc_ext_manager
125 self.sc_ext_manager = sc_ext_manager
126 self.plugin_cache = {}
127 self.triage_data_trans = {
130 'translator_triage': []
133 def invoke_method(self, ctx, arg):
135 results = self.ip_ext_manager.map_method('invoke_method', arg)
137 results = list(filter(None, results))
138 results = [item for sublist in results for item in sublist]
141 return {'response': results,
144 def get_candidate_location(self, ctx, arg):
145 # candidates should have lat long info already
148 candidate = arg["candidate"]
149 lat = candidate.get('latitude', None)
150 lon = candidate.get('longitude', None)
152 location = (float(lat), float(lon))
155 return {'response': location, 'error': error}
157 def get_candidate_zone(self, ctx, arg):
158 candidate = arg["candidate"]
159 category = arg["category"]
163 if category == 'region':
164 zone = candidate['location_id']
165 elif category == 'complex':
166 zone = candidate['complex_name']
167 elif category == 'country':
168 zone = candidate['country']
173 LOG.error(_LE("Unresolvable zone category {}").format(category))
175 LOG.info(_LI("Candidate zone is {}").format(zone))
176 return {'response': zone, 'error': error}
178 def get_candidates_from_service(self, ctx, arg):
180 candidate_list = arg["candidate_list"]
181 constraint_name = arg["constraint_name"]
182 constraint_type = arg["constraint_type"]
183 controller = arg["controller"]
184 request = arg["request"]
185 request_type = arg["request_type"]
188 filtered_candidates = []
189 # call service and fetch candidates
190 # TODO(jdandrea): Get rid of the SDN-C reference (outside of plugin!)
191 if controller == "SDN-C":
192 service_model = request.get("service_model")
194 results = self.sc_ext_manager.map_method(
197 candidate_list=candidate_list,
198 constraint_name=constraint_name,
199 constraint_type=constraint_type,
200 request_type=request_type
203 if results and len(results) > 0:
204 filtered_candidates = results[0]
207 _LW("No candidates returned by service "
208 "controller: {}; may be a new service "
209 "instantiation.").format(controller))
211 LOG.error(_LE("Unknown service controller: {}").format(controller))
212 # if response from service controller is empty
213 if filtered_candidates is None:
214 if service_model == "ADIOD":
215 LOG.error("No capacity found from SDN-GC for candidates: "
216 "{}".format(candidate_list))
217 return {'response': [], 'error': error}
219 LOG.debug("Filtered candidates: {}".format(filtered_candidates))
220 candidate_list = [c for c in candidate_list
221 if c in filtered_candidates]
222 return {'response': candidate_list, 'error': error}
224 def get_candidate_discard_set(self, value, candidate_list, value_attrib):
230 if "all" in value_dict:
231 value_list = value_dict.get("all")
232 value_condition = "all"
233 elif "any" in value_dict:
234 value_list = value_dict.get("any")
235 value_condition = "any"
240 for candidate in candidate_list:
243 for value in value_list:
244 if candidate.get(value_attrib) == value:
245 c_any = True # include if any one is met
246 elif candidate.get(value_attrib) != value:
247 c_all = False # discard even if one is not met
248 if value_condition == 'any' and not c_any:
249 discard_set.add(candidate.get("candidate_id"))
250 elif value_condition == 'all' and not c_all:
251 discard_set.add(candidate.get("candidate_id"))
254 # (TODO:Larry) merge this function with the "get_candidate_discard_set"
255 def get_candidate_discard_set_by_cloud_region(self, value, candidate_list, value_attrib):
258 cloud_requests = value.get("cloud-requests")
259 service_requests = value.get("service-requests")
260 vfmodule_requests = value.get("vfmodule-requests")
262 for candidate in candidate_list:
263 if candidate.get("inventory_type") == "cloud" and \
264 (candidate.get(value_attrib) not in cloud_requests):
265 discard_set.add(candidate.get("candidate_id"))
267 elif candidate.get("inventory_type") == "service" and \
268 (candidate.get(value_attrib) not in service_requests):
269 discard_set.add(candidate.get("candidate_id"))
271 elif candidate.get("inventory_type") == "vfmodule" and \
272 (candidate.get(value_attrib) not in vfmodule_requests):
273 discard_set.add(candidate.get("candidate_id"))
277 def get_inventory_group_candidates(self, ctx, arg):
278 candidate_list = arg["candidate_list"]
279 resolved_candidate = arg["resolved_candidate"]
282 service_description = 'DHV_VVIG_PAIR'
283 results = self.ip_ext_manager.map_method(
284 'get_inventory_group_pairs',
285 service_description=service_description
287 if not results or len(results) < 1:
289 _LE("Empty inventory group response for service: {}").format(
290 service_description))
294 if not pairs or len(pairs) < 1:
296 _LE("No inventory group candidates found for service: {}, "
297 "inventory provider: {}").format(
298 service_description, self.ip_ext_manager.names()[0]))
302 "Inventory group pairs: {}, service: {}, "
303 "inventory provider: {}".format(
304 pairs, service_description,
305 self.ip_ext_manager.names()[0]))
307 if resolved_candidate.get("candidate_id") == pair[0]:
308 candidate_names.append(pair[1])
309 elif resolved_candidate.get("candidate_id") == pair[1]:
310 candidate_names.append(pair[0])
312 candidate_list = [c for c in candidate_list
313 if c["candidate_id"] in candidate_names]
315 _LI("Inventory group candidates: {}, service: {}, "
316 "inventory provider: {}").format(
317 candidate_list, service_description,
318 self.ip_ext_manager.names()[0]))
319 return {'response': candidate_list, 'error': error}
321 def get_candidates_by_attributes(self, ctx, arg):
322 candidate_list = arg["candidate_list"]
323 # demand_name = arg["demand_name"]
324 properties = arg["properties"]
327 attributes_to_evaluate = properties.get('evaluate')
328 for attrib, value in attributes_to_evaluate.items():
331 if attrib == 'network_roles':
332 role_candidates = dict()
337 if "all" in nrc_dict:
338 role_list = nrc_dict.get("all")
339 role_condition = "all"
340 elif "any" in nrc_dict:
341 role_list = nrc_dict.get("any")
342 role_condition = "any"
344 # if the role_list is empty do nothing
345 if not role_list or role_list == '':
347 _LE("No roles available, "
348 "inventory provider: {}").format(
349 self.ip_ext_manager.names()[0]))
351 for role in role_list:
352 # query inventory provider to check if
353 # the candidate is in role
354 results = self.ip_ext_manager.map_method(
355 'check_network_roles',
358 if not results or len(results) < 1:
360 _LE("Empty response from inventory "
361 "provider {} for network role {}").format(
362 self.ip_ext_manager.names()[0], role))
364 region_ids = results[0]
367 _LE("No candidates from inventory provider {} "
368 "for network role {}").format(
369 self.ip_ext_manager.names()[0], role))
372 "Network role candidates: {}, role: {},"
373 "inventory provider: {}".format(
375 self.ip_ext_manager.names()[0]))
376 role_candidates[role] = region_ids
378 # find candidates that meet conditions
379 for candidate in candidate_list:
380 # perform this check only for cloud candidates
381 if candidate["inventory_type"] != "cloud":
385 for role in role_list:
386 if role not in role_candidates:
389 rc = role_candidates.get(role)
390 if rc and candidate.get("candidate_id") not in rc:
392 # discard even if one role is not met
393 elif rc and candidate.get("candidate_id") in rc:
395 # include if any one role is met
396 if role_condition == 'any' and not c_any:
397 discard_set.add(candidate.get("candidate_id"))
398 elif role_condition == 'all' and not c_all:
399 discard_set.add(candidate.get("candidate_id"))
401 elif attrib == 'replication_role':
403 for candidate in candidate_list:
405 host_id = candidate.get("host_id")
407 results = self.ip_ext_manager.map_method(
408 'check_candidate_role',
412 if not results or len(results) < 1:
414 _LE("Empty response for replication roles {}").format(role))
415 discard_set.add(candidate.get("candidate_id"))
418 # compare results from A&AI with the value in attribute constraint
419 if value and results[0] != value.upper():
420 discard_set.add(candidate.get("candidate_id"))
422 elif attrib == 'complex':
424 self.get_candidate_discard_set(
426 candidate_list=candidate_list,
427 value_attrib="complex_name")
428 discard_set.update(v_discard_set)
429 elif attrib == "country":
431 self.get_candidate_discard_set(
433 candidate_list=candidate_list,
434 value_attrib="country")
435 discard_set.update(v_discard_set)
436 elif attrib == "state":
438 self.get_candidate_discard_set(
440 candidate_list=candidate_list,
441 value_attrib="state")
442 discard_set.update(v_discard_set)
443 elif attrib == "region":
445 self.get_candidate_discard_set(
447 candidate_list=candidate_list,
448 value_attrib="region")
449 discard_set.update(v_discard_set)
450 elif attrib == "cloud-region":
452 self.get_candidate_discard_set_by_cloud_region(
454 candidate_list=candidate_list,
455 value_attrib="location_id")
456 discard_set.update(v_discard_set)
458 # return candidates not in discard set
459 candidate_list[:] = [c for c in candidate_list
460 if c['candidate_id'] not in discard_set]
462 "Available candidates after attribute checks: {}, "
463 "inventory provider: {}".format(
464 candidate_list, self.ip_ext_manager.names()[0]))
465 return {'response': candidate_list, 'error': False}
467 def get_candidates_with_vim_capacity(self, ctx, arg):
469 RPC for getting candidates with vim capacity
471 :param arg: contains input passed from client side for RPC call
472 :return: response candidate_list with with required vim capacity
475 candidate_list = arg["candidate_list"]
476 vim_request = arg["request"]
479 for candidate in candidate_list:
480 if candidate["inventory_type"] == "cloud":
481 vim_list.add(candidate['vim-id'])
483 vim_request['VIMs'] = list(vim_list)
484 vims_result = self.vc_ext_manager.map_method(
485 'check_vim_capacity',
489 if vims_result and len(vims_result) > 0 and vims_result[0] is not None:
490 vims_set = set(vims_result[0])
491 for candidate in candidate_list:
492 # perform this check only for cloud candidates
493 if candidate["inventory_type"] == "cloud":
494 if candidate['vim-id'] not in vims_set:
495 discard_set.add(candidate.get("candidate_id"))
497 # return candidates not in discard set
498 candidate_list[:] = [c for c in candidate_list
499 if c['candidate_id'] not in discard_set]
503 "Multicloud did not respond properly to request: {}".format(
507 "Candidates with with vim capacity: {}, vim controller: "
508 "{}").format(candidate_list, self.vc_ext_manager.names()[0]))
510 return {'response': candidate_list, 'error': error}
512 def resolve_demands(self, ctx, arg):
514 log_util.setLoggerFilter(LOG, ctx.get('keyspace'), ctx.get('plan_id'))
517 demands = arg.get('demands')
518 plan_info = arg.get('plan_info')
519 triage_translator_data = arg.get('triage_translator_data')
520 resolved_demands = None
521 results = self.ip_ext_manager.map_method(
523 demands, plan_info, triage_translator_data
525 if results and len(results) > 0:
527 resolved_demands = self.get_resolved_demands_from_result(results)
529 resolved_demands = results[0]
530 if self.triage_data_trans['plan_id']== None :
531 self.triage_data_trans['plan_name'] = triage_translator_data['plan_name']
532 self.triage_data_trans['plan_id'] = triage_translator_data['plan_id']
533 self.triage_data_trans['translator_triage'].append(triage_translator_data['dropped_candidates'])
534 elif not self.triage_data_trans['plan_id'] == triage_translator_data['plan_id'] :
535 self.triage_data_trans = {'plan_id': triage_translator_data['plan_id'],
536 'plan_name': triage_translator_data['plan_name'], 'translator_triage': []}
537 self.triage_data_trans['translator_triage'].append(triage_translator_data['dropped_candidates'])
539 self.triage_data_trans['translator_triage'].append(triage_translator_data['dropped_candidates'])
543 return {'response': {'resolved_demands': resolved_demands,
544 'trans': self.triage_data_trans},
def get_resolved_demands_from_result(self, results):
    """Merge several per-plugin demand results into one dictionary.

    :param results: list of dicts, each mapping a demand name to a list
        of candidates (resolve_demands passes the list it obtained from
        the inventory provider extension manager).
    :return: dict mapping every demand name found in any result to the
        concatenation of its candidates, in the order of ``results``.
    """
    resolved_demands = {}
    for result in results:
        for demand, candidates in result.items():
            # Create the bucket on first sight of a demand rather than
            # pre-seeding from results[0] only: a demand reported solely
            # by a later result used to raise KeyError here.
            resolved_demands.setdefault(demand, []).extend(candidates)
    LOG.info('resolved_demands: {}'.format(str(resolved_demands)))
    return resolved_demands
555 def resolve_location(self, ctx, arg):
557 log_util.setLoggerFilter(LOG, ctx.get('keyspace'), ctx.get('plan_id'))
560 resolved_location = None
562 host_name = arg.get('host_name')
563 clli_code = arg.get('clli_code')
566 results = self.ip_ext_manager.map_method(
567 'resolve_host_location',
572 results = self.ip_ext_manager.map_method(
573 'resolve_clli_location',
578 # unknown location response
579 LOG.error(_LE("Unknown location type from the input template."
580 "Expected location types are host_name"
583 if results and len(results) > 0:
584 resolved_location = results[0]
587 return {'response': {'resolved_location': resolved_location},
590 def call_reservation_operation(self, ctx, arg):
592 reserved_candidates = None
593 method = arg["method"]
594 candidate_list = arg["candidate_list"]
595 reservation_name = arg["reservation_name"]
596 reservation_type = arg["reservation_type"]
597 controller = arg["controller"]
598 request = arg["request"]
600 if controller == "SDN-C":
601 results = self.sc_ext_manager.map_method(
602 'call_reservation_operation',
604 candidate_list=candidate_list,
605 reservation_name=reservation_name,
606 reservation_type=reservation_type,
609 if results and len(results) > 0:
610 reserved_candidates = results[0]
612 LOG.error(_LE("Unknown service controller: {}").format(controller))
613 if reserved_candidates is None or not reserved_candidates:
616 _LW("Unable to {} for "
617 "candidate {}.").format(method, reserved_candidates))
618 return {'response': result,
621 LOG.debug("{} for the candidate: "
622 "{}".format(method, reserved_candidates))
623 return {'response': result,