1e740a9f12ac0ce95c57e7c30301897405f672d8
[dcaegen2/services/son-handler.git] / src / main / java / org / onap / dcaegen2 / services / sonhms / child / ChildThread.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  son-handler
4  *  ================================================================================
5  *   Copyright (C) 2019-2020 Wipro Limited.
6  *   ==============================================================================
7  *     Licensed under the Apache License, Version 2.0 (the "License");
8  *     you may not use this file except in compliance with the License.
9  *     You may obtain a copy of the License at
10  *  
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *  
13  *     Unless required by applicable law or agreed to in writing, software
14  *     distributed under the License is distributed on an "AS IS" BASIS,
15  *     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *     See the License for the specific language governing permissions and
17  *     limitations under the License.
18  *     ============LICENSE_END=========================================================
19  *  
20  *******************************************************************************/
21
22 package org.onap.dcaegen2.services.sonhms.child;
23
24 import com.fasterxml.jackson.core.type.TypeReference;
25 import com.fasterxml.jackson.databind.ObjectMapper;
26
27 import fj.data.Either;
28
29 import java.io.IOException;
30 import java.sql.Timestamp;
31 import java.util.ArrayList;
32 import java.util.HashMap;
33 import java.util.HashSet;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.Set;
37 import java.util.UUID;
38 import java.util.concurrent.BlockingQueue;
39 import java.util.concurrent.LinkedBlockingQueue;
40
41 import org.onap.dcaegen2.services.sonhms.BufferNotificationComponent;
42 import org.onap.dcaegen2.services.sonhms.ClusterDetailsComponent;
43 import org.onap.dcaegen2.services.sonhms.ConfigPolicy;
44 import org.onap.dcaegen2.services.sonhms.Configuration;
45 import org.onap.dcaegen2.services.sonhms.HoMetricsComponent;
46 import org.onap.dcaegen2.services.sonhms.Timer;
47 import org.onap.dcaegen2.services.sonhms.dao.ClusterDetailsRepository;
48 import org.onap.dcaegen2.services.sonhms.dao.FixedPciCellsRepository;
49 import org.onap.dcaegen2.services.sonhms.dao.PciUpdateRepository;
50 import org.onap.dcaegen2.services.sonhms.dao.SonRequestsRepository;
51 import org.onap.dcaegen2.services.sonhms.dmaap.PolicyDmaapClient;
52 import org.onap.dcaegen2.services.sonhms.entity.HandOverMetrics;
53 import org.onap.dcaegen2.services.sonhms.entity.PciUpdate;
54 import org.onap.dcaegen2.services.sonhms.exceptions.ConfigDbNotFoundException;
55 import org.onap.dcaegen2.services.sonhms.exceptions.OofNotFoundException;
56 import org.onap.dcaegen2.services.sonhms.model.AnrInput;
57 import org.onap.dcaegen2.services.sonhms.model.CellPciPair;
58 import org.onap.dcaegen2.services.sonhms.model.ClusterMap;
59 import org.onap.dcaegen2.services.sonhms.model.Flag;
60 import org.onap.dcaegen2.services.sonhms.model.HoDetails;
61 import org.onap.dcaegen2.services.sonhms.model.ThreadId;
62 import org.onap.dcaegen2.services.sonhms.restclient.AsyncResponseBody;
63 import org.onap.dcaegen2.services.sonhms.restclient.PciSolutions;
64 import org.onap.dcaegen2.services.sonhms.restclient.SdnrRestClient;
65 import org.onap.dcaegen2.services.sonhms.utils.BeanUtil;
66 import org.onap.dcaegen2.services.sonhms.utils.ClusterUtils;
67 import org.onap.dcaegen2.services.sonhms.utils.DmaapUtils;
68 import org.slf4j.Logger;
69 import org.slf4j.MDC;
70
71 public class ChildThread implements Runnable {
72
73     private BlockingQueue<List<String>> childStatusUpdate;
74     private BlockingQueue<Map<CellPciPair, ArrayList<CellPciPair>>> queue = new LinkedBlockingQueue<>();
75
76     private static Map<Long, AsyncResponseBody> responseMap = new HashMap<>();
77     private Graph cluster;
78     private ThreadId threadId;
79     Map<CellPciPair, ArrayList<CellPciPair>> clusterMap;
80     HoMetricsComponent hoMetricsComponent;
81     private static final Logger log = org.slf4j.LoggerFactory.getLogger(ChildThread.class);
82     private static Timestamp startTime;
83
84
85     /**
86      * Constructor with parameters.
87      */
88     public ChildThread(BlockingQueue<List<String>> childStatusUpdate, Graph cluster,
89             BlockingQueue<Map<CellPciPair, ArrayList<CellPciPair>>> queue, ThreadId threadId,
90             HoMetricsComponent hoMetricsComponent) {
91         super();
92         this.childStatusUpdate = childStatusUpdate;
93         this.queue = queue;
94         this.threadId = threadId;
95         this.cluster = cluster;
96         this.hoMetricsComponent = hoMetricsComponent;
97     }
98
99     public ChildThread() {
100
101     }
102
103     /**
104      * Puts notification in queue.
105      */
106     // change this interface to send cell and neighbours to keep it generic for sdnr
107     // and fm
108
109     public void putInQueue(Map<CellPciPair, ArrayList<CellPciPair>> clusterMap) {
110         try {
111             queue.put(clusterMap);
112         } catch (InterruptedException e) {
113             log.error(" The Thread is Interrupted", e);
114             Thread.currentThread().interrupt();
115         }
116     }
117
118     /**
119      * Puts notification in queue with notify.
120      */
121     public void putInQueueWithNotify(Map<CellPciPair, ArrayList<CellPciPair>> clusterMap) {
122         synchronized (queue) {
123             try {
124                 queue.put(clusterMap);
125                 queue.notifyAll();
126             } catch (InterruptedException e) {
127                 log.error(" The Thread is Interrupted", e);
128                 Thread.currentThread().interrupt();
129             }
130
131         }
132
133     }
134
135     /**
136      * Puts response in queue.
137      */
138     public static void putResponse(Long threadId, AsyncResponseBody obj) {
139         synchronized (responseMap) {
140             responseMap.put(threadId, obj);
141         }
142
143     }
144
145     public static Map<Long, AsyncResponseBody> getResponseMap() {
146         return responseMap;
147     }
148
149         public static Timestamp getLastInvokedOofTimeStamp() {
150                 return startTime;
151
152         }
153
154     @Override
155     public void run() {
156
157                 threadId.setChildThreadId(Thread.currentThread().getId());
158                 synchronized (threadId) {
159                         threadId.notifyAll();
160                 }
161
162                 MDC.put("logFileName", Thread.currentThread().getName());
163                 log.info("Starting child thread");
164
165                 StateOof oof = new StateOof(childStatusUpdate);
166                 ClusterUtils clusterUtils = new ClusterUtils();
167                 Detection detect = new Detection();
168                 ChildThreadUtils childUtils = new ChildThreadUtils(ConfigPolicy.getInstance(), new PnfUtils(),
169                                 new PolicyDmaapClient(new DmaapUtils(), Configuration.getInstance()), new HoMetricsComponent());
170
171                 try {
172                         String networkId = cluster.getNetworkId();
173
174                         Boolean done = false;
175
176                         Map<String, ArrayList<Integer>> collisionConfusionResult = new HashMap<String, ArrayList<Integer>>();
177
178                         while (!done) {
179
180                                 if (!cluster.getCellPciNeighbourMap().isEmpty()) {
181
182                                         if (cluster.getCollisionConfusionMap().isEmpty()) {
183
184                                                 collisionConfusionResult = detect.detectCollisionConfusion(cluster);
185                                         } else {
186                                                 collisionConfusionResult = cluster.getCollisionConfusionMap();
187                                         }
188
189                                         Boolean trigger = childUtils.triggerOrWait(collisionConfusionResult);
190                                         ConfigPolicy configPolicy = ConfigPolicy.getInstance();
191                                         double timer = 60;
192                                         try {
193                                                 timer = (double) configPolicy.getConfig().get("PCI_NEIGHBOR_CHANGE_CLUSTER_TIMEOUT_IN_SECS");
194                                         } catch (NullPointerException e) {
195                                                 log.info("Policy config not available. Using default timeout - 60 seconds");
196                                         }
197                                         if (!trigger) {
198                                                 try {
199                                                         Thread.sleep((long) timer * 1000);
200                                                 } catch (InterruptedException e) {
201                                                         log.error("Interrupted Exception while waiting for more notifications {}", e);
202                                                         Thread.currentThread().interrupt();
203                                                 }
204
205                                                 while (!queue.isEmpty()) {
206                                                         Map<CellPciPair, ArrayList<CellPciPair>> newNotification;
207                                                         newNotification = queue.poll();
208                                                         log.info("New notification from SDNR {}", newNotification);
209                                                         cluster = clusterUtils.modifyCluster(cluster, newNotification);
210
211                                                         // update cluster in DB
212                                                         clusterUtils.updateCluster(cluster);
213                                                         collisionConfusionResult = detect.detectCollisionConfusion(cluster);
214
215                                                 }
216
217                                         }
218                                 }
219                                 ArrayList<String> cellidList = new ArrayList<>();
220                                 ArrayList<String> cellIds = new ArrayList<>();
221
222                                 for (Map.Entry<String, ArrayList<Integer>> entry : collisionConfusionResult.entrySet()) {
223                                         String key = entry.getKey();
224                                         ArrayList<Integer> arr;
225                                         arr = entry.getValue();
226                                         if (!arr.isEmpty()) {
227                                                 Set<Integer> set = new HashSet<>(arr);
228                                                 if (((set.size() == 1) && !set.contains(0)) || (set.size() != 1)) {
229                                                         cellIds.add(key);
230
231                                                 }
232                                         }
233
234                                 }
235
236                                 for (String cell : cellIds) {
237                                         log.debug("cellidList entries: {}", cell);
238                                         cellidList.add(cell);
239                                 }
240                                 UUID transactionId;
241
242                                 Flag policyTriggerFlag = BeanUtil.getBean(Flag.class);
243                                 while (policyTriggerFlag.getHolder().equals("PM")) {
244                                         Thread.sleep(100);
245                                 }
246                                 policyTriggerFlag.setHolder("CHILD");
247                                 policyTriggerFlag.setNumChilds(policyTriggerFlag.getNumChilds() + 1);
248
249                                 FixedPciCellsRepository fixedPciCellsRepository = BeanUtil.getBean(FixedPciCellsRepository.class);
250                                 List<String> fixedPciCells = fixedPciCellsRepository.getFixedPciCells();
251
252                                 Timer timerOof = BeanUtil.getBean(Timer.class);
253                                 if (!timerOof.getIsTimer()) {
254                                         log.info("Starting timer");
255                                         timerOof.setIsTimer(true);
256                                         startTime = new Timestamp(System.currentTimeMillis());
257                                         timerOof.setStartTime(startTime);
258                                         timerOof.setCount(0);
259                                         log.info("startTime {}", startTime); 
260
261                                 }
262                                 int timerThreshold = (Configuration.getInstance().getOofTriggerCountTimer() * 60000);
263                                 int triggerCountThreshold = Configuration.getInstance().getOofTriggerCountThreshold();
264                                 log.info("Time threshold {}, triggerCountThreshold {}", timerThreshold, triggerCountThreshold);
265                                 log.info("oof trigger count {}", timerOof.getCount());
266                                 timerOof.setCount(timerOof.getCount() + 1);
267                                 Timestamp currentTime = new Timestamp(System.currentTimeMillis());
268                                 Long difference = currentTime.getTime() - timerOof.getStartTime().getTime();
269                                 if (difference < timerThreshold && timerOof.getCount() > triggerCountThreshold) {
270                                         log.info("difference {}", difference);
271
272                                         Either<List<AnrInput>, Integer> anrTriggerResponse = checkAnrTrigger();
273                                         if (anrTriggerResponse.isRight()) {
274                                                 log.info("ANR trigger response right {}", anrTriggerResponse.right().value());
275                                                 if (anrTriggerResponse.right().value() == 404) {
276                                                         log.info("No poor neighbors found");
277                                                 } else if (anrTriggerResponse.right().value() == 500) {
278                                                         log.info("Failed to fetch HO details from DB ");
279                                                 }
280                                                 transactionId = oof.triggerOof(cellidList, networkId, new ArrayList<>(),fixedPciCells);
281
282                                         } else {
283                                                 log.info("ANR trigger response left {}", anrTriggerResponse.left().value());
284                                                 List<AnrInput> anrInputList = anrTriggerResponse.left().value();
285                                                 log.info("Trigger oof for joint optimization");
286                                                 transactionId = oof.triggerOof(cellidList, networkId, anrInputList,fixedPciCells);
287
288                                         }
289
290                                 } else {
291
292                                         transactionId = oof.triggerOof(cellidList, networkId, new ArrayList<>(),fixedPciCells);
293
294                                         if (difference > timerThreshold) {
295                                                 timerOof.setIsTimer(false);
296                                                 timerOof.setCount(0);
297                                         }
298                                 }
299
300                                 long childThreadId = Thread.currentThread().getId();
301                                 childUtils.saveRequest(transactionId.toString(), childThreadId);
302                 while (!ChildThread.getResponseMap().containsKey(childThreadId)) {
303                     Thread.sleep(100);
304                 }
305
306                 AsyncResponseBody asynResponseBody = ChildThread.getResponseMap().get(childThreadId);
307                                 
308                                 List<PciSolutions> pciSolutionsList = asynResponseBody.getSolutions().getPciSolutions();
309
310                                 if (!pciSolutionsList.isEmpty())
311                                         for (PciSolutions pcisolutions : pciSolutionsList) {
312
313                                                 String cellId = pcisolutions.getCellId();
314                                                 int oldPci = SdnrRestClient.getPci(cellId);
315                                                 int newPci = pcisolutions.getPci();
316                                                 PciUpdate pciUpdate = new PciUpdate();
317                                                 pciUpdate.setCellId(cellId);
318                                                 pciUpdate.setOldPci(oldPci);
319                                                 pciUpdate.setNewPci(newPci);
320                                                 PciUpdateRepository pciUpdateRepository = BeanUtil.getBean(PciUpdateRepository.class);
321                                                 pciUpdateRepository.save(pciUpdate);
322                                         }
323
324                                 try {
325                                         childUtils.sendToPolicy(asynResponseBody);
326                                         policyTriggerFlag.setNumChilds(policyTriggerFlag.getNumChilds() - 1);
327                                         if (policyTriggerFlag.getNumChilds() == 0) {
328                                                 policyTriggerFlag.setHolder("NONE");
329                                         }
330
331                                 } catch (ConfigDbNotFoundException e1) {
332                                         log.debug("Config DB is unreachable: {}", e1);
333                                 }
334
335                                 SonRequestsRepository sonRequestsRepository = BeanUtil.getBean(SonRequestsRepository.class);
336                                 sonRequestsRepository.deleteByChildThreadId(childThreadId);
337
338                                 List<String> childStatus = new ArrayList<>();
339                                 childStatus.add(Long.toString(Thread.currentThread().getId()));
340                                 childStatus.add("success");
341                                 try {
342                                         childStatusUpdate.put(childStatus);
343                                 } catch (InterruptedException e) {
344                                         log.debug("InterruptedException during childStatus update {}", e);
345                                         Thread.currentThread().interrupt();
346
347                                 }
348
349                                 if (!cluster.getCellPciNeighbourMap().isEmpty()) {
350
351                                         Either<List<String>, Integer> bufferedNotifications = getBufferedNotifications();
352
353                                         if (bufferedNotifications.isRight()) {
354                                                 log.info("No buffered notifications");
355                                                 done = true;
356                                         } else {
357                                                 List<Map<CellPciPair, ArrayList<CellPciPair>>> clusterMaps = getClusterMapsFromNotifications(
358                                                                 bufferedNotifications.left().value());
359                                                 for (Map<CellPciPair, ArrayList<CellPciPair>> bufferedClusterMap : clusterMaps) {
360                                                         cluster = clusterUtils.modifyCluster(cluster, bufferedClusterMap);
361                                                 }
362                                                 String cellPciNeighbourString = cluster.getPciNeighbourJson();
363                                                 UUID clusterId = cluster.getGraphId();
364                                                 ClusterDetailsRepository clusterDetailsRepository = BeanUtil
365                                                                 .getBean(ClusterDetailsRepository.class);
366                                                 clusterDetailsRepository.updateCluster(cellPciNeighbourString, clusterId.toString());
367                                         }
368                                 } else {
369                                         done = true;
370                                 }
371
372                         }
373
374                 } catch (OofNotFoundException e) {
375                         log.error("OOF not found, Removing flag and cleaning up");
376                         Flag policyTriggerFlag = BeanUtil.getBean(Flag.class);
377                         policyTriggerFlag.setNumChilds(policyTriggerFlag.getNumChilds() - 1);
378                         if (policyTriggerFlag.getNumChilds() == 0) {
379                                 policyTriggerFlag.setHolder("NONE");
380                         }
381                 } catch (Exception e) {
382                         log.error("{}", e);
383
384                 }
385
386                 cleanup();
387     }
388
389     private List<Map<CellPciPair, ArrayList<CellPciPair>>> getClusterMapsFromNotifications(List<String> notifications) {
390         ObjectMapper mapper = new ObjectMapper();
391         List<Map<CellPciPair, ArrayList<CellPciPair>>> clusterMaps = new ArrayList<>();
392         for (String notification : notifications) {
393             Map<CellPciPair, ArrayList<CellPciPair>> clusterMap = new HashMap<>();
394             ClusterMap clusterMapJson = new ClusterMap();
395             try {
396                 clusterMapJson = mapper.readValue(notification, ClusterMap.class);
397                 clusterMap.put(clusterMapJson.getCell(), clusterMapJson.getNeighbourList());
398
399                 log.debug("clusterMap{}", clusterMap);
400                 clusterMaps.add(clusterMap);
401             } catch (IOException e) {
402                 log.error("Error parsing the buffered notification, skipping {}", e);
403             }
404         }
405         return clusterMaps;
406     }
407
408     private Either<List<String>, Integer> getBufferedNotifications() {
409         log.info("Check if notifications are buffered");
410         BufferNotificationComponent bufferNotificationComponent = new BufferNotificationComponent();
411         ClusterDetailsComponent clusterDetailsComponent = new ClusterDetailsComponent();
412         String clusterId = clusterDetailsComponent.getClusterId(Thread.currentThread().getId());
413         List<String> bufferedNotifications = bufferNotificationComponent.getBufferedNotification(clusterId);
414         if (bufferedNotifications == null || bufferedNotifications.isEmpty()) {
415             return Either.right(404);
416         } else {
417             return Either.left(bufferedNotifications);
418         }
419
420     }
421
422     /**
423      * cleanup resources.
424      */
425     private void cleanup() {
426         log.info("cleaning up database and killing child thread");
427         List<String> childStatus = new ArrayList<>();
428         childStatus.add(Long.toString(Thread.currentThread().getId()));
429         childStatus.add("done");
430         try {
431             childStatusUpdate.put(childStatus);
432         } catch (InterruptedException e) {
433             log.debug("InterruptedException during cleanup{}", e);
434             Thread.currentThread().interrupt();
435
436         }
437         ClusterDetailsRepository clusterDetailsRepository = BeanUtil.getBean(ClusterDetailsRepository.class);
438         clusterDetailsRepository.deleteByChildThreadId(threadId.getChildThreadId());
439         log.info("Child thread :{} {}", Thread.currentThread().getId(), "completed");
440         MDC.remove("logFileName");
441
442     }
443
444     /**
445      * Buffer Notification.
446      */
447     public List<Map<CellPciPair, ArrayList<CellPciPair>>> bufferNotification() {
448
449         // Processing Buffered notifications
450
451         List<Map<CellPciPair, ArrayList<CellPciPair>>> clusterMapList = new ArrayList<>();
452
453         Configuration config = Configuration.getInstance();
454
455         int bufferTime = config.getBufferTime();
456
457         Timestamp currentTime = new Timestamp(System.currentTimeMillis());
458         log.debug("Current time {}", currentTime);
459
460         Timestamp laterTime = new Timestamp(System.currentTimeMillis());
461         log.debug("Later time {}", laterTime);
462
463         long difference = laterTime.getTime() - currentTime.getTime();
464         while (difference < bufferTime) {
465             try {
466                 Thread.sleep(1000);
467             } catch (InterruptedException e) {
468                 log.error("InterruptedException {}", e);
469                 Thread.currentThread().interrupt();
470
471             }
472             laterTime = new Timestamp(System.currentTimeMillis());
473             difference = laterTime.getTime() - currentTime.getTime();
474
475             log.debug("Timer has run for  seconds {}", difference);
476
477             if (!queue.isEmpty()) {
478                 Map<CellPciPair, ArrayList<CellPciPair>> clusterMap;
479                 clusterMap = queue.poll();
480                 clusterMapList.add(clusterMap);
481             }
482
483         }
484         return clusterMapList;
485     }
486
487     /**
488      * Check if ANR to be triggered.
489      */
490     public Either<List<AnrInput>, Integer> checkAnrTrigger() {
491
492         List<AnrInput> anrInputList = new ArrayList<>();
493         Configuration configuration = Configuration.getInstance();
494         List<HoDetails> hoDetailsList;
495         Either<List<HandOverMetrics>, Integer> hoMetrics = hoMetricsComponent.getAll();
496         if (hoMetrics.isRight()) {
497             log.error("Error in getting HO details from db");
498             return Either.right(500);
499         }
500         List<HandOverMetrics> hoMetricsList = hoMetrics.left().value();
501         for (HandOverMetrics hoMetric : hoMetricsList) {
502             String hoDetailsListString = hoMetric.getHoDetails();
503             ObjectMapper mapper = new ObjectMapper();
504             try {
505                 hoDetailsList = mapper.readValue(hoDetailsListString, new TypeReference<ArrayList<HoDetails>>() {
506                 });
507             } catch (Exception e) {
508                 log.error("Exception in parsing HO metrics", hoDetailsListString, e);
509                 continue;
510             }
511             List<String> removeableNeighbors = new ArrayList<>();
512             log.info("Checking poor count for src cell {}", hoMetric.getSrcCellId());
513             for (HoDetails hoDetail : hoDetailsList) {
514                 if (hoDetail.getPoorCount() >= configuration.getPoorCountThreshold()) {
515                     removeableNeighbors.add(hoDetail.getDstCellId());
516                 }
517             }
518
519             if (!removeableNeighbors.isEmpty()) {
520                 AnrInput anrInput = new AnrInput(hoMetric.getSrcCellId(), removeableNeighbors);
521                 anrInputList.add(anrInput);
522             }
523         }
524         if (!anrInputList.isEmpty()) {
525             return Either.left(anrInputList);
526         }
527         return Either.right(404);
528     }
529 }