Policy 1707 commit to LF
[policy/engine.git] / ECOMP-PDP-REST / src / main / java / org / openecomp / policy / pdp / rest / notifications / ManualNotificationUpdateThread.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ECOMP-PDP-REST
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.policy.pdp.rest.notifications;
22
23 import java.io.IOException;
24 import java.net.MalformedURLException;
25 import java.net.URL;
26 import java.security.GeneralSecurityException;
27 import java.util.ArrayList;
28 import java.util.LinkedList;
29 import java.util.List;
30 import java.util.UUID;
31
32 import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
33 import org.openecomp.policy.common.logging.flexlogger.Logger;
34 import org.openecomp.policy.rest.XACMLRestProperties;
35 import org.openecomp.policy.utils.BusConsumer;
36 import org.openecomp.policy.utils.BusPublisher;
37 import org.openecomp.policy.xacml.api.XACMLErrorConstants;
38
39 import com.att.nsa.cambria.client.CambriaClientFactory;
40 import com.att.nsa.cambria.client.CambriaConsumer;
41 import com.att.nsa.cambria.client.CambriaPublisher;
42 import com.att.research.xacml.util.XACMLProperties;
43
44 public class ManualNotificationUpdateThread implements Runnable {
45
46         private static final Logger LOGGER      = FlexLogger.getLogger(ManualNotificationUpdateThread.class);
47
48         private static String topic = null;
49         private static CambriaConsumer CConsumer = null;
50         private static String clusterList = null;
51         private static String update = null;
52         private static BusConsumer dmaapConsumer = null;
53         private static List<String> dmaapList = null;
54         private static String propNotificationType = null;
55         private static String aafLogin = null;
56         private static String aafPassword = null;
57         
58         public volatile boolean isRunning = false;
59         
60         public synchronized boolean isRunning() {
61                 return this.isRunning;
62         }
63         
64         public synchronized void terminate() {
65                 this.isRunning = false;
66         }
67         
68         /**
69          * 
70          * This is our thread that runs on startup if the system is configured to UEB to accept manual update requests
71          * 
72          */
73         @Override
74         public void run() {
75                 synchronized(this) {
76                         this.isRunning = true;
77                 }
78                 
79                 URL aURL = null;
80                 String group =  UUID.randomUUID ().toString ();
81                 String id = "0";
82                 String returnTopic = null;
83                 propNotificationType = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TYPE);
84                 if ("ueb".equals(propNotificationType)){
85                         try {
86                                 clusterList = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS).trim();
87                                 String url = XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_ID);
88                                 aURL = new URL(url);
89                                 topic = aURL.getHost() + aURL.getPort();
90                         } catch (NumberFormatException e) {
91                                 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get UEB cluster list or pdp url: ", e);
92                                 this.isRunning = false;
93                         } catch (MalformedURLException e) {
94                                 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing URL to create topic for Notification ", e);
95                         }
96                         
97                         String consumerTopic = aURL.getHost() + aURL.getPort() + "UpdateRequest";
98                         SendMessage(consumerTopic, "Starting-Topic");
99                         final LinkedList<String> urlList = new LinkedList<String> ();
100                         for ( String u : clusterList.split ( "," ) ){
101                                 urlList.add ( u );
102                         }
103                         
104                         try {
105                                 CConsumer = CambriaClientFactory.createConsumer ( null, urlList, consumerTopic , group, id, 20*1000, 1000 );
106                         } catch (MalformedURLException | GeneralSecurityException e1) {
107                                 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create UEB Consumer: ", e1);
108                         }
109
110                         while (this.isRunning()) {
111                                 LOGGER.debug("While loop test _ take out ");
112                                 try {
113                                         for ( String msg : CConsumer.fetch () ){                
114                                                 LOGGER.debug("Manual Notification Recieved Message " + msg + " from UEB cluster : ");
115                                                 returnTopic = processMessage(msg);
116                                                 if(returnTopic != null){
117                                                         SendMessage(returnTopic, update);
118                                                 }
119                                         }
120                                 } catch (IOException e) {
121                                         LOGGER.debug(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing UEB message");
122                                 }
123                         }
124                         LOGGER.debug("Stopping UEB Consumer loop will no longer fetch messages from the cluster");      
125                 } else if ("dmaap".equals(propNotificationType)) {
126                         String dmaapServers = null;
127                         try {
128                                 dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
129                                 topic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC);
130                                 aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
131                                 aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
132                         } catch (Exception e) {
133                                 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get DMaaP servers list:", e);
134                                 this.isRunning = false;
135                         } 
136                         
137                         if(dmaapServers==null || topic==null){
138                                 LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
139                                 try {
140                                         throw new Exception(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
141                                 } catch (Exception e) {
142                                         // TODO Auto-generated catch block
143                                         e.printStackTrace();
144                                 }
145                         }
146                         
147                         dmaapServers.trim();
148                         topic.trim();
149                         aafLogin.trim();
150                         aafPassword.trim();
151                         
152                         String consumerTopic = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_TOPIC).trim();
153                         SendMessage(consumerTopic, "Starting-Topic");
154                         dmaapList = new ArrayList<String>();
155                         for ( String u : dmaapServers.split ( "," ) ){
156                                 dmaapList.add ( u );
157                         }
158                         
159                         try {
160                                 
161                                 dmaapConsumer = new BusConsumer.DmaapConsumerWrapper(dmaapList, consumerTopic, aafLogin, aafPassword, group, id, 20*1000, 1000);
162                         } catch (Exception e1) {
163                                 LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Unable to create DMaaP Consumer: ", e1);
164                         }
165
166                         while (this.isRunning()) {
167                                 LOGGER.debug("While loop test _ take out ");
168                                 try {
169                                         for ( String msg : dmaapConsumer.fetch () ){            
170                                                 LOGGER.debug("Manual Notification Recieved Message " + msg + " from DMaaP server : ");
171                                                 returnTopic = processMessage(msg);
172                                                 if(returnTopic != null){
173                                                         SendMessage(returnTopic, update);
174                                                 }
175                                         }
176                                 }catch (Exception e) {
177                                         LOGGER.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing DMaaP message: ", e);                          }
178                         }
179                         LOGGER.debug("Stopping DMaaP Consumer loop will no longer fetch messages from the servers");    
180                 }
181         }
182
183         private void SendMessage( String topic, String message) {
184                 CambriaPublisher pub = null;
185                 BusPublisher publisher = null;
186                 try {
187                         if ("ueb".equals(propNotificationType)) {
188                                 pub = CambriaClientFactory.createSimplePublisher (null, clusterList, topic );
189                                 pub.send( "pdpReturnMessage", message );
190                                 LOGGER.debug("Sending Message to UEB topic: " + topic);
191                                 pub.close();
192                                 
193                         } else if ("dmaap".equals(propNotificationType)){
194                                 publisher = new BusPublisher.DmaapPublisherWrapper(dmaapList,topic,aafLogin,aafPassword);
195                                 publisher.send( "pdpReturnMessage", message );
196                                 LOGGER.debug("Sending to Message to DMaaP topic: " + topic);
197                                 publisher.close();
198                         }
199
200                 } catch (Exception e) {
201                         LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update: ", e);
202                 }
203
204                 try {
205                         pub.send( "pdpReturnMessage", message );
206                         LOGGER.debug("Sending to Message to tpoic" + topic);
207                 } catch (IOException e) {
208                         LOGGER.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update");
209                 }       
210                 pub.close();            
211         }
212
213         private String processMessage(String msg) {
214                 LOGGER.debug("notification message:  " + msg);
215                 String[] UID = msg.split("=")[1].split("\"");
216                 
217                 String returnTopic = topic + UID[0];
218                 if(msg.contains("Starting-Topic")){
219                         return null;
220                 }
221                 return returnTopic;
222         }
223         public static void setUpdate(String update) {
224                 ManualNotificationUpdateThread.update = update;
225         }
226         
227 }