2cdef353772dfabf60cc6824a00c34eccbb200b3
[ccsdk/features.git] /
1 /*\r
2  * ============LICENSE_START=======================================================\r
3  * ONAP : ccsdk features\r
4  * ================================================================================\r
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.\r
6  * ================================================================================\r
7  * Licensed under the Apache License, Version 2.0 (the "License");\r
8  * you may not use this file except in compliance with the License.\r
9  * You may obtain a copy of the License at\r
10  *\r
11  *     http://www.apache.org/licenses/LICENSE-2.0\r
12  *\r
13  * Unless required by applicable law or agreed to in writing, software\r
14  * distributed under the License is distributed on an "AS IS" BASIS,\r
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  * See the License for the specific language governing permissions and\r
17  * limitations under the License.\r
18  * ============LICENSE_END=======================================================\r
19  *\r
20  */\r
21 \r
22 package org.onap.ccsdk.features.lib.npm.api;\r
23 \r
24 import java.util.ArrayList;\r
25 import java.util.List;\r
26 import org.onap.ccsdk.features.lib.npm.NpmConstants;\r
27 import org.onap.ccsdk.features.lib.npm.NpmException;\r
28 import org.onap.ccsdk.features.lib.npm.models.NpmStatusEnum;\r
29 import org.onap.ccsdk.features.lib.npm.models.NpmTransaction;\r
30 import org.onap.ccsdk.features.lib.npm.utils.NpmUtils;\r
31 import org.apache.commons.lang3.StringUtils;\r
32 import org.apache.commons.lang3.math.NumberUtils;\r
33 import org.slf4j.Logger;\r
34 import org.slf4j.LoggerFactory;\r
35 \r
36 import java.io.File;\r
37 import java.io.FileInputStream;\r
38 import java.io.IOException;\r
39 import java.util.Comparator;\r
40 import java.util.Iterator;\r
41 import java.util.Map;\r
42 import java.util.NavigableSet;\r
43 import java.util.Properties;\r
44 import java.util.TreeMap;\r
45 import java.util.concurrent.ConcurrentHashMap;\r
46 import java.util.concurrent.ConcurrentSkipListSet;\r
47 import java.util.concurrent.ExecutorService;\r
48 import java.util.concurrent.Executors;\r
49 import java.util.concurrent.TimeUnit;\r
50 import java.util.concurrent.atomic.AtomicInteger;\r
51 import org.slf4j.MDC;\r
52 \r
53 import static org.onap.ccsdk.features.lib.npm.NpmConstants.MDC_REQUEST_ID;\r
54 \r
55 /**\r
56  * The type Npm Service Manager.\r
57  *\r
58  * @author Kapil Singal\r
59  */\r
60 public class NpmServiceManagerImpl implements NpmServiceManager {\r
61     private static final Logger logger = LoggerFactory.getLogger(NpmServiceManagerImpl.class);\r
62     /**\r
63      * Npm `Priority Queues`.\r
64      * <p>\r
65      * npmPriorityQueues : Map <String:sbEndpoint##sbType, Map:sbPriorityQueues>\r
66      * sbPriorityQueues  : Map <int:priority, TreeSet:priorityQueue>\r
67      * priorityQueue     : NavigableSet [Contains the NpmTransaction Object]\r
68      * </p>\r
69      *\r
70      * <p>\r
71      * Npm will maintain multiple Priority queues per sb_endpoint (an EMS is an example of sb_endpoint)\r
72      * Priority Queues will be maintained per priority, having Npm Transactions, sorted based on timestamp, to accomplish a FIFO queue.\r
73      * </p>\r
74      *\r
75      * <p>\r
76      * Why NavigableSet or ConcurrentSkipListSet is being used:\r
77      * Need to maintain priority queue Sorted based om Transaction timestamp.!!\r
78      * Hence using ConcurrentSkipListSet -> which implements NavigableSet -> which extends SortedSet\r
79      * </p>\r
80      *\r
81      * <p>\r
82      * The ConcurrentSkipListSet class allows safe execution of\r
83      * Insertion, removal, and access operations on set concurrently by multiple threads.\r
84      * </p>\r
85      *\r
86      * <p>\r
87      * It should be preferred over other implementations of the Set interface\r
88      * when concurrent modification of set by multiple threads is required.\r
89      * </p>\r
90      */\r
91     private final Map<String, Map<Integer, NavigableSet<NpmTransaction>>> npmPriorityQueues = new ConcurrentHashMap<>();\r
92     private final Map<String, Integer> connectionCounter = new ConcurrentHashMap<>();\r
93     private final Map<String, NpmServiceCallbackApi> serviceRegistry = new ConcurrentHashMap<>();\r
94     private final Map<String, Integer> priorityExecState = new ConcurrentHashMap<>();\r
95     private final Map<String, Integer> qspExecState = new ConcurrentHashMap<>();\r
96 \r
97     private final Properties npmConfigurations = new Properties();\r
98     private final ExecutorService executorService = Executors.newCachedThreadPool();\r
99 \r
100     private boolean isProcessIngNpmPriorityQueues = false;\r
101 \r
102     public NpmServiceManagerImpl() throws NpmException {\r
103         loadProperties();\r
104         Runnable processNpmPriorityQueuesTask = () -> {\r
105             try {\r
106                 if (!isProcessIngNpmPriorityQueues) {\r
107                     isProcessIngNpmPriorityQueues = true;\r
108                     // Cleaning up MDC to make sure logging doesn't have old requestID being used for further processing\r
109                     MDC.clear();\r
110                     processNpmPriorityQueues();\r
111                     isProcessIngNpmPriorityQueues = false;\r
112                 }\r
113             } catch (StackOverflowError | Exception e) {\r
114                 //Setting isProcessIngNpmPriorityQueues to false because next time when periodic task runs it should re-run processNpmPriorityQueues\r
115                 isProcessIngNpmPriorityQueues = false;\r
116                 // Catching both as there may not be any npm transaction at time of boot or eventual\r
117                 logger.warn("----------- Task to processNpmPriorityQueues failed ----------- \nErrorMessage:({})", e.getMessage(), e);\r
118             }\r
119         };\r
120         Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(processNpmPriorityQueuesTask, 30, 5, TimeUnit.SECONDS);\r
121     }\r
122 \r
123     @Override\r
124     public String getNpmConfig(String referenceKey) {\r
125         return npmConfigurations.getProperty(referenceKey);\r
126     }\r
127 \r
128     @Override\r
129     public void registerService(String serviceKey, NpmServiceCallbackApi npmServiceCallbackApi) {\r
130         logger.trace("------------- Registering NpmServiceCallbackApi with serviceKey:({}) -------------", serviceKey);\r
131         serviceRegistry.put(serviceKey, npmServiceCallbackApi);\r
132     }\r
133 \r
134     @Override\r
135     public void addTransactionToQueue(final NpmTransaction npmTransaction) throws NpmException {\r
136         logger.trace("------------- Inside NPM SM addTransactionToQueue -------------");\r
137         logger.trace("Queuing Npm Transaction with npmTransactionId:({})", npmTransaction.getNpmTransactionId());\r
138 \r
139         //Sorted Queue based on timestamp, if timestamp same for multiple Transaction it sorts those with NpmTransactionId.\r
140         //Using computeIfAbsent to make sure it creates priority_queue for particular sb_endpoint if not already present\r
141         final String npmPriorityQueueKey = NpmUtils.getNpmPriorityQueueKey(npmTransaction.getSbEndpoint(), npmTransaction.getSbType());\r
142 \r
143         logger.trace("Locating npmPriorityQueues with key sbEndpoint##sbType :: ({})", npmPriorityQueueKey);\r
144         NavigableSet<NpmTransaction> priorityQueue = npmPriorityQueues.computeIfAbsent(npmPriorityQueueKey,\r
145             sbPriorityQueues -> new TreeMap<>()).computeIfAbsent(npmTransaction.getPriority(),\r
146                 priorityQueueSet -> new ConcurrentSkipListSet<>\r
147                         (Comparator.comparing(NpmTransaction :: getTimestamp).thenComparing(NpmTransaction :: getNpmTransactionId)));\r
148         logger.trace("Current queue length for sbEndpoint:({}) with priority:({}) is:({})",\r
149                 npmTransaction.getSbEndpoint(), npmTransaction.getPriority(), priorityQueue.size());\r
150 \r
151         if (priorityQueue.contains(npmTransaction)) {\r
152             logger.trace("Npm Transaction with npmTransactionId:({}) is already present in queued, returning without altering the queue...",\r
153                     npmTransaction.getNpmTransactionId());\r
154             return;\r
155         }\r
156 \r
157         // Compare if queue_capacity_$priority available from Configurations, else by default it will be comparing against default value = 10\r
158         final int queueCapacity = NumberUtils.toInt(getNpmConfig("queue_capacity_" + npmTransaction.getPriority()), 10);\r
159         if (priorityQueue.size() >= queueCapacity) {\r
160             npmTransaction.setStatus(NpmStatusEnum.OUT_OF_CAPACITY);\r
161             String message = String.format("Queue %s Error. Npm Queue for sb_endpoint:(%s) with Priority:(%s) is maxed out to it's capacity limit:(%s)",\r
162                     NpmStatusEnum.OUT_OF_CAPACITY, npmTransaction.getSbEndpoint(), npmTransaction.getPriority(), queueCapacity);\r
163             logger.trace("Returning Error message:({})", message);\r
164             throw new NpmException(message);\r
165         }\r
166         npmTransaction.setStatus(NpmStatusEnum.QUEUED);\r
167         priorityQueue.add(npmTransaction);\r
168         logger.trace("Successfully queued Npm Transaction with npmTransactionId:({})", npmTransaction.getNpmTransactionId());\r
169         logger.trace("Updated queue length for sbEndpoint:({}) with priority:({}) is:({})",\r
170                 npmTransaction.getSbEndpoint(), npmTransaction.getPriority(), priorityQueue.size());\r
171     }\r
172 \r
173     @Override\r
174     public List<NpmTransaction> retrieveTransactionFromQueue(String sbEndpoint, String sbType) {\r
175         logger.trace("------------- Inside NPM SM retrieveTransactionFromQueue -------------");\r
176         logger.trace("Retrieving all Npm Transactions for sbEndpoint:sbType ({}:{}) from priorityQueues", sbEndpoint, sbType);\r
177 \r
178         final String npmPriorityQueueKey = NpmUtils.getNpmPriorityQueueKey(sbEndpoint, sbType);\r
179         final List<NpmTransaction> npmTransactionList = new ArrayList<>();\r
180 \r
181         //Using computeIfPresent as npmTransactionQueueMap doesn't need any alteration if Npm Transaction is not found.\r
182         npmPriorityQueues.computeIfPresent(npmPriorityQueueKey, (sb, sbPriorityQueues) -> {\r
183             sbPriorityQueues.forEach((priority, npmTransactionNavigableSet) -> {\r
184                 npmTransactionList.addAll(npmTransactionNavigableSet);\r
185             });\r
186             return sbPriorityQueues;\r
187         });\r
188 \r
189         logger.trace("Retrieved total {} Npm Transactions from priorityQueues", npmTransactionList.size());\r
190         return npmTransactionList;\r
191     }\r
192 \r
193     @Override\r
194     public Map<String, Map<Integer, NavigableSet<NpmTransaction>>> retrieveNpmPriorityQueues() {\r
195         // TODO: Check if it should return the actual queue map instance or a clone !!\r
196         return npmPriorityQueues;\r
197     }\r
198 \r
199     @Override\r
200     public void removeTransactionFromQueue(NpmTransaction npmTransaction, boolean updateConnectionCounter) {\r
201         logger.trace("------------- Inside NPM SM removeTransactionFromQueue -------------");\r
202         logger.trace("Removing Npm Transaction from priority queue with npmTransactionId:({})", npmTransaction.getNpmTransactionId());\r
203         if (updateConnectionCounter) {\r
204             // Updating connection counter so that next transaction can be processed from queue for same sbEndpoint\r
205             updateConnectionCounter(npmTransaction.getSbEndpoint(), Math.negateExact(npmTransaction.getConnectionCount()));\r
206         }\r
207         final String npmPriorityQueueKey = NpmUtils.getNpmPriorityQueueKey(npmTransaction.getSbEndpoint(), npmTransaction.getSbType());\r
208 \r
209         //Using computeIfPresent as npmTransactionQueueMap doesn't need any alteration if Npm Transaction is not found.\r
210         npmPriorityQueues.computeIfPresent(npmPriorityQueueKey, (sb, sbPriorityQueues) -> {\r
211             NavigableSet<NpmTransaction> priorityQueue = sbPriorityQueues.get(npmTransaction.getPriority());\r
212             if (priorityQueue != null) {\r
213                 logger.trace("Current queue length for sbEndpoint:({}) with priority:({}) is:({})",\r
214                         npmTransaction.getSbEndpoint(), npmTransaction.getPriority(), priorityQueue.size());\r
215 \r
216                 priorityQueue.remove(npmTransaction);\r
217                 logger.trace("Successfully removed Npm Transaction with npmTransactionId:({})", npmTransaction.getNpmTransactionId());\r
218                 logger.trace("Updated queue length for sbEndpoint:({}) with priority:({}) is:({})",\r
219                         npmTransaction.getSbEndpoint(), npmTransaction.getPriority(), priorityQueue.size());\r
220 \r
221                 // Cleaning up Priority Queue if empty\r
222                 if (priorityQueue.isEmpty()) {\r
223                     logger.trace("As priorityQueue for sbEndpoint:({}) with priority:({}) is empty, removing the priority queue.",\r
224                             npmTransaction.getSbEndpoint(), npmTransaction.getPriority());\r
225                     sbPriorityQueues.remove(npmTransaction.getPriority());\r
226                 }\r
227             }\r
228             return sbPriorityQueues;\r
229         });\r
230     }\r
231 \r
232     private void processExpiredNpmTransaction() {\r
233         logger.trace("------------- Inside NPM SM processExpiredNpmTransaction -------------");\r
234         if (npmPriorityQueues.isEmpty()) {\r
235             logger.trace("------------- No Priority Queue is present, nothing to cleanup, hence returning -------------");\r
236             // Returning here itself to avoid StackOverFlow or other runtime error as there may not be any npm transaction to process\r
237             return;\r
238         }\r
239         // Converting to entrySet so that it can parallel stream :)\r
240         npmPriorityQueues.entrySet().parallelStream().forEach(sbEndpointMapEntry -> {\r
241             sbEndpointMapEntry.getValue().entrySet().parallelStream().forEach(prioritySetEntry -> {\r
242 \r
243                 // TODO: Need to test, Periodic Printing of Current Queue Length is expected by Rajesh and Team\r
244                 logger.trace("Current queue length with key sbEndpoint##sbType :: ({}) with priority:({}) is:({})",\r
245                         sbEndpointMapEntry.getKey(), prioritySetEntry.getKey(), prioritySetEntry.getValue().size());\r
246 \r
247                 prioritySetEntry.getValue().parallelStream().forEach(npmTransaction -> {\r
248                     // Checking if npmTransaction is already expired\r
249                     if (NpmUtils.isExpired(npmTransaction)) {\r
250                         logger.trace("Npm Transaction with npmTransactionId:({}) is Expired and will be removed from priority queue, as timeToLive has passed.",\r
251                                 npmTransaction.getNpmTransactionId());\r
252                         npmTransaction.setStatus(NpmStatusEnum.EXPIRED);\r
253                         npmTransaction.setMessage("Npm Transaction is Expired and will be removed from priority queue, as timeToLive has passed.");\r
254                         Runnable notifyServiceTask = () -> invokeServiceCallbackApi(npmTransaction, false);\r
255                         executorService.execute(notifyServiceTask);\r
256                         removeTransactionFromQueue(npmTransaction, false);\r
257                     }\r
258                 });\r
259             });\r
260         });\r
261         logger.trace("------------- Done with checking all priority queues for any expired Npm Transaction -------------");\r
262     }\r
263 \r
264     private void processNpmPriorityQueues() {\r
265         logger.trace("------------- Inside NPM SM processNpmPriorityQueues -------------");\r
266         if (npmPriorityQueues.isEmpty()) {\r
267             // Returning here itself to avoid StackOverFlow or other runtime error as there may not be any npm transaction to process\r
268             return;\r
269         }\r
270         logger.trace("Calling processExpiredNpmTransaction to cleanup expired Npm Transactions before processing any queue.");\r
271         processExpiredNpmTransaction();\r
272 \r
273         // Converting to entrySet so that it can parallel stream :)\r
274         npmPriorityQueues.entrySet().parallelStream().forEach(sbQueueEntry -> {\r
275             final String sbEndpoint = sbQueueEntry.getKey().split("##")[0];\r
276             final String sbType = sbQueueEntry.getKey().split("##")[1];\r
277             final int sbConnectionLimit = NumberUtils.toInt(getNpmConfig(sbType), 1);\r
278 \r
279             if (sbConnectionLimit <= connectionCounter.getOrDefault(sbEndpoint, 0)) {\r
280                 logger.trace("Not processing any Npm Transaction for sbEndpoint:({}) as it is already occupied to it's maximum connection limit:({}).",\r
281                         sbEndpoint, sbConnectionLimit);\r
282                 //returning when a particular SB (sbEndpoint) is already occupied to it's max limit\r
283                 return;\r
284             }\r
285             logger.trace("Trying to process priority queue for sbEndpoint({}) with connectionLimit:({})", sbEndpoint, sbConnectionLimit);\r
286             processSbPriorityQueues(sbEndpoint, sbType, sbConnectionLimit, sbQueueEntry.getValue());\r
287         });\r
288     }\r
289 \r
290     private void processSbPriorityQueues(String sbEndpoint, String sbType, int sbConnectionLimit, final Map<Integer, NavigableSet<NpmTransaction>> priorityQueues) {\r
291         logger.trace("------------- Inside NPM SM processSbPriorityQueues -------------");\r
292         if (priorityQueues == null || priorityQueues.isEmpty()) {\r
293             // Returning here itself to avoid StackOverFlow or other runtime error as there may not be any npm transaction to process\r
294             return;\r
295         }\r
296 \r
297         Iterator<Map.Entry<Integer,NavigableSet<NpmTransaction>>> priorityQueuesIterator = priorityQueues.entrySet().iterator();\r
298         while (priorityQueuesIterator.hasNext()) {\r
299             Map.Entry<Integer,NavigableSet<NpmTransaction>> entry = priorityQueuesIterator.next();\r
300             final NavigableSet<NpmTransaction> npmTransactions = entry.getValue();\r
301             if (npmTransactions.isEmpty()) {\r
302                 priorityQueuesIterator.remove();\r
303                 continue;\r
304             }\r
305 \r
306             final Integer priorityIndex = entry.getKey();\r
307             final String qspLimitKey = "qsp_limit_" + priorityIndex;\r
308             final String qspStateKey = sbEndpoint + "_" + priorityIndex;\r
309 \r
310             AtomicInteger qspLimit = new AtomicInteger(NumberUtils.toInt(getNpmConfig(qspLimitKey), 5));\r
311             AtomicInteger qspCounter = new AtomicInteger(qspExecState.getOrDefault(qspStateKey, 0));\r
312             logger.trace("For sbEndpoint:({}) with priority:({}) qspLimit is:({}) and current qspCounter is:({})",\r
313                 sbEndpoint, priorityIndex, qspLimit.get(), qspCounter.get());\r
314 \r
315             // On re-iteration it should be processing same priority queue which was processed last only if qsp hasn't met\r
316             if (NpmUtils.isAllOkToProcess(qspLimit.get(), qspCounter.get(), connectionCounter.getOrDefault(sbEndpoint, 0), sbConnectionLimit)\r
317                 && priorityExecState.containsKey(sbEndpoint) && priorityQueues.containsKey(priorityExecState.get(sbEndpoint))\r
318                 && !priorityIndex.equals(priorityExecState.get(sbEndpoint))) {\r
319                 logger.trace("Last execution state for sbEndpoint:({}) was for priority:({})", sbEndpoint, priorityExecState.get(sbEndpoint));\r
320                 return;\r
321             }\r
322 \r
323             logger.trace("------------- Iterating npmTransactions from priorityQueue -------------");\r
324             for (final NpmTransaction npmTransaction : npmTransactions) {\r
325                 // Setting RequestID in MDC same as NPM Transaction RequestId\r
326                 MDC.put(MDC_REQUEST_ID, npmTransaction.getRequestId());\r
327                 // Should pick npmTransactions which are in QUEUED state and are not Expired\r
328                 if (NpmStatusEnum.QUEUED.equals(npmTransaction.getStatus()) && !NpmUtils.isExpired(npmTransaction)\r
329                     && NpmUtils.isAllOkToProcess(qspLimit.get(), qspCounter.get(), connectionCounter.getOrDefault(sbEndpoint, 0), sbConnectionLimit)\r
330                     && invokeServiceCallbackApi(npmTransaction, true)) {\r
331 \r
332                     logger.trace("------------- Updating priorityExecState and qspExecState -------------");\r
333                     priorityExecState.put(sbEndpoint, priorityIndex);\r
334                     qspExecState.put(qspStateKey, qspCounter.incrementAndGet());\r
335                     logger.trace("Updated priorityExecState for sbEndpoint:({}) with priority:({})", sbEndpoint, priorityIndex);\r
336                     logger.trace("Updated qspExecState for qspStateKey:({}) with qspCounter value:({})", qspStateKey, qspExecState.get(qspStateKey));\r
337                 }\r
338             }\r
339             resetExecStates(sbEndpoint, sbType);\r
340         }\r
341     }\r
342 \r
343     private boolean invokeServiceCallbackApi(NpmTransaction npmTransaction, boolean updateConnectionCounter) {\r
344         logger.trace("------------- Inside NPM SM invokeServiceCallbackApi -------------");\r
345         try {\r
346             logger.trace("Notifying Registered Service with serviceKey:({}) to process Npm Transaction with npmTransactionId:({})",\r
347                 npmTransaction.getServiceKey(), npmTransaction.getNpmTransactionId());\r
348             //Setting the status as PROCESSING so that same won't be picked up again in processNpmPriorityQueues\r
349             npmTransaction.setStatus(NpmStatusEnum.PROCESSING);\r
350             serviceRegistry.get(npmTransaction.getServiceKey()).process(npmTransaction);\r
351             logger.trace("Notified Registered Service to process Npm Transaction with npmTransactionId:({})", npmTransaction.getNpmTransactionId());\r
352 \r
353             if (updateConnectionCounter) {\r
354                 updateConnectionCounter(npmTransaction.getSbEndpoint(), npmTransaction.getConnectionCount());\r
355             }\r
356         } catch (NpmException e) {\r
357             logger.error("Notifying Registered Service with serviceKey:({}) for npmTransactionId:({}) failed with ErrorMessage:({})",\r
358                 npmTransaction.getServiceKey(), npmTransaction.getNpmTransactionId(), e.getMessage(), e);\r
359             removeTransactionFromQueue(npmTransaction, true);\r
360             return false;\r
361         }\r
362         return true;\r
363     }\r
364 \r
365     /**\r
366      * Resetting Execution States only if QSP met for all priorities so that Npm can reiterate in Round Robin fashion.\r
367      */\r
368     private void resetExecStates(String sbEndpoint, String sbType) {\r
369         logger.trace("------------- Inside NPM SM resetExecStates -------------");\r
370         boolean temp = true;\r
371         final String npmPriorityQueueKey = NpmUtils.getNpmPriorityQueueKey(sbEndpoint, sbType);\r
372         for (int priority : NpmUtils.getPriorityList(getNpmConfig("Priority_List"))) {\r
373             if (npmPriorityQueues.containsKey(npmPriorityQueueKey) && npmPriorityQueues.get(npmPriorityQueueKey).containsKey(priority)\r
374                 && !priorityExecState.containsValue(priority)) {\r
375                 //Setting temp to false so that it won't cleanup priorityExecState and qspExecState as all priorityQueues hasn't been processed yet\r
376                 logger.trace("Execution States won't be resetting for sbEndpoint:({}) as all priorityQueues hasn't been processed yet.", sbEndpoint);\r
377                 temp = false;\r
378                 break;\r
379             }\r
380         }\r
381         if (temp) {\r
382             for (int priority : NpmUtils.getPriorityList(getNpmConfig("Priority_List"))) {\r
383                 logger.trace("Resetting Execution States for sbEndpoint:({}) as all priorityQueues processed and those needs to reiterate.", sbEndpoint);\r
384                 priorityExecState.remove(sbEndpoint);\r
385                 qspExecState.remove(sbEndpoint + "_" + priority);\r
386             }\r
387         }\r
388     }\r
389 \r
390     private void updateConnectionCounter(String sbEndpoint, int connectionCounterValue) {\r
391         logger.trace("------------- Inside NPM SM updateConnectionCounter -------------");\r
392         //Updating connectionCounter value to be 0 or +ve integer whichever is larger\r
393         connectionCounter.computeIfPresent(sbEndpoint, (key, value) -> Math.max((value + connectionCounterValue), 0));\r
394         connectionCounter.computeIfAbsent(sbEndpoint, s -> Math.max(connectionCounterValue, 0));\r
395         logger.trace("For sbEndpoint:({}) updated connectionCounter value is:({}) ", sbEndpoint, connectionCounter.get(sbEndpoint));\r
396     }\r
397 \r
398     private void loadProperties() throws NpmException {\r
399         logger.trace("------------- Inside NPM SM loadProperties -------------");\r
400         String propDir = System.getProperty(NpmConstants.SDNC_CONFIG_DIR);\r
401         if (StringUtils.isBlank(propDir)) {\r
402             propDir = System.getenv(NpmConstants.SDNC_CONFIG_DIR);\r
403         }\r
404         if (StringUtils.isBlank(propDir)) {\r
405             logger.warn("Environment variable:({}) is not set, defaulting properties directory to:({})",\r
406                     NpmConstants.SDNC_CONFIG_DIR, NpmConstants.DEFAULT_SDNC_CONFIG_DIR);\r
407             propDir = NpmConstants.DEFAULT_SDNC_CONFIG_DIR;\r
408         }\r
409         loadNpmConfig(propDir + File.separator + NpmConstants.NPM_CONFIG_PROPERTIES_FILE_NAME);\r
410     }\r
411 \r
412     @Override\r
413     public void loadNpmConfig(String configFilePath) throws NpmException {\r
414         logger.trace("------------- Inside NPM SM loadNpmConfig -------------");\r
415         try {\r
416             logger.trace("Initializing NPM Configurations from:({})", configFilePath);\r
417             if (new File(configFilePath).exists()) {\r
418                 npmConfigurations.load(new FileInputStream(configFilePath));\r
419             } else {\r
420                 logger.warn("Config File:({}) not found, Initializing NPM with default configurations.", configFilePath);\r
421                 configFilePath = "properties" + File.separator + NpmConstants.NPM_CONFIG_PROPERTIES_FILE_NAME;\r
422                 npmConfigurations.load(getClass().getClassLoader().getResourceAsStream(configFilePath));\r
423             }\r
424             logger.trace("Initialized NPM with Configurations:({}) from configFilePath:({})", npmConfigurations, configFilePath);\r
425         } catch (IOException e) {\r
426             throw new NpmException(String.format("SDN-R Internal Error: Failed to load NPM Configurations form:(%s)", configFilePath), e);\r
427         }\r
428     }\r
429 \r
430 }\r