Merge "Add INFO.yaml file"
[optf/has.git] / conductor / conductor / data / service.py
1 #
2 # -------------------------------------------------------------------------
3 #   Copyright (c) 2015-2017 AT&T Intellectual Property
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #       http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 #
17 # -------------------------------------------------------------------------
18 #
19
20 # import json
21 # import os
22
23 import cotyledon
24 from oslo_config import cfg
25 from oslo_log import log
26 # from stevedore import driver
27
28 # from conductor import __file__ as conductor_root
29 from conductor.common.music import messaging as music_messaging
30 from conductor.data.plugins.inventory_provider import extensions as ip_ext
31 from conductor.data.plugins.service_controller import extensions as sc_ext
32 from conductor.i18n import _LE, _LI, _LW
33 from conductor import messaging
34 from conductor.common.utils import conductor_logging_util as log_util
35 # from conductor.solver.resource import region
36 # from conductor.solver.resource import service
37
38 LOG = log.getLogger(__name__)
39
40 CONF = cfg.CONF
41
42 DATA_OPTS = [
43     cfg.IntOpt('workers',
44                default=1,
45                min=1,
46                help='Number of workers for data service. '
47                     'Default value is 1.'),
48     cfg.BoolOpt('concurrent',
49                 default=False,
50                 help='Set to True when data will run in active-active '
51                      'mode. When set to False, data will flush any abandoned '
52                      'messages at startup.'),
53     cfg.FloatOpt('existing_placement_cost',
54                default=-8000.0,
55                help='Default value is -8000, which is the diameter of the earth. '
56                     'The distance cannot larger than this value'),
57     cfg.FloatOpt('cloud_candidate_cost',
58                default=2.0),
59     cfg.FloatOpt('service_candidate_cost',
60                default=1.0),
61 ]
62
63 CONF.register_opts(DATA_OPTS, group='data')
64
65
66 class DataServiceLauncher(object):
67     """Listener for the data service."""
68
69     def __init__(self, conf):
70         """Initializer."""
71
72         self.conf = conf
73         self.init_extension_managers(conf)
74
75     def init_extension_managers(self, conf):
76         """Initialize extension managers."""
77         self.ip_ext_manager = (
78             ip_ext.Manager(conf, 'conductor.inventory_provider.plugin'))
79         self.ip_ext_manager.initialize()
80         self.sc_ext_manager = (
81             sc_ext.Manager(conf, 'conductor.service_controller.plugin'))
82         self.sc_ext_manager.initialize()
83
84     def run(self):
85         transport = messaging.get_transport(self.conf)
86         if transport:
87             topic = "data"
88             target = music_messaging.Target(topic=topic)
89             endpoints = [DataEndpoint(self.ip_ext_manager,
90                                       self.sc_ext_manager), ]
91             flush = not self.conf.data.concurrent
92             kwargs = {'transport': transport,
93                       'target': target,
94                       'endpoints': endpoints,
95                       'flush': flush, }
96             svcmgr = cotyledon.ServiceManager()
97             svcmgr.add(music_messaging.RPCService,
98                        workers=self.conf.data.workers,
99                        args=(self.conf,), kwargs=kwargs)
100             svcmgr.run()
101
102
103 class DataEndpoint(object):
104     def __init__(self, ip_ext_manager, sc_ext_manager):
105
106         self.ip_ext_manager = ip_ext_manager
107         self.sc_ext_manager = sc_ext_manager
108         self.plugin_cache = {}
109
110     def get_candidate_location(self, ctx, arg):
111         # candidates should have lat long info already
112         error = False
113         location = None
114         candidate = arg["candidate"]
115         lat = candidate.get('latitude', None)
116         lon = candidate.get('longitude', None)
117         if lat and lon:
118             location = (float(lat), float(lon))
119         else:
120             error = True
121         return {'response': location, 'error': error}
122
123     def get_candidate_zone(self, ctx, arg):
124         candidate = arg["candidate"]
125         category = arg["category"]
126         zone = None
127         error = False
128
129         if category == 'region':
130             zone = candidate['location_id']
131         elif category == 'complex':
132             zone = candidate['complex_name']
133         elif category == 'country':
134             zone = candidate['country']
135         else:
136             error = True
137
138         if error:
139             LOG.error(_LE("Unresolvable zone category {}").format(category))
140         else:
141             LOG.info(_LI("Candidate zone is {}").format(zone))
142         return {'response': zone, 'error': error}
143
144     def get_candidates_from_service(self, ctx, arg):
145
146         candidate_list = arg["candidate_list"]
147         constraint_name = arg["constraint_name"]
148         constraint_type = arg["constraint_type"]
149         controller = arg["controller"]
150         request = arg["request"]
151         request_type = arg["request_type"]
152
153         error = False
154         filtered_candidates = []
155         # call service and fetch candidates
156         # TODO(jdandrea): Get rid of the SDN-C reference (outside of plugin!)
157         if controller == "SDN-C":
158             service_model = request.get("service_model")
159
160             results = self.sc_ext_manager.map_method(
161                 'filter_candidates',
162                 request=request,
163                 candidate_list=candidate_list,
164                 constraint_name=constraint_name,
165                 constraint_type=constraint_type,
166                 request_type=request_type
167             )
168
169             if results and len(results) > 0:
170                 filtered_candidates = results[0]
171             else:
172                 LOG.warn(
173                     _LW("No candidates returned by service "
174                         "controller: {}; may be a new service "
175                         "instantiation.").format(controller))
176         else:
177             LOG.error(_LE("Unknown service controller: {}").format(controller))
178         # if response from service controller is empty
179         if filtered_candidates is None:
180             if service_model == "ADIOD":
181                 LOG.error("No capacity found from SDN-GC for candidates: "
182                           "{}".format(candidate_list))
183             return {'response': [], 'error': error}
184         else:
185             LOG.debug("Filtered candidates: {}".format(filtered_candidates))
186             candidate_list = [c for c in candidate_list
187                               if c in filtered_candidates]
188             return {'response': candidate_list, 'error': error}
189
190     def get_candidate_discard_set(self, value, candidate_list, value_attrib):
191         discard_set = set()
192         value_dict = value
193         value_condition = ''
194         value_list = None
195         if value_dict:
196             if "all" in value_dict:
197                 value_list = value_dict.get("all")
198                 value_condition = "all"
199             elif "any" in value_dict:
200                 value_list = value_dict.get("any")
201                 value_condition = "any"
202
203             if not value_list:
204                 return discard_set
205
206             for candidate in candidate_list:
207                 c_any = False
208                 c_all = True
209                 for value in value_list:
210                     if candidate.get(value_attrib) == value:
211                         c_any = True  # include if any one is met
212                     elif candidate.get(value_attrib) != value:
213                         c_all = False  # discard even if one is not met
214                 if value_condition == 'any' and not c_any:
215                     discard_set.add(candidate.get("candidate_id"))
216                 elif value_condition == 'all' and not c_all:
217                     discard_set.add(candidate.get("candidate_id"))
218         return discard_set
219
220     #(TODO:Larry) merge this function with the "get_candidate_discard_set"
221     def get_candidate_discard_set_by_cloud_region(self, value, candidate_list, value_attrib):
222         discard_set = set()
223
224         cloud_requests = value.get("cloud-requests")
225         service_requests = value.get("service-requests")
226
227         for candidate in candidate_list:
228             if candidate.get("inventory_type") == "cloud" and \
229                     (candidate.get(value_attrib) not in cloud_requests):
230                 discard_set.add(candidate.get("candidate_id"))
231
232             elif candidate.get("inventory_type") == "service" and \
233                     (candidate.get(value_attrib) not in service_requests):
234                 discard_set.add(candidate.get("candidate_id"))
235
236
237         return discard_set
238
239
240     def get_inventory_group_candidates(self, ctx, arg):
241         candidate_list = arg["candidate_list"]
242         resolved_candidate = arg["resolved_candidate"]
243         candidate_names = []
244         error = False
245         service_description = 'DHV_VVIG_PAIR'
246         results = self.ip_ext_manager.map_method(
247             'get_inventory_group_pairs',
248             service_description=service_description
249         )
250         if not results or len(results) < 1:
251             LOG.error(
252                 _LE("Empty inventory group response for service: {}").format(
253                     service_description))
254             error = True
255         else:
256             pairs = results[0]
257             if not pairs or len(pairs) < 1:
258                 LOG.error(
259                     _LE("No inventory group candidates found for service: {}, "
260                         "inventory provider: {}").format(
261                         service_description, self.ip_ext_manager.names()[0]))
262                 error = True
263             else:
264                 LOG.debug(
265                     "Inventory group pairs: {}, service: {}, "
266                     "inventory provider: {}".format(
267                         pairs, service_description,
268                         self.ip_ext_manager.names()[0]))
269                 for pair in pairs:
270                     if resolved_candidate.get("candidate_id") == pair[0]:
271                         candidate_names.append(pair[1])
272                     elif resolved_candidate.get("candidate_id") == pair[1]:
273                         candidate_names.append(pair[0])
274
275         candidate_list = [c for c in candidate_list
276                           if c["candidate_id"] in candidate_names]
277         LOG.info(
278             _LI("Inventory group candidates: {}, service: {}, "
279                 "inventory provider: {}").format(
280                 candidate_list, service_description,
281                 self.ip_ext_manager.names()[0]))
282         return {'response': candidate_list, 'error': error}
283
284     def get_candidates_by_attributes(self, ctx, arg):
285         candidate_list = arg["candidate_list"]
286         # demand_name = arg["demand_name"]
287         properties = arg["properties"]
288         discard_set = set()
289
290         attributes_to_evaluate = properties.get('evaluate')
291         for attrib, value in attributes_to_evaluate.items():
292             if value == '':
293                 continue
294             if attrib == 'network_roles':
295                 role_candidates = dict()
296                 role_list = []
297                 nrc_dict = value
298                 role_condition = ''
299                 if nrc_dict:
300                     if "all" in nrc_dict:
301                         role_list = nrc_dict.get("all")
302                         role_condition = "all"
303                     elif "any" in nrc_dict:
304                         role_list = nrc_dict.get("any")
305                         role_condition = "any"
306
307                     # if the role_list is empty do nothing
308                     if not role_list or role_list == '':
309                         LOG.error(
310                             _LE("No roles available, "
311                                 "inventory provider: {}").format(
312                                 self.ip_ext_manager.names()[0]))
313                         continue
314                     for role in role_list:
315                         # query inventory provider to check if
316                         # the candidate is in role
317                         results = self.ip_ext_manager.map_method(
318                             'check_network_roles',
319                             network_role_id=role
320                         )
321                         if not results or len(results) < 1:
322                             LOG.error(
323                                 _LE("Empty response from inventory "
324                                     "provider {} for network role {}").format(
325                                     self.ip_ext_manager.names()[0], role))
326                             continue
327                         region_ids = results[0]
328                         if not region_ids:
329                             LOG.error(
330                                 _LE("No candidates from inventory provider {} "
331                                     "for network role {}").format(
332                                     self.ip_ext_manager.names()[0], role))
333                             continue
334                         LOG.debug(
335                             "Network role candidates: {}, role: {},"
336                             "inventory provider: {}".format(
337                                 region_ids, role,
338                                 self.ip_ext_manager.names()[0]))
339                         role_candidates[role] = region_ids
340
341                     # find candidates that meet conditions
342                     for candidate in candidate_list:
343                         # perform this check only for cloud candidates
344                         if candidate["inventory_type"] != "cloud":
345                             continue
346                         c_any = False
347                         c_all = True
348                         for role in role_list:
349                             if role not in role_candidates:
350                                 c_all = False
351                                 continue
352                             rc = role_candidates.get(role)
353                             if rc and candidate.get("candidate_id") not in rc:
354                                 c_all = False
355                                 # discard even if one role is not met
356                             elif rc and candidate.get("candidate_id") in rc:
357                                 c_any = True
358                                 # include if any one role is met
359                         if role_condition == 'any' and not c_any:
360                             discard_set.add(candidate.get("candidate_id"))
361                         elif role_condition == 'all' and not c_all:
362                             discard_set.add(candidate.get("candidate_id"))
363
364             elif attrib == 'replication_role':
365
366                 for candidate in candidate_list:
367
368                     host_id = candidate.get("host_id")
369                     if host_id:
370                         results = self.ip_ext_manager.map_method(
371                             'check_candidate_role',
372                             host_id = host_id
373                         )
374
375                         if not results or len(results) < 1:
376                             LOG.error(
377                                 _LE("Empty response for replication roles {}").format(role))
378                             discard_set.add(candidate.get("candidate_id"))
379                             continue
380
381                         # compare results from A&AI with the value in attribute constraint
382                         if value and results[0] != value.upper():
383                             discard_set.add(candidate.get("candidate_id"))
384
385             elif attrib == 'complex':
386                 v_discard_set = \
387                     self.get_candidate_discard_set(
388                         value=value,
389                         candidate_list=candidate_list,
390                         value_attrib="complex_name")
391                 discard_set.update(v_discard_set)
392             elif attrib == "country":
393                 v_discard_set = \
394                     self.get_candidate_discard_set(
395                         value=value,
396                         candidate_list=candidate_list,
397                         value_attrib="country")
398                 discard_set.update(v_discard_set)
399             elif attrib == "state":
400                 v_discard_set = \
401                     self.get_candidate_discard_set(
402                         value=value,
403                         candidate_list=candidate_list,
404                         value_attrib="state")
405                 discard_set.update(v_discard_set)
406             elif attrib == "region":
407                 v_discard_set = \
408                     self.get_candidate_discard_set(
409                         value=value,
410                         candidate_list=candidate_list,
411                         value_attrib="region")
412                 discard_set.update(v_discard_set)
413             elif attrib == "cloud-region":
414                 v_discard_set = \
415                     self.get_candidate_discard_set_by_cloud_region(
416                         value=value,
417                         candidate_list=candidate_list,
418                         value_attrib="location_id")
419                 discard_set.update(v_discard_set)
420
421         # return candidates not in discard set
422         candidate_list[:] = [c for c in candidate_list
423                              if c['candidate_id'] not in discard_set]
424         LOG.info(
425             "Available candidates after attribute checks: {}, "
426             "inventory provider: {}".format(
427                 candidate_list, self.ip_ext_manager.names()[0]))
428         return {'response': candidate_list, 'error': False}
429
430     def resolve_demands(self, ctx, arg):
431
432         log_util.setLoggerFilter(LOG, ctx.get('keyspace'), ctx.get('plan_id'))
433
434         error = False
435         demands = arg.get('demands')
436         resolved_demands = None
437         results = self.ip_ext_manager.map_method(
438             'resolve_demands',
439             demands
440         )
441         if results and len(results) > 0:
442             resolved_demands = results[0]
443         else:
444             error = True
445
446         return {'response': {'resolved_demands': resolved_demands},
447                 'error': error}
448
449     def resolve_location(self, ctx, arg):
450
451         log_util.setLoggerFilter(LOG, ctx.get('keyspace'), ctx.get('plan_id'))
452
453         error = False
454         resolved_location = None
455
456         host_name = arg.get('host_name')
457         clli_code = arg.get('clli_code')
458
459         if host_name:
460             results = self.ip_ext_manager.map_method(
461                 'resolve_host_location',
462                 host_name
463             )
464
465         elif clli_code:
466             results = self.ip_ext_manager.map_method(
467                 'resolve_clli_location',
468                 clli_code
469             )
470         else:
471             # unknown location response
472             LOG.error(_LE("Unknown location type from the input template."
473                           "Expected location types are host_name"
474                           " or clli_code."))
475
476         if results and len(results) > 0:
477             resolved_location = results[0]
478         else:
479             error = True
480         return {'response': {'resolved_location': resolved_location},
481                 'error': error}
482
483     def call_reservation_operation(self, ctx, arg):
484         result = True
485         reserved_candidates = None
486         method = arg["method"]
487         candidate_list = arg["candidate_list"]
488         reservation_name = arg["reservation_name"]
489         reservation_type = arg["reservation_type"]
490         controller = arg["controller"]
491         request = arg["request"]
492
493         if controller == "SDN-C":
494             results = self.sc_ext_manager.map_method(
495                 'call_reservation_operation',
496                 method=method,
497                 candidate_list=candidate_list,
498                 reservation_name=reservation_name,
499                 reservation_type=reservation_type,
500                 request=request
501             )
502             if results and len(results) > 0:
503                 reserved_candidates = results[0]
504         else:
505             LOG.error(_LE("Unknown service controller: {}").format(controller))
506         if reserved_candidates is None or not reserved_candidates:
507             result = False
508             LOG.debug(
509                 _LW("Unable to {} for "
510                     "candidate {}.").format(method, reserved_candidates))
511             return {'response': result,
512                     'error': not result}
513         else:
514             LOG.debug("{} for the candidate: "
515                       "{}".format(method, reserved_candidates))
516             return {'response': result,
517                     'error': not result}
518
519     # def do_something(self, ctx, arg):
520     #     """RPC endpoint for data messages
521     #
522     #     When another service sends a notification over the message
523     #     bus, this method receives it.
524     #     """
525     #     LOG.debug("Got a message!")
526     #
527     #     res = {
528     #         'note': 'do_something called!',
529     #         'arg': str(arg),
530     #     }
531     #     return {'response': res, 'error': False}