Fetch docker logins from Consul
[dcaegen2/platform/cli.git] / dcae-cli / dcae_cli / util / discovery.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #      http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END=========================================================
18 #
19 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
20
21 # -*- coding: utf-8 -*-
22 """
23 Provides Consul helper functions
24 """
25 import re
26 import json
27 import contextlib
28 from collections import defaultdict
29 from itertools import chain
30 from functools import partial
31 from uuid import uuid4
32
33 import six
34 from copy import deepcopy
35 from consul import Consul
36
37 from dcae_cli.util.logger import get_logger
38 from dcae_cli.util.exc import DcaeException
39 from dcae_cli.util.profiles import get_profile
40
41
42 logger = get_logger('Discovery')
43
44 active_profile = get_profile()
45 consul_host = active_profile.consul_host
46 # NOTE: Removed the suffix completely. The useful piece of the suffix was the
47 # location but it was implemented in a static fashion (hardcoded). Rather than
48 # enhancing the existing approach and making the suffix dynamic (to support
49 # "rework-central" and "solutioning"), the thinking is to revisit this name stuff
50 # and use Consul's query interface so that location is a tag attribute.
51 _inst_re = re.compile(r"^(?P<user>[^.]*).(?P<hash>[^.]*).(?P<ver>\d+-\d+-\d+).(?P<comp>.*)$")
52
53
54 class DiscoveryError(DcaeException):
55     pass
56
57 class DiscoveryNoDownstreamComponentError(DiscoveryError):
58     pass
59
60
61 def replace_dots(comp_name, reverse=False):
62     '''Converts dots to dashes to prevent downstream users of Consul from exploding'''
63     if not reverse:
64         return comp_name.replace('.', '-')
65     else:
66         return comp_name.replace('-', '.')
67
68 # Utility functions for using Consul
69
70 def _is_healthy_pure(get_health_func, instance):
71     """Checks to see if a component instance is running healthy
72
73     Pure function edition
74
75     Args
76     ----
77     get_health_func: func(string) -> complex object
78         Look at unittests in test_discovery to see examples
79     instance: (string) fully qualified name of component instance
80
81     Returns
82     -------
83     True if instance has been found and is healthy else False
84     """
85     index, resp = get_health_func(instance)
86
87     if resp:
88         def is_passing(instance):
89             return all([check["Status"] == "passing" for check in instance["Checks"]])
90         return any([is_passing(instance) for instance in resp])
91     else:
92         return False
93
94 def is_healthy(consul_host, instance):
95     """Checks to see if a component instance is running healthy
96
97     Impure function edition
98
99     Args
100     ----
101     consul_host: (string) host string of Consul
102     instance: (string) fully qualified name of component instance
103
104     Returns
105     -------
106     True if instance has been found and is healthy else False
107     """
108     cons = Consul(consul_host)
109     return _is_healthy_pure(cons.health.service, instance)
110
111 def _get_instances_from_kv(get_from_kv_func, user):
112     """Get component instances from kv store
113
114     Deployed component instances get entries in a kv store to store configuration
115     information. This is a way to source a list of component instances that were
116     attempted to run. A component could have deployed but failed to register itself.
117     The only trace of that deployment would be checking the kv store.
118
119     Args
120     ----
121     get_from_kv_func: func(string, boolean) -> (don't care, list of dicts)
122         Look at unittests in test_discovery to see examples
123     user: (string) user id
124
125     Returns
126     -------
127     List of unique component instance names
128     """
129     # Keys from KV contain rels key entries and non-rels key entries. Keep the
130     # rels key entries but remove the ":rel" suffix because we are paranoid that
131     # this could exist without the other
132     _, instances_kv = get_from_kv_func(user, recurse=True)
133     return [] if instances_kv is None \
134             else list(set([ dd["Key"].replace(":rel", "") for dd in instances_kv ]))
135
136 def _get_instances_from_catalog(get_from_catalog_func, user):
137     """Get component instances from catalog
138
139     Fetching instances from the catalog covers the deployment cases where
140     components registered successfully regardless of their health check status.
141
142     Args
143     ----
144     get_from_catalog_func: func() -> (don't care, dict)
145         Look at unittests in test_discovery to see examples
146     user: (string) user id
147
148     Returns
149     -------
150     List of unique component instance names
151     """
152     # Get all services and filter here by user
153     response = get_from_catalog_func()
154     return list(set([ instance for instance in response[1].keys() if user in instance ]))
155
156 def _merge_instances(user, *get_funcs):
157     """Merge the result of an arbitrary list of get instance function calls
158
159     Args
160     ----
161     user: (string) user id
162     get_funcs: func(string) -> list of strings
163         Functions that take in a user parameter to output a list of instance
164         names
165
166     Returns
167     -------
168     List of unique component instance names
169     """
170     return list(set(chain.from_iterable([ get_func(user) for get_func in get_funcs ])))
171
172 def _get_instances(consul_host, user):
173     """Get all deployed component instances for a given user
174
175     Sourced from multiple places to ensure we get a complete list of all
176     component instances no matter what state they are in.
177
178     Args
179     ----
180     consul_host: (string) host string of Consul
181     user: (string) user id
182
183     Returns
184     -------
185     List of unique component instance names
186     """
187     cons = Consul(consul_host)
188
189     get_instances_from_kv = partial(_get_instances_from_kv, cons.kv.get)
190     get_instances_from_catalog = partial(_get_instances_from_catalog, cons.catalog.services)
191
192     return _merge_instances(user, get_instances_from_kv, get_instances_from_catalog)
193
194
195 # Custom (sometimes higher order) "discovery" functionality
196
197 def _make_instances_map(instances):
198     """Make an instance map
199
200     Instance map is a dict where the keys are tuples (component type, component version)
201     that map to a set of strings that are instance names.
202     """
203     mapping = defaultdict(set)
204     for instance in instances:
205         match = _inst_re.match(instance)
206         if match is None:
207             continue
208
209         _, _, ver, comp = match.groups()
210         cname = replace_dots(comp, reverse=True)
211         version = replace_dots(ver, reverse=True)
212         key = (cname, version)
213         mapping[key].add(instance)
214     return mapping
215
216
217 def get_user_instances(user, consul_host=consul_host, filter_instances_func=is_healthy):
218     '''Get a user's instance map
219
220     Args:
221     -----
222     filter_instances_func: fn(consul_host, instance) -> boolean
223         Function used to filter instances. Default is is_healthy
224
225     Returns:
226     --------
227     Dict whose keys are component (name,version) tuples and values are list of component instance names
228     '''
229     filter_func = partial(filter_instances_func, consul_host)
230     instances = list(filter(filter_func, _get_instances(consul_host, user)))
231
232     return _make_instances_map(instances)
233
234
235 def _get_component_instances(filter_instances_func, user, cname, cver, consul_host):
236     """Get component instances that are filtered
237
238     Args:
239     -----
240     filter_instances_func: fn(consul_host, instance) -> boolean
241         Function used to filter instances
242
243     Returns
244     -------
245     List of strings where the strings are fully qualified instance names
246     """
247     instance_map = get_user_instances(user, consul_host=consul_host,
248             filter_instances_func=filter_instances_func)
249
250     # REVIEW: We don't restrict component names from using dashes. We do
251     # transform names with dots to use dashes for domain segmenting reasons.
252     # Instance map creation always reverses that making dashes to dots even though
253     # the component name may have dashes. Thus always search for instances by
254     # a dotted component name. We are open to a collision but that is low chance
255     # - someone has to use the same name in dotted and dashed form which is weird.
256     cname_dashless = replace_dots(cname, reverse=True)
257
258     # WATCH: instances_map.get returns set. Force to be list to have consistent
259     # return
260     return list(instance_map.get((cname_dashless, cver), []))
261
262 def get_healthy_instances(user, cname, cver, consul_host=consul_host):
263     """Lists healthy instances of a particular component for a given user
264
265     Returns
266     -------
267     List of strings where the strings are fully qualified instance names
268     """
269     return _get_component_instances(is_healthy, user, cname, cver, consul_host)
270
271 def get_defective_instances(user, cname, cver, consul_host=consul_host):
272     """Lists *not* running instances of a particular component for a given user
273
274     This means that there are component instances that are sitting out there
275     deployed but not successfully running.
276
277     Returns
278     -------
279     List of strings where the strings are fully qualified instance names
280     """
281     def is_not_healthy(consul_host, component):
282         return not is_healthy(consul_host, component)
283
284     return _get_component_instances(is_not_healthy, user, cname, cver, consul_host)
285
286
287 def lookup_instance(consul_host, name):
288     """Query Consul for service details"""
289     cons = Consul(consul_host)
290     index, results = cons.catalog.service(name)
291     return results
292
293 def parse_instance_lookup(results):
294     """Parse the resultset from lookup_instance
295
296     Returns:
297     --------
298     String in host form <address>:<port>
299     """
300     if results:
301         # Just grab first
302         result = results[0]
303         return "{address}:{port}".format(address=result["ServiceAddress"],
304                 port=result["ServicePort"])
305     else:
306         return
307
308
309 def _create_rels_key(config_key):
310     """Create rels key from config key
311
312     Assumes config_key is well-formed"""
313     return "{:}:rel".format(config_key)
314
315
316 def _create_dmaap_key(config_key):
317     """Create dmaap key from config key
318
319     Assumes config_key is well-formed"""
320     return "{:}:dmaap".format(config_key)
321
322
323 def clear_user_instances(user, host=consul_host):
324     '''Removes all Consul key:value entries for a given user'''
325     cons = Consul(host)
326     cons.kv.delete(user, recurse=True)
327
328
329 _multiple_compat_msg = '''Component '{cname}' config_key '{ckey}' has multiple compatible downstream \
330 components: {compat}. The current infrastructure can only support interacing with a single component. \
331 Only downstream component '{chosen}' will be connected.'''
332
333 _no_compat_msg = "Component '{cname}' config_key '{ckey}' has no compatible downstream components."
334
335 _no_inst_msg = '''Component '{cname}' config_key '{ckey}' is compatible with downstream component '{chosen}' \
336 however there are no instances available for connecting.'''
337
338
339 def _cfmt(*args):
340     '''Returns a string formatted representation for a component and version'''
341     if len(args) == 1:
342         return ':'.join(args[0])
343     elif len(args) == 2:
344         return ':'.join(args)
345     else:
346         raise DiscoveryError('Input should be name, version or (name, version)')
347
348
349 def _get_downstream(cname, cver, config_key, compat_comps, instance_map,
350         force=False):
351     '''
352     Returns a component type and its instances to use for a given config key
353
354     Parameters
355     ----------
356     cname : string
357         Name of the upstream component
358     cver : string
359         Version of the upstream component
360     config_key : string
361         Mainly used for populating warnings meaningfully
362     compat_comps : dict
363         A list of component (name, version) tuples
364     instance_map : dict
365         A dict whose keys are component (name, version) tuples and values are a list of instance names
366     '''
367     if not compat_comps:
368         conn_comp = ('', '')
369         logger.warning(_no_compat_msg.format(cname=_cfmt(cname, cver), ckey=config_key))
370     else:
371         conn_comp = six.next(iter(compat_comps))
372         if len(compat_comps) > 1:
373             logger.warning(_multiple_compat_msg.format(cname=_cfmt(cname, cver), ckey=config_key,
374                                                        compat=list(map(_cfmt, compat_comps)), chosen=_cfmt(conn_comp)))
375     if all(conn_comp):
376         instances = instance_map.get(conn_comp, tuple())
377         if not instances:
378             if force:
379                 logger.warning(_no_inst_msg.format(cname=_cfmt(cname, cver), \
380                         ckey=config_key, chosen=_cfmt(conn_comp)))
381             else:
382                 logger.error(_no_inst_msg.format(cname=_cfmt(cname, cver), \
383                         ckey=config_key, chosen=_cfmt(conn_comp)))
384                 raise DiscoveryNoDownstreamComponentError("No compatible downstream component found.")
385     else:
386         instances = tuple()
387
388     return conn_comp, instances
389
390
391 def create_config(user, cname, cver, params, interface_map, instance_map, dmaap_map,
392         instance_prefix=None, force=False):
393     '''
394     Creates a config and corresponding rels entries in Consul. Returns the Consul the keys and entries.
395
396     Parameters
397     ----------
398     user : string
399         The user namespace to create the config and rels under. E.g. user.foo.bar...
400     cname : string
401         Name of the upstream component
402     cver : string
403         Version of the upstream component
404     params : dict
405         Parameters of the component, taken directly from the component specification
406     interface_map : dict
407         A dict mapping the config_key of published streams and/or called services to a list of compatible
408         component types and versions
409     instance_map : dict
410         A dict mapping component types and versions to a list of instances currently running
411     dmaap_map : dict
412         A dict that contains config key to dmaap information. This map is checked
413         first before checking the instance_map which means before checking for
414         direct http components.
415     instance_prefix : string, optional
416         The unique prefix to associate with the component instance whose config is being created
417     force: string, optional
418         Config will continue to be created even if there are no downstream compatible
419         component when this flag is set to True. Default is False.
420     '''
421     inst_pref = str(uuid4()) if instance_prefix is None else instance_prefix
422     conf_key = "{:}.{:}.{:}.{:}".format(user, inst_pref, replace_dots(cver), replace_dots(cname))
423     rels_key = _create_rels_key(conf_key)
424     dmaap_key = _create_dmaap_key(conf_key)
425
426     conf = params.copy()
427     rels = list()
428
429     # NOTE: The dmaap_map entries are broken up between the templetized config
430     # and the dmaap json in Consul
431     for config_key, dmaap_goodies in six.iteritems(dmaap_map):
432         conf[config_key] = deepcopy(dmaap_map[config_key])
433         # Here comes the magic. << >> signifies dmaap to downstream config
434         # binding service.
435         conf[config_key]["dmaap_info"] = "<<{:}>>".format(config_key)
436
437     # NOTE: The interface_map may not contain *all* possible interfaces
438     # that may be connected with because the catalog.get_discovery call filters
439     # based upon neighbors. Essentailly the interface_map is being pre-filtered
440     # which is probably a latent bug.
441
442     for config_key, compat_types in six.iteritems(interface_map):
443         # Don't clobber config keys that have been set from above
444         if config_key not in conf:
445             conn_comp, instances = _get_downstream(cname, cver, config_key, \
446                     compat_types, instance_map, force=force)
447             conn_name, conn_ver = conn_comp
448             middle = ''
449
450             if conn_name and conn_ver:
451                 middle = "{:}.{:}".format(replace_dots(conn_ver), replace_dots(conn_name))
452             else:
453                 if not force:
454                     raise DiscoveryNoDownstreamComponentError("No compatible downstream component found.")
455
456             config_val = '{{' + middle + '}}'
457             conf[config_key] = config_val
458             rels.extend(instances)
459
460     dmaap_map_just_info = { config_key: v["dmaap_info"]
461             for config_key, v in six.iteritems(dmaap_map) }
462     return conf_key, conf, rels_key, rels, dmaap_key, dmaap_map_just_info
463
464
465 def get_docker_logins(host=consul_host):
466     """Get Docker logins from Consul
467
468     Returns
469     -------
470     List of objects where the objects must be of the form
471         {"registry": .., "username":.., "password":.. }
472     """
473     key = "dockerlogin_info"
474     (index, val) = Consul(host).kv.get(key)
475
476     if val:
477         return json.loads(val['Value'].decode("utf-8"))
478     else:
479         return []
480
481
482 def push_config(conf_key, conf, rels_key, rels, dmaap_key, dmaap_map, host=consul_host):
483     '''Uploads the config and rels to Consul'''
484     cons = Consul(host)
485     for k, v in ((conf_key, conf), (rels_key, rels), (dmaap_key, dmaap_map)):
486         cons.kv.put(k, json.dumps(v))
487
488
489 def remove_config(config_key, host=consul_host):
490     """Deletes a config from Consul
491
492     Returns
493     -------
494     True when all artifacts have been successfully deleted else False
495     """
496     cons = Consul(host)
497     results = [ cons.kv.delete(k) for k in (config_key, _create_rels_key(config_key), \
498             _create_dmaap_key(config_key)) ]
499     return all(results)
500
501
502 def _group_config(config, config_key_map):
503     """Groups config by streams_publishes, streams_subscribes, services_calls"""
504     # Copy non streams and services first
505     grouped_conf = { k: v for k,v in six.iteritems(config)
506             if k not in config_key_map }
507
508     def group(group_name):
509         grouped_conf[group_name] = { k: v for k,v in six.iteritems(config)
510             if k in config_key_map and config_key_map[k]["group"] == group_name }
511
512     # Copy and group the streams and services
513     # Map returns iterator so must force running its course
514     list(map(group, ["streams_publishes", "streams_subscribes", "services_calls"]))
515     return grouped_conf
516
517
518 def _apply_inputs(config, inputs_map):
519     """Update configuration with inputs
520
521     This method updates the values of the configuration parameters using values
522     from the inputs map.
523     """
524     config.update(inputs_map)
525     return config
526
527
528 @contextlib.contextmanager
529 def config_context(user, cname, cver, params, interface_map, instance_map,
530         config_key_map, dmaap_map={}, inputs_map={}, instance_prefix=None,
531         host=consul_host, always_cleanup=True, force_config=False):
532     '''Convenience utility for creating configs and cleaning them up
533
534     Args
535     ----
536     always_cleanup: (boolean) This context manager will cleanup the produced config
537         context always if this is True. When False, cleanup will only occur upon any
538         exception getting thrown in the context manager block. Default is True.
539     force: (boolean)
540         Config will continue to be created even if there are no downstream compatible
541         component when this flag is set to True. Default is False.
542     '''
543     try:
544         conf_key, conf, rels_key, rels, dmaap_key, dmaap_map = create_config(
545                 user, cname, cver, params, interface_map, instance_map, dmaap_map,
546                 instance_prefix, force=force_config)
547
548         conf = _apply_inputs(conf, inputs_map)
549         conf = _group_config(conf, config_key_map)
550
551         push_config(conf_key, conf, rels_key, rels, dmaap_key, dmaap_map, host)
552         yield (conf_key, conf)
553     except Exception as e:
554         if not always_cleanup:
555             try:
556                 conf_key, rels_key, host
557             except UnboundLocalError:
558                 pass
559             else:
560                 remove_config(conf_key, host)
561
562         raise e
563     finally:
564         if always_cleanup:
565             try:
566                 conf_key, rels_key, host
567             except UnboundLocalError:
568                 pass
569             else:
570                 remove_config(conf_key, host)