/*-
 * ============LICENSE_START=======================================================
 * ONAP-PDP-REST
 * ================================================================================
 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
package org.onap.policy.pdp.rest.api.services;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.onap.policy.api.PolicyException;
import org.onap.policy.common.logging.flexlogger.FlexLogger;
import org.onap.policy.common.logging.flexlogger.Logger;
import org.onap.policy.rest.XACMLRestProperties;
import org.onap.policy.utils.BusPublisher;
import org.onap.policy.xacml.api.XACMLErrorConstants;
import org.springframework.http.HttpStatus;

import com.att.research.xacml.util.XACMLProperties;

/**
 * Registers, refreshes and removes DMaaP notification topics and publishes
 * notifications to every registered topic.
 *
 * <p>Registered topics are kept in a static in-memory map (topic name to the
 * time of the last registration/heartbeat) and mirrored to {@link #BACKUPFILE}
 * as {@code topic=date} lines. A background thread periodically removes topics
 * whose last heartbeat is older than the configured interval plus a small
 * grace period.
 */
public class NotificationService {
    public static final String BACKUPFILE = "topicBackup.txt";

    /** Extra time (ms) a topic is tolerated beyond the heartbeat interval before it is dropped. */
    private static final int HEARTBEAT_GRACE_MS = 1500;
    /** Separator between topic name and timestamp in the backup file. */
    private static final String BACKUP_SEPARATOR = "=";

    private static final Logger logger = FlexLogger.getLogger(NotificationService.class.getName());
    /** Topic name mapped to the time of its last registration or heartbeat. */
    private static final ConcurrentHashMap<String, Date> topicQueue = new ConcurrentHashMap<>();
    /** Guards every read and write of the backup file. */
    private static final Object resourceLock = new Object();

    private static int interval = 15000;
    private static Thread backUpthread = null;
    private static List<String> dmaapList = null;
    private static String dmaapServers = null;
    private static String aafLogin = null;
    private static String aafPassword = null;

    private String notificationResponse = null;
    private HttpStatus status = HttpStatus.BAD_REQUEST;

    /**
     * NotificationService Constructor.
     *
     * <p>Performs the requested operation immediately; the outcome is available
     * through {@link #getResult()} and {@link #getResponseCode()}.
     *
     * @param notificationTopic Topic Name in String format.
     * @param requestID Request ID in String format.
     * @param serviceType Needs to be NotificationServiceType based enumeration value.
     */
    public NotificationService(String notificationTopic, String requestID, NotificationServiceType serviceType) {
        init();
        if (dmaapServers == null || aafLogin == null || aafPassword == null) {
            notificationResponse = XACMLErrorConstants.ERROR_DATA_ISSUE
                    + "DMaaP properties are missing from the property file";
            return;
        }
        // The request UUID is currently only used for logging.
        UUID requestUUID;
        if (requestID != null && !requestID.isEmpty()) {
            try {
                requestUUID = UUID.fromString(requestID);
            } catch (IllegalArgumentException e) {
                requestUUID = UUID.randomUUID();
                logger.info("Generated Random UUID: " + requestUUID.toString(), e);
            }
        } else {
            requestUUID = UUID.randomUUID();
            logger.info("Generated Random UUID: " + requestUUID.toString());
        }
        try {
            run(notificationTopic, serviceType);
        } catch (PolicyException e) {
            notificationResponse = XACMLErrorConstants.ERROR_DATA_ISSUE + e;
            status = HttpStatus.BAD_REQUEST;
        }
    }

    /**
     * Loads the DMaaP properties once (or again after {@link #reloadProps()}),
     * builds the server list and starts the backup/expiry thread.
     *
     * <p>Synchronized so that concurrent callers cannot start more than one
     * background thread.
     */
    private static synchronized void init() {
        if (dmaapServers != null && aafLogin != null && aafPassword != null) {
            return;
        }
        dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
        aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
        aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
        interval = readInterval();
        if (dmaapServers == null || aafLogin == null || aafPassword == null) {
            logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE
                    + "DMaaP properties are missing from the property file ");
            return;
        }
        // Cleanup Values.
        dmaapServers = dmaapServers.trim();
        aafLogin = aafLogin.trim();
        aafPassword = aafPassword.trim();
        // Get servers to List. A value without a comma yields a single-element list.
        dmaapList = new ArrayList<>(Arrays.asList(dmaapServers.split("\\s*,\\s*")));
        callThread();
    }

    /**
     * Reads the notification delay property, keeping the current interval when
     * the configured value is missing, not a number, or not positive.
     */
    private static int readInterval() {
        String configured = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_DELAY,
                Integer.toString(interval));
        if (configured == null) {
            return interval;
        }
        try {
            int parsed = Integer.parseInt(configured.trim());
            if (parsed > 0) {
                return parsed;
            }
            logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Notification delay must be positive but was "
                    + parsed + ". Using " + interval + "ms");
        } catch (NumberFormatException e) {
            logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "Notification delay is not a number: "
                    + configured + ". Using " + interval + "ms", e);
        }
        return interval;
    }

    /**
     * Forgets the cached DMaaP properties so the next call re-reads them, and
     * stops the current background thread so that re-initialisation does not
     * leave two expiry threads running.
     */
    public static synchronized void reloadProps() {
        dmaapServers = null;
        aafLogin = null;
        aafPassword = null;
        if (backUpthread != null) {
            backUpthread.interrupt();
            backUpthread = null;
        }
    }

    /**
     * Validates the topic name and performs the requested operation, setting
     * the response message and status.
     *
     * @param notificationTopic topic name; must be non-null and non-blank.
     * @param serviceType operation to perform.
     * @throws PolicyException if the topic is invalid or the operation fails.
     */
    private void run(String notificationTopic, NotificationServiceType serviceType) throws PolicyException {
        // Check Validation
        if (notificationTopic == null) {
            String message = "Notification Topic is null";
            logger.error(message);
            throw new PolicyException(message);
        }
        String topic = notificationTopic.trim();
        if (topic.isEmpty()) {
            String message = "Notification Topic is not valid. ";
            logger.error(message);
            throw new PolicyException(message);
        }
        // Get Result.
        try {
            status = HttpStatus.OK;
            switch (serviceType) {
                case ADD:
                    addTopic(topic);
                    notificationResponse = "Success!! Please give permissions to " + aafLogin
                            + " that PDP will use to publish on given topic :" + topic
                            + "\n Start calling /sendHeartbeat API at an interval less than "
                            + Integer.toString(interval) + "ms";
                    break;
                case REMOVE:
                    removeTopic(topic);
                    notificationResponse = "Notification Topic :" + topic
                            + " has been removed and PDP will not publish notifications to this Topic.";
                    break;
                case HB:
                    heartBeat(topic);
                    notificationResponse = "Success!! HeartBeat registered.";
                    break;
                default:
                    break;
            }
        } catch (Exception e) {
            logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + e);
            status = HttpStatus.BAD_REQUEST;
            throw new PolicyException(e);
        }
    }

    /**
     * Registers a heartbeat by refreshing the topic's timestamp.
     *
     * @throws PolicyException if the topic is not registered.
     */
    private void heartBeat(String notificationTopic) throws PolicyException {
        // replace() only updates an existing entry, so a topic removed concurrently is not re-added.
        if (topicQueue.replace(notificationTopic, new Date()) == null) {
            String message = "Failed HeartBeat, Topic " + notificationTopic + " is not registered.";
            logger.info(message);
            throw new PolicyException(message);
        }
    }

    /**
     * Removes a topic from memory and from the backup file.
     *
     * @throws PolicyException if the topic is not registered.
     */
    private static void removeTopic(String notificationTopic) throws PolicyException {
        if (topicQueue.remove(notificationTopic) == null) {
            String message = "Failed Removal, Topic " + notificationTopic + " is not registered.";
            logger.info(message);
            throw new PolicyException(message);
        }
        removeTopicFromBackup(notificationTopic);
    }

    /** Returns the topic name stored in a {@code topic=date} backup line. */
    private static String topicOf(String backupLine) {
        // Limit 2 keeps a leading empty token, so a line consisting only of "=" cannot
        // produce an empty array.
        return backupLine.split(BACKUP_SEPARATOR, 2)[0].trim();
    }

    /** Rewrites the backup file without the lines belonging to the given topic. */
    private static void removeTopicFromBackup(String notificationTopic) {
        synchronized (resourceLock) {
            List<String> remaining;
            try (Stream<String> lines = Files.lines(Paths.get(BACKUPFILE))) {
                remaining = lines.filter(line -> !line.trim().isEmpty())
                        .filter(line -> !topicOf(line).equals(notificationTopic))
                        .collect(Collectors.toList());
            } catch (IOException | UncheckedIOException e) {
                // Files.lines reports read failures during iteration as UncheckedIOException.
                logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not remove/recreate the backup. ", e);
                return;
            }
            // The file is only truncated after it has been read completely.
            try (PrintWriter pw = new PrintWriter(BACKUPFILE, "UTF-8")) {
                remaining.forEach(pw::println);
                if (pw.checkError()) {
                    logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not remove/recreate the backup. ");
                }
            } catch (IOException e) {
                logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not remove/recreate the backup. ", e);
            }
        }
    }

    /**
     * Registers a new topic in memory and in the backup file.
     *
     * @throws PolicyException if the topic is already registered (its timestamp
     *         is refreshed in that case).
     */
    private void addTopic(String notificationTopic) throws PolicyException {
        // put() returns the previous value, which tells us whether the topic already existed.
        if (topicQueue.put(notificationTopic, new Date()) != null) {
            String message = "Topic " + notificationTopic + " is already registered.";
            logger.info(message);
            throw new PolicyException(message);
        }
        addTopictoBackUp(notificationTopic);
    }

    /** Appends a {@code topic=date} line to the backup file, creating the file if needed. */
    private void addTopictoBackUp(String notificationTopic) {
        synchronized (resourceLock) {
            try {
                String entry = notificationTopic + BACKUP_SEPARATOR + new Date().toString() + "\n";
                // UTF-8 matches the encoding used when the file is rewritten on removal.
                Files.write(Paths.get(BACKUPFILE), entry.getBytes(StandardCharsets.UTF_8),
                        StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            } catch (IOException e) {
                logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not add to the backup. ", e);
            }
        }
    }

    /** Loads the backup into memory and starts the expiry thread if it is not running. */
    private static void callThread() {
        // Create the backup file if it not exists.
        backup();
        if (backUpthread == null) {
            Runnable task = () -> {
                logger.info("BackUpThread not set. Starting now !");
                threadTask();
            };
            backUpthread = new Thread(task, "NotificationService-backup");
            backUpthread.start();
        }
    }

    /**
     * Creates the backup file when it is missing; otherwise loads every topic
     * found in it into memory with the current time as its timestamp.
     */
    private static void backup() {
        synchronized (resourceLock) {
            try {
                File backUpFile = new File(BACKUPFILE);
                if (!backUpFile.exists() && backUpFile.createNewFile()) {
                    logger.info(" BackUp File for topic's has been created !");
                    return;
                }
                // File Already exists. Process file and load the Memory.
                Map<String, Date> data;
                try (Stream<String> lines = Files.lines(Paths.get(BACKUPFILE))) {
                    data = lines.filter(line -> !line.trim().isEmpty())
                            .map(NotificationService::topicOf)
                            .filter(topic -> !topic.isEmpty())
                            // The merge function tolerates a topic that appears on several lines.
                            .collect(Collectors.toMap(topic -> topic, topic -> new Date(),
                                    (first, second) -> second));
                }
                data.forEach((key, value) -> logger
                        .debug("Topic retrieved from backUp : " + key + " with Time : " + value));
                topicQueue.putAll(data);
            } catch (IOException | UncheckedIOException e) {
                logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not process the backup. ", e);
            }
        }
    }

    /**
     * Body of the background thread: every {@code interval} milliseconds,
     * removes the topics whose last heartbeat is older than the interval plus
     * the grace period. Returns when the thread is interrupted.
     */
    private static void threadTask() {
        while (true) {
            try {
                TimeUnit.MILLISECONDS.sleep(interval);
            } catch (InterruptedException e) {
                // Restore the flag and stop; reloadProps() interrupts this thread on purpose.
                Thread.currentThread().interrupt();
                logger.info("BackUpThread interrupted. Stopping now !", e);
                return;
            }
            long now = System.currentTimeMillis();
            for (Map.Entry<String, Date> entry : topicQueue.entrySet()) {
                long timeDiff = now - entry.getValue().getTime();
                // Only topics that missed their heartbeat window are expired.
                if (timeDiff > (interval + HEARTBEAT_GRACE_MS)) {
                    try {
                        removeTopic(entry.getKey());
                    } catch (PolicyException e) {
                        // Keep sweeping the remaining topics.
                        logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error during thread execution ", e);
                    }
                }
            }
        }
    }

    /**
     * @return the response message produced by the requested operation.
     */
    public String getResult() {
        return notificationResponse;
    }

    /**
     * @return the HTTP status describing the outcome of the requested operation.
     */
    public HttpStatus getResponseCode() {
        return status;
    }

    /**
     * Entry point for sending Notifications from Notification Server.
     *
     * @param notification String JSON format of notification message which needs to be sent.
     */
    public static void sendNotification(String notification) {
        init();
        if (dmaapServers == null || aafLogin == null || aafPassword == null) {
            // init() has already logged the missing properties; nothing can be published.
            return;
        }
        for (String topic : topicQueue.keySet()) {
            sendDmaapMessage(topic, notification);
        }
    }

    /** Publishes one notification to one topic through the DMaaP Message Router. */
    private static void sendDmaapMessage(String topic, String notification) {
        BusPublisher publisher = new BusPublisher.DmaapPublisherWrapper(dmaapList, topic, aafLogin, aafPassword);
        try {
            // Sending notification through DMaaP Message Router
            publisher.send("MyPartitionKey", notification);
            logger.debug("Message Published on DMaaP :" + dmaapList.get(0) + " for Topic: " + topic);
        } finally {
            // Release the publisher even when send() fails.
            publisher.close();
        }
    }

    /**
     * Notification service Type Enumeration
     */
    public enum NotificationServiceType {
        ADD,
        REMOVE,
        HB
    }
}