2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.openecomp.policy.pdp.rest.notifications;
 
  23 import java.io.IOException;
 
  24 import java.net.MalformedURLException;
 
  26 import java.security.GeneralSecurityException;
 
  27 import java.util.ArrayList;
 
  28 import java.util.LinkedList;
 
  29 import java.util.List;
 
  30 import java.util.UUID;
 
  32 import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
 
  33 import org.openecomp.policy.common.logging.flexlogger.Logger;
 
  34 import org.openecomp.policy.rest.XACMLRestProperties;
 
  35 import org.openecomp.policy.utils.BusConsumer;
 
  36 import org.openecomp.policy.utils.BusPublisher;
 
  37 import org.openecomp.policy.xacml.api.XACMLErrorConstants;
 
  39 import com.att.nsa.cambria.client.CambriaClientFactory;
 
  40 import com.att.nsa.cambria.client.CambriaConsumer;
 
  41 import com.att.nsa.cambria.client.CambriaPublisher;
 
  42 import com.att.research.xacml.util.XACMLProperties;
 
  44 @SuppressWarnings("deprecation")
 
  45 public class ManualNotificationUpdateThread implements Runnable {
 
  47         private static final Logger LOGGER      = FlexLogger.getLogger(ManualNotificationUpdateThread.class);
 
             // NOTE(review): the fields below are static yet assigned from the thread body,
             // so every instance of this class shares (and overwrites) the same values.

             // Topic prefix: host + port in the "ueb" branch, or the PROP_NOTIFICATION_TOPIC
             // value in the "dmaap" branch. processMessage() appends the requester id to it.
  49         private static String topic = null;
 
             // UEB consumer created with CambriaClientFactory.createConsumer in the "ueb" branch.
  50         private static CambriaConsumer CConsumer = null;
 
             // Trimmed PROP_NOTIFICATION_SERVERS value; split on "," for the UEB consumer and
             // passed as-is to createSimplePublisher in SendMessage().
  51         private static String clusterList = null;
 
             // Payload sent to a requester's return topic; assigned through setUpdate().
  52         private static String update = null;
 
             // DMaaP consumer (BusConsumer.DmaapConsumerWrapper) created in the "dmaap" branch.
  53         private static BusConsumer dmaapConsumer = null;
 
             // Server list handed to both the DMaaP consumer and the DMaaP publisher wrapper.
  54         private static List<String> dmaapList = null;
 
             // PROP_NOTIFICATION_TYPE value; compared against "ueb" and "dmaap" to pick the transport.
  55         private static String propNotificationType = null;
 
             // Read from the DMAAP_AAF_LOGIN / DMAAP_AAF_PASSWORD properties and passed to the DMaaP wrappers.
  56         private static String aafLogin = null;
 
  57         private static String aafPassword = null;
 
             // Loop-control flag: set to true when the thread body starts, cleared by terminate()
             // and by the configuration-error handlers; the consumer loops poll it via isRunning().
  59         public volatile boolean isRunning = false;
 
  61         public synchronized boolean isRunning() {
 
                     // Returns the current value of the loop-control flag; both consumer loops
                     // use this call as their while-condition.
  62                 return this.isRunning;
 
  65         public synchronized void terminate() {
 
                     // Clears the loop-control flag so the consumer loop stops at its next
                     // isRunning() check.
  66                 this.isRunning = false;
 
  71          * This is our thread that runs on startup, if the system is configured to use UEB or DMaaP, to accept manual update requests
 
  77                         this.isRunning = true;
 
                     // NOTE(review): this extract omits several lines of the method (its signature,
                     // the declarations of aURL and id, try openers, loop bodies, closing braces).
                     // The comments below describe only the statements that are visible.

                     // Fresh random consumer group for this start; passed to both consumer factories below.
  81                 String group =  UUID.randomUUID ().toString ();
 
  83                 String returnTopic = null;
 
                     // The configured notification type selects the transport: "ueb" or "dmaap".
  84                 propNotificationType = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TYPE);
 
  85                 if ("ueb".equals(propNotificationType)){
 
                                     // NOTE(review): .trim() throws NullPointerException if the property is unset
                                     // (the dmaap branch null-checks the same kind of value), and neither
                                     // visible catch clause handles that -- confirm getProperty's null behaviour.
  87                                 clusterList = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS).trim();
 
  88                                 String url = XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_ID);
 
                                     // Topic prefix is host immediately followed by port, with no separator.
                                     // NOTE(review): if aURL is a java.net.URL (declaration not visible here),
                                     // getPort() yields -1 when the URL carries no explicit port -- confirm.
  90                                 topic = aURL.getHost() + aURL.getPort();
 
  91                         } catch (NumberFormatException e) {
 
  92                                 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get UEB cluster list or pdp url: ", e);
 
                                     // Clearing the flag keeps the consumer loop below from being entered.
  93                                 this.isRunning = false;
 
  94                         } catch (MalformedURLException e) {
 
  95                                 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing URL to create topic for Notification ", e);
 
                             // Requests are consumed from "<host><port>UpdateRequest"; a "Starting-Topic"
                             // message is published to that topic before the consumer is created.
  98                         String consumerTopic = aURL.getHost() + aURL.getPort() + "UpdateRequest";
 
  99                         SendMessage(consumerTopic, "Starting-Topic");
 
 100                         final LinkedList<String> urlList = new LinkedList<String> ();
 
                             // Iterates the comma-separated cluster list (loop body not visible in this extract).
 101                         for ( String u : clusterList.split ( "," ) ){
 
                                     // NOTE(review): 20*1000 and 1000 are presumably a timeout in ms and a
                                     // fetch limit -- confirm against the Cambria client API.
 106                                 CConsumer = CambriaClientFactory.createConsumer ( null, urlList, consumerTopic , group, id, 20*1000, 1000 );
 
 107                         } catch (MalformedURLException | GeneralSecurityException e1) {
 
 108                                 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create UEB Consumer: ", e1);
 
                             // Poll until the flag is cleared: each fetched message is mapped to a return
                             // topic and, when that is non-null, the current 'update' payload is sent to it.
 111                         while (this.isRunning()) {
 
                                     // NOTE(review): leftover debug statement, going by its own text.
 112                                 LOGGER.debug("While loop test _ take out ");
 
 114                                         for ( String msg : CConsumer.fetch () ){                
 
 115                                                 LOGGER.debug("Manual Notification Recieved Message " + msg + " from UEB cluster : ");
 
 116                                                 returnTopic = processMessage(msg);
 
 117                                                 if(returnTopic != null){
 
 118                                                         SendMessage(returnTopic, update);
 
 121                                 } catch (IOException e) {
 
                                             // NOTE(review): logged at debug level and without the exception object,
                                             // unlike the DMaaP loop below, which uses LOGGER.error(..., e).
 122                                         LOGGER.debug(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing UEB message");
 
 125                         LOGGER.debug("Stopping UEB Consumer loop will no longer fetch messages from the cluster");      
 
 126                 } else if ("dmaap".equals(propNotificationType)) {
 
 127                         String dmaapServers = null;
 
                                     // DMaaP configuration: server list, topic and AAF credentials.
 129                                 dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
 
 130                                 topic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC);
 
 131                                 aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
 
 132                                 aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
 
 133                         } catch (Exception e) {
 
 134                                 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get DMaaP servers list:", e);
 
 135                                 this.isRunning = false;
 
 138                         if(dmaapServers==null || topic==null){
 
 139                                 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
 
                                             // NOTE(review): the Exception is thrown and caught straight away; the
                                             // handler body is not visible in this extract.
 141                                         throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
 
 142                                 } catch (Exception e) {
 
                             // Same property as 'topic' above, read again and trimmed.
 152                         String consumerTopic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC).trim();
 
 153                         SendMessage(consumerTopic, "Starting-Topic");
 
 154                         dmaapList = new ArrayList<String>();
 
                             // Iterates the comma-separated server list (loop body not visible in this extract).
 155                         for ( String u : dmaapServers.split ( "," ) ){
 
 161                                 dmaapConsumer = new BusConsumer.DmaapConsumerWrapper(dmaapList, consumerTopic, aafLogin, aafPassword, group, id, 20*1000, 1000);
 
 162                         } catch (Exception e1) {
 
 163                                 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create DMaaP Consumer: ", e1);
 
                             // Same polling scheme as the UEB loop, using the DMaaP consumer.
 166                         while (this.isRunning()) {
 
                                     // NOTE(review): leftover debug statement, going by its own text.
 167                                 LOGGER.debug("While loop test _ take out ");
 
 169                                         for ( String msg : dmaapConsumer.fetch () ){            
 
 170                                                 LOGGER.debug("Manual Notification Recieved Message " + msg + " from DMaaP server : ");
 
 171                                                 returnTopic = processMessage(msg);
 
 172                                                 if(returnTopic != null){
 
 173                                                         SendMessage(returnTopic, update);
 
 176                                 }catch (Exception e) {
 
 177                                         LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing DMaaP message: ", e);                          }
 
 179                         LOGGER.debug("Stopping DMaaP Consumer loop will no longer fetch messages from the servers");    
 
 183         private void SendMessage( String topic, String message) {
 
                     // Publishes 'message' to 'topic' over the configured transport: a Cambria simple
                     // publisher for "ueb", a DmaapPublisherWrapper for "dmaap". The visible handlers
                     // only log failures.
                     // NOTE(review): the parameter 'topic' shadows the static field of the same name.
 184                 CambriaPublisher pub = null;
 
 185                 BusPublisher publisher = null;
 
 187                         if ("ueb".equals(propNotificationType)) {
 
 188                                 pub = CambriaClientFactory.createSimplePublisher (null, clusterList, topic );
 
 189                                 pub.send( "pdpReturnMessage", message );
 
 190                                 LOGGER.debug("Sending Message to UEB topic: " + topic);
 
 193                         } else if ("dmaap".equals(propNotificationType)){
 
 194                                 publisher = new BusPublisher.DmaapPublisherWrapper(dmaapList,topic,aafLogin,aafPassword);
 
 195                                 publisher.send( "pdpReturnMessage", message );
 
 196                                 LOGGER.debug("Sending to Message to DMaaP topic: " + topic);
 
 200                 } catch (Exception e) {
 
 201                         LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update: ", e);
 
                             // NOTE(review): second pub.send with the same arguments as the "ueb" branch
                             // above -- this looks like a duplicate publish. The lines guarding it are not
                             // visible in this extract; confirm before removing.
 205                         pub.send( "pdpReturnMessage", message );
 
                             // NOTE(review): "tpoic" in the log text below is a typo for "topic".
 206                         LOGGER.debug("Sending to Message to tpoic" + topic);
 
 207                 } catch (IOException e) {
 
                             // NOTE(review): the exception object is not passed to the logger here,
                             // unlike the handler above.
 208                         LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update");
 
 213         private String processMessage(String msg) {
 
 214                 LOGGER.debug("notification message:  " + msg);
 
                     // Extracts the requester id: take the segment after the first '=' (up to the
                     // next '=', if any), then keep what precedes the first double quote in it.
                     // NOTE(review): throws ArrayIndexOutOfBoundsException when msg has no '=' or
                     // nothing after it, because split() then yields a single element.
 215                 String[] UID = msg.split("=")[1].split("\"");
 
                     // Return topic = static topic prefix + requester id.
 217                 String returnTopic = topic + UID[0];
 
                     // "Starting-Topic" messages are special-cased; the branch body and the method's
                     // return statements are not visible in this extract. Presumably this path
                     // returns null, since the caller skips the reply on a null result -- confirm.
 218                 if(msg.contains("Starting-Topic")){
 
 223         public static void setUpdate(String update) {
 
                     // Stores the payload that the consumer loops send to a requester's return topic.
 224                 ManualNotificationUpdateThread.update = update;