--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ ============LICENSE_START=======================================================
+ ~ ONAP : ccsdk features
+ ~ ================================================================================
+ ~ Update Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
+ ~ ================================================================================
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~ ============LICENSE_END=======================================================
+ ~
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.onap.ccsdk.features</groupId>
+ <artifactId>ccsdk-features</artifactId>
+ <version>1.2.0-SNAPSHOT</version>
+ <relativePath>../../</relativePath>
+ </parent>
+
+ <groupId>org.onap.ccsdk.features.lib</groupId>
+ <artifactId>network-prioritization</artifactId>
+
+ <name>ccsdk-features :: lib :: ${project.artifactId}</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <!-- Use SimpleLogger as the slf4j implementation in test -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : ccsdk features
+ * ================================================================================
+ * Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=======================================================
+ *
+ */
+
+package org.onap.ccsdk.features.lib.npm;
+
+public class NpmConstants {
+ public static final String PROPERTY_ENV_TYPE = "Env_Type";
+ public static final String PROPERTY_ENV_PROD = "field";
+ public static final String PROPERTY_ENV_SOLO = "solo";
+
+ public static final String MDC_REQUEST_ID = "RequestID";
+
+ public static final String SDNC_CONFIG_DIR = "SDNC_CONFIG_DIR";
+ public static final String DEFAULT_SDNC_CONFIG_DIR = "/opt/sdnc/data/properties";
+ public static final String NPM_CONFIG_PROPERTIES_FILE_NAME = "npm-config.properties";
+
+ private NpmConstants() {}
+}
--- /dev/null
+/*\r
+ * ============LICENSE_START=======================================================\r
+ * ONAP : ccsdk features\r
+ * ================================================================================\r
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ============LICENSE_END=======================================================\r
+ *\r
+ */\r
+\r
+package org.onap.ccsdk.features.lib.npm;\r
+\r
+/**\r
+ * The type Npm exception.\r
+ * <p>\r
+ * It will be thrown in following cases:\r
+ * - Invalid Npm Transaction received\r
+ * - Couldn't queue RECEIVED Npm Transaction\r
+ * - Npm Transaction is already present in queue\r
+ * - Fails to invoke Service callback API\r
+ *\r
+ * @author Kapil Singal\r
+ */\r
+public class NpmException extends Exception {\r
+\r
+ /**\r
+ * This is a NpmException constructor\r
+ *\r
+ * @param message the message\r
+ */\r
+ public NpmException(String message) {\r
+ super(message);\r
+ }\r
+\r
+ /**\r
+ * This is a NpmException constructor\r
+ *\r
+ * @param message the message\r
+ * @param cause the cause\r
+ */\r
+ public NpmException(String message, Throwable cause) {\r
+ super(message, cause);\r
+ }\r
+\r
+}\r
--- /dev/null
+/*\r
+ * ============LICENSE_START=======================================================\r
+ * ONAP : ccsdk features\r
+ * ================================================================================\r
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ============LICENSE_END=======================================================\r
+ *\r
+ */\r
+\r
+package org.onap.ccsdk.features.lib.npm.api;\r
+\r
+import org.onap.ccsdk.features.lib.npm.NpmException;\r
+import org.onap.ccsdk.features.lib.npm.models.NpmTransaction;\r
+\r
+/**\r
+ * The interface NpmServiceCallbackApi.\r
+ * This must be implemented by all Services to receive notification back\r
+ *\r
+ * @author Kapil Singal\r
+ */\r
+public interface NpmServiceCallbackApi {\r
+\r
+ /**\r
+ * Process to be implemented by all services on-boarding to Npm.\r
+ * This API will be invoked by Npm to notify Service to process a transaction\r
+ *\r
+ * @param npmTransaction the NpmTransaction\r
+ *\r
+ * @throws NpmException the npm exception\r
+ */\r
+ void process(NpmTransaction npmTransaction) throws NpmException;\r
+\r
+}\r
--- /dev/null
+/*\r
+ * ============LICENSE_START=======================================================\r
+ * ONAP : ccsdk features\r
+ * ================================================================================\r
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ============LICENSE_END=======================================================\r
+ *\r
+ */\r
+\r
+package org.onap.ccsdk.features.lib.npm.api;\r
+\r
+import java.util.List;\r
+import java.util.Map;\r
+import java.util.NavigableSet;\r
+import org.onap.ccsdk.features.lib.npm.NpmException;\r
+import org.onap.ccsdk.features.lib.npm.models.NpmTransaction;\r
+\r
+/**\r
+ * The interface Npm Service Manager being used internally by Npm\r
+ * <p>\r
+ * Placeholder for Priority Queues holding Npm Transactions:\r
+ * Create: During Npm Startup if un-processed Transaction are available in NPM_TRANSACTION Table\r
+ * Manage: If a tx is expired sitting in queue, invoke service callback api\r
+ * Remove: If gets notified by a service once done with processing Transaction\r
+ *\r
+ * @author Kapil Singal\r
+ */\r
+public interface NpmServiceManager {\r
+\r
+ /**\r
+ * Add Transaction to queue.\r
+ *\r
+ * @param npmTransaction the NpmTransaction instance which contain serviceRequest with header information\r
+ *\r
+ * @throws NpmException the NpmException if Npm Transaction:\r
+ * if missing required header information\r
+ * couldn't be queued if priority queue is already full to it's max capacity\r
+ */\r
+ void addTransactionToQueue(NpmTransaction npmTransaction) throws NpmException;\r
+\r
+ /**\r
+ * Remove Transaction from queue and update connection counter\r
+ *\r
+ * @param npmTransaction the NpmTransaction instance which contain serviceRequest with header information\r
+ * @param updateConnectionCounter the update connection counter only if it's true\r
+ */\r
+ void removeTransactionFromQueue(NpmTransaction npmTransaction, boolean updateConnectionCounter);\r
+\r
+ /**\r
+ * Retrieve transaction from queue list.\r
+ *\r
+ * @param sbEndpoint the sb endpoint\r
+ * @param sbType the sb type\r
+ *\r
+ * @return the list of NpmTransaction\r
+ */\r
+ List<NpmTransaction> retrieveTransactionFromQueue(String sbEndpoint, String sbType);\r
+\r
+ /**\r
+ * Retrieve all priority queues map.\r
+ *\r
+ * @return the map\r
+ * <pre>\r
+ * npmPriorityQueues : Map [String:sbEndpoint##sbType, Map:sbPriorityQueues]\r
+ * sbPriorityQueues : Map [int:priority, TreeSet:priorityQueue]\r
+ * priorityQueue : NavigableSet [Contains the NpmTransaction Object]\r
+ * </pre>\r
+ */\r
+ Map<String, Map<Integer, NavigableSet<NpmTransaction>>> retrieveNpmPriorityQueues();\r
+\r
+ /**\r
+ * Load npm config boolean.\r
+ * <pre>\r
+ * Default properties:\r
+ * <b>Priority_List=0,1,2</b> Total possible priorities (lowest index is the highest priority)\r
+ * <b>Default_Priority=2</b> Default Priority value to be set if missing in request\r
+ * <b>EMS_ERICSSON=2</b> Maximum number of parallel connection that ERICSSON manufactured EMS can support\r
+ * <b>EMS_NOKIA=2</b> Maximum number of parallel connection that NOKIA manufactured EMS can support\r
+ * <b>queue_capacity_0=10</b> Total capacity of queue (maximum number of transactions a queue can hold) with priority 0\r
+ * <b>queue_capacity_1=7</b> Total capacity of queue (maximum number of transactions a queue can hold) with priority 1\r
+ * <b>queue_capacity_2=5</b> Total capacity of queue (maximum number of transactions a queue can hold) with priority 2\r
+ * <b>qsp_limit_0=5</b> Maximum number of transactions that can be processed from a queue before jumping to lower priority queues\r
+ * <b>qsp_limit_1=3</b> Maximum number of transactions that can be processed from a queue before jumping to lower priority queues\r
+ * <b>qsp_limit_2=2</b> Maximum number of transactions that can be processed from a queue before jumping to lower priority queues\r
+ * </pre>\r
+ *\r
+ * @param configFilePath the Config File Name\r
+ *\r
+ * @throws NpmException the npm exception\r
+ */\r
+ void loadNpmConfig(String configFilePath) throws NpmException;\r
+\r
+ /**\r
+ * Gets npm config.\r
+ *\r
+ * @param referenceKey the reference key\r
+ *\r
+ * @return the npmConfig Value\r
+ */\r
+ String getNpmConfig(String referenceKey);\r
+\r
+ /**\r
+ * Register service : must be called to register with Npm\r
+ *\r
+ * @param serviceKey the unique serviceKey specific to service reference\r
+ * @param npmServiceCallbackApi the instance of service class implementing NpmServiceCallbackApi\r
+ */\r
+ void registerService(String serviceKey, NpmServiceCallbackApi npmServiceCallbackApi);\r
+\r
+}\r
--- /dev/null
+/*\r
+ * ============LICENSE_START=======================================================\r
+ * ONAP : ccsdk features\r
+ * ================================================================================\r
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ============LICENSE_END=======================================================\r
+ *\r
+ */\r
+\r
+package org.onap.ccsdk.features.lib.npm.api;\r
+\r
+import java.util.ArrayList;\r
+import java.util.List;\r
+import org.onap.ccsdk.features.lib.npm.NpmConstants;\r
+import org.onap.ccsdk.features.lib.npm.NpmException;\r
+import org.onap.ccsdk.features.lib.npm.models.NpmStatusEnum;\r
+import org.onap.ccsdk.features.lib.npm.models.NpmTransaction;\r
+import org.onap.ccsdk.features.lib.npm.utils.NpmUtils;\r
+import org.apache.commons.lang3.StringUtils;\r
+import org.apache.commons.lang3.math.NumberUtils;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+import java.io.File;\r
+import java.io.FileInputStream;\r
+import java.io.IOException;\r
+import java.util.Comparator;\r
+import java.util.Iterator;\r
+import java.util.Map;\r
+import java.util.NavigableSet;\r
+import java.util.Properties;\r
+import java.util.TreeMap;\r
+import java.util.concurrent.ConcurrentHashMap;\r
+import java.util.concurrent.ConcurrentSkipListSet;\r
+import java.util.concurrent.ExecutorService;\r
+import java.util.concurrent.Executors;\r
+import java.util.concurrent.TimeUnit;\r
+import java.util.concurrent.atomic.AtomicInteger;\r
+import org.slf4j.MDC;\r
+\r
+import static org.onap.ccsdk.features.lib.npm.NpmConstants.MDC_REQUEST_ID;\r
+\r
+/**\r
+ * The type Npm Service Manager.\r
+ *\r
+ * @author Kapil Singal\r
+ */\r
+public class NpmServiceManagerImpl implements NpmServiceManager {\r
+ private static final Logger logger = LoggerFactory.getLogger(NpmServiceManagerImpl.class);\r
+ /**\r
+ * Npm `Priority Queues`.\r
+ * <p>\r
+ * npmPriorityQueues : Map <String:sbEndpoint##sbType, Map:sbPriorityQueues>\r
+ * sbPriorityQueues : Map <int:priority, TreeSet:priorityQueue>\r
+ * priorityQueue : NavigableSet [Contains the NpmTransaction Object]\r
+ * </p>\r
+ *\r
+ * <p>\r
+ * Npm will maintain multiple Priority queues per sb_endpoint (an EMS is an example of sb_endpoint)\r
+ * Priority Queues will be maintained per priority, having Npm Transactions, sorted based on timestamp, to accomplish a FIFO queue.\r
+ * </p>\r
+ *\r
+ * <p>\r
+ * Why NavigableSet or ConcurrentSkipListSet is being used:\r
+ * Need to maintain priority queue Sorted based om Transaction timestamp.!!\r
+ * Hence using ConcurrentSkipListSet -> which implements NavigableSet -> which extends SortedSet\r
+ * </p>\r
+ *\r
+ * <p>\r
+ * The ConcurrentSkipListSet class allows safe execution of\r
+ * Insertion, removal, and access operations on set concurrently by multiple threads.\r
+ * </p>\r
+ *\r
+ * <p>\r
+ * It should be preferred over other implementations of the Set interface\r
+ * when concurrent modification of set by multiple threads is required.\r
+ * </p>\r
+ */\r
+ private final Map<String, Map<Integer, NavigableSet<NpmTransaction>>> npmPriorityQueues = new ConcurrentHashMap<>();\r
+ private final Map<String, Integer> connectionCounter = new ConcurrentHashMap<>();\r
+ private final Map<String, NpmServiceCallbackApi> serviceRegistry = new ConcurrentHashMap<>();\r
+ private final Map<String, Integer> priorityExecState = new ConcurrentHashMap<>();\r
+ private final Map<String, Integer> qspExecState = new ConcurrentHashMap<>();\r
+\r
+ private final Properties npmConfigurations = new Properties();\r
+ private final ExecutorService executorService = Executors.newCachedThreadPool();\r
+\r
+ private boolean isProcessIngNpmPriorityQueues = false;\r
+\r
+ public NpmServiceManagerImpl() throws NpmException {\r
+ loadProperties();\r
+ Runnable processNpmPriorityQueuesTask = () -> {\r
+ try {\r
+ if (!isProcessIngNpmPriorityQueues) {\r
+ isProcessIngNpmPriorityQueues = true;\r
+ // Cleaning up MDC to make sure logging doesn't have old requestID being used for further processing\r
+ MDC.clear();\r
+ processNpmPriorityQueues();\r
+ isProcessIngNpmPriorityQueues = false;\r
+ }\r
+ } catch (StackOverflowError | Exception e) {\r
+ //Setting isProcessIngNpmPriorityQueues to false because next time when periodic task runs it should re-run processNpmPriorityQueues\r
+ isProcessIngNpmPriorityQueues = false;\r
+ // Catching both as there may not be any npm transaction at time of boot or eventual\r
+ logger.warn("----------- Task to processNpmPriorityQueues failed ----------- \nErrorMessage:({})", e.getMessage(), e);\r
+ }\r
+ };\r
+ Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(processNpmPriorityQueuesTask, 30, 5, TimeUnit.SECONDS);\r
+ }\r
+\r
+ @Override\r
+ public String getNpmConfig(String referenceKey) {\r
+ return npmConfigurations.getProperty(referenceKey);\r
+ }\r
+\r
+ @Override\r
+ public void registerService(String serviceKey, NpmServiceCallbackApi npmServiceCallbackApi) {\r
+ logger.trace("------------- Registering NpmServiceCallbackApi with serviceKey:({}) -------------", serviceKey);\r
+ serviceRegistry.put(serviceKey, npmServiceCallbackApi);\r
+ }\r
+\r
+ @Override\r
+ public void addTransactionToQueue(final NpmTransaction npmTransaction) throws NpmException {\r
+ logger.trace("------------- Inside NPM SM addTransactionToQueue -------------");\r
+ logger.trace("Queuing Npm Transaction with npmTransactionId:({})", npmTransaction.getNpmTransactionId());\r
+\r
+ //Sorted Queue based on timestamp, if timestamp same for multiple Transaction it sorts those with NpmTransactionId.\r
+ //Using computeIfAbsent to make sure it creates priority_queue for particular sb_endpoint if not already present\r
+ final String npmPriorityQueueKey = NpmUtils.getNpmPriorityQueueKey(npmTransaction.getSbEndpoint(), npmTransaction.getSbType());\r
+\r
+ logger.trace("Locating npmPriorityQueues with key sbEndpoint##sbType :: ({})", npmPriorityQueueKey);\r
+ NavigableSet<NpmTransaction> priorityQueue = npmPriorityQueues.computeIfAbsent(npmPriorityQueueKey,\r
+ sbPriorityQueues -> new TreeMap<>()).computeIfAbsent(npmTransaction.getPriority(),\r
+ priorityQueueSet -> new ConcurrentSkipListSet<>\r
+ (Comparator.comparing(NpmTransaction :: getTimestamp).thenComparing(NpmTransaction :: getNpmTransactionId)));\r
+ logger.trace("Current queue length for sbEndpoint:({}) with priority:({}) is:({})",\r
+ npmTransaction.getSbEndpoint(), npmTransaction.getPriority(), priorityQueue.size());\r
+\r
+ if (priorityQueue.contains(npmTransaction)) {\r
+ logger.trace("Npm Transaction with npmTransactionId:({}) is already present in queued, returning without altering the queue...",\r
+ npmTransaction.getNpmTransactionId());\r
+ return;\r
+ }\r
+\r
+ // Compare if queue_capacity_$priority available from Configurations, else by default it will be comparing against default value = 10\r
+ final int queueCapacity = NumberUtils.toInt(getNpmConfig("queue_capacity_" + npmTransaction.getPriority()), 10);\r
+ if (priorityQueue.size() >= queueCapacity) {\r
+ npmTransaction.setStatus(NpmStatusEnum.OUT_OF_CAPACITY);\r
+ String message = String.format("Queue %s Error. Npm Queue for sb_endpoint:(%s) with Priority:(%s) is maxed out to it's capacity limit:(%s)",\r
+ NpmStatusEnum.OUT_OF_CAPACITY, npmTransaction.getSbEndpoint(), npmTransaction.getPriority(), queueCapacity);\r
+ logger.trace("Returning Error message:({})", message);\r
+ throw new NpmException(message);\r
+ }\r
+ npmTransaction.setStatus(NpmStatusEnum.QUEUED);\r
+ priorityQueue.add(npmTransaction);\r
+ logger.trace("Successfully queued Npm Transaction with npmTransactionId:({})", npmTransaction.getNpmTransactionId());\r
+ logger.trace("Updated queue length for sbEndpoint:({}) with priority:({}) is:({})",\r
+ npmTransaction.getSbEndpoint(), npmTransaction.getPriority(), priorityQueue.size());\r
+ }\r
+\r
+ @Override\r
+ public List<NpmTransaction> retrieveTransactionFromQueue(String sbEndpoint, String sbType) {\r
+ logger.trace("------------- Inside NPM SM retrieveTransactionFromQueue -------------");\r
+ logger.trace("Retrieving all Npm Transactions for sbEndpoint:sbType ({}:{}) from priorityQueues", sbEndpoint, sbType);\r
+\r
+ final String npmPriorityQueueKey = NpmUtils.getNpmPriorityQueueKey(sbEndpoint, sbType);\r
+ final List<NpmTransaction> npmTransactionList = new ArrayList<>();\r
+\r
+ //Using computeIfPresent as npmTransactionQueueMap doesn't need any alteration if Npm Transaction is not found.\r
+ npmPriorityQueues.computeIfPresent(npmPriorityQueueKey, (sb, sbPriorityQueues) -> {\r
+ sbPriorityQueues.forEach((priority, npmTransactionNavigableSet) -> {\r
+ npmTransactionList.addAll(npmTransactionNavigableSet);\r
+ });\r
+ return sbPriorityQueues;\r
+ });\r
+\r
+ logger.trace("Retrieved total {} Npm Transactions from priorityQueues", npmTransactionList.size());\r
+ return npmTransactionList;\r
+ }\r
+\r
+ @Override\r
+ public Map<String, Map<Integer, NavigableSet<NpmTransaction>>> retrieveNpmPriorityQueues() {\r
+ // TODO: Check if it should return the actual queue map instance or a clone !!\r
+ return npmPriorityQueues;\r
+ }\r
+\r
+ @Override\r
+ public void removeTransactionFromQueue(NpmTransaction npmTransaction, boolean updateConnectionCounter) {\r
+ logger.trace("------------- Inside NPM SM removeTransactionFromQueue -------------");\r
+ logger.trace("Removing Npm Transaction from priority queue with npmTransactionId:({})", npmTransaction.getNpmTransactionId());\r
+ if (updateConnectionCounter) {\r
+ // Updating connection counter so that next transaction can be processed from queue for same sbEndpoint\r
+ updateConnectionCounter(npmTransaction.getSbEndpoint(), Math.negateExact(npmTransaction.getConnectionCount()));\r
+ }\r
+ final String npmPriorityQueueKey = NpmUtils.getNpmPriorityQueueKey(npmTransaction.getSbEndpoint(), npmTransaction.getSbType());\r
+\r
+ //Using computeIfPresent as npmTransactionQueueMap doesn't need any alteration if Npm Transaction is not found.\r
+ npmPriorityQueues.computeIfPresent(npmPriorityQueueKey, (sb, sbPriorityQueues) -> {\r
+ NavigableSet<NpmTransaction> priorityQueue = sbPriorityQueues.get(npmTransaction.getPriority());\r
+ if (priorityQueue != null) {\r
+ logger.trace("Current queue length for sbEndpoint:({}) with priority:({}) is:({})",\r
+ npmTransaction.getSbEndpoint(), npmTransaction.getPriority(), priorityQueue.size());\r
+\r
+ priorityQueue.remove(npmTransaction);\r
+ logger.trace("Successfully removed Npm Transaction with npmTransactionId:({})", npmTransaction.getNpmTransactionId());\r
+ logger.trace("Updated queue length for sbEndpoint:({}) with priority:({}) is:({})",\r
+ npmTransaction.getSbEndpoint(), npmTransaction.getPriority(), priorityQueue.size());\r
+\r
+ // Cleaning up Priority Queue if empty\r
+ if (priorityQueue.isEmpty()) {\r
+ logger.trace("As priorityQueue for sbEndpoint:({}) with priority:({}) is empty, removing the priority queue.",\r
+ npmTransaction.getSbEndpoint(), npmTransaction.getPriority());\r
+ sbPriorityQueues.remove(npmTransaction.getPriority());\r
+ }\r
+ }\r
+ return sbPriorityQueues;\r
+ });\r
+ }\r
+\r
+ private void processExpiredNpmTransaction() {\r
+ logger.trace("------------- Inside NPM SM processExpiredNpmTransaction -------------");\r
+ if (npmPriorityQueues.isEmpty()) {\r
+ logger.trace("------------- No Priority Queue is present, nothing to cleanup, hence returning -------------");\r
+ // Returning here itself to avoid StackOverFlow or other runtime error as there may not be any npm transaction to process\r
+ return;\r
+ }\r
+ // Converting to entrySet so that it can parallel stream :)\r
+ npmPriorityQueues.entrySet().parallelStream().forEach(sbEndpointMapEntry -> {\r
+ sbEndpointMapEntry.getValue().entrySet().parallelStream().forEach(prioritySetEntry -> {\r
+\r
+ // TODO: Need to test, Periodic Printing of Current Queue Length is expected by Rajesh and Team\r
+ logger.trace("Current queue length with key sbEndpoint##sbType :: ({}) with priority:({}) is:({})",\r
+ sbEndpointMapEntry.getKey(), prioritySetEntry.getKey(), prioritySetEntry.getValue().size());\r
+\r
+ prioritySetEntry.getValue().parallelStream().forEach(npmTransaction -> {\r
+ // Checking if npmTransaction is already expired\r
+ if (NpmUtils.isExpired(npmTransaction)) {\r
+ logger.trace("Npm Transaction with npmTransactionId:({}) is Expired and will be removed from priority queue, as timeToLive has passed.",\r
+ npmTransaction.getNpmTransactionId());\r
+ npmTransaction.setStatus(NpmStatusEnum.EXPIRED);\r
+ npmTransaction.setMessage("Npm Transaction is Expired and will be removed from priority queue, as timeToLive has passed.");\r
+ Runnable notifyServiceTask = () -> invokeServiceCallbackApi(npmTransaction, false);\r
+ executorService.execute(notifyServiceTask);\r
+ removeTransactionFromQueue(npmTransaction, false);\r
+ }\r
+ });\r
+ });\r
+ });\r
+ logger.trace("------------- Done with checking all priority queues for any expired Npm Transaction -------------");\r
+ }\r
+\r
+ private void processNpmPriorityQueues() {\r
+ logger.trace("------------- Inside NPM SM processNpmPriorityQueues -------------");\r
+ if (npmPriorityQueues.isEmpty()) {\r
+ // Returning here itself to avoid StackOverFlow or other runtime error as there may not be any npm transaction to process\r
+ return;\r
+ }\r
+ logger.trace("Calling processExpiredNpmTransaction to cleanup expired Npm Transactions before processing any queue.");\r
+ processExpiredNpmTransaction();\r
+\r
+ // Converting to entrySet so that it can parallel stream :)\r
+ npmPriorityQueues.entrySet().parallelStream().forEach(sbQueueEntry -> {\r
+ final String sbEndpoint = sbQueueEntry.getKey().split("##")[0];\r
+ final String sbType = sbQueueEntry.getKey().split("##")[1];\r
+ final int sbConnectionLimit = NumberUtils.toInt(getNpmConfig(sbType), 1);\r
+\r
+ if (sbConnectionLimit <= connectionCounter.getOrDefault(sbEndpoint, 0)) {\r
+ logger.trace("Not processing any Npm Transaction for sbEndpoint:({}) as it is already occupied to it's maximum connection limit:({}).",\r
+ sbEndpoint, sbConnectionLimit);\r
+ //returning when a particular SB (sbEndpoint) is already occupied to it's max limit\r
+ return;\r
+ }\r
+ logger.trace("Trying to process priority queue for sbEndpoint({}) with connectionLimit:({})", sbEndpoint, sbConnectionLimit);\r
+ processSbPriorityQueues(sbEndpoint, sbType, sbConnectionLimit, sbQueueEntry.getValue());\r
+ });\r
+ }\r
+\r
+ private void processSbPriorityQueues(String sbEndpoint, String sbType, int sbConnectionLimit, final Map<Integer, NavigableSet<NpmTransaction>> priorityQueues) {\r
+ logger.trace("------------- Inside NPM SM processSbPriorityQueues -------------");\r
+ if (priorityQueues == null || priorityQueues.isEmpty()) {\r
+ // Returning here itself to avoid StackOverFlow or other runtime error as there may not be any npm transaction to process\r
+ return;\r
+ }\r
+\r
+ Iterator<Map.Entry<Integer,NavigableSet<NpmTransaction>>> priorityQueuesIterator = priorityQueues.entrySet().iterator();\r
+ while (priorityQueuesIterator.hasNext()) {\r
+ Map.Entry<Integer,NavigableSet<NpmTransaction>> entry = priorityQueuesIterator.next();\r
+ final NavigableSet<NpmTransaction> npmTransactions = entry.getValue();\r
+ if (npmTransactions.isEmpty()) {\r
+ priorityQueuesIterator.remove();\r
+ continue;\r
+ }\r
+\r
+ final Integer priorityIndex = entry.getKey();\r
+ final String qspLimitKey = "qsp_limit_" + priorityIndex;\r
+ final String qspStateKey = sbEndpoint + "_" + priorityIndex;\r
+\r
+ AtomicInteger qspLimit = new AtomicInteger(NumberUtils.toInt(getNpmConfig(qspLimitKey), 5));\r
+ AtomicInteger qspCounter = new AtomicInteger(qspExecState.getOrDefault(qspStateKey, 0));\r
+ logger.trace("For sbEndpoint:({}) with priority:({}) qspLimit is:({}) and current qspCounter is:({})",\r
+ sbEndpoint, priorityIndex, qspLimit.get(), qspCounter.get());\r
+\r
+ // On re-iteration it should be processing same priority queue which was processed last only if qsp hasn't met\r
+ if (NpmUtils.isAllOkToProcess(qspLimit.get(), qspCounter.get(), connectionCounter.getOrDefault(sbEndpoint, 0), sbConnectionLimit)\r
+ && priorityExecState.containsKey(sbEndpoint) && priorityQueues.containsKey(priorityExecState.get(sbEndpoint))\r
+ && !priorityIndex.equals(priorityExecState.get(sbEndpoint))) {\r
+ logger.trace("Last execution state for sbEndpoint:({}) was for priority:({})", sbEndpoint, priorityExecState.get(sbEndpoint));\r
+ return;\r
+ }\r
+\r
+ logger.trace("------------- Iterating npmTransactions from priorityQueue -------------");\r
+ for (final NpmTransaction npmTransaction : npmTransactions) {\r
+ // Setting RequestID in MDC same as NPM Transaction RequestId\r
+ MDC.put(MDC_REQUEST_ID, npmTransaction.getRequestId());\r
+ // Should pick npmTransactions which are in QUEUED state and are not Expired\r
+ if (NpmStatusEnum.QUEUED.equals(npmTransaction.getStatus()) && !NpmUtils.isExpired(npmTransaction)\r
+ && NpmUtils.isAllOkToProcess(qspLimit.get(), qspCounter.get(), connectionCounter.getOrDefault(sbEndpoint, 0), sbConnectionLimit)\r
+ && invokeServiceCallbackApi(npmTransaction, true)) {\r
+\r
+ logger.trace("------------- Updating priorityExecState and qspExecState -------------");\r
+ priorityExecState.put(sbEndpoint, priorityIndex);\r
+ qspExecState.put(qspStateKey, qspCounter.incrementAndGet());\r
+ logger.trace("Updated priorityExecState for sbEndpoint:({}) with priority:({})", sbEndpoint, priorityIndex);\r
+ logger.trace("Updated qspExecState for qspStateKey:({}) with qspCounter value:({})", qspStateKey, qspExecState.get(qspStateKey));\r
+ }\r
+ }\r
+ resetExecStates(sbEndpoint, sbType);\r
+ }\r
+ }\r
+\r
+ private boolean invokeServiceCallbackApi(NpmTransaction npmTransaction, boolean updateConnectionCounter) {\r
+ logger.trace("------------- Inside NPM SM invokeServiceCallbackApi -------------");\r
+ try {\r
+ logger.trace("Notifying Registered Service with serviceKey:({}) to process Npm Transaction with npmTransactionId:({})",\r
+ npmTransaction.getServiceKey(), npmTransaction.getNpmTransactionId());\r
+ //Setting the status as PROCESSING so that same won't be picked up again in processNpmPriorityQueues\r
+ npmTransaction.setStatus(NpmStatusEnum.PROCESSING);\r
+ serviceRegistry.get(npmTransaction.getServiceKey()).process(npmTransaction);\r
+ logger.trace("Notified Registered Service to process Npm Transaction with npmTransactionId:({})", npmTransaction.getNpmTransactionId());\r
+\r
+ if (updateConnectionCounter) {\r
+ updateConnectionCounter(npmTransaction.getSbEndpoint(), npmTransaction.getConnectionCount());\r
+ }\r
+ } catch (NpmException e) {\r
+ logger.error("Notifying Registered Service with serviceKey:({}) for npmTransactionId:({}) failed with ErrorMessage:({})",\r
+ npmTransaction.getServiceKey(), npmTransaction.getNpmTransactionId(), e.getMessage(), e);\r
+ removeTransactionFromQueue(npmTransaction, true);\r
+ return false;\r
+ }\r
+ return true;\r
+ }\r
+\r
+ /**\r
+ * Resetting Execution States only if QSP met for all priorities so that Npm can reiterate in Round Robin fashion.\r
+ */\r
+ private void resetExecStates(String sbEndpoint, String sbType) {\r
+ logger.trace("------------- Inside NPM SM resetExecStates -------------");\r
+ boolean temp = true;\r
+ final String npmPriorityQueueKey = NpmUtils.getNpmPriorityQueueKey(sbEndpoint, sbType);\r
+ for (int priority : NpmUtils.getPriorityList(getNpmConfig("Priority_List"))) {\r
+ if (npmPriorityQueues.containsKey(npmPriorityQueueKey) && npmPriorityQueues.get(npmPriorityQueueKey).containsKey(priority)\r
+ && !priorityExecState.containsValue(priority)) {\r
+ //Setting temp to false so that it won't cleanup priorityExecState and qspExecState as all priorityQueues hasn't been processed yet\r
+ logger.trace("Execution States won't be resetting for sbEndpoint:({}) as all priorityQueues hasn't been processed yet.", sbEndpoint);\r
+ temp = false;\r
+ break;\r
+ }\r
+ }\r
+ if (temp) {\r
+ for (int priority : NpmUtils.getPriorityList(getNpmConfig("Priority_List"))) {\r
+ logger.trace("Resetting Execution States for sbEndpoint:({}) as all priorityQueues processed and those needs to reiterate.", sbEndpoint);\r
+ priorityExecState.remove(sbEndpoint);\r
+ qspExecState.remove(sbEndpoint + "_" + priority);\r
+ }\r
+ }\r
+ }\r
+\r
+ private void updateConnectionCounter(String sbEndpoint, int connectionCounterValue) {\r
+ logger.trace("------------- Inside NPM SM updateConnectionCounter -------------");\r
+ //Updating connectionCounter value to be 0 or +ve integer whichever is larger\r
+ connectionCounter.computeIfPresent(sbEndpoint, (key, value) -> Math.max((value + connectionCounterValue), 0));\r
+ connectionCounter.computeIfAbsent(sbEndpoint, s -> Math.max(connectionCounterValue, 0));\r
+ logger.trace("For sbEndpoint:({}) updated connectionCounter value is:({}) ", sbEndpoint, connectionCounter.get(sbEndpoint));\r
+ }\r
+\r
+ private void loadProperties() throws NpmException {\r
+ logger.trace("------------- Inside NPM SM loadProperties -------------");\r
+ String propDir = System.getProperty(NpmConstants.SDNC_CONFIG_DIR);\r
+ if (StringUtils.isBlank(propDir)) {\r
+ propDir = System.getenv(NpmConstants.SDNC_CONFIG_DIR);\r
+ }\r
+ if (StringUtils.isBlank(propDir)) {\r
+ logger.warn("Environment variable:({}) is not set, defaulting properties directory to:({})",\r
+ NpmConstants.SDNC_CONFIG_DIR, NpmConstants.DEFAULT_SDNC_CONFIG_DIR);\r
+ propDir = NpmConstants.DEFAULT_SDNC_CONFIG_DIR;\r
+ }\r
+ loadNpmConfig(propDir + File.separator + NpmConstants.NPM_CONFIG_PROPERTIES_FILE_NAME);\r
+ }\r
+\r
+ @Override\r
+ public void loadNpmConfig(String configFilePath) throws NpmException {\r
+ logger.trace("------------- Inside NPM SM loadNpmConfig -------------");\r
+ try {\r
+ logger.trace("Initializing NPM Configurations from:({})", configFilePath);\r
+ if (new File(configFilePath).exists()) {\r
+ npmConfigurations.load(new FileInputStream(configFilePath));\r
+ } else {\r
+ logger.warn("Config File:({}) not found, Initializing NPM with default configurations.", configFilePath);\r
+ configFilePath = "properties" + File.separator + NpmConstants.NPM_CONFIG_PROPERTIES_FILE_NAME;\r
+ npmConfigurations.load(getClass().getClassLoader().getResourceAsStream(configFilePath));\r
+ }\r
+ logger.trace("Initialized NPM with Configurations:({}) from configFilePath:({})", npmConfigurations, configFilePath);\r
+ } catch (IOException e) {\r
+ throw new NpmException(String.format("SDN-R Internal Error: Failed to load NPM Configurations form:(%s)", configFilePath), e);\r
+ }\r
+ }\r
+\r
+}\r
--- /dev/null
+/*\r
+ * ============LICENSE_START=======================================================\r
+ * ONAP : ccsdk features\r
+ * ================================================================================\r
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ============LICENSE_END=======================================================\r
+ *\r
+ */\r
+\r
+package org.onap.ccsdk.features.lib.npm.api;\r
+\r
+import java.util.Map;\r
+import java.util.NavigableSet;\r
+import org.onap.ccsdk.features.lib.npm.models.NpmAck;\r
+import org.onap.ccsdk.features.lib.npm.models.NpmTransaction;\r
+\r
+import java.time.Instant;\r
+import java.util.List;\r
+import java.util.UUID;\r
+\r
+/**\r
+ * The interface Npm Transaction Service provider.\r
+ *\r
+ * @author Kapil Singal\r
+ */\r
+public interface NpmTransactionService {\r
+\r
+ /**\r
+ * Form npm transaction npm transaction with all mandatory/non-null parameters.\r
+ *\r
+ * @param sbEndpoint the sb endpoint (Host IP Address)\r
+ * @param sbType the sb type (eg. EMS_ERICSSON, EMS_NOKIA, D2_MSN etc)\r
+ * @param serviceKey the service key (the unique serviceKey specific to service reference)\r
+ * @param serviceRequest the service request (actual request payload received from upstream)\r
+ *\r
+ * @return the npm transaction\r
+ */\r
+ NpmTransaction formNpmTransaction(String sbEndpoint, String sbType, String serviceKey, Object serviceRequest);\r
+\r
+ /**\r
+ * Form npm transaction npm transaction with all NPM header Information (nullable and non-nullable)\r
+ *\r
+ * @param npmTransactionId the npmTransactionId (instance of UUID :: defaults to Random UUID if null)\r
+ * @param sbEndpoint the sbEndpoint (Host IP Address :: used to create priority queues)\r
+ * @param sbType the sbType (eg. EMS_ERICSSON, EMS_NOKIA, D2_MSN etc :: used to determine parallel connections it can support )\r
+ * @param priority the priority (n :: lowest index is the highest priority, defaults to (least priority) if -1\r
+ * @param connectionCount the connectionCount (n :: total number of connection a transaction would occupy while processing defaults to (1) if -1\r
+ * @param timestamp the timestamp (instance of Instant :: defaults to Current Time if null)\r
+ * @param timeToLive the timeToLive (instance of Instant :: defaults to MAX Time if null)\r
+ * @param serviceKey the serviceKey (the unique serviceKey specific to service reference)\r
+ * @param serviceRequest the serviceRequest (actual request payload received from upstream)\r
+ *\r
+ * @return the npm transaction\r
+ */\r
+ NpmTransaction formNpmTransaction(UUID npmTransactionId, String sbEndpoint, String sbType, int priority, int connectionCount,\r
+ Instant timestamp, Instant timeToLive, String serviceKey, Object serviceRequest);\r
+\r
+ /**\r
+ * Add Transactions to queue : called by all services to get Transactions added to respective priority queue\r
+ *\r
+ * @param npmTransactionList the NpmTransaction list\r
+ *\r
+ * @return the list of NpmAck with status and message\r
+ */\r
+ List<NpmAck> addTransactionsToQueue(List<NpmTransaction> npmTransactionList);\r
+\r
+ /**\r
+ * Add Transaction to queue : called by all services to get Transaction added to respective priority queue\r
+ *\r
+ * @param npmTransaction the NpmTransaction\r
+ *\r
+ * @return the NpmAck with status and message\r
+ */\r
+ NpmAck addTransactionsToQueue(NpmTransaction npmTransaction);\r
+\r
+ /**\r
+ * Remove Transactions from queue : called by all services to get Transactions removed from priority queue\r
+ *\r
+ * @param npmTransactionList the NpmTransaction list\r
+ *\r
+ * @return the list of NpmAck with status and message\r
+ */\r
+ List<NpmAck> removeTransactionsFromQueue(List<NpmTransaction> npmTransactionList);\r
+\r
+ /**\r
+ * Remove Transaction from queue : called by all services to get Transaction removed from priority queue\r
+ *\r
+ * @param npmTransaction the NpmTransaction\r
+ *\r
+ * @return the NpmAck with status and message\r
+ */\r
+ NpmAck removeTransactionsFromQueue(NpmTransaction npmTransaction);\r
+\r
+ /**\r
+ * Retrieve transaction from queue list.\r
+ *\r
+ * @param sbEndpoint the sb endpoint\r
+ * @param sbType the sb type\r
+ *\r
+ * @return the list\r
+ */\r
+ List<NpmTransaction> retrieveTransactionFromQueue(String sbEndpoint, String sbType);\r
+\r
+ /**\r
+ * Retrieve NPM priority queues map.\r
+ *\r
+ * @return the map\r
+ * <pre>\r
+ * npmPriorityQueues : Map [String:sbEndpoint##sbType, Map:sbPriorityQueues]\r
+ * sbPriorityQueues : Map [int:priority, TreeSet:priorityQueue]\r
+ * priorityQueue : NavigableSet [Contains the NpmTransaction Object]\r
+ * </pre>\r
+ */\r
+ Map<String, Map<Integer, NavigableSet<NpmTransaction>>> retrieveNpmPriorityQueues();\r
+\r
+ /**\r
+ * Register service : must be called by all services to register with Npm\r
+ *\r
+ * @param serviceKey the unique serviceKey specific to service reference\r
+ * @param npmServiceCallbackApi the instance of service class implementing NpmServiceCallbackApi\r
+ */\r
+ void registerService(String serviceKey, NpmServiceCallbackApi npmServiceCallbackApi);\r
+\r
+ /**\r
+ * Load npm config boolean.\r
+ * <pre>\r
+ * Default properties:\r
+ * <b>Priority_List=0,1,2</b> Total possible priorities (lowest index is the highest priority)\r
+ * <b>Default_Priority=2</b> Default Priority value to be set if missing in request\r
+ * <b>EMS_ERICSSON=2</b> Maximum number of parallel connection that ERICSSON manufactured EMS can support\r
+ * <b>EMS_NOKIA=2</b> Maximum number of parallel connection that NOKIA manufactured EMS can support\r
+ * <b>queue_capacity_0=10</b> Total capacity of queue (maximum number of transactions a queue can hold) with priority 0\r
+ * <b>queue_capacity_1=7</b> Total capacity of queue (maximum number of transactions a queue can hold) with priority 1\r
+ * <b>queue_capacity_2=5</b> Total capacity of queue (maximum number of transactions a queue can hold) with priority 2\r
+ * <b>qsp_limit_0=5</b> Maximum number of transactions that can be processed from a queue before jumping to lower priority queues\r
+ * <b>qsp_limit_1=3</b> Maximum number of transactions that can be processed from a queue before jumping to lower priority queues\r
+ * <b>qsp_limit_2=2</b> Maximum number of transactions that can be processed from a queue before jumping to lower priority queues\r
+ * </pre>\r
+ *\r
+ * @param configFilePath the Config File Name\r
+ *\r
+ * @return the boolean\r
+ */\r
+ boolean loadNpmConfig(String configFilePath);\r
+\r
+}\r
--- /dev/null
+/*\r
+ * ============LICENSE_START=======================================================\r
+ * ONAP : ccsdk features\r
+ * ================================================================================\r
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ============LICENSE_END=======================================================\r
+ *\r
+ */\r
+\r
+package org.onap.ccsdk.features.lib.npm.api;\r
+\r
+import java.util.Map;\r
+import java.util.NavigableSet;\r
+import org.onap.ccsdk.features.lib.npm.NpmException;\r
+import org.onap.ccsdk.features.lib.npm.models.NpmAck;\r
+import org.onap.ccsdk.features.lib.npm.models.NpmStatusEnum;\r
+import org.onap.ccsdk.features.lib.npm.models.NpmTransaction;\r
+import org.onap.ccsdk.features.lib.npm.utils.NpmUtils;\r
+import org.apache.commons.lang3.math.NumberUtils;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+import java.time.Instant;\r
+import java.util.ArrayList;\r
+import java.util.Arrays;\r
+import java.util.List;\r
+import java.util.UUID;\r
+\r
+/**\r
+ * The interface Npm Transaction Service provider.\r
+ *\r
+ * @author Kapil Singal\r
+ */\r
+public class NpmTransactionServiceImpl implements NpmTransactionService {\r
+ private static final Logger logger = LoggerFactory.getLogger(NpmTransactionServiceImpl.class);\r
+ private final NpmServiceManager npmServiceManager;\r
+\r
+ public NpmTransactionServiceImpl(NpmServiceManager npmServiceManager) {\r
+ this.npmServiceManager = npmServiceManager;\r
+ }\r
+\r
+ @Override\r
+ public List<NpmAck> addTransactionsToQueue(List<NpmTransaction> npmTransactionList) {\r
+ logger.debug("------------- Inside NPM TS addTransactionsToQueue (List<NpmAck>) -------------");\r
+ logger.trace("Received addTransactionsToQueue(List) for Npm Transactions:({})", npmTransactionList);\r
+\r
+ List<NpmAck> npmAckList = new ArrayList<>();\r
+ for (NpmTransaction npmTransaction : npmTransactionList) {\r
+ logger.trace("Npm Transaction with npmTransactionId:({}) is being queued.", npmTransaction.getNpmTransactionId());\r
+ npmAckList.add(addTransactionsToQueue(npmTransaction));\r
+ }\r
+ logger.trace("Responding with npmAckList:({})", npmAckList);\r
+ return npmAckList;\r
+ }\r
+\r
+ @Override\r
+ public NpmAck addTransactionsToQueue(NpmTransaction npmTransaction) {\r
+ logger.debug("------------- Inside NPM TS addTransactionsToQueue -------------");\r
+ logger.trace("Received addTransactionsToQueue for Npm Transaction:({})", npmTransaction);\r
+\r
+ NpmAck npmAck = new NpmAck();\r
+ npmAck.setNpmTransactionId(npmTransaction.getNpmTransactionId());\r
+ npmAck.setRequestId(npmTransaction.getRequestId());\r
+ try {\r
+ //Validate Npm Transaction before creating entry to NPM_TRANSACTION Table.\r
+ npmTransaction.validate();\r
+ if (Arrays.stream(NpmUtils.getPriorityList(npmServiceManager.getNpmConfig("Priority_List"))).noneMatch(i -> i == npmTransaction.getPriority())) {\r
+ // Setting up the configured default priority value it it's missing in request\r
+ npmTransaction.setPriority(NumberUtils.toInt(npmServiceManager.getNpmConfig("Default_Priority"), 2));\r
+ logger.trace("Default priority value:({}) has been set, as it's missing in request.", npmTransaction.getPriority());\r
+ }\r
+ logger.trace("Trying to queue Npm Transaction");\r
+ npmServiceManager.addTransactionToQueue(npmTransaction);\r
+ npmAck.setStatus(NpmStatusEnum.QUEUED);\r
+ npmAck.setMessage("Added to Priority Queue.");\r
+ } catch (NpmException e) {\r
+ logger.error("Failed to queue Npm Transaction.\nErrorMessage: {}", e.getMessage(), e);\r
+ npmAck.setStatus(NpmStatusEnum.FAILED);\r
+ npmAck.setMessage(e.getMessage());\r
+ }\r
+ return npmAck;\r
+ }\r
+\r
+ @Override\r
+ public List<NpmAck> removeTransactionsFromQueue(List<NpmTransaction> npmTransactionList) {\r
+ logger.debug("------------- Inside NPM TS removeTransactionsFromQueue (List<NpmAck>) -------------");\r
+ logger.trace("Received removeTransactionsFromQueue for Npm Transactions:({})", npmTransactionList);\r
+\r
+ List<NpmAck> npmAckList = new ArrayList<>();\r
+ for (NpmTransaction npmTransaction : npmTransactionList) {\r
+ npmAckList.add(removeTransactionsFromQueue(npmTransaction));\r
+ }\r
+ logger.trace("Responding with npmAckList:({})", npmAckList);\r
+ return npmAckList;\r
+ }\r
+\r
+ @Override\r
+ public NpmAck removeTransactionsFromQueue(NpmTransaction npmTransaction) {\r
+ logger.debug("------------- Inside NPM TS removeTransactionsFromQueue -------------");\r
+ logger.trace("Received removeTransactionsFromQueue for Npm Transactions:({})", npmTransaction);\r
+\r
+ npmServiceManager.removeTransactionFromQueue(npmTransaction, true);\r
+\r
+ NpmAck npmAck = new NpmAck();\r
+ npmAck.setNpmTransactionId(npmTransaction.getNpmTransactionId());\r
+ npmAck.setRequestId(npmTransaction.getRequestId());\r
+ npmAck.setStatus(NpmStatusEnum.PROCESSED);\r
+ npmAck.setMessage("Removed from Priority Queue");\r
+ logger.trace("Responding with npmAck:({})", npmAck);\r
+ return npmAck;\r
+ }\r
+\r
+ @Override\r
+ public List<NpmTransaction> retrieveTransactionFromQueue(String sbEndpoint, String sbType) {\r
+ logger.debug("------------- Inside NPM TS retrieveQueueStatus (List<NpmTransaction>) -------------");\r
+ logger.trace("Received retrieveTransactionFromQueue for sbEndpoint:({}) and sbType:({})", sbEndpoint, sbType);\r
+ return npmServiceManager.retrieveTransactionFromQueue(sbEndpoint, sbType);\r
+ }\r
+\r
+ @Override\r
+ public Map<String, Map<Integer, NavigableSet<NpmTransaction>>> retrieveNpmPriorityQueues() {\r
+ logger.debug("------------- Inside NPM TS retrieveAllPriorityQueues (Map<sbEndpoint, Map<priority, NavigableSet<NpmTransaction>>>) -------------");\r
+ return npmServiceManager.retrieveNpmPriorityQueues();\r
+ }\r
+\r
+ @Override\r
+ public void registerService(String serviceKey, NpmServiceCallbackApi npmServiceCallbackApi) {\r
+ logger.trace("Registering NpmServiceCallbackApi with serviceKey:({})", serviceKey);\r
+ npmServiceManager.registerService(serviceKey, npmServiceCallbackApi);\r
+ }\r
+\r
+ @Override\r
+ public boolean loadNpmConfig(String configFilePath) {\r
+ try {\r
+ npmServiceManager.loadNpmConfig(configFilePath);\r
+ return true;\r
+ } catch (NpmException e) {\r
+ logger.trace("Loading configurations from file:({}), failed with:({}): ", configFilePath, e.getMessage(), e);\r
+ }\r
+ return false;\r
+ }\r
+\r
+ @Override\r
+ public NpmTransaction formNpmTransaction(String sbEndpoint, String sbType, String serviceKey, Object serviceRequest) {\r
+ return NpmUtils.formNpmTransaction(null, sbEndpoint, sbType, -1, -1,\r
+ null, null, serviceKey, serviceRequest);\r
+ }\r
+\r
+ @Override\r
+ public NpmTransaction formNpmTransaction(UUID npmTransactionId, String sbEndpoint, String sbType, int priority, int connectionCount,\r
+ Instant timestamp, Instant timeToLive, String serviceKey, Object serviceRequest) {\r
+ return NpmUtils.formNpmTransaction(npmTransactionId, sbEndpoint, sbType, priority, connectionCount,\r
+ timestamp, timeToLive, serviceKey, serviceRequest);\r
+ }\r
+\r
+}\r
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : ccsdk features
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=======================================================
+ *
+ */
+
+package org.onap.ccsdk.features.lib.npm.models;
+
+import org.onap.ccsdk.features.lib.npm.utils.NpmUtils;
+
+import java.util.UUID;
+
+/**
+ * The type Npm Ack.
+ *
+ * @author Kapil Singal
+ */
+public class NpmAck {
+ private UUID npmTransactionId;
+ private String requestId; // multiple transactions can have the same requestId
+ private NpmStatusEnum status;
+ private String message;
+
+ public UUID getNpmTransactionId() {
+ return npmTransactionId;
+ }
+
+ public void setNpmTransactionId(UUID npmTransactionId) {
+ this.npmTransactionId = npmTransactionId;
+ }
+
+ public String getRequestId() {
+ return requestId;
+ }
+
+ public void setRequestId(String requestId) {
+ this.requestId = requestId;
+ }
+
+ public NpmStatusEnum getStatus() {
+ return status;
+ }
+
+ public void setStatus(NpmStatusEnum status) {
+ this.status = status;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ @Override
+ public String toString() {
+ return NpmUtils.getJson(this);
+ }
+
+}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : ccsdk features
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=======================================================
+ *
+ */
+
+package org.onap.ccsdk.features.lib.npm.models;
+
+/**
+ * The enum Transaction status enum.
+ *
+ * @author Kapil Singal
+ */
+public enum NpmStatusEnum {
+ /**
+ * RECEIVED When Npm receives Npm Transaction
+ */
+ RECEIVED,
+ /**
+ * QUEUED When Npm adds Npm Transaction to a priority queue
+ */
+ QUEUED,
+ /**
+ * PROCESSING When Npm pulls and notifies Npm Transaction to service.
+ */
+ PROCESSING,
+ /**
+ * PROCESSED When Service notifies back Npm about processing done.
+ */
+ PROCESSED,
+ /**
+ * FAILED When Npm fails to either queue or notify Npm Transaction.
+ */
+ FAILED,
+ /**
+ * EXPIRED When timeToLive passes current UTC time.
+ */
+ EXPIRED,
+ /**
+ * OUT_OF_CAPACITY When priority queue is full to it's maximum capacity.
+ */
+ OUT_OF_CAPACITY,
+}
--- /dev/null
+/*\r
+ * ============LICENSE_START=======================================================\r
+ * ONAP : ccsdk features\r
+ * ================================================================================\r
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ============LICENSE_END=======================================================\r
+ *\r
+ */\r
+\r
+package org.onap.ccsdk.features.lib.npm.models;\r
+\r
+import org.onap.ccsdk.features.lib.npm.NpmException;\r
+import org.onap.ccsdk.features.lib.npm.utils.NpmUtils;\r
+import org.apache.commons.lang3.StringUtils;\r
+\r
+import java.time.Instant;\r
+import java.util.Objects;\r
+import java.util.UUID;\r
+\r
+/**\r
+ * The type Npm Transaction.\r
+ *\r
+ * @author Kapil Singal\r
+ */\r
+public class NpmTransaction {\r
+\r
+ private UUID npmTransactionId;\r
+ private String requestId; // multiple transactions can have the same requestId\r
+\r
+ private String sbEndpoint;\r
+ private String sbType;\r
+\r
+ private int priority = -1;\r
+ private int connectionCount = 1;\r
+\r
+ private Instant timestamp = Instant.now();\r
+ private Instant timeToLive = Instant.MAX;\r
+\r
+ private NpmStatusEnum status = NpmStatusEnum.RECEIVED;\r
+ private String message;\r
+\r
+ private String serviceKey;\r
+ private Object serviceRequest;\r
+\r
+ public UUID getNpmTransactionId() {\r
+ return npmTransactionId;\r
+ }\r
+\r
+ public void setNpmTransactionId(UUID npmTransactionId) {\r
+ this.npmTransactionId = npmTransactionId;\r
+ }\r
+\r
+ public String getRequestId() {\r
+ return requestId;\r
+ }\r
+\r
+ public void setRequestId(String requestId) {\r
+ this.requestId = requestId;\r
+ }\r
+\r
+ public String getSbEndpoint() {\r
+ return sbEndpoint;\r
+ }\r
+\r
+ public void setSbEndpoint(String sbEndpoint) {\r
+ this.sbEndpoint = sbEndpoint;\r
+ }\r
+\r
+ public String getSbType() {\r
+ return sbType;\r
+ }\r
+\r
+ public void setSbType(String sbType) {\r
+ this.sbType = sbType;\r
+ }\r
+\r
+ public int getPriority() {\r
+ return priority;\r
+ }\r
+\r
+ public void setPriority(int priority) {\r
+ this.priority = priority;\r
+ }\r
+\r
+ public int getConnectionCount() {\r
+ return connectionCount;\r
+ }\r
+\r
+ public void setConnectionCount(int connectionCount) {\r
+ this.connectionCount = connectionCount;\r
+ }\r
+\r
+ public Instant getTimestamp() {\r
+ return timestamp;\r
+ }\r
+\r
+ public void setTimestamp(Instant timestamp) {\r
+ this.timestamp = timestamp;\r
+ }\r
+\r
+ public Instant getTimeToLive() {\r
+ return timeToLive;\r
+ }\r
+\r
+ public void setTimeToLive(Instant timeToLive) {\r
+ this.timeToLive = timeToLive;\r
+ }\r
+\r
+ public NpmStatusEnum getStatus() {\r
+ return status;\r
+ }\r
+\r
+ public void setStatus(NpmStatusEnum status) {\r
+ this.status = status;\r
+ }\r
+\r
+ public String getMessage() {\r
+ return message;\r
+ }\r
+\r
+ public void setMessage(String message) {\r
+ this.message = message;\r
+ }\r
+\r
+ public String getServiceKey() {\r
+ return serviceKey;\r
+ }\r
+\r
+ public void setServiceKey(String serviceKey) {\r
+ this.serviceKey = serviceKey;\r
+ }\r
+\r
+ public Object getServiceRequest() {\r
+ return serviceRequest;\r
+ }\r
+\r
+ public void setServiceRequest(Object serviceRequest) {\r
+ this.serviceRequest = serviceRequest;\r
+ }\r
+\r
+ /**\r
+ * Validate boolean.\r
+ *\r
+ * @throws NpmException the validator exception\r
+ */\r
+ public void validate() throws NpmException {\r
+ if (npmTransactionId == null) {\r
+ throw new NpmException("Transaction is not valid: npmTransactionId is required.");\r
+ }\r
+ if (StringUtils.isBlank(sbEndpoint)) {\r
+ throw new NpmException("Transaction is not valid: sbEndpoint is required.");\r
+ }\r
+ if (StringUtils.isBlank(sbType)) {\r
+ throw new NpmException("Transaction is not valid: sbType is required.");\r
+ }\r
+ if (timestamp == null) {\r
+ throw new NpmException("Transaction is not valid: txTimestamp is required.");\r
+ }\r
+ if (timeToLive == null) {\r
+ throw new NpmException("Transaction is not valid: timeToLive is required.");\r
+ }\r
+ if (StringUtils.isBlank(serviceKey)) {\r
+ throw new NpmException("Transaction is not valid: serviceKey is required.");\r
+ }\r
+ if (serviceRequest == null) {\r
+ throw new NpmException("Transaction is not valid: serviceRequest is required.");\r
+ }\r
+ }\r
+\r
+ @Override\r
+ public String toString() {\r
+ return NpmUtils.getJson(this);\r
+ }\r
+\r
+ @Override\r
+ public boolean equals(Object o) {\r
+ if (this == o) {\r
+ return true;\r
+ }\r
+ if (!(o instanceof NpmTransaction)) {\r
+ return false;\r
+ }\r
+ NpmTransaction that = (NpmTransaction) o;\r
+ return npmTransactionId.equals(that.npmTransactionId);\r
+ }\r
+\r
+ @Override\r
+ public int hashCode() {\r
+ return Math.abs(Objects.hash(npmTransactionId));\r
+ }\r
+\r
+}\r
--- /dev/null
+/*\r
+ * ============LICENSE_START=======================================================\r
+ * ONAP : ccsdk features\r
+ * ================================================================================\r
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ============LICENSE_END=======================================================\r
+ *\r
+ */\r
+\r
+package org.onap.ccsdk.features.lib.npm.utils;\r
+\r
+import com.fasterxml.jackson.core.JsonProcessingException;\r
+import com.fasterxml.jackson.databind.ObjectMapper;\r
+import com.fasterxml.jackson.databind.SerializationFeature;\r
+import com.fasterxml.jackson.databind.node.ArrayNode;\r
+import org.apache.commons.lang3.StringUtils;\r
+import org.onap.ccsdk.features.lib.npm.models.NpmTransaction;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+import java.time.Instant;\r
+import java.util.Collections;\r
+import java.util.List;\r
+import java.util.UUID;\r
+import java.util.stream.Stream;\r
+\r
+/**\r
+ * The type Npm utils.\r
+ *\r
+ * @author Kapil Singal\r
+ */\r
+public class NpmUtils {\r
+\r
+ private static final Logger logger = LoggerFactory.getLogger(NpmUtils.class);\r
+\r
+ private NpmUtils() {\r
+ }\r
+\r
+ /**\r
+ * This is a getJson method\r
+ *\r
+ * @param instance the instance\r
+ *\r
+ * @return String json\r
+ */\r
+ public static String getJson(Object instance) {\r
+ try {\r
+ ObjectMapper mapper = new ObjectMapper();\r
+ mapper.enable(SerializationFeature.INDENT_OUTPUT);\r
+ return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(instance);\r
+ } catch (JsonProcessingException e) {\r
+ e.printStackTrace();\r
+ }\r
+ return null;\r
+ }\r
+\r
+ /**\r
+ * Gets list from json string.\r
+ *\r
+ * @param <T> the type parameter\r
+ * @param content the content\r
+ * @param valueType the value type\r
+ *\r
+ * @return the list of type parameter from json string\r
+ */\r
+ public static <T> List<T> getListFromJsonString(String content, Class<T> valueType) {\r
+ try {\r
+ ObjectMapper mapper = new ObjectMapper();\r
+ if (mapper.readTree(content) instanceof ArrayNode) {\r
+ return mapper.readValue(content, mapper.getTypeFactory().constructCollectionType(List.class, valueType));\r
+ } else {\r
+ return Collections.singletonList(mapper.readValue(content, valueType));\r
+ }\r
+ } catch (Exception e) {\r
+ logger.warn(e.getMessage(), e);\r
+ }\r
+ return Collections.emptyList();\r
+ }\r
+\r
+ /**\r
+ * Form npm transaction npm transaction.\r
+ *\r
+ * @param npmTransactionId the npmTransactionId (instance of UUID :: defaults to Random UUID if null)\r
+ * @param sbEndpoint the sbEndpoint (Host IP Address :: used to create priority queues)\r
+ * @param sbType the sbType (eg. EMS_ERICSSON, EMS_NOKIA, D2_MSN etc :: used to determine parallel connections it can support )\r
+ * @param priority the priority (n :: lowest index is the highest priority, defaults to (least priority) if -1\r
+ * @param connectionCount the connectionCount (n :: total number of connection a transaction would occupy while processing defaults to (1) if -1\r
+ * @param timestamp the timestamp (instance of Instant :: defaults to Current Time if null)\r
+ * @param timeToLive the timeToLive (instance of Instant :: defaults to MAX Time if null)\r
+ * @param serviceKey the serviceKey (the unique serviceKey specific to service reference)\r
+ * @param serviceRequest the serviceRequest (actual request payload received from upstream)\r
+ *\r
+ * @return the npm transaction\r
+ */\r
+ public static NpmTransaction formNpmTransaction(UUID npmTransactionId, String sbEndpoint, String sbType, int priority, int connectionCount,\r
+ Instant timestamp, Instant timeToLive, String serviceKey, Object serviceRequest) {\r
+\r
+ NpmTransaction npmTransaction = new NpmTransaction();\r
+ npmTransaction.setNpmTransactionId(npmTransactionId == null ? UUID.randomUUID() : npmTransactionId);\r
+ npmTransaction.setSbEndpoint(sbEndpoint);\r
+ npmTransaction.setSbType(sbType);\r
+ if (priority > -1) {\r
+ npmTransaction.setPriority(priority);\r
+ }\r
+ if (connectionCount > 0) {\r
+ npmTransaction.setConnectionCount(connectionCount);\r
+ }\r
+ npmTransaction.setTimestamp(timestamp == null ? Instant.now() : timestamp);\r
+ npmTransaction.setTimeToLive(timeToLive == null ? Instant.MAX : timeToLive);\r
+ npmTransaction.setServiceKey(serviceKey);\r
+ npmTransaction.setServiceRequest(serviceRequest);\r
+\r
+ return npmTransaction;\r
+ }\r
+\r
+ /**\r
+ * This is a isAllOkToProcess method\r
+ *\r
+ * @param qspLimit the queue serve period limit\r
+ * @param qspCounter the queue serve period counter\r
+ * @param connectionCounter the connection counter for sb_endpoint\r
+ * @param sbConnectionLimit the sb_endpoint parallel connection limit\r
+ *\r
+ * @return true if: queue serve period is not met and sbEndpoint is still having connection slot empty\r
+ * <p>\r
+ * false if: queue serve period is reached to max limit or particular SB (sbEndpoint) is already occupied to max connection limit\r
+ */\r
+ public static boolean isAllOkToProcess(int qspLimit, int qspCounter, int connectionCounter, int sbConnectionLimit) {\r
+ return qspLimit > qspCounter && connectionCounter < sbConnectionLimit;\r
+ }\r
+\r
+ /**\r
+ * Is expired boolean.\r
+ *\r
+ * @param npmTransaction the NpmTransaction instance\r
+ *\r
+ * @return true if timeToLive is passed than current UTC Time else false\r
+ */\r
+ public static boolean isExpired(NpmTransaction npmTransaction) {\r
+ return npmTransaction != null && npmTransaction.getTimeToLive().compareTo(Instant.now()) <= 0;\r
+ }\r
+\r
+ /**\r
+ * Get priority list.\r
+ *\r
+ * @param priorities defined Property_List from properties file\r
+ *\r
+ * @return the int [priorities]\r
+ */\r
+ public static int[] getPriorityList(String priorities) {\r
+ return Stream.of(StringUtils.defaultIfBlank(priorities, "0,1,2").split(",")).mapToInt(Integer::parseInt).toArray();\r
+ }\r
+\r
+ /**\r
+ * Gets npm priority queue key.\r
+ *\r
+ * @param sbEndpoint the sb endpoint\r
+ * @param sbType the sb type\r
+ *\r
+ * @return the npm priority queue key\r
+ */\r
+ public static String getNpmPriorityQueueKey(String sbEndpoint, String sbType) {\r
+ return sbEndpoint.concat("##").concat(sbType);\r
+ }\r
+\r
+}\r
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ ============LICENSE_START=======================================================
+ ~ ONAP : ccsdk features
+ ~ ================================================================================
+ ~ Update Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
+ ~ ================================================================================
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~ ============LICENSE_END=======================================================
+ ~
+ -->
+
+<blueprint xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0"
+ xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
+ odl:use-default-for-reference-types="true">
+
+ <bean id="npmServiceManager" class="org.onap.ccsdk.features.lib.npm.api.NpmServiceManagerImpl"/>
+
+ <bean id="npmTransactionService" class="org.onap.ccsdk.features.lib.npm.api.NpmTransactionServiceImpl">
+ <argument ref="npmServiceManager"/>
+ </bean>
+
+ <service ref="npmTransactionService" interface="org.onap.ccsdk.features.lib.npm.api.NpmTransactionService"/>
+
+</blueprint>
--- /dev/null
+Priority_List=0,1,2
+Default_Priority=2
+EMS_ERICSSON=2
+EMS_NOKIA=2
+queue_capacity_0=10
+queue_capacity_1=7
+queue_capacity_2=5
+qsp_limit_0=5
+qsp_limit_1=3
+qsp_limit_2=2
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : ccsdk features
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=======================================================
+ *
+ */
+
+package org.onap.ccsdk.features.lib.npm.api;
+
+import org.onap.ccsdk.features.lib.npm.models.NpmAck;
+import org.onap.ccsdk.features.lib.npm.models.NpmTransaction;
+import org.onap.ccsdk.features.lib.npm.utils.NpmUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NpmServiceCallbackHandler implements NpmServiceCallbackApi {
+ private static final Logger logger = LoggerFactory.getLogger(NpmServiceCallbackApi.class);
+
+ private final NpmTransactionService npmTransactionService;
+
+ public NpmServiceCallbackHandler(NpmTransactionService npmTransactionService) {
+ this.npmTransactionService = npmTransactionService;
+ npmTransactionService.registerService("npmServiceCallbackHandler", this);
+ }
+
+ @Override
+ public void process(NpmTransaction npmTransaction) {
+ logger.debug("Received NpmTransaction with npmTransactionId ({}) to process.", npmTransaction.getNpmTransactionId());
+ logger.debug("NpmTransaction serviceRequest ({}).", NpmUtils.getJson(npmTransaction.getServiceRequest()));
+ removeTransactionsFromQueue(npmTransaction);
+ }
+
+ private void removeTransactionsFromQueue(NpmTransaction npmTransaction) {
+ NpmAck npmAck = npmTransactionService.removeTransactionsFromQueue(npmTransaction);
+ logger.debug("Removed NpmTransaction with npmAck ({}) after processing.", npmAck);
+ }
+
+}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : ccsdk features
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=======================================================
+ *
+ */
+
+package org.onap.ccsdk.features.lib.npm.api;
+
+import org.junit.BeforeClass;
+import org.onap.ccsdk.features.lib.npm.NpmConstants;
+import org.onap.ccsdk.features.lib.npm.models.NpmAck;
+import org.onap.ccsdk.features.lib.npm.models.NpmStatusEnum;
+import org.onap.ccsdk.features.lib.npm.models.NpmTransaction;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(MockitoJUnitRunner.class)
+public class NpmTransactionServiceTest {
+
+ private static NpmTransactionServiceImpl npmTransactionService;
+
+ private List<NpmTransaction> npmTransactionList;
+
+ @BeforeClass
+ public static void once() throws Exception {
+ System.setProperty(NpmConstants.SDNC_CONFIG_DIR, "src/test/resources/properties");
+ npmTransactionService = new NpmTransactionServiceImpl(new NpmServiceManagerImpl());
+
+ npmTransactionService.registerService("npmServiceCallbackHandler", new NpmServiceCallbackHandler(npmTransactionService));
+ }
+
+ @Before
+ public void before() throws Exception {
+
+ npmTransactionList = new ArrayList<>();
+ npmTransactionList.add(npmTransactionService.formNpmTransaction(null,
+ "1.1.1.1",
+ "EMS_ERICSSON",
+ 0,
+ 1,
+ Instant.now(),
+ Instant.MAX,
+ "npmServiceCallbackHandler",
+ new ObjectMapper().readTree("{\"attr\": \"EMS_ERICSSON_1\"}")));
+ npmTransactionList.add(npmTransactionService.formNpmTransaction(null,
+ "1.1.1.1",
+ "EMS_ERICSSON",
+ 0,
+ 1,
+ Instant.now(),
+ Instant.MAX,
+ "npmServiceCallbackHandler",
+ new ObjectMapper().readTree("{\"attr\": \"EMS_ERICSSON_2\"}")));
+
+ npmTransactionList.add(npmTransactionService.formNpmTransaction(null,
+ "2.2.2.2",
+ "EMS_NOKIA",
+ 0,
+ 1,
+ Instant.now(),
+ Instant.MAX,
+ "npmServiceCallbackHandler",
+ new ObjectMapper().readTree("{\"attr\": \"EMS_NOKIA_1\"}")));
+ npmTransactionList.add(npmTransactionService.formNpmTransaction(null,
+ "2.2.2.2",
+ "EMS_NOKIA",
+ 0,
+ 1,
+ Instant.now(),
+ Instant.MAX,
+ "npmServiceCallbackHandler",
+ new ObjectMapper().readTree("{\"attr\": \"EMS_NOKIA_2\"}")));
+ }
+
+ @Test
+ public void addTransactionsToQueue_validTransaction() {
+ List<NpmAck> npmAckList = npmTransactionService.addTransactionsToQueue(npmTransactionList);
+ assertEquals(npmTransactionList.size(), npmAckList.size());
+ assertEquals(NpmStatusEnum.QUEUED, npmAckList.get(0).getStatus());
+ assertEquals(NpmStatusEnum.QUEUED, npmAckList.get(1).getStatus());
+ }
+
+ @Test
+ public void addTransactionsToQueue_invalidTransaction() {
+ npmTransactionList.get(0).setServiceRequest(null);
+ NpmAck npmAck = npmTransactionService.addTransactionsToQueue(npmTransactionList.get(0));
+ assertEquals(NpmStatusEnum.FAILED, npmAck.getStatus());
+ }
+
+}
\ No newline at end of file
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : ccsdk features
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=======================================================
+ *
+ */
+
+package org.onap.ccsdk.features.lib.npm.utils;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class NpmUtilTest {
+
+ @Test
+ public void testGetListFromJsonString() {
+ String content = "{\"key\":\"value\"}";
+ List<JsonNode> jsonNodeForString = NpmUtils.getListFromJsonString(content, JsonNode.class);
+ assertEquals(1, jsonNodeForString.size());
+
+ content = "[{\"key\":\"value\"}, {\"key\":\"value2\"}]";
+ jsonNodeForString = NpmUtils.getListFromJsonString(content, JsonNode.class);
+ assertEquals(2, jsonNodeForString.size());
+ }
+
+}
--- /dev/null
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ ~ ============LICENSE_START=======================================================
+ ~ ONAP : ccsdk features
+ ~ ================================================================================
+ ~ Update Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
+ ~ ================================================================================
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~ ============LICENSE_END=======================================================
+ ~
+ -->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+ <appender name="console" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.out"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%-5p %c{1} - %m%n"/>
+ </layout>
+ </appender>
+ <root>
+ <priority value="trace" />
+ <appender-ref ref="console" />
+ </root>
+</log4j:configuration>
\ No newline at end of file
--- /dev/null
+Env_Type=solo
+Priority_List=0,1,2
+Default_Priority=2
+EMS_ERICSSON=2
+EMS_NOKIA=2
+queue_capacity_0=10
+queue_capacity_1=7
+queue_capacity_2=5
+qsp_limit_0=5
+qsp_limit_1=3
+qsp_limit_2=2
<module>aafshiro</module>
<module>lib/rlock</module>
<module>lib/doorman</module>
+ <module>lib/network-prioritization</module>
</modules>
<scm>