1 # ============LICENSE_START=======================================================
3 # ================================================================================
4 # Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END=========================================================
19 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 # -*- coding: utf-8 -*-
23 Provides Consul helper functions
28 from collections import defaultdict
29 from itertools import chain
30 from functools import partial
31 from uuid import uuid4
34 from copy import deepcopy
35 from consul import Consul
37 from dcae_cli.util.logger import get_logger
38 from dcae_cli.util.exc import DcaeException
39 from dcae_cli.util.profiles import get_profile
40 from dcae_cli.util.config import get_docker_logins_key
# Module-wide logger used for every warning/error emitted by the helpers in
# this file.
logger = get_logger('Discovery')

# The profile is resolved once, when this module is imported. Its Consul host
# is what the public helpers in this file use as the default value of their
# `consul_host` / `host` keyword argument.
active_profile = get_profile()
consul_host = active_profile.consul_host
47 # NOTE: Removed the suffix completely. The useful piece of the suffix was the
48 # location but it was implemented in a static fashion (hardcoded). Rather than
49 # enhancing the existing approach and making the suffix dynamic (to support
50 # "rework-central" and "solutioning"), the thinking is to revisit this name stuff
51 # and use Consul's query interface so that location is a tag attribute.
52 _inst_re = re.compile(r"^(?P<user>[^.]*).(?P<hash>[^.]*).(?P<ver>\d+-\d+-\d+).(?P<comp>.*)$")
class DiscoveryError(DcaeException):
    """Base class for errors raised by the Consul discovery helpers"""
class DiscoveryNoDownstreamComponentError(DiscoveryError):
    """Raised when no usable downstream component (or instance of it) is found"""
def replace_dots(comp_name, reverse=False):
    '''Converts dots to dashes to prevent downstream users of Consul from exploding

    Args
    ----
    comp_name: (string) name or version string to convert
    reverse: (boolean) when True the opposite conversion is applied, i.e.
        dashes are turned back into dots. Default is False.

    Returns
    -------
    The converted string
    '''
    old, new = ('-', '.') if reverse else ('.', '-')
    return comp_name.replace(old, new)
69 # Utility functions for using Consul
71 def _is_healthy_pure(get_health_func, instance):
72 """Checks to see if a component instance is running healthy
78 get_health_func: func(string) -> complex object
79 Look at unittests in test_discovery to see examples
80 instance: (string) fully qualified name of component instance
84 True if instance has been found and is healthy else False
86 index, resp = get_health_func(instance)
89 def is_passing(instance):
90 return all([check["Status"] == "passing" for check in instance["Checks"]])
91 return any([is_passing(instance) for instance in resp])
def is_healthy(consul_host, instance):
    """Checks to see if a component instance is running healthy

    Impure function edition: the health information is fetched from a live
    Consul through a client created for `consul_host`.

    Args
    ----
    consul_host: (string) host string of Consul
    instance: (string) fully qualified name of component instance

    Returns
    -------
    True if instance has been found and is healthy else False
    """
    health_lookup = Consul(consul_host).health.service
    return _is_healthy_pure(health_lookup, instance)
112 def _get_instances_from_kv(get_from_kv_func, user):
113 """Get component instances from kv store
115 Deployed component instances get entries in a kv store to store configuration
116 information. This is a way to source a list of component instances that were
117 attempted to run. A component could have deployed but failed to register itself.
118 The only trace of that deployment would be checking the kv store.
122 get_from_kv_func: func(string, boolean) -> (don't care, list of dicts)
123 Look at unittests in test_discovery to see examples
124 user: (string) user id
128 List of unique component instance names
130 # Keys from KV contain rels key entries and non-rels key entries. Keep the
131 # rels key entries but remove the ":rel" suffix because we are paranoid that
132 # this could exist without the other
133 _, instances_kv = get_from_kv_func(user, recurse=True)
134 return [] if instances_kv is None \
135 else list(set([ dd["Key"].replace(":rel", "") for dd in instances_kv ]))
137 def _get_instances_from_catalog(get_from_catalog_func, user):
138 """Get component instances from catalog
140 Fetching instances from the catalog covers the deployment cases where
141 components registered successfully regardless of their health check status.
145 get_from_catalog_func: func() -> (don't care, dict)
146 Look at unittests in test_discovery to see examples
147 user: (string) user id
151 List of unique component instance names
153 # Get all services and filter here by user
154 response = get_from_catalog_func()
155 return list(set([ instance for instance in response[1].keys() if user in instance ]))
157 def _merge_instances(user, *get_funcs):
158 """Merge the result of an arbitrary list of get instance function calls
162 user: (string) user id
163 get_funcs: func(string) -> list of strings
164 Functions that take in a user parameter to output a list of instance
169 List of unique component instance names
171 return list(set(chain.from_iterable([ get_func(user) for get_func in get_funcs ])))
def _get_instances(consul_host, user):
    """Get all deployed component instances for a given user

    Sourced from multiple places to ensure we get a complete list of all
    component instances no matter what state they are in.

    Args
    ----
    consul_host: (string) host string of Consul
    user: (string) user id

    Returns
    -------
    List of unique component instance names
    """
    cons = Consul(consul_host)

    # One getter per source, each bound to the matching Consul client call
    sources = (
        partial(_get_instances_from_kv, cons.kv.get),
        partial(_get_instances_from_catalog, cons.catalog.services),
    )
    return _merge_instances(user, *sources)
196 # Custom (sometimes higher order) "discovery" functionality
def _make_instances_map(instances):
    """Make an instance map

    Instance map is a dict where the keys are tuples (component type, component version)
    that map to a set of strings that are instance names.

    Args
    ----
    instances: iterable of strings that are fully qualified instance names.
        Names that do not follow the instance naming pattern are ignored.

    Returns
    -------
    defaultdict(set) keyed by (component name, component version), both in
    their dotted form
    """
    mapping = defaultdict(set)
    for instance in instances:
        match = _inst_re.match(instance)
        if match is None:
            # Not shaped like a component instance name; nothing to map
            continue

        # Names and versions are stored in Consul with dashes, the map is
        # keyed by the dotted form.
        cname = replace_dots(match.group("comp"), reverse=True)
        version = replace_dots(match.group("ver"), reverse=True)
        mapping[(cname, version)].add(instance)

    return mapping
def get_user_instances(user, consul_host=consul_host, filter_instances_func=is_healthy):
    '''Get a user's instance map

    Args
    ----
    user: (string) user id
    consul_host: (string) host string of Consul
    filter_instances_func: fn(consul_host, instance) -> boolean
        Function used to filter instances. Default is is_healthy

    Returns
    -------
    Dict whose keys are component (name,version) tuples and values are the
    component instance names that passed the filter
    '''
    keep = partial(filter_instances_func, consul_host)
    kept = [name for name in _get_instances(consul_host, user) if keep(name)]
    return _make_instances_map(kept)
def _get_component_instances(filter_instances_func, user, cname, cver, consul_host):
    """Get component instances that are filtered

    Args
    ----
    filter_instances_func: fn(consul_host, instance) -> boolean
        Function used to filter instances
    user: (string) user id
    cname: (string) component name
    cver: (string) component version
    consul_host: (string) host string of Consul

    Returns
    -------
    List of strings where the strings are fully qualified instance names
    """
    by_component = get_user_instances(
        user,
        consul_host=consul_host,
        filter_instances_func=filter_instances_func,
    )

    # REVIEW: We don't restrict component names from using dashes. We do
    # transform names with dots to use dashes for domain segmenting reasons.
    # Instance map creation always reverses that making dashes to dots even though
    # the component name may have dashes. Thus always search for instances by
    # a dotted component name. We are open to a collision but that is low chance
    # - someone has to use the same name in dotted and dashed form which is weird.
    lookup_key = (replace_dots(cname, reverse=True), cver)

    # WATCH: the map hands back a set. Force to be list so that callers always
    # get the same type, hit or miss.
    found = by_component.get(lookup_key, [])
    return list(found)
def get_healthy_instances(user, cname, cver, consul_host=consul_host):
    """Lists healthy instances of a particular component for a given user

    Args
    ----
    user: (string) user id
    cname: (string) component name
    cver: (string) component version
    consul_host: (string) host string of Consul

    Returns
    -------
    List of strings where the strings are fully qualified instance names
    """
    return _get_component_instances(
        filter_instances_func=is_healthy,
        user=user,
        cname=cname,
        cver=cver,
        consul_host=consul_host,
    )
def get_defective_instances(user, cname, cver, consul_host=consul_host):
    """Lists *not* running instances of a particular component for a given user

    This means that there are component instances that are sitting out there
    deployed but not successfully running.

    Args
    ----
    user: (string) user id
    cname: (string) component name
    cver: (string) component version
    consul_host: (string) host string of Consul

    Returns
    -------
    List of strings where the strings are fully qualified instance names
    """
    # Plain negation of the health check used for the healthy listing
    unhealthy_only = lambda host, component: not is_healthy(host, component)

    return _get_component_instances(unhealthy_only, user, cname, cver, consul_host)
def lookup_instance(consul_host, name):
    """Query Consul for service details

    Args
    ----
    consul_host: (string) host string of Consul
    name: (string) name of the service to look up in the catalog

    Returns
    -------
    The result set of the catalog lookup, suitable for parse_instance_lookup
    """
    cons = Consul(consul_host)
    # The first element of the response is Consul's index, which is not needed
    _, results = cons.catalog.service(name)
    return results
def parse_instance_lookup(results):
    """Parse the resultset from lookup_instance

    Args
    ----
    results: list of dicts that each carry "ServiceAddress" and "ServicePort".
        May be empty or None.

    Returns
    -------
    String in host form <address>:<port>, or None when there are no results
    """
    if not results:
        return None

    # Several entries may come back; only the first one is used
    first = results[0]
    return "{address}:{port}".format(address=first["ServiceAddress"],
            port=first["ServicePort"])
310 def _create_rels_key(config_key):
311 """Create rels key from config key
313 Assumes config_key is well-formed"""
314 return "{:}:rel".format(config_key)
317 def _create_dmaap_key(config_key):
318 """Create dmaap key from config key
320 Assumes config_key is well-formed"""
321 return "{:}:dmaap".format(config_key)
def clear_user_instances(user, host=consul_host):
    '''Removes all Consul key:value entries for a given user

    Args
    ----
    user: (string) user id, used as the key prefix of the recursive delete
    host: (string) host string of Consul
    '''
    cons = Consul(host)
    cons.kv.delete(user, recurse=True)
330 _multiple_compat_msg = '''Component '{cname}' config_key '{ckey}' has multiple compatible downstream \
331 components: {compat}. The current infrastructure can only support interacing with a single component. \
332 Only downstream component '{chosen}' will be connected.'''
334 _no_compat_msg = "Component '{cname}' config_key '{ckey}' has no compatible downstream components."
336 _no_inst_msg = '''Component '{cname}' config_key '{ckey}' is compatible with downstream component '{chosen}' \
337 however there are no instances available for connecting.'''
341 '''Returns a string formatted representation for a component and version'''
343 return ':'.join(args[0])
345 return ':'.join(args)
347 raise DiscoveryError('Input should be name, version or (name, version)')
def _get_downstream(cname, cver, config_key, compat_comps, instance_map,
        force=False):
    '''
    Returns a component type and its instances to use for a given config key

    Parameters
    ----------
    cname : string
        Name of the upstream component
    cver : string
        Version of the upstream component
    config_key : string
        Mainly used for populating warnings meaningfully
    compat_comps : collection
        A list of component (name, version) tuples
    instance_map : dict
        A dict whose keys are component (name, version) tuples and values are a list of instance names
    force : boolean, optional
        When True a compatible component without instances only produces a
        warning. When False (default) it is logged as an error and raised.

    Returns
    -------
    Tuple of (chosen (name, version) tuple, instances of the chosen component)

    Raises
    ------
    DiscoveryNoDownstreamComponentError when the chosen component has no
    instances and force is not set
    '''
    if not compat_comps:
        logger.warning(_no_compat_msg.format(cname=_cfmt(cname, cver), ckey=config_key))
        # NOTE(review): an empty (name, version) pair stands for "nothing to
        # connect to"; create_config unpacks the pair and tests both parts.
        return ('', ''), tuple()

    # Only one downstream component can be wired up: take the first one
    conn_comp = next(iter(compat_comps))
    if len(compat_comps) > 1:
        logger.warning(_multiple_compat_msg.format(cname=_cfmt(cname, cver), ckey=config_key,
            compat=list(map(_cfmt, compat_comps)), chosen=_cfmt(conn_comp)))

    if not all(conn_comp):
        # Incomplete (name, version): there is nothing to look instances up for
        return conn_comp, tuple()

    instances = instance_map.get(conn_comp, tuple())
    if not instances:
        if force:
            logger.warning(_no_inst_msg.format(cname=_cfmt(cname, cver), \
                    ckey=config_key, chosen=_cfmt(conn_comp)))
        else:
            logger.error(_no_inst_msg.format(cname=_cfmt(cname, cver), \
                    ckey=config_key, chosen=_cfmt(conn_comp)))
            raise DiscoveryNoDownstreamComponentError("No compatible downstream component found.")

    return conn_comp, instances
def create_config(user, cname, cver, params, interface_map, instance_map, dmaap_map,
        instance_prefix=None, force=False):
    '''
    Creates a config and corresponding rels entries in Consul. Returns the Consul the keys and entries.

    Parameters
    ----------
    user : string
        The user namespace to create the config and rels under. E.g. user.foo.bar...
    cname : string
        Name of the upstream component
    cver : string
        Version of the upstream component
    params : dict
        Parameters of the component, taken directly from the component specification
    interface_map : dict
        A dict mapping the config_key of published streams and/or called services to a list of compatible
        component types and versions
    instance_map : dict
        A dict mapping component types and versions to a list of instances currently running
    dmaap_map : dict
        A dict that contains config key to dmaap information. This map is checked
        first before checking the instance_map which means before checking for
        direct http components.
    instance_prefix : string, optional
        The unique prefix to associate with the component instance whose config is being created
    force: string, optional
        Config will continue to be created even if there are no downstream compatible
        component when this flag is set to True. Default is False.

    Returns
    -------
    Tuple of (config key, config dict, rels key, rels list, dmaap key,
    dict of config key to its "dmaap_info" value)

    Raises
    ------
    DiscoveryNoDownstreamComponentError when a config key has no usable
    downstream component and force is not set
    '''
    inst_pref = str(uuid4()) if instance_prefix is None else instance_prefix
    conf_key = "{:}.{:}.{:}.{:}".format(user, inst_pref, replace_dots(cver), replace_dots(cname))
    rels_key = _create_rels_key(conf_key)
    dmaap_key = _create_dmaap_key(conf_key)

    # Start from a copy of the parameters so the caller's dict is not modified
    conf = params.copy()
    rels = []

    # NOTE: The dmaap_map entries are broken up between the templetized config
    # and the dmaap json in Consul
    for config_key, dmaap_goodies in dmaap_map.items():
        conf[config_key] = deepcopy(dmaap_goodies)
        # Here comes the magic. << >> signifies dmaap to downstream config
        # consumers; the real dmaap info is returned separately below.
        conf[config_key]["dmaap_info"] = "<<{:}>>".format(config_key)

    # NOTE: The interface_map may not contain *all* possible interfaces
    # that may be connected with because the catalog.get_discovery call filters
    # based upon neighbors. Essentailly the interface_map is being pre-filtered
    # which is probably a latent bug.

    for config_key, compat_types in interface_map.items():
        # Don't clobber config keys that have been set from above
        if config_key in conf:
            continue

        conn_comp, instances = _get_downstream(cname, cver, config_key, \
                compat_types, instance_map, force=force)
        conn_name, conn_ver = conn_comp

        if conn_name and conn_ver:
            middle = "{:}.{:}".format(replace_dots(conn_ver), replace_dots(conn_name))
        elif force:
            # Nothing to connect to but asked to carry on: empty placeholder
            middle = ''
        else:
            raise DiscoveryNoDownstreamComponentError("No compatible downstream component found.")

        # {{ }} marks a value to be resolved to the downstream component
        conf[config_key] = '{{' + middle + '}}'
        rels.extend(instances)

    dmaap_map_just_info = { config_key: v["dmaap_info"]
            for config_key, v in dmaap_map.items() }
    return conf_key, conf, rels_key, rels, dmaap_key, dmaap_map_just_info
def get_docker_logins(host=consul_host):
    """Get Docker logins from Consul

    Args
    ----
    host: (string) host string of Consul

    Returns
    -------
    List of objects where the objects must be of the form
        {"registry": .., "username":.., "password":.. }
    An empty list is returned when nothing is stored under the logins key.
    """
    key = get_docker_logins_key()
    _, val = Consul(host).kv.get(key)

    if not val:
        return []

    # The stored value is a utf-8 encoded json document
    return json.loads(val['Value'].decode("utf-8"))
def push_config(conf_key, conf, rels_key, rels, dmaap_key, dmaap_map, host=consul_host):
    '''Uploads the config and rels to Consul

    Each of the three (key, value) pairs - config, rels and dmaap - is stored
    in the Consul kv store with its value serialized as json.

    Args
    ----
    conf_key, conf: key and content of the config entry
    rels_key, rels: key and content of the rels entry
    dmaap_key, dmaap_map: key and content of the dmaap entry
    host: (string) host string of Consul
    '''
    cons = Consul(host)
    for k, v in ((conf_key, conf), (rels_key, rels), (dmaap_key, dmaap_map)):
        cons.kv.put(k, json.dumps(v))
def remove_config(config_key, host=consul_host):
    """Deletes a config from Consul

    The config entry and its rels and dmaap companion entries are all deleted.

    Args
    ----
    config_key: (string) key of the config entry
    host: (string) host string of Consul

    Returns
    -------
    True when all artifacts have been successfully deleted else False
    """
    cons = Consul(host)
    # Build the full list first so that every delete is attempted even when an
    # earlier one reports failure.
    results = [ cons.kv.delete(k) for k in (config_key, _create_rels_key(config_key), \
            _create_dmaap_key(config_key)) ]
    return all(results)
503 def _group_config(config, config_key_map):
504 """Groups config by streams_publishes, streams_subscribes, services_calls"""
505 # Copy non streams and services first
506 grouped_conf = { k: v for k,v in six.iteritems(config)
507 if k not in config_key_map }
509 def group(group_name):
510 grouped_conf[group_name] = { k: v for k,v in six.iteritems(config)
511 if k in config_key_map and config_key_map[k]["group"] == group_name }
513 # Copy and group the streams and services
514 # Map returns iterator so must force running its course
515 list(map(group, ["streams_publishes", "streams_subscribes", "services_calls"]))
519 def _apply_inputs(config, inputs_map):
520 """Update configuration with inputs
522 This method updates the values of the configuration parameters using values
525 config.update(inputs_map)
@contextlib.contextmanager
def config_context(user, cname, cver, params, interface_map, instance_map,
        config_key_map, dmaap_map=None, inputs_map=None, instance_prefix=None,
        host=consul_host, always_cleanup=True, force_config=False):
    '''Convenience utility for creating configs and cleaning them up

    The config is created, updated with the inputs, grouped and pushed to
    Consul before control is handed to the with-block, which receives the
    tuple (config key, config).

    Args
    ----
    user, cname, cver, params, interface_map, instance_map, instance_prefix:
        Passed through to create_config
    config_key_map: dict used to group the config, see _group_config
    dmaap_map: dict of config key to dmaap information. Default is empty.
    inputs_map: dict of parameter values to apply on top of the config.
        Default is empty.
    host: (string) host string of Consul
    always_cleanup: (boolean) This context manager will cleanup the produced config
        context always if this is True. When False, cleanup will only occur upon any
        exception getting thrown in the context manager block. Default is True.
    force_config: (boolean)
        Config will continue to be created even if there are no downstream compatible
        component when this flag is set to True. Default is False.
    '''
    # None stands in for "empty" so that no dict is shared between calls
    dmaap_map = {} if dmaap_map is None else dmaap_map
    inputs_map = {} if inputs_map is None else inputs_map

    # conf_key stays None until create_config succeeded, i.e. for as long as
    # there can be nothing in Consul to clean up.
    conf_key = None
    failed = False
    try:
        conf_key, conf, rels_key, rels, dmaap_key, dmaap_info = create_config(
                user, cname, cver, params, interface_map, instance_map, dmaap_map,
                instance_prefix, force=force_config)

        conf = _apply_inputs(conf, inputs_map)
        conf = _group_config(conf, config_key_map)

        push_config(conf_key, conf, rels_key, rels, dmaap_key, dmaap_info, host)
        yield (conf_key, conf)
    except Exception:
        failed = True
        # Bare raise keeps the original traceback
        raise
    finally:
        # Cleanup always when asked to, otherwise only after a failure
        if conf_key is not None and (always_cleanup or failed):
            remove_config(conf_key, host)