[Policy-20] getConfig & Policy resolved blockers
[policy/engine.git] / ECOMP-PDP-REST / src / main / java / org / openecomp / policy / pdp / rest / notifications / ManualNotificationUpdateThread.java
index 27d6b6f..a1d0ece 100644 (file)
@@ -94,35 +94,36 @@ public class ManualNotificationUpdateThread implements Runnable {
                        } catch (MalformedURLException e) {
                                LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing URL to create topic for Notification ", e);
                        }
-                       
-                       String consumerTopic = aURL.getHost() + aURL.getPort() + "UpdateRequest";
-                       SendMessage(consumerTopic, "Starting-Topic");
-                       final LinkedList<String> urlList = new LinkedList<> ();
-                       for ( String u : clusterList.split ( "," ) ){
-                               urlList.add ( u );
-                       }
-                       
-                       try {
-                               CConsumer = CambriaClientFactory.createConsumer ( null, urlList, consumerTopic , group, id, 20*1000, 1000 );
-                       } catch (MalformedURLException | GeneralSecurityException e1) {
-                               LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create UEB Consumer: ", e1);
-                       }
+                       if(aURL != null){
+                               String consumerTopic = aURL.getHost() + aURL.getPort() + "UpdateRequest";
+                               SendMessage(consumerTopic, "Starting-Topic");
+                               final LinkedList<String> urlList = new LinkedList<> ();
+                               for ( String u : clusterList.split ( "," ) ){
+                                       urlList.add ( u );
+                               }
 
-                       while (this.isRunning()) {
-                               LOGGER.debug("While loop test _ take out ");
                                try {
-                                       for ( String msg : CConsumer.fetch () ){                
-                                               LOGGER.debug("Manual Notification Recieved Message " + msg + " from UEB cluster : ");
-                                               returnTopic = processMessage(msg);
-                                               if(returnTopic != null){
-                                                       SendMessage(returnTopic, update);
+                                       CConsumer = CambriaClientFactory.createConsumer ( null, urlList, consumerTopic , group, id, 20*1000, 1000 );
+                               } catch (MalformedURLException | GeneralSecurityException e1) {
+                                       LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create UEB Consumer: ", e1);
+                               }
+
+                               while (this.isRunning()) {
+                                       LOGGER.debug("While loop test _ take out ");
+                                       try {
+                                               for ( String msg : CConsumer.fetch () ){                
+                                                       LOGGER.debug("Manual Notification Recieved Message " + msg + " from UEB cluster : ");
+                                                       returnTopic = processMessage(msg);
+                                                       if(returnTopic != null){
+                                                               SendMessage(returnTopic, update);
+                                                       }
                                                }
+                                       } catch (IOException e) {
+                                               LOGGER.debug(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing UEB message" + e);
                                        }
-                               } catch (IOException e) {
-                                       LOGGER.debug(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing UEB message");
                                }
+                               LOGGER.debug("Stopping UEB Consumer loop will no longer fetch messages from the cluster");      
                        }
-                       LOGGER.debug("Stopping UEB Consumer loop will no longer fetch messages from the cluster");      
                } else if ("dmaap".equals(propNotificationType)) {
                        String dmaapServers = null;
                        try {
@@ -200,14 +201,15 @@ public class ManualNotificationUpdateThread implements Runnable {
                } catch (Exception e) {
                        LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update: ", e);
                }
-
-               try {
-                       pub.send( "pdpReturnMessage", message );
-                       LOGGER.debug("Sending to Message to tpoic" + topic);
-               } catch (IOException e) {
-                       LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update");
-               }       
-               pub.close();            
+               if(pub != null){
+                       try {
+                               pub.send( "pdpReturnMessage", message );
+                               LOGGER.debug("Sending to Message to tpoic" + topic);
+                               pub.close();
+                       } catch (IOException e) {
+                               LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update" +e);
+                       }       
+               }               
        }
 
        private String processMessage(String msg) {