1 package org.onap.policy.std;
6 import org.onap.policy.api.NotificationHandler;
7 import org.onap.policy.api.NotificationScheme;
8 import org.onap.policy.api.NotificationType;
9 import org.onap.policy.api.PDPNotification;
10 import org.onap.policy.common.logging.flexlogger.FlexLogger;
11 import org.onap.policy.common.logging.flexlogger.Logger;
12 import org.onap.policy.utils.BusConsumer;
13 import org.onap.policy.xacml.api.XACMLErrorConstants;
15 public class AutoClientDMAAP implements Runnable {
16 private static StdPDPNotification notification = null;
17 private static NotificationScheme scheme = null;
18 private static NotificationHandler handler = null;
19 private static String topic = null;
20 private static boolean status = false;
21 private static Logger logger = FlexLogger.getLogger(AutoClientDMAAP.class.getName());
22 private static String notficatioinType = null;
23 private static BusConsumer dmaapConsumer = null;
24 private static List<String> dmaapList = null;
25 private static String aafLogin = null;
26 private static String aafPassword = null;
27 public volatile boolean isRunning = false;
30 public AutoClientDMAAP(List<String> dmaapList, String topic, String aafLogin, String aafPassword) {
31 AutoClientDMAAP.topic = topic;
32 AutoClientDMAAP.dmaapList = dmaapList;
33 AutoClientDMAAP.aafLogin = aafLogin;
34 AutoClientDMAAP.aafPassword = aafPassword;
37 public void setAuto(NotificationScheme scheme,
38 NotificationHandler handler) {
39 AutoClientDMAAP.scheme = scheme;
40 AutoClientDMAAP.handler = handler;
43 public static void setScheme(NotificationScheme scheme) {
44 AutoClientDMAAP.scheme = scheme;
47 public static boolean getStatus(){
48 return AutoClientDMAAP.status;
51 public static String getTopic() {
52 return AutoClientDMAAP.topic;
55 public static String getNotficationType(){
56 return AutoClientDMAAP.notficatioinType;
59 public synchronized boolean isRunning() {
60 return this.isRunning;
63 public synchronized void terminate() {
64 this.isRunning = false;
70 this.isRunning = true;
72 String group = UUID.randomUUID ().toString ();
75 // Stop and Start needs to be done.
76 if (scheme != null && handler!=null) {
77 if (scheme.equals(NotificationScheme.AUTO_ALL_NOTIFICATIONS) || scheme.equals(NotificationScheme.AUTO_NOTIFICATIONS)) {
79 // create a loop to listen for messages from DMaaP server
81 dmaapConsumer = new BusConsumer.DmaapConsumerWrapper(dmaapList, topic, aafLogin, aafPassword, group, id, 15*1000, 1000 );
82 } catch (Exception e) {
83 logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create DMaaP Consumer: ", e);
86 while (this.isRunning() )
89 for ( String msg : dmaapConsumer.fetch () )
91 logger.debug("Auto Notification Recieved Message " + msg + " from DMAAP server : " + dmaapList.toString());
92 notification = NotificationUnMarshal.notificationJSON(msg);
95 } catch (Exception e) {
96 logger.debug("Error in processing DMAAP message");
100 logger.debug("Stopping DMAAP Consumer loop will no longer fetch messages from the servers");
105 private static void callHandler() {
106 if (handler != null && scheme != null) {
107 if (scheme.equals(NotificationScheme.AUTO_ALL_NOTIFICATIONS)) {
108 boolean removed = false, updated = false;
109 if (notification.getRemovedPolicies() != null && !notification.getRemovedPolicies().isEmpty()) {
112 if (notification.getLoadedPolicies() != null && !notification.getLoadedPolicies().isEmpty()) {
115 if (removed && updated) {
116 notification.setNotificationType(NotificationType.BOTH);
117 } else if (removed) {
118 notification.setNotificationType(NotificationType.REMOVE);
119 } else if (updated) {
120 notification.setNotificationType(NotificationType.UPDATE);
122 handler.notificationReceived(notification);
123 } else if (scheme.equals(NotificationScheme.AUTO_NOTIFICATIONS)) {
124 PDPNotification newNotification = MatchStore.checkMatch(notification);
125 if (newNotification.getNotificationType() != null) {
126 handler.notificationReceived(newNotification);