76c160ae16a54d12721ab3d5df3ea00264664f43
[dcaegen2/platform/plugins.git] / k8s / k8splugin / discovery.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2019 Pantheon.tech. All rights reserved.
6 # ================================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 #      http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END=========================================================
19 #
20 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
21
22 import json
23 import logging
24 import re
25 import uuid
26 from functools import partial
27
28 import consul
29 import requests
30
31 logger = logging.getLogger("discovery")
32
33
34 class DiscoveryError(RuntimeError):
35     pass
36
37 class DiscoveryConnectionError(RuntimeError):
38     pass
39
40 class DiscoveryServiceNotFoundError(RuntimeError):
41     pass
42
43 class DiscoveryKVEntryNotFoundError(RuntimeError):
44     pass
45
46
47 def _wrap_consul_call(consul_func, *args, **kwargs):
48     """Wrap Consul call to map errors"""
49     try:
50         return consul_func(*args, **kwargs)
51     except requests.exceptions.ConnectionError as e:
52         raise DiscoveryConnectionError(e)
53
54
55 def generate_service_component_name(service_component_type):
56     """Generate service component id used to pass into the service component
57     instance and used as the key to the service component configuration.
58
59     Updated for use with Kubernetes.  Sometimes the service component name gets
60     used in Kubernetes in contexts (such as naming a Kubernetes Service) that
61     requires the name to conform to the RFC1035 DNS "label" syntax:
62        -- starts with an alpha
63        -- contains only of alphanumerics and "-"
64        -- <= 63 characters long
65
66     Format:
67     s<service component id>-<service component type>,
68         truncated to 63 characters, "_" replaced with "-" in service_component_type,
69         other non-conforming characters removed from service_component_type
70     """
71     # Random generated
72     # Copied from cdap plugin
73     sct = re.sub('[^A-Za-z0-9-]','',(service_component_type.replace('_','-')))
74     return ("s{0}-{1}".format(str(uuid.uuid4()).replace("-",""),sct))[:63]
75
76
77 def create_kv_conn(host):
78     """Create connection to key-value store
79
80     Returns a Consul client to the specified Consul host"""
81     try:
82         [hostname, port] = host.split(":")
83         return consul.Consul(host=hostname, port=int(port))
84     except ValueError as e:
85         return consul.Consul(host=host)
86
87 def push_service_component_config(kv_conn, service_component_name, config):
88     config_string = config if isinstance(config, str) else json.dumps(config)
89     kv_put_func = partial(_wrap_consul_call, kv_conn.kv.put)
90
91     if kv_put_func(service_component_name, config_string):
92         logger.info("Added config for {0}".format(service_component_name))
93     else:
94         raise DiscoveryError("Failed to push configuration")
95
96 def remove_service_component_config(kv_conn, service_component_name):
97     kv_delete_func = partial(_wrap_consul_call, kv_conn.kv.delete)
98     kv_delete_func(service_component_name)
99
100
101 def get_kv_value(kv_conn, key):
102     """Get a key-value entry's value from Consul
103
104     Raises DiscoveryKVEntryNotFoundError if entry not found
105     """
106     kv_get_func = partial(_wrap_consul_call, kv_conn.kv.get)
107     (index, val) = kv_get_func(key)
108
109     if val:
110         return json.loads(val['Value']) # will raise ValueError if not JSON, let it propagate
111     else:
112         raise DiscoveryKVEntryNotFoundError("{0} kv entry not found".format(key))
113
114
115 def _create_rel_key(service_component_name):
116     return "{0}:rel".format(service_component_name)
117
118 def store_relationship(kv_conn, source_name, target_name):
119     # TODO: Rel entry may already exist in a one-to-many situation. Need to
120     # support that.
121     rel_key = _create_rel_key(source_name)
122     rel_value = [target_name] if target_name else []
123
124     kv_put_func = partial(_wrap_consul_call, kv_conn.kv.put)
125     kv_put_func(rel_key, json.dumps(rel_value))
126     logger.info("Added relationship for {0}".format(rel_key))
127
128 def delete_relationship(kv_conn, service_component_name):
129     rel_key = _create_rel_key(service_component_name)
130     kv_get_func = partial(_wrap_consul_call, kv_conn.kv.get)
131     index, rels = kv_get_func(rel_key)
132
133     if rels:
134         rels = json.loads(rels["Value"].decode("utf-8"))
135         kv_delete_func = partial(_wrap_consul_call, kv_conn.kv.delete)
136         kv_delete_func(rel_key)
137         return rels
138     else:
139         return []
140
141 def lookup_service(kv_conn, service_component_name):
142     catalog_get_func = partial(_wrap_consul_call, kv_conn.catalog.service)
143     index, results = catalog_get_func(service_component_name)
144
145     if results:
146         return results
147     else:
148         raise DiscoveryServiceNotFoundError("Failed to find: {0}".format(service_component_name))
149
150
151 # TODO: Note these functions have been (for the most part) shamelessly lifted from
152 # dcae-cli and should really be shared.
153
154 def _is_healthy_pure(get_health_func, instance):
155     """Checks to see if a component instance is running healthy
156
157     Pure function edition
158
159     Args
160     ----
161     get_health_func: func(string) -> complex object
162         Look at unittests in test_discovery to see examples
163     instance: (string) fully qualified name of component instance
164
165     Returns
166     -------
167     True if instance has been found and is healthy else False
168     """
169     index, resp = get_health_func(instance)
170
171     if resp:
172         def is_passing(instance):
173             return all([check["Status"] == "passing" for check in instance["Checks"]])
174
175         return any([is_passing(instance) for instance in resp])
176     else:
177         return False
178
179 def is_healthy(consul_host, instance):
180     """Checks to see if a component instance is running healthy
181
182     Impure function edition
183
184     Args
185     ----
186     consul_host: (string) host string of Consul
187     instance: (string) fully qualified name of component instance
188
189     Returns
190     -------
191     True if instance has been found and is healthy else False
192     """
193     cons = create_kv_conn(consul_host)
194
195     get_health_func = partial(_wrap_consul_call, cons.health.service)
196     return _is_healthy_pure(get_health_func, instance)
197
198
199 def add_to_entry(conn, key, add_name, add_value):
200     """
201     Find 'key' in consul.
202     Treat its value as a JSON string representing a dict.
203     Extend the dict by adding an entry with key 'add_name' and value 'add_value'.
204     Turn the resulting extended dict into a JSON string.
205     Store the string back into Consul under 'key'.
206     Watch out for conflicting concurrent updates.
207
208     Example:
209     Key 'xyz:dmaap' has the value '{"feed00": {"feed_url" : "http://example.com/feeds/999"}}'
210     add_to_entry('xyz:dmaap', 'topic00', {'topic_url' : 'http://example.com/topics/1229'})
211     should result in the value for key 'xyz:dmaap' in consul being updated to
212     '{"feed00": {"feed_url" : "http://example.com/feeds/999"}, "topic00" : {"topic_url" : "http://example.com/topics/1229"}}'
213     """
214     while True:     # do until update succeeds
215         (index, val) = conn.kv.get(key)     # index gives version of key retrieved
216
217         if val is None:     # no key yet
218             vstring = '{}'
219             mod_index = 0   # Use 0 as the cas index for initial insertion of the key
220         else:
221             vstring = val['Value']
222             mod_index = val['ModifyIndex']
223
224         # Build the updated dict
225         # Exceptions just propagate
226         v = json.loads(vstring)
227         v[add_name] = add_value
228         new_vstring = json.dumps(v)
229
230         updated = conn.kv.put(key, new_vstring, cas=mod_index)       # if the key has changed since retrieval, this will return false
231         if updated:
232             return v
233
234
235 def _find_matching_services(services, name_search, tags):
236     """Find matching services given search criteria"""
237     tags = set(tags)
238     return [srv_name for srv_name in services
239             if name_search in srv_name and tags <= set(services[srv_name])]
240
241
242 def search_services(conn, name_search, tags):
243     """Search for services that match criteria
244
245     Args:
246     -----
247     name_search: (string) Name to search for as a substring
248     tags: (list) List of strings that are tags. A service must match **all** the
249         tags in the list.
250
251     Retruns:
252     --------
253     List of names of services that matched
254     """
255     # srvs is dict where key is service name and value is list of tags
256     catalog_get_services_func = partial(_wrap_consul_call, conn.catalog.services)
257     index, srvs = catalog_get_services_func()
258
259     if srvs:
260         matches = _find_matching_services(srvs, name_search, tags)
261
262         if matches:
263             return matches
264
265         raise DiscoveryServiceNotFoundError(
266                 "No matches found: {0}, {1}".format(name_search, tags))
267     else:
268         raise DiscoveryServiceNotFoundError("No services found")