1 # ============LICENSE_START=======================================================
3 # ================================================================================
4 # Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END=========================================================
19 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 # -*- coding: utf-8 -*-
23 Provides Consul helper functions
28 from collections import defaultdict
29 from itertools import chain
30 from functools import partial
31 from datetime import datetime
32 from uuid import uuid4
35 from copy import deepcopy
36 from consul import Consul
38 from dcae_cli.util.logger import get_logger
39 from dcae_cli.util.exc import DcaeException
40 from dcae_cli.util.profiles import get_profile
41 from dcae_cli.util.config import get_docker_logins_key
# Module-level logger (named 'Discovery') used by the helpers in this file.
logger = get_logger('Discovery')
48 # NOTE: Removed the suffix completely. The useful piece of the suffix was the
49 # location but it was implemented in a static fashion (hardcoded). Rather than
50 # enhancing the existing approach and making the suffix dynamic (to support
51 # "rework-central" and "solutioning"), the thinking is to revisit this name stuff
52 # and use Consul's query interface so that location is a tag attribute.
53 _inst_re = re.compile(r"^(?P<user>[^.]*).(?P<hash>[^.]*).(?P<ver>\d+-\d+-\d+).(?P<comp>.*)$")
class DiscoveryError(DcaeException):
    """Base error raised by the discovery helpers in this module"""
class DiscoveryNoDownstreamComponentError(DiscoveryError):
    """Raised with the message "No compatible downstream component found." """
def default_consul_host():
    """Look up the default consul host from the active profile

    The lookup is done on every call rather than once at import time. An
    earlier implementation kept the default in a global variable, which forced
    the configuration to be set up before this module could even be imported
    and made unit testing painful for every module importing this one.
    """
    active_profile = get_profile()
    return active_profile.consul_host
75 def _choose_consul_host(consul_host):
76 """Chooses the appropriate consul host
78 Chooses between a provided value and a default
80 return default_consul_host() if consul_host == None else consul_host
def replace_dots(comp_name, reverse=False):
    '''Swaps dots for dashes so that downstream users of Consul do not explode

    With reverse set, dashes are turned back into dots instead.
    '''
    old, new = ('-', '.') if reverse else ('.', '-')
    return comp_name.replace(old, new)
90 # Utility functions for using Consul
92 def _is_healthy_pure(get_health_func, instance):
93 """Checks to see if a component instance is running healthy
99 get_health_func: func(string) -> complex object
100 Look at unittests in test_discovery to see examples
101 instance: (string) fully qualified name of component instance
105 True if instance has been found and is healthy else False
107 index, resp = get_health_func(instance)
110 def is_passing(instance):
111 return all([check["Status"] == "passing" for check in instance["Checks"]])
112 return any([is_passing(instance) for instance in resp])
def is_healthy(consul_host, instance):
    """Checks to see if a component instance is running healthy

    Impure function edition: the health information is fetched from Consul.

    Args
    ----
    consul_host: (string) host string of Consul
    instance: (string) fully qualified name of component instance

    Returns
    -------
    True if instance has been found and is healthy else False
    """
    health_lookup = Consul(consul_host).health.service
    return _is_healthy_pure(health_lookup, instance)
133 def _get_instances_from_kv(get_from_kv_func, user):
134 """Get component instances from kv store
136 Deployed component instances get entries in a kv store to store configuration
137 information. This is a way to source a list of component instances that were
138 attempted to run. A component could have deployed but failed to register itself.
139 The only trace of that deployment would be checking the kv store.
143 get_from_kv_func: func(string, boolean) -> (don't care, list of dicts)
144 Look at unittests in test_discovery to see examples
145 user: (string) user id
149 List of unique component instance names
151 # Keys from KV contain rels key entries and non-rels key entries. Keep the
152 # rels key entries but remove the ":rel" suffix because we are paranoid that
153 # this could exist without the other
154 _, instances_kv = get_from_kv_func(user, recurse=True)
155 return [] if instances_kv is None \
156 else list(set([ dd["Key"].replace(":rel", "") for dd in instances_kv ]))
158 def _get_instances_from_catalog(get_from_catalog_func, user):
159 """Get component instances from catalog
161 Fetching instances from the catalog covers the deployment cases where
162 components registered successfully regardless of their health check status.
166 get_from_catalog_func: func() -> (don't care, dict)
167 Look at unittests in test_discovery to see examples
168 user: (string) user id
172 List of unique component instance names
174 # Get all services and filter here by user
175 response = get_from_catalog_func()
176 return list(set([ instance for instance in response[1].keys() if user in instance ]))
178 def _merge_instances(user, *get_funcs):
179 """Merge the result of an arbitrary list of get instance function calls
183 user: (string) user id
184 get_funcs: func(string) -> list of strings
185 Functions that take in a user parameter to output a list of instance
190 List of unique component instance names
192 return list(set(chain.from_iterable([ get_func(user) for get_func in get_funcs ])))
def _get_instances(consul_host, user):
    """Get all deployed component instances for a given user

    The list is gathered from both the kv store and the service catalog so
    that instances show up no matter what state they are in.

    Args
    ----
    consul_host: (string) host string of Consul
    user: (string) user id

    Returns
    -------
    List of unique component instance names
    """
    client = Consul(consul_host)
    kv_get = client.kv.get
    catalog_services = client.catalog.services

    def from_kv(user_id):
        return _get_instances_from_kv(kv_get, user_id)

    def from_catalog(user_id):
        return _get_instances_from_catalog(catalog_services, user_id)

    return _merge_instances(user, from_kv, from_catalog)
217 # Custom (sometimes higher order) "discovery" functionality
219 def _make_instances_map(instances):
220 """Make an instance map
222 Instance map is a dict where the keys are tuples (component type, component version)
223 that map to a set of strings that are instance names.
225 mapping = defaultdict(set)
226 for instance in instances:
227 match = _inst_re.match(instance)
231 _, _, ver, comp = match.groups()
232 cname = replace_dots(comp, reverse=True)
233 version = replace_dots(ver, reverse=True)
234 key = (cname, version)
235 mapping[key].add(instance)
def get_user_instances(user, consul_host=None, filter_instances_func=is_healthy):
    '''Build the instance map of a user

    Args
    ----
    filter_instances_func: fn(consul_host, instance) -> boolean
        Only instances for which this returns a truthy value are kept.
        Default is is_healthy

    Returns
    -------
    Dict whose keys are component (name,version) tuples and values are the
    sets of component instance names
    '''
    consul_host = _choose_consul_host(consul_host)
    kept = [name for name in _get_instances(consul_host, user)
            if filter_instances_func(consul_host, name)]
    return _make_instances_map(kept)
def _get_component_instances(filter_instances_func, user, cname, cver, consul_host):
    """Get the filtered instances of one component

    Args
    ----
    filter_instances_func: fn(consul_host, instance) -> boolean
        Function used to filter instances

    Returns
    -------
    List of strings where the strings are fully qualified instance names
    """
    instances_by_component = get_user_instances(
        user, consul_host=consul_host, filter_instances_func=filter_instances_func)

    # REVIEW: Component names are not restricted from using dashes, yet names
    # with dots are stored with dashes for domain segmenting reasons. The
    # instance map always undoes that (dashes become dots) even when the
    # component name really had dashes, so always look up by the dotted name.
    # A collision is possible but unlikely - someone would have to use the
    # same name in both dotted and dashed form.
    lookup_key = (replace_dots(cname, reverse=True), cver)

    # WATCH: the map holds sets. Hand back a list so that the return type is
    # consistent.
    if lookup_key in instances_by_component:
        return list(instances_by_component[lookup_key])
    return []
def get_healthy_instances(user, cname, cver, consul_host=None):
    """Lists the instances of a component that pass is_healthy for a given user

    Returns
    -------
    List of strings where the strings are fully qualified instance names
    """
    resolved_host = _choose_consul_host(consul_host)
    return _get_component_instances(is_healthy, user, cname, cver, resolved_host)
def get_defective_instances(user, cname, cver, consul_host=None):
    """Lists the instances of a component that are *not* running for a given user

    These are component instances that are deployed but for which is_healthy
    does not report success.

    Returns
    -------
    List of strings where the strings are fully qualified instance names
    """
    resolved_host = _choose_consul_host(consul_host)
    unhealthy = lambda host, component: not is_healthy(host, component)
    return _get_component_instances(unhealthy, user, cname, cver, resolved_host)
def lookup_instance(consul_host, name):
    """Ask the Consul catalog for the details of a service

    Returns the results part of the catalog response; the index is dropped.
    """
    _, service_details = Consul(consul_host).catalog.service(name)
    return service_details
def parse_instance_lookup(results):
    """Turn the resultset from lookup_instance into a host string

    Returns:
    --------
    String in host form <address>:<port>, built from the first result. None
    when there are no results.
    """
    if not results:
        return None

    # Only the first entry is used
    first = results[0]
    address, port = first["ServiceAddress"], first["ServicePort"]
    return "{address}:{port}".format(address=address, port=port)
334 def _create_rels_key(config_key):
335 """Create rels key from config key
337 Assumes config_key is well-formed"""
338 return "{:}:rel".format(config_key)
341 def _create_dmaap_key(config_key):
342 """Create dmaap key from config key
344 Assumes config_key is well-formed"""
345 return "{:}:dmaap".format(config_key)
348 def _create_policies_key(config_key):
349 """Create policies key from config key
351 Assumes config_key is well-formed"""
352 return "{:}:policies/".format(config_key)
def clear_user_instances(user, host=None):
    '''Deletes every Consul key:value entry found under the given user

    The default consul host is used when no host is given.
    '''
    kv_store = Consul(_choose_consul_host(host)).kv
    kv_store.delete(user, recurse=True)
361 _multiple_compat_msg = '''Component '{cname}' config_key '{ckey}' has multiple compatible downstream \
362 components: {compat}. The current infrastructure can only support interacing with a single component. \
363 Only downstream component '{chosen}' will be connected.'''
365 _no_compat_msg = "Component '{cname}' config_key '{ckey}' has no compatible downstream components."
367 _no_inst_msg = '''Component '{cname}' config_key '{ckey}' is compatible with downstream component '{chosen}' \
368 however there are no instances available for connecting.'''
372 '''Returns a string formatted representation for a component and version'''
374 return ':'.join(args[0])
376 return ':'.join(args)
378 raise DiscoveryError('Input should be name, version or (name, version)')
def _get_downstream(cname, cver, config_key, compat_comps, instance_map,
        force=False):
    '''
    Picks the component type to connect to for a given config key, together
    with that component's instances

    Parameters
    ----------
    cname : string
        Name of the upstream component
    cver : string
        Version of the upstream component
    config_key : string
        Mainly used for populating warnings meaningfully
    compat_comps : list
        A list of component (name, version) tuples
    instance_map : dict
        A dict whose keys are component (name, version) tuples and values are a list of instance names
    force : boolean
        When True, a missing instance is only logged as a warning instead of
        raising DiscoveryNoDownstreamComponentError
    '''
    if compat_comps:
        # Only a single downstream component can be connected: take the first
        conn_comp = next(iter(compat_comps))
        if len(compat_comps) > 1:
            logger.warning(_multiple_compat_msg.format(cname=_cfmt(cname, cver), ckey=config_key,
                compat=list(map(_cfmt, compat_comps)), chosen=_cfmt(conn_comp)))
    else:
        conn_comp = ('', '')
        logger.warning(_no_compat_msg.format(cname=_cfmt(cname, cver), ckey=config_key))

    if not all(conn_comp):
        # No usable (name, version) pair, so there is nothing to look up
        return conn_comp, tuple()

    instances = instance_map.get(conn_comp, tuple())
    if instances:
        return conn_comp, instances

    shortage = _no_inst_msg.format(cname=_cfmt(cname, cver),
                                   ckey=config_key, chosen=_cfmt(conn_comp))
    if not force:
        logger.error(shortage)
        raise DiscoveryNoDownstreamComponentError("No compatible downstream component found.")

    logger.warning(shortage)
    return conn_comp, instances
def create_config(user, cname, cver, params, interface_map, instance_map, dmaap_map,
        instance_prefix=None, force=False):
    '''
    Builds a config and the corresponding rels and dmaap entries for Consul.
    Returns the Consul keys and entries.

    Parameters
    ----------
    user : string
        The user namespace to create the config and rels under. E.g. user.foo.bar...
    cname : string
        Name of the upstream component
    cver : string
        Version of the upstream component
    params : dict
        Parameters of the component, taken directly from the component specification
    interface_map : dict
        A dict mapping the config_key of published streams and/or called services to a list of compatible
        component types and versions
    instance_map : dict
        A dict mapping component types and versions to a list of instances currently running
    dmaap_map : dict
        A dict that contains config key to dmaap information. This map is checked
        first before checking the instance_map which means before checking for
        direct http components.
    instance_prefix : string, optional
        The unique prefix to associate with the component instance whose config is being created.
        A fresh uuid4 is used when not given.
    force : boolean, optional
        Config will continue to be created even if there are no downstream compatible
        component when this flag is set to True. Default is False.

    Returns
    -------
    Tuple (conf_key, conf, rels_key, rels, dmaap_key, dmaap info by config key)
    '''
    prefix = instance_prefix if instance_prefix is not None else str(uuid4())
    conf_key = "{:}.{:}.{:}.{:}".format(user, prefix, replace_dots(cver), replace_dots(cname))
    rels_key = _create_rels_key(conf_key)
    dmaap_key = _create_dmaap_key(conf_key)

    conf = params.copy()
    rels = []

    # NOTE: The dmaap_map entries are split between the templatized config
    # and the dmaap json in Consul
    for config_key, dmaap_entry in six.iteritems(dmaap_map):
        templated = deepcopy(dmaap_entry)
        # Here comes the magic. << >> signifies dmaap to downstream config
        # handling.
        templated["dmaap_info"] = "<<{:}>>".format(config_key)
        conf[config_key] = templated

    # NOTE: The interface_map may not contain *all* possible interfaces
    # that may be connected with because the catalog.get_discovery call filters
    # based upon neighbors. Essentially the interface_map is being pre-filtered
    # which is probably a latent bug.

    for config_key, compat_types in six.iteritems(interface_map):
        # Don't clobber config keys that have been set from above
        if config_key in conf:
            continue

        conn_comp, instances = _get_downstream(cname, cver, config_key,
                                               compat_types, instance_map, force=force)
        conn_name, conn_ver = conn_comp

        if conn_name and conn_ver:
            middle = "{:}.{:}".format(replace_dots(conn_ver), replace_dots(conn_name))
        elif force:
            # Forced: leave the template empty rather than failing
            middle = ''
        else:
            raise DiscoveryNoDownstreamComponentError("No compatible downstream component found.")

        conf[config_key] = '{{' + middle + '}}'
        rels.extend(instances)

    dmaap_map_just_info = {}
    for config_key, dmaap_entry in six.iteritems(dmaap_map):
        dmaap_map_just_info[config_key] = dmaap_entry["dmaap_info"]

    return conf_key, conf, rels_key, rels, dmaap_key, dmaap_map_just_info
def get_docker_logins(host=None):
    """Fetch the Docker logins stored in Consul

    Returns
    -------
    List of objects where the objects must be of the form
        {"registry": .., "username":.., "password":.. }
    An empty list is returned when nothing is stored under the logins key.
    """
    key = get_docker_logins_key()
    host = _choose_consul_host(host)
    _, entry = Consul(host).kv.get(key)

    if not entry:
        return []

    raw_logins = entry['Value'].decode("utf-8")
    return json.loads(raw_logins)
def push_config(conf_key, conf, rels_key, rels, dmaap_key, dmaap_map, host=None):
    '''Stores the config, rels and dmaap entries in Consul as JSON

    Afterwards the SERVICE_NAME to export before running a 'component
    reconfig' command is logged.
    '''
    kv_store = Consul(_choose_consul_host(host)).kv

    entries = ((conf_key, conf), (rels_key, rels), (dmaap_key, dmaap_map))
    for key, payload in entries:
        kv_store.put(key, json.dumps(payload))

    logger.info("* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *")
    logger.info("* If you run a 'component reconfig' command, you must first execute the following")
    logger.info("* export SERVICE_NAME={:}".format(conf_key))
    logger.info("* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *")
def remove_config(config_key, host=None):
    """Removes a config and its related entries from Consul

    Returns
    -------
    True when all artifacts have been successfully deleted else False
    """
    kv_store = Consul(_choose_consul_host(host)).kv
    # "recurse=True" deletes the SERVICE_NAME KV and all other KVs with suffixes (:rel, :dmaap, :policies)
    return kv_store.delete(config_key, recurse=True)
543 def _group_config(config, config_key_map):
544 """Groups config by streams_publishes, streams_subscribes, services_calls"""
545 # Copy non streams and services first
546 grouped_conf = { k: v for k,v in six.iteritems(config)
547 if k not in config_key_map }
549 def group(group_name):
550 grouped_conf[group_name] = { k: v for k,v in six.iteritems(config)
551 if k in config_key_map and config_key_map[k]["group"] == group_name }
553 # Copy and group the streams and services
554 # Map returns iterator so must force running its course
555 list(map(group, ["streams_publishes", "streams_subscribes", "services_calls"]))
559 def _apply_inputs(config, inputs_map):
560 """Update configuration with inputs
562 This method updates the values of the configuration parameters using values
565 config.update(inputs_map)
@contextlib.contextmanager
def config_context(user, cname, cver, params, interface_map, instance_map,
        config_key_map, dmaap_map=None, inputs_map=None, instance_prefix=None,
        host=None, always_cleanup=True, force_config=False):
    '''Convenience utility for creating configs and cleaning them up

    Creates the config, applies the inputs, groups it, pushes it to Consul and
    yields (conf_key, conf) to the block.

    Args
    ----
    dmaap_map: (dict) config key to dmaap information. Default is no entries.
    inputs_map: (dict) values that override config parameters. Default is no
        entries.
    always_cleanup: (boolean)
        This context manager will cleanup the produced config
        context always if this is True. When False, cleanup will only occur upon any
        exception getting thrown in the context manager block. Default is True.
    force_config: (boolean)
        Config will continue to be created even if there are no downstream compatible
        component when this flag is set to True. Default is False.
    '''
    # None instead of {} as default avoids sharing one dict between calls
    dmaap_map = {} if dmaap_map is None else dmaap_map
    inputs_map = {} if inputs_map is None else inputs_map
    host = _choose_consul_host(host)

    # conf_key stays None until create_config succeeded, i.e. until there may
    # be something in Consul worth removing.
    conf_key = None
    failed = False
    try:
        conf_key, conf, rels_key, rels, dmaap_key, dmaap_info = create_config(
                user, cname, cver, params, interface_map, instance_map, dmaap_map,
                instance_prefix, force=force_config)

        conf = _apply_inputs(conf, inputs_map)
        conf = _group_config(conf, config_key_map)

        push_config(conf_key, conf, rels_key, rels, dmaap_key, dmaap_info, host)
        yield (conf_key, conf)
    except Exception:
        failed = True
        # Bare raise keeps the original traceback intact
        raise
    finally:
        if conf_key is not None and (always_cleanup or failed):
            remove_config(conf_key, host)
def policy_update(policy_change_file, consul_host):
    """Apply the policy changes of a policy-file to Consul

    When the file has a 'policies' section it replaces all policies and the
    'updated_policies'/'removed_policies' sections are ignored. Otherwise the
    removals are applied first, then the updates. The policy keys live under
    the service named by the SERVICE_NAME environment variable.

    Returns
    -------
    False as soon as one of the steps fails, else the result of creating the
    policy event
    """
    # Determine if it is an 'updated_policies' or 'removed_policies' change, or if user included ALL policies
    replace_all = "policies" in policy_change_file
    has_updates = "updated_policies" in policy_change_file
    has_removals = "removed_policies" in policy_change_file

    cons = Consul(consul_host)
    service_name = os.environ["SERVICE_NAME"]
    policy_folder = service_name + ":policies/items/"
    event_folder = service_name + ":policies/event"

    if replace_all:
        # User specified ALL "policies" in the Policy File. Ignore "updated_policies"/"removed_policies"
        logger.warning("The 'policies' specified in the 'policy-file' will replace all policies in Consul.")
        if not update_all_policies(cons, policy_folder, policy_change_file['policies']):
            return False
    else:
        # If 'removed_policies', delete the Policy from the Component KV pair
        if has_removals and not remove_policies(
                cons, policy_folder, policy_change_file['removed_policies']):
            return False

        # If 'updated_policies', update the Component KV pair
        if has_updates and not update_specified_policies(
                cons, policy_folder, policy_change_file['updated_policies']):
            return False

    return create_policy_event(cons, event_folder, policy_folder)
def create_policy_event(cons, event_folder, policy_folder):
    """ Create a Policy 'event' KV pair in Consul

    The event is a JSON object with the action "gathered", a UTC timestamp, a
    fresh update id and the number of KV entries found under policy_folder.

    Returns
    -------
    True when the event was stored, else False
    """
    timestamp = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    update_id = str(uuid4())

    # Count the entries themselves. Searching the stringified response for
    # "'Key':" miscounts whenever a stored value happens to contain that text.
    _, stored_policies = cons.kv.get(policy_folder, recurse=True)
    policies_count = len(stored_policies) if stored_policies else 0

    # json.dumps takes care of quoting/escaping, unlike string concatenation
    event = json.dumps({
        "action": "gathered",
        "timestamp": timestamp,
        "update_id": update_id,
        "policies_count": policies_count,
    })
    if not cons.kv.put(event_folder, event):
        logger.error("Policy 'Event' creation of ({:}) in Consul failed".format(event_folder))
        return False

    return True
def update_all_policies(cons, policy_folder, allPolicies):
    """ Replace every policy in Consul by the policies the user specified in the 'policies' section of the policy-file

    Returns True when both the delete and the following update succeeded, else False
    """

    # Wipe all Policies under the /policies/items folder first
    wiped = cons.kv.delete(policy_folder, recurse=True)
    if not wiped:
        logger.error("Policy delete of ({:}) in Consul failed".format(policy_folder))
        return False

    if update_specified_policies(cons, policy_folder, allPolicies):
        return True
    return False
def update_specified_policies(cons, policy_folder, policyUpdates):
    """ Store the policies the user specified in the 'updated_policies' (or 'policies') section of the policy-file

    Stops at the first policy whose Consul key cannot be determined or whose
    put fails. Returns True when every policy was stored, else False
    """

    for policy in policyUpdates:
        consul_key = extract_policy_id(policy_folder, policy)
        if not consul_key:
            return False

        serialized = json.dumps(policy)
        stored = cons.kv.put(consul_key, serialized)
        if not stored:
            logger.error("Policy update of ({:}) in Consul failed".format(consul_key))
            return False

    return True
def remove_policies(cons, policy_folder, policyDeletes):
    """ Remove the policies that the user specified in the 'removed_policies' section of the policy-file

    Stops at the first policy whose Consul key cannot be determined or whose
    delete fails. Returns True when every policy was deleted, else False
    """

    for policy in policyDeletes:
        consul_key = extract_policy_id(policy_folder, policy)
        if not consul_key:
            return False

        deleted = cons.kv.delete(consul_key)
        if not deleted:
            logger.error("Policy delete of ({:}) in Consul failed".format(consul_key))
            return False

    return True
# Policy names end in ".<number>.<letters>"; group 1 is everything before
# that, i.e. the policy id. Compiled once instead of on every call.
_POLICY_ID_RE = re.compile(r"(.*)\.\d+\.[a-zA-Z]+$")


def extract_policy_id(policy_folder, policy):
    """ Extract the Policy ID from the policyName.

    Args
    ----
    policy_folder: (string) Consul key prefix of the policy items
    policy: (dict) policy that has a 'policyName' entry

    Returns
    -------
    The Consul key (Policy Folder with Policy ID), or None when the
    policyName does not have the expected ending (an error is logged)
    """
    policyName = policy['policyName']
    match = _POLICY_ID_RE.match(policyName)

    if match is None:
        logger.error("policyName ({:}) needs to end in '.#.xml' in order to extract the Policy ID".format(policyName))
        return None

    return policy_folder + match.group(1)
def build_policy_command(policy_reconfig_path, policy_change_file, consul_host):
    """ Build command to execute the Policy Reconfig script in the Docker container

    Args
    ----
    policy_reconfig_path: (string) path of the reconfig script
    policy_change_file: (dict) content of the policy-file. Only its
        'updated_policies' and 'removed_policies' sections are used here.
    consul_host: (string) host string of Consul

    Returns
    -------
    List of three strings: the script path, "policies" and a JSON string with
    the 'updated_policies', 'removed_policies' and ALL 'policies'. The policy
    keys are read from Consul under the service named by the SERVICE_NAME
    environment variable.
    """
    # 'updated' and 'removed' policies come from the --policy-file; a missing
    # section counts as an empty list
    policies = {
        "updated_policies": policy_change_file.get('updated_policies', []),
        "removed_policies": policy_change_file.get('removed_policies', []),
    }

    # ALL 'policies' - policies come from Consul
    cons = Consul(consul_host)
    service_name = os.environ["SERVICE_NAME"]
    policy_folder = service_name + ":policies/items/"

    _, consul_policies = cons.kv.get(policy_folder, recurse=True)

    # Nothing stored gives None instead of a list. Entries without a value
    # cannot hold a policy and would make json.loads fail, so skip them.
    policies["policies"] = [json.loads(entry['Value'])
                            for entry in (consul_policies or [])
                            if entry['Value'] is not None]

    # The Reconfig Script command has 3 parts: the command and 2 ARGs, the
    # last one being the policies as a JSON string
    return [policy_reconfig_path, "policies", json.dumps(policies)]