2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.policy.pdp.rest.api.services;
23 import java.io.IOException;
24 import java.io.PrintWriter;
25 import java.nio.file.Files;
26 import java.nio.file.Paths;
27 import java.nio.file.StandardOpenOption;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Date;
31 import java.util.List;
33 import java.util.UUID;
34 import java.util.concurrent.ConcurrentHashMap;
35 import java.util.concurrent.TimeUnit;
36 import java.util.stream.Collectors;
37 import java.util.stream.Stream;
39 import org.onap.policy.api.PolicyException;
40 import org.onap.policy.common.logging.flexlogger.FlexLogger;
41 import org.onap.policy.common.logging.flexlogger.Logger;
42 import org.onap.policy.rest.XACMLRestProperties;
43 import org.onap.policy.utils.BusPublisher;
44 import org.onap.policy.xacml.api.XACMLErrorConstants;
45 import org.springframework.http.HttpStatus;
47 import com.att.research.xacml.util.XACMLProperties;
49 public class NotificationService {
50 public static final String BACKUPFILE = "topicBackup.txt";
51 private static Logger logger = FlexLogger.getLogger(GetDictionaryService.class.getName());
52 private static ConcurrentHashMap<String, Date> topicQueue = new ConcurrentHashMap<>();
53 private static int interval = 15000;
54 private static Thread backUpthread = null;
55 private static Object resourceLock = new Object();
56 private static List<String> dmaapList = null;
57 private static String dmaapServers = null;
58 private static String aafLogin = null;
59 private static String aafPassword = null;
61 private String notificationResponse = null;
62 private HttpStatus status = HttpStatus.BAD_REQUEST;
65 * NotificationService Constructor.
67 * @param notificationTopic Topic Name in String format.
68 * @param requestID Request ID in String format.
69 * @param serviceType Needs to be NotificationServiceType based enumeration value.
// Processes a single notification request while the object is being built.
// When any of the static DMaaP settings (dmaapServers, aafLogin, aafPassword) is
// null, the response is set to a "properties are missing" error; the status keeps
// its initial BAD_REQUEST value.
// NOTE(review): nothing visible here populates those settings - presumably init()
// is invoked before this check; confirm.
71 public NotificationService(String notificationTopic, String requestID, NotificationServiceType serviceType) {
73 if(dmaapServers==null || aafLogin==null || aafPassword==null){
74 notificationResponse = XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file";
// Resolve a request UUID: parsed from requestID when it is a valid UUID string,
// otherwise (null, empty or malformed) a random UUID is generated and logged.
// NOTE(review): requestUUID is only logged in this constructor, never passed on - confirm intent.
77 UUID requestUUID = null;
78 if (requestID != null && !requestID.isEmpty()) {
80 requestUUID = UUID.fromString(requestID);
81 } catch (IllegalArgumentException e) {
82 requestUUID = UUID.randomUUID();
83 logger.info("Generated Random UUID: " + requestUUID.toString(), e);
86 requestUUID = UUID.randomUUID();
87 logger.info("Generated Random UUID: " + requestUUID.toString());
// Delegate to run(); a PolicyException becomes an error response with BAD_REQUEST.
90 run(notificationTopic, serviceType);
91 }catch(PolicyException e){
92 notificationResponse = XACMLErrorConstants.ERROR_DATA_ISSUE + e;
93 status = HttpStatus.BAD_REQUEST;
// Loads the DMaaP notification settings into the static fields.
// Work is done only while at least one of dmaapServers / aafLogin / aafPassword is null.
97 private static void init() {
98 if(dmaapServers==null || aafLogin==null || aafPassword==null){
// Read the server list, the AAF credentials and the heartbeat interval from XACMLProperties.
// CLIENT_INTERVAL falls back to the current value of `interval` when the property is unset.
// NOTE(review): Integer.parseInt throws NumberFormatException for a non-numeric CLIENT_INTERVAL.
99 dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
100 aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
101 aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
102 interval = Integer.parseInt(XACMLProperties.getProperty("CLIENT_INTERVAL", Integer.toString(interval)));
// Log an error when a required property is still missing after the read.
103 if(dmaapServers==null || aafLogin==null || aafPassword==null){
104 logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
// Strip surrounding whitespace from the configured values.
108 dmaapServers= dmaapServers.trim();
109 aafLogin = aafLogin.trim();
110 aafPassword = aafPassword.trim();
111 // Get servers to List.
// A comma-separated value is split into a list (whitespace around commas is dropped);
// a value without a comma becomes a one-element list.
112 if(dmaapServers.contains(",")) {
113 dmaapList = new ArrayList<>(Arrays.asList(dmaapServers.split("\\s*,\\s*")));
115 dmaapList = new ArrayList<>();
116 dmaapList.add(dmaapServers);
// NOTE(review): presumably clears the cached DMaaP settings so that they are
// re-read from the properties on next use - confirm against the method body.
122 public static void reloadProps(){
// Validates the topic name and performs the requested operation on it.
// Throws PolicyException when the topic is null or empty after trimming; any exception
// raised while handling the request is logged, sets BAD_REQUEST and is rethrown wrapped
// in a PolicyException.
129 private void run(String notificationTopic, NotificationServiceType serviceType) throws PolicyException{
131 if(notificationTopic==null){
132 String message = "Notification Topic is null";
133 logger.error(message);
134 throw new PolicyException(message);
// Surrounding whitespace is not part of the topic name.
136 notificationTopic = notificationTopic.trim();
137 if(notificationTopic.isEmpty()){
138 String message = "Notification Topic is not valid. ";
139 logger.error(message);
140 throw new PolicyException(message);
142 // if already exists give error.Saying already registered.
// Assume success up front; the catch block below resets the status on failure.
145 status = HttpStatus.OK;
// Dispatch on the service type: register (addTopic), unregister (removeTopic) or heartbeat (heartBeat).
146 switch (serviceType) {
148 addTopic(notificationTopic);
149 notificationResponse = "Success!! Please give permissions to " + aafLogin + " that PDP will use to publish on given topic :" + notificationTopic +
150 "\n Start calling /sendHeartbeat API at an interval less than " + Integer.toString(interval) + "ms";
153 removeTopic(notificationTopic);
154 notificationResponse = "Notification Topic :" + notificationTopic + " has been removed and PDP will not publish notifications to this Topic.";
157 heartBeat(notificationTopic);
158 notificationResponse = "Success!! HeartBeat registered.";
161 }catch (Exception e){
162 logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + e);
163 status = HttpStatus.BAD_REQUEST;
164 throw new PolicyException(e);
168 // Used to register Heart beat.
169 private void heartBeat(String notificationTopic) throws PolicyException{
170 if(!topicQueue.isEmpty()&& topicQueue.containsKey(notificationTopic)){
171 topicQueue.put(notificationTopic, new Date());
173 logger.info("Failed HeartBeat, Topic " + notificationTopic + "is not registered.");
174 throw new PolicyException("Failed HeartBeat, Topic " + notificationTopic + "is not registered.");
178 // Used to remove Topic.
179 private static void removeTopic(String notificationTopic) throws PolicyException{
180 if(topicQueue.containsKey(notificationTopic)){
181 topicQueue.remove(notificationTopic);
182 removeTopicFromBackup(notificationTopic);
184 logger.info("Failed Removal, Topic " + notificationTopic + " is not registered.");
185 throw new PolicyException("Failed Removal, Topic " + notificationTopic + " is not registered.");
// Rewrites the backup file without the entry of the given topic while holding resourceLock.
// An IOException is logged and not propagated.
189 private static void removeTopicFromBackup(String notificationTopic) {
190 synchronized (resourceLock) {
// Read all lines; a line whose text before the first '=' equals the topic is replaced by "".
191 try (Stream<String> lines = Files.lines(Paths.get(BACKUPFILE))) {
192 List<String> replaced = lines.map(line-> (line.split("=")[0].equals(notificationTopic)?"":line)).collect(Collectors.toList());
// Overwrite the backup file (UTF-8) from the filtered list.
193 try (PrintWriter pw = new PrintWriter( BACKUPFILE, "UTF-8")) {
194 replaced.forEach(line-> {
// NOTE(review): blank lines - including the blanked-out topic entry - are presumably skipped here; confirm.
195 if(line.trim().isEmpty()){
202 } catch (IOException e) {
203 logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not remove/recreate the backup. ", e);
208 // Used to add Topic.
209 private void addTopic(String notificationTopic) throws PolicyException{
210 // validate if topic exists.
211 if(!topicQueue.isEmpty()&& topicQueue.containsKey(notificationTopic)){
212 topicQueue.put(notificationTopic, new Date());
213 logger.info("Topic " + notificationTopic + " is already registered.");
214 throw new PolicyException("Topic " + notificationTopic + " is already registered.");
216 topicQueue.put(notificationTopic, new Date());
217 addTopictoBackUp(notificationTopic);
220 private void addTopictoBackUp(String notificationTopic) {
221 synchronized (resourceLock) {
223 Files.write(Paths.get(BACKUPFILE),( notificationTopic+"="+new Date().toString()+"\n").getBytes() , StandardOpenOption.APPEND);
224 } catch (IOException e) {
225 logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not add to the backup. ", e);
230 // Maintains BackUp and Queue Topic.
// Starts the background thread once: a new Thread is created and started only while
// backUpthread is null.
// NOTE(review): the null check is not synchronized - concurrent callers could each start
// a thread; confirm that callers are serialized.
231 private static void callThread() {
232 // Create the backup file if it not exists.
234 if(backUpthread==null){
235 Runnable task = () -> {
236 logger.info("BackUpThread not set. Starting now !");
239 backUpthread = new Thread(task);
240 backUpthread.start();
244 private static void backup(){
245 synchronized (resourceLock) {
247 File backUpFile = new File(BACKUPFILE);
248 if(!backUpFile.exists() && backUpFile.createNewFile()){
249 logger.info(" BackUp File for topic's has been created !");
251 // File Already exists. Process file and load the Memory.
252 Stream<String> stream = Files.lines(Paths.get(BACKUPFILE));
253 Map<String,Date> data = stream.map(line -> line.split(",")).collect(Collectors.toMap(e->e[0],e-> new Date()));
255 data.forEach((key, value)->logger.debug("Topic retrieved from backUp : " + key + " with Time : " + value));
256 topicQueue.putAll(data);
258 }catch(IOException e){
259 logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not process the backup. ", e);
// Expiry sweep: sleeps for `interval` milliseconds, then removes every topic whose
// timestamp is more than interval + 1500 ms old.
// NOTE(review): presumably repeated in a loop on the background thread - confirm.
264 private static void threadTask() {
267 TimeUnit.MILLISECONDS.sleep(interval);
268 for(Map.Entry<String, Date> map : topicQueue.entrySet()){
269 Date currentTime = new Date();
// Milliseconds elapsed since the topic's timestamp was last refreshed.
271 timeDiff = currentTime.getTime()-map.getValue().getTime();
// 1500 ms of slack is granted on top of the heartbeat interval.
272 if(timeDiff > (interval+1500)){
273 removeTopic(map.getKey());
// NOTE(review): the interrupt flag is not restored after an InterruptedException, and a
// PolicyException from removeTopic ends the current pass over the topics early.
276 } catch (InterruptedException | PolicyException e) {
277 logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error during thread execution ", e);
282 public String getResult() {
283 return notificationResponse;
// NOTE(review): presumably returns the `status` field set while processing the
// request - confirm against the method body.
286 public HttpStatus getResponseCode() {
291 * Entry point for sending Notifications from Notification Server.
292 * @param notification String JSON format of notification message which needs to be sent.
// Publishes the given notification once for every topic currently held in topicQueue.
294 public static void sendNotification(String notification) {
296 for (String topic: topicQueue.keySet()){
297 sendDmaapMessage(topic, notification);
// Sends one notification for the given topic through a DMaaP BusPublisher that is
// built from dmaapList.
301 private static void sendDmaapMessage(String topic, String notification) {
302 BusPublisher publisher = new BusPublisher.DmaapPublisherWrapper(dmaapList,
306 // Sending notification through DMaaP Message Router
307 logger.info("NotificationService: send DMaaP Message. ");
// NOTE(review): every message uses the fixed partition key "MyPartitionKey" - confirm this is intended.
308 publisher.send( "MyPartitionKey", notification);
// NOTE(review): the log text below has no space between the server name and "for Topic".
309 logger.info("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic);
314 * Notification service Type Enumeration
316 public enum NotificationServiceType{