1 # ================================================================================
2 # Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
17 """client to talk to consul at consul port 8500"""
30 class ConsulClientError(RuntimeError):
33 class ConsulClientConnectionError(RuntimeError):
36 class ConsulClientServiceNotFoundError(RuntimeError):
39 class ConsulClientNodeNotFoundError(RuntimeError):
42 class ConsulClientKVEntryNotFoundError(RuntimeError):
46 class ConsulClient(object):
47 """talking to consul"""
49 CONSUL_SERVICE_MASK = "{}/v1/catalog/service/{}"
50 CONSUL_KV_MASK = "{}/v1/kv/{}"
51 CONSUL_KVS_MASK = "{}/v1/kv/{}?recurse=true"
52 CONSUL_TRANSACTION_URL = "{}/v1/txn"
53 _logger = logging.getLogger("oti_handler.consul_client")
56 # MAX_VALUE_LEN = 512 * 1000
59 OPERATION_DELETE = "delete"
60 OPERATION_DELETE_FOLDER = "delete-tree"
63 #----- Methods for Consul services
66 def lookup_service(service_name):
67 """find the service record in consul"""
69 service_path = ConsulClient.CONSUL_SERVICE_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), service_name)
71 ConsulClient._logger.info("lookup_service(%s)", service_path)
74 response = requests.get(service_path, timeout=30)
75 response.raise_for_status()
76 # except requests.exceptions.HTTPError as e:
77 # except requests.exceptions.ConnectionError as e:
78 # except requests.exceptions.Timeout as e:
79 except requests.exceptions.RequestException as e:
80 msg = "lookup_service({}) requests.get({}) threw exception {}: {!s}".format(
81 service_name, service_path, type(e).__name__, e)
82 ConsulClient._logger.error(msg)
83 raise ConsulClientConnectionError(msg)
86 return_list = response.json()
87 # except ValueError as e:
88 except Exception as e:
89 msg = "lookup_service({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format(
90 service_name, service_path, type(e).__name__, e)
91 ConsulClient._logger.error(msg)
92 raise ConsulClientServiceNotFoundError(msg)
95 msg = "lookup_service({}) got empty or no value from requests.get({})".format(
96 service_name, service_path)
97 ConsulClient._logger.error(msg)
98 raise ConsulClientServiceNotFoundError(msg)
104 def get_all_services():
105 """List all services from consul"""
107 service_path = "{}/v1/catalog/services".format(os.environ.get("CONSUL_URL").rstrip("/"))
109 ConsulClient._logger.info("get_all_services(%s)", service_path)
112 response = requests.get(service_path, timeout=30)
113 response.raise_for_status()
114 except requests.exceptions.RequestException as e:
115 msg = "get_all_services() requests.get({}) threw exception {}: {!s}".format(
116 service_path, type(e).__name__, e)
117 ConsulClient._logger.error(msg)
118 raise ConsulClientConnectionError(msg)
121 return_dict = response.json()
122 except Exception as e:
123 msg = "get_all_services() parsing JSON from requests.get({}) threw exception {}: {!s}".format(
124 service_path, type(e).__name__, e)
125 ConsulClient._logger.error(msg)
126 raise ConsulClientServiceNotFoundError(msg)
129 msg = "get_all_services() got empty or no value from requests.get({})".format(
131 ConsulClient._logger.info(msg)
132 # raise ConsulClientServiceNotFoundError(msg)
138 def _find_matching_services(services, name_search, tags):
139 """Find matching services given search criteria"""
140 sub_tags = tags[0][4:6]
141 tags.append(sub_tags)
143 def is_match(service):
144 srv_name, srv_tags = service
145 return name_search in srv_name and \
146 any([tag in srv_tags for tag in tags])
148 return [ srv[0] for srv in list(services.items()) if is_match(srv) ]
152 def search_services(name_search, tags):
154 Search for services that match criteria
158 name_search: (string) Name to search for as a substring
159 tags: (list) List of strings that are tags. A service must match **ANY OF** the
164 List of names of services that matched
169 # srvs is dict where key is service name and value is list of tags
170 srvs = ConsulClient.get_all_services()
173 matches = ConsulClient._find_matching_services(srvs, name_search, tags)
179 def get_service_fqdn_port(service_name, node_meta=False):
180 """find the service record in consul"""
182 service_path = ConsulClient.CONSUL_SERVICE_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), service_name)
184 ConsulClient._logger.info("get_service_fqdn_port(%s)", service_path)
187 response = requests.get(service_path, timeout=30)
188 response.raise_for_status()
189 except requests.exceptions.RequestException as e:
190 msg = "get_service_fqdn_port({}) requests.get({}) threw exception {}: {!s}".format(
191 service_name, service_path, type(e).__name__, e)
192 ConsulClient._logger.error(msg)
193 raise ConsulClientConnectionError(msg)
196 service = response.json()
197 except Exception as e:
198 msg = "get_service_fqdn_port({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format(
199 service_name, service_path, type(e).__name__, e)
200 ConsulClient._logger.error(msg)
201 raise ConsulClientServiceNotFoundError(msg)
204 msg = "get_service_fqdn_port({}) got empty or no value from requests.get({})".format(
205 service_name, service_path)
206 ConsulClient._logger.error(msg)
207 raise ConsulClientServiceNotFoundError(msg)
210 service = service[0] # arbitrarily choose the first one
211 port = service["ServicePort"]
213 # HTTPS certificate validation requires FQDN not IP address
216 meta = service.get("NodeMeta")
218 fqdn = meta.get("fqdn")
220 fqdn = socket.getfqdn(str(service["ServiceAddress"]))
221 except Exception as e:
222 msg = "get_service_fqdn_port({}) parsing result from requests.get({}) threw exception {}: {!s}".format(
223 service_name, service_path, type(e).__name__, e)
224 ConsulClient._logger.error(msg)
225 raise ConsulClientServiceNotFoundError(msg)
230 #----- Methods for Consul nodes
233 def lookup_node(node_name):
234 """find the node record in consul"""
236 node_path = "{}/v1/catalog/node/{}".format(os.environ.get("CONSUL_URL").rstrip("/"), node_name)
238 ConsulClient._logger.info("lookup_node(%s)", node_path)
241 response = requests.get(node_path, timeout=30)
242 response.raise_for_status()
243 # except requests.exceptions.HTTPError as e:
244 # except requests.exceptions.ConnectionError as e:
245 # except requests.exceptions.Timeout as e:
246 except requests.exceptions.RequestException as e:
247 msg = "lookup_node({}) requests.get({}) threw exception {}: {!s}".format(
248 node_name, node_path, type(e).__name__, e)
249 ConsulClient._logger.error(msg)
250 raise ConsulClientConnectionError(msg)
253 return_dict = response.json()
254 # except ValueError as e:
255 except Exception as e:
256 msg = "lookup_node({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format(
257 node_name, node_path, type(e).__name__, e)
258 ConsulClient._logger.error(msg)
259 raise ConsulClientNodeNotFoundError(msg)
262 msg = "lookup_node({}) got empty or no value from requests.get({})".format(
263 node_name, node_path)
264 ConsulClient._logger.error(msg)
265 raise ConsulClientNodeNotFoundError(msg)
270 #----- Methods for Consul key-values
273 def put_value(key, data, cas=None):
274 """put the value for key into consul-kv"""
276 # ConsulClient._logger.info("put_value(%s)", str(key))
278 URL = ConsulClient.CONSUL_KV_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), key)
280 URL = '{}?cas={}'.format(URL, cas)
283 response = requests.put(URL, data=json.dumps(data), timeout=30)
284 response.raise_for_status()
285 except requests.exceptions.RequestException as e:
286 msg = "put_value({}) requests.put({}) threw exception {}: {!s}".format(
287 key, URL, type(e).__name__, e)
288 ConsulClient._logger.error(msg)
289 raise ConsulClientConnectionError(msg)
292 updated = response.json()
293 except Exception as e:
294 msg = "put_value({}) parsing JSON from requests.put({}) threw exception {}: {!s}".format(
295 key, URL, type(e).__name__, e)
296 ConsulClient._logger.error(msg)
297 raise ConsulClientKVEntryNotFoundError(msg)
303 def get_value(key, get_index=False):
304 """get the value for key from consul-kv"""
306 URL = ConsulClient.CONSUL_KV_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), key)
309 response = requests.get(URL, timeout=30)
310 response.raise_for_status()
311 except requests.exceptions.RequestException as e:
312 msg = "get_value({}) requests.get({}) threw exception {}: {!s}".format(
313 key, URL, type(e).__name__, e)
314 ConsulClient._logger.error(msg)
315 raise ConsulClientConnectionError(msg)
318 data = response.json()
319 except Exception as e:
320 msg = "get_value({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format(
321 key, URL, type(e).__name__, e)
322 ConsulClient._logger.error(msg)
323 raise ConsulClientKVEntryNotFoundError(msg)
326 msg = "get_value({}) got empty or no value from requests.get({})".format(
328 ConsulClient._logger.error(msg)
329 raise ConsulClientKVEntryNotFoundError(msg)
332 value = base64.b64decode(data[0]["Value"]).decode("utf-8")
333 value_dict = json.loads(value)
334 except Exception as e:
335 msg = "get_value({}) decoding value from requests.get({}) threw exception {}: {!s}".format(
336 key, URL, type(e).__name__, e)
337 ConsulClient._logger.error(msg)
338 raise ConsulClientKVEntryNotFoundError(msg)
340 ConsulClient._logger.info("consul-kv key=%s value(%s) data=%s",
341 key, value, json.dumps(data))
344 return data[0]["ModifyIndex"], value_dict
350 def get_kvs(prefix, nest=True, trim_prefix=False):
351 """get key-values for keys beginning with prefix from consul-kv"""
353 URL = ConsulClient.CONSUL_KVS_MASK.format(os.environ.get("CONSUL_URL").rstrip("/"), prefix)
356 response = requests.get(URL, timeout=30)
357 response.raise_for_status()
358 except requests.exceptions.RequestException as e:
359 msg = "get_kvs({}) requests.get({}) threw exception {}: {!s}".format(
360 prefix, URL, type(e).__name__, e)
361 ConsulClient._logger.error(msg)
362 raise ConsulClientConnectionError(msg)
365 data = response.json()
366 except Exception as e:
367 msg = "get_kvs({}) parsing JSON from requests.get({}) threw exception {}: {!s}".format(
368 prefix, URL, type(e).__name__, e)
369 ConsulClient._logger.error(msg)
370 raise ConsulClientKVEntryNotFoundError(msg)
373 msg = "get_kvs({}) got empty or no value from requests.get({})".format(
375 ConsulClient._logger.error(msg)
376 raise ConsulClientKVEntryNotFoundError(msg)
378 def put_level_value(level_keys, value, level_dict={}):
380 key = level_keys.pop(0)
381 level_dict[key] = put_level_value(level_keys, value, level_dict.get(key, {}))
388 v = base64.b64decode(item["Value"]).decode("utf-8")
390 value = json.loads(v)
391 except Exception as e:
395 key = key[len(prefix):]
397 level_keys = key.split('/')
398 rdict = put_level_value(level_keys, value, rdict)
402 ConsulClient._logger.info("consul-kv prefix=%s value(%s) data=%s",
403 prefix, json.dumps(rdict), json.dumps(data))
408 def _gen_txn_operation(verb, key, value=None):
409 """returns the properly formatted operation to be used inside transaction"""
411 # key = urllib.quote(key) # can't use urllib.quote() because it kills ':' in the key
413 return {"KV": {"Verb": verb, "Key": key, "Value": base64.b64encode(bytes(value, "utf-8")).decode("utf-8")}}
414 return {"KV": {"Verb": verb, "Key": key}}
418 def _run_transaction(operation_name, txn):
419 """run a single transaction of several operations at consul /txn"""
424 txn_url = ConsulClient.CONSUL_TRANSACTION_URL.format(os.environ.get("CONSUL_URL").rstrip("/"))
427 response = requests.put(txn_url, json=txn, timeout=30)
428 except requests.exceptions.RequestException as e:
429 ConsulClient._logger.error("failed to {} at {}: exception {}: {!s} on txn={}"
430 .format(operation_name, txn_url, type(e).__name__, e, json.dumps(txn)))
433 if response.status_code != requests.codes.ok:
434 ConsulClient._logger.error("failed {} {}: {} text={} txn={} headers={}"
435 .format(operation_name, txn_url, response.status_code,
436 response.text, json.dumps(txn),
437 json.dumps(dict(list(response.request.headers.items())))))
440 ConsulClient._logger.info("response for {} {}: {} text={} txn={} headers={}"
441 .format(operation_name, txn_url, response.status_code,
442 response.text, json.dumps(txn),
443 json.dumps(dict(list(response.request.headers.items())))))
450 """put kvs into consul-kv"""
453 ConsulClient._logger.warning("kvs not supplied to store_kvs()")
457 ConsulClient._gen_txn_operation(ConsulClient.OPERATION_SET,
458 key, json.dumps(value))
459 for key, value in kvs.items()
462 idx_step = ConsulClient.MAX_OPS_PER_TXN - len(txn)
463 for idx in range(0, len(store_kvs), idx_step):
464 txn += store_kvs[idx : idx + idx_step]
465 if not ConsulClient._run_transaction("store_kvs", txn):
469 return ConsulClient._run_transaction("store_kvs", txn)
474 """delete key from consul-kv"""
477 ConsulClient._logger.warning("key not supplied to delete_key()")
481 ConsulClient._gen_txn_operation(ConsulClient.OPERATION_DELETE, key)
483 return ConsulClient._run_transaction("delete_key", delete_key)
488 """delete key from consul-kv"""
491 ConsulClient._logger.warning("key not supplied to delete_kvs()")
495 ConsulClient._gen_txn_operation(ConsulClient.OPERATION_DELETE_FOLDER, key)
497 return ConsulClient._run_transaction("delete_kvs", delete_kvs)
500 #----- Methods for Config Binding Service
503 def get_service_component(scn):
504 config = json.dumps(ConsulClient.get_value(scn))
507 dmaap = ConsulClient.get_value(scn + ":dmaap")
508 except Exception as e:
511 for key in list(dmaap.keys()):
512 config = re.sub('"<<' + key + '>>"', json.dumps(dmaap[key]), config)
515 rel = ConsulClient.get_value(scn + ":rel")
516 except Exception as e:
519 for key in list(rel.keys()):
520 config = re.sub('"{{' + key + '}}"', json.dumps(rel[key]), config)
522 return json.loads(config)
526 def get_service_component_all(scn, policies_as_list=True):
529 a_dict = ConsulClient.get_kvs(scn)
534 b_dict["config"] = ConsulClient.get_service_component(scn)
535 elif key == scn + ":dmaap":
537 elif key[0:t_len] == t_scn:
539 # policies_as_list = True formats policies items in a list like ONAP's CBS; False keeps policy_ids keys
540 if policies_as_list and b_key == "policies": # convert items from KVs to a values list
542 for sub_key in a_dict[key]:
543 if sub_key == "items":
544 b_dict[b_key][sub_key] = []
545 d_dict = a_dict[key][sub_key]
546 for item in sorted(d_dict.keys()): # old CBS sorted them so we emulate
547 b_dict[b_key][sub_key].append(d_dict[item])
549 b_dict[b_key][sub_key] = copy.deepcopy(a_dict[key][sub_key])
551 b_dict[b_key] = copy.deepcopy(a_dict[key])
556 def add_vnf_id(scn, vnf_type, vnf_id, dti_dict):
558 Add VNF instance to Consul scn:oti key.
560 Treat its value as a JSON string representing a dict.
561 Extend the dict by adding a dti_dict for vnf_id under vnf_type.
562 Turn the resulting extended dict into a JSON string.
563 Store the string back into Consul under scn:oti key.
564 Watch out for conflicting concurrent updates.
568 lc_vnf_type = vnf_type.lower()
569 while True: # do until update succeeds
570 (mod_index, v) = ConsulClient.get_value(key, get_index=True)
571 lc_v = {ky.lower():vl for ky,vl in list(v.items())} # aware this arbitrarily picks keys that only differ in case
572 # but DCAE-C doesn't create such keys
574 if lc_vnf_type not in lc_v:
575 return # That VNF type is not supported by this component
576 lc_v[lc_vnf_type][vnf_id] = dti_dict # add or replace the VNF instance
578 updated = ConsulClient.put_value(key, lc_v, cas=mod_index)
584 def delete_vnf_id(scn, vnf_type, vnf_id):
586 Delete VNF instance from Consul scn:oti key.
588 Treat its value as a JSON string representing a dict.
589 Modify the dict by deleting the vnf_id key entry from under vnf_type.
590 Turn the resulting extended dict into a JSON string.
591 Store the string back into Consul under scn:oti key.
592 Watch out for conflicting concurrent updates.
596 lc_vnf_type = vnf_type.lower()
597 while True: # do until update succeeds
598 (mod_index, v) = ConsulClient.get_value(key, get_index=True)
599 lc_v = {ky.lower():vl for ky,vl in list(v.items())} # aware this arbitrarily picks keys that only differ in case
600 # but DCAE-C doesn't create such keys
602 if lc_vnf_type not in lc_v:
603 return # That VNF type is not supported by this component
604 if vnf_id not in lc_v[lc_vnf_type]:
606 del lc_v[lc_vnf_type][vnf_id] # delete the VNF instance
608 updated = ConsulClient.put_value(key, lc_v, cas=mod_index)
613 if __name__ == "__main__":
617 print(json.dumps(value, sort_keys=True, indent=4, separators=(',', ': ')))