Fixes for sonar critical issues
[policy/engine.git] / ONAP-PDP-REST / src / main / java / org / onap / policy / pdp / rest / notifications / ManualNotificationUpdateThread.java
index c130657..9027e27 100644 (file)
@@ -44,186 +44,205 @@ import com.att.research.xacml.util.XACMLProperties;
 @SuppressWarnings("deprecation")
 public class ManualNotificationUpdateThread implements Runnable {
 
-       private static final Logger LOGGER      = FlexLogger.getLogger(ManualNotificationUpdateThread.class);
-
-       private static String topic = null;
-       private static CambriaConsumer CConsumer = null;
-       private static String clusterList = null;
-       private static String update = null;
-       private static BusConsumer dmaapConsumer = null;
-       private static List<String> dmaapList = null;
-       private static String propNotificationType = null;
-       private static String aafLogin = null;
-       private static String aafPassword = null;
-       
-       public volatile boolean isRunning = false;
-       
-       public synchronized boolean isRunning() {
-               return this.isRunning;
-       }
-       
-       public synchronized void terminate() {
-               this.isRunning = false;
-       }
-       
-       /**
-        * 
-        * This is our thread that runs on startup if the system is configured to UEB to accept manual update requests
-        * 
-        */
-       @Override
-       public void run() {
-               synchronized(this) {
-                       this.isRunning = true;
-               }
-               
-               URL aURL = null;
-               String group =  UUID.randomUUID ().toString ();
-               String id = "0";
-               String returnTopic = null;
-               propNotificationType = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TYPE);
-               if ("ueb".equals(propNotificationType)){
-                       try {
-                               clusterList = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS).trim();
-                               String url = XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_ID);
-                               aURL = new URL(url);
-                               topic = aURL.getHost() + aURL.getPort();
-                       } catch (NumberFormatException e) {
-                               LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get UEB cluster list or pdp url: ", e);
-                               this.isRunning = false;
-                       } catch (MalformedURLException e) {
-                               LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing URL to create topic for Notification ", e);
-                       }
-                       if(aURL != null){
-                               String consumerTopic = aURL.getHost() + aURL.getPort() + "UpdateRequest";
-                               SendMessage(consumerTopic, "Starting-Topic");
-                               final LinkedList<String> urlList = new LinkedList<> ();
-                               for ( String u : clusterList.split ( "," ) ){
-                                       urlList.add ( u );
-                               }
-
-                               try {
-                                       CConsumer = CambriaClientFactory.createConsumer ( null, urlList, consumerTopic , group, id, 20*1000, 1000 );
-                               } catch (MalformedURLException | GeneralSecurityException e1) {
-                                       LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create UEB Consumer: ", e1);
-                               }
-
-                               while (this.isRunning()) {
-                                       LOGGER.debug("While loop test _ take out ");
-                                       try {
-                                               for ( String msg : CConsumer.fetch () ){                
-                                                       LOGGER.debug("Manual Notification Recieved Message " + msg + " from UEB cluster : ");
-                                                       returnTopic = processMessage(msg);
-                                                       if(returnTopic != null){
-                                                               SendMessage(returnTopic, update);
-                                                       }
-                                               }
-                                       } catch (IOException e) {
-                                               LOGGER.debug(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing UEB message" + e);
-                                       }
-                               }
-                               LOGGER.debug("Stopping UEB Consumer loop will no longer fetch messages from the cluster");      
-                       }
-               } else if ("dmaap".equals(propNotificationType)) {
-                       String dmaapServers = null;
-                       try {
-                               dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
-                               topic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC);
-                               aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
-                               aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
-                       } catch (Exception e) {
-                               LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get DMaaP servers list:", e);
-                               this.isRunning = false;
-                       } 
-                       
-                       if(dmaapServers==null || topic==null){
-                               LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
-                               try {
-                                       throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
-                               } catch (Exception e) {
-                                       LOGGER.error(e);
-                               }
-                       }
-                       
-                       dmaapServers.trim();
-                       topic.trim();
-                       aafLogin.trim();
-                       aafPassword.trim();
-                       
-                       String consumerTopic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC).trim();
-                       SendMessage(consumerTopic, "Starting-Topic");
-                       dmaapList = new ArrayList<>();
-                       for ( String u : dmaapServers.split ( "," ) ){
-                               dmaapList.add ( u );
-                       }
-                       
-                       try {
-                               
-                               dmaapConsumer = new BusConsumer.DmaapConsumerWrapper(dmaapList, consumerTopic, aafLogin, aafPassword, group, id, 20*1000, 1000);
-                       } catch (Exception e1) {
-                               LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create DMaaP Consumer: ", e1);
-                       }
-
-                       while (this.isRunning()) {
-                               LOGGER.debug("While loop test _ take out ");
-                               try {
-                                       for ( String msg : dmaapConsumer.fetch () ){            
-                                               LOGGER.debug("Manual Notification Recieved Message " + msg + " from DMaaP server : ");
-                                               returnTopic = processMessage(msg);
-                                               if(returnTopic != null){
-                                                       SendMessage(returnTopic, update);
-                                               }
-                                       }
-                               }catch (Exception e) {
-                                       LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing DMaaP message: ", e);                          }
-                       }
-                       LOGGER.debug("Stopping DMaaP Consumer loop will no longer fetch messages from the servers");    
-               }
-       }
-
-       private void SendMessage( String topic, String message) {
-               CambriaPublisher pub = null;
-               BusPublisher publisher = null;
-               try {
-                       if ("ueb".equals(propNotificationType)) {
-                               pub = CambriaClientFactory.createSimplePublisher (null, clusterList, topic );
-                               pub.send( "pdpReturnMessage", message );
-                               LOGGER.debug("Sending Message to UEB topic: " + topic);
-                               pub.close();
-                               
-                       } else if ("dmaap".equals(propNotificationType)){
-                               publisher = new BusPublisher.DmaapPublisherWrapper(dmaapList,topic,aafLogin,aafPassword);
-                               publisher.send( "pdpReturnMessage", message );
-                               LOGGER.debug("Sending to Message to DMaaP topic: " + topic);
-                               publisher.close();
-                       }
-
-               } catch (Exception e) {
-                       LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update: ", e);
-               }
-               if(pub != null){
-                       try {
-                               pub.send( "pdpReturnMessage", message );
-                               LOGGER.debug("Sending to Message to tpoic" + topic);
-                               pub.close();
-                       } catch (IOException e) {
-                               LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update" +e);
-                       }       
-               }               
-       }
-
-       private String processMessage(String msg) {
-               LOGGER.debug("notification message:  " + msg);
-               String[] UID = msg.split("=")[1].split("\"");
-               
-               String returnTopic = topic + UID[0];
-               if(msg.contains("Starting-Topic")){
-                       return null;
-               }
-               return returnTopic;
-       }
-       public static void setUpdate(String update) {
-               ManualNotificationUpdateThread.update = update;
-       }
-       
+    private static final Logger LOGGER = FlexLogger.getLogger(ManualNotificationUpdateThread.class);
+
+    private String topic = null;
+    private CambriaConsumer cConsumer = null;
+    private static String clusterList = null;
+    private static String update = null;
+    private BusConsumer dmaapConsumer = null;
+    private List<String> dmaapList = null;
+    private static String propNotificationType = null;
+    private static String aafLogin = null;
+    private static String aafPassword = null;
+
+    public volatile boolean isRunning = false;
+
+    public synchronized boolean isRunning() {
+        return this.isRunning;
+    }
+
+    public synchronized void terminate() {
+        this.isRunning = false;
+    }
+
+    /**
+     * 
+     * This is our thread that runs on startup if the system is configured to UEB to accept manual update requests
+     * 
+     */
+    @Override
+    public void run() {
+        synchronized (this) {
+            this.isRunning = true;
+        }
+
+        URL aURL = null;
+        String group = UUID.randomUUID().toString();
+        String id = "0";
+        String returnTopic = null;
+        setPropNotification();
+        if ("ueb".equals(propNotificationType)) {
+            try {
+                setCluster();
+                String url = XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_ID);
+                aURL = new URL(url);
+                topic = aURL.getHost() + aURL.getPort();
+            } catch (NumberFormatException e) {
+                LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get UEB cluster list or pdp url: ", e);
+                this.isRunning = false;
+            } catch (MalformedURLException e) {
+                LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE
+                        + "Error in processing URL to create topic for Notification ", e);
+            }
+            if (aURL != null) {
+                String consumerTopic = aURL.getHost() + aURL.getPort() + "UpdateRequest";
+                sendMessage(consumerTopic, "Starting-Topic");
+                final LinkedList<String> urlList = new LinkedList<>();
+                for (String u : clusterList.split(",")) {
+                    urlList.add(u);
+                }
+
+                try {
+                    cConsumer = CambriaClientFactory.createConsumer(null, urlList, consumerTopic, group, id, 20 * 1000,
+                            1000);
+                } catch (MalformedURLException | GeneralSecurityException e1) {
+                    LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create UEB Consumer: ", e1);
+                }
+
+                while (this.isRunning()) {
+                    LOGGER.debug("While loop test _ take out ");
+                    try {
+                        for (String msg : cConsumer.fetch()) {
+                            LOGGER.debug("Manual Notification Recieved Message " + msg + " from UEB cluster : ");
+                            returnTopic = processMessage(msg);
+                            if (returnTopic != null) {
+                                sendMessage(returnTopic, update);
+                            }
+                        }
+                    } catch (IOException e) {
+                        LOGGER.debug(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing UEB message" + e);
+                    }
+                }
+                LOGGER.debug("Stopping UEB Consumer loop will no longer fetch messages from the cluster");
+            }
+        } else if ("dmaap".equals(propNotificationType)) {
+            String dmaapServers = null;
+            try {
+                dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
+                topic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC);
+                setAAFCreds();
+            } catch (Exception e) {
+                LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get DMaaP servers list:", e);
+                this.isRunning = false;
+            }
+
+            if (dmaapServers == null || topic == null) {
+                LOGGER.error(
+                        XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
+            }
+
+            dmaapServers = dmaapServers.trim();
+            topic = topic.trim();
+
+            String consumerTopic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC).trim();
+            sendMessage(consumerTopic, "Starting-Topic");
+            dmaapList = new ArrayList<>();
+            for (String u : dmaapServers.split(",")) {
+                dmaapList.add(u);
+            }
+
+            try {
+                dmaapConsumer = new BusConsumer.DmaapConsumerWrapper(dmaapList, consumerTopic, aafLogin, aafPassword,
+                        group, id, 20 * 1000, 1000);
+            } catch (Exception e1) {
+                LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create DMaaP Consumer: ", e1);
+            }
+
+            while (this.isRunning()) {
+                LOGGER.debug("While loop test _ take out ");
+                try {
+                    for (String msg : dmaapConsumer.fetch()) {
+                        LOGGER.debug("Manual Notification Recieved Message " + msg + " from DMaaP server : ");
+                        returnTopic = processMessage(msg);
+                        if (returnTopic != null) {
+                            sendMessage(returnTopic, update);
+                        }
+                    }
+                } catch (Exception e) {
+                    LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing DMaaP message: ", e);
+                }
+            }
+            LOGGER.debug("Stopping DMaaP Consumer loop will no longer fetch messages from the servers");
+        }
+    }
+
+    private static void setAAFCreds() {
+        aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
+        aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
+        if (aafLogin != null) {
+            aafLogin = aafLogin.trim();
+        }
+        if (aafPassword != null) {
+            aafPassword = aafPassword.trim();
+        }
+    }
+
+    private static void setCluster() {
+        clusterList = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
+        if (clusterList != null) {
+            clusterList = clusterList.trim();
+        }
+    }
+
+    private static void setPropNotification() {
+        propNotificationType = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TYPE);
+    }
+
+    private void sendMessage(String topic, String message) {
+        CambriaPublisher pub = null;
+        BusPublisher publisher = null;
+        try {
+            if ("ueb".equals(propNotificationType)) {
+                pub = CambriaClientFactory.createSimplePublisher(null, clusterList, topic);
+                pub.send("pdpReturnMessage", message);
+                LOGGER.debug("Sending Message to UEB topic: " + topic);
+                pub.close();
+
+            } else if ("dmaap".equals(propNotificationType)) {
+                publisher = new BusPublisher.DmaapPublisherWrapper(dmaapList, topic, aafLogin, aafPassword);
+                publisher.send("pdpReturnMessage", message);
+                LOGGER.debug("Sending to Message to DMaaP topic: " + topic);
+                publisher.close();
+            }
+
+        } catch (Exception e) {
+            LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update: ", e);
+        }
+        if (pub != null) {
+            try {
+                pub.send("pdpReturnMessage", message);
+                LOGGER.debug("Sending to Message to tpoic" + topic);
+                pub.close();
+            } catch (IOException e) {
+                LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e);
+            }
+        }
+    }
+
+    private String processMessage(String msg) {
+        LOGGER.debug("notification message:  " + msg);
+        String[] uID = msg.split("=")[1].split("\"");
+
+        String returnTopic = topic + uID[0];
+        if (msg.contains("Starting-Topic")) {
+            return null;
+        }
+        return returnTopic;
+    }
+
+    public static void setUpdate(String update) {
+        ManualNotificationUpdateThread.update = update;
+    }
+
 }