package org.onap.policy.std; import java.util.List; import java.util.UUID; import org.onap.policy.api.NotificationHandler; import org.onap.policy.api.NotificationScheme; import org.onap.policy.api.NotificationType; import org.onap.policy.api.PDPNotification; import org.onap.policy.common.logging.flexlogger.FlexLogger; import org.onap.policy.common.logging.flexlogger.Logger; import org.onap.policy.utils.BusConsumer; import org.onap.policy.utils.BusConsumer.DmaapConsumerWrapper; import org.onap.policy.xacml.api.XACMLErrorConstants; public class AutoClientDMAAP implements Runnable { private static StdPDPNotification notification = null; private static NotificationScheme scheme = null; private static NotificationHandler handler = null; private static String topic = null; private static boolean status = false; private static Logger logger = FlexLogger.getLogger(AutoClientDMAAP.class.getName()); private static String notficatioinType = null; private static BusConsumer dmaapConsumer = null; private static List dmaapList = null; private static String aafLogin = null; private static String aafPassword = null; private volatile boolean running = false; public AutoClientDMAAP(List dmaapList, String topic, String aafLogin, String aafPassword) { AutoClientDMAAP.topic = topic; AutoClientDMAAP.dmaapList = dmaapList; AutoClientDMAAP.aafLogin = aafLogin; AutoClientDMAAP.aafPassword = aafPassword; } public static void setAuto(NotificationScheme scheme, NotificationHandler handler) { AutoClientDMAAP.scheme = scheme; AutoClientDMAAP.handler = handler; } public static void setScheme(NotificationScheme scheme) { AutoClientDMAAP.scheme = scheme; } public static boolean getStatus() { return AutoClientDMAAP.status; } public static String getTopic() { return AutoClientDMAAP.topic; } public static String getNotficationType() { return AutoClientDMAAP.notficatioinType; } public synchronized boolean isRunning() { return this.running; } public synchronized void terminate() { this.running = false; } @Override public void run() { synchronized (this) { this.running = true; } String group = UUID.randomUUID().toString(); String id = "0"; // Stop and Start needs to be done. if (scheme != null && handler != null) { if (scheme.equals(NotificationScheme.AUTO_ALL_NOTIFICATIONS) || scheme.equals(NotificationScheme.AUTO_NOTIFICATIONS)) { // create a loop to listen for messages from DMaaP server try { setDmaapCosumer(new BusConsumer.DmaapConsumerWrapper(dmaapList, topic, aafLogin, aafPassword, group, id, 15 * 1000, 1000)); } catch (Exception e) { logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create DMaaP Consumer: ", e); } while (this.isRunning()) { try { for (String msg : dmaapConsumer.fetch()) { logger.debug("Auto Notification Recieved Message " + msg + " from DMAAP server : " + dmaapList.toString()); setNotification(NotificationUnMarshal.notificationJSON(msg)); callHandler(); } } catch (Exception e) { logger.debug("Error in processing DMAAP message", e); } } logger.debug("Stopping DMAAP Consumer loop will no longer fetch messages from the servers"); } } } private void setNotification(StdPDPNotification notificationJSON) { notification = notificationJSON; } private static void setDmaapCosumer(DmaapConsumerWrapper dmaapConsumerWrapper) { dmaapConsumer = dmaapConsumerWrapper; } private static void callHandler() { if (handler != null && scheme != null) { if (scheme.equals(NotificationScheme.AUTO_ALL_NOTIFICATIONS)) { boolean removed = false, updated = false; if (notification.getRemovedPolicies() != null && !notification.getRemovedPolicies().isEmpty()) { removed = true; } if (notification.getLoadedPolicies() != null && !notification.getLoadedPolicies().isEmpty()) { updated = true; } if (removed && updated) { notification.setNotificationType(NotificationType.BOTH); } else if (removed) { notification.setNotificationType(NotificationType.REMOVE); } else if (updated) { notification.setNotificationType(NotificationType.UPDATE); } handler.notificationReceived(notification); } else if (scheme.equals(NotificationScheme.AUTO_NOTIFICATIONS)) { PDPNotification newNotification = MatchStore.checkMatch(notification); if (newNotification.getNotificationType() != null) { handler.notificationReceived(newNotification); } } } } }