1 # ============LICENSE_START=======================================================
3 # ================================================================================
4 # Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END=========================================================
19 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 # -*- coding: utf-8 -*-
23 Provides Consul helper functions
28 from collections import defaultdict
29 from itertools import chain
30 from functools import partial
31 from uuid import uuid4
34 from copy import deepcopy
35 from consul import Consul
37 from dcae_cli.util.logger import get_logger
38 from dcae_cli.util.exc import DcaeException
39 from dcae_cli.util.profiles import get_profile
42 logger = get_logger('Discovery')
44 active_profile = get_profile()
45 consul_host = active_profile.consul_host
46 # NOTE: Removed the suffix completely. The useful piece of the suffix was the
47 # location but it was implemented in a static fashion (hardcoded). Rather than
48 # enhancing the existing approach and making the suffix dynamic (to support
49 # "rework-central" and "solutioning"), the thinking is to revisit this name stuff
50 # and use Consul's query interface so that location is a tag attribute.
51 _inst_re = re.compile(r"^(?P<user>[^.]*).(?P<hash>[^.]*).(?P<ver>\d+-\d+-\d+).(?P<comp>.*)$")
54 class DiscoveryError(DcaeException):
57 class DiscoveryNoDownstreamComponentError(DiscoveryError):
61 def replace_dots(comp_name, reverse=False):
62 '''Converts dots to dashes to prevent downstream users of Consul from exploding'''
64 return comp_name.replace('.', '-')
66 return comp_name.replace('-', '.')
68 # Utility functions for using Consul
70 def _is_healthy_pure(get_health_func, instance):
71 """Checks to see if a component instance is running healthy
77 get_health_func: func(string) -> complex object
78 Look at unittests in test_discovery to see examples
79 instance: (string) fully qualified name of component instance
83 True if instance has been found and is healthy else False
85 index, resp = get_health_func(instance)
88 def is_passing(instance):
89 return all([check["Status"] == "passing" for check in instance["Checks"]])
90 return any([is_passing(instance) for instance in resp])
94 def is_healthy(consul_host, instance):
95 """Checks to see if a component instance is running healthy
97 Impure function edition
101 consul_host: (string) host string of Consul
102 instance: (string) fully qualified name of component instance
106 True if instance has been found and is healthy else False
108 cons = Consul(consul_host)
109 return _is_healthy_pure(cons.health.service, instance)
111 def _get_instances_from_kv(get_from_kv_func, user):
112 """Get component instances from kv store
114 Deployed component instances get entries in a kv store to store configuration
115 information. This is a way to source a list of component instances that were
116 attempted to run. A component could have deployed but failed to register itself.
117 The only trace of that deployment would be checking the kv store.
121 get_from_kv_func: func(string, boolean) -> (don't care, list of dicts)
122 Look at unittests in test_discovery to see examples
123 user: (string) user id
127 List of unique component instance names
129 # Keys from KV contain rels key entries and non-rels key entries. Keep the
130 # rels key entries but remove the ":rel" suffix because we are paranoid that
131 # this could exist without the other
132 _, instances_kv = get_from_kv_func(user, recurse=True)
133 return [] if instances_kv is None \
134 else list(set([ dd["Key"].replace(":rel", "") for dd in instances_kv ]))
136 def _get_instances_from_catalog(get_from_catalog_func, user):
137 """Get component instances from catalog
139 Fetching instances from the catalog covers the deployment cases where
140 components registered successfully regardless of their health check status.
144 get_from_catalog_func: func() -> (don't care, dict)
145 Look at unittests in test_discovery to see examples
146 user: (string) user id
150 List of unique component instance names
152 # Get all services and filter here by user
153 response = get_from_catalog_func()
154 return list(set([ instance for instance in response[1].keys() if user in instance ]))
156 def _merge_instances(user, *get_funcs):
157 """Merge the result of an arbitrary list of get instance function calls
161 user: (string) user id
162 get_funcs: func(string) -> list of strings
163 Functions that take in a user parameter to output a list of instance
168 List of unique component instance names
170 return list(set(chain.from_iterable([ get_func(user) for get_func in get_funcs ])))
172 def _get_instances(consul_host, user):
173 """Get all deployed component instances for a given user
175 Sourced from multiple places to ensure we get a complete list of all
176 component instances no matter what state they are in.
180 consul_host: (string) host string of Consul
181 user: (string) user id
185 List of unique component instance names
187 cons = Consul(consul_host)
189 get_instances_from_kv = partial(_get_instances_from_kv, cons.kv.get)
190 get_instances_from_catalog = partial(_get_instances_from_catalog, cons.catalog.services)
192 return _merge_instances(user, get_instances_from_kv, get_instances_from_catalog)
195 # Custom (sometimes higher order) "discovery" functionality
197 def _make_instances_map(instances):
198 """Make an instance map
200 Instance map is a dict where the keys are tuples (component type, component version)
201 that map to a set of strings that are instance names.
203 mapping = defaultdict(set)
204 for instance in instances:
205 match = _inst_re.match(instance_target)
209 _, _, ver, comp = match.groups()
210 cname = replace_dots(comp, reverse=True)
211 version = replace_dots(ver, reverse=True)
212 key = (cname, version)
213 mapping[key].add(instance)
217 def get_user_instances(user, consul_host=consul_host, filter_instances_func=is_healthy):
218 '''Get a user's instance map
222 filter_instances_func: fn(consul_host, instance) -> boolean
223 Function used to filter instances. Default is is_healthy
227 Dict whose keys are component (name,version) tuples and values are list of component instance names
229 filter_func = partial(filter_instances_func, consul_host)
230 instances = list(filter(filter_func, _get_instances(consul_host, user)))
232 return _make_instances_map(instances)
235 def _get_component_instances(filter_instances_func, user, cname, cver, consul_host):
236 """Get component instances that are filtered
240 filter_instances_func: fn(consul_host, instance) -> boolean
241 Function used to filter instances
245 List of strings where the strings are fully qualified instance names
247 instance_map = get_user_instances(user, consul_host=consul_host,
248 filter_instances_func=filter_instances_func)
250 # REVIEW: We don't restrict component names from using dashes. We do
251 # transform names with dots to use dashes for domain segmenting reasons.
252 # Instance map creation always reverses that making dashes to dots even though
253 # the component name may have dashes. Thus always search for instances by
254 # a dotted component name. We are open to a collision but that is low chance
255 # - someone has to use the same name in dotted and dashed form which is weird.
256 cname_dashless = replace_dots(cname, reverse=True)
258 # WATCH: instances_map.get returns set. Force to be list to have consistent
260 return list(instance_map.get((cname_dashless, cver), []))
262 def get_healthy_instances(user, cname, cver, consul_host=consul_host):
263 """Lists healthy instances of a particular component for a given user
267 List of strings where the strings are fully qualified instance names
269 return _get_component_instances(is_healthy, user, cname, cver, consul_host)
271 def get_defective_instances(user, cname, cver, consul_host=consul_host):
272 """Lists *not* running instances of a particular component for a given user
274 This means that there are component instances that are sitting out there
275 deployed but not successfully running.
279 List of strings where the strings are fully qualified instance names
281 def is_not_healthy(consul_host, component):
282 return not is_healthy(consul_host, component)
284 return _get_component_instances(is_not_healthy, user, cname, cver, consul_host)
287 def lookup_instance(consul_host, name):
288 """Query Consul for service details"""
289 cons = Consul(consul_host)
290 index, results = cons.catalog.service(name)
293 def parse_instance_lookup(results):
294 """Parse the resultset from lookup_instance
298 String in host form <address>:<port>
303 return "{address}:{port}".format(address=result["ServiceAddress"],
304 port=result["ServicePort"])
309 def _create_rels_key(config_key):
310 """Create rels key from config key
312 Assumes config_key is well-formed"""
313 return "{:}:rel".format(config_key)
316 def _create_dmaap_key(config_key):
317 """Create dmaap key from config key
319 Assumes config_key is well-formed"""
320 return "{:}:dmaap".format(config_key)
323 def clear_user_instances(user, host=consul_host):
324 '''Removes all Consul key:value entries for a given user'''
326 cons.kv.delete(user, recurse=True)
329 _multiple_compat_msg = '''Component '{cname}' config_key '{ckey}' has multiple compatible downstream \
330 components: {compat}. The current infrastructure can only support interacing with a single component. \
331 Only downstream component '{chosen}' will be connected.'''
333 _no_compat_msg = "Component '{cname}' config_key '{ckey}' has no compatible downstream components."
335 _no_inst_msg = '''Component '{cname}' config_key '{ckey}' is compatible with downstream component '{chosen}' \
336 however there are no instances available for connecting.'''
340 '''Returns a string formatted representation for a component and version'''
342 return ':'.join(args[0])
344 return ':'.join(args)
346 raise DiscoveryError('Input should be name, version or (name, version)')
349 def _get_downstream(cname, cver, config_key, compat_comps, instance_map,
352 Returns a component type and its instances to use for a given config key
357 Name of the upstream component
359 Version of the upstream component
361 Mainly used for populating warnings meaningfully
363 A list of component (name, version) tuples
365 A dict whose keys are component (name, version) tuples and values are a list of instance names
369 logger.warning(_no_compat_msg.format(cname=_cfmt(cname, cver), ckey=config_key))
371 conn_comp = six.next(iter(compat_comps))
372 if len(compat_comps) > 1:
373 logger.warning(_multiple_compat_msg.format(cname=_cfmt(cname, cver), ckey=config_key,
374 compat=list(map(_cfmt, compat_comps)), chosen=_cfmt(conn_comp)))
376 instances = instance_map.get(conn_comp, tuple())
379 logger.warning(_no_inst_msg.format(cname=_cfmt(cname, cver), \
380 ckey=config_key, chosen=_cfmt(conn_comp)))
382 logger.error(_no_inst_msg.format(cname=_cfmt(cname, cver), \
383 ckey=config_key, chosen=_cfmt(conn_comp)))
384 raise DiscoveryNoDownstreamComponentError("No compatible downstream component found.")
388 return conn_comp, instances
391 def create_config(user, cname, cver, params, interface_map, instance_map, dmaap_map,
392 instance_prefix=None, force=False):
394 Creates a config and corresponding rels entries in Consul. Returns the Consul the keys and entries.
399 The user namespace to create the config and rels under. E.g. user.foo.bar...
401 Name of the upstream component
403 Version of the upstream component
405 Parameters of the component, taken directly from the component specification
407 A dict mapping the config_key of published streams and/or called services to a list of compatible
408 component types and versions
410 A dict mapping component types and versions to a list of instances currently running
412 A dict that contains config key to dmaap information. This map is checked
413 first before checking the instance_map which means before checking for
414 direct http components.
415 instance_prefix : string, optional
416 The unique prefix to associate with the component instance whose config is being created
417 force: string, optional
418 Config will continue to be created even if there are no downstream compatible
419 component when this flag is set to True. Default is False.
421 inst_pref = str(uuid4()) if instance_prefix is None else instance_prefix
422 conf_key = "{:}.{:}.{:}.{:}".format(user, inst_pref, replace_dots(cver), replace_dots(cname))
423 rels_key = _create_rels_key(conf_key)
424 dmaap_key = _create_dmaap_key(conf_key)
429 # NOTE: The dmaap_map entries are broken up between the templetized config
430 # and the dmaap json in Consul
431 for config_key, dmaap_goodies in six.iteritems(dmaap_map):
432 conf[config_key] = deepcopy(dmaap_map[config_key])
433 # Here comes the magic. << >> signifies dmaap to downstream config
435 conf[config_key]["dmaap_info"] = "<<{:}>>".format(config_key)
437 # NOTE: The interface_map may not contain *all* possible interfaces
438 # that may be connected with because the catalog.get_discovery call filters
439 # based upon neighbors. Essentailly the interface_map is being pre-filtered
440 # which is probably a latent bug.
442 for config_key, compat_types in six.iteritems(interface_map):
443 # Don't clobber config keys that have been set from above
444 if config_key not in conf:
445 conn_comp, instances = _get_downstream(cname, cver, config_key, \
446 compat_types, instance_map, force=force)
447 conn_name, conn_ver = conn_comp
450 if conn_name and conn_ver:
451 middle = "{:}.{:}".format(replace_dots(conn_ver), replace_dots(conn_name))
454 raise DiscoveryNoDownstreamComponentError("No compatible downstream component found.")
456 config_val = '{{' + middle + '}}'
457 conf[config_key] = config_val
458 rels.extend(instances)
460 dmaap_map_just_info = { config_key: v["dmaap_info"]
461 for config_key, v in six.iteritems(dmaap_map) }
462 return conf_key, conf, rels_key, rels, dmaap_key, dmaap_map_just_info
465 def push_config(conf_key, conf, rels_key, rels, dmaap_key, dmaap_map, host=consul_host):
466 '''Uploads the config and rels to Consul'''
468 for k, v in ((conf_key, conf), (rels_key, rels), (dmaap_key, dmaap_map)):
469 cons.kv.put(k, json.dumps(v))
472 def remove_config(config_key, host=consul_host):
473 """Deletes a config from Consul
477 True when all artifacts have been successfully deleted else False
480 results = [ cons.kv.delete(k) for k in (config_key, _create_rels_key(config_key), \
481 _create_dmaap_key(config_key)) ]
485 def _group_config(config, config_key_map):
486 """Groups config by streams_publishes, streams_subscribes, services_calls"""
487 # Copy non streams and services first
488 grouped_conf = { k: v for k,v in six.iteritems(config)
489 if k not in config_key_map }
491 def group(group_name):
492 grouped_conf[group_name] = { k: v for k,v in six.iteritems(config)
493 if k in config_key_map and config_key_map[k]["group"] == group_name }
495 # Copy and group the streams and services
496 # Map returns iterator so must force running its course
497 list(map(group, ["streams_publishes", "streams_subscribes", "services_calls"]))
501 @contextlib.contextmanager
502 def config_context(user, cname, cver, params, interface_map, instance_map,
503 config_key_map, dmaap_map={}, instance_prefix=None, host=consul_host,
504 always_cleanup=True, force_config=False):
505 '''Convenience utility for creating configs and cleaning them up
509 always_cleanup: (boolean) This context manager will cleanup the produced config
510 context always if this is True. When False, cleanup will only occur upon any
511 exception getting thrown in the context manager block. Default is True.
513 Config will continue to be created even if there are no downstream compatible
514 component when this flag is set to True. Default is False.
517 conf_key, conf, rels_key, rels, dmaap_key, dmaap_map = create_config(
518 user, cname, cver, params, interface_map, instance_map, dmaap_map,
519 instance_prefix, force=force_config)
521 conf = _group_config(conf, config_key_map)
523 push_config(conf_key, conf, rels_key, rels, dmaap_key, dmaap_map, host)
524 yield (conf_key, conf)
525 except Exception as e:
526 if not always_cleanup:
528 conf_key, rels_key, host
529 except UnboundLocalError:
532 remove_config(conf_key, host)
538 conf_key, rels_key, host
539 except UnboundLocalError:
542 remove_config(conf_key, host)