2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.openecomp.policy.pdp.rest.api.services;
23 import java.io.IOException;
24 import java.io.PrintWriter;
25 import java.nio.file.Files;
26 import java.nio.file.Paths;
27 import java.nio.file.StandardOpenOption;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Date;
31 import java.util.List;
33 import java.util.UUID;
34 import java.util.concurrent.ConcurrentHashMap;
35 import java.util.concurrent.TimeUnit;
36 import java.util.stream.Collectors;
37 import java.util.stream.Stream;
39 import org.openecomp.policy.api.PolicyException;
40 import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
41 import org.openecomp.policy.common.logging.flexlogger.Logger;
42 import org.openecomp.policy.rest.XACMLRestProperties;
43 import org.openecomp.policy.utils.BusPublisher;
44 import org.openecomp.policy.xacml.api.XACMLErrorConstants;
45 import org.springframework.http.HttpStatus;
47 import com.att.research.xacml.util.XACMLProperties;
49 public class NotificationService {
50 public static final String BACKUPFILE = "topicBackup.txt";
51 private static Logger logger = FlexLogger.getLogger(GetDictionaryService.class.getName());
52 private static ConcurrentHashMap<String, Date> topicQueue = new ConcurrentHashMap<>();
53 private static int interval = 15000;
54 private static Thread backUpthread = null;
55 private static Object resourceLock = new Object();
56 private static List<String> dmaapList = null;
57 private static String dmaapServers = null;
58 private static String aafLogin = null;
59 private static String aafPassword = null;
61 private String notificationResponse = null;
62 private HttpStatus status = HttpStatus.BAD_REQUEST;
65 * NotificationService Constructor.
67 * @param notificationTopic Topic Name in String format.
68 * @param requestID Request ID in String format.
69 * @param serviceType Needs to be NotificationServiceType based enumeration value.
71 public NotificationService(String notificationTopic, String requestID, NotificationServiceType serviceType) {
73 if(dmaapServers==null || aafLogin==null || aafPassword==null){
74 notificationResponse = XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file";
77 UUID requestUUID = null;
78 if (requestID != null && !requestID.isEmpty()) {
80 requestUUID = UUID.fromString(requestID);
81 } catch (IllegalArgumentException e) {
82 requestUUID = UUID.randomUUID();
83 logger.info("Generated Random UUID: " + requestUUID.toString(), e);
86 requestUUID = UUID.randomUUID();
87 logger.info("Generated Random UUID: " + requestUUID.toString());
90 run(notificationTopic, serviceType);
91 }catch(PolicyException e){
92 notificationResponse = XACMLErrorConstants.ERROR_DATA_ISSUE + e;
93 status = HttpStatus.BAD_REQUEST;
97 private static void init() {
98 if(dmaapServers==null || aafLogin==null || aafPassword==null){
99 dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
100 aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
101 aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
102 interval = Integer.parseInt(XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_DELAY, Integer.toString(interval)));
103 if(dmaapServers==null || aafLogin==null || aafPassword==null){
104 logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
108 dmaapServers= dmaapServers.trim();
109 aafLogin = aafLogin.trim();
110 aafPassword = aafPassword.trim();
111 // Get servers to List.
112 if(dmaapServers.contains(",")) {
113 dmaapList = new ArrayList<>(Arrays.asList(dmaapServers.split("\\s*,\\s*")));
115 dmaapList = new ArrayList<>();
116 dmaapList.add(dmaapServers);
122 private void run(String notificationTopic, NotificationServiceType serviceType) throws PolicyException{
124 if(notificationTopic==null){
125 String message = "Notification Topic is null";
126 logger.error(message);
127 throw new PolicyException(message);
129 notificationTopic = notificationTopic.trim();
130 if(notificationTopic.isEmpty()){
131 String message = "Notification Topic is not valid. ";
132 logger.error(message);
133 throw new PolicyException(message);
135 // if already exists give error.Saying already registered.
138 status = HttpStatus.OK;
139 switch (serviceType) {
141 addTopic(notificationTopic);
142 notificationResponse = "Success!! Please give permissions to " + aafLogin + " that PDP will use to publish on given topic :" + notificationTopic +
143 "\n Start calling /sendHeartbeat API at an interval less than " + Integer.toString(interval) + "ms";
146 removeTopic(notificationTopic);
147 notificationResponse = "Notification Topic :" + notificationTopic + " has been removed and PDP will not publish notifications to this Topic.";
150 heartBeat(notificationTopic);
151 notificationResponse = "Success!! HeartBeat registered.";
154 }catch (Exception e){
155 logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + e);
156 status = HttpStatus.BAD_REQUEST;
157 throw new PolicyException(e);
161 // Used to register Heart beat.
162 private void heartBeat(String notificationTopic) throws PolicyException{
163 if(!topicQueue.isEmpty()&& topicQueue.containsKey(notificationTopic)){
164 topicQueue.put(notificationTopic, new Date());
166 logger.info("Failed HeartBeat, Topic " + notificationTopic + "is not registered.");
167 throw new PolicyException("Failed HeartBeat, Topic " + notificationTopic + "is not registered.");
171 // Used to remove Topic.
172 private static void removeTopic(String notificationTopic) throws PolicyException{
173 if(topicQueue.containsKey(notificationTopic)){
174 topicQueue.remove(notificationTopic);
175 removeTopicFromBackup(notificationTopic);
177 logger.info("Failed Removal, Topic " + notificationTopic + " is not registered.");
178 throw new PolicyException("Failed Removal, Topic " + notificationTopic + " is not registered.");
182 private static void removeTopicFromBackup(String notificationTopic) {
183 synchronized (resourceLock) {
184 try (Stream<String> lines = Files.lines(Paths.get(BACKUPFILE))) {
185 List<String> replaced = lines.map(line-> (line.split("=")[0].equals(notificationTopic)?"":line)).collect(Collectors.toList());
186 try (PrintWriter pw = new PrintWriter( BACKUPFILE, "UTF-8")) {
187 replaced.forEach(line-> {
188 if(line.trim().isEmpty()){
195 } catch (IOException e) {
196 logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not remove/recreate the backup. ", e);
201 // Used to add Topic.
202 private void addTopic(String notificationTopic) throws PolicyException{
203 // validate if topic exists.
204 if(!topicQueue.isEmpty()&& topicQueue.containsKey(notificationTopic)){
205 topicQueue.put(notificationTopic, new Date());
206 logger.info("Topic " + notificationTopic + " is already registered.");
207 throw new PolicyException("Topic " + notificationTopic + " is already registered.");
209 topicQueue.put(notificationTopic, new Date());
210 addTopictoBackUp(notificationTopic);
213 private void addTopictoBackUp(String notificationTopic) {
214 synchronized (resourceLock) {
216 Files.write(Paths.get(BACKUPFILE),( notificationTopic+"="+new Date().toString()+"\n").getBytes() , StandardOpenOption.APPEND);
217 } catch (IOException e) {
218 logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not add to the backup. ", e);
223 // Maintains BackUp and Queue Topic.
224 private static void callThread() {
225 // Create the backup file if it not exists.
227 if(backUpthread==null){
228 Runnable task = () -> {
229 logger.info("BackUpThread not set. Starting now !");
232 backUpthread = new Thread(task);
233 backUpthread.start();
237 private static void backup(){
238 synchronized (resourceLock) {
240 File backUpFile = new File(BACKUPFILE);
241 if(!backUpFile.exists() && backUpFile.createNewFile()){
242 logger.info(" BackUp File for topic's has been created !");
244 // File Already exists. Process file and load the Memory.
245 Stream<String> stream = Files.lines(Paths.get(BACKUPFILE));
246 Map<String,Date> data = stream.map(line -> line.split(",")).collect(Collectors.toMap(e->e[0],e-> new Date()));
248 data.forEach((key, value)->logger.debug("Topic retrieved from backUp : " + key + " with Time : " + value));
249 topicQueue.putAll(data);
251 }catch(IOException e){
252 logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not process the backup. ", e);
257 private static void threadTask() {
260 TimeUnit.MILLISECONDS.sleep(interval);
261 for(Map.Entry<String, Date> map : topicQueue.entrySet()){
262 Date currentTime = new Date();
264 timeDiff = currentTime.getTime()-map.getValue().getTime();
265 if(timeDiff < (interval+1500)){
266 removeTopic(map.getKey());
269 } catch (InterruptedException | PolicyException e) {
270 logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error during thread execution ", e);
275 public String getResult() {
276 return notificationResponse;
279 public HttpStatus getResponseCode() {
284 * Entry point for sending Notifications from Notification Server.
285 * @param notification String JSON format of notification message which needs to be sent.
287 public static void sendNotification(String notification) {
289 for (String topic: topicQueue.keySet()){
290 sendDmaapMessage(topic, notification);
294 private static void sendDmaapMessage(String topic, String notification) {
295 BusPublisher publisher = new BusPublisher.DmaapPublisherWrapper(dmaapList,
299 // Sending notification through DMaaP Message Router
300 publisher.send( "MyPartitionKey", notification);
301 logger.debug("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic);
306 * Notification service Type Enumeration
308 public enum NotificationServiceType{