1 # ============LICENSE_START====================================================
3 # =============================================================================
4 # Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # =============================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END======================================================
20 from cloudify import ctx
21 from cloudify.decorators import operation
22 from cloudify.exceptions import NonRecoverableError
23 from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, CONSUL_HOST
24 from dmaapplugin.dmaaputils import random_string
25 from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
26 from consulif.consulif import ConsulHandle
28 # Lifecycle operations for DMaaP Data Router
29 # publish and subscribe relationships
def add_dr_publisher(**kwargs):
    """
    Set up the source of the relationship as a publisher to the feed that
    is the target of the relationship.

    Reads from the target (the feed) runtime properties:
        - feed_id
        - publish_url
        - log_url
    Reads from the source (the publisher) runtime properties:
        - service_component_name (used to build the "<name>:dmaap" Consul key)
        - an entry whose name matches the node id of the feed; this is a
          dictionary containing:
            - location (the dcaeLocationName to pass when adding the
              publisher to the feed)

    Generates a user name and password that the publisher will need to use
    when publishing, then replaces the source's feed entry with a dictionary
    holding:
        - publisher_id
        - location
        - publish_url
        - log_url
        - username
        - password
    A copy of that dictionary is also added to the Consul entry
    "<service_component_name>:dmaap" under the feed's node id.

    Raises NonRecoverableError (wrapping the underlying exception) if any
    step fails.
    """
    try:
        # Make sure we have a name under which to store DMaaP configuration
        # Check early so we don't needlessly create DMaaP entities
        if 'service_component_name' not in ctx.source.instance.runtime_properties:
            raise Exception("Source node does not have 'service_component_name' in runtime_properties")

        target_feed = ctx.target.node.id
        ctx.logger.info("Attempting to add publisher {0} to feed {1}".format(ctx.source.node.id, target_feed))

        # Set up the parameters for the add_publisher request to the DMaaP bus controller
        feed_id = ctx.target.instance.runtime_properties["feed_id"]
        location = ctx.source.instance.runtime_properties[target_feed]["location"]
        # NOTE(review): random_string is a project helper; confirm it draws from a
        # cryptographically secure source, since these values are credentials.
        username = random_string(8)
        password = random_string(16)

        # Make the request to add the publisher to the feed
        dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
        add_pub = dmc.add_publisher(feed_id, location, username, password)
        add_pub.raise_for_status()
        publisher_info = add_pub.json()
        publisher_id = publisher_info["pubId"]
        # Deliberately do not log the generated password: logs are not a safe
        # place for credentials.
        ctx.logger.info("Added publisher id {0} to feed {1} at {2}, with user {3}".format(
            publisher_id, feed_id, location, username))

        # Set runtime properties on the source
        ctx.source.instance.runtime_properties[target_feed] = {
            "publisher_id": publisher_id,
            "location": location,
            "publish_url": ctx.target.instance.runtime_properties["publish_url"],
            "log_url": ctx.target.instance.runtime_properties["log_url"],
            "username": username,
            "password": password,
        }

        # Set key in Consul
        ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
        cpy = dict(ctx.source.instance.runtime_properties[target_feed])
        ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, cpy)

    except Exception as e:
        ctx.logger.error("Error adding publisher to feed: {er}".format(er=e))
        raise NonRecoverableError(e)
def delete_dr_publisher(**kwargs):
    """
    Delete the publisher (the source of the relationship) from the feed
    (the target of the relationship).

    Assumes that the 'publisher_id' property was added to the source's
    dictionary of feed-related runtime properties (keyed by the feed's node
    id) when the publisher was added to the feed.

    After the publisher has been deleted from DMaaP, attempts to remove the
    entire "<service_component_name>:dmaap" entry from the Consul KV store.

    Never raises: any failure is logged so that the uninstall workflow can
    continue.
    """
    try:
        # Get the publisher id
        target_feed = ctx.target.node.id
        publisher_id = ctx.source.instance.runtime_properties[target_feed]["publisher_id"]
        ctx.logger.info("Attempting to delete publisher {0}".format(publisher_id))

        # Make the request to delete the publisher. This is done before looking
        # at 'service_component_name' so that a missing Consul key name cannot
        # leave an orphaned publisher behind in DMaaP.
        dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
        del_result = dmc.delete_publisher(publisher_id)
        del_result.raise_for_status()

        ctx.logger.info("Deleted publisher {0}".format(publisher_id))

        # Attempt to remove the entire ":dmaap" entry from the Consul KV store
        # Will quietly do nothing if the entry has already been removed
        if 'service_component_name' in ctx.source.instance.runtime_properties:
            ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
            ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']))
        else:
            # Without the component name the Consul key cannot be built; the
            # publisher itself is already gone, so just report it.
            ctx.logger.warning("Source node does not have 'service_component_name' in runtime_properties; "
                               "skipping Consul cleanup")

    except Exception as e:
        ctx.logger.error("Error deleting publisher: {er}".format(er=e))
        # don't raise a NonRecoverable error here--let the uninstall workflow continue
def add_dr_subscriber(**kwargs):
    """
    Set up the source of the relationship as a subscriber to the feed that
    is the target of the relationship.

    Reads from the target (the feed) runtime properties:
        - feed_id
    Reads from the source (the subscriber) runtime properties:
        - service_component_name (used to build the "<name>:dmaap" Consul key)
        - an entry whose name matches the node id of the feed; this is a
          dictionary containing the following properties:
            - location (the dcaeLocationName to pass when adding the
              subscriber to the feed)
            - delivery_url (the URL to which data router will deliver files)
            - username (the username data router will use when delivering files)
            - password (the password data router will use when delivering files)
            - decompress (optional, defaults to False)
            - privileged (optional, defaults to False)

    Replaces the source's feed entry with a dictionary holding location,
    delivery_url, username, password, decompress, privilegedSubscriber and:
        - subscriber_id (used to delete the subscriber in the uninstall workflow)
    A copy of that dictionary is also added to the Consul entry
    "<service_component_name>:dmaap" under the feed's node id.

    Raises NonRecoverableError (wrapping the underlying exception) if any
    step fails.
    """
    try:
        # Make sure we have a name under which to store DMaaP configuration
        # Check early so we don't needlessly create DMaaP entities
        if 'service_component_name' not in ctx.source.instance.runtime_properties:
            raise Exception("Source node does not have 'service_component_name' in runtime_properties")

        target_feed = ctx.target.node.id
        ctx.logger.info("Attempting to add subscriber {0} to feed {1}".format(ctx.source.node.id, target_feed))

        # Get the parameters for the call
        feed_id = ctx.target.instance.runtime_properties["feed_id"]
        feed = ctx.source.instance.runtime_properties[target_feed]
        location = feed["location"]
        delivery_url = feed["delivery_url"]
        username = feed["username"]
        password = feed["password"]
        decompress = feed.get("decompress", False)
        privileged = feed.get("privileged", False)

        # Make the request to add the subscriber to the feed
        dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
        add_sub = dmc.add_subscriber(feed_id, location, delivery_url, username, password, decompress, privileged)
        add_sub.raise_for_status()
        subscriber_info = add_sub.json()
        subscriber_id = subscriber_info["subId"]
        ctx.logger.info("Added subscriber id {0} to feed {1} at {2}".format(subscriber_id, feed_id, location))

        # Add subscriber_id to the runtime properties
        subscriber_props = {
            "subscriber_id": subscriber_id,
            "location": location,
            "delivery_url": delivery_url,
            "username": username,
            "password": password,
            "decompress": decompress,
            "privilegedSubscriber": privileged,
        }
        ctx.source.instance.runtime_properties[target_feed] = subscriber_props
        # Log a copy with the password masked: logs are not a safe place for
        # credentials.
        ctx.logger.info("on source: {0}".format(dict(subscriber_props, password="<redacted>")))

        # Set key in Consul
        ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
        cpy = dict(subscriber_props)
        ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, cpy)

    except Exception as e:
        ctx.logger.error("Error adding subscriber to feed: {er}".format(er=e))
        raise NonRecoverableError(e)
def delete_dr_subscriber(**kwargs):
    """
    Remove the subscriber (the source of the relationship) from the feed
    (the target of the relationship).

    Expects the source node's runtime properties entry for the target feed
    (keyed by the feed's node id) to contain 'subscriber_id', which is the id
    passed to the DMaaP controller's delete_subscriber call.

    Once the subscriber is deleted, the whole
    "<service_component_name>:dmaap" entry is removed from the Consul KV store.

    Never raises: any failure is logged so that the uninstall workflow can
    continue.
    """
    try:
        # Look up the subscriber id stored under the feed's node id
        feed_node = ctx.target.node.id
        feed_props = ctx.source.instance.runtime_properties[feed_node]
        sub_id = feed_props["subscriber_id"]
        ctx.logger.info("Attempting to delete subscriber {0} from feed {1}".format(sub_id, feed_node))

        # Ask the DMaaP bus controller to delete the subscriber
        controller = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
        controller.delete_subscriber(sub_id).raise_for_status()

        ctx.logger.info("Deleted subscriber {0}".format(sub_id))

        # Attempt to remove the entire ":dmaap" entry from the Consul KV store
        # Will quietly do nothing if the entry has already been removed
        consul_url = "http://{0}:8500".format(CONSUL_HOST)
        consul = ConsulHandle(consul_url, None, None, ctx.logger)
        component_name = ctx.source.instance.runtime_properties['service_component_name']
        consul.delete_entry("{0}:dmaap".format(component_name))

    except Exception as err:
        ctx.logger.error("Error deleting subscriber: {er}".format(er=err))
        # don't raise a NonRecoverable error here--let the uninstall workflow continue