move plugins from from ccsdk to dcaegen2
[dcaegen2/platform/plugins.git] / dcae-policy / dcaepolicyplugin / discovery.py
1 # ================================================================================
2 # Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
16 #
17
18 """client to talk to consul on standard port 8500"""
19
20 import base64
21 import json
22
23 import requests
24 from cloudify import ctx
25 from cloudify.exceptions import NonRecoverableError
26
27 # it is safe to assume that consul agent is at consul:8500
28 # define consul alis in /etc/hosts on cloudify manager vm
29 # $ cat /etc/hosts
30 # 127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4 consul
31
32 CONSUL_SERVICE_URL = "http://consul:8500/v1/catalog/service/{0}"
33 CONSUL_KV_MASK = "http://consul:8500/v1/kv/{0}"
34
35
36 def discover_service_url(service_name):
37     """find the service record in consul"""
38     service_url = CONSUL_SERVICE_URL.format(service_name)
39     ctx.logger.info("getting service_url at {0}".format(service_url))
40
41     try:
42         response = requests.get(service_url, timeout=60)
43     except requests.ConnectionError as ex:
44         raise NonRecoverableError(
45             "ConnectionError - failed to get {0}: {1}".format(service_url, str(ex)))
46
47     ctx.logger.info("got {0} for service_url at {1} response: {2}"
48                     .format(response.status_code, service_url, response.text))
49
50     if response.status_code != requests.codes.ok:
51         return
52
53     resp_json = response.json()
54     if resp_json:
55         service = resp_json[0]
56         return "http://{0}:{1}".format(service["ServiceAddress"], service["ServicePort"])
57
58
59 def discover_value(key):
60     """get the value for the key from consul-kv"""
61     kv_url = CONSUL_KV_MASK.format(key)
62     ctx.logger.info("getting kv at {0}".format(kv_url))
63
64     try:
65         response = requests.get(kv_url, timeout=60)
66     except requests.ConnectionError as ex:
67         raise NonRecoverableError(
68             "ConnectionError - failed to get {0}: {1}".format(kv_url, str(ex)))
69
70     ctx.logger.info("got {0} for kv at {1} response: {2}"
71                     .format(response.status_code, kv_url, response.text))
72
73     if response.status_code != requests.codes.ok:
74         return
75
76     data = response.json()
77     if not data:
78         ctx.logger.error("failed discover_value %s", key)
79         return
80     value = base64.b64decode(data[0]["Value"]).decode("utf-8")
81     ctx.logger.info("consul-kv key=%s value(%s) data=%s",
82                     key, value, json.dumps(data))
83     return json.loads(value)