Fix issue where getting empty consul in discovery
[dcaegen2/platform/cli.git] / dcae-cli / dcae_cli / util / discovery.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #      http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END=========================================================
18 #
19 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
20
21 # -*- coding: utf-8 -*-
22 """
23 Provides Consul helper functions
24 """
25 import re
26 import json
27 import contextlib
28 from collections import defaultdict
29 from itertools import chain
30 from functools import partial
31 from uuid import uuid4
32
33 import six
34 from copy import deepcopy
35 from consul import Consul
36
37 from dcae_cli.util.logger import get_logger
38 from dcae_cli.util.exc import DcaeException
39 from dcae_cli.util.profiles import get_profile
40 from dcae_cli.util.config import get_docker_logins_key
41
42
43 logger = get_logger('Discovery')
44
45 # NOTE: Removed the suffix completely. The useful piece of the suffix was the
46 # location but it was implemented in a static fashion (hardcoded). Rather than
47 # enhancing the existing approach and making the suffix dynamic (to support
48 # "rework-central" and "solutioning"), the thinking is to revisit this name stuff
49 # and use Consul's query interface so that location is a tag attribute.
50 _inst_re = re.compile(r"^(?P<user>[^.]*).(?P<hash>[^.]*).(?P<ver>\d+-\d+-\d+).(?P<comp>.*)$")
51
52
53 class DiscoveryError(DcaeException):
54     pass
55
56 class DiscoveryNoDownstreamComponentError(DiscoveryError):
57     pass
58
59
60 def default_consul_host():
61     """Return default consul host
62
63     This method was created to purposefully make fetching the default lazier than
64     the previous impl. The previous impl had the default as a global variable and
65     thus requiring the configuration to be setup before doing anything further.
66     The pain point of that impl is in unit testing where now all code that
67     imported this module had a strict dependency upon the impure configuration.
68     """
69     return get_profile().consul_host
70
71
72 def _choose_consul_host(consul_host):
73     """Chooses the appropriate consul host
74
75     Chooses between a provided value and a default
76     """
77     return default_consul_host() if consul_host == None else consul_host
78
79
80 def replace_dots(comp_name, reverse=False):
81     '''Converts dots to dashes to prevent downstream users of Consul from exploding'''
82     if not reverse:
83         return comp_name.replace('.', '-')
84     else:
85         return comp_name.replace('-', '.')
86
87 # Utility functions for using Consul
88
89 def _is_healthy_pure(get_health_func, instance):
90     """Checks to see if a component instance is running healthy
91
92     Pure function edition
93
94     Args
95     ----
96     get_health_func: func(string) -> complex object
97         Look at unittests in test_discovery to see examples
98     instance: (string) fully qualified name of component instance
99
100     Returns
101     -------
102     True if instance has been found and is healthy else False
103     """
104     index, resp = get_health_func(instance)
105
106     if resp:
107         def is_passing(instance):
108             return all([check["Status"] == "passing" for check in instance["Checks"]])
109         return any([is_passing(instance) for instance in resp])
110     else:
111         return False
112
113 def is_healthy(consul_host, instance):
114     """Checks to see if a component instance is running healthy
115
116     Impure function edition
117
118     Args
119     ----
120     consul_host: (string) host string of Consul
121     instance: (string) fully qualified name of component instance
122
123     Returns
124     -------
125     True if instance has been found and is healthy else False
126     """
127     cons = Consul(consul_host)
128     return _is_healthy_pure(cons.health.service, instance)
129
130 def _get_instances_from_kv(get_from_kv_func, user):
131     """Get component instances from kv store
132
133     Deployed component instances get entries in a kv store to store configuration
134     information. This is a way to source a list of component instances that were
135     attempted to run. A component could have deployed but failed to register itself.
136     The only trace of that deployment would be checking the kv store.
137
138     Args
139     ----
140     get_from_kv_func: func(string, boolean) -> (don't care, list of dicts)
141         Look at unittests in test_discovery to see examples
142     user: (string) user id
143
144     Returns
145     -------
146     List of unique component instance names
147     """
148     # Keys from KV contain rels key entries and non-rels key entries. Keep the
149     # rels key entries but remove the ":rel" suffix because we are paranoid that
150     # this could exist without the other
151     _, instances_kv = get_from_kv_func(user, recurse=True)
152     return [] if instances_kv is None \
153             else list(set([ dd["Key"].replace(":rel", "") for dd in instances_kv ]))
154
155 def _get_instances_from_catalog(get_from_catalog_func, user):
156     """Get component instances from catalog
157
158     Fetching instances from the catalog covers the deployment cases where
159     components registered successfully regardless of their health check status.
160
161     Args
162     ----
163     get_from_catalog_func: func() -> (don't care, dict)
164         Look at unittests in test_discovery to see examples
165     user: (string) user id
166
167     Returns
168     -------
169     List of unique component instance names
170     """
171     # Get all services and filter here by user
172     response = get_from_catalog_func()
173     return list(set([ instance for instance in response[1].keys() if user in instance ]))
174
175 def _merge_instances(user, *get_funcs):
176     """Merge the result of an arbitrary list of get instance function calls
177
178     Args
179     ----
180     user: (string) user id
181     get_funcs: func(string) -> list of strings
182         Functions that take in a user parameter to output a list of instance
183         names
184
185     Returns
186     -------
187     List of unique component instance names
188     """
189     return list(set(chain.from_iterable([ get_func(user) for get_func in get_funcs ])))
190
191 def _get_instances(consul_host, user):
192     """Get all deployed component instances for a given user
193
194     Sourced from multiple places to ensure we get a complete list of all
195     component instances no matter what state they are in.
196
197     Args
198     ----
199     consul_host: (string) host string of Consul
200     user: (string) user id
201
202     Returns
203     -------
204     List of unique component instance names
205     """
206     cons = Consul(consul_host)
207
208     get_instances_from_kv = partial(_get_instances_from_kv, cons.kv.get)
209     get_instances_from_catalog = partial(_get_instances_from_catalog, cons.catalog.services)
210
211     return _merge_instances(user, get_instances_from_kv, get_instances_from_catalog)
212
213
214 # Custom (sometimes higher order) "discovery" functionality
215
216 def _make_instances_map(instances):
217     """Make an instance map
218
219     Instance map is a dict where the keys are tuples (component type, component version)
220     that map to a set of strings that are instance names.
221     """
222     mapping = defaultdict(set)
223     for instance in instances:
224         match = _inst_re.match(instance)
225         if match is None:
226             continue
227
228         _, _, ver, comp = match.groups()
229         cname = replace_dots(comp, reverse=True)
230         version = replace_dots(ver, reverse=True)
231         key = (cname, version)
232         mapping[key].add(instance)
233     return mapping
234
235
236 def get_user_instances(user, consul_host=None, filter_instances_func=is_healthy):
237     '''Get a user's instance map
238
239     Args:
240     -----
241     filter_instances_func: fn(consul_host, instance) -> boolean
242         Function used to filter instances. Default is is_healthy
243
244     Returns:
245     --------
246     Dict whose keys are component (name,version) tuples and values are list of component instance names
247     '''
248     consul_host = _choose_consul_host(consul_host)
249     filter_func = partial(filter_instances_func, consul_host)
250     instances = list(filter(filter_func, _get_instances(consul_host, user)))
251
252     return _make_instances_map(instances)
253
254
255 def _get_component_instances(filter_instances_func, user, cname, cver, consul_host):
256     """Get component instances that are filtered
257
258     Args:
259     -----
260     filter_instances_func: fn(consul_host, instance) -> boolean
261         Function used to filter instances
262
263     Returns
264     -------
265     List of strings where the strings are fully qualified instance names
266     """
267     instance_map = get_user_instances(user, consul_host=consul_host,
268             filter_instances_func=filter_instances_func)
269
270     # REVIEW: We don't restrict component names from using dashes. We do
271     # transform names with dots to use dashes for domain segmenting reasons.
272     # Instance map creation always reverses that making dashes to dots even though
273     # the component name may have dashes. Thus always search for instances by
274     # a dotted component name. We are open to a collision but that is low chance
275     # - someone has to use the same name in dotted and dashed form which is weird.
276     cname_dashless = replace_dots(cname, reverse=True)
277
278     # WATCH: instances_map.get returns set. Force to be list to have consistent
279     # return
280     return list(instance_map.get((cname_dashless, cver), []))
281
282 def get_healthy_instances(user, cname, cver, consul_host=None):
283     """Lists healthy instances of a particular component for a given user
284
285     Returns
286     -------
287     List of strings where the strings are fully qualified instance names
288     """
289     consul_host = _choose_consul_host(consul_host)
290     return _get_component_instances(is_healthy, user, cname, cver, consul_host)
291
292 def get_defective_instances(user, cname, cver, consul_host=None):
293     """Lists *not* running instances of a particular component for a given user
294
295     This means that there are component instances that are sitting out there
296     deployed but not successfully running.
297
298     Returns
299     -------
300     List of strings where the strings are fully qualified instance names
301     """
302     def is_not_healthy(consul_host, component):
303         return not is_healthy(consul_host, component)
304
305     consul_host = _choose_consul_host(consul_host)
306     return _get_component_instances(is_not_healthy, user, cname, cver, consul_host)
307
308
309 def lookup_instance(consul_host, name):
310     """Query Consul for service details"""
311     cons = Consul(consul_host)
312     index, results = cons.catalog.service(name)
313     return results
314
315 def parse_instance_lookup(results):
316     """Parse the resultset from lookup_instance
317
318     Returns:
319     --------
320     String in host form <address>:<port>
321     """
322     if results:
323         # Just grab first
324         result = results[0]
325         return "{address}:{port}".format(address=result["ServiceAddress"],
326                 port=result["ServicePort"])
327     else:
328         return
329
330
331 def _create_rels_key(config_key):
332     """Create rels key from config key
333
334     Assumes config_key is well-formed"""
335     return "{:}:rel".format(config_key)
336
337
338 def _create_dmaap_key(config_key):
339     """Create dmaap key from config key
340
341     Assumes config_key is well-formed"""
342     return "{:}:dmaap".format(config_key)
343
344
345 def clear_user_instances(user, host=None):
346     '''Removes all Consul key:value entries for a given user'''
347     host = _choose_consul_host(host)
348     cons = Consul(host)
349     cons.kv.delete(user, recurse=True)
350
351
352 _multiple_compat_msg = '''Component '{cname}' config_key '{ckey}' has multiple compatible downstream \
353 components: {compat}. The current infrastructure can only support interacing with a single component. \
354 Only downstream component '{chosen}' will be connected.'''
355
356 _no_compat_msg = "Component '{cname}' config_key '{ckey}' has no compatible downstream components."
357
358 _no_inst_msg = '''Component '{cname}' config_key '{ckey}' is compatible with downstream component '{chosen}' \
359 however there are no instances available for connecting.'''
360
361
362 def _cfmt(*args):
363     '''Returns a string formatted representation for a component and version'''
364     if len(args) == 1:
365         return ':'.join(args[0])
366     elif len(args) == 2:
367         return ':'.join(args)
368     else:
369         raise DiscoveryError('Input should be name, version or (name, version)')
370
371
372 def _get_downstream(cname, cver, config_key, compat_comps, instance_map,
373         force=False):
374     '''
375     Returns a component type and its instances to use for a given config key
376
377     Parameters
378     ----------
379     cname : string
380         Name of the upstream component
381     cver : string
382         Version of the upstream component
383     config_key : string
384         Mainly used for populating warnings meaningfully
385     compat_comps : dict
386         A list of component (name, version) tuples
387     instance_map : dict
388         A dict whose keys are component (name, version) tuples and values are a list of instance names
389     '''
390     if not compat_comps:
391         conn_comp = ('', '')
392         logger.warning(_no_compat_msg.format(cname=_cfmt(cname, cver), ckey=config_key))
393     else:
394         conn_comp = six.next(iter(compat_comps))
395         if len(compat_comps) > 1:
396             logger.warning(_multiple_compat_msg.format(cname=_cfmt(cname, cver), ckey=config_key,
397                                                        compat=list(map(_cfmt, compat_comps)), chosen=_cfmt(conn_comp)))
398     if all(conn_comp):
399         instances = instance_map.get(conn_comp, tuple())
400         if not instances:
401             if force:
402                 logger.warning(_no_inst_msg.format(cname=_cfmt(cname, cver), \
403                         ckey=config_key, chosen=_cfmt(conn_comp)))
404             else:
405                 logger.error(_no_inst_msg.format(cname=_cfmt(cname, cver), \
406                         ckey=config_key, chosen=_cfmt(conn_comp)))
407                 raise DiscoveryNoDownstreamComponentError("No compatible downstream component found.")
408     else:
409         instances = tuple()
410
411     return conn_comp, instances
412
413
414 def create_config(user, cname, cver, params, interface_map, instance_map, dmaap_map,
415         instance_prefix=None, force=False):
416     '''
417     Creates a config and corresponding rels entries in Consul. Returns the Consul the keys and entries.
418
419     Parameters
420     ----------
421     user : string
422         The user namespace to create the config and rels under. E.g. user.foo.bar...
423     cname : string
424         Name of the upstream component
425     cver : string
426         Version of the upstream component
427     params : dict
428         Parameters of the component, taken directly from the component specification
429     interface_map : dict
430         A dict mapping the config_key of published streams and/or called services to a list of compatible
431         component types and versions
432     instance_map : dict
433         A dict mapping component types and versions to a list of instances currently running
434     dmaap_map : dict
435         A dict that contains config key to dmaap information. This map is checked
436         first before checking the instance_map which means before checking for
437         direct http components.
438     instance_prefix : string, optional
439         The unique prefix to associate with the component instance whose config is being created
440     force: string, optional
441         Config will continue to be created even if there are no downstream compatible
442         component when this flag is set to True. Default is False.
443     '''
444     inst_pref = str(uuid4()) if instance_prefix is None else instance_prefix
445     conf_key = "{:}.{:}.{:}.{:}".format(user, inst_pref, replace_dots(cver), replace_dots(cname))
446     rels_key = _create_rels_key(conf_key)
447     dmaap_key = _create_dmaap_key(conf_key)
448
449     conf = params.copy()
450     rels = list()
451
452     # NOTE: The dmaap_map entries are broken up between the templetized config
453     # and the dmaap json in Consul
454     for config_key, dmaap_goodies in six.iteritems(dmaap_map):
455         conf[config_key] = deepcopy(dmaap_map[config_key])
456         # Here comes the magic. << >> signifies dmaap to downstream config
457         # binding service.
458         conf[config_key]["dmaap_info"] = "<<{:}>>".format(config_key)
459
460     # NOTE: The interface_map may not contain *all* possible interfaces
461     # that may be connected with because the catalog.get_discovery call filters
462     # based upon neighbors. Essentailly the interface_map is being pre-filtered
463     # which is probably a latent bug.
464
465     for config_key, compat_types in six.iteritems(interface_map):
466         # Don't clobber config keys that have been set from above
467         if config_key not in conf:
468             conn_comp, instances = _get_downstream(cname, cver, config_key, \
469                     compat_types, instance_map, force=force)
470             conn_name, conn_ver = conn_comp
471             middle = ''
472
473             if conn_name and conn_ver:
474                 middle = "{:}.{:}".format(replace_dots(conn_ver), replace_dots(conn_name))
475             else:
476                 if not force:
477                     raise DiscoveryNoDownstreamComponentError("No compatible downstream component found.")
478
479             config_val = '{{' + middle + '}}'
480             conf[config_key] = config_val
481             rels.extend(instances)
482
483     dmaap_map_just_info = { config_key: v["dmaap_info"]
484             for config_key, v in six.iteritems(dmaap_map) }
485     return conf_key, conf, rels_key, rels, dmaap_key, dmaap_map_just_info
486
487
488 def get_docker_logins(host=None):
489     """Get Docker logins from Consul
490
491     Returns
492     -------
493     List of objects where the objects must be of the form
494         {"registry": .., "username":.., "password":.. }
495     """
496     key = get_docker_logins_key()
497     host = _choose_consul_host(host)
498     (index, val) = Consul(host).kv.get(key)
499
500     if val:
501         return json.loads(val['Value'].decode("utf-8"))
502     else:
503         return []
504
505
506 def push_config(conf_key, conf, rels_key, rels, dmaap_key, dmaap_map, host=None):
507     '''Uploads the config and rels to Consul'''
508     host = _choose_consul_host(host)
509     cons = Consul(host)
510     for k, v in ((conf_key, conf), (rels_key, rels), (dmaap_key, dmaap_map)):
511         cons.kv.put(k, json.dumps(v))
512
513
514 def remove_config(config_key, host=None):
515     """Deletes a config from Consul
516
517     Returns
518     -------
519     True when all artifacts have been successfully deleted else False
520     """
521     host = _choose_consul_host(host)
522     cons = Consul(host)
523     results = [ cons.kv.delete(k) for k in (config_key, _create_rels_key(config_key), \
524             _create_dmaap_key(config_key)) ]
525     return all(results)
526
527
528 def _group_config(config, config_key_map):
529     """Groups config by streams_publishes, streams_subscribes, services_calls"""
530     # Copy non streams and services first
531     grouped_conf = { k: v for k,v in six.iteritems(config)
532             if k not in config_key_map }
533
534     def group(group_name):
535         grouped_conf[group_name] = { k: v for k,v in six.iteritems(config)
536             if k in config_key_map and config_key_map[k]["group"] == group_name }
537
538     # Copy and group the streams and services
539     # Map returns iterator so must force running its course
540     list(map(group, ["streams_publishes", "streams_subscribes", "services_calls"]))
541     return grouped_conf
542
543
544 def _apply_inputs(config, inputs_map):
545     """Update configuration with inputs
546
547     This method updates the values of the configuration parameters using values
548     from the inputs map.
549     """
550     config.update(inputs_map)
551     return config
552
553
554 @contextlib.contextmanager
555 def config_context(user, cname, cver, params, interface_map, instance_map,
556         config_key_map, dmaap_map={}, inputs_map={}, instance_prefix=None,
557         host=None, always_cleanup=True, force_config=False):
558     '''Convenience utility for creating configs and cleaning them up
559
560     Args
561     ----
562     always_cleanup: (boolean) This context manager will cleanup the produced config
563         context always if this is True. When False, cleanup will only occur upon any
564         exception getting thrown in the context manager block. Default is True.
565     force: (boolean)
566         Config will continue to be created even if there are no downstream compatible
567         component when this flag is set to True. Default is False.
568     '''
569     host = _choose_consul_host(host)
570
571     try:
572         conf_key, conf, rels_key, rels, dmaap_key, dmaap_map = create_config(
573                 user, cname, cver, params, interface_map, instance_map, dmaap_map,
574                 instance_prefix, force=force_config)
575
576         conf = _apply_inputs(conf, inputs_map)
577         conf = _group_config(conf, config_key_map)
578
579         push_config(conf_key, conf, rels_key, rels, dmaap_key, dmaap_map, host)
580         yield (conf_key, conf)
581     except Exception as e:
582         if not always_cleanup:
583             try:
584                 conf_key, rels_key, host
585             except UnboundLocalError:
586                 pass
587             else:
588                 remove_config(conf_key, host)
589
590         raise e
591     finally:
592         if always_cleanup:
593             try:
594                 conf_key, rels_key, host
595             except UnboundLocalError:
596                 pass
597             else:
598                 remove_config(conf_key, host)