/*-
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
20 package org.onap.policy.pdp.rest.api.services;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.onap.policy.api.PolicyException;
import org.onap.policy.common.logging.flexlogger.FlexLogger;
import org.onap.policy.common.logging.flexlogger.Logger;
import org.onap.policy.rest.XacmlRestProperties;
import org.onap.policy.utils.BusPublisher;
import org.onap.policy.xacml.api.XACMLErrorConstants;
import org.springframework.http.HttpStatus;

import com.att.research.xacml.util.XACMLProperties;
49 public class NotificationService {
50 public static final String BACKUPFILE = "topicBackup.txt";
51 private static Logger logger = FlexLogger.getLogger(GetDictionaryService.class.getName());
52 private static ConcurrentHashMap<String, Date> topicQueue = new ConcurrentHashMap<>();
53 private static int interval = 15000;
54 private static Thread backUpthread = null;
55 private static Object resourceLock = new Object();
56 private static List<String> dmaapList = null;
57 private static String dmaapServers = null;
58 private static String aafLogin = null;
59 private static String aafPassword = null;
61 private String notificationResponse = null;
62 private HttpStatus status = HttpStatus.BAD_REQUEST;
65 * NotificationService Constructor.
67 * @param notificationTopic Topic Name in String format.
68 * @param requestID Request ID in String format.
69 * @param serviceType Needs to be NotificationServiceType based enumeration value.
71 public NotificationService(
72 String notificationTopic, String requestID, NotificationServiceType serviceType) {
74 if (dmaapServers == null || aafLogin == null || aafPassword == null) {
75 notificationResponse =
76 XACMLErrorConstants.ERROR_DATA_ISSUE
77 + "DMaaP properties are missing from the property file";
80 UUID requestUUID = null;
81 if (requestID != null && !requestID.isEmpty()) {
83 requestUUID = UUID.fromString(requestID);
84 } catch (IllegalArgumentException e) {
85 requestUUID = UUID.randomUUID();
86 logger.info("Generated Random UUID: " + requestUUID.toString(), e);
89 requestUUID = UUID.randomUUID();
90 logger.info("Generated Random UUID: " + requestUUID.toString());
93 run(notificationTopic, serviceType);
94 } catch (PolicyException e) {
95 notificationResponse = XACMLErrorConstants.ERROR_DATA_ISSUE + e;
96 status = HttpStatus.BAD_REQUEST;
100 private static void init() {
101 if (dmaapServers == null || aafLogin == null || aafPassword == null) {
102 dmaapServers = XACMLProperties.getProperty(XacmlRestProperties.PROP_NOTIFICATION_SERVERS);
103 aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
104 aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
107 XACMLProperties.getProperty("CLIENT_INTERVAL", Integer.toString(interval)));
108 if (dmaapServers == null || aafLogin == null || aafPassword == null) {
110 XACMLErrorConstants.ERROR_DATA_ISSUE
111 + "DMaaP properties are missing from the property file ");
115 dmaapServers = dmaapServers.trim();
116 aafLogin = aafLogin.trim();
117 aafPassword = aafPassword.trim();
118 // Get servers to List.
119 if (dmaapServers.contains(",")) {
120 dmaapList = new ArrayList<>(Arrays.asList(dmaapServers.split("\\s*,\\s*")));
122 dmaapList = new ArrayList<>();
123 dmaapList.add(dmaapServers);
129 public static void reloadProps() {
136 private void run(String notificationTopic, NotificationServiceType serviceType)
137 throws PolicyException {
139 if (notificationTopic == null) {
140 String message = "Notification Topic is null";
141 logger.error(message);
142 throw new PolicyException(message);
144 notificationTopic = notificationTopic.trim();
145 if (notificationTopic.isEmpty()) {
146 String message = "Notification Topic is not valid. ";
147 logger.error(message);
148 throw new PolicyException(message);
150 // if already exists give error.Saying already registered.
153 status = HttpStatus.OK;
154 switch (serviceType) {
156 addTopic(notificationTopic);
157 notificationResponse =
158 "Success!! Please give permissions to "
160 + " that PDP will use to publish on given topic :"
162 + "\n Start calling /sendHeartbeat API at an interval less than "
163 + Integer.toString(interval)
167 removeTopic(notificationTopic);
168 notificationResponse =
169 "Notification Topic :"
171 + " has been removed and PDP will not publish notifications to this Topic.";
174 heartBeat(notificationTopic);
175 notificationResponse = "Success!! HeartBeat registered.";
178 } catch (Exception e) {
179 logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + e);
180 status = HttpStatus.BAD_REQUEST;
181 throw new PolicyException(e);
185 // Used to register Heart beat.
186 private void heartBeat(String notificationTopic) throws PolicyException {
187 if (!topicQueue.isEmpty() && topicQueue.containsKey(notificationTopic)) {
188 topicQueue.put(notificationTopic, new Date());
190 logger.info("Failed HeartBeat, Topic " + notificationTopic + "is not registered.");
191 throw new PolicyException(
192 "Failed HeartBeat, Topic " + notificationTopic + "is not registered.");
196 // Used to remove Topic.
197 private static void removeTopic(String notificationTopic) throws PolicyException {
198 if (topicQueue.containsKey(notificationTopic)) {
199 topicQueue.remove(notificationTopic);
200 removeTopicFromBackup(notificationTopic);
202 logger.info("Failed Removal, Topic " + notificationTopic + " is not registered.");
203 throw new PolicyException(
204 "Failed Removal, Topic " + notificationTopic + " is not registered.");
208 private static void removeTopicFromBackup(String notificationTopic) {
209 synchronized (resourceLock) {
210 try (Stream<String> lines = Files.lines(Paths.get(BACKUPFILE))) {
211 List<String> replaced =
213 .map(line -> (line.split("=")[0].equals(notificationTopic) ? "" : line))
214 .collect(Collectors.toList());
215 try (PrintWriter pw = new PrintWriter(BACKUPFILE, "UTF-8")) {
218 if (line.trim().isEmpty()) {
224 } catch (IOException e) {
226 XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not remove/recreate the backup. ", e);
231 // Used to add Topic.
232 private void addTopic(String notificationTopic) throws PolicyException {
233 // validate if topic exists.
234 if (!topicQueue.isEmpty() && topicQueue.containsKey(notificationTopic)) {
235 topicQueue.put(notificationTopic, new Date());
236 logger.info("Topic " + notificationTopic + " is already registered.");
237 throw new PolicyException("Topic " + notificationTopic + " is already registered.");
239 topicQueue.put(notificationTopic, new Date());
240 addTopictoBackUp(notificationTopic);
243 private void addTopictoBackUp(String notificationTopic) {
244 synchronized (resourceLock) {
247 Paths.get(BACKUPFILE),
248 (notificationTopic + "=" + new Date().toString() + "\n").getBytes(),
249 StandardOpenOption.APPEND);
250 } catch (IOException e) {
251 logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not add to the backup. ", e);
256 // Maintains BackUp and Queue Topic.
257 private static void callThread() {
258 // Create the backup file if it not exists.
260 if (backUpthread == null) {
263 logger.info("BackUpThread not set. Starting now !");
266 backUpthread = new Thread(task);
267 backUpthread.start();
271 private static void backup() {
272 synchronized (resourceLock) {
274 File backUpFile = new File(BACKUPFILE);
275 if (!backUpFile.exists() && backUpFile.createNewFile()) {
276 logger.info(" BackUp File for topic's has been created !");
278 // File Already exists. Process file and load the Memory.
279 Stream<String> stream = Files.lines(Paths.get(BACKUPFILE));
280 Map<String, Date> data =
282 .map(line -> line.split(","))
283 .collect(Collectors.toMap(e -> e[0], e -> new Date()));
287 logger.debug("Topic retrieved from backUp : " + key + " with Time : " + value));
288 topicQueue.putAll(data);
290 } catch (IOException e) {
291 logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not process the backup. ", e);
296 private static void threadTask() {
299 TimeUnit.MILLISECONDS.sleep(interval);
300 for (Map.Entry<String, Date> map : topicQueue.entrySet()) {
301 Date currentTime = new Date();
303 timeDiff = currentTime.getTime() - map.getValue().getTime();
304 if (timeDiff > (interval + 1500)) {
305 removeTopic(map.getKey());
308 } catch (InterruptedException e) {
309 logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error during thread execution ", e);
310 Thread.currentThread().interrupt();
311 } catch (PolicyException e) {
312 logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error during thread execution ", e);
317 public String getResult() {
318 return notificationResponse;
321 public HttpStatus getResponseCode() {
326 * Entry point for sending Notifications from Notification Server.
328 * @param notification String JSON format of notification message which needs to be sent.
330 public static void sendNotification(String notification) {
332 for (String topic : topicQueue.keySet()) {
333 sendDmaapMessage(topic, notification);
337 private static void sendDmaapMessage(String topic, String notification) {
338 BusPublisher publisher =
339 new BusPublisher.DmaapPublisherWrapper(dmaapList, topic, aafLogin, aafPassword);
340 // Sending notification through DMaaP Message Router
341 logger.info("NotificationService: send DMaaP Message. ");
342 publisher.send("MyPartitionKey", notification);
343 logger.info("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic);
347 /** Notification service Type Enumeration */
348 public enum NotificationServiceType {