X-Git-Url: https://gerrit.onap.org/r/gitweb?p=policy%2Fengine.git;a=blobdiff_plain;f=PolicyEngineAPI%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fpolicy%2Fstd%2FAutoClientDMAAP.java;h=d031868f0e4fd4e9caa9291d2fee0a7b4b339d45;hp=3699bfcfb74c6e9fef3d03d1497856b158b93f83;hb=80f072f60509ef3a35369a60857fe05f6c2a993a;hpb=c53fa990ea27ec074859eb94bcb7ec6deaa2157b diff --git a/PolicyEngineAPI/src/main/java/org/onap/policy/std/AutoClientDMAAP.java b/PolicyEngineAPI/src/main/java/org/onap/policy/std/AutoClientDMAAP.java index 3699bfcfb..d031868f0 100644 --- a/PolicyEngineAPI/src/main/java/org/onap/policy/std/AutoClientDMAAP.java +++ b/PolicyEngineAPI/src/main/java/org/onap/policy/std/AutoClientDMAAP.java @@ -10,123 +10,131 @@ import org.onap.policy.api.PDPNotification; import org.onap.policy.common.logging.flexlogger.FlexLogger; import org.onap.policy.common.logging.flexlogger.Logger; import org.onap.policy.utils.BusConsumer; +import org.onap.policy.utils.BusConsumer.DmaapConsumerWrapper; import org.onap.policy.xacml.api.XACMLErrorConstants; public class AutoClientDMAAP implements Runnable { - private static StdPDPNotification notification = null; - private static NotificationScheme scheme = null; - private static NotificationHandler handler = null; - private static String topic = null; - private static boolean status = false; - private static Logger logger = FlexLogger.getLogger(AutoClientDMAAP.class.getName()); - private static String notficatioinType = null; - private static BusConsumer dmaapConsumer = null; - private static List dmaapList = null; - private static String aafLogin = null; - private static String aafPassword = null; - public volatile boolean isRunning = false; - - - public AutoClientDMAAP(List dmaapList, String topic, String aafLogin, String aafPassword) { - AutoClientDMAAP.topic = topic; - AutoClientDMAAP.dmaapList = dmaapList; - AutoClientDMAAP.aafLogin = aafLogin; - AutoClientDMAAP.aafPassword = aafPassword; - } - - public void setAuto(NotificationScheme scheme, - NotificationHandler handler) { - AutoClientDMAAP.scheme = scheme; - AutoClientDMAAP.handler = handler; - } - - public static void setScheme(NotificationScheme scheme) { - AutoClientDMAAP.scheme = scheme; - } - - public static boolean getStatus(){ - return AutoClientDMAAP.status; - } - - public static String getTopic() { - return AutoClientDMAAP.topic; - } - - public static String getNotficationType(){ - return AutoClientDMAAP.notficatioinType; - } - - public synchronized boolean isRunning() { - return this.isRunning; - } - - public synchronized void terminate() { - this.isRunning = false; - } - - @Override - public void run() { - synchronized(this) { - this.isRunning = true; - } - String group = UUID.randomUUID ().toString (); - String id = "0"; - - // Stop and Start needs to be done. - if (scheme != null && handler!=null) { - if (scheme.equals(NotificationScheme.AUTO_ALL_NOTIFICATIONS) || scheme.equals(NotificationScheme.AUTO_NOTIFICATIONS)) { - - // create a loop to listen for messages from DMaaP server - try { - dmaapConsumer = new BusConsumer.DmaapConsumerWrapper(dmaapList, topic, aafLogin, aafPassword, group, id, 15*1000, 1000 ); - } catch (Exception e) { - logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create DMaaP Consumer: ", e); - } - - while (this.isRunning() ) - { - try { - for ( String msg : dmaapConsumer.fetch () ) - { - logger.debug("Auto Notification Recieved Message " + msg + " from DMAAP server : " + dmaapList.toString()); - notification = NotificationUnMarshal.notificationJSON(msg); - callHandler(); - } - } catch (Exception e) { - logger.debug("Error in processing DMAAP message"); - } - - } - logger.debug("Stopping DMAAP Consumer loop will no longer fetch messages from the servers"); - } - } - } - - private static void callHandler() { - if (handler != null && scheme != null) { - if (scheme.equals(NotificationScheme.AUTO_ALL_NOTIFICATIONS)) { - boolean removed = false, updated = false; - if (notification.getRemovedPolicies() != null && !notification.getRemovedPolicies().isEmpty()) { - removed = true; - } - if (notification.getLoadedPolicies() != null && !notification.getLoadedPolicies().isEmpty()) { - updated = true; - } - if (removed && updated) { - notification.setNotificationType(NotificationType.BOTH); - } else if (removed) { - notification.setNotificationType(NotificationType.REMOVE); - } else if (updated) { - notification.setNotificationType(NotificationType.UPDATE); - } - handler.notificationReceived(notification); - } else if (scheme.equals(NotificationScheme.AUTO_NOTIFICATIONS)) { - PDPNotification newNotification = MatchStore.checkMatch(notification); - if (newNotification.getNotificationType() != null) { - handler.notificationReceived(newNotification); - } - } - } - } + private static StdPDPNotification notification = null; + private static NotificationScheme scheme = null; + private static NotificationHandler handler = null; + private static String topic = null; + private static boolean status = false; + private static Logger logger = FlexLogger.getLogger(AutoClientDMAAP.class.getName()); + private static String notficatioinType = null; + private static BusConsumer dmaapConsumer = null; + private static List dmaapList = null; + private static String aafLogin = null; + private static String aafPassword = null; + private volatile boolean running = false; + + public AutoClientDMAAP(List dmaapList, String topic, String aafLogin, String aafPassword) { + AutoClientDMAAP.topic = topic; + AutoClientDMAAP.dmaapList = dmaapList; + AutoClientDMAAP.aafLogin = aafLogin; + AutoClientDMAAP.aafPassword = aafPassword; + } + + public static void setAuto(NotificationScheme scheme, NotificationHandler handler) { + AutoClientDMAAP.scheme = scheme; + AutoClientDMAAP.handler = handler; + } + + public static void setScheme(NotificationScheme scheme) { + AutoClientDMAAP.scheme = scheme; + } + + public static boolean getStatus() { + return AutoClientDMAAP.status; + } + + public static String getTopic() { + return AutoClientDMAAP.topic; + } + + public static String getNotficationType() { + return AutoClientDMAAP.notficatioinType; + } + + public synchronized boolean isRunning() { + return this.running; + } + + public synchronized void terminate() { + this.running = false; + } + + @Override + public void run() { + synchronized (this) { + this.running = true; + } + String group = UUID.randomUUID().toString(); + String id = "0"; + + // Stop and Start needs to be done. + if (scheme != null && handler != null) { + if (scheme.equals(NotificationScheme.AUTO_ALL_NOTIFICATIONS) + || scheme.equals(NotificationScheme.AUTO_NOTIFICATIONS)) { + + // create a loop to listen for messages from DMaaP server + try { + setDmaapCosumer(new BusConsumer.DmaapConsumerWrapper(dmaapList, topic, aafLogin, aafPassword, group, + id, 15 * 1000, 1000)); + } catch (Exception e) { + logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create DMaaP Consumer: ", e); + } + + while (this.isRunning()) { + try { + for (String msg : dmaapConsumer.fetch()) { + logger.debug("Auto Notification Recieved Message " + msg + " from DMAAP server : " + + dmaapList.toString()); + setNotification(NotificationUnMarshal.notificationJSON(msg)); + callHandler(); + } + } catch (Exception e) { + logger.debug("Error in processing DMAAP message", e); + } + + } + logger.debug("Stopping DMAAP Consumer loop will no longer fetch messages from the servers"); + } + } + } + + private void setNotification(StdPDPNotification notificationJSON) { + notification = notificationJSON; + } + + private static void setDmaapCosumer(DmaapConsumerWrapper dmaapConsumerWrapper) { + dmaapConsumer = dmaapConsumerWrapper; + } + + private static void callHandler() { + if (handler != null && scheme != null) { + if (scheme.equals(NotificationScheme.AUTO_ALL_NOTIFICATIONS)) { + boolean removed = false, updated = false; + if (notification.getRemovedPolicies() != null && !notification.getRemovedPolicies().isEmpty()) { + removed = true; + } + if (notification.getLoadedPolicies() != null && !notification.getLoadedPolicies().isEmpty()) { + updated = true; + } + if (removed && updated) { + notification.setNotificationType(NotificationType.BOTH); + } else if (removed) { + notification.setNotificationType(NotificationType.REMOVE); + } else if (updated) { + notification.setNotificationType(NotificationType.UPDATE); + } + handler.notificationReceived(notification); + } else if (scheme.equals(NotificationScheme.AUTO_NOTIFICATIONS)) { + PDPNotification newNotification = MatchStore.checkMatch(notification); + if (newNotification.getNotificationType() != null) { + handler.notificationReceived(newNotification); + } + } + } + } }