Initial OpenECOMP policy/engine commit
[policy/engine.git] / ECOMP-PDP-REST / src / main / java / org / openecomp / policy / pdp / rest / notifications / ManualNotificationUpdateThread.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ECOMP-PDP-REST
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.policy.pdp.rest.notifications;
22
23 import java.io.IOException;
24 import java.net.MalformedURLException;
25 import java.net.URL;
26 import java.security.GeneralSecurityException;
27 import java.util.LinkedList;
28 import java.util.UUID;
29
30 import org.openecomp.policy.rest.XACMLRestProperties;
31
32 import com.att.nsa.cambria.client.CambriaClientFactory;
33 import com.att.nsa.cambria.client.CambriaConsumer;
34 import com.att.nsa.cambria.client.CambriaPublisher;
35 import org.openecomp.policy.xacml.api.XACMLErrorConstants;
36 import com.att.research.xacml.util.XACMLProperties;
37
38 import org.openecomp.policy.common.logging.flexlogger.*;
39
40 public class ManualNotificationUpdateThread implements Runnable {
41         private static final Logger logger      = FlexLogger.getLogger(ManualNotificationUpdateThread.class);
42 //      private static List<String> uebURLList = null;
43         private static String topic = null;
44         private static CambriaConsumer CConsumer = null;
45 //      private static Collection<String>  clusterList = null;
46         private static String clusterList = null;
47 //      private Collection<String> urlList  = null;
48         private static String update = null;
49         
50         public volatile boolean isRunning = false;
51         
52         public synchronized boolean isRunning() {
53                 return this.isRunning;
54         }
55         
56         public synchronized void terminate() {
57                 this.isRunning = false;
58         }
59         
60         /**
61          * 
62          * This is our thread that runs on startup if the system is configured to UEB to accept manual update requests
63          * 
64          */
65         @Override
66         public void run() {
67                 synchronized(this) {
68                         this.isRunning = true;
69                 }
70                 URL aURL = null;
71                 String group =  UUID.randomUUID ().toString ();
72                 String id = "0";
73                 String returnTopic = null;
74                 try {
75                         ManualNotificationUpdateThread.clusterList = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_UEB_CLUSTER);
76                         String url = XACMLProperties.getProperty(XACMLRestProperties.PROP_PDP_ID);
77                         aURL = new URL(url);
78                         topic = aURL.getHost() + aURL.getPort();
79                 } catch (NumberFormatException e) {
80                         logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Unable to get UEB cluster list or pdp url: ", e);
81                         this.isRunning = false;
82                 } catch (MalformedURLException e) {
83                         logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing URL to create topic for Notification ", e);
84                 }
85                 String consumerTopic = aURL.getHost() + aURL.getPort() + "UpdateRequest";
86                 SendMessage(consumerTopic, "Starting-Topic");
87                 final LinkedList<String> urlList = new LinkedList<String> ();
88                 for ( String u : clusterList.split ( "," ) ){
89                         urlList.add ( u );
90                 }
91                 
92                 try {
93                         CConsumer = CambriaClientFactory.createConsumer ( null, urlList, consumerTopic , group, id, 20*1000, 1000 );
94                 } catch (MalformedURLException | GeneralSecurityException e1) {
95                         // TODO Auto-generated catch block
96                         e1.printStackTrace();
97                 }
98
99
100                 while (this.isRunning()) {
101                         logger.debug("While loop test _ take out ");
102                         try {
103                                 for ( String msg : CConsumer.fetch () ){                
104                                         logger.debug("Manual Notification Recieved Message " + msg + " from UEB cluster : ");
105                                         returnTopic = processMessage(msg);
106                                         if(returnTopic != null){
107                                                 SendMessage(returnTopic, update);
108                                         }
109                                 }
110                         } catch (IOException e) {
111                                 logger.debug(XACMLErrorConstants.ERROR_DATA_ISSUE + "Error in processing UEB message");
112                         }
113                 }
114                 logger.debug("Stopping UEB Consuer loop will not logger fetch messages from the cluser");
115
116                 }
117
118         private void SendMessage( String topic, String message) {
119                 CambriaPublisher pub = null;
120                 try {
121                         pub = CambriaClientFactory.createSimplePublisher (null, clusterList, topic );
122                 } catch (MalformedURLException e1) {
123                         // TODO Auto-generated catch block
124                         e1.printStackTrace();
125                 } catch (GeneralSecurityException e1) {
126                         // TODO Auto-generated catch block
127                         e1.printStackTrace();
128                 }
129                 try {
130                         pub.send( "pdpReturnMessage", message );
131                         logger.debug("Sending to Message to tpoic" + topic);
132                 } catch (IOException e) {
133                         logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW+ "Error sending notification update");
134                 }       
135                 pub.close();            
136         }
137
138         private String processMessage(String msg) {
139                 logger.debug("notification message:  " + msg);
140                 String[] UID = msg.split("=")[1].split("\"");
141                 String returnTopic = topic + UID[0];
142                 if(msg.contains("Starting-Topic")){
143                         return null;
144                 }
145                 return returnTopic;
146         }
147         public static void setUpdate(String update) {
148                 ManualNotificationUpdateThread.update = update;
149         }
150         
151 }