import java.net.MalformedURLException;
import java.net.URL;
import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
-import org.openecomp.policy.rest.XACMLRestProperties;
import org.openecomp.policy.common.logging.eelf.MessageCodes;
import org.openecomp.policy.common.logging.eelf.PolicyLogger;
-import com.att.nsa.cambria.client.CambriaClientFactory;
-import com.att.nsa.cambria.client.CambriaPublisher;
+import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
+import org.openecomp.policy.common.logging.flexlogger.Logger;
+import org.openecomp.policy.rest.XACMLRestProperties;
+import org.openecomp.policy.utils.BusPublisher;
import org.openecomp.policy.xacml.api.XACMLErrorConstants;
-import com.att.research.xacml.util.XACMLProperties;
-import org.openecomp.policy.common.logging.flexlogger.*;
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.research.xacml.util.XACMLProperties;
/**
* The NotificationServer sends the Server Notifications to the Clients once there is any Event.
* WebSockets is being used as a medium for sending Notifications.
- * UEB is being used as a medium for sending Notifications.
+ * UEB is being used as a medium for sending Notifications.
+ * DMAAP is being used as a medium for sending Notifications.
*
- * @version 0.1
+ * @version 0.2
*
**/
@ServerEndpoint(value = "/notifications")
public class NotificationServer {
- private static final Logger logger = FlexLogger.getLogger(NotificationServer.class);
+ private static final Logger LOGGER = FlexLogger.getLogger(NotificationServer.class);
private static Queue<Session> queue = new ConcurrentLinkedQueue<Session>();
private static String update = null;
private static String hosts = null;
@OnOpen
public void openConnection(Session session) {
- logger.info("Session Connected: " + session.getId());
+ LOGGER.info("Session Connected: " + session.getId());
queue.add(session);
}
@OnError
public void error(Session session, Throwable t) {
queue.remove(session);
- logger.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Session Error for : " + session.getId() + " Error: " + t.getMessage());
+ LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Session Error for : " + session.getId() + " Error: " + t.getMessage());
}
session.getBasicRemote().sendText(update);
session.close();
} catch (IOException e) {
- logger.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage());
- logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending Message update");
+ LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage());
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending Message update");
}
}
}
- public static void sendNotification(String notification, String propNotificationType, String pdpURL){
+ public static void sendNotification(String notification, String propNotificationType, String pdpURL) throws Exception {
- logger.debug("Notification set to " + propNotificationType);
+ LOGGER.debug("Notification set to " + propNotificationType);
if (propNotificationType.equals("ueb")){
+
String topic = null;
try {
aURL = new URL(pdpURL);
} catch (MalformedURLException e1) {
pdpURL = pdpURL.replace("/", "");
topic = pdpURL.replace(":", "");
- logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in parsing out pdpURL for UEB notfication ");
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in parsing out pdpURL for UEB notfication ");
PolicyLogger.error(MessageCodes.ERROR_PROCESS_FLOW, e1, "Error in parsing out pdpURL for UEB notfication ");
}
- hosts = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_UEB_CLUSTER);
- logger.debug("Creating Publisher for host: " + hosts + " with topic: " + topic);
- CambriaPublisher pub = null;
+ hosts = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
+ String apiKey = XACMLProperties.getProperty(XACMLRestProperties.PROP_UEB_API_KEY);
+ String apiSecret = XACMLProperties.getProperty(XACMLRestProperties.PROP_UEB_API_SECRET);
+
+ LOGGER.debug("Creating Publisher for host: " + hosts + " with topic: " + topic);
+ CambriaBatchingPublisher pub = null;
try {
- pub = CambriaClientFactory.createSimplePublisher (null, hosts, topic );
+ if(hosts==null || topic==null || apiKey==null || apiSecret==null){
+ LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file ");
+ throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "UEB properties are missing from the property file ");
+ }
+
+ hosts.trim();
+ topic.trim();
+ apiKey.trim();
+ apiSecret.trim();
+ pub = new CambriaClientBuilders.PublisherBuilder ()
+ .usingHosts ( hosts )
+ .onTopic ( topic )
+ .authenticatedBy ( apiKey, apiSecret )
+ .build ()
+ ;
+
} catch (MalformedURLException e1) {
- // TODO Auto-generated catch block
- e1.printStackTrace();
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage());
} catch (GeneralSecurityException e1) {
- // TODO Auto-generated catch block
- e1.printStackTrace();
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error creating the UEB publisher" + e1.getMessage());
+ }
+ if(pub != null){
+ try {
+ pub.send( "MyPartitionKey", notification );
+ } catch (IOException e) {
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage() + e);
+ }
+ // close the publisher. The batching publisher does not send events
+ // immediately, so you MUST use close to send any remaining messages.
+ // You provide the amount of time you're willing to wait for the sends
+ // to succeed before giving up. If any messages are unsent after that time,
+ // they're returned to your app. You could, for example, persist to disk
+ // and try again later.
+ final List<?> stuck = pub.close ( 20, TimeUnit.SECONDS );
+
+ if (!stuck.isEmpty()){
+ LOGGER.error( stuck.size() + " messages unsent" );
+ }else{
+ LOGGER.info( "Clean exit; all messages sent: " + notification );
+ }
}
+ } else if (propNotificationType.equals("dmaap")) {
+
+ // Setting up the Publisher for DMaaP MR
+ String dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
+ String topic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC);
+ String aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
+ String aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
+
try {
- pub.send( "MyPartitionKey", notification );
- } catch (IOException e) {
- logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update");
- }
- pub.close();
+ if(dmaapServers==null || topic==null){
+ LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
+ throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
+ }
+
+ dmaapServers.trim();
+ topic.trim();
+ aafLogin.trim();
+ aafPassword.trim();
+
+ List<String> dmaapList = null;
+ if(dmaapServers.contains(",")) {
+ dmaapList = new ArrayList<String>(Arrays.asList(dmaapServers.split("\\s*,\\s*")));
+ } else {
+ dmaapList = new ArrayList<>();
+ dmaapList.add(dmaapServers);
+ }
+
+ BusPublisher publisher =
+ new BusPublisher.DmaapPublisherWrapper(dmaapList,
+ topic,
+ aafLogin,
+ aafPassword);
+
+ // Sending notification through DMaaP Message Router
+ publisher.send( "MyPartitionKey", notification);
+ LOGGER.debug("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic);
+ publisher.close();
+
+ } catch (Exception e) {
+ LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error sending notification update" + e.getMessage());
+ }
}
+
for(Session session: queue) {
try {
session.getBasicRemote().sendText(notification);
} catch (IOException e) {
- logger.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage());
+ LOGGER.info(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error in sending the Event Notification: "+ e.getMessage());
}
}
}