move plugins from from ccsdk to dcaegen2
[dcaegen2/platform/plugins.git] / dmaap / consulif / consulif.py
1 # ============LICENSE_START====================================================
2 # org.onap.dcaegen2
3 # =============================================================================
4 # Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # =============================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 #      http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END======================================================
19
20 import consul
21 import json
22 try:
23     from urllib.parse import urlparse
24 except ImportError:
25     from urlparse import urlparse
26
27
28 class ConsulHandle(object):
29     '''
30     Provide access to Consul KV store and service discovery
31     '''
32
33     def __init__(self, api_url, user, password, logger):
34         '''
35         Constructor
36         '''
37         u = urlparse(api_url)
38         self.ch = consul.Consul(host=u.hostname, port=u.port, scheme=u.scheme)
39
40     def get_config(self, key):
41         '''
42         Get configuration information from Consul using the provided key.
43         It should be in JSON form.  Convert it to a dictionary
44         '''
45         (index, val) = self.ch.kv.get(key)
46         config = json.loads(val['Value'])        # will raise ValueError if not JSON, let it propagate
47         return config
48
49     def get_service(self,service_name):
50         '''
51         Look up the service named service_name in Consul.
52         Return the service address and port.
53         '''
54         (index, val) = self.ch.catalog.service(service_name)
55         if len(val) > 0:                # catalog.service returns an empty array if service not found
56             service = val[0]            # Could be multiple listings, but we take the first
57             if ('ServiceAddress' in service) and (len(service['ServiceAddress']) > 0):
58                 service_address = service['ServiceAddress']    # Most services should have this
59             else:
60                 service_address = service['Address']         # "External" services will have this only
61             service_port = service['ServicePort']
62         else:
63             raise Exception('Could not find service information for "{0}"'.format(service_name))
64
65         return service_address, service_port
66
67     def add_to_entry(self, key, add_name, add_value):
68         '''
69         Find 'key' in consul.
70         Treat its value as a JSON string representing a dict.
71         Extend the dict by adding an entry with key 'add_name' and value 'add_value'.
72         Turn the resulting extended dict into a JSON string.
73         Store the string back into Consul under 'key'.
74         Watch out for conflicting concurrent updates.
75
76         Example:
77         Key 'xyz:dmaap' has the value '{"feed00": {"feed_url" : "http://example.com/feeds/999"}}'
78         add_to_entry('xyz:dmaap', 'topic00', {'topic_url' : 'http://example.com/topics/1229'})
79         should result in the value for key 'xyz:dmaap' in consul being updated to
80         '{"feed00": {"feed_url" : "http://example.com/feeds/999"}, "topic00" : {"topic_url" : "http://example.com/topics/1229"}}'
81         '''
82
83         while True:     # do until update succeeds
84             (index, val) = self.ch.kv.get(key)     # index gives version of key retrieved
85
86             if val is None:     # no key yet
87                 vstring = '{}'
88                 mod_index = 0   # Use 0 as the cas index for initial insertion of the key
89             else:
90                 vstring = val['Value']
91                 mod_index = val['ModifyIndex']
92
93             # Build the updated dict
94             # Exceptions just propagate
95             v = json.loads(vstring)
96             v[add_name] = add_value
97             new_vstring = json.dumps(v)
98
99             updated = self.ch.kv.put(key, new_vstring, cas=mod_index)       # if the key has changed since retrieval, this will return false
100             if updated:
101                 break
102
103
104     def delete_entry(self,entry_name):
105         '''
106         Delete an entire key-value entry whose key is 'entry_name' from the Consul KV store.
107
108         Note that the kv.delete() operation always returns True,
109         whether there's an entry with key 'entry_name' exists or not.  This doesn't seem like
110         a great design, but it means it's safe to try to delete the same entry repeatedly.
111
112         Note also in our application for this plugin, the uninstall workflow will always delete all of the topics and
113         feeds we've stored into the 'component_name:dmaap' entry.
114
115         Given the two foregoing notes, it is safe for this plugin to attempt to delete the entire
116         'component_name:dmaap' entry any time it performs an 'unlink' operation for a publishes or
117         subscribes relationship.   The first unlink will actually remove the entry, the subsequent ones
118         will harmlessly try to remove it again.
119
120         The 'correct' approach would be to have a delete_from_entry(self, key, delete_name) that fetches
121         the entry from Consul, removes only the topic or feed being unlinked, and then puts the resulting
122         entry back into Consul.  It would be very similar to add_from_entry.  When there's nothing left
123         in the entry, then the entire entry would be deleted.
124         '''
125         self.ch.kv.delete(entry_name)