move plugins from from ccsdk to dcaegen2
[dcaegen2/platform/plugins.git] / dmaap / dmaapplugin / dr_bridge.py
1 # ============LICENSE_START====================================================
2 # org.onap.dcaegen2
3 # =============================================================================
4 # Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # =============================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 #      http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END======================================================
19
20 from cloudify import ctx
21 from cloudify.decorators import operation
22 from cloudify.exceptions import NonRecoverableError
23 from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS
24 from dmaapplugin.dmaaputils import random_string
25 from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
26
27 # Set up a subscriber to a source feed
28 def _set_up_subscriber(dmc, source_feed_id, loc, delivery_url, username, userpw):
29     # Add subscriber to source feed
30     add_sub = dmc.add_subscriber(source_feed_id, loc, delivery_url, username, userpw)
31     add_sub.raise_for_status()
32     return add_sub.json()
33
34 # Set up a publisher to a target feed
35 def _set_up_publisher(dmc, target_feed_id, loc):
36     username = random_string(8)
37     userpw = random_string(16)
38     add_pub = dmc.add_publisher(target_feed_id, loc, username, userpw)
39     add_pub.raise_for_status()
40     pub_info = add_pub.json()
41     return pub_info["pubId"], username, userpw
42
43 # Get a central location to use when creating a publisher or subscriber
44 def _get_central_location(dmc):
45     locations = dmc.get_dcae_central_locations()
46     if len(locations) < 1:
47         raise Exception('No central location found for setting up DR bridging')
48     return locations[0]          # We take the first one.  Typically there will be two central locations
49
50
51 # Set up a "bridge" between two feeds internal to DCAE
52 # A source feed "bridges_to" a target feed, meaning that anything published to
53 # the source feed will be delivered to subscribers to the target feed (as well as
54 # to subscribers of the source feed).
55 #
56 # The bridge is established by first adding a publisher to the target feed.  The result of doing this
57 # is a publish URL and a set of publication credentials.
58 #The publish URL and publication credentials are used to set up a subscriber to the source feed.
59 #I.e., we tell the source feed to deliver to an endpoint which is actually a publish
60 # endpoint for the target feed.
61 @operation
62 def create_dr_bridge(**kwargs):
63
64     try:
65
66         # Get source and target feed ids
67         if 'feed_id' in ctx.target.instance.runtime_properties:
68             target_feed_id = ctx.target.instance.runtime_properties['feed_id']
69         else:
70             raise Exception('Target feed has no feed_id property')
71         if 'feed_id' in ctx.source.instance.runtime_properties:
72             source_feed_id = ctx.source.instance.runtime_properties['feed_id']
73         else:
74             raise Exception('Source feed has no feed_id property')
75
76         dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
77
78         # Get a location to use when creating a publisher or subscriber--a central location seems reasonable
79         loc = _get_central_location(dmc)
80
81         ctx.logger.info('Creating bridge from feed {0} to feed {1} using location {2}'.format(source_feed_id, target_feed_id, loc))
82
83         # Add publisher to target feed
84         publisher_id, username, userpw = _set_up_publisher(dmc, target_feed_id, loc)
85         ctx.logger.info("Added publisher id {0} to  target feed {1} with user {2}".format(publisher_id, target_feed_id, username))
86
87         # Add subscriber to source feed
88         delivery_url = ctx.target.instance.runtime_properties['publish_url']
89         subscriber_info = _set_up_subscriber(dmc, source_feed_id, loc, delivery_url, username, userpw)
90         subscriber_id = subscriber_info["subId"]
91         ctx.logger.info("Added subscriber id {0} to source feed {1} with delivery url {2}".format(subscriber_id, source_feed_id, delivery_url))
92
93         # Save the publisher and subscriber IDs on the source node, indexed by the target node id
94         ctx.source.instance.runtime_properties[ctx.target.node.id] = {"publisher_id": publisher_id, "subscriber_id": subscriber_id}
95
96     except Exception as e:
97         ctx.logger.error("Error creating bridge: {0}".format(e))
98         raise NonRecoverableError(e)
99
100 # Set up a bridge from an internal DCAE feed to a feed in an external Data Router system
101 # The target feed needs to be provisioned in the external Data Router system.  A publisher
102 # to that feed must also be set up in the external Data Router system.  The publish URL,
103 # username, and password need to be captured in a target node of type dcae.nodes.ExternalTargetFeed.
104 # The bridge is established by setting up a subscriber to the internal DCAE source feed using the
105 # external feed publisher parameters as delivery parameters for the subscriber.
106 @operation
107 def create_external_dr_bridge(**kwargs):
108     try:
109
110         # Make sure target feed has full set of properties
111         if 'url' in ctx.target.node.properties and 'username' in ctx.target.node.properties and 'userpw' in ctx.target.node.properties:
112             url = ctx.target.node.properties['url']
113             username = ctx.target.node.properties['username']
114             userpw = ctx.target.node.properties['userpw']
115         else:
116             raise Exception ("Target feed missing url, username, and/or user pw")
117
118         # Make sure source feed has a feed ID
119         if 'feed_id' in ctx.source.instance.runtime_properties:
120             source_feed_id = ctx.source.instance.runtime_properties['feed_id']
121         else:
122             raise Exception('Source feed has no feed_id property')
123
124         dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
125
126         # Get a central location to use when creating subscriber
127         loc = _get_central_location(dmc)
128
129         ctx.logger.info('Creating external bridge from feed {0} to external url {1} using location {2}'.format(source_feed_id, url, loc))
130
131         # Create subscription to source feed using properties of the external target feed
132         subscriber_info = _set_up_subscriber(dmc, source_feed_id, loc, url, username, userpw)
133         subscriber_id = subscriber_info["subId"]
134         ctx.logger.info("Added subscriber id {0} to source feed {1} with delivery url {2}".format(subscriber_id, source_feed_id, url))
135
136         # Save the subscriber ID on the source node, indexed by the target node id
137         ctx.source.instance.runtime_properties[ctx.target.node.id] = {"subscriber_id": subscriber_id}
138
139     except Exception as e:
140         ctx.logger.error("Error creating external bridge: {0}".format(e))
141         raise NonRecoverableError(e)
142
143 # Set up a bridge from a feed in an external Data Router system to an internal DCAE feed.
144 # The bridge is established by creating a publisher on the internal DCAE feed.  Then a subscription
145 # to the external feed is created through manual provisioning in the external Data Router system, using
146 # the publish URL and the publisher username and password for the internal feed as the delivery parameters
147 # for the external subscription.
148 # In order to obtain the publish URL, publisher username, and password, a blueprint using this sort of
149 # bridge will typically have an output that exposes the runtime_property set on the source node in this operation.
150 @operation
151 def create_external_source_dr_bridge(**kwargs):
152     try:
153         # Get target feed id
154         if 'feed_id' in ctx.target.instance.runtime_properties:
155             target_feed_id = ctx.target.instance.runtime_properties['feed_id']
156         else:
157             raise Exception('Target feed has no feed_id property')
158
159         dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
160
161         # Get a central location to use when creating a publisher
162         loc = _get_central_location(dmc)
163
164         # Create a publisher on the target feed
165         publisher_id, username, userpw = _set_up_publisher(dmc, target_feed_id, loc)
166
167         # Save the publisher info on the source node, indexed by the target node
168         ctx.source.instance.runtime_properties[ctx.target.node.id] = {"publisher_id": publisher_id, "url": ctx.target.instance.runtime_properties["publish_url"], "username": username, "userpw": userpw}
169
170     except Exception as e:
171         ctx.logger.error("Error creating external source bridge: {0}".format(e))
172
173 # Remove the bridge between the relationship source and target.
174 # For a bridge between 2 internal feeds, deletes the subscriber on the source feed and the publisher on the target feed.
175 # For a bridge to an external target feed, deletes the subscriber on the source feed.
176 # For a bridge from an external source feed, deletes the publisher on the target feed.
177 @operation
178 def remove_dr_bridge(**kwargs):
179     try:
180
181         dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
182
183         if ctx.target.node.id in ctx.source.instance.runtime_properties:
184
185             if 'subscriber_id' in ctx.source.instance.runtime_properties[ctx.target.node.id]:
186                 # Delete the subscription for this bridge
187                 ctx.logger.info("Removing bridge -- deleting subscriber {0}".format(ctx.source.instance.runtime_properties[ctx.target.node.id]['subscriber_id']))
188                 dmc.delete_subscriber(ctx.source.instance.runtime_properties[ctx.target.node.id]['subscriber_id'])
189
190             if 'publisher_id' in ctx.source.instance.runtime_properties:
191                 # Delete the publisher for this bridge
192                 ctx.logger.info("Removing bridge -- deleting publisher {0}".format(ctx.source.instance.runtime_properties[ctx.target.node.id]['publisher_id']))
193                 dmc.delete_publisher(ctx.source.instance.runtime_properties[ctx.target.node.id]['publisher_id'])
194
195         ctx.logger.info("Remove bridge from {0} to {1}".format(ctx.source.node.id, ctx.target.node.id))
196
197     except Exception as e:
198         ctx.logger.error("Error removing bridge: {0}".format(e))
199         # Let the uninstall workflow proceed--don't throw a NonRecoverableError