Checkstyle and JUnit for base package in ONAP-REST
[policy/engine.git] / ONAP-PDP-REST / src / main / java / org / onap / policy / pdp / rest / api / services / NotificationService.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP-PDP-REST
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.policy.pdp.rest.api.services;
21
22 import java.io.File;
23 import java.io.IOException;
24 import java.io.PrintWriter;
25 import java.nio.file.Files;
26 import java.nio.file.Paths;
27 import java.nio.file.StandardOpenOption;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Date;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.UUID;
34 import java.util.concurrent.ConcurrentHashMap;
35 import java.util.concurrent.TimeUnit;
36 import java.util.stream.Collectors;
37 import java.util.stream.Stream;
38
39 import org.onap.policy.api.PolicyException;
40 import org.onap.policy.common.logging.flexlogger.FlexLogger;
41 import org.onap.policy.common.logging.flexlogger.Logger;
42 import org.onap.policy.rest.XacmlRestProperties;
43 import org.onap.policy.utils.BusPublisher;
44 import org.onap.policy.xacml.api.XACMLErrorConstants;
45 import org.springframework.http.HttpStatus;
46
47 import com.att.research.xacml.util.XACMLProperties;
48
49 public class NotificationService {
50     public static final String BACKUPFILE = "topicBackup.txt";
51     private static Logger logger = FlexLogger.getLogger(GetDictionaryService.class.getName());
52     private static ConcurrentHashMap<String, Date> topicQueue = new ConcurrentHashMap<>();
53     private static int interval = 15000;
54     private static Thread backUpthread = null;
55     private static Object resourceLock = new Object();
56     private static List<String> dmaapList = null;
57     private static String dmaapServers = null;
58     private static String aafLogin = null;
59     private static String aafPassword = null;
60
61     private String notificationResponse = null;
62     private HttpStatus status = HttpStatus.BAD_REQUEST;
63
64     /**
65      * NotificationService Constructor.
66      *
67      * @param notificationTopic Topic Name in String format.
68      * @param requestID Request ID in String format.
69      * @param serviceType Needs to be NotificationServiceType based enumeration value.
70      */
71     public NotificationService(
72             String notificationTopic, String requestID, NotificationServiceType serviceType) {
73         init();
74         if (dmaapServers == null || aafLogin == null || aafPassword == null) {
75             notificationResponse =
76                     XACMLErrorConstants.ERROR_DATA_ISSUE
77                     + "DMaaP properties are missing from the property file";
78             return;
79         }
80         UUID requestUUID = null;
81         if (requestID != null && !requestID.isEmpty()) {
82             try {
83                 requestUUID = UUID.fromString(requestID);
84             } catch (IllegalArgumentException e) {
85                 requestUUID = UUID.randomUUID();
86                 logger.info("Generated Random UUID: " + requestUUID.toString(), e);
87             }
88         } else {
89             requestUUID = UUID.randomUUID();
90             logger.info("Generated Random UUID: " + requestUUID.toString());
91         }
92         try {
93             run(notificationTopic, serviceType);
94         } catch (PolicyException e) {
95             notificationResponse = XACMLErrorConstants.ERROR_DATA_ISSUE + e;
96             status = HttpStatus.BAD_REQUEST;
97         }
98     }
99
100     private static void init() {
101         if (dmaapServers == null || aafLogin == null || aafPassword == null) {
102             dmaapServers = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_SERVERS);
103             aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
104             aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
105             interval =
106                     Integer.parseInt(
107                             XACMLProperties.getProperty("CLIENT_INTERVAL", Integer.toString(interval)));
108             if (dmaapServers == null || aafLogin == null || aafPassword == null) {
109                 logger.error(
110                         XACMLErrorConstants.ERROR_DATA_ISSUE
111                         + "DMaaP properties are missing from the property file ");
112                 return;
113             }
114             // Cleanup Values.
115             dmaapServers = dmaapServers.trim();
116             aafLogin = aafLogin.trim();
117             aafPassword = aafPassword.trim();
118             // Get servers to List.
119             if (dmaapServers.contains(",")) {
120                 dmaapList = new ArrayList<>(Arrays.asList(dmaapServers.split("\\s*,\\s*")));
121             } else {
122                 dmaapList = new ArrayList<>();
123                 dmaapList.add(dmaapServers);
124             }
125             callThread();
126         }
127     }
128
129     public static void reloadProps() {
130         dmaapServers = null;
131         aafLogin = null;
132         aafPassword = null;
133         backUpthread = null;
134     }
135
136     private void run(String notificationTopic, NotificationServiceType serviceType)
137             throws PolicyException {
138         // Check Validation
139         if (notificationTopic == null) {
140             String message = "Notification Topic is null";
141             logger.error(message);
142             throw new PolicyException(message);
143         }
144         notificationTopic = notificationTopic.trim();
145         if (notificationTopic.isEmpty()) {
146             String message = "Notification Topic is not valid. ";
147             logger.error(message);
148             throw new PolicyException(message);
149         }
150         // if already exists give error.Saying already registered.
151         // Get Result.
152         try {
153             status = HttpStatus.OK;
154             switch (serviceType) {
155                 case ADD:
156                     addTopic(notificationTopic);
157                     notificationResponse =
158                             "Success!! Please give permissions to "
159                                     + aafLogin
160                                     + " that PDP will use to publish on given topic :"
161                                     + notificationTopic
162                                     + "\n Start calling /sendHeartbeat API at an interval less than "
163                                     + Integer.toString(interval)
164                                     + "ms";
165                     break;
166                 case REMOVE:
167                     removeTopic(notificationTopic);
168                     notificationResponse =
169                             "Notification Topic :"
170                                     + notificationTopic
171                                     + " has been removed and PDP will not publish notifications to this Topic.";
172                     break;
173                 case HB:
174                     heartBeat(notificationTopic);
175                     notificationResponse = "Success!! HeartBeat registered.";
176                     break;
177             }
178         } catch (Exception e) {
179             logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + e);
180             status = HttpStatus.BAD_REQUEST;
181             throw new PolicyException(e);
182         }
183     }
184
185     // Used to register Heart beat.
186     private void heartBeat(String notificationTopic) throws PolicyException {
187         if (!topicQueue.isEmpty() && topicQueue.containsKey(notificationTopic)) {
188             topicQueue.put(notificationTopic, new Date());
189         } else {
190             logger.info("Failed HeartBeat, Topic " + notificationTopic + "is not registered.");
191             throw new PolicyException(
192                     "Failed HeartBeat, Topic " + notificationTopic + "is not registered.");
193         }
194     }
195
196     // Used to remove Topic.
197     private static void removeTopic(String notificationTopic) throws PolicyException {
198         if (topicQueue.containsKey(notificationTopic)) {
199             topicQueue.remove(notificationTopic);
200             removeTopicFromBackup(notificationTopic);
201         } else {
202             logger.info("Failed Removal, Topic " + notificationTopic + " is not registered.");
203             throw new PolicyException(
204                     "Failed Removal, Topic " + notificationTopic + " is not registered.");
205         }
206     }
207
208     private static void removeTopicFromBackup(String notificationTopic) {
209         synchronized (resourceLock) {
210             try (Stream<String> lines = Files.lines(Paths.get(BACKUPFILE))) {
211                 List<String> replaced =
212                         lines
213                         .map(line -> (line.split("=")[0].equals(notificationTopic) ? "" : line))
214                         .collect(Collectors.toList());
215                 try (PrintWriter pw = new PrintWriter(BACKUPFILE, "UTF-8")) {
216                     replaced.forEach(
217                             line -> {
218                                 if (line.trim().isEmpty()) {
219                                     return;
220                                 }
221                                 pw.println(line);
222                             });
223                 }
224             } catch (IOException e) {
225                 logger.error(
226                         XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not remove/recreate the backup. ", e);
227             }
228         }
229     }
230
231     // Used to add Topic.
232     private void addTopic(String notificationTopic) throws PolicyException {
233         // validate if topic exists.
234         if (!topicQueue.isEmpty() && topicQueue.containsKey(notificationTopic)) {
235             topicQueue.put(notificationTopic, new Date());
236             logger.info("Topic " + notificationTopic + " is already registered.");
237             throw new PolicyException("Topic " + notificationTopic + " is already registered.");
238         }
239         topicQueue.put(notificationTopic, new Date());
240         addTopictoBackUp(notificationTopic);
241     }
242
243     private void addTopictoBackUp(String notificationTopic) {
244         synchronized (resourceLock) {
245             try {
246                 Files.write(
247                         Paths.get(BACKUPFILE),
248                         (notificationTopic + "=" + new Date().toString() + "\n").getBytes(),
249                         StandardOpenOption.APPEND);
250             } catch (IOException e) {
251                 logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not add to the backup. ", e);
252             }
253         }
254     }
255
256     // Maintains BackUp and Queue Topic.
257     private static void callThread() {
258         // Create the backup file if it not exists.
259         backup();
260         if (backUpthread == null) {
261             Runnable task =
262                     () -> {
263                         logger.info("BackUpThread not set. Starting now !");
264                         threadTask();
265                     };
266                     backUpthread = new Thread(task);
267                     backUpthread.start();
268         }
269     }
270
271     private static void backup() {
272         synchronized (resourceLock) {
273             try {
274                 File backUpFile = new File(BACKUPFILE);
275                 if (!backUpFile.exists() && backUpFile.createNewFile()) {
276                     logger.info(" BackUp File for topic's has been created !");
277                 } else {
278                     // File Already exists. Process file and load the Memory.
279                     Stream<String> stream = Files.lines(Paths.get(BACKUPFILE));
280                     Map<String, Date> data =
281                             stream
282                             .map(line -> line.split(","))
283                             .collect(Collectors.toMap(e -> e[0], e -> new Date()));
284                     stream.close();
285                     data.forEach(
286                             (key, value) ->
287                             logger.debug("Topic retrieved from backUp : " + key + " with Time : " + value));
288                     topicQueue.putAll(data);
289                 }
290             } catch (IOException e) {
291                 logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not process the backup. ", e);
292             }
293         }
294     }
295
296     private static void threadTask() {
297         while (true) {
298             try {
299                 TimeUnit.MILLISECONDS.sleep(interval);
300                 for (Map.Entry<String, Date> map : topicQueue.entrySet()) {
301                     Date currentTime = new Date();
302                     long timeDiff = 0;
303                     timeDiff = currentTime.getTime() - map.getValue().getTime();
304                     if (timeDiff > (interval + 1500)) {
305                         removeTopic(map.getKey());
306                     }
307                 }
308             } catch (InterruptedException e) {
309                 logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error during thread execution ", e);
310                 Thread.currentThread().interrupt();
311             } catch (PolicyException e) {
312                 logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error during thread execution ", e);
313             }
314         }
315     }
316
317     public String getResult() {
318         return notificationResponse;
319     }
320
321     public HttpStatus getResponseCode() {
322         return status;
323     }
324
325     /**
326      * Entry point for sending Notifications from Notification Server.
327      *
328      * @param notification String JSON format of notification message which needs to be sent.
329      */
330     public static void sendNotification(String notification) {
331         init();
332         for (String topic : topicQueue.keySet()) {
333             sendDmaapMessage(topic, notification);
334         }
335     }
336
337     private static void sendDmaapMessage(String topic, String notification) {
338         BusPublisher publisher =
339                 new BusPublisher.DmaapPublisherWrapper(dmaapList, topic, aafLogin, aafPassword);
340         // Sending notification through DMaaP Message Router
341         logger.info("NotificationService: send DMaaP Message. ");
342         publisher.send("MyPartitionKey", notification);
343         logger.info("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic);
344         publisher.close();
345     }
346
347     /** Notification service Type Enumeration */
348     public enum NotificationServiceType {
349         ADD,
350         REMOVE,
351         HB
352     }
353 }