Merge "MS Model Input Validation"
[policy/engine.git] / ONAP-PDP-REST / src / main / java / org / onap / policy / pdp / rest / api / services / NotificationService.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP-PDP-REST
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.onap.policy.pdp.rest.api.services;
21
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.onap.policy.api.PolicyException;
import org.onap.policy.common.logging.flexlogger.FlexLogger;
import org.onap.policy.common.logging.flexlogger.Logger;
import org.onap.policy.rest.XACMLRestProperties;
import org.onap.policy.utils.BusPublisher;
import org.onap.policy.xacml.api.XACMLErrorConstants;
import org.springframework.http.HttpStatus;

import com.att.research.xacml.util.XACMLProperties;
48
49 public class NotificationService {
50         public static final String BACKUPFILE = "topicBackup.txt";
51         private static Logger logger = FlexLogger.getLogger(GetDictionaryService.class.getName());
52         private static ConcurrentHashMap<String, Date> topicQueue = new ConcurrentHashMap<>();
53         private static int interval = 15000;  
54         private static Thread backUpthread = null;
55         private static Object resourceLock = new Object();
56         private static List<String> dmaapList = null;
57         private static String dmaapServers = null;
58         private static String aafLogin = null;
59         private static String aafPassword = null;
60         
61         private String notificationResponse = null;
62         private HttpStatus status = HttpStatus.BAD_REQUEST;
63         
64         /**
65          * NotificationService Constructor.  
66          * 
67          * @param notificationTopic Topic Name in String format. 
68          * @param requestID Request ID in String format. 
69          * @param serviceType Needs to be NotificationServiceType based enumeration value. 
70          */
71         public NotificationService(String notificationTopic, String requestID, NotificationServiceType serviceType) {
72                 init();
73                 if(dmaapServers==null || aafLogin==null || aafPassword==null){
74                         notificationResponse = XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file";
75                         return;
76                 }
77                 UUID requestUUID = null;
78                 if (requestID != null && !requestID.isEmpty()) {
79                         try {
80                                 requestUUID = UUID.fromString(requestID);
81                         } catch (IllegalArgumentException e) {
82                                 requestUUID = UUID.randomUUID();
83                                 logger.info("Generated Random UUID: " + requestUUID.toString(), e);
84                         }
85                 }else{
86                         requestUUID = UUID.randomUUID();
87                         logger.info("Generated Random UUID: " + requestUUID.toString());
88                 }
89         try{
90             run(notificationTopic, serviceType);
91         }catch(PolicyException e){
92                 notificationResponse = XACMLErrorConstants.ERROR_DATA_ISSUE + e;
93             status = HttpStatus.BAD_REQUEST;
94         }
95         }
96
97         private static void init() {
98                 if(dmaapServers==null || aafLogin==null || aafPassword==null){
99                         dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
100                         aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
101                         aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
102             interval = Integer.parseInt(XACMLProperties.getProperty("CLIENT_INTERVAL", Integer.toString(interval)));
103                         if(dmaapServers==null || aafLogin==null || aafPassword==null){
104                                 logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
105                                 return;
106                         }
107                         // Cleanup Values. 
108                         dmaapServers= dmaapServers.trim();
109                         aafLogin = aafLogin.trim();
110                         aafPassword = aafPassword.trim();
111                         // Get servers to List. 
112                         if(dmaapServers.contains(",")) {
113                                 dmaapList = new ArrayList<>(Arrays.asList(dmaapServers.split("\\s*,\\s*")));
114                         } else {
115                                 dmaapList = new ArrayList<>();
116                                 dmaapList.add(dmaapServers);
117                         }
118                         callThread();
119                 }
120         }
121         
122         public static void reloadProps(){
123             dmaapServers = null;
124             aafLogin = null;
125             aafPassword = null;
126             backUpthread = null;
127         }
128
129         private void run(String notificationTopic, NotificationServiceType serviceType) throws PolicyException{
130                 // Check Validation
131         if(notificationTopic==null){
132                 String message = "Notification Topic is null";
133             logger.error(message);
134             throw new PolicyException(message);
135         }
136         notificationTopic = notificationTopic.trim();
137         if(notificationTopic.isEmpty()){
138                 String message = "Notification Topic is not valid. ";
139             logger.error(message);
140             throw new PolicyException(message);
141         }
142         // if already exists give error.Saying already registered. 
143         // Get Result. 
144         try{
145             status = HttpStatus.OK;
146             switch (serviceType) {
147                         case ADD:
148                                 addTopic(notificationTopic);
149                                 notificationResponse = "Success!! Please give permissions to " + aafLogin + " that PDP will use to publish on given topic :" + notificationTopic +
150                                 "\n Start calling /sendHeartbeat API at an interval less than " + Integer.toString(interval) + "ms";
151                                 break;
152                         case REMOVE:
153                                 removeTopic(notificationTopic);
154                                 notificationResponse = "Notification Topic :" + notificationTopic + " has been removed and PDP will not publish notifications to this Topic.";
155                                 break;
156                         case HB:
157                                 heartBeat(notificationTopic);
158                                 notificationResponse = "Success!! HeartBeat registered.";
159                                 break;
160                         }
161         }catch (Exception e){
162             logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + e);
163             status = HttpStatus.BAD_REQUEST;
164             throw new PolicyException(e);
165         }
166         }
167         
168         // Used to register Heart beat. 
169         private void heartBeat(String notificationTopic) throws PolicyException{
170                 if(!topicQueue.isEmpty()&& topicQueue.containsKey(notificationTopic)){
171                         topicQueue.put(notificationTopic, new Date());
172                 }else{
173                         logger.info("Failed HeartBeat, Topic " + notificationTopic + "is not registered.");
174                         throw new PolicyException("Failed HeartBeat, Topic " + notificationTopic + "is not registered.");
175                 }
176         }
177
178         // Used to remove Topic.
179         private static void removeTopic(String notificationTopic) throws PolicyException{ 
180                 if(topicQueue.containsKey(notificationTopic)){
181                         topicQueue.remove(notificationTopic);
182                         removeTopicFromBackup(notificationTopic);
183                 }else{
184                         logger.info("Failed Removal, Topic " + notificationTopic + " is not registered.");
185                         throw new PolicyException("Failed Removal, Topic " + notificationTopic + " is not registered.");
186                 }
187         }
188
189         private static void removeTopicFromBackup(String notificationTopic) {
190                 synchronized (resourceLock) {
191                         try (Stream<String> lines = Files.lines(Paths.get(BACKUPFILE))) {
192                                 List<String> replaced = lines.map(line-> (line.split("=")[0].equals(notificationTopic)?"":line)).collect(Collectors.toList());
193                                 try (PrintWriter pw = new PrintWriter( BACKUPFILE, "UTF-8")) {
194                                         replaced.forEach(line-> {
195                                                 if(line.trim().isEmpty()){
196                                                         return;
197                                                 }
198                                                 pw.println(line);
199                                         });
200                                 }
201                                 lines.close();
202                         } catch (IOException e) {
203                                 logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not remove/recreate the backup. ", e);
204                         }
205                 }
206         }
207
208         // Used to add Topic. 
209         private void addTopic(String notificationTopic) throws PolicyException{
210                 // validate if topic exists. 
211                 if(!topicQueue.isEmpty()&& topicQueue.containsKey(notificationTopic)){
212                         topicQueue.put(notificationTopic, new Date());
213                         logger.info("Topic " + notificationTopic + " is already registered.");
214                         throw new PolicyException("Topic " + notificationTopic + " is already registered.");
215                 }
216                 topicQueue.put(notificationTopic, new Date());
217                 addTopictoBackUp(notificationTopic);
218         }
219
220         private void addTopictoBackUp(String notificationTopic) {
221                 synchronized (resourceLock) {
222                         try {
223                                 Files.write(Paths.get(BACKUPFILE),( notificationTopic+"="+new Date().toString()+"\n").getBytes() , StandardOpenOption.APPEND);
224                         } catch (IOException e) {
225                                 logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not add to the backup. ", e);
226                         }
227                 }
228         }
229
230         // Maintains BackUp and Queue Topic. 
231         private static void callThread() {
232                 // Create the backup file if it not exists. 
233                 backup();
234                 if(backUpthread==null){
235                         Runnable task = () -> {
236                                 logger.info("BackUpThread not set. Starting now !");
237                                 threadTask();
238                         };
239                         backUpthread = new Thread(task);
240                         backUpthread.start();
241                 }
242         }
243
244         private static void backup(){
245                 synchronized (resourceLock) {
246                         try{
247                                 File backUpFile = new File(BACKUPFILE);
248                                 if(!backUpFile.exists() && backUpFile.createNewFile()){
249                                         logger.info(" BackUp File for topic's has been created !");
250                                 }else{
251                                         // File Already exists. Process file and load the Memory. 
252                                         Stream<String> stream = Files.lines(Paths.get(BACKUPFILE));
253                                         Map<String,Date> data = stream.map(line -> line.split(",")).collect(Collectors.toMap(e->e[0],e-> new Date()));
254                                         stream.close();
255                                         data.forEach((key, value)->logger.debug("Topic retrieved from backUp : " + key + " with Time : " + value));
256                                         topicQueue.putAll(data);
257                                 }
258                         }catch(IOException e){
259                                 logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not process the backup. ", e);
260                         }
261                 }
262         }
263
264         private static void threadTask() {
265                 while(true){
266                         try {
267                                 TimeUnit.MILLISECONDS.sleep(interval);
268                                 for(Map.Entry<String, Date> map : topicQueue.entrySet()){
269                                         Date currentTime = new Date();
270                                         long timeDiff = 0;
271                                         timeDiff = currentTime.getTime()-map.getValue().getTime();
272                                         if(timeDiff > (interval+1500)){
273                                                 removeTopic(map.getKey());
274                                         }
275                                 }
276                         } catch (InterruptedException | PolicyException e) {
277                                 logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error during thread execution ", e);
278                         }
279                 }
280         }
281
282         public String getResult() {
283                 return notificationResponse;
284         }
285
286         public HttpStatus getResponseCode() {
287                 return status;
288         }
289
290         /**
291          * Entry point for sending Notifications from Notification Server. 
292          * @param notification String JSON format of notification message which needs to be sent. 
293          */
294         public static void sendNotification(String notification) {
295                 init();
296                 for (String topic: topicQueue.keySet()){
297                         sendDmaapMessage(topic, notification);
298                 }
299         }
300         
301         private static void sendDmaapMessage(String topic, String notification) {
302                 BusPublisher publisher = new BusPublisher.DmaapPublisherWrapper(dmaapList, 
303                                                                                topic, 
304                                                                                aafLogin, 
305                                                                                aafPassword);
306                 // Sending notification through DMaaP Message Router
307                 logger.info("NotificationService: send DMaaP Message. ");
308                 publisher.send( "MyPartitionKey", notification);
309                 logger.info("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic);
310                 publisher.close();
311         }
312         
313         /**
314          *  Notification service Type Enumeration 
315          */
316         public enum NotificationServiceType{
317                 ADD,
318                 REMOVE,
319                 HB
320         }
321
322 }