Adding network-prioritization-network 39/121239/1
authorSingal, Kapil (ks220y) <ks220y@att.com>
Mon, 10 May 2021 17:48:00 +0000 (13:48 -0400)
committerSingal, Kapil (ks220y) <ks220y@att.com>
Mon, 10 May 2021 17:48:34 +0000 (13:48 -0400)
Issue-ID: CCSDK-3292
Signed-off-by: Singal, Kapil (ks220y) <ks220y@att.com>
Change-Id: I03fed97bd85040b62cd308d9c8166d9ed023efc6

20 files changed:
lib/network-prioritization/pom.xml [new file with mode: 0644]
lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/NpmConstants.java [new file with mode: 0644]
lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/NpmException.java [new file with mode: 0644]
lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmServiceCallbackApi.java [new file with mode: 0644]
lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmServiceManager.java [new file with mode: 0644]
lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmServiceManagerImpl.java [new file with mode: 0644]
lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmTransactionService.java [new file with mode: 0644]
lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmTransactionServiceImpl.java [new file with mode: 0644]
lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/models/NpmAck.java [new file with mode: 0644]
lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/models/NpmStatusEnum.java [new file with mode: 0644]
lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/models/NpmTransaction.java [new file with mode: 0644]
lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/utils/NpmUtils.java [new file with mode: 0644]
lib/network-prioritization/src/main/resources/OSGI-INF/blueprint/blueprint.xml [new file with mode: 0644]
lib/network-prioritization/src/main/resources/properties/npm-config.properties [new file with mode: 0644]
lib/network-prioritization/src/test/java/org/onap/ccsdk/features/lib/npm/api/NpmServiceCallbackHandler.java [new file with mode: 0644]
lib/network-prioritization/src/test/java/org/onap/ccsdk/features/lib/npm/api/NpmTransactionServiceTest.java [new file with mode: 0644]
lib/network-prioritization/src/test/java/org/onap/ccsdk/features/lib/npm/utils/NpmUtilTest.java [new file with mode: 0644]
lib/network-prioritization/src/test/resources/log4j.xml [new file with mode: 0644]
lib/network-prioritization/src/test/resources/properties/npm-config.properties [new file with mode: 0644]
pom.xml

diff --git a/lib/network-prioritization/pom.xml b/lib/network-prioritization/pom.xml
new file mode 100644 (file)
index 0000000..cfd7867
--- /dev/null
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ ============LICENSE_START=======================================================
+  ~ ONAP : ccsdk features
+  ~ ================================================================================
+  ~ Update Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
+  ~ ================================================================================
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~ ============LICENSE_END=======================================================
+  ~
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.onap.ccsdk.features</groupId>
+        <artifactId>ccsdk-features</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+        <relativePath>../../</relativePath>
+    </parent>
+
+    <groupId>org.onap.ccsdk.features.lib</groupId>
+    <artifactId>network-prioritization</artifactId>
+
+    <name>ccsdk-features :: lib :: ${project.artifactId}</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <!-- Use SimpleLogger as the slf4j implementation in test -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/NpmConstants.java b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/NpmConstants.java
new file mode 100644 (file)
index 0000000..0583f60
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : ccsdk features
+ * ================================================================================
+ * Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=======================================================
+ *
+ */
+
+package org.onap.ccsdk.features.lib.npm;
+
+public class NpmConstants {
+    public static final String PROPERTY_ENV_TYPE = "Env_Type";
+    public static final String PROPERTY_ENV_PROD = "field";
+    public static final String PROPERTY_ENV_SOLO = "solo";
+
+    public static final String MDC_REQUEST_ID = "RequestID";
+
+    public static final String SDNC_CONFIG_DIR = "SDNC_CONFIG_DIR";
+    public static final String DEFAULT_SDNC_CONFIG_DIR = "/opt/sdnc/data/properties";
+    public static final String NPM_CONFIG_PROPERTIES_FILE_NAME = "npm-config.properties";
+
+    private NpmConstants() {}
+}
diff --git a/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/NpmException.java b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/NpmException.java
new file mode 100644 (file)
index 0000000..ec0f199
--- /dev/null
@@ -0,0 +1,56 @@
+/*\r
+ * ============LICENSE_START=======================================================\r
+ * ONAP : ccsdk features\r
+ * ================================================================================\r
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ *     http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ============LICENSE_END=======================================================\r
+ *\r
+ */\r
+\r
+package org.onap.ccsdk.features.lib.npm;\r
+\r
+/**\r
+ * The type Npm exception.\r
+ * <p>\r
+ * It will be thrown in following cases:\r
+ * - Invalid Npm Transaction received\r
+ * - Couldn't queue RECEIVED Npm Transaction\r
+ * - Npm Transaction is already present in queue\r
+ * - Fails to invoke Service callback API\r
+ *\r
+ * @author Kapil Singal\r
+ */\r
+public class NpmException extends Exception {\r
+\r
+    /**\r
+     * This is a NpmException constructor\r
+     *\r
+     * @param message the message\r
+     */\r
+    public NpmException(String message) {\r
+        super(message);\r
+    }\r
+\r
+    /**\r
+     * This is a NpmException constructor\r
+     *\r
+     * @param message the message\r
+     * @param cause   the cause\r
+     */\r
+    public NpmException(String message, Throwable cause) {\r
+        super(message, cause);\r
+    }\r
+\r
+}\r
diff --git a/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmServiceCallbackApi.java b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmServiceCallbackApi.java
new file mode 100644 (file)
index 0000000..58e8cd5
--- /dev/null
@@ -0,0 +1,45 @@
+/*\r
+ * ============LICENSE_START=======================================================\r
+ * ONAP : ccsdk features\r
+ * ================================================================================\r
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ *     http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ============LICENSE_END=======================================================\r
+ *\r
+ */\r
+\r
+package org.onap.ccsdk.features.lib.npm.api;\r
+\r
+import org.onap.ccsdk.features.lib.npm.NpmException;\r
+import org.onap.ccsdk.features.lib.npm.models.NpmTransaction;\r
+\r
+/**\r
+ * The interface NpmServiceCallbackApi.\r
+ * This must be implemented by all Services to receive notification back\r
+ *\r
+ * @author Kapil Singal\r
+ */\r
+public interface NpmServiceCallbackApi {\r
+\r
+    /**\r
+     * Process to be implemented by all services on-boarding to Npm.\r
+     * This API will be invoked by Npm to notify Service to process a transaction\r
+     *\r
+     * @param npmTransaction the NpmTransaction\r
+     *\r
+     * @throws NpmException the npm exception\r
+     */\r
+    void process(NpmTransaction npmTransaction) throws NpmException;\r
+\r
+}\r
diff --git a/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmServiceManager.java b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmServiceManager.java
new file mode 100644 (file)
index 0000000..0b7ab72
--- /dev/null
@@ -0,0 +1,122 @@
+/*\r
+ * ============LICENSE_START=======================================================\r
+ * ONAP : ccsdk features\r
+ * ================================================================================\r
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ *     http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ============LICENSE_END=======================================================\r
+ *\r
+ */\r
+\r
+package org.onap.ccsdk.features.lib.npm.api;\r
+\r
+import java.util.List;\r
+import java.util.Map;\r
+import java.util.NavigableSet;\r
+import org.onap.ccsdk.features.lib.npm.NpmException;\r
+import org.onap.ccsdk.features.lib.npm.models.NpmTransaction;\r
+\r
+/**\r
+ * The interface Npm Service Manager being used internally by Npm\r
+ * <p>\r
+ * Placeholder for Priority Queues holding Npm Transactions:\r
+ * Create: During Npm Startup if un-processed Transaction are available in NPM_TRANSACTION Table\r
+ * Manage: If a tx is expired sitting in queue, invoke service callback api\r
+ * Remove: If gets notified by a service once done with processing Transaction\r
+ *\r
+ * @author Kapil Singal\r
+ */\r
+public interface NpmServiceManager {\r
+\r
+    /**\r
+     * Add Transaction to queue.\r
+     *\r
+     * @param npmTransaction the NpmTransaction instance which contain serviceRequest with header information\r
+     *\r
+     * @throws NpmException the NpmException if Npm Transaction:\r
+     *                      if missing required header information\r
+     *                      couldn't be queued if priority queue is already full to it's max capacity\r
+     */\r
+    void addTransactionToQueue(NpmTransaction npmTransaction) throws NpmException;\r
+\r
+    /**\r
+     * Remove Transaction from queue and update connection counter\r
+     *\r
+     * @param npmTransaction          the NpmTransaction instance which contain serviceRequest with header information\r
+     * @param updateConnectionCounter the update connection counter only if it's true\r
+     */\r
+    void removeTransactionFromQueue(NpmTransaction npmTransaction, boolean updateConnectionCounter);\r
+\r
+    /**\r
+     * Retrieve transaction from queue list.\r
+     *\r
+     * @param sbEndpoint the sb endpoint\r
+     * @param sbType     the sb type\r
+     *\r
+     * @return the list of NpmTransaction\r
+     */\r
+    List<NpmTransaction> retrieveTransactionFromQueue(String sbEndpoint, String sbType);\r
+\r
+    /**\r
+     * Retrieve all priority queues map.\r
+     *\r
+     * @return the map\r
+     * <pre>\r
+     * npmPriorityQueues        : Map [String:sbEndpoint##sbType, Map:sbPriorityQueues]\r
+     *     sbPriorityQueues     : Map [int:priority, TreeSet:priorityQueue]\r
+     *         priorityQueue    : NavigableSet [Contains the NpmTransaction Object]\r
+     * </pre>\r
+     */\r
+    Map<String, Map<Integer, NavigableSet<NpmTransaction>>> retrieveNpmPriorityQueues();\r
+\r
+    /**\r
+     * Load npm config boolean.\r
+     * <pre>\r
+     * Default properties:\r
+     *      <b>Priority_List=0,1,2</b>     Total possible priorities (lowest index is the highest priority)\r
+     *      <b>Default_Priority=2</b>      Default Priority value to be set if missing in request\r
+     *      <b>EMS_ERICSSON=2</b>          Maximum number of parallel connection that ERICSSON manufactured EMS can support\r
+     *      <b>EMS_NOKIA=2</b>             Maximum number of parallel connection that NOKIA manufactured EMS can support\r
+     *      <b>queue_capacity_0=10</b>     Total capacity of queue (maximum number of transactions a queue can hold) with priority 0\r
+     *      <b>queue_capacity_1=7</b>      Total capacity of queue (maximum number of transactions a queue can hold) with priority 1\r
+     *      <b>queue_capacity_2=5</b>      Total capacity of queue (maximum number of transactions a queue can hold) with priority 2\r
+     *      <b>qsp_limit_0=5</b>           Maximum number of transactions that can be processed from a queue before jumping to lower priority queues\r
+     *      <b>qsp_limit_1=3</b>           Maximum number of transactions that can be processed from a queue before jumping to lower priority queues\r
+     *      <b>qsp_limit_2=2</b>           Maximum number of transactions that can be processed from a queue before jumping to lower priority queues\r
+     * </pre>\r
+     *\r
+     * @param configFilePath the Config File Name\r
+     *\r
+     * @throws NpmException the npm exception\r
+     */\r
+    void loadNpmConfig(String configFilePath) throws NpmException;\r
+\r
+    /**\r
+     * Gets npm config.\r
+     *\r
+     * @param referenceKey the reference key\r
+     *\r
+     * @return the npmConfig Value\r
+     */\r
+    String getNpmConfig(String referenceKey);\r
+\r
+    /**\r
+     * Register service : must be called to register with Npm\r
+     *\r
+     * @param serviceKey            the unique serviceKey specific to service reference\r
+     * @param npmServiceCallbackApi the instance of service class implementing NpmServiceCallbackApi\r
+     */\r
+    void registerService(String serviceKey, NpmServiceCallbackApi npmServiceCallbackApi);\r
+\r
+}\r
diff --git a/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmServiceManagerImpl.java b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmServiceManagerImpl.java
new file mode 100644 (file)
index 0000000..2cdef35
--- /dev/null
@@ -0,0 +1,430 @@
+/*\r
+ * ============LICENSE_START=======================================================\r
+ * ONAP : ccsdk features\r
+ * ================================================================================\r
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ *     http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ============LICENSE_END=======================================================\r
+ *\r
+ */\r
+\r
+package org.onap.ccsdk.features.lib.npm.api;\r
+\r
+import java.util.ArrayList;\r
+import java.util.List;\r
+import org.onap.ccsdk.features.lib.npm.NpmConstants;\r
+import org.onap.ccsdk.features.lib.npm.NpmException;\r
+import org.onap.ccsdk.features.lib.npm.models.NpmStatusEnum;\r
+import org.onap.ccsdk.features.lib.npm.models.NpmTransaction;\r
+import org.onap.ccsdk.features.lib.npm.utils.NpmUtils;\r
+import org.apache.commons.lang3.StringUtils;\r
+import org.apache.commons.lang3.math.NumberUtils;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+import java.io.File;\r
+import java.io.FileInputStream;\r
+import java.io.IOException;\r
+import java.util.Comparator;\r
+import java.util.Iterator;\r
+import java.util.Map;\r
+import java.util.NavigableSet;\r
+import java.util.Properties;\r
+import java.util.TreeMap;\r
+import java.util.concurrent.ConcurrentHashMap;\r
+import java.util.concurrent.ConcurrentSkipListSet;\r
+import java.util.concurrent.ExecutorService;\r
+import java.util.concurrent.Executors;\r
+import java.util.concurrent.TimeUnit;\r
+import java.util.concurrent.atomic.AtomicInteger;\r
+import org.slf4j.MDC;\r
+\r
+import static org.onap.ccsdk.features.lib.npm.NpmConstants.MDC_REQUEST_ID;\r
+\r
+/**\r
+ * The type Npm Service Manager.\r
+ *\r
+ * @author Kapil Singal\r
+ */\r
+public class NpmServiceManagerImpl implements NpmServiceManager {\r
+    private static final Logger logger = LoggerFactory.getLogger(NpmServiceManagerImpl.class);\r
+    /**\r
+     * Npm `Priority Queues`.\r
+     * <p>\r
+     * npmPriorityQueues : Map <String:sbEndpoint##sbType, Map:sbPriorityQueues>\r
+     * sbPriorityQueues  : Map <int:priority, TreeSet:priorityQueue>\r
+     * priorityQueue     : NavigableSet [Contains the NpmTransaction Object]\r
+     * </p>\r
+     *\r
+     * <p>\r
+     * Npm will maintain multiple Priority queues per sb_endpoint (an EMS is an example of sb_endpoint)\r
+     * Priority Queues will be maintained per priority, having Npm Transactions, sorted based on timestamp, to accomplish a FIFO queue.\r
+     * </p>\r
+     *\r
+     * <p>\r
+     * Why NavigableSet or ConcurrentSkipListSet is being used:\r
+     * Need to maintain priority queue Sorted based om Transaction timestamp.!!\r
+     * Hence using ConcurrentSkipListSet -> which implements NavigableSet -> which extends SortedSet\r
+     * </p>\r
+     *\r
+     * <p>\r
+     * The ConcurrentSkipListSet class allows safe execution of\r
+     * Insertion, removal, and access operations on set concurrently by multiple threads.\r
+     * </p>\r
+     *\r
+     * <p>\r
+     * It should be preferred over other implementations of the Set interface\r
+     * when concurrent modification of set by multiple threads is required.\r
+     * </p>\r
+     */\r
+    private final Map<String, Map<Integer, NavigableSet<NpmTransaction>>> npmPriorityQueues = new ConcurrentHashMap<>();\r
+    private final Map<String, Integer> connectionCounter = new ConcurrentHashMap<>();\r
+    private final Map<String, NpmServiceCallbackApi> serviceRegistry = new ConcurrentHashMap<>();\r
+    private final Map<String, Integer> priorityExecState = new ConcurrentHashMap<>();\r
+    private final Map<String, Integer> qspExecState = new ConcurrentHashMap<>();\r
+\r
+    private final Properties npmConfigurations = new Properties();\r
+    private final ExecutorService executorService = Executors.newCachedThreadPool();\r
+\r
+    private boolean isProcessIngNpmPriorityQueues = false;\r
+\r
+    public NpmServiceManagerImpl() throws NpmException {\r
+        loadProperties();\r
+        Runnable processNpmPriorityQueuesTask = () -> {\r
+            try {\r
+                if (!isProcessIngNpmPriorityQueues) {\r
+                    isProcessIngNpmPriorityQueues = true;\r
+                    // Cleaning up MDC to make sure logging doesn't have old requestID being used for further processing\r
+                    MDC.clear();\r
+                    processNpmPriorityQueues();\r
+                    isProcessIngNpmPriorityQueues = false;\r
+                }\r
+            } catch (StackOverflowError | Exception e) {\r
+                //Setting isProcessIngNpmPriorityQueues to false because next time when periodic task runs it should re-run processNpmPriorityQueues\r
+                isProcessIngNpmPriorityQueues = false;\r
+                // Catching both as there may not be any npm transaction at time of boot or eventual\r
+                logger.warn("----------- Task to processNpmPriorityQueues failed ----------- \nErrorMessage:({})", e.getMessage(), e);\r
+            }\r
+        };\r
+        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(processNpmPriorityQueuesTask, 30, 5, TimeUnit.SECONDS);\r
+    }\r
+\r
+    @Override\r
+    public String getNpmConfig(String referenceKey) {\r
+        return npmConfigurations.getProperty(referenceKey);\r
+    }\r
+\r
+    @Override\r
+    public void registerService(String serviceKey, NpmServiceCallbackApi npmServiceCallbackApi) {\r
+        logger.trace("------------- Registering NpmServiceCallbackApi with serviceKey:({}) -------------", serviceKey);\r
+        serviceRegistry.put(serviceKey, npmServiceCallbackApi);\r
+    }\r
+\r
+    @Override\r
+    public void addTransactionToQueue(final NpmTransaction npmTransaction) throws NpmException {\r
+        logger.trace("------------- Inside NPM SM addTransactionToQueue -------------");\r
+        logger.trace("Queuing Npm Transaction with npmTransactionId:({})", npmTransaction.getNpmTransactionId());\r
+\r
+        //Sorted Queue based on timestamp, if timestamp same for multiple Transaction it sorts those with NpmTransactionId.\r
+        //Using computeIfAbsent to make sure it creates priority_queue for particular sb_endpoint if not already present\r
+        final String npmPriorityQueueKey = NpmUtils.getNpmPriorityQueueKey(npmTransaction.getSbEndpoint(), npmTransaction.getSbType());\r
+\r
+        logger.trace("Locating npmPriorityQueues with key sbEndpoint##sbType :: ({})", npmPriorityQueueKey);\r
+        NavigableSet<NpmTransaction> priorityQueue = npmPriorityQueues.computeIfAbsent(npmPriorityQueueKey,\r
+            sbPriorityQueues -> new TreeMap<>()).computeIfAbsent(npmTransaction.getPriority(),\r
+                priorityQueueSet -> new ConcurrentSkipListSet<>\r
+                        (Comparator.comparing(NpmTransaction :: getTimestamp).thenComparing(NpmTransaction :: getNpmTransactionId)));\r
+        logger.trace("Current queue length for sbEndpoint:({}) with priority:({}) is:({})",\r
+                npmTransaction.getSbEndpoint(), npmTransaction.getPriority(), priorityQueue.size());\r
+\r
+        if (priorityQueue.contains(npmTransaction)) {\r
+            logger.trace("Npm Transaction with npmTransactionId:({}) is already present in queued, returning without altering the queue...",\r
+                    npmTransaction.getNpmTransactionId());\r
+            return;\r
+        }\r
+\r
+        // Compare if queue_capacity_$priority available from Configurations, else by default it will be comparing against default value = 10\r
+        final int queueCapacity = NumberUtils.toInt(getNpmConfig("queue_capacity_" + npmTransaction.getPriority()), 10);\r
+        if (priorityQueue.size() >= queueCapacity) {\r
+            npmTransaction.setStatus(NpmStatusEnum.OUT_OF_CAPACITY);\r
+            String message = String.format("Queue %s Error. Npm Queue for sb_endpoint:(%s) with Priority:(%s) is maxed out to it's capacity limit:(%s)",\r
+                    NpmStatusEnum.OUT_OF_CAPACITY, npmTransaction.getSbEndpoint(), npmTransaction.getPriority(), queueCapacity);\r
+            logger.trace("Returning Error message:({})", message);\r
+            throw new NpmException(message);\r
+        }\r
+        npmTransaction.setStatus(NpmStatusEnum.QUEUED);\r
+        priorityQueue.add(npmTransaction);\r
+        logger.trace("Successfully queued Npm Transaction with npmTransactionId:({})", npmTransaction.getNpmTransactionId());\r
+        logger.trace("Updated queue length for sbEndpoint:({}) with priority:({}) is:({})",\r
+                npmTransaction.getSbEndpoint(), npmTransaction.getPriority(), priorityQueue.size());\r
+    }\r
+\r
+    @Override\r
+    public List<NpmTransaction> retrieveTransactionFromQueue(String sbEndpoint, String sbType) {\r
+        logger.trace("------------- Inside NPM SM retrieveTransactionFromQueue -------------");\r
+        logger.trace("Retrieving all Npm Transactions for sbEndpoint:sbType ({}:{}) from priorityQueues", sbEndpoint, sbType);\r
+\r
+        final String npmPriorityQueueKey = NpmUtils.getNpmPriorityQueueKey(sbEndpoint, sbType);\r
+        final List<NpmTransaction> npmTransactionList = new ArrayList<>();\r
+\r
+        //Using computeIfPresent as npmTransactionQueueMap doesn't need any alteration if Npm Transaction is not found.\r
+        npmPriorityQueues.computeIfPresent(npmPriorityQueueKey, (sb, sbPriorityQueues) -> {\r
+            sbPriorityQueues.forEach((priority, npmTransactionNavigableSet) -> {\r
+                npmTransactionList.addAll(npmTransactionNavigableSet);\r
+            });\r
+            return sbPriorityQueues;\r
+        });\r
+\r
+        logger.trace("Retrieved total {} Npm Transactions from priorityQueues", npmTransactionList.size());\r
+        return npmTransactionList;\r
+    }\r
+\r
+    @Override\r
+    public Map<String, Map<Integer, NavigableSet<NpmTransaction>>> retrieveNpmPriorityQueues() {\r
+        // TODO: Check if it should return the actual queue map instance or a clone !!\r
+        return npmPriorityQueues;\r
+    }\r
+\r
+    @Override\r
+    public void removeTransactionFromQueue(NpmTransaction npmTransaction, boolean updateConnectionCounter) {\r
+        logger.trace("------------- Inside NPM SM removeTransactionFromQueue -------------");\r
+        logger.trace("Removing Npm Transaction from priority queue with npmTransactionId:({})", npmTransaction.getNpmTransactionId());\r
+        if (updateConnectionCounter) {\r
+            // Updating connection counter so that next transaction can be processed from queue for same sbEndpoint\r
+            updateConnectionCounter(npmTransaction.getSbEndpoint(), Math.negateExact(npmTransaction.getConnectionCount()));\r
+        }\r
+        final String npmPriorityQueueKey = NpmUtils.getNpmPriorityQueueKey(npmTransaction.getSbEndpoint(), npmTransaction.getSbType());\r
+\r
+        //Using computeIfPresent as npmTransactionQueueMap doesn't need any alteration if Npm Transaction is not found.\r
+        npmPriorityQueues.computeIfPresent(npmPriorityQueueKey, (sb, sbPriorityQueues) -> {\r
+            NavigableSet<NpmTransaction> priorityQueue = sbPriorityQueues.get(npmTransaction.getPriority());\r
+            if (priorityQueue != null) {\r
+                logger.trace("Current queue length for sbEndpoint:({}) with priority:({}) is:({})",\r
+                        npmTransaction.getSbEndpoint(), npmTransaction.getPriority(), priorityQueue.size());\r
+\r
+                priorityQueue.remove(npmTransaction);\r
+                logger.trace("Successfully removed Npm Transaction with npmTransactionId:({})", npmTransaction.getNpmTransactionId());\r
+                logger.trace("Updated queue length for sbEndpoint:({}) with priority:({}) is:({})",\r
+                        npmTransaction.getSbEndpoint(), npmTransaction.getPriority(), priorityQueue.size());\r
+\r
+                // Cleaning up Priority Queue if empty\r
+                if (priorityQueue.isEmpty()) {\r
+                    logger.trace("As priorityQueue for sbEndpoint:({}) with priority:({}) is empty, removing the priority queue.",\r
+                            npmTransaction.getSbEndpoint(), npmTransaction.getPriority());\r
+                    sbPriorityQueues.remove(npmTransaction.getPriority());\r
+                }\r
+            }\r
+            return sbPriorityQueues;\r
+        });\r
+    }\r
+\r
+    private void processExpiredNpmTransaction() {\r
+        logger.trace("------------- Inside NPM SM processExpiredNpmTransaction -------------");\r
+        if (npmPriorityQueues.isEmpty()) {\r
+            logger.trace("------------- No Priority Queue is present, nothing to cleanup, hence returning -------------");\r
+            // Returning here itself to avoid StackOverFlow or other runtime error as there may not be any npm transaction to process\r
+            return;\r
+        }\r
+        // Converting to entrySet so that it can parallel stream :)\r
+        npmPriorityQueues.entrySet().parallelStream().forEach(sbEndpointMapEntry -> {\r
+            sbEndpointMapEntry.getValue().entrySet().parallelStream().forEach(prioritySetEntry -> {\r
+\r
+                // TODO: Need to test, Periodic Printing of Current Queue Length is expected by Rajesh and Team\r
+                logger.trace("Current queue length with key sbEndpoint##sbType :: ({}) with priority:({}) is:({})",\r
+                        sbEndpointMapEntry.getKey(), prioritySetEntry.getKey(), prioritySetEntry.getValue().size());\r
+\r
+                prioritySetEntry.getValue().parallelStream().forEach(npmTransaction -> {\r
+                    // Checking if npmTransaction is already expired\r
+                    if (NpmUtils.isExpired(npmTransaction)) {\r
+                        logger.trace("Npm Transaction with npmTransactionId:({}) is Expired and will be removed from priority queue, as timeToLive has passed.",\r
+                                npmTransaction.getNpmTransactionId());\r
+                        npmTransaction.setStatus(NpmStatusEnum.EXPIRED);\r
+                        npmTransaction.setMessage("Npm Transaction is Expired and will be removed from priority queue, as timeToLive has passed.");\r
+                        Runnable notifyServiceTask = () -> invokeServiceCallbackApi(npmTransaction, false);\r
+                        executorService.execute(notifyServiceTask);\r
+                        removeTransactionFromQueue(npmTransaction, false);\r
+                    }\r
+                });\r
+            });\r
+        });\r
+        logger.trace("------------- Done with checking all priority queues for any expired Npm Transaction -------------");\r
+    }\r
+\r
+    private void processNpmPriorityQueues() {\r
+        logger.trace("------------- Inside NPM SM processNpmPriorityQueues -------------");\r
+        if (npmPriorityQueues.isEmpty()) {\r
+            // Returning here itself to avoid StackOverFlow or other runtime error as there may not be any npm transaction to process\r
+            return;\r
+        }\r
+        logger.trace("Calling processExpiredNpmTransaction to cleanup expired Npm Transactions before processing any queue.");\r
+        processExpiredNpmTransaction();\r
+\r
+        // Converting to entrySet so that it can parallel stream :)\r
+        npmPriorityQueues.entrySet().parallelStream().forEach(sbQueueEntry -> {\r
+            final String sbEndpoint = sbQueueEntry.getKey().split("##")[0];\r
+            final String sbType = sbQueueEntry.getKey().split("##")[1];\r
+            final int sbConnectionLimit = NumberUtils.toInt(getNpmConfig(sbType), 1);\r
+\r
+            if (sbConnectionLimit <= connectionCounter.getOrDefault(sbEndpoint, 0)) {\r
+                logger.trace("Not processing any Npm Transaction for sbEndpoint:({}) as it is already occupied to it's maximum connection limit:({}).",\r
+                        sbEndpoint, sbConnectionLimit);\r
+                //returning when a particular SB (sbEndpoint) is already occupied to it's max limit\r
+                return;\r
+            }\r
+            logger.trace("Trying to process priority queue for sbEndpoint({}) with connectionLimit:({})", sbEndpoint, sbConnectionLimit);\r
+            processSbPriorityQueues(sbEndpoint, sbType, sbConnectionLimit, sbQueueEntry.getValue());\r
+        });\r
+    }\r
+\r
+    private void processSbPriorityQueues(String sbEndpoint, String sbType, int sbConnectionLimit, final Map<Integer, NavigableSet<NpmTransaction>> priorityQueues) {\r
+        logger.trace("------------- Inside NPM SM processSbPriorityQueues -------------");\r
+        if (priorityQueues == null || priorityQueues.isEmpty()) {\r
+            // Returning here itself to avoid StackOverFlow or other runtime error as there may not be any npm transaction to process\r
+            return;\r
+        }\r
+\r
+        Iterator<Map.Entry<Integer,NavigableSet<NpmTransaction>>> priorityQueuesIterator = priorityQueues.entrySet().iterator();\r
+        while (priorityQueuesIterator.hasNext()) {\r
+            Map.Entry<Integer,NavigableSet<NpmTransaction>> entry = priorityQueuesIterator.next();\r
+            final NavigableSet<NpmTransaction> npmTransactions = entry.getValue();\r
+            if (npmTransactions.isEmpty()) {\r
+                priorityQueuesIterator.remove();\r
+                continue;\r
+            }\r
+\r
+            final Integer priorityIndex = entry.getKey();\r
+            final String qspLimitKey = "qsp_limit_" + priorityIndex;\r
+            final String qspStateKey = sbEndpoint + "_" + priorityIndex;\r
+\r
+            AtomicInteger qspLimit = new AtomicInteger(NumberUtils.toInt(getNpmConfig(qspLimitKey), 5));\r
+            AtomicInteger qspCounter = new AtomicInteger(qspExecState.getOrDefault(qspStateKey, 0));\r
+            logger.trace("For sbEndpoint:({}) with priority:({}) qspLimit is:({}) and current qspCounter is:({})",\r
+                sbEndpoint, priorityIndex, qspLimit.get(), qspCounter.get());\r
+\r
+            // On re-iteration it should be processing same priority queue which was processed last only if qsp hasn't met\r
+            if (NpmUtils.isAllOkToProcess(qspLimit.get(), qspCounter.get(), connectionCounter.getOrDefault(sbEndpoint, 0), sbConnectionLimit)\r
+                && priorityExecState.containsKey(sbEndpoint) && priorityQueues.containsKey(priorityExecState.get(sbEndpoint))\r
+                && !priorityIndex.equals(priorityExecState.get(sbEndpoint))) {\r
+                logger.trace("Last execution state for sbEndpoint:({}) was for priority:({})", sbEndpoint, priorityExecState.get(sbEndpoint));\r
+                return;\r
+            }\r
+\r
+            logger.trace("------------- Iterating npmTransactions from priorityQueue -------------");\r
+            for (final NpmTransaction npmTransaction : npmTransactions) {\r
+                // Setting RequestID in MDC same as NPM Transaction RequestId\r
+                MDC.put(MDC_REQUEST_ID, npmTransaction.getRequestId());\r
+                // Should pick npmTransactions which are in QUEUED state and are not Expired\r
+                if (NpmStatusEnum.QUEUED.equals(npmTransaction.getStatus()) && !NpmUtils.isExpired(npmTransaction)\r
+                    && NpmUtils.isAllOkToProcess(qspLimit.get(), qspCounter.get(), connectionCounter.getOrDefault(sbEndpoint, 0), sbConnectionLimit)\r
+                    && invokeServiceCallbackApi(npmTransaction, true)) {\r
+\r
+                    logger.trace("------------- Updating priorityExecState and qspExecState -------------");\r
+                    priorityExecState.put(sbEndpoint, priorityIndex);\r
+                    qspExecState.put(qspStateKey, qspCounter.incrementAndGet());\r
+                    logger.trace("Updated priorityExecState for sbEndpoint:({}) with priority:({})", sbEndpoint, priorityIndex);\r
+                    logger.trace("Updated qspExecState for qspStateKey:({}) with qspCounter value:({})", qspStateKey, qspExecState.get(qspStateKey));\r
+                }\r
+            }\r
+            resetExecStates(sbEndpoint, sbType);\r
+        }\r
+    }\r
+\r
+    private boolean invokeServiceCallbackApi(NpmTransaction npmTransaction, boolean updateConnectionCounter) {\r
+        logger.trace("------------- Inside NPM SM invokeServiceCallbackApi -------------");\r
+        try {\r
+            logger.trace("Notifying Registered Service with serviceKey:({}) to process Npm Transaction with npmTransactionId:({})",\r
+                npmTransaction.getServiceKey(), npmTransaction.getNpmTransactionId());\r
+            //Setting the status as PROCESSING so that same won't be picked up again in processNpmPriorityQueues\r
+            npmTransaction.setStatus(NpmStatusEnum.PROCESSING);\r
+            serviceRegistry.get(npmTransaction.getServiceKey()).process(npmTransaction);\r
+            logger.trace("Notified Registered Service to process Npm Transaction with npmTransactionId:({})", npmTransaction.getNpmTransactionId());\r
+\r
+            if (updateConnectionCounter) {\r
+                updateConnectionCounter(npmTransaction.getSbEndpoint(), npmTransaction.getConnectionCount());\r
+            }\r
+        } catch (NpmException e) {\r
+            logger.error("Notifying Registered Service with serviceKey:({}) for npmTransactionId:({}) failed with ErrorMessage:({})",\r
+                npmTransaction.getServiceKey(), npmTransaction.getNpmTransactionId(), e.getMessage(), e);\r
+            removeTransactionFromQueue(npmTransaction, true);\r
+            return false;\r
+        }\r
+        return true;\r
+    }\r
+\r
+    /**\r
+     * Resetting Execution States only if QSP met for all priorities so that Npm can reiterate in Round Robin fashion.\r
+     */\r
+    private void resetExecStates(String sbEndpoint, String sbType) {\r
+        logger.trace("------------- Inside NPM SM resetExecStates -------------");\r
+        boolean temp = true;\r
+        final String npmPriorityQueueKey = NpmUtils.getNpmPriorityQueueKey(sbEndpoint, sbType);\r
+        for (int priority : NpmUtils.getPriorityList(getNpmConfig("Priority_List"))) {\r
+            if (npmPriorityQueues.containsKey(npmPriorityQueueKey) && npmPriorityQueues.get(npmPriorityQueueKey).containsKey(priority)\r
+                && !priorityExecState.containsValue(priority)) {\r
+                //Setting temp to false so that it won't cleanup priorityExecState and qspExecState as all priorityQueues hasn't been processed yet\r
+                logger.trace("Execution States won't be resetting for sbEndpoint:({}) as all priorityQueues hasn't been processed yet.", sbEndpoint);\r
+                temp = false;\r
+                break;\r
+            }\r
+        }\r
+        if (temp) {\r
+            for (int priority : NpmUtils.getPriorityList(getNpmConfig("Priority_List"))) {\r
+                logger.trace("Resetting Execution States for sbEndpoint:({}) as all priorityQueues processed and those needs to reiterate.", sbEndpoint);\r
+                priorityExecState.remove(sbEndpoint);\r
+                qspExecState.remove(sbEndpoint + "_" + priority);\r
+            }\r
+        }\r
+    }\r
+\r
+    private void updateConnectionCounter(String sbEndpoint, int connectionCounterValue) {\r
+        logger.trace("------------- Inside NPM SM updateConnectionCounter -------------");\r
+        //Updating connectionCounter value to be 0 or +ve integer whichever is larger\r
+        connectionCounter.computeIfPresent(sbEndpoint, (key, value) -> Math.max((value + connectionCounterValue), 0));\r
+        connectionCounter.computeIfAbsent(sbEndpoint, s -> Math.max(connectionCounterValue, 0));\r
+        logger.trace("For sbEndpoint:({}) updated connectionCounter value is:({}) ", sbEndpoint, connectionCounter.get(sbEndpoint));\r
+    }\r
+\r
+    private void loadProperties() throws NpmException {\r
+        logger.trace("------------- Inside NPM SM loadProperties -------------");\r
+        String propDir = System.getProperty(NpmConstants.SDNC_CONFIG_DIR);\r
+        if (StringUtils.isBlank(propDir)) {\r
+            propDir = System.getenv(NpmConstants.SDNC_CONFIG_DIR);\r
+        }\r
+        if (StringUtils.isBlank(propDir)) {\r
+            logger.warn("Environment variable:({}) is not set, defaulting properties directory to:({})",\r
+                    NpmConstants.SDNC_CONFIG_DIR, NpmConstants.DEFAULT_SDNC_CONFIG_DIR);\r
+            propDir = NpmConstants.DEFAULT_SDNC_CONFIG_DIR;\r
+        }\r
+        loadNpmConfig(propDir + File.separator + NpmConstants.NPM_CONFIG_PROPERTIES_FILE_NAME);\r
+    }\r
+\r
+    @Override\r
+    public void loadNpmConfig(String configFilePath) throws NpmException {\r
+        logger.trace("------------- Inside NPM SM loadNpmConfig -------------");\r
+        try {\r
+            logger.trace("Initializing NPM Configurations from:({})", configFilePath);\r
+            if (new File(configFilePath).exists()) {\r
+                npmConfigurations.load(new FileInputStream(configFilePath));\r
+            } else {\r
+                logger.warn("Config File:({}) not found, Initializing NPM with default configurations.", configFilePath);\r
+                configFilePath = "properties" + File.separator + NpmConstants.NPM_CONFIG_PROPERTIES_FILE_NAME;\r
+                npmConfigurations.load(getClass().getClassLoader().getResourceAsStream(configFilePath));\r
+            }\r
+            logger.trace("Initialized NPM with Configurations:({}) from configFilePath:({})", npmConfigurations, configFilePath);\r
+        } catch (IOException e) {\r
+            throw new NpmException(String.format("SDN-R Internal Error: Failed to load NPM Configurations form:(%s)", configFilePath), e);\r
+        }\r
+    }\r
+\r
+}\r
diff --git a/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmTransactionService.java b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmTransactionService.java
new file mode 100644 (file)
index 0000000..45777b8
--- /dev/null
@@ -0,0 +1,158 @@
+/*\r
+ * ============LICENSE_START=======================================================\r
+ * ONAP : ccsdk features\r
+ * ================================================================================\r
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ *     http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ============LICENSE_END=======================================================\r
+ *\r
+ */\r
+\r
+package org.onap.ccsdk.features.lib.npm.api;\r
+\r
+import java.util.Map;\r
+import java.util.NavigableSet;\r
+import org.onap.ccsdk.features.lib.npm.models.NpmAck;\r
+import org.onap.ccsdk.features.lib.npm.models.NpmTransaction;\r
+\r
+import java.time.Instant;\r
+import java.util.List;\r
+import java.util.UUID;\r
+\r
+/**\r
+ * The interface Npm Transaction Service provider.\r
+ *\r
+ * @author Kapil Singal\r
+ */\r
+public interface NpmTransactionService {\r
+\r
+    /**\r
+     * Form npm transaction npm transaction with all mandatory/non-null parameters.\r
+     *\r
+     * @param sbEndpoint     the sb endpoint (Host IP Address)\r
+     * @param sbType         the sb type (eg. EMS_ERICSSON, EMS_NOKIA, D2_MSN etc)\r
+     * @param serviceKey     the service key (the unique serviceKey specific to service reference)\r
+     * @param serviceRequest the service request (actual request payload received from upstream)\r
+     *\r
+     * @return the npm transaction\r
+     */\r
+    NpmTransaction formNpmTransaction(String sbEndpoint, String sbType, String serviceKey, Object serviceRequest);\r
+\r
+    /**\r
+     * Form npm transaction npm transaction with all NPM header Information (nullable and non-nullable)\r
+     *\r
+     * @param npmTransactionId      the npmTransactionId (instance of UUID :: defaults to Random UUID if null)\r
+     * @param sbEndpoint            the sbEndpoint (Host IP Address :: used to create priority queues)\r
+     * @param sbType                the sbType (eg. EMS_ERICSSON, EMS_NOKIA, D2_MSN etc :: used to determine parallel connections it can support )\r
+     * @param priority              the priority (n :: lowest index is the highest priority, defaults to (least priority) if -1\r
+     * @param connectionCount       the connectionCount (n :: total number of connection a transaction would occupy while processing defaults to (1) if -1\r
+     * @param timestamp             the timestamp (instance of Instant :: defaults to Current Time if null)\r
+     * @param timeToLive            the timeToLive (instance of Instant :: defaults to MAX Time if null)\r
+     * @param serviceKey            the serviceKey (the unique serviceKey specific to service reference)\r
+     * @param serviceRequest        the serviceRequest (actual request payload received from upstream)\r
+     *\r
+     * @return the npm transaction\r
+     */\r
+    NpmTransaction formNpmTransaction(UUID npmTransactionId, String sbEndpoint, String sbType, int priority, int connectionCount,\r
+        Instant timestamp, Instant timeToLive, String serviceKey, Object serviceRequest);\r
+\r
+    /**\r
+     * Add Transactions to queue : called by all services to get Transactions added to respective priority queue\r
+     *\r
+     * @param npmTransactionList the NpmTransaction list\r
+     *\r
+     * @return the list of NpmAck with status and message\r
+     */\r
+    List<NpmAck> addTransactionsToQueue(List<NpmTransaction> npmTransactionList);\r
+\r
+    /**\r
+     * Add Transaction to queue : called by all services to get Transaction added to respective priority queue\r
+     *\r
+     * @param npmTransaction the NpmTransaction\r
+     *\r
+     * @return the NpmAck with status and message\r
+     */\r
+    NpmAck addTransactionsToQueue(NpmTransaction npmTransaction);\r
+\r
+    /**\r
+     * Remove Transactions from queue : called by all services to get Transactions removed from priority queue\r
+     *\r
+     * @param npmTransactionList the NpmTransaction list\r
+     *\r
+     * @return the list of NpmAck with status and message\r
+     */\r
+    List<NpmAck> removeTransactionsFromQueue(List<NpmTransaction> npmTransactionList);\r
+\r
+    /**\r
+     * Remove Transaction from queue : called by all services to get Transaction removed from priority queue\r
+     *\r
+     * @param npmTransaction the NpmTransaction\r
+     *\r
+     * @return the NpmAck with status and message\r
+     */\r
+    NpmAck removeTransactionsFromQueue(NpmTransaction npmTransaction);\r
+\r
+    /**\r
+     * Retrieve transaction from queue list.\r
+     *\r
+     * @param sbEndpoint the sb endpoint\r
+     * @param sbType     the sb type\r
+     *\r
+     * @return the list\r
+     */\r
+    List<NpmTransaction> retrieveTransactionFromQueue(String sbEndpoint, String sbType);\r
+\r
+    /**\r
+     * Retrieve NPM priority queues map.\r
+     *\r
+     * @return the map\r
+     * <pre>\r
+     * npmPriorityQueues        : Map [String:sbEndpoint##sbType, Map:sbPriorityQueues]\r
+     *     sbPriorityQueues     : Map [int:priority, TreeSet:priorityQueue]\r
+     *         priorityQueue    : NavigableSet [Contains the NpmTransaction Object]\r
+     * </pre>\r
+     */\r
+    Map<String, Map<Integer, NavigableSet<NpmTransaction>>> retrieveNpmPriorityQueues();\r
+\r
+    /**\r
+     * Register service : must be called by all services to register with Npm\r
+     *\r
+     * @param serviceKey            the unique serviceKey specific to service reference\r
+     * @param npmServiceCallbackApi the instance of service class implementing NpmServiceCallbackApi\r
+     */\r
+    void registerService(String serviceKey, NpmServiceCallbackApi npmServiceCallbackApi);\r
+\r
+    /**\r
+     * Load npm config boolean.\r
+     * <pre>\r
+     * Default properties:\r
+     *      <b>Priority_List=0,1,2</b>     Total possible priorities (lowest index is the highest priority)\r
+     *      <b>Default_Priority=2</b>      Default Priority value to be set if missing in request\r
+     *      <b>EMS_ERICSSON=2</b>          Maximum number of parallel connection that ERICSSON manufactured EMS can support\r
+     *      <b>EMS_NOKIA=2</b>             Maximum number of parallel connection that NOKIA manufactured EMS can support\r
+     *      <b>queue_capacity_0=10</b>     Total capacity of queue (maximum number of transactions a queue can hold) with priority 0\r
+     *      <b>queue_capacity_1=7</b>      Total capacity of queue (maximum number of transactions a queue can hold) with priority 1\r
+     *      <b>queue_capacity_2=5</b>      Total capacity of queue (maximum number of transactions a queue can hold) with priority 2\r
+     *      <b>qsp_limit_0=5</b>           Maximum number of transactions that can be processed from a queue before jumping to lower priority queues\r
+     *      <b>qsp_limit_1=3</b>           Maximum number of transactions that can be processed from a queue before jumping to lower priority queues\r
+     *      <b>qsp_limit_2=2</b>           Maximum number of transactions that can be processed from a queue before jumping to lower priority queues\r
+     * </pre>\r
+     *\r
+     * @param configFilePath the Config File Name\r
+     *\r
+     * @return the boolean\r
+     */\r
+    boolean loadNpmConfig(String configFilePath);\r
+\r
+}\r
diff --git a/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmTransactionServiceImpl.java b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/api/NpmTransactionServiceImpl.java
new file mode 100644 (file)
index 0000000..5483aac
--- /dev/null
@@ -0,0 +1,168 @@
+/*\r
+ * ============LICENSE_START=======================================================\r
+ * ONAP : ccsdk features\r
+ * ================================================================================\r
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ *     http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ============LICENSE_END=======================================================\r
+ *\r
+ */\r
+\r
+package org.onap.ccsdk.features.lib.npm.api;\r
+\r
+import java.util.Map;\r
+import java.util.NavigableSet;\r
+import org.onap.ccsdk.features.lib.npm.NpmException;\r
+import org.onap.ccsdk.features.lib.npm.models.NpmAck;\r
+import org.onap.ccsdk.features.lib.npm.models.NpmStatusEnum;\r
+import org.onap.ccsdk.features.lib.npm.models.NpmTransaction;\r
+import org.onap.ccsdk.features.lib.npm.utils.NpmUtils;\r
+import org.apache.commons.lang3.math.NumberUtils;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+import java.time.Instant;\r
+import java.util.ArrayList;\r
+import java.util.Arrays;\r
+import java.util.List;\r
+import java.util.UUID;\r
+\r
+/**\r
+ * The interface Npm Transaction Service provider.\r
+ *\r
+ * @author Kapil Singal\r
+ */\r
+public class NpmTransactionServiceImpl implements NpmTransactionService {\r
+    private static final Logger logger = LoggerFactory.getLogger(NpmTransactionServiceImpl.class);\r
+    private final NpmServiceManager npmServiceManager;\r
+\r
+    public NpmTransactionServiceImpl(NpmServiceManager npmServiceManager) {\r
+        this.npmServiceManager = npmServiceManager;\r
+    }\r
+\r
+    @Override\r
+    public List<NpmAck> addTransactionsToQueue(List<NpmTransaction> npmTransactionList) {\r
+        logger.debug("------------- Inside NPM TS addTransactionsToQueue (List<NpmAck>) -------------");\r
+        logger.trace("Received addTransactionsToQueue(List) for Npm Transactions:({})", npmTransactionList);\r
+\r
+        List<NpmAck> npmAckList = new ArrayList<>();\r
+        for (NpmTransaction npmTransaction : npmTransactionList) {\r
+            logger.trace("Npm Transaction with npmTransactionId:({}) is being queued.", npmTransaction.getNpmTransactionId());\r
+            npmAckList.add(addTransactionsToQueue(npmTransaction));\r
+        }\r
+        logger.trace("Responding with npmAckList:({})", npmAckList);\r
+        return npmAckList;\r
+    }\r
+\r
+    @Override\r
+    public NpmAck addTransactionsToQueue(NpmTransaction npmTransaction) {\r
+        logger.debug("------------- Inside NPM TS addTransactionsToQueue -------------");\r
+        logger.trace("Received addTransactionsToQueue for Npm Transaction:({})", npmTransaction);\r
+\r
+        NpmAck npmAck = new NpmAck();\r
+        npmAck.setNpmTransactionId(npmTransaction.getNpmTransactionId());\r
+        npmAck.setRequestId(npmTransaction.getRequestId());\r
+        try {\r
+            //Validate Npm Transaction before creating entry to NPM_TRANSACTION Table.\r
+            npmTransaction.validate();\r
+            if (Arrays.stream(NpmUtils.getPriorityList(npmServiceManager.getNpmConfig("Priority_List"))).noneMatch(i -> i == npmTransaction.getPriority())) {\r
+                // Setting up the configured default priority value it it's missing in request\r
+                npmTransaction.setPriority(NumberUtils.toInt(npmServiceManager.getNpmConfig("Default_Priority"), 2));\r
+                logger.trace("Default priority value:({}) has been set, as it's missing in request.", npmTransaction.getPriority());\r
+            }\r
+            logger.trace("Trying to queue Npm Transaction");\r
+            npmServiceManager.addTransactionToQueue(npmTransaction);\r
+            npmAck.setStatus(NpmStatusEnum.QUEUED);\r
+            npmAck.setMessage("Added to Priority Queue.");\r
+        } catch (NpmException e) {\r
+            logger.error("Failed to queue Npm Transaction.\nErrorMessage: {}", e.getMessage(), e);\r
+            npmAck.setStatus(NpmStatusEnum.FAILED);\r
+            npmAck.setMessage(e.getMessage());\r
+        }\r
+        return npmAck;\r
+    }\r
+\r
+    @Override\r
+    public List<NpmAck> removeTransactionsFromQueue(List<NpmTransaction> npmTransactionList) {\r
+        logger.debug("------------- Inside NPM TS removeTransactionsFromQueue (List<NpmAck>) -------------");\r
+        logger.trace("Received removeTransactionsFromQueue for Npm Transactions:({})", npmTransactionList);\r
+\r
+        List<NpmAck> npmAckList = new ArrayList<>();\r
+        for (NpmTransaction npmTransaction : npmTransactionList) {\r
+            npmAckList.add(removeTransactionsFromQueue(npmTransaction));\r
+        }\r
+        logger.trace("Responding with npmAckList:({})", npmAckList);\r
+        return npmAckList;\r
+    }\r
+\r
+    @Override\r
+    public NpmAck removeTransactionsFromQueue(NpmTransaction npmTransaction) {\r
+        logger.debug("------------- Inside NPM TS removeTransactionsFromQueue -------------");\r
+        logger.trace("Received removeTransactionsFromQueue for Npm Transactions:({})", npmTransaction);\r
+\r
+        npmServiceManager.removeTransactionFromQueue(npmTransaction, true);\r
+\r
+        NpmAck npmAck = new NpmAck();\r
+        npmAck.setNpmTransactionId(npmTransaction.getNpmTransactionId());\r
+        npmAck.setRequestId(npmTransaction.getRequestId());\r
+        npmAck.setStatus(NpmStatusEnum.PROCESSED);\r
+        npmAck.setMessage("Removed from Priority Queue");\r
+        logger.trace("Responding with npmAck:({})", npmAck);\r
+        return npmAck;\r
+    }\r
+\r
+    @Override\r
+    public List<NpmTransaction> retrieveTransactionFromQueue(String sbEndpoint, String sbType) {\r
+        logger.debug("------------- Inside NPM TS retrieveQueueStatus (List<NpmTransaction>) -------------");\r
+        logger.trace("Received retrieveTransactionFromQueue for sbEndpoint:({}) and sbType:({})", sbEndpoint, sbType);\r
+        return npmServiceManager.retrieveTransactionFromQueue(sbEndpoint, sbType);\r
+    }\r
+\r
+    @Override\r
+    public Map<String, Map<Integer, NavigableSet<NpmTransaction>>> retrieveNpmPriorityQueues() {\r
+        logger.debug("------------- Inside NPM TS retrieveAllPriorityQueues (Map<sbEndpoint, Map<priority, NavigableSet<NpmTransaction>>>) -------------");\r
+        return npmServiceManager.retrieveNpmPriorityQueues();\r
+    }\r
+\r
+    @Override\r
+    public void registerService(String serviceKey, NpmServiceCallbackApi npmServiceCallbackApi) {\r
+        logger.trace("Registering NpmServiceCallbackApi with serviceKey:({})", serviceKey);\r
+        npmServiceManager.registerService(serviceKey, npmServiceCallbackApi);\r
+    }\r
+\r
+    @Override\r
+    public boolean loadNpmConfig(String configFilePath) {\r
+        try {\r
+            npmServiceManager.loadNpmConfig(configFilePath);\r
+            return true;\r
+        } catch (NpmException e) {\r
+            logger.trace("Loading configurations from file:({}), failed with:({}): ", configFilePath, e.getMessage(), e);\r
+        }\r
+        return false;\r
+    }\r
+\r
+    @Override\r
+    public NpmTransaction formNpmTransaction(String sbEndpoint, String sbType, String serviceKey, Object serviceRequest) {\r
+        return NpmUtils.formNpmTransaction(null, sbEndpoint, sbType, -1, -1,\r
+            null, null, serviceKey, serviceRequest);\r
+    }\r
+\r
+    @Override\r
+    public NpmTransaction formNpmTransaction(UUID npmTransactionId, String sbEndpoint, String sbType, int priority, int connectionCount,\r
+        Instant timestamp, Instant timeToLive, String serviceKey, Object serviceRequest) {\r
+        return NpmUtils.formNpmTransaction(npmTransactionId, sbEndpoint, sbType, priority, connectionCount,\r
+            timestamp, timeToLive, serviceKey, serviceRequest);\r
+    }\r
+\r
+}\r
diff --git a/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/models/NpmAck.java b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/models/NpmAck.java
new file mode 100644 (file)
index 0000000..e635df8
--- /dev/null
@@ -0,0 +1,76 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : ccsdk features
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=======================================================
+ *
+ */
+
+package org.onap.ccsdk.features.lib.npm.models;
+
+import org.onap.ccsdk.features.lib.npm.utils.NpmUtils;
+
+import java.util.UUID;
+
+/**
+ * The type Npm Ack.
+ *
+ * @author Kapil Singal
+ */
+public class NpmAck {
+    private UUID npmTransactionId;
+    private String requestId; // multiple transactions can have the same requestId
+    private NpmStatusEnum status;
+    private String message;
+
+    public UUID getNpmTransactionId() {
+        return npmTransactionId;
+    }
+
+    public void setNpmTransactionId(UUID npmTransactionId) {
+        this.npmTransactionId = npmTransactionId;
+    }
+
+    public String getRequestId() {
+        return requestId;
+    }
+
+    public void setRequestId(String requestId) {
+        this.requestId = requestId;
+    }
+
+    public NpmStatusEnum getStatus() {
+        return status;
+    }
+
+    public void setStatus(NpmStatusEnum status) {
+        this.status = status;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    @Override
+    public String toString() {
+        return NpmUtils.getJson(this);
+    }
+
+}
diff --git a/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/models/NpmStatusEnum.java b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/models/NpmStatusEnum.java
new file mode 100644 (file)
index 0000000..1fb4bfd
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : ccsdk features
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=======================================================
+ *
+ */
+
+package org.onap.ccsdk.features.lib.npm.models;
+
+/**
+ * The enum Transaction status enum.
+ *
+ * @author Kapil Singal
+ */
+public enum NpmStatusEnum {
+    /**
+     * RECEIVED When Npm receives Npm Transaction
+     */
+    RECEIVED,
+    /**
+     * QUEUED When Npm adds Npm Transaction to a priority queue
+     */
+    QUEUED,
+    /**
+     * PROCESSING When Npm pulls and notifies Npm Transaction to service.
+     */
+    PROCESSING,
+    /**
+     * PROCESSED When Service notifies back Npm about processing done.
+     */
+    PROCESSED,
+    /**
+     * FAILED When Npm fails to either queue or notify Npm Transaction.
+     */
+    FAILED,
+    /**
+     * EXPIRED When timeToLive passes current UTC time.
+     */
+    EXPIRED,
+    /**
+     * OUT_OF_CAPACITY When priority queue is full to it's maximum capacity.
+     */
+    OUT_OF_CAPACITY,
+}
diff --git a/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/models/NpmTransaction.java b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/models/NpmTransaction.java
new file mode 100644 (file)
index 0000000..297d63b
--- /dev/null
@@ -0,0 +1,204 @@
+/*\r
+ * ============LICENSE_START=======================================================\r
+ * ONAP : ccsdk features\r
+ * ================================================================================\r
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ *     http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ============LICENSE_END=======================================================\r
+ *\r
+ */\r
+\r
+package org.onap.ccsdk.features.lib.npm.models;\r
+\r
+import org.onap.ccsdk.features.lib.npm.NpmException;\r
+import org.onap.ccsdk.features.lib.npm.utils.NpmUtils;\r
+import org.apache.commons.lang3.StringUtils;\r
+\r
+import java.time.Instant;\r
+import java.util.Objects;\r
+import java.util.UUID;\r
+\r
+/**\r
+ * The type Npm Transaction.\r
+ *\r
+ * @author Kapil Singal\r
+ */\r
+public class NpmTransaction {\r
+\r
+    private UUID npmTransactionId;\r
+    private String requestId; // multiple transactions can have the same requestId\r
+\r
+    private String sbEndpoint;\r
+    private String sbType;\r
+\r
+    private int priority = -1;\r
+    private int connectionCount = 1;\r
+\r
+    private Instant timestamp = Instant.now();\r
+    private Instant timeToLive = Instant.MAX;\r
+\r
+    private NpmStatusEnum status = NpmStatusEnum.RECEIVED;\r
+    private String message;\r
+\r
+    private String serviceKey;\r
+    private Object serviceRequest;\r
+\r
+    public UUID getNpmTransactionId() {\r
+        return npmTransactionId;\r
+    }\r
+\r
+    public void setNpmTransactionId(UUID npmTransactionId) {\r
+        this.npmTransactionId = npmTransactionId;\r
+    }\r
+\r
+    public String getRequestId() {\r
+        return requestId;\r
+    }\r
+\r
+    public void setRequestId(String requestId) {\r
+        this.requestId = requestId;\r
+    }\r
+\r
+    public String getSbEndpoint() {\r
+        return sbEndpoint;\r
+    }\r
+\r
+    public void setSbEndpoint(String sbEndpoint) {\r
+        this.sbEndpoint = sbEndpoint;\r
+    }\r
+\r
+    public String getSbType() {\r
+        return sbType;\r
+    }\r
+\r
+    public void setSbType(String sbType) {\r
+        this.sbType = sbType;\r
+    }\r
+\r
+    public int getPriority() {\r
+        return priority;\r
+    }\r
+\r
+    public void setPriority(int priority) {\r
+        this.priority = priority;\r
+    }\r
+\r
+    public int getConnectionCount() {\r
+        return connectionCount;\r
+    }\r
+\r
+    public void setConnectionCount(int connectionCount) {\r
+        this.connectionCount = connectionCount;\r
+    }\r
+\r
+    public Instant getTimestamp() {\r
+        return timestamp;\r
+    }\r
+\r
+    public void setTimestamp(Instant timestamp) {\r
+        this.timestamp = timestamp;\r
+    }\r
+\r
+    public Instant getTimeToLive() {\r
+        return timeToLive;\r
+    }\r
+\r
+    public void setTimeToLive(Instant timeToLive) {\r
+        this.timeToLive = timeToLive;\r
+    }\r
+\r
+    public NpmStatusEnum getStatus() {\r
+        return status;\r
+    }\r
+\r
+    public void setStatus(NpmStatusEnum status) {\r
+        this.status = status;\r
+    }\r
+\r
+    public String getMessage() {\r
+        return message;\r
+    }\r
+\r
+    public void setMessage(String message) {\r
+        this.message = message;\r
+    }\r
+\r
+    public String getServiceKey() {\r
+        return serviceKey;\r
+    }\r
+\r
+    public void setServiceKey(String serviceKey) {\r
+        this.serviceKey = serviceKey;\r
+    }\r
+\r
+    public Object getServiceRequest() {\r
+        return serviceRequest;\r
+    }\r
+\r
+    public void setServiceRequest(Object serviceRequest) {\r
+        this.serviceRequest = serviceRequest;\r
+    }\r
+\r
+    /**\r
+     * Validate boolean.\r
+     *\r
+     * @throws NpmException the validator exception\r
+     */\r
+    public void validate() throws NpmException {\r
+        if (npmTransactionId == null) {\r
+            throw new NpmException("Transaction is not valid: npmTransactionId is required.");\r
+        }\r
+        if (StringUtils.isBlank(sbEndpoint)) {\r
+            throw new NpmException("Transaction is not valid: sbEndpoint is required.");\r
+        }\r
+        if (StringUtils.isBlank(sbType)) {\r
+            throw new NpmException("Transaction is not valid: sbType is required.");\r
+        }\r
+        if (timestamp == null) {\r
+            throw new NpmException("Transaction is not valid: txTimestamp is required.");\r
+        }\r
+        if (timeToLive == null) {\r
+            throw new NpmException("Transaction is not valid: timeToLive is required.");\r
+        }\r
+        if (StringUtils.isBlank(serviceKey)) {\r
+            throw new NpmException("Transaction is not valid: serviceKey is required.");\r
+        }\r
+        if (serviceRequest == null) {\r
+            throw new NpmException("Transaction is not valid: serviceRequest is required.");\r
+        }\r
+    }\r
+\r
+    @Override\r
+    public String toString() {\r
+        return NpmUtils.getJson(this);\r
+    }\r
+\r
+    @Override\r
+    public boolean equals(Object o) {\r
+        if (this == o) {\r
+            return true;\r
+        }\r
+        if (!(o instanceof NpmTransaction)) {\r
+            return false;\r
+        }\r
+        NpmTransaction that = (NpmTransaction) o;\r
+        return npmTransactionId.equals(that.npmTransactionId);\r
+    }\r
+\r
+    @Override\r
+    public int hashCode() {\r
+        return Math.abs(Objects.hash(npmTransactionId));\r
+    }\r
+\r
+}\r
diff --git a/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/utils/NpmUtils.java b/lib/network-prioritization/src/main/java/org/onap/ccsdk/features/lib/npm/utils/NpmUtils.java
new file mode 100644 (file)
index 0000000..735d6d9
--- /dev/null
@@ -0,0 +1,178 @@
+/*\r
+ * ============LICENSE_START=======================================================\r
+ * ONAP : ccsdk features\r
+ * ================================================================================\r
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ *     http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ============LICENSE_END=======================================================\r
+ *\r
+ */\r
+\r
+package org.onap.ccsdk.features.lib.npm.utils;\r
+\r
+import com.fasterxml.jackson.core.JsonProcessingException;\r
+import com.fasterxml.jackson.databind.ObjectMapper;\r
+import com.fasterxml.jackson.databind.SerializationFeature;\r
+import com.fasterxml.jackson.databind.node.ArrayNode;\r
+import org.apache.commons.lang3.StringUtils;\r
+import org.onap.ccsdk.features.lib.npm.models.NpmTransaction;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+import java.time.Instant;\r
+import java.util.Collections;\r
+import java.util.List;\r
+import java.util.UUID;\r
+import java.util.stream.Stream;\r
+\r
+/**\r
+ * The type Npm utils.\r
+ *\r
+ * @author Kapil Singal\r
+ */\r
+public class NpmUtils {\r
+\r
+    private static final Logger logger = LoggerFactory.getLogger(NpmUtils.class);\r
+\r
+    private NpmUtils() {\r
+    }\r
+\r
+    /**\r
+     * This is a getJson method\r
+     *\r
+     * @param instance the instance\r
+     *\r
+     * @return String json\r
+     */\r
+    public static String getJson(Object instance) {\r
+        try {\r
+            ObjectMapper mapper = new ObjectMapper();\r
+            mapper.enable(SerializationFeature.INDENT_OUTPUT);\r
+            return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(instance);\r
+        } catch (JsonProcessingException e) {\r
+            e.printStackTrace();\r
+        }\r
+        return null;\r
+    }\r
+\r
+    /**\r
+     * Gets list from json string.\r
+     *\r
+     * @param <T>       the type parameter\r
+     * @param content   the content\r
+     * @param valueType the value type\r
+     *\r
+     * @return the list of type parameter from json string\r
+     */\r
+    public static <T> List<T> getListFromJsonString(String content, Class<T> valueType) {\r
+        try {\r
+            ObjectMapper mapper = new ObjectMapper();\r
+            if (mapper.readTree(content) instanceof ArrayNode) {\r
+                return mapper.readValue(content, mapper.getTypeFactory().constructCollectionType(List.class, valueType));\r
+            } else {\r
+                return Collections.singletonList(mapper.readValue(content, valueType));\r
+            }\r
+        } catch (Exception e) {\r
+            logger.warn(e.getMessage(), e);\r
+        }\r
+        return Collections.emptyList();\r
+    }\r
+\r
+    /**\r
+     * Form npm transaction npm transaction.\r
+     *\r
+     * @param npmTransactionId      the npmTransactionId (instance of UUID :: defaults to Random UUID if null)\r
+     * @param sbEndpoint            the sbEndpoint (Host IP Address :: used to create priority queues)\r
+     * @param sbType                the sbType (eg. EMS_ERICSSON, EMS_NOKIA, D2_MSN etc :: used to determine parallel connections it can support )\r
+     * @param priority              the priority (n :: lowest index is the highest priority, defaults to (least priority) if -1\r
+     * @param connectionCount       the connectionCount (n :: total number of connection a transaction would occupy while processing defaults to (1) if -1\r
+     * @param timestamp             the timestamp (instance of Instant :: defaults to Current Time if null)\r
+     * @param timeToLive            the timeToLive (instance of Instant :: defaults to MAX Time if null)\r
+     * @param serviceKey            the serviceKey (the unique serviceKey specific to service reference)\r
+     * @param serviceRequest        the serviceRequest (actual request payload received from upstream)\r
+     *\r
+     * @return the npm transaction\r
+     */\r
+    public static NpmTransaction formNpmTransaction(UUID npmTransactionId, String sbEndpoint, String sbType, int priority, int connectionCount,\r
+        Instant timestamp, Instant timeToLive, String serviceKey, Object serviceRequest) {\r
+\r
+        NpmTransaction npmTransaction = new NpmTransaction();\r
+        npmTransaction.setNpmTransactionId(npmTransactionId == null ? UUID.randomUUID() : npmTransactionId);\r
+        npmTransaction.setSbEndpoint(sbEndpoint);\r
+        npmTransaction.setSbType(sbType);\r
+        if (priority > -1) {\r
+            npmTransaction.setPriority(priority);\r
+        }\r
+        if (connectionCount > 0) {\r
+            npmTransaction.setConnectionCount(connectionCount);\r
+        }\r
+        npmTransaction.setTimestamp(timestamp == null ? Instant.now() : timestamp);\r
+        npmTransaction.setTimeToLive(timeToLive == null ? Instant.MAX : timeToLive);\r
+        npmTransaction.setServiceKey(serviceKey);\r
+        npmTransaction.setServiceRequest(serviceRequest);\r
+\r
+        return npmTransaction;\r
+    }\r
+\r
+    /**\r
+     * This is a isAllOkToProcess method\r
+     *\r
+     * @param qspLimit          the queue serve period limit\r
+     * @param qspCounter        the queue serve period counter\r
+     * @param connectionCounter the connection counter for sb_endpoint\r
+     * @param sbConnectionLimit the sb_endpoint parallel connection limit\r
+     *\r
+     * @return  true if: queue serve period is not met and sbEndpoint is still having connection slot empty\r
+     *          <p>\r
+     *          false if: queue serve period is reached to max limit or particular SB (sbEndpoint) is already occupied to max connection limit\r
+     */\r
+    public static boolean isAllOkToProcess(int qspLimit, int qspCounter, int connectionCounter, int sbConnectionLimit) {\r
+        return qspLimit > qspCounter && connectionCounter < sbConnectionLimit;\r
+    }\r
+\r
+    /**\r
+     * Is expired boolean.\r
+     *\r
+     * @param npmTransaction the NpmTransaction instance\r
+     *\r
+     * @return true if timeToLive is passed than current UTC Time else false\r
+     */\r
+    public static boolean isExpired(NpmTransaction npmTransaction) {\r
+        return npmTransaction != null && npmTransaction.getTimeToLive().compareTo(Instant.now()) <= 0;\r
+    }\r
+\r
+    /**\r
+     * Get priority list.\r
+     *\r
+     * @param priorities defined Property_List from properties file\r
+     *\r
+     * @return the int [priorities]\r
+     */\r
+    public static int[] getPriorityList(String priorities) {\r
+        return Stream.of(StringUtils.defaultIfBlank(priorities, "0,1,2").split(",")).mapToInt(Integer::parseInt).toArray();\r
+    }\r
+\r
+    /**\r
+     * Gets npm priority queue key.\r
+     *\r
+     * @param sbEndpoint the sb endpoint\r
+     * @param sbType     the sb type\r
+     *\r
+     * @return the npm priority queue key\r
+     */\r
+    public static String getNpmPriorityQueueKey(String sbEndpoint, String sbType) {\r
+        return sbEndpoint.concat("##").concat(sbType);\r
+    }\r
+\r
+}\r
diff --git a/lib/network-prioritization/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/lib/network-prioritization/src/main/resources/OSGI-INF/blueprint/blueprint.xml
new file mode 100644 (file)
index 0000000..6ad1bc7
--- /dev/null
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ ============LICENSE_START=======================================================
+  ~ ONAP : ccsdk features
+  ~ ================================================================================
+  ~ Update Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
+  ~ ================================================================================
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~ ============LICENSE_END=======================================================
+  ~
+  -->
+
+<blueprint xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0"
+           xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
+           odl:use-default-for-reference-types="true">
+
+    <bean id="npmServiceManager" class="org.onap.ccsdk.features.lib.npm.api.NpmServiceManagerImpl"/>
+
+    <bean id="npmTransactionService" class="org.onap.ccsdk.features.lib.npm.api.NpmTransactionServiceImpl">
+        <argument ref="npmServiceManager"/>
+    </bean>
+
+    <service ref="npmTransactionService" interface="org.onap.ccsdk.features.lib.npm.api.NpmTransactionService"/>
+
+</blueprint>
diff --git a/lib/network-prioritization/src/main/resources/properties/npm-config.properties b/lib/network-prioritization/src/main/resources/properties/npm-config.properties
new file mode 100644 (file)
index 0000000..9df556a
--- /dev/null
@@ -0,0 +1,10 @@
+Priority_List=0,1,2
+Default_Priority=2
+EMS_ERICSSON=2
+EMS_NOKIA=2
+queue_capacity_0=10
+queue_capacity_1=7
+queue_capacity_2=5
+qsp_limit_0=5
+qsp_limit_1=3
+qsp_limit_2=2
diff --git a/lib/network-prioritization/src/test/java/org/onap/ccsdk/features/lib/npm/api/NpmServiceCallbackHandler.java b/lib/network-prioritization/src/test/java/org/onap/ccsdk/features/lib/npm/api/NpmServiceCallbackHandler.java
new file mode 100644 (file)
index 0000000..6682f8e
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : ccsdk features
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=======================================================
+ *
+ */
+
+package org.onap.ccsdk.features.lib.npm.api;
+
+import org.onap.ccsdk.features.lib.npm.models.NpmAck;
+import org.onap.ccsdk.features.lib.npm.models.NpmTransaction;
+import org.onap.ccsdk.features.lib.npm.utils.NpmUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NpmServiceCallbackHandler implements NpmServiceCallbackApi {
+    private static final Logger logger = LoggerFactory.getLogger(NpmServiceCallbackApi.class);
+
+    private final NpmTransactionService npmTransactionService;
+
+    public NpmServiceCallbackHandler(NpmTransactionService npmTransactionService) {
+        this.npmTransactionService = npmTransactionService;
+        npmTransactionService.registerService("npmServiceCallbackHandler", this);
+    }
+
+    @Override
+    public void process(NpmTransaction npmTransaction) {
+        logger.debug("Received NpmTransaction with npmTransactionId ({}) to process.", npmTransaction.getNpmTransactionId());
+        logger.debug("NpmTransaction serviceRequest ({}).", NpmUtils.getJson(npmTransaction.getServiceRequest()));
+        removeTransactionsFromQueue(npmTransaction);
+    }
+
+    private void removeTransactionsFromQueue(NpmTransaction npmTransaction) {
+        NpmAck npmAck = npmTransactionService.removeTransactionsFromQueue(npmTransaction);
+        logger.debug("Removed NpmTransaction with npmAck ({}) after processing.", npmAck);
+    }
+
+}
diff --git a/lib/network-prioritization/src/test/java/org/onap/ccsdk/features/lib/npm/api/NpmTransactionServiceTest.java b/lib/network-prioritization/src/test/java/org/onap/ccsdk/features/lib/npm/api/NpmTransactionServiceTest.java
new file mode 100644 (file)
index 0000000..ae7a9a6
--- /dev/null
@@ -0,0 +1,114 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : ccsdk features
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=======================================================
+ *
+ */
+
+package org.onap.ccsdk.features.lib.npm.api;
+
+import org.junit.BeforeClass;
+import org.onap.ccsdk.features.lib.npm.NpmConstants;
+import org.onap.ccsdk.features.lib.npm.models.NpmAck;
+import org.onap.ccsdk.features.lib.npm.models.NpmStatusEnum;
+import org.onap.ccsdk.features.lib.npm.models.NpmTransaction;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(MockitoJUnitRunner.class)
+public class NpmTransactionServiceTest {
+
+    private static NpmTransactionServiceImpl npmTransactionService;
+
+    private List<NpmTransaction> npmTransactionList;
+
+    @BeforeClass
+    public static void once() throws Exception {
+        System.setProperty(NpmConstants.SDNC_CONFIG_DIR, "src/test/resources/properties");
+        npmTransactionService = new NpmTransactionServiceImpl(new NpmServiceManagerImpl());
+
+        npmTransactionService.registerService("npmServiceCallbackHandler", new NpmServiceCallbackHandler(npmTransactionService));
+    }
+
+    @Before
+    public void before() throws Exception {
+
+        npmTransactionList = new ArrayList<>();
+        npmTransactionList.add(npmTransactionService.formNpmTransaction(null,
+                "1.1.1.1",
+                "EMS_ERICSSON",
+                0,
+                1,
+                Instant.now(),
+                Instant.MAX,
+                "npmServiceCallbackHandler",
+                new ObjectMapper().readTree("{\"attr\": \"EMS_ERICSSON_1\"}")));
+        npmTransactionList.add(npmTransactionService.formNpmTransaction(null,
+                "1.1.1.1",
+                "EMS_ERICSSON",
+                0,
+                1,
+                Instant.now(),
+                Instant.MAX,
+                "npmServiceCallbackHandler",
+                new ObjectMapper().readTree("{\"attr\": \"EMS_ERICSSON_2\"}")));
+
+        npmTransactionList.add(npmTransactionService.formNpmTransaction(null,
+                "2.2.2.2",
+                "EMS_NOKIA",
+                0,
+                1,
+                Instant.now(),
+                Instant.MAX,
+                "npmServiceCallbackHandler",
+                new ObjectMapper().readTree("{\"attr\": \"EMS_NOKIA_1\"}")));
+        npmTransactionList.add(npmTransactionService.formNpmTransaction(null,
+                "2.2.2.2",
+                "EMS_NOKIA",
+                0,
+                1,
+                Instant.now(),
+                Instant.MAX,
+                "npmServiceCallbackHandler",
+                new ObjectMapper().readTree("{\"attr\": \"EMS_NOKIA_2\"}")));
+    }
+
+    @Test
+    public void addTransactionsToQueue_validTransaction() {
+        List<NpmAck> npmAckList = npmTransactionService.addTransactionsToQueue(npmTransactionList);
+        assertEquals(npmTransactionList.size(), npmAckList.size());
+        assertEquals(NpmStatusEnum.QUEUED, npmAckList.get(0).getStatus());
+        assertEquals(NpmStatusEnum.QUEUED, npmAckList.get(1).getStatus());
+    }
+
+    @Test
+    public void addTransactionsToQueue_invalidTransaction() {
+        npmTransactionList.get(0).setServiceRequest(null);
+        NpmAck npmAck = npmTransactionService.addTransactionsToQueue(npmTransactionList.get(0));
+        assertEquals(NpmStatusEnum.FAILED, npmAck.getStatus());
+    }
+
+}
\ No newline at end of file
diff --git a/lib/network-prioritization/src/test/java/org/onap/ccsdk/features/lib/npm/utils/NpmUtilTest.java b/lib/network-prioritization/src/test/java/org/onap/ccsdk/features/lib/npm/utils/NpmUtilTest.java
new file mode 100644 (file)
index 0000000..491984a
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : ccsdk features
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=======================================================
+ *
+ */
+
+package org.onap.ccsdk.features.lib.npm.utils;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class NpmUtilTest {
+
+    @Test
+    public void testGetListFromJsonString() {
+        String content = "{\"key\":\"value\"}";
+        List<JsonNode> jsonNodeForString = NpmUtils.getListFromJsonString(content, JsonNode.class);
+        assertEquals(1, jsonNodeForString.size());
+
+        content = "[{\"key\":\"value\"}, {\"key\":\"value2\"}]";
+        jsonNodeForString = NpmUtils.getListFromJsonString(content, JsonNode.class);
+        assertEquals(2, jsonNodeForString.size());
+    }
+
+}
diff --git a/lib/network-prioritization/src/test/resources/log4j.xml b/lib/network-prioritization/src/test/resources/log4j.xml
new file mode 100644 (file)
index 0000000..4289ba9
--- /dev/null
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ ============LICENSE_START=======================================================
+  ~ ONAP : ccsdk features
+  ~ ================================================================================
+  ~ Update Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
+  ~ ================================================================================
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~ ============LICENSE_END=======================================================
+  ~
+  -->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+    <appender name="console" class="org.apache.log4j.ConsoleAppender">
+        <param name="Target" value="System.out"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%-5p %c{1} - %m%n"/>
+        </layout>
+    </appender>
+    <root>
+        <priority value="trace" />
+        <appender-ref ref="console" />
+    </root>
+</log4j:configuration>
\ No newline at end of file
diff --git a/lib/network-prioritization/src/test/resources/properties/npm-config.properties b/lib/network-prioritization/src/test/resources/properties/npm-config.properties
new file mode 100644 (file)
index 0000000..096d2d2
--- /dev/null
@@ -0,0 +1,11 @@
+Env_Type=solo
+Priority_List=0,1,2
+Default_Priority=2
+EMS_ERICSSON=2
+EMS_NOKIA=2
+queue_capacity_0=10
+queue_capacity_1=7
+queue_capacity_2=5
+qsp_limit_0=5
+qsp_limit_1=3
+qsp_limit_2=2
diff --git a/pom.xml b/pom.xml
index 6ddf225..e50b13d 100755 (executable)
--- a/pom.xml
+++ b/pom.xml
@@ -49,6 +49,7 @@
         <module>aafshiro</module>
         <module>lib/rlock</module>
         <module>lib/doorman</module>
+        <module>lib/network-prioritization</module>
     </modules>
 
     <scm>