Policy 1707 commit to LF
[policy/engine.git] / ECOMP-PDP-REST / src / main / java / org / openecomp / policy / pdp / rest / notifications / NotificationServer.java
index d6cda74..fe295eb 100644 (file)
@@ -24,8 +24,12 @@ import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
 
 import javax.websocket.OnClose;
 import javax.websocket.OnError;
@@ -34,28 +38,31 @@ import javax.websocket.OnOpen;
 import javax.websocket.Session;
 import javax.websocket.server.ServerEndpoint;
 
-import org.openecomp.policy.rest.XACMLRestProperties;
 import org.openecomp.policy.common.logging.eelf.MessageCodes;
 import org.openecomp.policy.common.logging.eelf.PolicyLogger;
-import com.att.nsa.cambria.client.CambriaClientFactory;
-import com.att.nsa.cambria.client.CambriaPublisher;
+import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
+import org.openecomp.policy.common.logging.flexlogger.Logger;
+import org.openecomp.policy.rest.XACMLRestProperties;
+import org.openecomp.policy.utils.BusPublisher;
 import org.openecomp.policy.xacml.api.XACMLErrorConstants;
-import com.att.research.xacml.util.XACMLProperties;
 
-import org.openecomp.policy.common.logging.flexlogger.*;
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.research.xacml.util.XACMLProperties;
 
 
 /**
  * The NotificationServer sends the Server Notifications to the Clients once there is any Event.
  * WebSockets is being used as a medium for sending Notifications.
- * UEB is being used as a medium for sending Notifications. 
+ * UEB is being used as a medium for sending Notifications.
+ * DMAAP is being used as a medium for sending Notifications. 
  * 
- * @version 0.1
+ * @version 0.2
  *
  **/
 @ServerEndpoint(value = "/notifications")
 public class NotificationServer {
-       private static final Logger logger      = FlexLogger.getLogger(NotificationServer.class);
+       private static final Logger LOGGER      = FlexLogger.getLogger(NotificationServer.class);
        private static Queue<Session> queue = new ConcurrentLinkedQueue<Session>();
        private static String update = null;
        private static  String hosts = null;
@@ -63,7 +70,7 @@ public class NotificationServer {
        
        @OnOpen
        public void openConnection(Session session) {
-               logger.info("Session Connected: " + session.getId());
+               LOGGER.info("Session Connected: " + session.getId());
                queue.add(session);
        }
        
@@ -75,7 +82,7 @@ public class NotificationServer {
        @OnError
        public void error(Session session, Throwable t) {
                queue.remove(session);
-               logger.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Session Error for : " + session.getId() + " Error: " + t.getMessage());
+               LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Session Error for : " + session.getId() + " Error: " + t.getMessage());
                
        }
        
@@ -87,16 +94,17 @@ public class NotificationServer {
                                session.getBasicRemote().sendText(update);
                                session.close();
                        } catch (IOException e) {
-                               logger.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage());
-                               logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending Message update");
+                               LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage());
+                               LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending Message update");
                        }       
                }
        }
 
-       public static void sendNotification(String notification, String propNotificationType, String pdpURL){
+       public static void sendNotification(String notification, String propNotificationType, String pdpURL) throws Exception {
 
-               logger.debug("Notification set to " + propNotificationType);
+               LOGGER.debug("Notification set to " + propNotificationType);
                if (propNotificationType.equals("ueb")){
+                       
                        String topic = null;
                        try {
                                aURL = new URL(pdpURL);
@@ -104,33 +112,111 @@ public class NotificationServer {
                        } catch (MalformedURLException e1) {
                                pdpURL = pdpURL.replace("/", "");
                                topic = pdpURL.replace(":", "");
-                               logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in parsing out pdpURL for UEB notfication ");
+                               LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in parsing out pdpURL for UEB notfication ");
                                PolicyLogger.error(MessageCodes.ERROR_PROCESS_FLOW, e1, "Error in parsing out pdpURL for UEB notfication ");
                        }
-                       hosts = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_UEB_CLUSTER);
-                       logger.debug("Creating Publisher for host: " + hosts + " with topic: " + topic);
-                       CambriaPublisher pub = null;
+                       hosts = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
+                       String apiKey = XACMLProperties.getProperty(XACMLRestProperties.PROP_UEB_API_KEY);
+                       String apiSecret = XACMLProperties.getProperty(XACMLRestProperties.PROP_UEB_API_SECRET);
+                       
+                       LOGGER.debug("Creating Publisher for host: " + hosts + " with topic: " + topic);
+                       CambriaBatchingPublisher pub = null;
                        try {
-                               pub = CambriaClientFactory.createSimplePublisher (null, hosts, topic );
+                               if(hosts==null || topic==null || apiKey==null || apiSecret==null){
+                                       LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file ");
+                                       throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file ");
+                               }
+                               
+                               hosts.trim();
+                               topic.trim();
+                               apiKey.trim();
+                               apiSecret.trim();
+                               pub = new CambriaClientBuilders.PublisherBuilder ()
+                                               .usingHosts ( hosts )
+                                               .onTopic ( topic )
+                                               .authenticatedBy ( apiKey, apiSecret )
+                                               .build ()
+                                       ;
+                               
                        } catch (MalformedURLException e1) {
-                               // TODO Auto-generated catch block
+                               LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage());
                                e1.printStackTrace();
                        } catch (GeneralSecurityException e1) {
-                               // TODO Auto-generated catch block
+                               LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage());
                                e1.printStackTrace();
                        }
+
                        try {
                                pub.send( "MyPartitionKey", notification );
                        } catch (IOException e) {
-                               logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update");
-                       }       
-                       pub.close();    
+                               LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage());
+                       }
+                       
+                       // close the publisher. The batching publisher does not send events
+                       // immediately, so you MUST use close to send any remaining messages.
+                       // You provide the amount of time you're willing to wait for the sends
+                       // to succeed before giving up. If any messages are unsent after that time,
+                       // they're returned to your app. You could, for example, persist to disk
+                       // and try again later.
+                       final List stuck = pub.close ( 20, TimeUnit.SECONDS );
+                       
+                       if ( stuck.size () > 0 )
+                       {
+                               System.err.println ( stuck.size() + " messages unsent" );
+                       }
+                       else
+                       {
+                               System.out.println ( "Clean exit; all messages sent: " + notification );
+                       }
+                       
+               } else if (propNotificationType.equals("dmaap")) {
+                       
+                       // Setting up the Publisher for DMaaP MR
+                       String dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
+                       String topic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC);
+                       String aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
+                       String aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
+                       
+                       try {
+                               if(dmaapServers==null || topic==null){
+                                       LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
+                                       throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
+                               }
+                               
+                               dmaapServers.trim();
+                               topic.trim();
+                               aafLogin.trim();
+                               aafPassword.trim();
+                               
+                               List<String> dmaapList = null;
+                               if(dmaapServers.contains(",")) {
+                                       dmaapList = new ArrayList<String>(Arrays.asList(dmaapServers.split("\\s*,\\s*")));
+                               } else {
+                                       dmaapList = new ArrayList<String>();
+                                       dmaapList.add(dmaapServers);
+                               }
+                               
+                               BusPublisher publisher = 
+                                               new BusPublisher.DmaapPublisherWrapper(dmaapList, 
+                                                                                              topic, 
+                                                                                              aafLogin, 
+                                                                                              aafPassword);
+                               
+                               // Sending notification through DMaaP Message Router
+                               publisher.send( "MyPartitionKey", notification);
+                               LOGGER.debug("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic);
+                               publisher.close();
+                               
+                       } catch (Exception e) {
+                               LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage());
+                       }
                }
+               
                for(Session session: queue) {
                        try {
                                session.getBasicRemote().sendText(notification);
                        } catch (IOException e) {
-                               logger.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage());
+                               LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage());
                        }
                }
        }