1 # ============LICENSE_START=======================================================
3 # ================================================================================
4 # Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END=========================================================
19 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 # -*- coding: utf-8 -*-
23 Provides Consul helper functions
28 from collections import defaultdict
29 from itertools import chain
30 from functools import partial
31 from uuid import uuid4
34 from copy import deepcopy
35 from consul import Consul
37 from dcae_cli.util.logger import get_logger
38 from dcae_cli.util.exc import DcaeException
39 from dcae_cli.util.profiles import get_profile
40 from dcae_cli.util.config import get_docker_logins_key
43 logger = get_logger('Discovery')
45 # NOTE: Removed the suffix completely. The useful piece of the suffix was the
46 # location but it was implemented in a static fashion (hardcoded). Rather than
47 # enhancing the existing approach and making the suffix dynamic (to support
48 # "rework-central" and "solutioning"), the thinking is to revisit this name stuff
49 # and use Consul's query interface so that location is a tag attribute.
50 _inst_re = re.compile(r"^(?P<user>[^.]*).(?P<hash>[^.]*).(?P<ver>\d+-\d+-\d+).(?P<comp>.*)$")
53 class DiscoveryError(DcaeException):
56 class DiscoveryNoDownstreamComponentError(DiscoveryError):
60 def default_consul_host():
61 """Return default consul host
63 This method was created to purposefully make fetching the default lazier than
64 the previous impl. The previous impl had the default as a global variable and
65 thus requiring the configuration to be setup before doing anything further.
66 The pain point of that impl is in unit testing where now all code that
67 imported this module had a strict dependency upon the impure configuration.
69 return get_profile().consul_host
72 def _choose_consul_host(consul_host):
73 """Chooses the appropriate consul host
75 Chooses between a provided value and a default
77 return default_consul_host() if consul_host == None else consul_host
80 def replace_dots(comp_name, reverse=False):
81 '''Converts dots to dashes to prevent downstream users of Consul from exploding'''
83 return comp_name.replace('.', '-')
85 return comp_name.replace('-', '.')
87 # Utility functions for using Consul
89 def _is_healthy_pure(get_health_func, instance):
90 """Checks to see if a component instance is running healthy
96 get_health_func: func(string) -> complex object
97 Look at unittests in test_discovery to see examples
98 instance: (string) fully qualified name of component instance
102 True if instance has been found and is healthy else False
104 index, resp = get_health_func(instance)
107 def is_passing(instance):
108 return all([check["Status"] == "passing" for check in instance["Checks"]])
109 return any([is_passing(instance) for instance in resp])
113 def is_healthy(consul_host, instance):
114 """Checks to see if a component instance is running healthy
116 Impure function edition
120 consul_host: (string) host string of Consul
121 instance: (string) fully qualified name of component instance
125 True if instance has been found and is healthy else False
127 cons = Consul(consul_host)
128 return _is_healthy_pure(cons.health.service, instance)
130 def _get_instances_from_kv(get_from_kv_func, user):
131 """Get component instances from kv store
133 Deployed component instances get entries in a kv store to store configuration
134 information. This is a way to source a list of component instances that were
135 attempted to run. A component could have deployed but failed to register itself.
136 The only trace of that deployment would be checking the kv store.
140 get_from_kv_func: func(string, boolean) -> (don't care, list of dicts)
141 Look at unittests in test_discovery to see examples
142 user: (string) user id
146 List of unique component instance names
148 # Keys from KV contain rels key entries and non-rels key entries. Keep the
149 # rels key entries but remove the ":rel" suffix because we are paranoid that
150 # this could exist without the other
151 _, instances_kv = get_from_kv_func(user, recurse=True)
152 return [] if instances_kv is None \
153 else list(set([ dd["Key"].replace(":rel", "") for dd in instances_kv ]))
155 def _get_instances_from_catalog(get_from_catalog_func, user):
156 """Get component instances from catalog
158 Fetching instances from the catalog covers the deployment cases where
159 components registered successfully regardless of their health check status.
163 get_from_catalog_func: func() -> (don't care, dict)
164 Look at unittests in test_discovery to see examples
165 user: (string) user id
169 List of unique component instance names
171 # Get all services and filter here by user
172 response = get_from_catalog_func()
173 return list(set([ instance for instance in response[1].keys() if user in instance ]))
175 def _merge_instances(user, *get_funcs):
176 """Merge the result of an arbitrary list of get instance function calls
180 user: (string) user id
181 get_funcs: func(string) -> list of strings
182 Functions that take in a user parameter to output a list of instance
187 List of unique component instance names
189 return list(set(chain.from_iterable([ get_func(user) for get_func in get_funcs ])))
191 def _get_instances(consul_host, user):
192 """Get all deployed component instances for a given user
194 Sourced from multiple places to ensure we get a complete list of all
195 component instances no matter what state they are in.
199 consul_host: (string) host string of Consul
200 user: (string) user id
204 List of unique component instance names
206 cons = Consul(consul_host)
208 get_instances_from_kv = partial(_get_instances_from_kv, cons.kv.get)
209 get_instances_from_catalog = partial(_get_instances_from_catalog, cons.catalog.services)
211 return _merge_instances(user, get_instances_from_kv, get_instances_from_catalog)
214 # Custom (sometimes higher order) "discovery" functionality
216 def _make_instances_map(instances):
217 """Make an instance map
219 Instance map is a dict where the keys are tuples (component type, component version)
220 that map to a set of strings that are instance names.
222 mapping = defaultdict(set)
223 for instance in instances:
224 match = _inst_re.match(instance)
228 _, _, ver, comp = match.groups()
229 cname = replace_dots(comp, reverse=True)
230 version = replace_dots(ver, reverse=True)
231 key = (cname, version)
232 mapping[key].add(instance)
236 def get_user_instances(user, consul_host=None, filter_instances_func=is_healthy):
237 '''Get a user's instance map
241 filter_instances_func: fn(consul_host, instance) -> boolean
242 Function used to filter instances. Default is is_healthy
246 Dict whose keys are component (name,version) tuples and values are list of component instance names
248 consul_host = _choose_consul_host(consul_host)
249 filter_func = partial(filter_instances_func, consul_host)
250 instances = list(filter(filter_func, _get_instances(consul_host, user)))
252 return _make_instances_map(instances)
255 def _get_component_instances(filter_instances_func, user, cname, cver, consul_host):
256 """Get component instances that are filtered
260 filter_instances_func: fn(consul_host, instance) -> boolean
261 Function used to filter instances
265 List of strings where the strings are fully qualified instance names
267 instance_map = get_user_instances(user, consul_host=consul_host,
268 filter_instances_func=filter_instances_func)
270 # REVIEW: We don't restrict component names from using dashes. We do
271 # transform names with dots to use dashes for domain segmenting reasons.
272 # Instance map creation always reverses that making dashes to dots even though
273 # the component name may have dashes. Thus always search for instances by
274 # a dotted component name. We are open to a collision but that is low chance
275 # - someone has to use the same name in dotted and dashed form which is weird.
276 cname_dashless = replace_dots(cname, reverse=True)
278 # WATCH: instances_map.get returns set. Force to be list to have consistent
280 return list(instance_map.get((cname_dashless, cver), []))
282 def get_healthy_instances(user, cname, cver, consul_host=None):
283 """Lists healthy instances of a particular component for a given user
287 List of strings where the strings are fully qualified instance names
289 consul_host = _choose_consul_host(consul_host)
290 return _get_component_instances(is_healthy, user, cname, cver, consul_host)
292 def get_defective_instances(user, cname, cver, consul_host=None):
293 """Lists *not* running instances of a particular component for a given user
295 This means that there are component instances that are sitting out there
296 deployed but not successfully running.
300 List of strings where the strings are fully qualified instance names
302 def is_not_healthy(consul_host, component):
303 return not is_healthy(consul_host, component)
305 consul_host = _choose_consul_host(consul_host)
306 return _get_component_instances(is_not_healthy, user, cname, cver, consul_host)
309 def lookup_instance(consul_host, name):
310 """Query Consul for service details"""
311 cons = Consul(consul_host)
312 index, results = cons.catalog.service(name)
315 def parse_instance_lookup(results):
316 """Parse the resultset from lookup_instance
320 String in host form <address>:<port>
325 return "{address}:{port}".format(address=result["ServiceAddress"],
326 port=result["ServicePort"])
331 def _create_rels_key(config_key):
332 """Create rels key from config key
334 Assumes config_key is well-formed"""
335 return "{:}:rel".format(config_key)
338 def _create_dmaap_key(config_key):
339 """Create dmaap key from config key
341 Assumes config_key is well-formed"""
342 return "{:}:dmaap".format(config_key)
345 def clear_user_instances(user, host=None):
346 '''Removes all Consul key:value entries for a given user'''
347 host = _choose_consul_host(host)
349 cons.kv.delete(user, recurse=True)
352 _multiple_compat_msg = '''Component '{cname}' config_key '{ckey}' has multiple compatible downstream \
353 components: {compat}. The current infrastructure can only support interacing with a single component. \
354 Only downstream component '{chosen}' will be connected.'''
356 _no_compat_msg = "Component '{cname}' config_key '{ckey}' has no compatible downstream components."
358 _no_inst_msg = '''Component '{cname}' config_key '{ckey}' is compatible with downstream component '{chosen}' \
359 however there are no instances available for connecting.'''
363 '''Returns a string formatted representation for a component and version'''
365 return ':'.join(args[0])
367 return ':'.join(args)
369 raise DiscoveryError('Input should be name, version or (name, version)')
372 def _get_downstream(cname, cver, config_key, compat_comps, instance_map,
375 Returns a component type and its instances to use for a given config key
380 Name of the upstream component
382 Version of the upstream component
384 Mainly used for populating warnings meaningfully
386 A list of component (name, version) tuples
388 A dict whose keys are component (name, version) tuples and values are a list of instance names
392 logger.warning(_no_compat_msg.format(cname=_cfmt(cname, cver), ckey=config_key))
394 conn_comp = six.next(iter(compat_comps))
395 if len(compat_comps) > 1:
396 logger.warning(_multiple_compat_msg.format(cname=_cfmt(cname, cver), ckey=config_key,
397 compat=list(map(_cfmt, compat_comps)), chosen=_cfmt(conn_comp)))
399 instances = instance_map.get(conn_comp, tuple())
402 logger.warning(_no_inst_msg.format(cname=_cfmt(cname, cver), \
403 ckey=config_key, chosen=_cfmt(conn_comp)))
405 logger.error(_no_inst_msg.format(cname=_cfmt(cname, cver), \
406 ckey=config_key, chosen=_cfmt(conn_comp)))
407 raise DiscoveryNoDownstreamComponentError("No compatible downstream component found.")
411 return conn_comp, instances
414 def create_config(user, cname, cver, params, interface_map, instance_map, dmaap_map,
415 instance_prefix=None, force=False):
417 Creates a config and corresponding rels entries in Consul. Returns the Consul the keys and entries.
422 The user namespace to create the config and rels under. E.g. user.foo.bar...
424 Name of the upstream component
426 Version of the upstream component
428 Parameters of the component, taken directly from the component specification
430 A dict mapping the config_key of published streams and/or called services to a list of compatible
431 component types and versions
433 A dict mapping component types and versions to a list of instances currently running
435 A dict that contains config key to dmaap information. This map is checked
436 first before checking the instance_map which means before checking for
437 direct http components.
438 instance_prefix : string, optional
439 The unique prefix to associate with the component instance whose config is being created
440 force: string, optional
441 Config will continue to be created even if there are no downstream compatible
442 component when this flag is set to True. Default is False.
444 inst_pref = str(uuid4()) if instance_prefix is None else instance_prefix
445 conf_key = "{:}.{:}.{:}.{:}".format(user, inst_pref, replace_dots(cver), replace_dots(cname))
446 rels_key = _create_rels_key(conf_key)
447 dmaap_key = _create_dmaap_key(conf_key)
452 # NOTE: The dmaap_map entries are broken up between the templetized config
453 # and the dmaap json in Consul
454 for config_key, dmaap_goodies in six.iteritems(dmaap_map):
455 conf[config_key] = deepcopy(dmaap_map[config_key])
456 # Here comes the magic. << >> signifies dmaap to downstream config
458 conf[config_key]["dmaap_info"] = "<<{:}>>".format(config_key)
460 # NOTE: The interface_map may not contain *all* possible interfaces
461 # that may be connected with because the catalog.get_discovery call filters
462 # based upon neighbors. Essentailly the interface_map is being pre-filtered
463 # which is probably a latent bug.
465 for config_key, compat_types in six.iteritems(interface_map):
466 # Don't clobber config keys that have been set from above
467 if config_key not in conf:
468 conn_comp, instances = _get_downstream(cname, cver, config_key, \
469 compat_types, instance_map, force=force)
470 conn_name, conn_ver = conn_comp
473 if conn_name and conn_ver:
474 middle = "{:}.{:}".format(replace_dots(conn_ver), replace_dots(conn_name))
477 raise DiscoveryNoDownstreamComponentError("No compatible downstream component found.")
479 config_val = '{{' + middle + '}}'
480 conf[config_key] = config_val
481 rels.extend(instances)
483 dmaap_map_just_info = { config_key: v["dmaap_info"]
484 for config_key, v in six.iteritems(dmaap_map) }
485 return conf_key, conf, rels_key, rels, dmaap_key, dmaap_map_just_info
488 def get_docker_logins(host=None):
489 """Get Docker logins from Consul
493 List of objects where the objects must be of the form
494 {"registry": .., "username":.., "password":.. }
496 key = get_docker_logins_key()
497 host = _choose_consul_host(host)
498 (index, val) = Consul(host).kv.get(key)
501 return json.loads(val['Value'].decode("utf-8"))
506 def push_config(conf_key, conf, rels_key, rels, dmaap_key, dmaap_map, host=None):
507 '''Uploads the config and rels to Consul'''
508 host = _choose_consul_host(host)
510 for k, v in ((conf_key, conf), (rels_key, rels), (dmaap_key, dmaap_map)):
511 cons.kv.put(k, json.dumps(v))
514 def remove_config(config_key, host=None):
515 """Deletes a config from Consul
519 True when all artifacts have been successfully deleted else False
521 host = _choose_consul_host(host)
523 results = [ cons.kv.delete(k) for k in (config_key, _create_rels_key(config_key), \
524 _create_dmaap_key(config_key)) ]
528 def _group_config(config, config_key_map):
529 """Groups config by streams_publishes, streams_subscribes, services_calls"""
530 # Copy non streams and services first
531 grouped_conf = { k: v for k,v in six.iteritems(config)
532 if k not in config_key_map }
534 def group(group_name):
535 grouped_conf[group_name] = { k: v for k,v in six.iteritems(config)
536 if k in config_key_map and config_key_map[k]["group"] == group_name }
538 # Copy and group the streams and services
539 # Map returns iterator so must force running its course
540 list(map(group, ["streams_publishes", "streams_subscribes", "services_calls"]))
544 def _apply_inputs(config, inputs_map):
545 """Update configuration with inputs
547 This method updates the values of the configuration parameters using values
550 config.update(inputs_map)
554 @contextlib.contextmanager
555 def config_context(user, cname, cver, params, interface_map, instance_map,
556 config_key_map, dmaap_map={}, inputs_map={}, instance_prefix=None,
557 host=None, always_cleanup=True, force_config=False):
558 '''Convenience utility for creating configs and cleaning them up
562 always_cleanup: (boolean) This context manager will cleanup the produced config
563 context always if this is True. When False, cleanup will only occur upon any
564 exception getting thrown in the context manager block. Default is True.
566 Config will continue to be created even if there are no downstream compatible
567 component when this flag is set to True. Default is False.
569 host = _choose_consul_host(host)
572 conf_key, conf, rels_key, rels, dmaap_key, dmaap_map = create_config(
573 user, cname, cver, params, interface_map, instance_map, dmaap_map,
574 instance_prefix, force=force_config)
576 conf = _apply_inputs(conf, inputs_map)
577 conf = _group_config(conf, config_key_map)
579 push_config(conf_key, conf, rels_key, rels, dmaap_key, dmaap_map, host)
580 yield (conf_key, conf)
581 except Exception as e:
582 if not always_cleanup:
584 conf_key, rels_key, host
585 except UnboundLocalError:
588 remove_config(conf_key, host)
594 conf_key, rels_key, host
595 except UnboundLocalError:
598 remove_config(conf_key, host)