1 # ============LICENSE_START====================================================
3 # =============================================================================
4 # Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # =============================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END======================================================
import json

try:
    from urllib.parse import urlparse  # Python 3
except ImportError:
    from urlparse import urlparse  # Python 2

import consul
class ConsulHandle(object):
    '''
    Provide access to Consul KV store and service discovery
    '''

    def __init__(self, api_url, user, password, logger):
        '''
        Create a Consul client for the Consul API located at 'api_url'.

        The host, port and scheme are taken from 'api_url'.
        'user', 'password' and 'logger' are accepted for interface
        compatibility but are not used by this class.
        '''
        u = urlparse(api_url)
        self.ch = consul.Consul(host=u.hostname, port=u.port, scheme=u.scheme)

    def get_config(self, key):
        '''
        Get configuration information from Consul using the provided key.
        It should be in JSON form.  Convert it to a dictionary and return it.

        Raises ValueError if the stored value is not valid JSON.
        If 'key' does not exist, Consul yields no entry (None) and the
        lookup of its 'Value' raises TypeError.
        '''
        _, val = self.ch.kv.get(key)
        # Will raise ValueError if not JSON, let it propagate
        return json.loads(val['Value'])

    def get_service(self, service_name):
        '''
        Look up the service named service_name in Consul.
        Return the service address and port as a (address, port) tuple.

        Raises Exception if Consul has no listing for 'service_name'.
        '''
        _, listings = self.ch.catalog.service(service_name)

        # catalog.service returns an empty array if service not found
        if not listings:
            raise Exception('Could not find service information for "{0}"'.format(service_name))

        # Could be multiple listings, but we take the first
        service = listings[0]

        # Most services have a non-empty 'ServiceAddress';
        # "external" services will have 'Address' only
        service_address = service.get('ServiceAddress') or service['Address']
        service_port = service['ServicePort']

        return service_address, service_port

    def add_to_entry(self, key, add_name, add_value):
        '''
        Find 'key' in Consul.
        Treat its value as a JSON string representing a dict.
        Extend the dict by adding an entry with key 'add_name' and value 'add_value'.
        Turn the resulting extended dict into a JSON string.
        Store the string back into Consul under 'key'.
        Watch out for conflicting concurrent updates.

        Example:
        Key 'xyz:dmaap' has the value '{"feed00": {"feed_url" : "http://example.com/feeds/999"}}'
        add_to_entry('xyz:dmaap', 'topic00', {'topic_url' : 'http://example.com/topics/1229'})
        should result in the value for key 'xyz:dmaap' in consul being updated to
        '{"feed00": {"feed_url" : "http://example.com/feeds/999"}, "topic00" : {"topic_url" : "http://example.com/topics/1229"}}'

        If 'key' does not exist yet, it is created holding only the new entry.
        Exceptions from JSON decoding/encoding and from the Consul client propagate.
        '''
        while True:     # do until update succeeds
            _, val = self.ch.kv.get(key)

            if val is None:     # no key yet
                vstring = '{}'
                mod_index = 0   # Use 0 as the cas index for initial insertion of the key
            else:
                vstring = val['Value']
                mod_index = val['ModifyIndex']   # version of the value we just read

            # Build the updated dict
            # Exceptions just propagate
            v = json.loads(vstring)
            v[add_name] = add_value
            new_vstring = json.dumps(v)

            # If the key has changed since retrieval, the check-and-set put
            # returns false and we retry with a fresh read
            if self.ch.kv.put(key, new_vstring, cas=mod_index):
                return

    def delete_entry(self, entry_name):
        '''
        Delete an entire key-value entry whose key is 'entry_name' from the Consul KV store.

        Note that the kv.delete() operation always returns True,
        whether an entry with key 'entry_name' exists or not.  This doesn't seem like
        a great design, but it means it's safe to try to delete the same entry repeatedly.

        Note also in our application for this plugin, the uninstall workflow will always delete all of the topics and
        feeds we've stored into the 'component_name:dmaap' entry.

        Given the two foregoing notes, it is safe for this plugin to attempt to delete the entire
        'component_name:dmaap' entry any time it performs an 'unlink' operation for a publishes or
        subscribes relationship.   The first unlink will actually remove the entry, the subsequent ones
        will harmlessly try to remove it again.

        The 'correct' approach would be to have a delete_from_entry(self, key, delete_name) that fetches
        the entry from Consul, removes only the topic or feed being unlinked, and then puts the resulting
        entry back into Consul.  It would be very similar to add_to_entry.  When there's nothing left
        in the entry, then the entire entry would be deleted.
        '''
        self.ch.kv.delete(entry_name)