91d81d0433ee9fcec58689de2d3f93fd0876240a
[policy/engine.git] / PolicyEngineAPI / src / main / java / org / openecomp / policy / std / AutoClientUEB.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * PolicyEngineAPI
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.policy.std;
22
23 import java.net.MalformedURLException;
24 import java.net.URL;
25 import java.security.GeneralSecurityException;
26 import java.util.List;
27 import java.util.UUID;
28
29 import org.openecomp.policy.api.NotificationHandler;
30 import org.openecomp.policy.api.NotificationScheme;
31 import org.openecomp.policy.api.NotificationType;
32 import org.openecomp.policy.api.PDPNotification;
33 import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
34 import org.openecomp.policy.common.logging.flexlogger.Logger;
35 import org.openecomp.policy.xacml.api.XACMLErrorConstants;
36
37 import com.att.nsa.cambria.client.CambriaClientBuilders;
38 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
39 import com.att.nsa.cambria.client.CambriaConsumer;
40 /**
41  * Create a UEB Consumer to receive policy update notification.
42  * 
43  * 
44  *
45  */
46 public class AutoClientUEB implements Runnable  {
47         private static StdPDPNotification notification = null;
48         private static NotificationScheme scheme = null;
49         private static NotificationHandler handler = null;
50         private static String topic = null;
51         private static String url = null;
52         private static boolean status = false; 
53         private static Logger logger = FlexLogger.getLogger(AutoClientUEB.class.getName());
54         private static String notficatioinType = null;
55         private static CambriaConsumer CConsumer = null;
56         private static String apiKey = null;
57         private static String apiSecret = null;
58         private static List<String> uebURLList = null; 
59         public volatile boolean isRunning = false;
60     
61
62         public AutoClientUEB(String url, List<String> uebURLList, String apiKey, String apiSecret) {
63                AutoClientUEB.url = url;
64                AutoClientUEB.uebURLList = uebURLList;
65                AutoClientUEB.apiKey = apiKey;
66                AutoClientUEB.apiKey = apiKey;
67         }
68
69         public void setAuto(NotificationScheme scheme,
70                         NotificationHandler handler) {
71                 AutoClientUEB.scheme = scheme;
72                 AutoClientUEB.handler = handler;
73         }
74
75         public static void setScheme(NotificationScheme scheme) {
76                 AutoClientUEB.scheme = scheme;
77         }
78         
79         public static boolean getStatus(){
80                 return AutoClientUEB.status;
81         }
82
83         public static String getURL() {
84                 return AutoClientUEB.url;
85         }
86         
87         public static String getNotficationType(){
88                 return AutoClientUEB.notficatioinType;
89         }
90
91         public synchronized boolean isRunning() {
92                 return this.isRunning;
93         }
94         
95         public synchronized void terminate() {
96                 this.isRunning = false;
97         }
98         
99         @Override
100         public void run() {
101                 synchronized(this) {
102                         this.isRunning = true;
103                 }
104                 String group =  UUID.randomUUID ().toString ();
105                 String id = "0";
106                 //String topic = null;
107                 // Stop and Start needs to be done.
108                 if (scheme != null && handler!=null) {
109                         if (scheme.equals(NotificationScheme.AUTO_ALL_NOTIFICATIONS) || scheme.equals(NotificationScheme.AUTO_NOTIFICATIONS)) {
110                                 URL aURL;
111                                 try {
112                                         aURL = new URL(AutoClientUEB.topic);
113                                         topic = aURL.getHost() + aURL.getPort();
114                                 } catch (MalformedURLException e) {
115                                         topic = AutoClientUEB.url.replace("[:/]", "");
116                                 }
117                                         
118                                 try {
119                                         //CConsumer = CambriaClientFactory.createConsumer ( null, uebURLList, topic, group, id, 15*1000, 1000 );
120                                         ConsumerBuilder builder = new CambriaClientBuilders.ConsumerBuilder();
121                                         builder.knownAs(group, id)
122                                         .usingHosts(uebURLList)
123                                         .onTopic(topic)
124                                         .waitAtServer(15*1000)
125                                         .receivingAtMost(1000)
126                                         .authenticatedBy(apiKey, apiSecret);
127                                         
128                                          CConsumer = builder.build();
129                                         
130                                 } catch (MalformedURLException e1) {
131                                         e1.printStackTrace();
132                                 } catch (GeneralSecurityException e1) {
133                                         e1.printStackTrace();
134                                 }
135                                 while (this.isRunning() )
136                                 {
137                                         try {
138                                                 for ( String msg : CConsumer.fetch () )
139                                                 {               
140                                                         logger.debug("Auto Notification Recieved Message " + msg + " from UEB cluster : " + uebURLList.toString());
141                                                         notification = NotificationUnMarshal.notificationJSON(msg);
142                                                         callHandler();
143                                                 }
144                                         } catch (Exception e) {
145                                                 logger.debug(XACMLErrorConstants.ERROR_SYSTEM_ERROR + "Error in processing UEB message" + e.getMessage());
146                                         }
147
148                                 }
149                                 logger.debug("Stopping UEB Consumer loop will not logger fetch messages from the cluster");
150                         }
151                 }
152         }
153
154         private static void callHandler() {
155                 if (handler != null && scheme != null) {
156                         if (scheme.equals(NotificationScheme.AUTO_ALL_NOTIFICATIONS)) {
157                                 boolean removed = false, updated = false;
158                                 if (notification.getRemovedPolicies() != null && !notification.getRemovedPolicies().isEmpty()) {
159                                         removed = true;
160                                 }
161                                 if (notification.getLoadedPolicies() != null && !notification.getLoadedPolicies().isEmpty()) {
162                                         updated = true;
163                                 }
164                                 if (removed && updated) {
165                                         notification.setNotificationType(NotificationType.BOTH);
166                                 } else if (removed) {
167                                         notification.setNotificationType(NotificationType.REMOVE);
168                                 } else if (updated) {
169                                         notification.setNotificationType(NotificationType.UPDATE);
170                                 }
171                                 handler.notificationReceived(notification);
172                         } else if (scheme.equals(NotificationScheme.AUTO_NOTIFICATIONS)) {
173                                 PDPNotification newNotification = MatchStore.checkMatch(notification);
174                                 if (newNotification.getNotificationType() != null) {
175                                         handler.notificationReceived(newNotification);
176                                 }
177                         }
178                 }
179         }
180
181 }