X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=ECOMP-PDP-REST%2Fsrc%2Fmain%2Fjava%2Forg%2Fopenecomp%2Fpolicy%2Fpdp%2Frest%2Fnotifications%2FNotificationServer.java;h=9bd2e1f20b496b521c4192f838ce636c066cd748;hb=d80880b097d08f9ab9dda54355216890a4b345dc;hp=d6cda7491d5b6f48f4727a8976f17b06c400b769;hpb=91d04c64771832a0b8815ffbe1f0f9920320d94d;p=policy%2Fengine.git diff --git a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/NotificationServer.java b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/NotificationServer.java index d6cda7491..9bd2e1f20 100644 --- a/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/NotificationServer.java +++ b/ECOMP-PDP-REST/src/main/java/org/openecomp/policy/pdp/rest/notifications/NotificationServer.java @@ -24,8 +24,12 @@ import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; import java.security.GeneralSecurityException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; import javax.websocket.OnClose; import javax.websocket.OnError; @@ -34,36 +38,40 @@ import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.ServerEndpoint; -import org.openecomp.policy.rest.XACMLRestProperties; import org.openecomp.policy.common.logging.eelf.MessageCodes; import org.openecomp.policy.common.logging.eelf.PolicyLogger; -import com.att.nsa.cambria.client.CambriaClientFactory; -import com.att.nsa.cambria.client.CambriaPublisher; +import org.openecomp.policy.common.logging.flexlogger.FlexLogger; +import org.openecomp.policy.common.logging.flexlogger.Logger; +import org.openecomp.policy.pdp.rest.api.services.NotificationService; +import org.openecomp.policy.rest.XACMLRestProperties; +import org.openecomp.policy.utils.BusPublisher; import org.openecomp.policy.xacml.api.XACMLErrorConstants; -import com.att.research.xacml.util.XACMLProperties; -import org.openecomp.policy.common.logging.flexlogger.*; +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.att.research.xacml.util.XACMLProperties; /** * The NotificationServer sends the Server Notifications to the Clients once there is any Event. * WebSockets is being used as a medium for sending Notifications. - * UEB is being used as a medium for sending Notifications. + * UEB is being used as a medium for sending Notifications. + * DMAAP is being used as a medium for sending Notifications. * - * @version 0.1 + * @version 0.2 * **/ @ServerEndpoint(value = "/notifications") public class NotificationServer { - private static final Logger logger = FlexLogger.getLogger(NotificationServer.class); - private static Queue queue = new ConcurrentLinkedQueue(); + private static final Logger LOGGER = FlexLogger.getLogger(NotificationServer.class); + private static Queue queue = new ConcurrentLinkedQueue<>(); private static String update = null; private static String hosts = null; private static URL aURL = null; @OnOpen public void openConnection(Session session) { - logger.info("Session Connected: " + session.getId()); + LOGGER.info("Session Connected: " + session.getId()); queue.add(session); } @@ -75,7 +83,7 @@ public class NotificationServer { @OnError public void error(Session session, Throwable t) { queue.remove(session); - logger.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Session Error for : " + session.getId() + " Error: " + t.getMessage()); + LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Session Error for : " + session.getId() + " Error: " + t.getMessage()); } @@ -87,16 +95,17 @@ public class NotificationServer { session.getBasicRemote().sendText(update); session.close(); } catch (IOException e) { - logger.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage()); - logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending Message update"); + LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage() + e); + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending Message update"); } } } - public static void sendNotification(String notification, String propNotificationType, String pdpURL){ + public static void sendNotification(String notification, String propNotificationType, String pdpURL) throws Exception { - logger.debug("Notification set to " + propNotificationType); + LOGGER.debug("Notification set to " + propNotificationType); if (propNotificationType.equals("ueb")){ + String topic = null; try { aURL = new URL(pdpURL); @@ -104,38 +113,112 @@ public class NotificationServer { } catch (MalformedURLException e1) { pdpURL = pdpURL.replace("/", ""); topic = pdpURL.replace(":", ""); - logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in parsing out pdpURL for UEB notfication "); + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in parsing out pdpURL for UEB notfication "); PolicyLogger.error(MessageCodes.ERROR_PROCESS_FLOW, e1, "Error in parsing out pdpURL for UEB notfication "); } - hosts = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_UEB_CLUSTER); - logger.debug("Creating Publisher for host: " + hosts + " with topic: " + topic); - CambriaPublisher pub = null; + hosts = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS); + String apiKey = XACMLProperties.getProperty(XACMLRestProperties.PROP_UEB_API_KEY); + String apiSecret = XACMLProperties.getProperty(XACMLRestProperties.PROP_UEB_API_SECRET); + + LOGGER.debug("Creating Publisher for host: " + hosts + " with topic: " + topic); + CambriaBatchingPublisher pub = null; try { - pub = CambriaClientFactory.createSimplePublisher (null, hosts, topic ); + if(hosts==null || topic==null || apiKey==null || apiSecret==null){ + LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file "); + throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file "); + } + + hosts.trim(); + topic.trim(); + apiKey.trim(); + apiSecret.trim(); + pub = new CambriaClientBuilders.PublisherBuilder () + .usingHosts ( hosts ) + .onTopic ( topic ) + .authenticatedBy ( apiKey, apiSecret ) + .build () + ; + } catch (MalformedURLException e1) { - // TODO Auto-generated catch block - e1.printStackTrace(); + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage()); } catch (GeneralSecurityException e1) { - // TODO Auto-generated catch block - e1.printStackTrace(); + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage() +e1); } + if(pub != null){ + try { + pub.send( "MyPartitionKey", notification ); + } catch (IOException e) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage() + e); + } + // close the publisher. The batching publisher does not send events + // immediately, so you MUST use close to send any remaining messages. + // You provide the amount of time you're willing to wait for the sends + // to succeed before giving up. If any messages are unsent after that time, + // they're returned to your app. You could, for example, persist to disk + // and try again later. + final List stuck = pub.close ( 20, TimeUnit.SECONDS ); + + if (!stuck.isEmpty()){ + LOGGER.error( stuck.size() + " messages unsent" ); + }else{ + LOGGER.info( "Clean exit; all messages sent: " + notification ); + } + } + } else if (propNotificationType.equals("dmaap")) { + + // Setting up the Publisher for DMaaP MR + String dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS); + String topic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC); + String aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN"); + String aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD"); + try { - pub.send( "MyPartitionKey", notification ); - } catch (IOException e) { - logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update"); - } - pub.close(); + if(dmaapServers==null || topic==null){ + LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file "); + throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file "); + } + + dmaapServers.trim(); + topic.trim(); + aafLogin.trim(); + aafPassword.trim(); + + List dmaapList = null; + if(dmaapServers.contains(",")) { + dmaapList = new ArrayList<>(Arrays.asList(dmaapServers.split("\\s*,\\s*"))); + } else { + dmaapList = new ArrayList<>(); + dmaapList.add(dmaapServers); + } + + BusPublisher publisher = + new BusPublisher.DmaapPublisherWrapper(dmaapList, + topic, + aafLogin, + aafPassword); + + // Sending notification through DMaaP Message Router + publisher.send( "MyPartitionKey", notification); + LOGGER.debug("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic); + publisher.close(); + + } catch (Exception e) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage() + e); + } } + for(Session session: queue) { try { session.getBasicRemote().sendText(notification); } catch (IOException e) { - logger.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage()); + LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage() + e); } } + NotificationService.sendNotification(notification); } - + public static void setUpdate(String update) { NotificationServer.update = update; } + }