X-Git-Url: https://gerrit.onap.org/r/gitweb?p=policy%2Fengine.git;a=blobdiff_plain;f=ONAP-PDP-REST%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fpolicy%2Fpdp%2Frest%2Fnotifications%2FNotificationServer.java;h=8c962192b53a1d946975089ad3230492db422b78;hp=4c2d1ef577ae1be33fa49fe853c640b0d0fbc8e4;hb=c2ca6ea5cb44103903e1409e8dd6db80167e61e8;hpb=766f999081e73b301062f1b96bffb9c52d7e9852 diff --git a/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationServer.java b/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationServer.java index 4c2d1ef57..8c962192b 100644 --- a/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationServer.java +++ b/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationServer.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -44,7 +44,7 @@ import org.onap.policy.common.logging.eelf.PolicyLogger; import org.onap.policy.common.logging.flexlogger.FlexLogger; import org.onap.policy.common.logging.flexlogger.Logger; import org.onap.policy.pdp.rest.api.services.NotificationService; -import org.onap.policy.rest.XACMLRestProperties; +import org.onap.policy.rest.XacmlRestProperties; import org.onap.policy.utils.BusPublisher; import org.onap.policy.xacml.api.XACMLErrorConstants; @@ -57,8 +57,8 @@ import com.att.research.xacml.util.XACMLProperties; * The NotificationServer sends the Server Notifications to the Clients once there is any Event. * WebSockets is being used as a medium for sending Notifications. * UEB is being used as a medium for sending Notifications. - * DMAAP is being used as a medium for sending Notifications. - * + * DMAAP is being used as a medium for sending Notifications. + * * @version 0.2 * **/ @@ -67,28 +67,28 @@ public class NotificationServer { private static final Logger LOGGER = FlexLogger.getLogger(NotificationServer.class); private static Queue queue = new ConcurrentLinkedQueue<>(); private static String update = null; - + @OnOpen public void openConnection(Session session) { LOGGER.info("Session Connected: " + session.getId()); queue.add(session); } - + @OnClose public void closeConnection(Session session) { queue.remove(session); } - + @OnError public void error(Session session, Throwable t) { queue.remove(session); LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Session Error for : " + session.getId() + " Error: " + t.getMessage()); - + } - + @OnMessage public void message(String message, Session session) { - + if(message.equalsIgnoreCase("Manual")) { try { session.getBasicRemote().sendText(update); @@ -96,7 +96,7 @@ public class NotificationServer { } catch (IOException e) { LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage() + e); LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending Message update"); - } + } } } @@ -115,9 +115,9 @@ public class NotificationServer { LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in parsing out pdpURL for UEB notfication "); PolicyLogger.error(MessageCodes.ERROR_PROCESS_FLOW, e1, "Error in parsing out pdpURL for UEB notfication "); } - String hosts = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS); - String apiKey = XACMLProperties.getProperty(XACMLRestProperties.PROP_UEB_API_KEY); - String apiSecret = XACMLProperties.getProperty(XACMLRestProperties.PROP_UEB_API_SECRET); + String hosts = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_SERVERS); + String apiKey = XACMLProperties.getProperty(XacmlRestProperties.PROP_UEB_API_KEY); + String apiSecret = XACMLProperties.getProperty(XacmlRestProperties.PROP_UEB_API_SECRET); LOGGER.debug("Creating Publisher for host: " + hosts + " with topic: " + topic); CambriaBatchingPublisher pub = null; @@ -164,24 +164,24 @@ public class NotificationServer { } } } else if (propNotificationType.equals("dmaap")) { - + // Setting up the Publisher for DMaaP MR - String dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS); - String topic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC); + String dmaapServers = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_SERVERS); + String topic = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_TOPIC); String aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN"); String aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD"); - + try { if(dmaapServers==null || topic==null){ LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file "); throw new PolicyEngineException(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file "); } - + dmaapServers= dmaapServers.trim(); topic= topic.trim(); aafLogin= aafLogin.trim(); aafPassword= aafPassword.trim(); - + List dmaapList = null; if(dmaapServers.contains(",")) { dmaapList = new ArrayList<>(Arrays.asList(dmaapServers.split("\\s*,\\s*"))); @@ -189,23 +189,23 @@ public class NotificationServer { dmaapList = new ArrayList<>(); dmaapList.add(dmaapServers); } - - BusPublisher publisher = - new BusPublisher.DmaapPublisherWrapper(dmaapList, - topic, - aafLogin, + + BusPublisher publisher = + new BusPublisher.DmaapPublisherWrapper(dmaapList, + topic, + aafLogin, aafPassword); - + // Sending notification through DMaaP Message Router publisher.send( "MyPartitionKey", notification); LOGGER.debug("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic); publisher.close(); - + } catch (Exception e) { LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage() + e); } } - + for(Session session: queue) { try { LOGGER.info("\n Sending Notification: " + notification + " for client session id: " + session.getId() + "\n " @@ -216,12 +216,12 @@ public class NotificationServer { LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage() + e); } } - + NotificationService.sendNotification(notification); } public static void setUpdate(String update) { NotificationServer.update = update; } - + }