+++ /dev/null
-/*-
- * ============LICENSE_START=======================================================
- * ECOMP-PDP-REST
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.openecomp.policy.pdp.rest.api.services;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import org.openecomp.policy.api.PolicyException;
-import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
-import org.openecomp.policy.common.logging.flexlogger.Logger;
-import org.openecomp.policy.rest.XACMLRestProperties;
-import org.openecomp.policy.utils.BusPublisher;
-import org.openecomp.policy.xacml.api.XACMLErrorConstants;
-import org.springframework.http.HttpStatus;
-
-import com.att.research.xacml.util.XACMLProperties;
-
-public class NotificationService {
- public static final String BACKUPFILE = "topicBackup.txt";
- private static Logger logger = FlexLogger.getLogger(GetDictionaryService.class.getName());
- private static ConcurrentHashMap<String, Date> topicQueue = new ConcurrentHashMap<>();
- private static int interval = 15000;
- private static Thread backUpthread = null;
- private static Object resourceLock = new Object();
- private static List<String> dmaapList = null;
- private static String dmaapServers = null;
- private static String aafLogin = null;
- private static String aafPassword = null;
-
- private String notificationResponse = null;
- private HttpStatus status = HttpStatus.BAD_REQUEST;
-
- /**
- * NotificationService Constructor.
- *
- * @param notificationTopic Topic Name in String format.
- * @param requestID Request ID in String format.
- * @param serviceType Needs to be NotificationServiceType based enumeration value.
- */
- public NotificationService(String notificationTopic, String requestID, NotificationServiceType serviceType) {
- init();
- if(dmaapServers==null || aafLogin==null || aafPassword==null){
- notificationResponse = XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file";
- return;
- }
- UUID requestUUID = null;
- if (requestID != null && !requestID.isEmpty()) {
- try {
- requestUUID = UUID.fromString(requestID);
- } catch (IllegalArgumentException e) {
- requestUUID = UUID.randomUUID();
- logger.info("Generated Random UUID: " + requestUUID.toString(), e);
- }
- }else{
- requestUUID = UUID.randomUUID();
- logger.info("Generated Random UUID: " + requestUUID.toString());
- }
- try{
- run(notificationTopic, serviceType);
- }catch(PolicyException e){
- notificationResponse = XACMLErrorConstants.ERROR_DATA_ISSUE + e;
- status = HttpStatus.BAD_REQUEST;
- }
- }
-
- private static void init() {
- if(dmaapServers==null || aafLogin==null || aafPassword==null){
- dmaapServers = XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_SERVERS);
- aafLogin = XACMLProperties.getProperty("DMAAP_AAF_LOGIN");
- aafPassword = XACMLProperties.getProperty("DMAAP_AAF_PASSWORD");
- interval = Integer.parseInt(XACMLProperties.getProperty(XACMLRestProperties.PROP_NOTIFICATION_DELAY, Integer.toString(interval)));
- if(dmaapServers==null || aafLogin==null || aafPassword==null){
- logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + "DMaaP properties are missing from the property file ");
- return;
- }
- // Cleanup Values.
- dmaapServers= dmaapServers.trim();
- aafLogin = aafLogin.trim();
- aafPassword = aafPassword.trim();
- // Get servers to List.
- if(dmaapServers.contains(",")) {
- dmaapList = new ArrayList<>(Arrays.asList(dmaapServers.split("\\s*,\\s*")));
- } else {
- dmaapList = new ArrayList<>();
- dmaapList.add(dmaapServers);
- }
- callThread();
- }
- }
-
- private void run(String notificationTopic, NotificationServiceType serviceType) throws PolicyException{
- // Check Validation
- if(notificationTopic==null){
- String message = "Notification Topic is null";
- logger.error(message);
- throw new PolicyException(message);
- }
- notificationTopic = notificationTopic.trim();
- if(notificationTopic.isEmpty()){
- String message = "Notification Topic is not valid. ";
- logger.error(message);
- throw new PolicyException(message);
- }
- // if already exists give error.Saying already registered.
- // Get Result.
- try{
- status = HttpStatus.OK;
- switch (serviceType) {
- case ADD:
- addTopic(notificationTopic);
- notificationResponse = "Success!! Please give permissions to " + aafLogin + " that PDP will use to publish on given topic :" + notificationTopic +
- "\n Start calling /sendHeartbeat API at an interval less than " + Integer.toString(interval) + "ms";
- break;
- case REMOVE:
- removeTopic(notificationTopic);
- notificationResponse = "Notification Topic :" + notificationTopic + " has been removed and PDP will not publish notifications to this Topic.";
- break;
- case HB:
- heartBeat(notificationTopic);
- notificationResponse = "Success!! HeartBeat registered.";
- break;
- }
- }catch (Exception e){
- logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + e);
- status = HttpStatus.BAD_REQUEST;
- throw new PolicyException(e);
- }
- }
-
- // Used to register Heart beat.
- private void heartBeat(String notificationTopic) throws PolicyException{
- if(!topicQueue.isEmpty()&& topicQueue.containsKey(notificationTopic)){
- topicQueue.put(notificationTopic, new Date());
- }else{
- logger.info("Failed HeartBeat, Topic " + notificationTopic + "is not registered.");
- throw new PolicyException("Failed HeartBeat, Topic " + notificationTopic + "is not registered.");
- }
- }
-
- // Used to remove Topic.
- private static void removeTopic(String notificationTopic) throws PolicyException{
- if(topicQueue.containsKey(notificationTopic)){
- topicQueue.remove(notificationTopic);
- removeTopicFromBackup(notificationTopic);
- }else{
- logger.info("Failed Removal, Topic " + notificationTopic + " is not registered.");
- throw new PolicyException("Failed Removal, Topic " + notificationTopic + " is not registered.");
- }
- }
-
- private static void removeTopicFromBackup(String notificationTopic) {
- synchronized (resourceLock) {
- try (Stream<String> lines = Files.lines(Paths.get(BACKUPFILE))) {
- List<String> replaced = lines.map(line-> (line.split("=")[0].equals(notificationTopic)?"":line)).collect(Collectors.toList());
- try (PrintWriter pw = new PrintWriter( BACKUPFILE, "UTF-8")) {
- replaced.forEach(line-> {
- if(line.trim().isEmpty()){
- return;
- }
- pw.println(line);
- });
- }
- lines.close();
- } catch (IOException e) {
- logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not remove/recreate the backup. ", e);
- }
- }
- }
-
- // Used to add Topic.
- private void addTopic(String notificationTopic) throws PolicyException{
- // validate if topic exists.
- if(!topicQueue.isEmpty()&& topicQueue.containsKey(notificationTopic)){
- topicQueue.put(notificationTopic, new Date());
- logger.info("Topic " + notificationTopic + " is already registered.");
- throw new PolicyException("Topic " + notificationTopic + " is already registered.");
- }
- topicQueue.put(notificationTopic, new Date());
- addTopictoBackUp(notificationTopic);
- }
-
- private void addTopictoBackUp(String notificationTopic) {
- synchronized (resourceLock) {
- try {
- Files.write(Paths.get(BACKUPFILE),( notificationTopic+"="+new Date().toString()+"\n").getBytes() , StandardOpenOption.APPEND);
- } catch (IOException e) {
- logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not add to the backup. ", e);
- }
- }
- }
-
- // Maintains BackUp and Queue Topic.
- private static void callThread() {
- // Create the backup file if it not exists.
- backup();
- if(backUpthread==null){
- Runnable task = () -> {
- logger.info("BackUpThread not set. Starting now !");
- threadTask();
- };
- backUpthread = new Thread(task);
- backUpthread.start();
- }
- }
-
- private static void backup(){
- synchronized (resourceLock) {
- try{
- File backUpFile = new File(BACKUPFILE);
- if(!backUpFile.exists() && backUpFile.createNewFile()){
- logger.info(" BackUp File for topic's has been created !");
- }else{
- // File Already exists. Process file and load the Memory.
- Stream<String> stream = Files.lines(Paths.get(BACKUPFILE));
- Map<String,Date> data = stream.map(line -> line.split(",")).collect(Collectors.toMap(e->e[0],e-> new Date()));
- stream.close();
- data.forEach((key, value)->logger.debug("Topic retrieved from backUp : " + key + " with Time : " + value));
- topicQueue.putAll(data);
- }
- }catch(IOException e){
- logger.error(XACMLErrorConstants.ERROR_DATA_ISSUE + " Could not process the backup. ", e);
- }
- }
- }
-
- private static void threadTask() {
- while(true){
- try {
- TimeUnit.MILLISECONDS.sleep(interval);
- for(Map.Entry<String, Date> map : topicQueue.entrySet()){
- Date currentTime = new Date();
- long timeDiff = 0;
- timeDiff = currentTime.getTime()-map.getValue().getTime();
- if(timeDiff < (interval+1500)){
- removeTopic(map.getKey());
- }
- }
- } catch (InterruptedException | PolicyException e) {
- logger.error(XACMLErrorConstants.ERROR_PROCESS_FLOW + "Error during thread execution ", e);
- }
- }
- }
-
- public String getResult() {
- return notificationResponse;
- }
-
- public HttpStatus getResponseCode() {
- return status;
- }
-
- /**
- * Entry point for sending Notifications from Notification Server.
- * @param notification String JSON format of notification message which needs to be sent.
- */
- public static void sendNotification(String notification) {
- init();
- for (String topic: topicQueue.keySet()){
- sendDmaapMessage(topic, notification);
- }
- }
-
- private static void sendDmaapMessage(String topic, String notification) {
- BusPublisher publisher = new BusPublisher.DmaapPublisherWrapper(dmaapList,
- topic,
- aafLogin,
- aafPassword);
- // Sending notification through DMaaP Message Router
- publisher.send( "MyPartitionKey", notification);
- logger.debug("Message Published on DMaaP :" + dmaapList.get(0) + "for Topic: " + topic);
- publisher.close();
- }
-
- /**
- * Notification service Type Enumeration
- */
- public enum NotificationServiceType{
- ADD,
- REMOVE,
- HB
- }
-
-}
\ No newline at end of file