2b3c597dbf4a246ff1b582c1e70f20300526ac76
[dcaegen2/platform/cli.git] / dcae-cli / dcae_cli / util / discovery.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #      http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END=========================================================
18 #
19 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
20
21 # -*- coding: utf-8 -*-
22 """
23 Provides Consul helper functions
24 """
25 import re
26 import json
27 import contextlib
28 from collections import defaultdict
29 from itertools import chain
30 from functools import partial
31 from uuid import uuid4
32
33 import six
34 from copy import deepcopy
35 from consul import Consul
36
37 from dcae_cli.util.logger import get_logger
38 from dcae_cli.util.exc import DcaeException
39 from dcae_cli.util.profiles import get_profile
40 from dcae_cli.util.config import get_docker_logins_key
41
42
43 logger = get_logger('Discovery')
44
45 active_profile = get_profile()
46 consul_host = active_profile.consul_host
47 # NOTE: Removed the suffix completely. The useful piece of the suffix was the
48 # location but it was implemented in a static fashion (hardcoded). Rather than
49 # enhancing the existing approach and making the suffix dynamic (to support
50 # "rework-central" and "solutioning"), the thinking is to revisit this name stuff
51 # and use Consul's query interface so that location is a tag attribute.
52 _inst_re = re.compile(r"^(?P<user>[^.]*).(?P<hash>[^.]*).(?P<ver>\d+-\d+-\d+).(?P<comp>.*)$")
53
54
55 class DiscoveryError(DcaeException):
56     pass
57
58 class DiscoveryNoDownstreamComponentError(DiscoveryError):
59     pass
60
61
62 def replace_dots(comp_name, reverse=False):
63     '''Converts dots to dashes to prevent downstream users of Consul from exploding'''
64     if not reverse:
65         return comp_name.replace('.', '-')
66     else:
67         return comp_name.replace('-', '.')
68
69 # Utility functions for using Consul
70
71 def _is_healthy_pure(get_health_func, instance):
72     """Checks to see if a component instance is running healthy
73
74     Pure function edition
75
76     Args
77     ----
78     get_health_func: func(string) -> complex object
79         Look at unittests in test_discovery to see examples
80     instance: (string) fully qualified name of component instance
81
82     Returns
83     -------
84     True if instance has been found and is healthy else False
85     """
86     index, resp = get_health_func(instance)
87
88     if resp:
89         def is_passing(instance):
90             return all([check["Status"] == "passing" for check in instance["Checks"]])
91         return any([is_passing(instance) for instance in resp])
92     else:
93         return False
94
95 def is_healthy(consul_host, instance):
96     """Checks to see if a component instance is running healthy
97
98     Impure function edition
99
100     Args
101     ----
102     consul_host: (string) host string of Consul
103     instance: (string) fully qualified name of component instance
104
105     Returns
106     -------
107     True if instance has been found and is healthy else False
108     """
109     cons = Consul(consul_host)
110     return _is_healthy_pure(cons.health.service, instance)
111
112 def _get_instances_from_kv(get_from_kv_func, user):
113     """Get component instances from kv store
114
115     Deployed component instances get entries in a kv store to store configuration
116     information. This is a way to source a list of component instances that were
117     attempted to run. A component could have deployed but failed to register itself.
118     The only trace of that deployment would be checking the kv store.
119
120     Args
121     ----
122     get_from_kv_func: func(string, boolean) -> (don't care, list of dicts)
123         Look at unittests in test_discovery to see examples
124     user: (string) user id
125
126     Returns
127     -------
128     List of unique component instance names
129     """
130     # Keys from KV contain rels key entries and non-rels key entries. Keep the
131     # rels key entries but remove the ":rel" suffix because we are paranoid that
132     # this could exist without the other
133     _, instances_kv = get_from_kv_func(user, recurse=True)
134     return [] if instances_kv is None \
135             else list(set([ dd["Key"].replace(":rel", "") for dd in instances_kv ]))
136
137 def _get_instances_from_catalog(get_from_catalog_func, user):
138     """Get component instances from catalog
139
140     Fetching instances from the catalog covers the deployment cases where
141     components registered successfully regardless of their health check status.
142
143     Args
144     ----
145     get_from_catalog_func: func() -> (don't care, dict)
146         Look at unittests in test_discovery to see examples
147     user: (string) user id
148
149     Returns
150     -------
151     List of unique component instance names
152     """
153     # Get all services and filter here by user
154     response = get_from_catalog_func()
155     return list(set([ instance for instance in response[1].keys() if user in instance ]))
156
157 def _merge_instances(user, *get_funcs):
158     """Merge the result of an arbitrary list of get instance function calls
159
160     Args
161     ----
162     user: (string) user id
163     get_funcs: func(string) -> list of strings
164         Functions that take in a user parameter to output a list of instance
165         names
166
167     Returns
168     -------
169     List of unique component instance names
170     """
171     return list(set(chain.from_iterable([ get_func(user) for get_func in get_funcs ])))
172
173 def _get_instances(consul_host, user):
174     """Get all deployed component instances for a given user
175
176     Sourced from multiple places to ensure we get a complete list of all
177     component instances no matter what state they are in.
178
179     Args
180     ----
181     consul_host: (string) host string of Consul
182     user: (string) user id
183
184     Returns
185     -------
186     List of unique component instance names
187     """
188     cons = Consul(consul_host)
189
190     get_instances_from_kv = partial(_get_instances_from_kv, cons.kv.get)
191     get_instances_from_catalog = partial(_get_instances_from_catalog, cons.catalog.services)
192
193     return _merge_instances(user, get_instances_from_kv, get_instances_from_catalog)
194
195
196 # Custom (sometimes higher order) "discovery" functionality
197
198 def _make_instances_map(instances):
199     """Make an instance map
200
201     Instance map is a dict where the keys are tuples (component type, component version)
202     that map to a set of strings that are instance names.
203     """
204     mapping = defaultdict(set)
205     for instance in instances:
206         match = _inst_re.match(instance)
207         if match is None:
208             continue
209
210         _, _, ver, comp = match.groups()
211         cname = replace_dots(comp, reverse=True)
212         version = replace_dots(ver, reverse=True)
213         key = (cname, version)
214         mapping[key].add(instance)
215     return mapping
216
217
218 def get_user_instances(user, consul_host=consul_host, filter_instances_func=is_healthy):
219     '''Get a user's instance map
220
221     Args:
222     -----
223     filter_instances_func: fn(consul_host, instance) -> boolean
224         Function used to filter instances. Default is is_healthy
225
226     Returns:
227     --------
228     Dict whose keys are component (name,version) tuples and values are list of component instance names
229     '''
230     filter_func = partial(filter_instances_func, consul_host)
231     instances = list(filter(filter_func, _get_instances(consul_host, user)))
232
233     return _make_instances_map(instances)
234
235
236 def _get_component_instances(filter_instances_func, user, cname, cver, consul_host):
237     """Get component instances that are filtered
238
239     Args:
240     -----
241     filter_instances_func: fn(consul_host, instance) -> boolean
242         Function used to filter instances
243
244     Returns
245     -------
246     List of strings where the strings are fully qualified instance names
247     """
248     instance_map = get_user_instances(user, consul_host=consul_host,
249             filter_instances_func=filter_instances_func)
250
251     # REVIEW: We don't restrict component names from using dashes. We do
252     # transform names with dots to use dashes for domain segmenting reasons.
253     # Instance map creation always reverses that making dashes to dots even though
254     # the component name may have dashes. Thus always search for instances by
255     # a dotted component name. We are open to a collision but that is low chance
256     # - someone has to use the same name in dotted and dashed form which is weird.
257     cname_dashless = replace_dots(cname, reverse=True)
258
259     # WATCH: instances_map.get returns set. Force to be list to have consistent
260     # return
261     return list(instance_map.get((cname_dashless, cver), []))
262
263 def get_healthy_instances(user, cname, cver, consul_host=consul_host):
264     """Lists healthy instances of a particular component for a given user
265
266     Returns
267     -------
268     List of strings where the strings are fully qualified instance names
269     """
270     return _get_component_instances(is_healthy, user, cname, cver, consul_host)
271
272 def get_defective_instances(user, cname, cver, consul_host=consul_host):
273     """Lists *not* running instances of a particular component for a given user
274
275     This means that there are component instances that are sitting out there
276     deployed but not successfully running.
277
278     Returns
279     -------
280     List of strings where the strings are fully qualified instance names
281     """
282     def is_not_healthy(consul_host, component):
283         return not is_healthy(consul_host, component)
284
285     return _get_component_instances(is_not_healthy, user, cname, cver, consul_host)
286
287
288 def lookup_instance(consul_host, name):
289     """Query Consul for service details"""
290     cons = Consul(consul_host)
291     index, results = cons.catalog.service(name)
292     return results
293
294 def parse_instance_lookup(results):
295     """Parse the resultset from lookup_instance
296
297     Returns:
298     --------
299     String in host form <address>:<port>
300     """
301     if results:
302         # Just grab first
303         result = results[0]
304         return "{address}:{port}".format(address=result["ServiceAddress"],
305                 port=result["ServicePort"])
306     else:
307         return
308
309
310 def _create_rels_key(config_key):
311     """Create rels key from config key
312
313     Assumes config_key is well-formed"""
314     return "{:}:rel".format(config_key)
315
316
317 def _create_dmaap_key(config_key):
318     """Create dmaap key from config key
319
320     Assumes config_key is well-formed"""
321     return "{:}:dmaap".format(config_key)
322
323
324 def clear_user_instances(user, host=consul_host):
325     '''Removes all Consul key:value entries for a given user'''
326     cons = Consul(host)
327     cons.kv.delete(user, recurse=True)
328
329
330 _multiple_compat_msg = '''Component '{cname}' config_key '{ckey}' has multiple compatible downstream \
331 components: {compat}. The current infrastructure can only support interacing with a single component. \
332 Only downstream component '{chosen}' will be connected.'''
333
334 _no_compat_msg = "Component '{cname}' config_key '{ckey}' has no compatible downstream components."
335
336 _no_inst_msg = '''Component '{cname}' config_key '{ckey}' is compatible with downstream component '{chosen}' \
337 however there are no instances available for connecting.'''
338
339
340 def _cfmt(*args):
341     '''Returns a string formatted representation for a component and version'''
342     if len(args) == 1:
343         return ':'.join(args[0])
344     elif len(args) == 2:
345         return ':'.join(args)
346     else:
347         raise DiscoveryError('Input should be name, version or (name, version)')
348
349
350 def _get_downstream(cname, cver, config_key, compat_comps, instance_map,
351         force=False):
352     '''
353     Returns a component type and its instances to use for a given config key
354
355     Parameters
356     ----------
357     cname : string
358         Name of the upstream component
359     cver : string
360         Version of the upstream component
361     config_key : string
362         Mainly used for populating warnings meaningfully
363     compat_comps : dict
364         A list of component (name, version) tuples
365     instance_map : dict
366         A dict whose keys are component (name, version) tuples and values are a list of instance names
367     '''
368     if not compat_comps:
369         conn_comp = ('', '')
370         logger.warning(_no_compat_msg.format(cname=_cfmt(cname, cver), ckey=config_key))
371     else:
372         conn_comp = six.next(iter(compat_comps))
373         if len(compat_comps) > 1:
374             logger.warning(_multiple_compat_msg.format(cname=_cfmt(cname, cver), ckey=config_key,
375                                                        compat=list(map(_cfmt, compat_comps)), chosen=_cfmt(conn_comp)))
376     if all(conn_comp):
377         instances = instance_map.get(conn_comp, tuple())
378         if not instances:
379             if force:
380                 logger.warning(_no_inst_msg.format(cname=_cfmt(cname, cver), \
381                         ckey=config_key, chosen=_cfmt(conn_comp)))
382             else:
383                 logger.error(_no_inst_msg.format(cname=_cfmt(cname, cver), \
384                         ckey=config_key, chosen=_cfmt(conn_comp)))
385                 raise DiscoveryNoDownstreamComponentError("No compatible downstream component found.")
386     else:
387         instances = tuple()
388
389     return conn_comp, instances
390
391
392 def create_config(user, cname, cver, params, interface_map, instance_map, dmaap_map,
393         instance_prefix=None, force=False):
394     '''
395     Creates a config and corresponding rels entries in Consul. Returns the Consul the keys and entries.
396
397     Parameters
398     ----------
399     user : string
400         The user namespace to create the config and rels under. E.g. user.foo.bar...
401     cname : string
402         Name of the upstream component
403     cver : string
404         Version of the upstream component
405     params : dict
406         Parameters of the component, taken directly from the component specification
407     interface_map : dict
408         A dict mapping the config_key of published streams and/or called services to a list of compatible
409         component types and versions
410     instance_map : dict
411         A dict mapping component types and versions to a list of instances currently running
412     dmaap_map : dict
413         A dict that contains config key to dmaap information. This map is checked
414         first before checking the instance_map which means before checking for
415         direct http components.
416     instance_prefix : string, optional
417         The unique prefix to associate with the component instance whose config is being created
418     force: string, optional
419         Config will continue to be created even if there are no downstream compatible
420         component when this flag is set to True. Default is False.
421     '''
422     inst_pref = str(uuid4()) if instance_prefix is None else instance_prefix
423     conf_key = "{:}.{:}.{:}.{:}".format(user, inst_pref, replace_dots(cver), replace_dots(cname))
424     rels_key = _create_rels_key(conf_key)
425     dmaap_key = _create_dmaap_key(conf_key)
426
427     conf = params.copy()
428     rels = list()
429
430     # NOTE: The dmaap_map entries are broken up between the templetized config
431     # and the dmaap json in Consul
432     for config_key, dmaap_goodies in six.iteritems(dmaap_map):
433         conf[config_key] = deepcopy(dmaap_map[config_key])
434         # Here comes the magic. << >> signifies dmaap to downstream config
435         # binding service.
436         conf[config_key]["dmaap_info"] = "<<{:}>>".format(config_key)
437
438     # NOTE: The interface_map may not contain *all* possible interfaces
439     # that may be connected with because the catalog.get_discovery call filters
440     # based upon neighbors. Essentailly the interface_map is being pre-filtered
441     # which is probably a latent bug.
442
443     for config_key, compat_types in six.iteritems(interface_map):
444         # Don't clobber config keys that have been set from above
445         if config_key not in conf:
446             conn_comp, instances = _get_downstream(cname, cver, config_key, \
447                     compat_types, instance_map, force=force)
448             conn_name, conn_ver = conn_comp
449             middle = ''
450
451             if conn_name and conn_ver:
452                 middle = "{:}.{:}".format(replace_dots(conn_ver), replace_dots(conn_name))
453             else:
454                 if not force:
455                     raise DiscoveryNoDownstreamComponentError("No compatible downstream component found.")
456
457             config_val = '{{' + middle + '}}'
458             conf[config_key] = config_val
459             rels.extend(instances)
460
461     dmaap_map_just_info = { config_key: v["dmaap_info"]
462             for config_key, v in six.iteritems(dmaap_map) }
463     return conf_key, conf, rels_key, rels, dmaap_key, dmaap_map_just_info
464
465
466 def get_docker_logins(host=consul_host):
467     """Get Docker logins from Consul
468
469     Returns
470     -------
471     List of objects where the objects must be of the form
472         {"registry": .., "username":.., "password":.. }
473     """
474     key = get_docker_logins_key()
475     (index, val) = Consul(host).kv.get(key)
476
477     if val:
478         return json.loads(val['Value'].decode("utf-8"))
479     else:
480         return []
481
482
483 def push_config(conf_key, conf, rels_key, rels, dmaap_key, dmaap_map, host=consul_host):
484     '''Uploads the config and rels to Consul'''
485     cons = Consul(host)
486     for k, v in ((conf_key, conf), (rels_key, rels), (dmaap_key, dmaap_map)):
487         cons.kv.put(k, json.dumps(v))
488
489
490 def remove_config(config_key, host=consul_host):
491     """Deletes a config from Consul
492
493     Returns
494     -------
495     True when all artifacts have been successfully deleted else False
496     """
497     cons = Consul(host)
498     results = [ cons.kv.delete(k) for k in (config_key, _create_rels_key(config_key), \
499             _create_dmaap_key(config_key)) ]
500     return all(results)
501
502
503 def _group_config(config, config_key_map):
504     """Groups config by streams_publishes, streams_subscribes, services_calls"""
505     # Copy non streams and services first
506     grouped_conf = { k: v for k,v in six.iteritems(config)
507             if k not in config_key_map }
508
509     def group(group_name):
510         grouped_conf[group_name] = { k: v for k,v in six.iteritems(config)
511             if k in config_key_map and config_key_map[k]["group"] == group_name }
512
513     # Copy and group the streams and services
514     # Map returns iterator so must force running its course
515     list(map(group, ["streams_publishes", "streams_subscribes", "services_calls"]))
516     return grouped_conf
517
518
519 def _apply_inputs(config, inputs_map):
520     """Update configuration with inputs
521
522     This method updates the values of the configuration parameters using values
523     from the inputs map.
524     """
525     config.update(inputs_map)
526     return config
527
528
529 @contextlib.contextmanager
530 def config_context(user, cname, cver, params, interface_map, instance_map,
531         config_key_map, dmaap_map={}, inputs_map={}, instance_prefix=None,
532         host=consul_host, always_cleanup=True, force_config=False):
533     '''Convenience utility for creating configs and cleaning them up
534
535     Args
536     ----
537     always_cleanup: (boolean) This context manager will cleanup the produced config
538         context always if this is True. When False, cleanup will only occur upon any
539         exception getting thrown in the context manager block. Default is True.
540     force: (boolean)
541         Config will continue to be created even if there are no downstream compatible
542         component when this flag is set to True. Default is False.
543     '''
544     try:
545         conf_key, conf, rels_key, rels, dmaap_key, dmaap_map = create_config(
546                 user, cname, cver, params, interface_map, instance_map, dmaap_map,
547                 instance_prefix, force=force_config)
548
549         conf = _apply_inputs(conf, inputs_map)
550         conf = _group_config(conf, config_key_map)
551
552         push_config(conf_key, conf, rels_key, rels, dmaap_key, dmaap_map, host)
553         yield (conf_key, conf)
554     except Exception as e:
555         if not always_cleanup:
556             try:
557                 conf_key, rels_key, host
558             except UnboundLocalError:
559                 pass
560             else:
561                 remove_config(conf_key, host)
562
563         raise e
564     finally:
565         if always_cleanup:
566             try:
567                 conf_key, rels_key, host
568             except UnboundLocalError:
569                 pass
570             else:
571                 remove_config(conf_key, host)