2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.policy.pdp.rest.notifications;
23 import java.io.IOException;
24 import java.net.MalformedURLException;
26 import java.security.GeneralSecurityException;
27 import java.util.ArrayList;
28 import java.util.LinkedList;
29 import java.util.List;
30 import java.util.UUID;
32 import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
33 import org.openecomp.policy.common.logging.flexlogger.Logger;
34 import org.openecomp.policy.rest.XACMLRestProperties;
35 import org.openecomp.policy.utils.BusConsumer;
36 import org.openecomp.policy.utils.BusPublisher;
37 import org.openecomp.policy.xacml.api.XACMLErrorConstants;
39 import com.att.nsa.cambria.client.CambriaClientFactory;
40 import com.att.nsa.cambria.client.CambriaConsumer;
41 import com.att.nsa.cambria.client.CambriaPublisher;
42 import com.att.research.xacml.util.XACMLProperties;
44 @SuppressWarnings("deprecation")
45 public class ManualNotificationUpdateThread implements Runnable {
47 private static final Logger LOGGER = FlexLogger.getLogger(ManualNotificationUpdateThread.class);
49 private static String topic = null;
50 private static CambriaConsumer CConsumer = null;
51 private static String clusterList = null;
52 private static String update = null;
53 private static BusConsumer dmaapConsumer = null;
54 private static List<String> dmaapList = null;
55 private static String propNotificationType = null;
56 private static String aafLogin = null;
57 private static String aafPassword = null;
59 public volatile boolean isRunning = false;
61 public synchronized boolean isRunning() {
62 return this.isRunning;
65 public synchronized void terminate() {
66 this.isRunning = false;
71 * This is our thread that runs on startup if the system is configured to UEB to accept manual update requests
77 this.isRunning = true;
81 String group = UUID.randomUUID ().toString ();
83 String returnTopic = null;
84 propNotificationType = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TYPE);
85 if ("ueb".equals(propNotificationType)){
87 clusterList = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS).trim();
88 String url = XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_ID);
90 topic = aURL.getHost() + aURL.getPort();
91 } catch (NumberFormatException e) {
92 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get UEB cluster list or pdp url: ", e);
93 this.isRunning = false;
94 } catch (MalformedURLException e) {
95 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing URL to create topic for Notification ", e);
98 String consumerTopic = aURL.getHost() + aURL.getPort() + "UpdateRequest";
99 SendMessage(consumerTopic, "Starting-Topic");
100 final LinkedList<String> urlList = new LinkedList<> ();
101 for ( String u : clusterList.split ( "," ) ){
106 CConsumer = CambriaClientFactory.createConsumer ( null, urlList, consumerTopic , group, id, 20*1000, 1000 );
107 } catch (MalformedURLException | GeneralSecurityException e1) {
108 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create UEB Consumer: ", e1);
111 while (this.isRunning()) {
112 LOGGER.debug("While loop test _ take out ");
114 for ( String msg : CConsumer.fetch () ){
115 LOGGER.debug("Manual Notification Recieved Message " + msg + " from UEB cluster : ");
116 returnTopic = processMessage(msg);
117 if(returnTopic != null){
118 SendMessage(returnTopic, update);
121 } catch (IOException e) {
122 LOGGER.debug(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing UEB message" + e);
125 LOGGER.debug("Stopping UEB Consumer loop will no longer fetch messages from the cluster");
127 } else if ("dmaap".equals(propNotificationType)) {
128 String dmaapServers = null;
130 dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
131 topic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC);
132 aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
133 aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
134 } catch (Exception e) {
135 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get DMaaP servers list:", e);
136 this.isRunning = false;
139 if(dmaapServers==null || topic==null){
140 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
142 throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
143 } catch (Exception e) {
153 String consumerTopic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC).trim();
154 SendMessage(consumerTopic, "Starting-Topic");
155 dmaapList = new ArrayList<>();
156 for ( String u : dmaapServers.split ( "," ) ){
162 dmaapConsumer = new BusConsumer.DmaapConsumerWrapper(dmaapList, consumerTopic, aafLogin, aafPassword, group, id, 20*1000, 1000);
163 } catch (Exception e1) {
164 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create DMaaP Consumer: ", e1);
167 while (this.isRunning()) {
168 LOGGER.debug("While loop test _ take out ");
170 for ( String msg : dmaapConsumer.fetch () ){
171 LOGGER.debug("Manual Notification Recieved Message " + msg + " from DMaaP server : ");
172 returnTopic = processMessage(msg);
173 if(returnTopic != null){
174 SendMessage(returnTopic, update);
177 }catch (Exception e) {
178 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing DMaaP message: ", e); }
180 LOGGER.debug("Stopping DMaaP Consumer loop will no longer fetch messages from the servers");
184 private void SendMessage( String topic, String message) {
185 CambriaPublisher pub = null;
186 BusPublisher publisher = null;
188 if ("ueb".equals(propNotificationType)) {
189 pub = CambriaClientFactory.createSimplePublisher (null, clusterList, topic );
190 pub.send( "pdpReturnMessage", message );
191 LOGGER.debug("Sending Message to UEB topic: " + topic);
194 } else if ("dmaap".equals(propNotificationType)){
195 publisher = new BusPublisher.DmaapPublisherWrapper(dmaapList,topic,aafLogin,aafPassword);
196 publisher.send( "pdpReturnMessage", message );
197 LOGGER.debug("Sending to Message to DMaaP topic: " + topic);
201 } catch (Exception e) {
202 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update: ", e);
206 pub.send( "pdpReturnMessage", message );
207 LOGGER.debug("Sending to Message to tpoic" + topic);
209 } catch (IOException e) {
210 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update" +e);
215 private String processMessage(String msg) {
216 LOGGER.debug("notification message: " + msg);
217 String[] UID = msg.split("=")[1].split("\"");
219 String returnTopic = topic + UID[0];
220 if(msg.contains("Starting-Topic")){
225 public static void setUpdate(String update) {
226 ManualNotificationUpdateThread.update = update;