[POLICY-117] Resolve the Policy Critical issues
[policy/engine.git] / ECOMP-PDP-REST / src / main / java / org / openecomp / policy / pdp / rest / notifications / NotificationServer.java
index d6cda74..9bd2e1f 100644 (file)
@@ -24,8 +24,12 @@ import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
 
 import javax.websocket.OnClose;
 import javax.websocket.OnError;
@@ -34,36 +38,40 @@ import javax.websocket.OnOpen;
 import javax.websocket.Session;
 import javax.websocket.server.ServerEndpoint;
 
-import org.openecomp.policy.rest.XACMLRestProperties;
 import org.openecomp.policy.common.logging.eelf.MessageCodes;
 import org.openecomp.policy.common.logging.eelf.PolicyLogger;
-import com.att.nsa.cambria.client.CambriaClientFactory;
-import com.att.nsa.cambria.client.CambriaPublisher;
+import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
+import org.openecomp.policy.common.logging.flexlogger.Logger;
+import org.openecomp.policy.pdp.rest.api.services.NotificationService;
+import org.openecomp.policy.rest.XACMLRestProperties;
+import org.openecomp.policy.utils.BusPublisher;
 import org.openecomp.policy.xacml.api.XACMLErrorConstants;
-import com.att.research.xacml.util.XACMLProperties;
 
-import org.openecomp.policy.common.logging.flexlogger.*;
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.research.xacml.util.XACMLProperties;
 
 
 /**
  * The NotificationServer sends the Server Notifications to the Clients once there is any Event.
  * WebSockets is being used as a medium for sending Notifications.
- * UEB is being used as a medium for sending Notifications. 
+ * UEB is being used as a medium for sending Notifications.
+ * DMAAP is being used as a medium for sending Notifications. 
  * 
- * @version 0.1
+ * @version 0.2
  *
  **/
 @ServerEndpoint(value = "/notifications")
 public class NotificationServer {
-       private static final Logger logger      = FlexLogger.getLogger(NotificationServer.class);
-       private static Queue<Session> queue = new ConcurrentLinkedQueue<Session>();
+       private static final Logger LOGGER      = FlexLogger.getLogger(NotificationServer.class);
+       private static Queue<Session> queue = new ConcurrentLinkedQueue<>();
        private static String update = null;
        private static  String hosts = null;
        private static URL aURL = null;
        
        @OnOpen
        public void openConnection(Session session) {
-               logger.info("Session Connected: " + session.getId());
+               LOGGER.info("Session Connected: " + session.getId());
                queue.add(session);
        }
        
@@ -75,7 +83,7 @@ public class NotificationServer {
        @OnError
        public void error(Session session, Throwable t) {
                queue.remove(session);
-               logger.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Session Error for : " + session.getId() + " Error: " + t.getMessage());
+               LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Session Error for : " + session.getId() + " Error: " + t.getMessage());
                
        }
        
@@ -87,16 +95,17 @@ public class NotificationServer {
                                session.getBasicRemote().sendText(update);
                                session.close();
                        } catch (IOException e) {
-                               logger.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage());
-                               logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending Message update");
+                               LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage() + e);
+                               LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending Message update");
                        }       
                }
        }
 
-       public static void sendNotification(String notification, String propNotificationType, String pdpURL){
+       public static void sendNotification(String notification, String propNotificationType, String pdpURL) throws Exception {
 
-               logger.debug("Notification set to " + propNotificationType);
+               LOGGER.debug("Notification set to " + propNotificationType);
                if (propNotificationType.equals("ueb")){
+
                        String topic = null;
                        try {
                                aURL = new URL(pdpURL);
@@ -104,38 +113,112 @@ public class NotificationServer {
                        } catch (MalformedURLException e1) {
                                pdpURL = pdpURL.replace("/", "");
                                topic = pdpURL.replace(":", "");
-                               logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in parsing out pdpURL for UEB notfication ");
+                               LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in parsing out pdpURL for UEB notfication ");
                                PolicyLogger.error(MessageCodes.ERROR_PROCESS_FLOW, e1, "Error in parsing out pdpURL for UEB notfication ");
                        }
-                       hosts = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_UEB_CLUSTER);
-                       logger.debug("Creating Publisher for host: " + hosts + " with topic: " + topic);
-                       CambriaPublisher pub = null;
+                       hosts = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
+                       String apiKey = XACMLProperties.getProperty(XACMLRestProperties.PROP_UEB_API_KEY);
+                       String apiSecret = XACMLProperties.getProperty(XACMLRestProperties.PROP_UEB_API_SECRET);
+
+                       LOGGER.debug("Creating Publisher for host: " + hosts + " with topic: " + topic);
+                       CambriaBatchingPublisher pub = null;
                        try {
-                               pub = CambriaClientFactory.createSimplePublisher (null, hosts, topic );
+                               if(hosts==null || topic==null || apiKey==null || apiSecret==null){
+                                       LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file ");
+                                       throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file ");
+                               }
+
+                               hosts.trim();
+                               topic.trim();
+                               apiKey.trim();
+                               apiSecret.trim();
+                               pub = new CambriaClientBuilders.PublisherBuilder ()
+                                               .usingHosts ( hosts )
+                                               .onTopic ( topic )
+                                               .authenticatedBy ( apiKey, apiSecret )
+                                               .build ()
+                                               ;
+
                        } catch (MalformedURLException e1) {
-                               // TODO Auto-generated catch block
-                               e1.printStackTrace();
+                               LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage());
                        } catch (GeneralSecurityException e1) {
-                               // TODO Auto-generated catch block
-                               e1.printStackTrace();
+                               LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage() +e1);
                        }
+                       if(pub != null){
+                               try {
+                                       pub.send( "MyPartitionKey", notification );
+                               } catch (IOException e) {
+                                       LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage() + e);
+                               }
+                               // close the publisher. The batching publisher does not send events
+                               // immediately, so you MUST use close to send any remaining messages.
+                               // You provide the amount of time you're willing to wait for the sends
+                               // to succeed before giving up. If any messages are unsent after that time,
+                               // they're returned to your app. You could, for example, persist to disk
+                               // and try again later.
+                               final List<?> stuck = pub.close ( 20, TimeUnit.SECONDS );
+
+                               if (!stuck.isEmpty()){
+                                       LOGGER.error( stuck.size() + " messages unsent" );
+                               }else{
+                                       LOGGER.info( "Clean exit; all messages sent: " + notification );
+                               }
+                       }
+               } else if (propNotificationType.equals("dmaap")) {
+                       
+                       // Setting up the Publisher for DMaaP MR
+                       String dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
+                       String topic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC);
+                       String aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
+                       String aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
+                       
                        try {
-                               pub.send( "MyPartitionKey", notification );
-                       } catch (IOException e) {
-                               logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update");
-                       }       
-                       pub.close();    
+                               if(dmaapServers==null || topic==null){
+                                       LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
+                                       throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
+                               }
+                               
+                               dmaapServers.trim();
+                               topic.trim();
+                               aafLogin.trim();
+                               aafPassword.trim();
+                               
+                               List<String> dmaapList = null;
+                               if(dmaapServers.contains(",")) {
+                                       dmaapList = new ArrayList<>(Arrays.asList(dmaapServers.split("\\s*,\\s*")));
+                               } else {
+                                       dmaapList = new ArrayList<>();
+                                       dmaapList.add(dmaapServers);
+                               }
+                               
+                               BusPublisher publisher = 
+                                               new BusPublisher.DmaapPublisherWrapper(dmaapList, 
+                                                                                              topic, 
+                                                                                              aafLogin, 
+                                                                                              aafPassword);
+                               
+                               // Sending notification through DMaaP Message Router
+                               publisher.send( "MyPartitionKey", notification);
+                               LOGGER.debug("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic);
+                               publisher.close();
+                               
+                       } catch (Exception e) {
+                               LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage() + e);
+                       }
                }
+               
                for(Session session: queue) {
                        try {
                                session.getBasicRemote().sendText(notification);
                        } catch (IOException e) {
-                               logger.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage());
+                               LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage() + e);
                        }
                }
+               NotificationService.sendNotification(notification);
        }
-       
+
        public static void setUpdate(String update) {
                NotificationServer.update = update;
        }
+       
 }