Fix knock on tabs/SONAR/Checkstyle issues
[policy/engine.git] / ONAP-PDP-REST / src / main / java / org / onap / policy / pdp / rest / notifications / NotificationServer.java
index 8c96219..5c1162a 100644 (file)
@@ -3,6 +3,7 @@
  * ONAP-PDP-REST
  * ================================================================================
  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2019 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.policy.pdp.rest.notifications;
 
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.research.xacml.util.XACMLProperties;
+
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -48,180 +53,200 @@ import org.onap.policy.rest.XacmlRestProperties;
 import org.onap.policy.utils.BusPublisher;
 import org.onap.policy.xacml.api.XACMLErrorConstants;
 
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import com.att.research.xacml.util.XACMLProperties;
-
 
 /**
- * The NotificationServer sends the Server Notifications to the Clients once there is any Event.
- * WebSockets is being used as a medium for sending Notifications.
- * UEB is being used as a medium for sending Notifications.
- * DMAAP is being used as a medium for sending Notifications.
+ * The NotificationServer sends the Server Notifications to the Clients once there is any Event. WebSockets is being
+ * used as a medium for sending Notifications. UEB is being used as a medium for sending Notifications. DMAAP is being
+ * used as a medium for sending Notifications.
  *
  * @version 0.2
  *
  **/
 @ServerEndpoint(value = "/notifications")
 public class NotificationServer {
-       private static final Logger LOGGER      = FlexLogger.getLogger(NotificationServer.class);
-       private static Queue<Session> queue = new ConcurrentLinkedQueue<>();
-       private static String update = null;
-
-       @OnOpen
-       public void openConnection(Session session) {
-               LOGGER.info("Session Connected: " + session.getId());
-               queue.add(session);
-       }
-
-       @OnClose
-       public void closeConnection(Session session) {
-               queue.remove(session);
-       }
-
-       @OnError
-       public void error(Session session, Throwable t) {
-               queue.remove(session);
-               LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Session Error for : " + session.getId() + " Error: " + t.getMessage());
-
-       }
-
-       @OnMessage
-       public void message(String message, Session session) {
-
-               if(message.equalsIgnoreCase("Manual")) {
-                       try {
-                               session.getBasicRemote().sendText(update);
-                               session.close();
-                       } catch (IOException e) {
-                               LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage() + e);
-                               LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending Message update");
-                       }
-               }
-       }
-
-       public static void sendNotification(String notification, String propNotificationType, String pdpURL) throws PolicyEngineException, IOException, InterruptedException {
-
-               LOGGER.debug("Notification set to " + propNotificationType);
-               if (propNotificationType.equals("ueb")){
-
-                       String topic = null;
-                       try {
-                               URL aURL = new URL(pdpURL);
-                               topic = aURL.getHost() + aURL.getPort();
-                       } catch (MalformedURLException e1) {
-                               pdpURL = pdpURL.replace("/", "");
-                               topic = pdpURL.replace(":", "");
-                               LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in parsing out pdpURL for UEB notfication ");
-                               PolicyLogger.error(MessageCodes.ERROR_PROCESS_FLOW, e1, "Error in parsing out pdpURL for UEB notfication ");
-                       }
-                       String hosts = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_SERVERS);
-                       String apiKey = XACMLProperties.getProperty(XacmlRestProperties.PROP_UEB_API_KEY);
-                       String apiSecret = XACMLProperties.getProperty(XacmlRestProperties.PROP_UEB_API_SECRET);
-
-                       LOGGER.debug("Creating Publisher for host: " + hosts + " with topic: " + topic);
-                       CambriaBatchingPublisher pub = null;
-                       try {
-                               if(hosts==null || topic==null || apiKey==null || apiSecret==null){
-                                       LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file ");
-                                       throw new PolicyEngineException(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file ");
-                               }
-
-                               hosts = hosts.trim();
-                               topic = topic.trim();
-                               apiKey = apiKey.trim();
-                               apiSecret = apiSecret.trim();
-                               pub = new CambriaClientBuilders.PublisherBuilder ()
-                                               .usingHosts ( hosts )
-                                               .onTopic ( topic )
-                                               .authenticatedBy ( apiKey, apiSecret )
-                                               .build ()
-                                               ;
-
-                       } catch (MalformedURLException e1) {
-                               LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage());
-                       } catch (GeneralSecurityException e1) {
-                               LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage() +e1);
-                       }
-                       if(pub != null){
-                               try {
-                                       pub.send( "MyPartitionKey", notification );
-                               } catch (IOException e) {
-                                       LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage() + e);
-                               }
-                               // close the publisher. The batching publisher does not send events
-                               // immediately, so you MUST use close to send any remaining messages.
-                               // You provide the amount of time you're willing to wait for the sends
-                               // to succeed before giving up. If any messages are unsent after that time,
-                               // they're returned to your app. You could, for example, persist to disk
-                               // and try again later.
-                               final List<?> stuck = pub.close ( 20, TimeUnit.SECONDS );
-
-                               if (!stuck.isEmpty()){
-                                       LOGGER.error( stuck.size() + " messages unsent" );
-                               }else{
-                                       LOGGER.info( "Clean exit; all messages sent: " + notification );
-                               }
-                       }
-               } else if (propNotificationType.equals("dmaap")) {
-
-                       // Setting up the Publisher for DMaaP MR
-                       String dmaapServers = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_SERVERS);
-                       String topic = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_TOPIC);
-                       String aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
-                       String aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
-
-                       try {
-                               if(dmaapServers==null || topic==null){
-                                       LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
-                                       throw new PolicyEngineException(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
-                               }
-
-                               dmaapServers= dmaapServers.trim();
-                               topic= topic.trim();
-                               aafLogin= aafLogin.trim();
-                               aafPassword= aafPassword.trim();
-
-                               List<String> dmaapList = null;
-                               if(dmaapServers.contains(",")) {
-                                       dmaapList = new ArrayList<>(Arrays.asList(dmaapServers.split("\\s*,\\s*")));
-                               } else {
-                                       dmaapList = new ArrayList<>();
-                                       dmaapList.add(dmaapServers);
-                               }
-
-                               BusPublisher publisher =
-                                               new BusPublisher.DmaapPublisherWrapper(dmaapList,
-                                                                                              topic,
-                                                                                              aafLogin,
-                                                                                              aafPassword);
-
-                               // Sending notification through DMaaP Message Router
-                               publisher.send( "MyPartitionKey", notification);
-                               LOGGER.debug("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic);
-                               publisher.close();
-
-                       } catch (Exception e) {
-                               LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage() + e);
-                       }
-               }
-
-               for(Session session: queue) {
-                       try {
-                               LOGGER.info("\n Sending Notification: " + notification + " for client session id: " + session.getId() + "\n "
-                                               + "PDPUrl is " + pdpURL);
-                       LOGGER.info("NotificationServer: sending text message");
-                               session.getBasicRemote().sendText(notification);
-                       } catch (IOException e) {
-                               LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage() + e);
-                       }
-               }
-
-               NotificationService.sendNotification(notification);
-       }
-
-       public static void setUpdate(String update) {
-               NotificationServer.update = update;
-       }
+    private static final Logger LOGGER = FlexLogger.getLogger(NotificationServer.class);
+    private static Queue<Session> queue = new ConcurrentLinkedQueue<>();
+    private static String update = null;
+
+    @OnOpen
+    public void openConnection(Session session) {
+        LOGGER.info("Session Connected: " + session.getId());
+        queue.add(session);
+    }
+
+    @OnClose
+    public void closeConnection(Session session) {
+        queue.remove(session);
+    }
+
+    /**
+     * Error callback method.
+     * @param session The session on which the error occurs
+     * @param throwable exception thrown on the error callback
+     */
+    @OnError
+    public void error(Session session, Throwable throwable) {
+        queue.remove(session);
+        LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Session Error for : " + session.getId() + " Error: "
+                + throwable.getMessage());
+    }
+
+    /**
+     * Message callback method.
+     * @param message the message on the callback
+     * @param session The session on which the error occurs
+     */
+    @OnMessage
+    public void message(String message, Session session) {
+
+        if (message.equalsIgnoreCase("Manual")) {
+            try {
+                session.getBasicRemote().sendText(update);
+                session.close();
+            } catch (IOException e) {
+                LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "
+                        + e.getMessage() + e);
+                LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending Message update");
+            }
+        }
+    }
+
+    /**
+     * Send a notification.
+     * @param notification The notification type
+     * @param propNotificationType Notification type properties
+     * @param pdpUrl URL of the PDP
+     * @throws PolicyEngineException on errors from the policy engine
+     * @throws IOException exceptions on IO errors
+     * @throws InterruptedException interrupts
+     */
+    public static void sendNotification(String notification, String propNotificationType, String pdpUrl)
+            throws PolicyEngineException, IOException, InterruptedException {
+
+        LOGGER.debug("Notification set to " + propNotificationType);
+        if (propNotificationType.equals("ueb")) {
+
+            String topic = null;
+            try {
+                URL notificationUrl = new URL(pdpUrl);
+                topic = notificationUrl.getHost() + notificationUrl.getPort();
+            } catch (MalformedURLException e1) {
+                pdpUrl = pdpUrl.replace("/", "");
+                topic = pdpUrl.replace(":", "");
+                LOGGER.error(
+                        XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in parsing out pdpURL for UEB notfication ");
+                PolicyLogger.error(MessageCodes.ERROR_PROCESS_FLOW, e1,
+                        "Error in parsing out pdpURL for UEB notfication ");
+            }
+            String hosts = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_SERVERS);
+            String apiKey = XACMLProperties.getProperty(XacmlRestProperties.PROP_UEB_API_KEY);
+            String apiSecret = XACMLProperties.getProperty(XacmlRestProperties.PROP_UEB_API_SECRET);
+
+            LOGGER.debug("Creating Publisher for host: " + hosts + " with topic: " + topic);
+            CambriaBatchingPublisher pub = null;
+            try {
+                if (hosts == null || topic == null || apiKey == null || apiSecret == null) {
+                    LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE
+                            + "UEB properties are missing from the property file ");
+                    throw new PolicyEngineException(XACMLErrorConstants.ERROR_DATA_ISSUE
+                            + "UEB properties are missing from the property file ");
+                }
+
+                hosts = hosts.trim();
+                topic = topic.trim();
+                apiKey = apiKey.trim();
+                apiSecret = apiSecret.trim();
+                pub = new CambriaClientBuilders.PublisherBuilder().usingHosts(hosts).onTopic(topic)
+                        .authenticatedBy(apiKey, apiSecret).build();
+
+            } catch (MalformedURLException e1) {
+                LOGGER.error(
+                        XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage());
+            } catch (GeneralSecurityException e1) {
+                LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher"
+                        + e1.getMessage() + e1);
+            }
+            if (pub != null) {
+                try {
+                    pub.send("MyPartitionKey", notification);
+                } catch (IOException e) {
+                    LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update"
+                            + e.getMessage() + e);
+                }
+                // close the publisher. The batching publisher does not send events
+                // immediately, so you MUST use close to send any remaining messages.
+                // You provide the amount of time you're willing to wait for the sends
+                // to succeed before giving up. If any messages are unsent after that time,
+                // they're returned to your app. You could, for example, persist to disk
+                // and try again later.
+                final List<?> stuck = pub.close(20, TimeUnit.SECONDS);
+
+                if (!stuck.isEmpty()) {
+                    LOGGER.error(stuck.size() + " messages unsent");
+                } else {
+                    LOGGER.info("Clean exit; all messages sent: " + notification);
+                }
+            }
+        } else if (propNotificationType.equals("dmaap")) {
+
+            // Setting up the Publisher for DMaaP MR
+            String dmaapServers = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_SERVERS);
+            String topic = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_TOPIC);
+            String aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
+            String aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
+
+            try {
+                if (dmaapServers == null || topic == null) {
+                    LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE
+                            + "DMaaP properties are missing from the property file ");
+                    throw new PolicyEngineException(XACMLErrorConstants.ERROR_DATA_ISSUE
+                            + "DMaaP properties are missing from the property file ");
+                }
+
+                dmaapServers = dmaapServers.trim();
+                topic = topic.trim();
+                aafLogin = aafLogin.trim();
+                aafPassword = aafPassword.trim();
+
+                List<String> dmaapList = null;
+                if (dmaapServers.contains(",")) {
+                    dmaapList = new ArrayList<>(Arrays.asList(dmaapServers.split("\\s*,\\s*")));
+                } else {
+                    dmaapList = new ArrayList<>();
+                    dmaapList.add(dmaapServers);
+                }
+
+                BusPublisher publisher =
+                        new BusPublisher.DmaapPublisherWrapper(dmaapList, topic, aafLogin, aafPassword);
+
+                // Sending notification through DMaaP Message Router
+                publisher.send("MyPartitionKey", notification);
+                LOGGER.debug("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic);
+                publisher.close();
+
+            } catch (Exception e) {
+                LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update"
+                        + e.getMessage() + e);
+            }
+        }
+
+        for (Session session : queue) {
+            try {
+                LOGGER.info("\n Sending Notification: " + notification + " for client session id: " + session.getId()
+                        + "\n " + "PDPUrl is " + pdpUrl);
+                LOGGER.info("NotificationServer: sending text message");
+                session.getBasicRemote().sendText(notification);
+            } catch (IOException e) {
+                LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "
+                        + e.getMessage() + e);
+            }
+        }
+
+        NotificationService.sendNotification(notification);
+    }
+
+    public static void setUpdate(String update) {
+        NotificationServer.update = update;
+    }
 
 }