1b25f3e94dc8c5004d49f2f1a28ed27900455e33
[dcaegen2/platform.git] / oti / event-handler / otihandler / consul_client.py
1 # ================================================================================
2 # Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
16
17 """client to talk to consul at consul port 8500"""
18
19 import base64
20 import copy
21 import json
22 import logging
23 import os
24 import re
25 import socket
26
27 import requests
28
29
30 class ConsulClientError(RuntimeError):
31     pass
32
33 class ConsulClientConnectionError(RuntimeError):
34     pass
35
36 class ConsulClientServiceNotFoundError(RuntimeError):
37     pass
38
39 class ConsulClientNodeNotFoundError(RuntimeError):
40     pass
41
42 class ConsulClientKVEntryNotFoundError(RuntimeError):
43     pass
44
45
46 class ConsulClient(object):
47     """talking to consul"""
48
49     CONSUL_SERVICE_MASK = "{}/v1/catalog/service/{}"
50     CONSUL_KV_MASK = "{}/v1/kv/{}"
51     CONSUL_KVS_MASK = "{}/v1/kv/{}?recurse=true"
52     CONSUL_TRANSACTION_URL = "{}/v1/txn"
53     _logger = logging.getLogger("oti_handler.consul_client")
54
55     MAX_OPS_PER_TXN = 64
56     # MAX_VALUE_LEN = 512 * 1000
57
58     OPERATION_SET = "set"
59     OPERATION_DELETE = "delete"
60     OPERATION_DELETE_FOLDER = "delete-tree"
61
62
63     #----- Methods for Consul services
64
65     @staticmethod
66     def lookup_service(service_name):
67         """find the service record in consul"""
68
69         service_path = ConsulClient.CONSUL_SERVICE_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), service_name)
70
71         ConsulClient._logger.info("lookup_service(%s)", service_path)
72
73         try:
74             response = requests.get(service_path, timeout=30)
75             response.raise_for_status()
76         # except requests.exceptions.HTTPError as e:
77         # except requests.exceptions.ConnectionError as e:
78         # except requests.exceptions.Timeout as e:
79         except requests.exceptions.RequestException as e:
80             msg = "lookup_service({}) requests.get({}) threw exception {}: {!s}".format(
81                       service_name, service_path, type(e).__name__, e)
82             ConsulClient._logger.error(msg)
83             raise ConsulClientConnectionError(msg)
84
85         try:
86             return_list = response.json()
87         # except ValueError as e:
88         except Exception as e:
89             msg = "lookup_service({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format(
90                       service_name, service_path, type(e).__name__, e)
91             ConsulClient._logger.error(msg)
92             raise ConsulClientServiceNotFoundError(msg)
93
94         if not return_list:
95             msg = "lookup_service({}) got empty or no value from requests.get({})".format(
96                       service_name, service_path)
97             ConsulClient._logger.error(msg)
98             raise ConsulClientServiceNotFoundError(msg)
99
100         return return_list
101
102
103     @staticmethod
104     def get_all_services():
105         """List all services from consul"""
106
107         service_path = "{}/v1/catalog/services".format(os.environ.get("CONSUL_URL").rstrip("/"))
108
109         ConsulClient._logger.info("get_all_services(%s)", service_path)
110
111         try:
112             response = requests.get(service_path, timeout=30)
113             response.raise_for_status()
114         except requests.exceptions.RequestException as e:
115             msg = "get_all_services() requests.get({}) threw exception {}: {!s}".format(
116                       service_path, type(e).__name__, e)
117             ConsulClient._logger.error(msg)
118             raise ConsulClientConnectionError(msg)
119
120         try:
121             return_dict = response.json()
122         except Exception as e:
123             msg = "get_all_services() parsing JSON from requests.get({}) threw exception {}: {!s}".format(
124                       service_path, type(e).__name__, e)
125             ConsulClient._logger.error(msg)
126             raise ConsulClientServiceNotFoundError(msg)
127
128         if not return_dict:
129             msg = "get_all_services() got empty or no value from requests.get({})".format(
130                       service_path)
131             ConsulClient._logger.info(msg)
132             # raise ConsulClientServiceNotFoundError(msg)
133
134         return return_dict
135
136
137     @staticmethod
138     def _find_matching_services(services, name_search, tags):
139         """Find matching services given search criteria"""
140         sub_tags = tags[0][4:6]
141         tags.append(sub_tags)
142
143         def is_match(service):
144             srv_name, srv_tags = service
145             return name_search in srv_name and \
146                     any([tag in srv_tags for tag in tags])
147
148         return [ srv[0] for srv in list(services.items()) if is_match(srv) ]
149
150
151     @staticmethod
152     def search_services(name_search, tags):
153         """
154         Search for services that match criteria
155
156         Args:
157         -----
158         name_search: (string) Name to search for as a substring
159         tags: (list) List of strings that are tags. A service must match **ANY OF** the
160             tags in the list.
161
162         Returns:
163         --------
164         List of names of services that matched
165         """
166
167         matches = []
168
169         # srvs is dict where key is service name and value is list of tags
170         srvs = ConsulClient.get_all_services()
171
172         if srvs:
173             matches = ConsulClient._find_matching_services(srvs, name_search, tags)
174
175         return matches
176
177
178     @staticmethod
179     def get_service_fqdn_port(service_name, node_meta=False):
180         """find the service record in consul"""
181
182         service_path = ConsulClient.CONSUL_SERVICE_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), service_name)
183
184         ConsulClient._logger.info("get_service_fqdn_port(%s)", service_path)
185
186         try:
187             response = requests.get(service_path, timeout=30)
188             response.raise_for_status()
189         except requests.exceptions.RequestException as e:
190             msg = "get_service_fqdn_port({}) requests.get({}) threw exception {}: {!s}".format(
191                       service_name, service_path, type(e).__name__, e)
192             ConsulClient._logger.error(msg)
193             raise ConsulClientConnectionError(msg)
194
195         try:
196             service = response.json()
197         except Exception as e:
198             msg = "get_service_fqdn_port({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format(
199                       service_name, service_path, type(e).__name__, e)
200             ConsulClient._logger.error(msg)
201             raise ConsulClientServiceNotFoundError(msg)
202
203         if not service:
204             msg = "get_service_fqdn_port({}) got empty or no value from requests.get({})".format(
205                       service_name, service_path)
206             ConsulClient._logger.error(msg)
207             raise ConsulClientServiceNotFoundError(msg)
208
209         try:
210             service = service[0]     # arbitrarily choose the first one
211             port = service["ServicePort"]
212
213             # HTTPS certificate validation requires FQDN not IP address
214             fqdn = ""
215             if node_meta:
216                 meta = service.get("NodeMeta")
217                 if meta:
218                     fqdn = meta.get("fqdn")
219             if not fqdn:
220                 fqdn = socket.getfqdn(str(service["ServiceAddress"]))
221         except Exception as e:
222             msg = "get_service_fqdn_port({}) parsing result from requests.get({}) threw exception {}: {!s}".format(
223                       service_name, service_path, type(e).__name__, e)
224             ConsulClient._logger.error(msg)
225             raise ConsulClientServiceNotFoundError(msg)
226
227         return (fqdn, port)
228
229
230     #----- Methods for Consul nodes
231
232     @staticmethod
233     def lookup_node(node_name):
234         """find the node record in consul"""
235
236         node_path = "{}/v1/catalog/node/{}".format(os.environ.get("CONSUL_URL").rstrip("/"), node_name)
237
238         ConsulClient._logger.info("lookup_node(%s)", node_path)
239
240         try:
241             response = requests.get(node_path, timeout=30)
242             response.raise_for_status()
243         # except requests.exceptions.HTTPError as e:
244         # except requests.exceptions.ConnectionError as e:
245         # except requests.exceptions.Timeout as e:
246         except requests.exceptions.RequestException as e:
247             msg = "lookup_node({}) requests.get({}) threw exception {}: {!s}".format(
248                       node_name, node_path, type(e).__name__, e)
249             ConsulClient._logger.error(msg)
250             raise ConsulClientConnectionError(msg)
251
252         try:
253             return_dict = response.json()
254         # except ValueError as e:
255         except Exception as e:
256             msg = "lookup_node({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format(
257                       node_name, node_path, type(e).__name__, e)
258             ConsulClient._logger.error(msg)
259             raise ConsulClientNodeNotFoundError(msg)
260
261         if not return_dict:
262             msg = "lookup_node({}) got empty or no value from requests.get({})".format(
263                       node_name, node_path)
264             ConsulClient._logger.error(msg)
265             raise ConsulClientNodeNotFoundError(msg)
266
267         return return_dict
268
269
270     #----- Methods for Consul key-values
271
272     @staticmethod
273     def put_value(key, data, cas=None):
274         """put the value for key into consul-kv"""
275
276         # ConsulClient._logger.info("put_value(%s)", str(key))
277
278         URL = ConsulClient.CONSUL_KV_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), key)
279         if cas is not None:
280             URL = '{}?cas={}'.format(URL, cas)
281
282         try:
283             response = requests.put(URL, data=json.dumps(data), timeout=30)
284             response.raise_for_status()
285         except requests.exceptions.RequestException as e:
286             msg = "put_value({}) requests.put({}) threw exception {}: {!s}".format(
287                       key, URL, type(e).__name__, e)
288             ConsulClient._logger.error(msg)
289             raise ConsulClientConnectionError(msg)
290
291         try:
292             updated = response.json()
293         except Exception as e:
294             msg = "put_value({}) parsing JSON from requests.put({}) threw exception {}: {!s}".format(
295                       key, URL, type(e).__name__, e)
296             ConsulClient._logger.error(msg)
297             raise ConsulClientKVEntryNotFoundError(msg)
298
299         return updated
300
301
302     @staticmethod
303     def get_value(key, get_index=False):
304         """get the value for key from consul-kv"""
305
306         URL = ConsulClient.CONSUL_KV_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), key)
307
308         try:
309             response = requests.get(URL, timeout=30)
310             response.raise_for_status()
311         except requests.exceptions.RequestException as e:
312             msg = "get_value({}) requests.get({}) threw exception {}: {!s}".format(
313                       key, URL, type(e).__name__, e)
314             ConsulClient._logger.error(msg)
315             raise ConsulClientConnectionError(msg)
316
317         try:
318             data = response.json()
319         except Exception as e:
320             msg = "get_value({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format(
321                       key, URL, type(e).__name__, e)
322             ConsulClient._logger.error(msg)
323             raise ConsulClientKVEntryNotFoundError(msg)
324
325         if not data:
326             msg = "get_value({}) got empty or no value from requests.get({})".format(
327                       key, URL)
328             ConsulClient._logger.error(msg)
329             raise ConsulClientKVEntryNotFoundError(msg)
330
331         try:
332             value = base64.b64decode(data[0]["Value"]).decode("utf-8")
333             value_dict = json.loads(value)
334         except Exception as e:
335             msg = "get_value({}) decoding value from requests.get({}) threw exception {}: {!s}".format(
336                       key, URL, type(e).__name__, e)
337             ConsulClient._logger.error(msg)
338             raise ConsulClientKVEntryNotFoundError(msg)
339
340         ConsulClient._logger.info("consul-kv key=%s value(%s) data=%s",
341                                      key, value, json.dumps(data))
342
343         if get_index:
344             return data[0]["ModifyIndex"], value_dict
345
346         return value_dict
347
348
349     @staticmethod
350     def get_kvs(prefix, nest=True, trim_prefix=False):
351         """get key-values for keys beginning with prefix from consul-kv"""
352
353         URL = ConsulClient.CONSUL_KVS_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), prefix)
354
355         try:
356             response = requests.get(URL, timeout=30)
357             response.raise_for_status()
358         except requests.exceptions.RequestException as e:
359             msg = "get_kvs({}) requests.get({}) threw exception {}: {!s}".format(
360                       prefix, URL, type(e).__name__, e)
361             ConsulClient._logger.error(msg)
362             raise ConsulClientConnectionError(msg)
363
364         try:
365             data = response.json()
366         except Exception as e:
367             msg = "get_kvs({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format(
368                       prefix, URL, type(e).__name__, e)
369             ConsulClient._logger.error(msg)
370             raise ConsulClientKVEntryNotFoundError(msg)
371
372         if not data:
373             msg = "get_kvs({}) got empty or no value from requests.get({})".format(
374                       prefix, URL)
375             ConsulClient._logger.error(msg)
376             raise ConsulClientKVEntryNotFoundError(msg)
377
378         def put_level_value(level_keys, value, level_dict={}):
379             if level_keys:
380                 key = level_keys.pop(0)
381                 level_dict[key] = put_level_value(level_keys, value, level_dict.get(key, {}))
382                 return level_dict
383             else:
384                 return value
385
386         rdict = {}
387         for item in data:
388             v = base64.b64decode(item["Value"]).decode("utf-8")
389             try:
390                 value = json.loads(v)
391             except Exception as e:
392                 value = v
393             key = item['Key']
394             if trim_prefix:
395                 key = key[len(prefix):]
396             if nest:
397                 level_keys = key.split('/')
398                 rdict = put_level_value(level_keys, value, rdict)
399             else:
400                 rdict[key] = value
401
402         ConsulClient._logger.info("consul-kv prefix=%s value(%s) data=%s",
403                                   prefix, json.dumps(rdict), json.dumps(data))
404         return rdict
405
406
407     @staticmethod
408     def _gen_txn_operation(verb, key, value=None):
409         """returns the properly formatted operation to be used inside transaction"""
410
411         # key = urllib.quote(key)  # can't use urllib.quote() because it kills ':' in the key
412         if value:
413             return {"KV": {"Verb": verb, "Key": key, "Value": base64.b64encode(bytes(value, "utf-8")).decode("utf-8")}}
414         return {"KV": {"Verb": verb, "Key": key}}
415
416
417     @staticmethod
418     def _run_transaction(operation_name, txn):
419         """run a single transaction of several operations at consul /txn"""
420
421         if not txn:
422             return
423
424         txn_url = ConsulClient.CONSUL_TRANSACTION_URL.format(os.environ.get("CONSUL_URL").rstrip("/"))
425         response = None
426         try:
427             response = requests.put(txn_url, json=txn, timeout=30)
428         except requests.exceptions.RequestException as e:
429             ConsulClient._logger.error("failed to {} at {}: exception {}: {!s} on txn={}"
430                 .format(operation_name, txn_url, type(e).__name__, e, json.dumps(txn)))
431             return
432
433         if response.status_code != requests.codes.ok:
434             ConsulClient._logger.error("failed {} {}: {} text={} txn={} headers={}"
435                 .format(operation_name, txn_url, response.status_code,
436                         response.text, json.dumps(txn),
437                         json.dumps(dict(list(response.request.headers.items())))))
438             return
439
440         ConsulClient._logger.info("response for {} {}: {} text={} txn={} headers={}"
441             .format(operation_name, txn_url, response.status_code,
442                     response.text, json.dumps(txn),
443                     json.dumps(dict(list(response.request.headers.items())))))
444
445         return True
446
447
448     @staticmethod
449     def store_kvs(kvs):
450         """put kvs into consul-kv"""
451
452         if not kvs:
453             ConsulClient._logger.warning("kvs not supplied to store_kvs()")
454             return
455
456         store_kvs = [
457             ConsulClient._gen_txn_operation(ConsulClient.OPERATION_SET,
458                                             key, json.dumps(value))
459                 for key, value in kvs.items()
460         ]
461         txn = []
462         idx_step = ConsulClient.MAX_OPS_PER_TXN - len(txn)
463         for idx in range(0, len(store_kvs), idx_step):
464             txn += store_kvs[idx : idx + idx_step]
465             if not ConsulClient._run_transaction("store_kvs", txn):
466                 return False
467             txn = []
468
469         return ConsulClient._run_transaction("store_kvs", txn)
470
471
472     @staticmethod
473     def delete_key(key):
474         """delete key from consul-kv"""
475
476         if not key:
477             ConsulClient._logger.warning("key not supplied to delete_key()")
478             return
479
480         delete_key = [
481             ConsulClient._gen_txn_operation(ConsulClient.OPERATION_DELETE, key)
482         ]
483         return ConsulClient._run_transaction("delete_key", delete_key)
484
485
486     @staticmethod
487     def delete_kvs(key):
488         """delete key from consul-kv"""
489
490         if not key:
491             ConsulClient._logger.warning("key not supplied to delete_kvs()")
492             return
493
494         delete_kvs = [
495             ConsulClient._gen_txn_operation(ConsulClient.OPERATION_DELETE_FOLDER, key)
496         ]
497         return ConsulClient._run_transaction("delete_kvs", delete_kvs)
498
499
500     #----- Methods for Config Binding Service
501
502     @staticmethod
503     def get_service_component(scn):
504         config = json.dumps(ConsulClient.get_value(scn))
505
506         try:
507             dmaap = ConsulClient.get_value(scn + ":dmaap")
508         except Exception as e:
509             dmaap = None
510         if dmaap:
511             for key in list(dmaap.keys()):
512                 config = re.sub('"<<' + key + '>>"', json.dumps(dmaap[key]), config)
513
514         try:
515             rel = ConsulClient.get_value(scn + ":rel")
516         except Exception as e:
517             rel = None
518         if rel:
519             for key in list(rel.keys()):
520                 config = re.sub('"{{' + key + '}}"', json.dumps(rel[key]), config)
521
522         return json.loads(config)
523
524
525     @staticmethod
526     def get_service_component_all(scn, policies_as_list=True):
527         t_scn = scn + ":"
528         t_len = len(t_scn)
529         a_dict = ConsulClient.get_kvs(scn)
530         b_dict = {}
531         for key in a_dict:
532             b_key = None
533             if key == scn:
534                 b_dict["config"] = ConsulClient.get_service_component(scn)
535             elif key == scn + ":dmaap":
536                 continue
537             elif key[0:t_len] == t_scn:
538                 b_key = key[t_len:]
539                 # policies_as_list = True formats policies items in a list like ONAP's CBS; False keeps policy_ids keys
540                 if policies_as_list and b_key == "policies":  # convert items from KVs to a values list
541                     b_dict[b_key] = {}
542                     for sub_key in a_dict[key]:
543                         if sub_key == "items":
544                             b_dict[b_key][sub_key] = []
545                             d_dict = a_dict[key][sub_key]
546                             for item in sorted(d_dict.keys()):  # old CBS sorted them so we emulate
547                                 b_dict[b_key][sub_key].append(d_dict[item])
548                         else:
549                             b_dict[b_key][sub_key] = copy.deepcopy(a_dict[key][sub_key])
550                 else:
551                     b_dict[b_key] = copy.deepcopy(a_dict[key])
552         return b_dict
553
554
555     @staticmethod
556     def add_vnf_id(scn, vnf_type, vnf_id, dti_dict):
557         """
558         Add VNF instance to Consul scn:oti key.
559
560         Treat its value as a JSON string representing a dict.
561         Extend the dict by adding a dti_dict for vnf_id under vnf_type.
562         Turn the resulting extended dict into a JSON string.
563         Store the string back into Consul under scn:oti key.
564         Watch out for conflicting concurrent updates.
565         """
566
567         key = scn + ':oti'
568         lc_vnf_type = vnf_type.lower()
569         while True:     # do until update succeeds
570             (mod_index, v) = ConsulClient.get_value(key, get_index=True)
571             lc_v = {ky.lower():vl for ky,vl in list(v.items())}  # aware this arbitrarily picks keys that only differ in case
572                                                            # but DCAE-C doesn't create such keys
573
574             if lc_vnf_type not in lc_v:
575                 return  # That VNF type is not supported by this component
576             lc_v[lc_vnf_type][vnf_id] = dti_dict  # add or replace the VNF instance
577
578             updated = ConsulClient.put_value(key, lc_v, cas=mod_index)
579             if updated:
580                 return lc_v
581
582
583     @staticmethod
584     def delete_vnf_id(scn, vnf_type, vnf_id):
585         """
586         Delete VNF instance from Consul scn:oti key.
587
588         Treat its value as a JSON string representing a dict.
589         Modify the dict by deleting the vnf_id key entry from under vnf_type.
590         Turn the resulting extended dict into a JSON string.
591         Store the string back into Consul under scn:oti key.
592         Watch out for conflicting concurrent updates.
593         """
594
595         key = scn + ':oti'
596         lc_vnf_type = vnf_type.lower()
597         while True:     # do until update succeeds
598             (mod_index, v) = ConsulClient.get_value(key, get_index=True)
599             lc_v = {ky.lower():vl for ky,vl in list(v.items())}  # aware this arbitrarily picks keys that only differ in case
600                                                            # but DCAE-C doesn't create such keys
601
602             if lc_vnf_type not in lc_v:
603                 return  # That VNF type is not supported by this component
604             if vnf_id not in lc_v[lc_vnf_type]:
605                 return lc_v
606             del lc_v[lc_vnf_type][vnf_id]  # delete the VNF instance
607
608             updated = ConsulClient.put_value(key, lc_v, cas=mod_index)
609             if updated:
610                 return lc_v
611
612
613 if __name__ == "__main__":
614     value = None
615
616     if value:
617         print(json.dumps(value, sort_keys=True, indent=4, separators=(',', ': ')))