Commit includes ControlLoopPolicy API and bugfixes
[policy/engine.git] / ECOMP-PDP-REST / src / main / java / org / openecomp / policy / pdp / rest / notifications / ManualNotificationUpdateThread.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ECOMP-PDP-REST
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.policy.pdp.rest.notifications;
22
23 import java.io.IOException;
24 import java.net.MalformedURLException;
25 import java.net.URL;
26 import java.security.GeneralSecurityException;
27 import java.util.ArrayList;
28 import java.util.LinkedList;
29 import java.util.List;
30 import java.util.UUID;
31
32 import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
33 import org.openecomp.policy.common.logging.flexlogger.Logger;
34 import org.openecomp.policy.rest.XACMLRestProperties;
35 import org.openecomp.policy.utils.BusConsumer;
36 import org.openecomp.policy.utils.BusPublisher;
37 import org.openecomp.policy.xacml.api.XACMLErrorConstants;
38
39 import com.att.nsa.cambria.client.CambriaClientFactory;
40 import com.att.nsa.cambria.client.CambriaConsumer;
41 import com.att.nsa.cambria.client.CambriaPublisher;
42 import com.att.research.xacml.util.XACMLProperties;
43
44 @SuppressWarnings("deprecation")
45 public class ManualNotificationUpdateThread implements Runnable {
46
47         private static final Logger LOGGER      = FlexLogger.getLogger(ManualNotificationUpdateThread.class);
48
49         private static String topic = null;
50         private static CambriaConsumer CConsumer = null;
51         private static String clusterList = null;
52         private static String update = null;
53         private static BusConsumer dmaapConsumer = null;
54         private static List<String> dmaapList = null;
55         private static String propNotificationType = null;
56         private static String aafLogin = null;
57         private static String aafPassword = null;
58         
59         public volatile boolean isRunning = false;
60         
61         public synchronized boolean isRunning() {
62                 return this.isRunning;
63         }
64         
65         public synchronized void terminate() {
66                 this.isRunning = false;
67         }
68         
69         /**
70          * 
71          * This is our thread that runs on startup if the system is configured to UEB to accept manual update requests
72          * 
73          */
74         @Override
75         public void run() {
76                 synchronized(this) {
77                         this.isRunning = true;
78                 }
79                 
80                 URL aURL = null;
81                 String group =  UUID.randomUUID ().toString ();
82                 String id = "0";
83                 String returnTopic = null;
84                 propNotificationType = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TYPE);
85                 if ("ueb".equals(propNotificationType)){
86                         try {
87                                 clusterList = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS).trim();
88                                 String url = XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_ID);
89                                 aURL = new URL(url);
90                                 topic = aURL.getHost() + aURL.getPort();
91                         } catch (NumberFormatException e) {
92                                 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get UEB cluster list or pdp url: ", e);
93                                 this.isRunning = false;
94                         } catch (MalformedURLException e) {
95                                 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing URL to create topic for Notification ", e);
96                         }
97                         
98                         String consumerTopic = aURL.getHost() + aURL.getPort() + "UpdateRequest";
99                         SendMessage(consumerTopic, "Starting-Topic");
100                         final LinkedList<String> urlList = new LinkedList<> ();
101                         for ( String u : clusterList.split ( "," ) ){
102                                 urlList.add ( u );
103                         }
104                         
105                         try {
106                                 CConsumer = CambriaClientFactory.createConsumer ( null, urlList, consumerTopic , group, id, 20*1000, 1000 );
107                         } catch (MalformedURLException | GeneralSecurityException e1) {
108                                 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create UEB Consumer: ", e1);
109                         }
110
111                         while (this.isRunning()) {
112                                 LOGGER.debug("While loop test _ take out ");
113                                 try {
114                                         for ( String msg : CConsumer.fetch () ){                
115                                                 LOGGER.debug("Manual Notification Recieved Message " + msg + " from UEB cluster : ");
116                                                 returnTopic = processMessage(msg);
117                                                 if(returnTopic != null){
118                                                         SendMessage(returnTopic, update);
119                                                 }
120                                         }
121                                 } catch (IOException e) {
122                                         LOGGER.debug(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing UEB message");
123                                 }
124                         }
125                         LOGGER.debug("Stopping UEB Consumer loop will no longer fetch messages from the cluster");      
126                 } else if ("dmaap".equals(propNotificationType)) {
127                         String dmaapServers = null;
128                         try {
129                                 dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
130                                 topic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC);
131                                 aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
132                                 aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
133                         } catch (Exception e) {
134                                 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get DMaaP servers list:", e);
135                                 this.isRunning = false;
136                         } 
137                         
138                         if(dmaapServers==null || topic==null){
139                                 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
140                                 try {
141                                         throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
142                                 } catch (Exception e) {
143                                         LOGGER.error(e);
144                                 }
145                         }
146                         
147                         dmaapServers.trim();
148                         topic.trim();
149                         aafLogin.trim();
150                         aafPassword.trim();
151                         
152                         String consumerTopic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC).trim();
153                         SendMessage(consumerTopic, "Starting-Topic");
154                         dmaapList = new ArrayList<>();
155                         for ( String u : dmaapServers.split ( "," ) ){
156                                 dmaapList.add ( u );
157                         }
158                         
159                         try {
160                                 
161                                 dmaapConsumer = new BusConsumer.DmaapConsumerWrapper(dmaapList, consumerTopic, aafLogin, aafPassword, group, id, 20*1000, 1000);
162                         } catch (Exception e1) {
163                                 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create DMaaP Consumer: ", e1);
164                         }
165
166                         while (this.isRunning()) {
167                                 LOGGER.debug("While loop test _ take out ");
168                                 try {
169                                         for ( String msg : dmaapConsumer.fetch () ){            
170                                                 LOGGER.debug("Manual Notification Recieved Message " + msg + " from DMaaP server : ");
171                                                 returnTopic = processMessage(msg);
172                                                 if(returnTopic != null){
173                                                         SendMessage(returnTopic, update);
174                                                 }
175                                         }
176                                 }catch (Exception e) {
177                                         LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing DMaaP message: ", e);                          }
178                         }
179                         LOGGER.debug("Stopping DMaaP Consumer loop will no longer fetch messages from the servers");    
180                 }
181         }
182
183         private void SendMessage( String topic, String message) {
184                 CambriaPublisher pub = null;
185                 BusPublisher publisher = null;
186                 try {
187                         if ("ueb".equals(propNotificationType)) {
188                                 pub = CambriaClientFactory.createSimplePublisher (null, clusterList, topic );
189                                 pub.send( "pdpReturnMessage", message );
190                                 LOGGER.debug("Sending Message to UEB topic: " + topic);
191                                 pub.close();
192                                 
193                         } else if ("dmaap".equals(propNotificationType)){
194                                 publisher = new BusPublisher.DmaapPublisherWrapper(dmaapList,topic,aafLogin,aafPassword);
195                                 publisher.send( "pdpReturnMessage", message );
196                                 LOGGER.debug("Sending to Message to DMaaP topic: " + topic);
197                                 publisher.close();
198                         }
199
200                 } catch (Exception e) {
201                         LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update: ", e);
202                 }
203
204                 try {
205                         pub.send( "pdpReturnMessage", message );
206                         LOGGER.debug("Sending to Message to tpoic" + topic);
207                 } catch (IOException e) {
208                         LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update");
209                 }       
210                 pub.close();            
211         }
212
213         private String processMessage(String msg) {
214                 LOGGER.debug("notification message:  " + msg);
215                 String[] UID = msg.split("=")[1].split("\"");
216                 
217                 String returnTopic = topic + UID[0];
218                 if(msg.contains("Starting-Topic")){
219                         return null;
220                 }
221                 return returnTopic;
222         }
223         public static void setUpdate(String update) {
224                 ManualNotificationUpdateThread.update = update;
225         }
226         
227 }