1 # ============LICENSE_START====================================================
3 # =============================================================================
4 # Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # =============================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END======================================================
# Default paths (relative to the bus controller API root) of the resource
# collections exposed by the DMaaP bus controller.  Each one can be overridden
# per handle through the DMaaPControllerHandle constructor, except
# LOCATIONS_PATH, which the class reads directly.
FEEDS_PATH = '/feeds'                # data router feeds
PUBS_PATH = '/dr_pubs'               # data router publishers
SUBS_PATH = '/dr_subs'               # data router subscribers
TOPICS_PATH = '/topics'              # message router topics
CLIENTS_PATH = '/mr_clients'         # message router clients
LOCATIONS_PATH = '/dcaeLocations'    # DCAE locations known to the controller
class DMaaPControllerHandle(object):
    """
    A simple wrapper class to map DMaaP bus controller API calls into
    operations supported by the requests module.

    Unless stated otherwise, every public method returns the
    ``requests.Response`` produced by the underlying HTTP call without
    checking its status; callers decide how to handle error responses.
    """

    def __init__(self, api_url, user, password, logger,
                 feeds_path=FEEDS_PATH,
                 pubs_path=PUBS_PATH,
                 subs_path=SUBS_PATH,
                 topics_path=TOPICS_PATH,
                 clients_path=CLIENTS_PATH):
        """
        Create a handle for the bus controller rooted at api_url.

        api_url      -- URL for the root of the controller resource tree,
                        no trailing "/"
        user         -- user name for HTTP basic auth
        password     -- password for HTTP basic auth
        logger       -- logger object providing an info(message) method
        *_path       -- paths (relative to api_url) of the feed, publisher,
                        subscriber, topic and client collections
        """
        self.api_url = api_url          # URL for the root of the Controller resource tree, no trailing "/"
        self.auth = (user, password)    # user name and password for HTTP basic auth
        self.logger = logger            # used by every request helper below
        self.feeds_path = feeds_path
        self.pubs_path = pubs_path
        self.subs_path = subs_path
        self.topics_path = topics_path
        self.clients_path = clients_path

    ### INTERNAL FUNCTIONS ###

    def _make_url(self, path):
        """
        Make a full URL given the path relative to the root.
        A leading "/" is added to path when it is missing.
        """
        if not path.startswith('/'):
            path = '/' + path

        return self.api_url + path

    @staticmethod
    def _redact(resource_content):
        """
        Return a copy of resource_content that is safe to log: the value of
        the 'userpwd' key, when present, is masked.  Anything that is not a
        dict, or has no 'userpwd' key, is returned unchanged.
        """
        if isinstance(resource_content, dict) and 'userpwd' in resource_content:
            masked = dict(resource_content)
            masked['userpwd'] = '********'
            return masked
        return resource_content

    @staticmethod
    def _add_if_set(definition, optional_fields):
        """
        Copy each (key, value) pair from optional_fields into definition,
        skipping pairs whose value is falsy (None, empty string, ...), so
        that unset optional properties are left out of the request body.
        """
        for key, value in optional_fields:
            if value:
                definition[key] = value

    @staticmethod
    def _with_use_existing(path, useExisting):
        """
        Return path with "?useExisting=true" appended when the flag is set.
        """
        # Deliberate equality test: the flag is a boolean, and a truthy
        # non-boolean (e.g. the string "false") must not enable it.
        if useExisting == True:  # noqa: E712
            return path + "?useExisting=true"
        return path

    def _get_resource(self, path):
        """
        Get the DMaaP resource at path, where path is relative to the root.
        """
        url = self._make_url(path)
        self.logger.info("Querying URL: {0}".format(url))
        return requests.get(url, auth=self.auth)

    def _create_resource(self, path, resource_content):
        """
        Create a DMaaP resource by POSTing to the resource collection
        identified by path (relative to root), using resource_content
        as the (JSON) body of the post.
        """
        url = self._make_url(path)
        # Log a redacted copy so passwords never reach the log output.
        self.logger.info("Posting to URL: {0} with body: {1}".format(
            url, self._redact(resource_content)))
        return requests.post(url, auth=self.auth, json=resource_content)

    def _delete_resource(self, path):
        """
        Delete the DMaaP resource at path, where path is relative to the root.
        """
        url = self._make_url(path)
        self.logger.info("Deleting URL: {0}".format(url))
        return requests.delete(url, auth=self.auth)

    def _get_valid_locations(self, layer_matches):
        """
        Get the names of the locations known to the bus controller whose
        status is "VALID" and whose "dcaeLayer" value satisfies the
        predicate layer_matches.

        Raises requests.HTTPError when the location query fails.
        """
        # Do these as a separate step so things like 404 get reported precisely
        locations = self._get_resource(LOCATIONS_PATH)
        locations.raise_for_status()

        return [location["dcaeLocationName"] for location in locations.json()
                if layer_matches(location['dcaeLayer'])
                and location['status'] == 'VALID']

    ### PUBLIC API ###

    # Data Router Feeds
    def create_feed(self, name, version=None, description=None,
                    aspr_class=None, owner=None, useExisting=None):
        """
        Create a DMaaP data router feed with the given feed name
        and (optionally) feed version, feed description, ASPR classification,
        owner, and useExisting flag.  Optional values that are not set are
        left out of the request body.
        """
        feed_definition = {'feedName': name}
        self._add_if_set(feed_definition, (
            ('feedVersion', version),
            ('feedDescription', description),
            ('asprClassification', aspr_class),
            ('owner', owner),
        ))
        feeds_path_query = self._with_use_existing(self.feeds_path, useExisting)

        return self._create_resource(feeds_path_query, feed_definition)

    def get_feed_info(self, feed_id):
        """
        Get the representation of the DMaaP data router feed whose feed id is feed_id.
        """
        return self._get_resource("{0}/{1}".format(self.feeds_path, feed_id))

    def get_feed_info_by_name(self, feed_name):
        """
        Get the representation of the DMaaP data router feed whose feed name
        is feed_name.  The feed list is fetched and searched for the first
        feed with a matching "feedName"; that feed is then queried by id.

        Returns the response for the matching feed, or None when no feed
        with that name exists.
        """
        feeds = self._get_resource("{0}".format(self.feeds_path))
        for feed in feeds.json():
            if feed["feedName"] == feed_name:
                self.logger.info("Found feed with {0}".format(feed_name))
                return self._get_resource(
                    "{0}/{1}".format(self.feeds_path, feed["feedId"]))

        self.logger.info("feed_name {0} not found".format(feed_name))
        return None

    def delete_feed(self, feed_id):
        """
        Delete the DMaaP data router feed whose feed id is feed_id.
        """
        return self._delete_resource("{0}/{1}".format(self.feeds_path, feed_id))

    # Data Router Publishers
    def add_publisher(self, feed_id, location, username, password, status=None):
        """
        Add a publisher to feed feed_id at location location with
        username, password, and (optionally) status.
        """
        publisher_definition = {
            'feedId': feed_id,
            'dcaeLocationName': location,
            'username': username,
            'userpwd': password,
        }
        self._add_if_set(publisher_definition, (('status', status),))

        return self._create_resource(self.pubs_path, publisher_definition)

    def get_publisher_info(self, pub_id):
        """
        Get the representation of the DMaaP data router publisher whose publisher id is pub_id.
        """
        return self._get_resource("{0}/{1}".format(self.pubs_path, pub_id))

    def delete_publisher(self, pub_id):
        """
        Delete the DMaaP data router publisher whose publisher id is pub_id.
        """
        return self._delete_resource("{0}/{1}".format(self.pubs_path, pub_id))

    # Data Router Subscribers
    def add_subscriber(self, feed_id, location, delivery_url, username,
                       password, decompress, privileged, status=None):
        """
        Add a subscriber to feed feed_id at location location, delivering to
        delivery_url with username and password, the decompress and
        privileged flags, and (optionally) status.
        """
        subscriber_definition = {
            'feedId': feed_id,
            'dcaeLocationName': location,
            'deliveryURL': delivery_url,
            'username': username,
            'userpwd': password,
            'decompress': decompress,
            'privilegedSubscriber': privileged,
        }
        self._add_if_set(subscriber_definition, (('status', status),))

        return self._create_resource(self.subs_path, subscriber_definition)

    def get_subscriber_info(self, sub_id):
        """
        Get the representation of the DMaaP data router subscriber whose subscriber id is sub_id.
        """
        return self._get_resource("{0}/{1}".format(self.subs_path, sub_id))

    def delete_subscriber(self, sub_id):
        """
        Delete the DMaaP data router subscriber whose subscriber id is sub_id.
        """
        return self._delete_resource("{0}/{1}".format(self.subs_path, sub_id))

    # Message router topics
    def create_topic(self, name, description=None, txenable=None, owner=None,
                     replication_case=None, global_mr_url=None, useExisting=None):
        """
        Create a message router topic with the topic name 'name' and
        optionally the topic description 'description', the 'txenable' flag,
        the topic owner 'owner', the replication case, the global message
        router URL and the 'useExisting' flag.  Optional values that are not
        set are left out of the request body.
        """
        topic_definition = {'topicName': name}
        self._add_if_set(topic_definition, (
            ('topicDescription', description),
            ('owner', owner),
            ('replicationCase', replication_case),
            ('globalMrURL', global_mr_url),
        ))
        # txenable is a boolean, so False is a meaningful value that must be
        # sent; only an unset (None) flag is omitted.
        if txenable is not None:
            topic_definition['txenable'] = txenable
        topics_path_query = self._with_use_existing(self.topics_path, useExisting)

        return self._create_resource(topics_path_query, topic_definition)

    def get_topic_info(self, fqtn):
        """
        Get information about the topic whose fully-qualified name is 'fqtn'.
        """
        return self._get_resource("{0}/{1}".format(self.topics_path, fqtn))

    def get_topic_fqtn_by_name(self, topic_name):
        """
        Get the fully-qualified topic name (fqtn) of the DMaaP message router
        topic whose topic name is topic_name.

        Returns the fqtn of the first topic with a matching "topicName",
        or None when no topic with that name exists.
        """
        topics = self._get_resource("{0}".format(self.topics_path))
        for topic in topics.json():
            if topic["topicName"] == topic_name:
                self.logger.info("Found existing topic with name {0}".format(topic_name))
                # NOTE(review): "fqtn" is the key the bus controller API uses
                # for the fully-qualified name -- confirm against the API docs.
                return topic["fqtn"]

        self.logger.info("topic_name {0} not found".format(topic_name))
        return None

    def delete_topic(self, fqtn):
        """
        Delete the topic whose fully qualified name is 'fqtn'.
        """
        return self._delete_resource("{0}/{1}".format(self.topics_path, fqtn))

    # Message router clients (publishers and subscribers)
    def create_client(self, fqtn, location, client_role, actions):
        """
        Creates a client authorized to access the topic with fully-qualified
        name 'fqtn', from the location 'location', using the AAF client role
        'client_role'.  The client is authorized to perform actions in the
        list 'actions'.  (Valid values are 'pub', 'sub', and 'view'.)
        """
        # NOTE(review): the 'fqtn' and 'action' keys follow the bus
        # controller's MR client schema -- confirm against the API docs.
        client_definition = {
            'fqtn': fqtn,
            'dcaeLocationName': location,
            'clientRole': client_role,
            'action': actions,
        }
        return self._create_resource(self.clients_path, client_definition)

    def get_client_info(self, client_id):
        """
        Get client information for the client whose client ID is 'client_id'.
        """
        return self._get_resource("{0}/{1}".format(self.clients_path, client_id))

    def delete_client(self, client_id):
        """
        Delete the client whose client ID is 'client_id'.
        """
        return self._delete_resource("{0}/{1}".format(self.clients_path, client_id))

    # DCAE locations
    def get_dcae_locations(self, dcae_layer):
        """
        Get the list of location names known to the DMaaP bus controller
        whose "dcaeLayer" property matches dcae_layer and whose status is "VALID".

        Raises requests.HTTPError when the location query fails.
        """
        return self._get_valid_locations(lambda layer: layer == dcae_layer)

    def get_dcae_central_locations(self):
        """
        Get the list of location names known to the DMaaP bus controller
        whose "dcaeLayer" property contains "central" (ignoring case)
        and whose status is "VALID".
        "dcaeLayer" contains "central" for central sites.

        Raises requests.HTTPError when the location query fails.
        """
        return self._get_valid_locations(lambda layer: 'central' in layer.lower())