X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=ECOMP-PDP-REST%2Fsrc%2Fmain%2Fjava%2Forg%2Fopenecomp%2Fpolicy%2Fpdp%2Frest%2Fnotifications%2FManualNotificationUpdateThread.java;h=a1d0ece83bbf8e55227d9114ad20110ce13bb7ab;hb=685ed1545ed28b777a3ba6e7d315b78f355154cb;hp=27d6b6f76167472507a169e2ff2cec0e22bacf9e;hpb=fc5c07705edc4dcb7083b39116a43844bb6a1490;p=policy%2Fengine.git diff --git a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java index 27d6b6f76..a1d0ece83 100644 --- a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java +++ b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/ManualNotificationUpdateThread.java @@ -94,35 +94,36 @@ public class ManualNotificationUpdateThread implements Runnable { } catch (MalformedURLException e) { LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing URL to create topic for Notification ", e); } - - String consumerTopic = aURL.getHost() + aURL.getPort() + "UpdateRequest"; - SendMessage(consumerTopic, "Starting-Topic"); - final LinkedList urlList = new LinkedList<> (); - for ( String u : clusterList.split ( "," ) ){ - urlList.add ( u ); - } - - try { - CConsumer = CambriaClientFactory.createConsumer ( null, urlList, consumerTopic , group, id, 20*1000, 1000 ); - } catch (MalformedURLException | GeneralSecurityException e1) { - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create UEB Consumer: ", e1); - } + if(aURL != null){ + String consumerTopic = aURL.getHost() + aURL.getPort() + "UpdateRequest"; + SendMessage(consumerTopic, "Starting-Topic"); + final LinkedList urlList = new LinkedList<> (); + for ( String u : clusterList.split ( "," ) ){ + urlList.add ( u ); + } - while (this.isRunning()) { - LOGGER.debug("While loop test _ take out "); try { - for ( String msg : CConsumer.fetch () ){ - LOGGER.debug("Manual Notification Recieved Message " + msg + " from UEB cluster : "); - returnTopic = processMessage(msg); - if(returnTopic != null){ - SendMessage(returnTopic, update); + CConsumer = CambriaClientFactory.createConsumer ( null, urlList, consumerTopic , group, id, 20*1000, 1000 ); + } catch (MalformedURLException | GeneralSecurityException e1) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create UEB Consumer: ", e1); + } + + while (this.isRunning()) { + LOGGER.debug("While loop test _ take out "); + try { + for ( String msg : CConsumer.fetch () ){ + LOGGER.debug("Manual Notification Recieved Message " + msg + " from UEB cluster : "); + returnTopic = processMessage(msg); + if(returnTopic != null){ + SendMessage(returnTopic, update); + } } + } catch (IOException e) { + LOGGER.debug(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing UEB message" + e); } - } catch (IOException e) { - LOGGER.debug(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing UEB message"); } + LOGGER.debug("Stopping UEB Consumer loop will no longer fetch messages from the cluster"); } - LOGGER.debug("Stopping UEB Consumer loop will no longer fetch messages from the cluster"); } else if ("dmaap".equals(propNotificationType)) { String dmaapServers = null; try { @@ -200,14 +201,15 @@ public class ManualNotificationUpdateThread implements Runnable { } catch (Exception e) { LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update: ", e); } - - try { - pub.send( "pdpReturnMessage", message ); - LOGGER.debug("Sending to Message to tpoic" + topic); - } catch (IOException e) { - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update"); - } - pub.close(); + if(pub != null){ + try { + pub.send( "pdpReturnMessage", message ); + LOGGER.debug("Sending to Message to tpoic" + topic); + pub.close(); + } catch (IOException e) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update" +e); + } + } } private String processMessage(String msg) {