move plugins from from ccsdk to dcaegen2
[dcaegen2/platform/plugins.git] / dmaap / dmaapplugin / dr_relationships.py
1 # ============LICENSE_START====================================================
2 # org.onap.dcaegen2
3 # =============================================================================
4 # Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # =============================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 #      http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END======================================================
19
20 from cloudify import ctx
21 from cloudify.decorators import operation
22 from cloudify.exceptions import NonRecoverableError
23 from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, CONSUL_HOST
24 from dmaapplugin.dmaaputils import random_string
25 from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
26 from consulif.consulif import ConsulHandle
27
28 # Lifecycle operations for DMaaP Data Router
29 # publish and subscribe relationships
30
31 @operation
32 def add_dr_publisher(**kwargs):
33     '''
34     Sets up the source of the publishes_relationship as a publisher to the feed that
35     is the target of the relationship
36         Assumes target (the feed) has the following runtime properties set
37             - feed_id
38             - log_url
39             - publish_url
40         Assumes source (the publisher) has a runtime property whose name matches the node name of the feed.
41         This is a dictionary containing one property:
42             - location   (the dcaeLocationName to pass when adding the publisher to the feed)
43         Generates a user name and password that the publisher will need to use when publishing
44         Adds the following properties to the dictionary above:
45              - publish_url
46              - log_url
47              - username
48              - password
49     '''
50     try:
51         # Make sure we have a name under which to store DMaaP configuration
52         # Check early so we don't needlessly create DMaaP entities
53         if 'service_component_name' not in ctx.source.instance.runtime_properties:
54             raise Exception("Source node does not have 'service_component_name' in runtime_properties")
55
56         target_feed = ctx.target.node.id
57         ctx.logger.info("Attempting to add publisher {0} to feed {1}".format(ctx.source.node.id, target_feed))
58
59         # Set up the parameters for the add_publisher request to the DMaaP bus controller
60         feed_id = ctx.target.instance.runtime_properties["feed_id"]
61         location = ctx.source.instance.runtime_properties[target_feed]["location"]
62         username = random_string(8)
63         password = random_string(16)
64
65         # Make the request to add the publisher to the feed
66         dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
67         add_pub = dmc.add_publisher(feed_id, location, username, password)
68         add_pub.raise_for_status()
69         publisher_info = add_pub.json()
70         publisher_id = publisher_info["pubId"]
71         ctx.logger.info("Added publisher id {0} to feed {1} at {2}, with user {3}, pass {4}".format(publisher_id, feed_id, location, username, password))
72
73         # Set runtime properties on the source
74         ctx.source.instance.runtime_properties[target_feed] = {
75            "publisher_id" : publisher_id,
76            "location" : location,
77            "publish_url" : ctx.target.instance.runtime_properties["publish_url"],
78            "log_url" : ctx.target.instance.runtime_properties["log_url"],
79            "username" : username,
80            "password" : password
81         }
82
83         # Set key in Consul
84         ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
85         cpy = dict(ctx.source.instance.runtime_properties[target_feed])
86         ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, cpy)
87
88     except Exception as e:
89         ctx.logger.error("Error adding publisher to feed: {er}".format(er=e))
90         raise NonRecoverableError(e)
91
92
93 @operation
94 def delete_dr_publisher(**kwargs):
95     '''
96     Deletes publisher (the source of the publishes_files relationship)
97     from the feed (the target of the relationship).
98     Assumes that the 'publisher_id' property was added to the dictionary of feed-related properties,
99     when the publisher was added to the feed.
100     '''
101
102     try:
103         # Make sure we have a name under which to store DMaaP configuration
104         # Check early so we don't needlessly create DMaaP entities
105         if 'service_component_name' not in ctx.source.instance.runtime_properties:
106             raise Exception("Source node does not have 'service_component_name' in runtime_properties")
107
108         # Get the publisher id
109         target_feed = ctx.target.node.id
110         publisher_id = ctx.source.instance.runtime_properties[target_feed]["publisher_id"]
111         ctx.logger.info("Attempting to delete publisher {0}".format(publisher_id))
112
113         # Make the request
114         dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
115         del_result = dmc.delete_publisher(publisher_id)
116         del_result.raise_for_status()
117
118         ctx.logger.info("Deleted publisher {0}".format(publisher_id))
119
120         # Attempt to remove the entire ":dmaap" entry from the Consul KV store
121         # Will quietly do nothing if the entry has already been removed
122         ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
123         ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']))
124
125     except Exception as e:
126         ctx.logger.error("Error deleting publisher: {er}".format(er=e))
127         # don't raise a NonRecoverable error here--let the uninstall workflow continue
128
129
130 @operation
131 def add_dr_subscriber(**kwargs):
132     '''
133     Sets up the source of the subscribes_to_files relationship as a subscriber to the
134     feed that is the target of the relationship.
135     Assumes target (the feed) has the following runtime property set
136         - feed_id
137     Assumes source (the subscriber) has a runtime property whose name matches the node name of the feed.
138     This is a dictionary containing the following properties:
139         - location   (the dcaeLocationName to pass when adding the publisher to the feed)
140         - delivery_url (the URL to which data router will deliver files)
141         - username (the username data router will use when delivering files)
142         - password (the password data router will use when delivering files)
143     Adds a property to the dictionary above:
144         - subscriber_id  (used to delete the subscriber in the uninstall workflow
145     '''
146     try:
147         target_feed = ctx.target.node.id
148         ctx.logger.info("Attempting to add subscriber {0} to feed {1}".format(ctx.source.node.id, target_feed))
149
150         # Get the parameters for the call
151         feed_id = ctx.target.instance.runtime_properties["feed_id"]
152         feed = ctx.source.instance.runtime_properties[target_feed]
153         location = feed["location"]
154         delivery_url = feed["delivery_url"]
155         username = feed["username"]
156         password = feed["password"]
157         decompress = feed["decompress"] if "decompress" in feed else False
158         privileged = feed["privileged"] if "privileged" in feed else False
159
160         # Make the request to add the subscriber to the feed
161         dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
162         add_sub = dmc.add_subscriber(feed_id, location, delivery_url,username, password, decompress, privileged)
163         add_sub.raise_for_status()
164         subscriber_info = add_sub.json()
165         subscriber_id = subscriber_info["subId"]
166         ctx.logger.info("Added subscriber id {0} to feed {1} at {2}".format(subscriber_id, feed_id, location))
167
168         # Add subscriber_id to the runtime properties
169         # ctx.source.instance.runtime_properties[target_feed]["subscriber_id"] = subscriber_id
170         ctx.source.instance.runtime_properties[target_feed] = {
171             "subscriber_id": subscriber_id,
172             "location" : location,
173             "delivery_url" : delivery_url,
174             "username" : username,
175             "password" : password,
176             "decompress": decompress,
177             "privilegedSubscriber": privileged
178         }
179         ctx.logger.info("on source: {0}".format(ctx.source.instance.runtime_properties[target_feed]))
180
181         # Set key in Consul
182         ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
183         cpy = dict(ctx.source.instance.runtime_properties[target_feed])
184         ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, cpy)
185
186     except Exception as e:
187         ctx.logger.error("Error adding subscriber to feed: {er}".format(er=e))
188         raise NonRecoverableError(e)
189
190
191 @operation
192 def delete_dr_subscriber(**kwargs):
193     '''
194     Deletes subscriber (the source of the subscribes_to_files relationship)
195     from the feed (the target of the relationship).
196     Assumes that the source node's runtime properties dictionary for the target feed
197     includes 'subscriber_id', set when the publisher was added to the feed.
198     '''
199     try:
200         # Get the subscriber id
201         target_feed = ctx.target.node.id
202         subscriber_id = ctx.source.instance.runtime_properties[target_feed]["subscriber_id"]
203         ctx.logger.info("Attempting to delete subscriber {0} from feed {1}".format(subscriber_id, target_feed))
204
205         # Make the request
206         dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
207         del_result = dmc.delete_subscriber(subscriber_id)
208         del_result.raise_for_status()
209
210         ctx.logger.info("Deleted subscriber {0}".format(subscriber_id))
211
212         # Attempt to remove the entire ":dmaap" entry from the Consul KV store
213         # Will quietly do nothing if the entry has already been removed
214         ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
215         ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']))
216
217     except Exception as e:
218         ctx.logger.error("Error deleting subscriber: {er}".format(er=e))
219         # don't raise a NonRecoverable error here--let the uninstall workflow continue