2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.pdp.rest.notifications;
23 import java.io.IOException;
24 import java.net.MalformedURLException;
26 import java.security.GeneralSecurityException;
27 import java.util.ArrayList;
28 import java.util.LinkedList;
29 import java.util.List;
30 import java.util.UUID;
32 import org.onap.policy.common.logging.flexlogger.FlexLogger;
33 import org.onap.policy.common.logging.flexlogger.Logger;
34 import org.onap.policy.rest.XACMLRestProperties;
35 import org.onap.policy.utils.BusConsumer;
36 import org.onap.policy.utils.BusPublisher;
37 import org.onap.policy.xacml.api.XACMLErrorConstants;
39 import com.att.nsa.cambria.client.CambriaClientFactory;
40 import com.att.nsa.cambria.client.CambriaConsumer;
41 import com.att.nsa.cambria.client.CambriaPublisher;
42 import com.att.research.xacml.util.XACMLProperties;
44 @SuppressWarnings("deprecation")
45 public class ManualNotificationUpdateThread implements Runnable {
47 private static final Logger LOGGER = FlexLogger.getLogger(ManualNotificationUpdateThread.class);
49 private String topic = null;
50 private CambriaConsumer cConsumer = null;
51 private static String clusterList = null;
52 private static String update = null;
53 private BusConsumer dmaapConsumer = null;
54 private List<String> dmaapList = null;
55 private static String propNotificationType = null;
56 private static String aafLogin = null;
57 private static String aafPassword = null;
59 public volatile boolean isRunning = false;
61 public synchronized boolean isRunning() {
62 return this.isRunning;
65 public synchronized void terminate() {
66 this.isRunning = false;
71 * This is our thread that runs on startup if the system is configured to UEB to accept manual update requests
77 this.isRunning = true;
81 String group = UUID.randomUUID().toString();
83 String returnTopic = null;
84 setPropNotification();
85 if ("ueb".equals(propNotificationType)) {
88 String url = XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_ID);
90 topic = aURL.getHost() + aURL.getPort();
91 } catch (NumberFormatException e) {
92 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get UEB cluster list or pdp url: ", e);
93 this.isRunning = false;
94 } catch (MalformedURLException e) {
95 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE
96 + "Error in processing URL to create topic for Notification ", e);
99 String consumerTopic = aURL.getHost() + aURL.getPort() + "UpdateRequest";
100 sendMessage(consumerTopic, "Starting-Topic");
101 final LinkedList<String> urlList = new LinkedList<>();
102 for (String u : clusterList.split(",")) {
107 cConsumer = CambriaClientFactory.createConsumer(null, urlList, consumerTopic, group, id, 20 * 1000,
109 } catch (MalformedURLException | GeneralSecurityException e1) {
110 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create UEB Consumer: ", e1);
113 while (this.isRunning()) {
114 LOGGER.debug("While loop test _ take out ");
116 for (String msg : cConsumer.fetch()) {
117 LOGGER.debug("Manual Notification Recieved Message " + msg + " from UEB cluster : ");
118 returnTopic = processMessage(msg);
119 if (returnTopic != null) {
120 sendMessage(returnTopic, update);
123 } catch (IOException e) {
124 LOGGER.debug(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing UEB message" + e);
127 LOGGER.debug("Stopping UEB Consumer loop will no longer fetch messages from the cluster");
129 } else if ("dmaap".equals(propNotificationType)) {
130 String dmaapServers = null;
132 dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
133 topic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC);
135 } catch (Exception e) {
136 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get DMaaP servers list:", e);
137 this.isRunning = false;
140 if (dmaapServers == null || topic == null) {
142 XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
145 dmaapServers = dmaapServers.trim();
146 topic = topic.trim();
148 String consumerTopic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC).trim();
149 sendMessage(consumerTopic, "Starting-Topic");
150 dmaapList = new ArrayList<>();
151 for (String u : dmaapServers.split(",")) {
156 dmaapConsumer = new BusConsumer.DmaapConsumerWrapper(dmaapList, consumerTopic, aafLogin, aafPassword,
157 group, id, 20 * 1000, 1000);
158 } catch (Exception e1) {
159 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create DMaaP Consumer: ", e1);
162 while (this.isRunning()) {
163 LOGGER.debug("While loop test _ take out ");
165 for (String msg : dmaapConsumer.fetch()) {
166 LOGGER.debug("Manual Notification Recieved Message " + msg + " from DMaaP server : ");
167 returnTopic = processMessage(msg);
168 if (returnTopic != null) {
169 sendMessage(returnTopic, update);
172 } catch (Exception e) {
173 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing DMaaP message: ", e);
176 LOGGER.debug("Stopping DMaaP Consumer loop will no longer fetch messages from the servers");
180 private static void setAAFCreds() {
181 aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
182 aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
183 if (aafLogin != null) {
184 aafLogin = aafLogin.trim();
186 if (aafPassword != null) {
187 aafPassword = aafPassword.trim();
191 private static void setCluster() {
192 clusterList = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
193 if (clusterList != null) {
194 clusterList = clusterList.trim();
198 private static void setPropNotification() {
199 propNotificationType = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TYPE);
202 private void sendMessage(String topic, String message) {
203 CambriaPublisher pub = null;
204 BusPublisher publisher = null;
206 if ("ueb".equals(propNotificationType)) {
207 pub = CambriaClientFactory.createSimplePublisher(null, clusterList, topic);
208 pub.send("pdpReturnMessage", message);
209 LOGGER.debug("Sending Message to UEB topic: " + topic);
212 } else if ("dmaap".equals(propNotificationType)) {
213 publisher = new BusPublisher.DmaapPublisherWrapper(dmaapList, topic, aafLogin, aafPassword);
214 publisher.send("pdpReturnMessage", message);
215 LOGGER.debug("Sending to Message to DMaaP topic: " + topic);
219 } catch (Exception e) {
220 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update: ", e);
224 pub.send("pdpReturnMessage", message);
225 LOGGER.debug("Sending to Message to tpoic" + topic);
227 } catch (IOException e) {
228 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e);
233 private String processMessage(String msg) {
234 LOGGER.debug("notification message: " + msg);
235 String[] uID = msg.split("=")[1].split("\"");
237 String returnTopic = topic + uID[0];
238 if (msg.contains("Starting-Topic")) {
244 public static void setUpdate(String update) {
245 ManualNotificationUpdateThread.update = update;