CHeckstyle and JUnit for base package in ONAP-REST
[policy/engine.git] / ONAP-PDP-REST / src / main / java / org / onap / policy / pdp / rest / notifications / ManualNotificationUpdateThread.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP-PDP-REST
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.pdp.rest.notifications;
22
23 import java.io.IOException;
24 import java.net.MalformedURLException;
25 import java.net.URL;
26 import java.security.GeneralSecurityException;
27 import java.util.ArrayList;
28 import java.util.LinkedList;
29 import java.util.List;
30 import java.util.UUID;
31
32 import org.onap.policy.common.logging.flexlogger.FlexLogger;
33 import org.onap.policy.common.logging.flexlogger.Logger;
34 import org.onap.policy.rest.XacmlRestProperties;
35 import org.onap.policy.utils.BusConsumer;
36 import org.onap.policy.utils.BusPublisher;
37 import org.onap.policy.xacml.api.XACMLErrorConstants;
38
39 import com.att.nsa.cambria.client.CambriaClientFactory;
40 import com.att.nsa.cambria.client.CambriaConsumer;
41 import com.att.nsa.cambria.client.CambriaPublisher;
42 import com.att.research.xacml.util.XACMLProperties;
43
44 @SuppressWarnings("deprecation")
45 public class ManualNotificationUpdateThread implements Runnable {
46
47     private static final Logger LOGGER = FlexLogger.getLogger(ManualNotificationUpdateThread.class);
48
49     private String topic = null;
50     private CambriaConsumer cConsumer = null;
51     private static String clusterList = null;
52     private static String update = null;
53     private BusConsumer dmaapConsumer = null;
54     private List<String> dmaapList = null;
55     private static String propNotificationType = null;
56     private static String aafLogin = null;
57     private static String aafPassword = null;
58
59     public volatile boolean isRunning = false;
60
61     public synchronized boolean isRunning() {
62         return this.isRunning;
63     }
64
65     public synchronized void terminate() {
66         this.isRunning = false;
67     }
68
69     /**
70      *
71      * This is our thread that runs on startup if the system is configured to UEB to accept manual update requests
72      *
73      */
74     @Override
75     public void run() {
76         synchronized (this) {
77             this.isRunning = true;
78         }
79
80         URL aURL = null;
81         String group = UUID.randomUUID().toString();
82         String id = "0";
83         String returnTopic = null;
84         setPropNotification();
85         if ("ueb".equals(propNotificationType)) {
86             try {
87                 setCluster();
88                 String url = XACMLProperties.getProperty(XacmlRestProperties.PROP_PDP_ID);
89                 aURL = new URL(url);
90                 topic = aURL.getHost() + aURL.getPort();
91             } catch (NumberFormatException e) {
92                 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get UEB cluster list or pdp url: ", e);
93                 this.isRunning = false;
94             } catch (MalformedURLException e) {
95                 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE
96                         + "Error in processing URL to create topic for Notification ", e);
97             }
98             if (aURL != null) {
99                 String consumerTopic = aURL.getHost() + aURL.getPort() + "UpdateRequest";
100                 sendMessage(consumerTopic, "Starting-Topic");
101                 final LinkedList<String> urlList = new LinkedList<>();
102                 for (String u : clusterList.split(",")) {
103                     urlList.add(u);
104                 }
105
106                 try {
107                     cConsumer = CambriaClientFactory.createConsumer(null, urlList, consumerTopic, group, id, 20 * 1000,
108                             1000);
109                 } catch (MalformedURLException | GeneralSecurityException e1) {
110                     LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create UEB Consumer: ", e1);
111                 }
112
113                 while (this.isRunning()) {
114                     LOGGER.debug("While loop test _ take out ");
115                     try {
116                         for (String msg : cConsumer.fetch()) {
117                             LOGGER.debug("Manual Notification Recieved Message " + msg + " from UEB cluster : ");
118                             returnTopic = processMessage(msg);
119                             if (returnTopic != null) {
120                                 sendMessage(returnTopic, update);
121                             }
122                         }
123                     } catch (IOException e) {
124                         LOGGER.debug(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing UEB message" + e);
125                     }
126                 }
127                 LOGGER.debug("Stopping UEB Consumer loop will no longer fetch messages from the cluster");
128             }
129         } else if ("dmaap".equals(propNotificationType)) {
130             String dmaapServers = null;
131             try {
132                 dmaapServers = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_SERVERS);
133                 topic = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_TOPIC);
134                 setAAFCreds();
135             } catch (Exception e) {
136                 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get DMaaP servers list:", e);
137                 this.isRunning = false;
138             }
139
140             if (dmaapServers == null || topic == null) {
141                 LOGGER.error(
142                         XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
143             }
144
145             dmaapServers = dmaapServers.trim();
146             topic = topic.trim();
147
148             String consumerTopic = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_TOPIC).trim();
149             sendMessage(consumerTopic, "Starting-Topic");
150             dmaapList = new ArrayList<>();
151             for (String u : dmaapServers.split(",")) {
152                 dmaapList.add(u);
153             }
154
155             try {
156                 dmaapConsumer = new BusConsumer.DmaapConsumerWrapper(dmaapList, consumerTopic, aafLogin, aafPassword,
157                         group, id, 20 * 1000, 1000);
158             } catch (Exception e1) {
159                 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create DMaaP Consumer: ", e1);
160             }
161
162             while (this.isRunning()) {
163                 LOGGER.debug("While loop test _ take out ");
164                 try {
165                     for (String msg : dmaapConsumer.fetch()) {
166                         LOGGER.debug("Manual Notification Recieved Message " + msg + " from DMaaP server : ");
167                         returnTopic = processMessage(msg);
168                         if (returnTopic != null) {
169                             sendMessage(returnTopic, update);
170                         }
171                     }
172                 } catch (Exception e) {
173                     LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing DMaaP message: ", e);
174                 }
175             }
176             LOGGER.debug("Stopping DMaaP Consumer loop will no longer fetch messages from the servers");
177         }
178     }
179
180     private static void setAAFCreds() {
181         aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
182         aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
183         if (aafLogin != null) {
184             aafLogin = aafLogin.trim();
185         }
186         if (aafPassword != null) {
187             aafPassword = aafPassword.trim();
188         }
189     }
190
191     private static void setCluster() {
192         clusterList = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_SERVERS);
193         if (clusterList != null) {
194             clusterList = clusterList.trim();
195         }
196     }
197
198     private static void setPropNotification() {
199         propNotificationType = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_TYPE);
200     }
201
202     private void sendMessage(String topic, String message) {
203         CambriaPublisher pub = null;
204         BusPublisher publisher = null;
205         try {
206             if ("ueb".equals(propNotificationType)) {
207                 pub = CambriaClientFactory.createSimplePublisher(null, clusterList, topic);
208                 pub.send("pdpReturnMessage", message);
209                 LOGGER.debug("Sending Message to UEB topic: " + topic);
210                 pub.close();
211
212             } else if ("dmaap".equals(propNotificationType)) {
213                 publisher = new BusPublisher.DmaapPublisherWrapper(dmaapList, topic, aafLogin, aafPassword);
214                 publisher.send("pdpReturnMessage", message);
215                 LOGGER.debug("Sending to Message to DMaaP topic: " + topic);
216                 publisher.close();
217             }
218
219         } catch (Exception e) {
220             LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update: ", e);
221         }
222         if (pub != null) {
223             try {
224                 pub.send("pdpReturnMessage", message);
225                 LOGGER.debug("Sending to Message to tpoic" + topic);
226                 pub.close();
227             } catch (IOException e) {
228                 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e);
229             }
230         }
231     }
232
233     private String processMessage(String msg) {
234         LOGGER.debug("notification message:  " + msg);
235         String[] uID = msg.split("=")[1].split("\"");
236
237         String returnTopic = topic + uID[0];
238         if (msg.contains("Starting-Topic")) {
239             return null;
240         }
241         return returnTopic;
242     }
243
244     public static void setUpdate(String update) {
245         ManualNotificationUpdateThread.update = update;
246     }
247
248 }