[POLICY-73] replace openecomp for policy-engine
[policy/engine.git] / ONAP-PDP-REST / src / main / java / org / onap / policy / pdp / rest / api / services / NotificationService.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP-PDP-REST
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.policy.pdp.rest.api.services;
21
22 import java.io.File;
23 import java.io.IOException;
24 import java.io.PrintWriter;
25 import java.nio.file.Files;
26 import java.nio.file.Paths;
27 import java.nio.file.StandardOpenOption;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Date;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.UUID;
34 import java.util.concurrent.ConcurrentHashMap;
35 import java.util.concurrent.TimeUnit;
36 import java.util.stream.Collectors;
37 import java.util.stream.Stream;
38
39 import org.onap.policy.api.PolicyException;
40 import org.onap.policy.common.logging.flexlogger.FlexLogger;
41 import org.onap.policy.common.logging.flexlogger.Logger;
42 import org.onap.policy.rest.XACMLRestProperties;
43 import org.onap.policy.utils.BusPublisher;
44 import org.onap.policy.xacml.api.XACMLErrorConstants;
45 import org.springframework.http.HttpStatus;
46
47 import com.att.research.xacml.util.XACMLProperties;
48
49 public class NotificationService {
50         public static final String BACKUPFILE = "topicBackup.txt";
51         private static Logger logger = FlexLogger.getLogger(GetDictionaryService.class.getName());
52         private static ConcurrentHashMap<String, Date> topicQueue = new ConcurrentHashMap<>();
53         private static int interval = 15000;  
54         private static Thread backUpthread = null;
55         private static Object resourceLock = new Object();
56         private static List<String> dmaapList = null;
57         private static String dmaapServers = null;
58         private static String aafLogin = null;
59         private static String aafPassword = null;
60         
61         private String notificationResponse = null;
62         private HttpStatus status = HttpStatus.BAD_REQUEST;
63         
64         /**
65          * NotificationService Constructor.  
66          * 
67          * @param notificationTopic Topic Name in String format. 
68          * @param requestID Request ID in String format. 
69          * @param serviceType Needs to be NotificationServiceType based enumeration value. 
70          */
71         public NotificationService(String notificationTopic, String requestID, NotificationServiceType serviceType) {
72                 init();
73                 if(dmaapServers==null || aafLogin==null || aafPassword==null){
74                         notificationResponse = XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file";
75                         return;
76                 }
77                 UUID requestUUID = null;
78                 if (requestID != null && !requestID.isEmpty()) {
79                         try {
80                                 requestUUID = UUID.fromString(requestID);
81                         } catch (IllegalArgumentException e) {
82                                 requestUUID = UUID.randomUUID();
83                                 logger.info("Generated Random UUID: " + requestUUID.toString(), e);
84                         }
85                 }else{
86                         requestUUID = UUID.randomUUID();
87                         logger.info("Generated Random UUID: " + requestUUID.toString());
88                 }
89         try{
90             run(notificationTopic, serviceType);
91         }catch(PolicyException e){
92                 notificationResponse = XACMLErrorConstants.ERROR_DATA_ISSUE + e;
93             status = HttpStatus.BAD_REQUEST;
94         }
95         }
96
97         private static void init() {
98                 if(dmaapServers==null || aafLogin==null || aafPassword==null){
99                         dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
100                         aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
101                         aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
102                         interval = Integer.parseInt(XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_DELAY, Integer.toString(interval)));
103                         if(dmaapServers==null || aafLogin==null || aafPassword==null){
104                                 logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
105                                 return;
106                         }
107                         // Cleanup Values. 
108                         dmaapServers= dmaapServers.trim();
109                         aafLogin = aafLogin.trim();
110                         aafPassword = aafPassword.trim();
111                         // Get servers to List. 
112                         if(dmaapServers.contains(",")) {
113                                 dmaapList = new ArrayList<>(Arrays.asList(dmaapServers.split("\\s*,\\s*")));
114                         } else {
115                                 dmaapList = new ArrayList<>();
116                                 dmaapList.add(dmaapServers);
117                         }
118                         callThread();
119                 }
120         }
121
122         private void run(String notificationTopic, NotificationServiceType serviceType) throws PolicyException{
123                 // Check Validation
124         if(notificationTopic==null){
125                 String message = "Notification Topic is null";
126             logger.error(message);
127             throw new PolicyException(message);
128         }
129         notificationTopic = notificationTopic.trim();
130         if(notificationTopic.isEmpty()){
131                 String message = "Notification Topic is not valid. ";
132             logger.error(message);
133             throw new PolicyException(message);
134         }
135         // if already exists give error.Saying already registered. 
136         // Get Result. 
137         try{
138             status = HttpStatus.OK;
139             switch (serviceType) {
140                         case ADD:
141                                 addTopic(notificationTopic);
142                                 notificationResponse = "Success!! Please give permissions to " + aafLogin + " that PDP will use to publish on given topic :" + notificationTopic +
143                                 "\n Start calling /sendHeartbeat API at an interval less than " + Integer.toString(interval) + "ms";
144                                 break;
145                         case REMOVE:
146                                 removeTopic(notificationTopic);
147                                 notificationResponse = "Notification Topic :" + notificationTopic + " has been removed and PDP will not publish notifications to this Topic.";
148                                 break;
149                         case HB:
150                                 heartBeat(notificationTopic);
151                                 notificationResponse = "Success!! HeartBeat registered.";
152                                 break;
153                         }
154         }catch (Exception e){
155             logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + e);
156             status = HttpStatus.BAD_REQUEST;
157             throw new PolicyException(e);
158         }
159         }
160         
161         // Used to register Heart beat. 
162         private void heartBeat(String notificationTopic) throws PolicyException{
163                 if(!topicQueue.isEmpty()&& topicQueue.containsKey(notificationTopic)){
164                         topicQueue.put(notificationTopic, new Date());
165                 }else{
166                         logger.info("Failed HeartBeat, Topic " + notificationTopic + "is not registered.");
167                         throw new PolicyException("Failed HeartBeat, Topic " + notificationTopic + "is not registered.");
168                 }
169         }
170
171         // Used to remove Topic.
172         private static void removeTopic(String notificationTopic) throws PolicyException{ 
173                 if(topicQueue.containsKey(notificationTopic)){
174                         topicQueue.remove(notificationTopic);
175                         removeTopicFromBackup(notificationTopic);
176                 }else{
177                         logger.info("Failed Removal, Topic " + notificationTopic + " is not registered.");
178                         throw new PolicyException("Failed Removal, Topic " + notificationTopic + " is not registered.");
179                 }
180         }
181
182         private static void removeTopicFromBackup(String notificationTopic) {
183                 synchronized (resourceLock) {
184                         try (Stream<String> lines = Files.lines(Paths.get(BACKUPFILE))) {
185                                 List<String> replaced = lines.map(line-> (line.split("=")[0].equals(notificationTopic)?"":line)).collect(Collectors.toList());
186                                 try (PrintWriter pw = new PrintWriter( BACKUPFILE, "UTF-8")) {
187                                         replaced.forEach(line-> {
188                                                 if(line.trim().isEmpty()){
189                                                         return;
190                                                 }
191                                                 pw.println(line);
192                                         });
193                                 }
194                                 lines.close();
195                         } catch (IOException e) {
196                                 logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not remove/recreate the backup. ", e);
197                         }
198                 }
199         }
200
201         // Used to add Topic. 
202         private void addTopic(String notificationTopic) throws PolicyException{
203                 // validate if topic exists. 
204                 if(!topicQueue.isEmpty()&& topicQueue.containsKey(notificationTopic)){
205                         topicQueue.put(notificationTopic, new Date());
206                         logger.info("Topic " + notificationTopic + " is already registered.");
207                         throw new PolicyException("Topic " + notificationTopic + " is already registered.");
208                 }
209                 topicQueue.put(notificationTopic, new Date());
210                 addTopictoBackUp(notificationTopic);
211         }
212
213         private void addTopictoBackUp(String notificationTopic) {
214                 synchronized (resourceLock) {
215                         try {
216                                 Files.write(Paths.get(BACKUPFILE),( notificationTopic+"="+new Date().toString()+"\n").getBytes() , StandardOpenOption.APPEND);
217                         } catch (IOException e) {
218                                 logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not add to the backup. ", e);
219                         }
220                 }
221         }
222
223         // Maintains BackUp and Queue Topic. 
224         private static void callThread() {
225                 // Create the backup file if it not exists. 
226                 backup();
227                 if(backUpthread==null){
228                         Runnable task = () -> {
229                                 logger.info("BackUpThread not set. Starting now !");
230                                 threadTask();
231                         };
232                         backUpthread = new Thread(task);
233                         backUpthread.start();
234                 }
235         }
236
237         private static void backup(){
238                 synchronized (resourceLock) {
239                         try{
240                                 File backUpFile = new File(BACKUPFILE);
241                                 if(!backUpFile.exists() && backUpFile.createNewFile()){
242                                         logger.info(" BackUp File for topic's has been created !");
243                                 }else{
244                                         // File Already exists. Process file and load the Memory. 
245                                         Stream<String> stream = Files.lines(Paths.get(BACKUPFILE));
246                                         Map<String,Date> data = stream.map(line -> line.split(",")).collect(Collectors.toMap(e->e[0],e-> new Date()));
247                                         stream.close();
248                                         data.forEach((key, value)->logger.debug("Topic retrieved from backUp : " + key + " with Time : " + value));
249                                         topicQueue.putAll(data);
250                                 }
251                         }catch(IOException e){
252                                 logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not process the backup. ", e);
253                         }
254                 }
255         }
256
257         private static void threadTask() {
258                 while(true){
259                         try {
260                                 TimeUnit.MILLISECONDS.sleep(interval);
261                                 for(Map.Entry<String, Date> map : topicQueue.entrySet()){
262                                         Date currentTime = new Date();
263                                         long timeDiff = 0;
264                                         timeDiff = currentTime.getTime()-map.getValue().getTime();
265                                         if(timeDiff < (interval+1500)){
266                                                 removeTopic(map.getKey());
267                                         }
268                                 }
269                         } catch (InterruptedException | PolicyException e) {
270                                 logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error during thread execution ", e);
271                         }
272                 }
273         }
274
275         public String getResult() {
276                 return notificationResponse;
277         }
278
279         public HttpStatus getResponseCode() {
280                 return status;
281         }
282
283         /**
284          * Entry point for sending Notifications from Notification Server. 
285          * @param notification String JSON format of notification message which needs to be sent. 
286          */
287         public static void sendNotification(String notification) {
288                 init();
289                 for (String topic: topicQueue.keySet()){
290                         sendDmaapMessage(topic, notification);
291                 }
292         }
293         
294         private static void sendDmaapMessage(String topic, String notification) {
295                 BusPublisher publisher = new BusPublisher.DmaapPublisherWrapper(dmaapList, 
296                                                                                topic, 
297                                                                                aafLogin, 
298                                                                                aafPassword);
299                 // Sending notification through DMaaP Message Router
300                 publisher.send( "MyPartitionKey", notification);
301                 logger.debug("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic);
302                 publisher.close();
303         }
304         
305         /**
306          *  Notification service Type Enumeration 
307          */
308         public enum NotificationServiceType{
309                 ADD,
310                 REMOVE,
311                 HB
312         }
313
314 }