1 package org.onap.policy.std;
6 import org.onap.policy.api.NotificationHandler;
7 import org.onap.policy.api.NotificationScheme;
8 import org.onap.policy.api.NotificationType;
9 import org.onap.policy.api.PDPNotification;
10 import org.onap.policy.common.logging.flexlogger.FlexLogger;
11 import org.onap.policy.common.logging.flexlogger.Logger;
12 import org.onap.policy.utils.BusConsumer;
13 import org.onap.policy.utils.BusConsumer.DmaapConsumerWrapper;
14 import org.onap.policy.xacml.api.XACMLErrorConstants;
16 public class AutoClientDMAAP implements Runnable {
17 private static StdPDPNotification notification = null;
18 private static NotificationScheme scheme = null;
19 private static NotificationHandler handler = null;
20 private static String topic = null;
21 private static boolean status = false;
22 private static Logger logger = FlexLogger.getLogger(AutoClientDMAAP.class.getName());
23 private static String notficatioinType = null;
24 private static BusConsumer dmaapConsumer = null;
25 private static List<String> dmaapList = null;
26 private static String aafLogin = null;
27 private static String aafPassword = null;
28 private volatile boolean running = false;
30 public AutoClientDMAAP(List<String> dmaapList, String topic, String aafLogin, String aafPassword) {
31 AutoClientDMAAP.topic = topic;
32 AutoClientDMAAP.dmaapList = dmaapList;
33 AutoClientDMAAP.aafLogin = aafLogin;
34 AutoClientDMAAP.aafPassword = aafPassword;
37 public static void setAuto(NotificationScheme scheme, NotificationHandler handler) {
38 AutoClientDMAAP.scheme = scheme;
39 AutoClientDMAAP.handler = handler;
42 public static void setScheme(NotificationScheme scheme) {
43 AutoClientDMAAP.scheme = scheme;
46 public static boolean getStatus() {
47 return AutoClientDMAAP.status;
50 public static String getTopic() {
51 return AutoClientDMAAP.topic;
54 public static String getNotficationType() {
55 return AutoClientDMAAP.notficatioinType;
58 public synchronized boolean isRunning() {
62 public synchronized void terminate() {
71 String group = UUID.randomUUID().toString();
74 // Stop and Start needs to be done.
75 if (scheme != null && handler != null) {
76 if (scheme.equals(NotificationScheme.AUTO_ALL_NOTIFICATIONS)
77 || scheme.equals(NotificationScheme.AUTO_NOTIFICATIONS)) {
79 // create a loop to listen for messages from DMaaP server
81 setDmaapCosumer(new BusConsumer.DmaapConsumerWrapper(dmaapList, topic, aafLogin, aafPassword, group,
82 id, 15 * 1000, 1000));
83 } catch (Exception e) {
84 logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create DMaaP Consumer: ", e);
87 while (this.isRunning()) {
89 for (String msg : dmaapConsumer.fetch()) {
90 logger.debug("Auto Notification Recieved Message " + msg + " from DMAAP server : "
91 + dmaapList.toString());
92 setNotification(NotificationUnMarshal.notificationJSON(msg));
95 } catch (Exception e) {
96 logger.debug("Error in processing DMAAP message", e);
100 logger.debug("Stopping DMAAP Consumer loop will no longer fetch messages from the servers");
105 private static void setNotification(StdPDPNotification notificationJSON) {
106 notification = notificationJSON;
109 private static void setDmaapCosumer(DmaapConsumerWrapper dmaapConsumerWrapper) {
110 dmaapConsumer = dmaapConsumerWrapper;
113 private static void callHandler() {
114 if (handler != null && scheme != null) {
115 if (scheme.equals(NotificationScheme.AUTO_ALL_NOTIFICATIONS)) {
116 boolean removed = false, updated = false;
117 if (notification.getRemovedPolicies() != null && !notification.getRemovedPolicies().isEmpty()) {
120 if (notification.getLoadedPolicies() != null && !notification.getLoadedPolicies().isEmpty()) {
123 if (removed && updated) {
124 notification.setNotificationType(NotificationType.BOTH);
125 } else if (removed) {
126 notification.setNotificationType(NotificationType.REMOVE);
127 } else if (updated) {
128 notification.setNotificationType(NotificationType.UPDATE);
130 handler.notificationReceived(notification);
131 } else if (scheme.equals(NotificationScheme.AUTO_NOTIFICATIONS)) {
132 PDPNotification newNotification = MatchStore.checkMatch(notification);
133 if (newNotification.getNotificationType() != null) {
134 handler.notificationReceived(newNotification);