X-Git-Url: https://gerrit.onap.org/r/gitweb?p=policy%2Fengine.git;a=blobdiff_plain;f=ONAP-PDP-REST%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fpolicy%2Fpdp%2Frest%2Fnotifications%2FNotificationServer.java;h=5c1162a7d95ab1e7e8e077c6143c06e27b6d05e5;hp=8c962192b53a1d946975089ad3230492db422b78;hb=c46961c5322b8113dcf859e07e596bd7e473bc5d;hpb=87b642029080fbbacfb06daba15104f988ab6be0 diff --git a/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationServer.java b/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationServer.java index 8c962192b..5c1162a7d 100644 --- a/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationServer.java +++ b/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationServer.java @@ -3,6 +3,7 @@ * ONAP-PDP-REST * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2019 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +21,10 @@ package org.onap.policy.pdp.rest.notifications; +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.att.research.xacml.util.XACMLProperties; + import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; @@ -48,180 +53,200 @@ import org.onap.policy.rest.XacmlRestProperties; import org.onap.policy.utils.BusPublisher; import org.onap.policy.xacml.api.XACMLErrorConstants; -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaClientBuilders; -import com.att.research.xacml.util.XACMLProperties; - /** - * The NotificationServer sends the Server Notifications to the Clients once there is any Event. - * WebSockets is being used as a medium for sending Notifications. - * UEB is being used as a medium for sending Notifications. - * DMAAP is being used as a medium for sending Notifications. + * The NotificationServer sends the Server Notifications to the Clients once there is any Event. WebSockets is being + * used as a medium for sending Notifications. UEB is being used as a medium for sending Notifications. DMAAP is being + * used as a medium for sending Notifications. * * @version 0.2 * **/ @ServerEndpoint(value = "/notifications") public class NotificationServer { - private static final Logger LOGGER = FlexLogger.getLogger(NotificationServer.class); - private static Queue queue = new ConcurrentLinkedQueue<>(); - private static String update = null; - - @OnOpen - public void openConnection(Session session) { - LOGGER.info("Session Connected: " + session.getId()); - queue.add(session); - } - - @OnClose - public void closeConnection(Session session) { - queue.remove(session); - } - - @OnError - public void error(Session session, Throwable t) { - queue.remove(session); - LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Session Error for : " + session.getId() + " Error: " + t.getMessage()); - - } - - @OnMessage - public void message(String message, Session session) { - - if(message.equalsIgnoreCase("Manual")) { - try { - session.getBasicRemote().sendText(update); - session.close(); - } catch (IOException e) { - LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage() + e); - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending Message update"); - } - } - } - - public static void sendNotification(String notification, String propNotificationType, String pdpURL) throws PolicyEngineException, IOException, InterruptedException { - - LOGGER.debug("Notification set to " + propNotificationType); - if (propNotificationType.equals("ueb")){ - - String topic = null; - try { - URL aURL = new URL(pdpURL); - topic = aURL.getHost() + aURL.getPort(); - } catch (MalformedURLException e1) { - pdpURL = pdpURL.replace("/", ""); - topic = pdpURL.replace(":", ""); - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in parsing out pdpURL for UEB notfication "); - PolicyLogger.error(MessageCodes.ERROR_PROCESS_FLOW, e1, "Error in parsing out pdpURL for UEB notfication "); - } - String hosts = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_SERVERS); - String apiKey = XACMLProperties.getProperty(XacmlRestProperties.PROP_UEB_API_KEY); - String apiSecret = XACMLProperties.getProperty(XacmlRestProperties.PROP_UEB_API_SECRET); - - LOGGER.debug("Creating Publisher for host: " + hosts + " with topic: " + topic); - CambriaBatchingPublisher pub = null; - try { - if(hosts==null || topic==null || apiKey==null || apiSecret==null){ - LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file "); - throw new PolicyEngineException(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file "); - } - - hosts = hosts.trim(); - topic = topic.trim(); - apiKey = apiKey.trim(); - apiSecret = apiSecret.trim(); - pub = new CambriaClientBuilders.PublisherBuilder () - .usingHosts ( hosts ) - .onTopic ( topic ) - .authenticatedBy ( apiKey, apiSecret ) - .build () - ; - - } catch (MalformedURLException e1) { - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage()); - } catch (GeneralSecurityException e1) { - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage() +e1); - } - if(pub != null){ - try { - pub.send( "MyPartitionKey", notification ); - } catch (IOException e) { - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage() + e); - } - // close the publisher. The batching publisher does not send events - // immediately, so you MUST use close to send any remaining messages. - // You provide the amount of time you're willing to wait for the sends - // to succeed before giving up. If any messages are unsent after that time, - // they're returned to your app. You could, for example, persist to disk - // and try again later. - final List stuck = pub.close ( 20, TimeUnit.SECONDS ); - - if (!stuck.isEmpty()){ - LOGGER.error( stuck.size() + " messages unsent" ); - }else{ - LOGGER.info( "Clean exit; all messages sent: " + notification ); - } - } - } else if (propNotificationType.equals("dmaap")) { - - // Setting up the Publisher for DMaaP MR - String dmaapServers = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_SERVERS); - String topic = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_TOPIC); - String aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN"); - String aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD"); - - try { - if(dmaapServers==null || topic==null){ - LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file "); - throw new PolicyEngineException(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file "); - } - - dmaapServers= dmaapServers.trim(); - topic= topic.trim(); - aafLogin= aafLogin.trim(); - aafPassword= aafPassword.trim(); - - List dmaapList = null; - if(dmaapServers.contains(",")) { - dmaapList = new ArrayList<>(Arrays.asList(dmaapServers.split("\\s*,\\s*"))); - } else { - dmaapList = new ArrayList<>(); - dmaapList.add(dmaapServers); - } - - BusPublisher publisher = - new BusPublisher.DmaapPublisherWrapper(dmaapList, - topic, - aafLogin, - aafPassword); - - // Sending notification through DMaaP Message Router - publisher.send( "MyPartitionKey", notification); - LOGGER.debug("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic); - publisher.close(); - - } catch (Exception e) { - LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage() + e); - } - } - - for(Session session: queue) { - try { - LOGGER.info("\n Sending Notification: " + notification + " for client session id: " + session.getId() + "\n " - + "PDPUrl is " + pdpURL); - LOGGER.info("NotificationServer: sending text message"); - session.getBasicRemote().sendText(notification); - } catch (IOException e) { - LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage() + e); - } - } - - NotificationService.sendNotification(notification); - } - - public static void setUpdate(String update) { - NotificationServer.update = update; - } + private static final Logger LOGGER = FlexLogger.getLogger(NotificationServer.class); + private static Queue queue = new ConcurrentLinkedQueue<>(); + private static String update = null; + + @OnOpen + public void openConnection(Session session) { + LOGGER.info("Session Connected: " + session.getId()); + queue.add(session); + } + + @OnClose + public void closeConnection(Session session) { + queue.remove(session); + } + + /** + * Error callback method. + * @param session The session on which the error occurs + * @param throwable exception thrown on the error callback + */ + @OnError + public void error(Session session, Throwable throwable) { + queue.remove(session); + LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Session Error for : " + session.getId() + " Error: " + + throwable.getMessage()); + } + + /** + * Message callback method. + * @param message the message on the callback + * @param session The session on which the error occurs + */ + @OnMessage + public void message(String message, Session session) { + + if (message.equalsIgnoreCase("Manual")) { + try { + session.getBasicRemote().sendText(update); + session.close(); + } catch (IOException e) { + LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: " + + e.getMessage() + e); + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending Message update"); + } + } + } + + /** + * Send a notification. + * @param notification The notification type + * @param propNotificationType Notification type properties + * @param pdpUrl URL of the PDP + * @throws PolicyEngineException on errors from the policy engine + * @throws IOException exceptions on IO errors + * @throws InterruptedException interrupts + */ + public static void sendNotification(String notification, String propNotificationType, String pdpUrl) + throws PolicyEngineException, IOException, InterruptedException { + + LOGGER.debug("Notification set to " + propNotificationType); + if (propNotificationType.equals("ueb")) { + + String topic = null; + try { + URL notificationUrl = new URL(pdpUrl); + topic = notificationUrl.getHost() + notificationUrl.getPort(); + } catch (MalformedURLException e1) { + pdpUrl = pdpUrl.replace("/", ""); + topic = pdpUrl.replace(":", ""); + LOGGER.error( + XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in parsing out pdpURL for UEB notfication "); + PolicyLogger.error(MessageCodes.ERROR_PROCESS_FLOW, e1, + "Error in parsing out pdpURL for UEB notfication "); + } + String hosts = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_SERVERS); + String apiKey = XACMLProperties.getProperty(XacmlRestProperties.PROP_UEB_API_KEY); + String apiSecret = XACMLProperties.getProperty(XacmlRestProperties.PROP_UEB_API_SECRET); + + LOGGER.debug("Creating Publisher for host: " + hosts + " with topic: " + topic); + CambriaBatchingPublisher pub = null; + try { + if (hosts == null || topic == null || apiKey == null || apiSecret == null) { + LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + + "UEB properties are missing from the property file "); + throw new PolicyEngineException(XACMLErrorConstants.ERROR_DATA_ISSUE + + "UEB properties are missing from the property file "); + } + + hosts = hosts.trim(); + topic = topic.trim(); + apiKey = apiKey.trim(); + apiSecret = apiSecret.trim(); + pub = new CambriaClientBuilders.PublisherBuilder().usingHosts(hosts).onTopic(topic) + .authenticatedBy(apiKey, apiSecret).build(); + + } catch (MalformedURLException e1) { + LOGGER.error( + XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage()); + } catch (GeneralSecurityException e1) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + + e1.getMessage() + e1); + } + if (pub != null) { + try { + pub.send("MyPartitionKey", notification); + } catch (IOException e) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + + e.getMessage() + e); + } + // close the publisher. The batching publisher does not send events + // immediately, so you MUST use close to send any remaining messages. + // You provide the amount of time you're willing to wait for the sends + // to succeed before giving up. If any messages are unsent after that time, + // they're returned to your app. You could, for example, persist to disk + // and try again later. + final List stuck = pub.close(20, TimeUnit.SECONDS); + + if (!stuck.isEmpty()) { + LOGGER.error(stuck.size() + " messages unsent"); + } else { + LOGGER.info("Clean exit; all messages sent: " + notification); + } + } + } else if (propNotificationType.equals("dmaap")) { + + // Setting up the Publisher for DMaaP MR + String dmaapServers = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_SERVERS); + String topic = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_TOPIC); + String aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN"); + String aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD"); + + try { + if (dmaapServers == null || topic == null) { + LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + + "DMaaP properties are missing from the property file "); + throw new PolicyEngineException(XACMLErrorConstants.ERROR_DATA_ISSUE + + "DMaaP properties are missing from the property file "); + } + + dmaapServers = dmaapServers.trim(); + topic = topic.trim(); + aafLogin = aafLogin.trim(); + aafPassword = aafPassword.trim(); + + List dmaapList = null; + if (dmaapServers.contains(",")) { + dmaapList = new ArrayList<>(Arrays.asList(dmaapServers.split("\\s*,\\s*"))); + } else { + dmaapList = new ArrayList<>(); + dmaapList.add(dmaapServers); + } + + BusPublisher publisher = + new BusPublisher.DmaapPublisherWrapper(dmaapList, topic, aafLogin, aafPassword); + + // Sending notification through DMaaP Message Router + publisher.send("MyPartitionKey", notification); + LOGGER.debug("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic); + publisher.close(); + + } catch (Exception e) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + + e.getMessage() + e); + } + } + + for (Session session : queue) { + try { + LOGGER.info("\n Sending Notification: " + notification + " for client session id: " + session.getId() + + "\n " + "PDPUrl is " + pdpUrl); + LOGGER.info("NotificationServer: sending text message"); + session.getBasicRemote().sendText(notification); + } catch (IOException e) { + LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: " + + e.getMessage() + e); + } + } + + NotificationService.sendNotification(notification); + } + + public static void setUpdate(String update) { + NotificationServer.update = update; + } }