X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=ONAP-PDP-REST%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fpolicy%2Fpdp%2Frest%2Fnotifications%2FNotificationServer.java;fp=ONAP-PDP-REST%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fpolicy%2Fpdp%2Frest%2Fnotifications%2FNotificationServer.java;h=690d8c517d6014dfe955d8249fa99aa209e8c6ce;hb=073cc188efe9abb4c010cf674e34e2cf46ef1c52;hp=0000000000000000000000000000000000000000;hpb=4ca818fdfb9b807562166800a086b413593d6894;p=policy%2Fengine.git diff --git a/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationServer.java b/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationServer.java new file mode 100644 index 000000000..690d8c517 --- /dev/null +++ b/ONAP-PDP-REST/src/main/java/org/onap/policy/pdp/rest/notifications/NotificationServer.java @@ -0,0 +1,224 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP-PDP-REST + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pdp.rest.notifications; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.security.GeneralSecurityException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; + +import javax.websocket.OnClose; +import javax.websocket.OnError; +import javax.websocket.OnMessage; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.ServerEndpoint; + +import org.onap.policy.common.logging.eelf.MessageCodes; +import org.onap.policy.common.logging.eelf.PolicyLogger; +import org.onap.policy.common.logging.flexlogger.FlexLogger; +import org.onap.policy.common.logging.flexlogger.Logger; +import org.onap.policy.pdp.rest.api.services.NotificationService; +import org.onap.policy.rest.XACMLRestProperties; +import org.onap.policy.utils.BusPublisher; +import org.onap.policy.xacml.api.XACMLErrorConstants; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.att.research.xacml.util.XACMLProperties; + + +/** + * The NotificationServer sends the Server Notifications to the Clients once there is any Event. + * WebSockets is being used as a medium for sending Notifications. + * UEB is being used as a medium for sending Notifications. + * DMAAP is being used as a medium for sending Notifications. + * + * @version 0.2 + * + **/ +@ServerEndpoint(value = "/notifications") +public class NotificationServer { + private static final Logger LOGGER = FlexLogger.getLogger(NotificationServer.class); + private static Queue queue = new ConcurrentLinkedQueue<>(); + private static String update = null; + private static String hosts = null; + private static URL aURL = null; + + @OnOpen + public void openConnection(Session session) { + LOGGER.info("Session Connected: " + session.getId()); + queue.add(session); + } + + @OnClose + public void closeConnection(Session session) { + queue.remove(session); + } + + @OnError + public void error(Session session, Throwable t) { + queue.remove(session); + LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Session Error for : " + session.getId() + " Error: " + t.getMessage()); + + } + + @OnMessage + public void Message(String message, Session session) { + + if(message.equalsIgnoreCase("Manual")) { + try { + session.getBasicRemote().sendText(update); + session.close(); + } catch (IOException e) { + LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage() + e); + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending Message update"); + } + } + } + + public static void sendNotification(String notification, String propNotificationType, String pdpURL) throws Exception { + + LOGGER.debug("Notification set to " + propNotificationType); + if (propNotificationType.equals("ueb")){ + + String topic = null; + try { + aURL = new URL(pdpURL); + topic = aURL.getHost() + aURL.getPort(); + } catch (MalformedURLException e1) { + pdpURL = pdpURL.replace("/", ""); + topic = pdpURL.replace(":", ""); + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in parsing out pdpURL for UEB notfication "); + PolicyLogger.error(MessageCodes.ERROR_PROCESS_FLOW, e1, "Error in parsing out pdpURL for UEB notfication "); + } + hosts = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS); + String apiKey = XACMLProperties.getProperty(XACMLRestProperties.PROP_UEB_API_KEY); + String apiSecret = XACMLProperties.getProperty(XACMLRestProperties.PROP_UEB_API_SECRET); + + LOGGER.debug("Creating Publisher for host: " + hosts + " with topic: " + topic); + CambriaBatchingPublisher pub = null; + try { + if(hosts==null || topic==null || apiKey==null || apiSecret==null){ + LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file "); + throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file "); + } + + hosts.trim(); + topic.trim(); + apiKey.trim(); + apiSecret.trim(); + pub = new CambriaClientBuilders.PublisherBuilder () + .usingHosts ( hosts ) + .onTopic ( topic ) + .authenticatedBy ( apiKey, apiSecret ) + .build () + ; + + } catch (MalformedURLException e1) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage()); + } catch (GeneralSecurityException e1) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage() +e1); + } + if(pub != null){ + try { + pub.send( "MyPartitionKey", notification ); + } catch (IOException e) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage() + e); + } + // close the publisher. The batching publisher does not send events + // immediately, so you MUST use close to send any remaining messages. + // You provide the amount of time you're willing to wait for the sends + // to succeed before giving up. If any messages are unsent after that time, + // they're returned to your app. You could, for example, persist to disk + // and try again later. + final List stuck = pub.close ( 20, TimeUnit.SECONDS ); + + if (!stuck.isEmpty()){ + LOGGER.error( stuck.size() + " messages unsent" ); + }else{ + LOGGER.info( "Clean exit; all messages sent: " + notification ); + } + } + } else if (propNotificationType.equals("dmaap")) { + + // Setting up the Publisher for DMaaP MR + String dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS); + String topic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC); + String aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN"); + String aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD"); + + try { + if(dmaapServers==null || topic==null){ + LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file "); + throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file "); + } + + dmaapServers.trim(); + topic.trim(); + aafLogin.trim(); + aafPassword.trim(); + + List dmaapList = null; + if(dmaapServers.contains(",")) { + dmaapList = new ArrayList<>(Arrays.asList(dmaapServers.split("\\s*,\\s*"))); + } else { + dmaapList = new ArrayList<>(); + dmaapList.add(dmaapServers); + } + + BusPublisher publisher = + new BusPublisher.DmaapPublisherWrapper(dmaapList, + topic, + aafLogin, + aafPassword); + + // Sending notification through DMaaP Message Router + publisher.send( "MyPartitionKey", notification); + LOGGER.debug("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic); + publisher.close(); + + } catch (Exception e) { + LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage() + e); + } + } + + for(Session session: queue) { + try { + session.getBasicRemote().sendText(notification); + } catch (IOException e) { + LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage() + e); + } + } + NotificationService.sendNotification(notification); + } + + public static void setUpdate(String update) { + NotificationServer.update = update; + } + +}