2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.policy.pdp.rest.notifications;
23 import java.io.IOException;
24 import java.net.MalformedURLException;
26 import java.security.GeneralSecurityException;
27 import java.util.ArrayList;
28 import java.util.LinkedList;
29 import java.util.List;
30 import java.util.UUID;
32 import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
33 import org.openecomp.policy.common.logging.flexlogger.Logger;
34 import org.openecomp.policy.rest.XACMLRestProperties;
35 import org.openecomp.policy.utils.BusConsumer;
36 import org.openecomp.policy.utils.BusPublisher;
37 import org.openecomp.policy.xacml.api.XACMLErrorConstants;
39 import com.att.nsa.cambria.client.CambriaClientFactory;
40 import com.att.nsa.cambria.client.CambriaConsumer;
41 import com.att.nsa.cambria.client.CambriaPublisher;
42 import com.att.research.xacml.util.XACMLProperties;
44 @SuppressWarnings("deprecation")
45 public class ManualNotificationUpdateThread implements Runnable {
47 private static final Logger LOGGER = FlexLogger.getLogger(ManualNotificationUpdateThread.class);
49 private static String topic = null;
50 private static CambriaConsumer CConsumer = null;
51 private static String clusterList = null;
52 private static String update = null;
53 private static BusConsumer dmaapConsumer = null;
54 private static List<String> dmaapList = null;
55 private static String propNotificationType = null;
56 private static String aafLogin = null;
57 private static String aafPassword = null;
59 public volatile boolean isRunning = false;
61 public synchronized boolean isRunning() {
62 return this.isRunning;
65 public synchronized void terminate() {
66 this.isRunning = false;
71 * This is our thread that runs on startup if the system is configured to UEB to accept manual update requests
77 this.isRunning = true;
81 String group = UUID.randomUUID ().toString ();
83 String returnTopic = null;
84 propNotificationType = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TYPE);
85 if ("ueb".equals(propNotificationType)){
87 clusterList = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS).trim();
88 String url = XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_ID);
90 topic = aURL.getHost() + aURL.getPort();
91 } catch (NumberFormatException e) {
92 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get UEB cluster list or pdp url: ", e);
93 this.isRunning = false;
94 } catch (MalformedURLException e) {
95 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing URL to create topic for Notification ", e);
98 String consumerTopic = aURL.getHost() + aURL.getPort() + "UpdateRequest";
99 SendMessage(consumerTopic, "Starting-Topic");
100 final LinkedList<String> urlList = new LinkedList<String> ();
101 for ( String u : clusterList.split ( "," ) ){
106 CConsumer = CambriaClientFactory.createConsumer ( null, urlList, consumerTopic , group, id, 20*1000, 1000 );
107 } catch (MalformedURLException | GeneralSecurityException e1) {
108 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create UEB Consumer: ", e1);
111 while (this.isRunning()) {
112 LOGGER.debug("While loop test _ take out ");
114 for ( String msg : CConsumer.fetch () ){
115 LOGGER.debug("Manual Notification Recieved Message " + msg + " from UEB cluster : ");
116 returnTopic = processMessage(msg);
117 if(returnTopic != null){
118 SendMessage(returnTopic, update);
121 } catch (IOException e) {
122 LOGGER.debug(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing UEB message");
125 LOGGER.debug("Stopping UEB Consumer loop will no longer fetch messages from the cluster");
126 } else if ("dmaap".equals(propNotificationType)) {
127 String dmaapServers = null;
129 dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
130 topic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC);
131 aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
132 aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
133 } catch (Exception e) {
134 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get DMaaP servers list:", e);
135 this.isRunning = false;
138 if(dmaapServers==null || topic==null){
139 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
141 throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
142 } catch (Exception e) {
152 String consumerTopic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC).trim();
153 SendMessage(consumerTopic, "Starting-Topic");
154 dmaapList = new ArrayList<String>();
155 for ( String u : dmaapServers.split ( "," ) ){
161 dmaapConsumer = new BusConsumer.DmaapConsumerWrapper(dmaapList, consumerTopic, aafLogin, aafPassword, group, id, 20*1000, 1000);
162 } catch (Exception e1) {
163 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create DMaaP Consumer: ", e1);
166 while (this.isRunning()) {
167 LOGGER.debug("While loop test _ take out ");
169 for ( String msg : dmaapConsumer.fetch () ){
170 LOGGER.debug("Manual Notification Recieved Message " + msg + " from DMaaP server : ");
171 returnTopic = processMessage(msg);
172 if(returnTopic != null){
173 SendMessage(returnTopic, update);
176 }catch (Exception e) {
177 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing DMaaP message: ", e); }
179 LOGGER.debug("Stopping DMaaP Consumer loop will no longer fetch messages from the servers");
183 private void SendMessage( String topic, String message) {
184 CambriaPublisher pub = null;
185 BusPublisher publisher = null;
187 if ("ueb".equals(propNotificationType)) {
188 pub = CambriaClientFactory.createSimplePublisher (null, clusterList, topic );
189 pub.send( "pdpReturnMessage", message );
190 LOGGER.debug("Sending Message to UEB topic: " + topic);
193 } else if ("dmaap".equals(propNotificationType)){
194 publisher = new BusPublisher.DmaapPublisherWrapper(dmaapList,topic,aafLogin,aafPassword);
195 publisher.send( "pdpReturnMessage", message );
196 LOGGER.debug("Sending to Message to DMaaP topic: " + topic);
200 } catch (Exception e) {
201 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update: ", e);
205 pub.send( "pdpReturnMessage", message );
206 LOGGER.debug("Sending to Message to tpoic" + topic);
207 } catch (IOException e) {
208 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update");
213 private String processMessage(String msg) {
214 LOGGER.debug("notification message: " + msg);
215 String[] UID = msg.split("=")[1].split("\"");
217 String returnTopic = topic + UID[0];
218 if(msg.contains("Starting-Topic")){
223 public static void setUpdate(String update) {
224 ManualNotificationUpdateThread.update = update;