add python compatibility module
[optf/has.git] / conductor / conductor / data / service.py
1 #
2 # -------------------------------------------------------------------------
3 #   Copyright (c) 2015-2017 AT&T Intellectual Property
4 #   Copyright (C) 2020 Wipro Limited.
5 #
6 #   Licensed under the Apache License, Version 2.0 (the "License");
7 #   you may not use this file except in compliance with the License.
8 #   You may obtain a copy of the License at
9 #
10 #       http://www.apache.org/licenses/LICENSE-2.0
11 #
12 #   Unless required by applicable law or agreed to in writing, software
13 #   distributed under the License is distributed on an "AS IS" BASIS,
14 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 #   See the License for the specific language governing permissions and
16 #   limitations under the License.
17 #
18 # -------------------------------------------------------------------------
19 #
20
21 # import json
22 # import os
23
24 import conductor.common.prometheus_metrics as PC
25 import cotyledon
26 from conductor import messaging
27 # from conductor import __file__ as conductor_root
28 from conductor.common.music import messaging as music_messaging
29 from conductor.common.utils import conductor_logging_util as log_util
30 from conductor.data.plugins.inventory_provider.base import InventoryProviderBase as Base
31 from conductor.data.plugins.inventory_provider import extensions as ip_ext
32 from conductor.data.plugins.service_controller import extensions as sc_ext
33 from conductor.data.plugins.vim_controller import extensions as vc_ext
34 from conductor.i18n import _LE, _LI, _LW
35 from oslo_config import cfg
36 from oslo_log import log
37
38 # from stevedore import driver
39 # from conductor.solver.resource import region
40 # from conductor.solver.resource import service
41
LOG = log.getLogger(__name__)

CONF = cfg.CONF

# Options for the data service; registered below under the 'data' group.
DATA_OPTS = [
    cfg.IntOpt('workers',
               default=1,
               min=1,
               help='Number of workers for data service. '
                    'Default value is 1.'),
    cfg.BoolOpt('concurrent',
                default=False,
                help='Set to True when data will run in active-active '
                     'mode. When set to False, data will flush any abandoned '
                     'messages at startup.'),
    cfg.FloatOpt('existing_placement_cost',
                 default=-8000.0,
                 help='Default value is -8000, which is the diameter of the '
                      'earth. The distance cannot be larger than this '
                      'value.'),
    # NOTE(review): the *_candidate_cost options below have no help text;
    # presumably they are per-inventory-type cost values -- confirm with the
    # code that reads them before documenting further.
    cfg.FloatOpt('cloud_candidate_cost',
                 default=2.0),
    cfg.FloatOpt('service_candidate_cost',
                 default=1.0),
    cfg.FloatOpt('nssi_candidate_cost',
                 default=1.0),
    cfg.FloatOpt('nsi_candidate_cost',
                 default=1.0),
    cfg.FloatOpt('nst_candidate_cost',
                 default=1.0),
    cfg.FloatOpt('nsst_candidate_cost',
                 default=1.0),
]

CONF.register_opts(DATA_OPTS, group='data')
76
77
class DataServiceLauncher(object):
    """Launcher that sets up plugins and runs the data RPC service."""

    def __init__(self, conf):
        """Keep the configuration, start metrics and load the plugins.

        :param conf: configuration object; its ``data`` group is read when
            the service is run
        """
        self.conf = conf

        # Initialize Prometheus metrics Endpoint
        # Data service uses index 0
        PC._init_metrics(0)
        self.init_extension_managers(conf)

    def init_extension_managers(self, conf):
        """Create and initialize one extension manager per plugin namespace.

        The managers are stored as ``ip_ext_manager``, ``vc_ext_manager`` and
        ``sc_ext_manager``; each one is initialized right after creation.
        """
        manager_specs = (
            ('ip_ext_manager', ip_ext,
             'conductor.inventory_provider.plugin'),
            ('vc_ext_manager', vc_ext,
             'conductor.vim_controller.plugin'),
            ('sc_ext_manager', sc_ext,
             'conductor.service_controller.plugin'),
        )
        for attr_name, ext_module, namespace in manager_specs:
            manager = ext_module.Manager(conf, namespace)
            setattr(self, attr_name, manager)
            manager.initialize()

    def run(self):
        """Start the RPC service on the "data" topic.

        Nothing is started when no messaging transport is available.
        """
        transport = messaging.get_transport(self.conf)
        if not transport:
            return

        service_kwargs = dict(
            transport=transport,
            target=music_messaging.Target(topic="data"),
            endpoints=[DataEndpoint(self.ip_ext_manager,
                                    self.vc_ext_manager,
                                    self.sc_ext_manager)],
            # Abandoned messages are flushed unless running concurrently.
            flush=not self.conf.data.concurrent,
        )
        service_manager = cotyledon.ServiceManager()
        service_manager.add(music_messaging.RPCService,
                            workers=self.conf.data.workers,
                            args=(self.conf,), kwargs=service_kwargs)
        service_manager.run()
118
119
120 class DataEndpoint(object):
121     def __init__(self, ip_ext_manager, vc_ext_manager, sc_ext_manager):
122
123         self.ip_ext_manager = ip_ext_manager
124         self.vc_ext_manager = vc_ext_manager
125         self.sc_ext_manager = sc_ext_manager
126         self.plugin_cache = {}
127         self.triage_data_trans = {
128             'plan_id': None,
129             'plan_name': None,
130             'translator_triage': []
131         }
132
133     def invoke_method(self, ctx, arg):
134         error = False
135         results = self.ip_ext_manager.map_method('invoke_method', arg)
136         if results:
137             results = list(filter(None, results))
138             results = [item for sublist in results for item in sublist]
139         else:
140             error = True
141         return {'response': results,
142                 'error': error}
143
144     def get_candidate_location(self, ctx, arg):
145         # candidates should have lat long info already
146         error = False
147         location = None
148         candidate = arg["candidate"]
149         lat = candidate.get('latitude', None)
150         lon = candidate.get('longitude', None)
151         if lat and lon:
152             location = (float(lat), float(lon))
153         else:
154             error = True
155         return {'response': location, 'error': error}
156
157     def get_candidate_zone(self, ctx, arg):
158         candidate = arg["candidate"]
159         category = arg["category"]
160         zone = None
161         error = False
162
163         if category == 'region':
164             zone = candidate['location_id']
165         elif category == 'complex':
166             zone = candidate['complex_name']
167         elif category == 'country':
168             zone = candidate['country']
169         else:
170             error = True
171
172         if error:
173             LOG.error(_LE("Unresolvable zone category {}").format(category))
174         else:
175             LOG.info(_LI("Candidate zone is {}").format(zone))
176         return {'response': zone, 'error': error}
177
178     def get_candidates_from_service(self, ctx, arg):
179
180         candidate_list = arg["candidate_list"]
181         constraint_name = arg["constraint_name"]
182         constraint_type = arg["constraint_type"]
183         controller = arg["controller"]
184         request = arg["request"]
185         request_type = arg["request_type"]
186
187         error = False
188         filtered_candidates = []
189         # call service and fetch candidates
190         # TODO(jdandrea): Get rid of the SDN-C reference (outside of plugin!)
191         if controller == "SDN-C":
192             service_model = request.get("service_model")
193
194             results = self.sc_ext_manager.map_method(
195                 'filter_candidates',
196                 request=request,
197                 candidate_list=candidate_list,
198                 constraint_name=constraint_name,
199                 constraint_type=constraint_type,
200                 request_type=request_type
201             )
202
203             if results and len(results) > 0:
204                 filtered_candidates = results[0]
205             else:
206                 LOG.warn(
207                     _LW("No candidates returned by service "
208                         "controller: {}; may be a new service "
209                         "instantiation.").format(controller))
210         else:
211             LOG.error(_LE("Unknown service controller: {}").format(controller))
212         # if response from service controller is empty
213         if filtered_candidates is None:
214             if service_model == "ADIOD":
215                 LOG.error("No capacity found from SDN-GC for candidates: "
216                           "{}".format(candidate_list))
217             return {'response': [], 'error': error}
218         else:
219             LOG.debug("Filtered candidates: {}".format(filtered_candidates))
220             candidate_list = [c for c in candidate_list
221                               if c in filtered_candidates]
222             return {'response': candidate_list, 'error': error}
223
224     def get_candidate_discard_set(self, value, candidate_list, value_attrib):
225         discard_set = set()
226         value_dict = value
227         value_condition = ''
228         value_list = None
229         if value_dict:
230             if "all" in value_dict:
231                 value_list = value_dict.get("all")
232                 value_condition = "all"
233             elif "any" in value_dict:
234                 value_list = value_dict.get("any")
235                 value_condition = "any"
236
237             if not value_list:
238                 return discard_set
239
240             for candidate in candidate_list:
241                 c_any = False
242                 c_all = True
243                 for value in value_list:
244                     if candidate.get(value_attrib) == value:
245                         c_any = True  # include if any one is met
246                     elif candidate.get(value_attrib) != value:
247                         c_all = False  # discard even if one is not met
248                 if value_condition == 'any' and not c_any:
249                     discard_set.add(candidate.get("candidate_id"))
250                 elif value_condition == 'all' and not c_all:
251                     discard_set.add(candidate.get("candidate_id"))
252         return discard_set
253
254     # (TODO:Larry) merge this function with the "get_candidate_discard_set"
255     def get_candidate_discard_set_by_cloud_region(self, value, candidate_list, value_attrib):
256         discard_set = set()
257
258         cloud_requests = value.get("cloud-requests")
259         service_requests = value.get("service-requests")
260         vfmodule_requests = value.get("vfmodule-requests")
261
262         for candidate in candidate_list:
263             if candidate.get("inventory_type") == "cloud" and \
264                     (candidate.get(value_attrib) not in cloud_requests):
265                 discard_set.add(candidate.get("candidate_id"))
266
267             elif candidate.get("inventory_type") == "service" and \
268                     (candidate.get(value_attrib) not in service_requests):
269                 discard_set.add(candidate.get("candidate_id"))
270
271             elif candidate.get("inventory_type") == "vfmodule" and \
272                     (candidate.get(value_attrib) not in vfmodule_requests):
273                 discard_set.add(candidate.get("candidate_id"))
274
275         return discard_set
276
277     def get_inventory_group_candidates(self, ctx, arg):
278         candidate_list = arg["candidate_list"]
279         resolved_candidate = arg["resolved_candidate"]
280         candidate_names = []
281         error = False
282         service_description = 'DHV_VVIG_PAIR'
283         results = self.ip_ext_manager.map_method(
284             'get_inventory_group_pairs',
285             service_description=service_description
286         )
287         if not results or len(results) < 1:
288             LOG.error(
289                 _LE("Empty inventory group response for service: {}").format(
290                     service_description))
291             error = True
292         else:
293             pairs = results[0]
294             if not pairs or len(pairs) < 1:
295                 LOG.error(
296                     _LE("No inventory group candidates found for service: {}, "
297                         "inventory provider: {}").format(
298                         service_description, self.ip_ext_manager.names()[0]))
299                 error = True
300             else:
301                 LOG.debug(
302                     "Inventory group pairs: {}, service: {}, "
303                     "inventory provider: {}".format(
304                         pairs, service_description,
305                         self.ip_ext_manager.names()[0]))
306                 for pair in pairs:
307                     if resolved_candidate.get("candidate_id") == pair[0]:
308                         candidate_names.append(pair[1])
309                     elif resolved_candidate.get("candidate_id") == pair[1]:
310                         candidate_names.append(pair[0])
311
312         candidate_list = [c for c in candidate_list
313                           if c["candidate_id"] in candidate_names]
314         LOG.info(
315             _LI("Inventory group candidates: {}, service: {}, "
316                 "inventory provider: {}").format(
317                 candidate_list, service_description,
318                 self.ip_ext_manager.names()[0]))
319         return {'response': candidate_list, 'error': error}
320
321     def get_candidates_by_attributes(self, ctx, arg):
322         candidate_list = arg["candidate_list"]
323         # demand_name = arg["demand_name"]
324         properties = arg["properties"]
325         discard_set = set()
326
327         attributes_to_evaluate = properties.get('evaluate')
328         for attrib, value in attributes_to_evaluate.items():
329             if value == '':
330                 continue
331             if attrib == 'network_roles':
332                 role_candidates = dict()
333                 role_list = []
334                 nrc_dict = value
335                 role_condition = ''
336                 if nrc_dict:
337                     if "all" in nrc_dict:
338                         role_list = nrc_dict.get("all")
339                         role_condition = "all"
340                     elif "any" in nrc_dict:
341                         role_list = nrc_dict.get("any")
342                         role_condition = "any"
343
344                     # if the role_list is empty do nothing
345                     if not role_list or role_list == '':
346                         LOG.error(
347                             _LE("No roles available, "
348                                 "inventory provider: {}").format(
349                                 self.ip_ext_manager.names()[0]))
350                         continue
351                     for role in role_list:
352                         # query inventory provider to check if
353                         # the candidate is in role
354                         results = self.ip_ext_manager.map_method(
355                             'check_network_roles',
356                             network_role_id=role
357                         )
358                         if not results or len(results) < 1:
359                             LOG.error(
360                                 _LE("Empty response from inventory "
361                                     "provider {} for network role {}").format(
362                                     self.ip_ext_manager.names()[0], role))
363                             continue
364                         region_ids = results[0]
365                         if not region_ids:
366                             LOG.error(
367                                 _LE("No candidates from inventory provider {} "
368                                     "for network role {}").format(
369                                     self.ip_ext_manager.names()[0], role))
370                             continue
371                         LOG.debug(
372                             "Network role candidates: {}, role: {},"
373                             "inventory provider: {}".format(
374                                 region_ids, role,
375                                 self.ip_ext_manager.names()[0]))
376                         role_candidates[role] = region_ids
377
378                     # find candidates that meet conditions
379                     for candidate in candidate_list:
380                         # perform this check only for cloud candidates
381                         if candidate["inventory_type"] != "cloud":
382                             continue
383                         c_any = False
384                         c_all = True
385                         for role in role_list:
386                             if role not in role_candidates:
387                                 c_all = False
388                                 continue
389                             rc = role_candidates.get(role)
390                             if rc and candidate.get("candidate_id") not in rc:
391                                 c_all = False
392                                 # discard even if one role is not met
393                             elif rc and candidate.get("candidate_id") in rc:
394                                 c_any = True
395                                 # include if any one role is met
396                         if role_condition == 'any' and not c_any:
397                             discard_set.add(candidate.get("candidate_id"))
398                         elif role_condition == 'all' and not c_all:
399                             discard_set.add(candidate.get("candidate_id"))
400
401             elif attrib == 'replication_role':
402
403                 for candidate in candidate_list:
404
405                     host_id = candidate.get("host_id")
406                     if host_id:
407                         results = self.ip_ext_manager.map_method(
408                             'check_candidate_role',
409                             host_id = host_id
410                         )
411
412                         if not results or len(results) < 1:
413                             LOG.error(
414                                 _LE("Empty response for replication roles {}").format(role))
415                             discard_set.add(candidate.get("candidate_id"))
416                             continue
417
418                         # compare results from A&AI with the value in attribute constraint
419                         if value and results[0] != value.upper():
420                             discard_set.add(candidate.get("candidate_id"))
421
422             elif attrib == 'complex':
423                 v_discard_set = \
424                     self.get_candidate_discard_set(
425                         value=value,
426                         candidate_list=candidate_list,
427                         value_attrib="complex_name")
428                 discard_set.update(v_discard_set)
429             elif attrib == "country":
430                 v_discard_set = \
431                     self.get_candidate_discard_set(
432                         value=value,
433                         candidate_list=candidate_list,
434                         value_attrib="country")
435                 discard_set.update(v_discard_set)
436             elif attrib == "state":
437                 v_discard_set = \
438                     self.get_candidate_discard_set(
439                         value=value,
440                         candidate_list=candidate_list,
441                         value_attrib="state")
442                 discard_set.update(v_discard_set)
443             elif attrib == "region":
444                 v_discard_set = \
445                     self.get_candidate_discard_set(
446                         value=value,
447                         candidate_list=candidate_list,
448                         value_attrib="region")
449                 discard_set.update(v_discard_set)
450             elif attrib == "cloud-region":
451                 v_discard_set = \
452                     self.get_candidate_discard_set_by_cloud_region(
453                         value=value,
454                         candidate_list=candidate_list,
455                         value_attrib="location_id")
456                 discard_set.update(v_discard_set)
457
458         # return candidates not in discard set
459         candidate_list[:] = [c for c in candidate_list
460                              if c['candidate_id'] not in discard_set]
461         LOG.info(
462             "Available candidates after attribute checks: {}, "
463             "inventory provider: {}".format(
464                 candidate_list, self.ip_ext_manager.names()[0]))
465         return {'response': candidate_list, 'error': False}
466
467     def get_candidates_with_vim_capacity(self, ctx, arg):
468         '''
469         RPC for getting candidates with vim capacity
470         :param ctx: context
471         :param arg: contains input passed from client side for RPC call
472         :return: response candidate_list with with required vim capacity
473         '''
474         error = False
475         candidate_list = arg["candidate_list"]
476         vim_request = arg["request"]
477         vim_list = set()
478         discard_set = set()
479         for candidate in candidate_list:
480             if candidate["inventory_type"] == "cloud":
481                 vim_list.add(candidate['vim-id'])
482
483         vim_request['VIMs'] = list(vim_list)
484         vims_result = self.vc_ext_manager.map_method(
485             'check_vim_capacity',
486             vim_request
487         )
488
489         if vims_result and len(vims_result) > 0 and vims_result[0] is not None:
490             vims_set = set(vims_result[0])
491             for candidate in candidate_list:
492                 # perform this check only for cloud candidates
493                 if candidate["inventory_type"] == "cloud":
494                     if candidate['vim-id'] not in vims_set:
495                         discard_set.add(candidate.get("candidate_id"))
496
497             # return candidates not in discard set
498             candidate_list[:] = [c for c in candidate_list
499                                  if c['candidate_id'] not in discard_set]
500         else:
501             error = True
502             LOG.warn(_LI(
503                 "Multicloud did not respond properly to request: {}".format(
504                     vim_request)))
505
506         LOG.info(_LI(
507             "Candidates with with vim capacity: {}, vim controller: "
508             "{}").format(candidate_list, self.vc_ext_manager.names()[0]))
509
510         return {'response': candidate_list, 'error': error}
511
512     def resolve_demands(self, ctx, arg):
513
514         log_util.setLoggerFilter(LOG, ctx.get('keyspace'), ctx.get('plan_id'))
515
516         error = False
517         demands = arg.get('demands')
518         plan_info = arg.get('plan_info')
519         triage_translator_data = arg.get('triage_translator_data')
520         resolved_demands = None
521         results = self.ip_ext_manager.map_method(
522             'resolve_demands',
523             demands, plan_info, triage_translator_data
524         )
525         if results and len(results) > 0:
526             if len(results) > 1:
527                 resolved_demands = self.get_resolved_demands_from_result(results)
528             else:
529                 resolved_demands = results[0]
530             if self.triage_data_trans['plan_id']== None :
531                 self.triage_data_trans['plan_name'] = triage_translator_data['plan_name']
532                 self.triage_data_trans['plan_id'] = triage_translator_data['plan_id']
533                 self.triage_data_trans['translator_triage'].append(triage_translator_data['dropped_candidates'])
534             elif not self.triage_data_trans['plan_id'] == triage_translator_data['plan_id'] :
535                 self.triage_data_trans = {'plan_id': triage_translator_data['plan_id'],
536                                           'plan_name': triage_translator_data['plan_name'], 'translator_triage': []}
537                 self.triage_data_trans['translator_triage'].append(triage_translator_data['dropped_candidates'])
538             else:
539                 self.triage_data_trans['translator_triage'].append(triage_translator_data['dropped_candidates'])
540         else:
541             error = True
542
543         return {'response': {'resolved_demands': resolved_demands,
544                              'trans': self.triage_data_trans},
545                 'error': error}
546
547     def get_resolved_demands_from_result(self, results):
548         resolved_demands = {de: [] for de in results[0].keys()}
549         for result in results:
550             for demand, candidates in result.items():
551                 resolved_demands[demand].extend(candidates)
552         LOG.info('resolved_demands: {}'.format(str(resolved_demands)))
553         return resolved_demands
554
555     def resolve_location(self, ctx, arg):
556
557         log_util.setLoggerFilter(LOG, ctx.get('keyspace'), ctx.get('plan_id'))
558
559         error = False
560         resolved_location = None
561
562         host_name = arg.get('host_name')
563         clli_code = arg.get('clli_code')
564
565         if host_name:
566             results = self.ip_ext_manager.map_method(
567                 'resolve_host_location',
568                 host_name
569             )
570
571         elif clli_code:
572             results = self.ip_ext_manager.map_method(
573                 'resolve_clli_location',
574                 clli_code
575             )
576         else:
577             results = None
578             # unknown location response
579             LOG.error(_LE("Unknown location type from the input template."
580                           "Expected location types are host_name"
581                           " or clli_code."))
582
583         if results and len(results) > 0:
584             resolved_location = results[0]
585         else:
586             error = True
587         return {'response': {'resolved_location': resolved_location},
588                 'error': error}
589
590     def call_reservation_operation(self, ctx, arg):
591         result = True
592         reserved_candidates = None
593         method = arg["method"]
594         candidate_list = arg["candidate_list"]
595         reservation_name = arg["reservation_name"]
596         reservation_type = arg["reservation_type"]
597         controller = arg["controller"]
598         request = arg["request"]
599
600         if controller == "SDN-C":
601             results = self.sc_ext_manager.map_method(
602                 'call_reservation_operation',
603                 method=method,
604                 candidate_list=candidate_list,
605                 reservation_name=reservation_name,
606                 reservation_type=reservation_type,
607                 request=request
608             )
609             if results and len(results) > 0:
610                 reserved_candidates = results[0]
611         else:
612             LOG.error(_LE("Unknown service controller: {}").format(controller))
613         if reserved_candidates is None or not reserved_candidates:
614             result = False
615             LOG.debug(
616                 _LW("Unable to {} for "
617                     "candidate {}.").format(method, reserved_candidates))
618             return {'response': result,
619                     'error': not result}
620         else:
621             LOG.debug("{} for the candidate: "
622                       "{}".format(method, reserved_candidates))
623             return {'response': result,
624                     'error': not result}