move plugins from from ccsdk to dcaegen2
[dcaegen2/platform/plugins.git] / dmaap / dmaapplugin / mr_relationships.py
1 # ============LICENSE_START====================================================
2 # org.onap.dcaegen2
3 # =============================================================================
4 # Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
5 # =============================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #      http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END======================================================
18
19 from cloudify import ctx
20 from cloudify.decorators import operation
21 from cloudify.exceptions import NonRecoverableError
22 from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, DMAAP_OWNER, CONSUL_HOST
23 from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
24 from consulif.consulif import ConsulHandle
25
26 # Message router relationship operations
27
28 def _add_mr_client(ctype, actions):
29     '''
30     Adds the node represented by 'source' as a client (publisher or subscriber) to
31     to topic represented by the 'target' node.  The list of actions in 'actions'
32     determines whether the client is a subscriber or a publisher.
33
34     Assumes target (the topic) has the following runtime property set
35         - fqtn
36     Assumes source (the client) has a runtime property whose name matches the node name of the feed.
37     This is a dictionary containing the following properties:
38         - location   (the dcaeLocationName to pass when adding the client to the topic)
39         - client_role (the AAF client role under which the client will access the topic)
40     Adds two properties to the dictionary above:
41         - topic_url (the URL that the client can use to access the topic)
42         - client_id  (used to delete the client in the uninstall workflow)
43     '''
44     try:
45         # Make sure we have a name under which to store DMaaP configuration
46         # Check early so we don't needlessly create DMaaP entities
47         if 'service_component_name' not in ctx.source.instance.runtime_properties:
48             raise Exception("Source node does not have 'service_component_name' in runtime_properties")
49
50         target_topic = ctx.target.node.id           # Key for the source's dictionary with topic-related info
51         fqtn = ctx.target.instance.runtime_properties["fqtn"]
52         ctx.logger.info("Attempting to add {0} as {1} to topic {2}".format(ctx.source.node.id, ctype, fqtn))
53
54         # Get the parameters needed for adding the client
55         location = ctx.source.instance.runtime_properties[target_topic]["location"]
56         client_role = ctx.source.instance.runtime_properties[target_topic]["client_role"]
57
58         # Make the request to add the client to the topic
59         dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
60         c = dmc.create_client(fqtn, location, client_role, actions)
61         c.raise_for_status()
62         client_info = c.json()
63         client_id = client_info["mrClientId"]
64         topic_url = client_info["topicURL"]
65
66         # Update source's runtime properties
67         #ctx.source.instance.runtime_properties[target_topic]["topic_url"] = topic_url
68         #ctx.source.instance.runtime_properties[target_topic]["client_id"] = client_id
69         ctx.source.instance.runtime_properties[target_topic] = {
70             "topic_url" : topic_url,
71             "client_id" : client_id,
72             "location" : location,
73             "client_role" : client_role
74         }
75
76         ctx.logger.info("Added {0} id {1} to feed {2} at {3}".format(ctype, client_id, fqtn, location))
77
78         # Set key in Consul
79         ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
80         ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_topic, ctx.source.instance.runtime_properties[target_topic])
81
82     except Exception as e:
83         ctx.logger.error("Error adding client to feed: {er}".format(er=e))
84         raise NonRecoverableError(e)
85
86 @operation
87 def add_mr_publisher(**kwargs):
88     _add_mr_client("publisher", ["view", "pub"])
89
90 @operation
91 def add_mr_subscriber(**kwargs):
92         _add_mr_client("subscriber", ["view", "sub"])
93
94 @operation
95 def delete_mr_client(**kwargs):
96     '''
97     Delete the client (publisher or subscriber).
98     Expect property 'client_id' to have been set in the instance's runtime_properties
99     when the client was created.
100     '''
101     try:
102         target_topic = ctx.target.node.id
103         client_id = ctx.source.instance.runtime_properties[target_topic]["client_id"]
104         ctx.logger.info("Attempting to delete client {0} ".format(client_id))
105         dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
106         c = dmc.delete_client(client_id)
107         c.raise_for_status()
108
109         ctx.logger.info("Deleted client {0}".format(client_id))
110
111         # Attempt to remove the entire ":dmaap" entry from the Consul KV store
112         # Will quietly do nothing if the entry has already been removed
113         ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
114         ch.delete_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']))
115
116     except Exception as e:
117         ctx.logger.error("Error deleting MR client: {er}".format(er=e))
118         # don't raise a NonRecoverable error here--let the uninstall workflow continue
119