1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019-2020 Wipro Limited.
6 * ==============================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 *******************************************************************************/
22 package org.onap.dcaegen2.services.sonhms.child;
24 import com.fasterxml.jackson.core.type.TypeReference;
25 import com.fasterxml.jackson.databind.ObjectMapper;
27 import fj.data.Either;
28 import org.json.JSONObject;
30 import java.io.IOException;
31 import java.sql.Timestamp;
32 import java.util.ArrayList;
33 import java.util.HashMap;
34 import java.util.HashSet;
35 import java.util.List;
38 import java.util.UUID;
39 import java.util.concurrent.BlockingQueue;
40 import java.util.concurrent.LinkedBlockingQueue;
42 import org.onap.dcaegen2.services.sonhms.BufferNotificationComponent;
43 import org.onap.dcaegen2.services.sonhms.ClusterDetailsComponent;
44 import org.onap.dcaegen2.services.sonhms.ConfigPolicy;
45 import org.onap.dcaegen2.services.sonhms.Configuration;
46 import org.onap.dcaegen2.services.sonhms.HoMetricsComponent;
47 import org.onap.dcaegen2.services.sonhms.Timer;
48 import org.onap.dcaegen2.services.sonhms.dao.ClusterDetailsRepository;
49 import org.onap.dcaegen2.services.sonhms.dao.FixedPciCellsRepository;
50 import org.onap.dcaegen2.services.sonhms.dao.PciUpdateRepository;
51 import org.onap.dcaegen2.services.sonhms.dao.SonRequestsRepository;
52 import org.onap.dcaegen2.services.sonhms.dmaap.PolicyDmaapClient;
53 import org.onap.dcaegen2.services.sonhms.entity.HandOverMetrics;
54 import org.onap.dcaegen2.services.sonhms.entity.PciUpdate;
55 import org.onap.dcaegen2.services.sonhms.exceptions.ConfigDbNotFoundException;
56 import org.onap.dcaegen2.services.sonhms.exceptions.OofNotFoundException;
57 import org.onap.dcaegen2.services.sonhms.model.AnrInput;
58 import org.onap.dcaegen2.services.sonhms.model.CellPciPair;
59 import org.onap.dcaegen2.services.sonhms.model.ClusterMap;
60 import org.onap.dcaegen2.services.sonhms.model.Flag;
61 import org.onap.dcaegen2.services.sonhms.model.HoDetails;
62 import org.onap.dcaegen2.services.sonhms.model.ThreadId;
63 import org.onap.dcaegen2.services.sonhms.restclient.AsyncResponseBody;
64 import org.onap.dcaegen2.services.sonhms.restclient.PciSolutions;
65 import org.onap.dcaegen2.services.sonhms.restclient.SdnrRestClient;
66 import org.onap.dcaegen2.services.sonhms.utils.BeanUtil;
67 import org.onap.dcaegen2.services.sonhms.utils.ClusterUtils;
68 import org.onap.dcaegen2.services.sonhms.utils.DmaapUtils;
69 import org.slf4j.Logger;
/**
 * Runnable worker that handles one cluster: it consumes cluster-map notifications from a
 * queue, talks to OOF/SDNR/policy through helper classes, and reports its status on
 * {@code childStatusUpdate}.
 */
72 public class ChildThread implements Runnable {
// Status entries are put here as a list of two strings: the thread id and a state word.
74 private BlockingQueue<List<String>> childStatusUpdate;
// Incoming notifications (cell -> neighbour list). Defaults to a fresh unbounded queue.
75 private BlockingQueue<Map<CellPciPair, ArrayList<CellPciPair>>> queue = new LinkedBlockingQueue<>();
// Static, so shared by every ChildThread instance: responses keyed by a thread id.
// It is a plain HashMap; putResponse() writes it while synchronized on the map itself.
77 private static Map<Long, AsyncResponseBody> responseMap = new HashMap<>();
// The cluster this worker operates on; reassigned when notifications modify it.
78 private Graph cluster;
// Holder that receives this worker's thread id once it starts running.
79 private ThreadId threadId;
// Package-private; no use of this field is visible in this listing (locals and parameters
// named clusterMap shadow it) — NOTE(review): confirm whether it is still needed.
80 Map<CellPciPair, ArrayList<CellPciPair>> clusterMap;
// Source of hand-over metrics read by checkAnrTrigger().
81 HoMetricsComponent hoMetricsComponent;
82 private static final Logger log = org.slf4j.LoggerFactory.getLogger(ChildThread.class);
// Static but assigned from instance code when the OOF trigger timer is started, so the
// value is shared (and overwritten) across all workers.
83 private static Timestamp startTime;
// Static but assigned from instance code at the start of a run — shared across all workers.
84 private static String networkId;
/**
 * Constructor with parameters.
 *
 * @param childStatusUpdate queue on which this worker reports its status entries
 * @param cluster cluster this worker operates on
 * @param queue queue from which this worker reads cluster-map notifications
 * @param threadId holder that is filled with this worker's thread id once it runs
 * @param hoMetricsComponent component used to read hand-over metrics
 */
public ChildThread(BlockingQueue<List<String>> childStatusUpdate, Graph cluster,
        BlockingQueue<Map<CellPciPair, ArrayList<CellPciPair>>> queue, ThreadId threadId,
        HoMetricsComponent hoMetricsComponent) {
    this.childStatusUpdate = childStatusUpdate;
    // Keep the caller-supplied queue. Without this assignment the instance would keep
    // polling the field's private default queue and never see what the caller enqueues.
    this.queue = queue;
    this.threadId = threadId;
    this.cluster = cluster;
    this.hoMetricsComponent = hoMetricsComponent;
}
// Constructor without parameters.
// NOTE(review): the body is not visible in this listing; if it is empty, every field keeps
// its declared default (only `queue` has a non-null initializer) — confirm.
101 public ChildThread() {
106 * Puts notification in queue.
108 // change this interface to send cell and neighbours to keep it generic for sdnr
111 public void putInQueue(Map<CellPciPair, ArrayList<CellPciPair>> clusterMap) {
113 queue.put(clusterMap);
114 } catch (InterruptedException e) {
115 log.error(" The Thread is Interrupted", e);
116 Thread.currentThread().interrupt();
121 * Puts notification in queue with notify.
123 public void putInQueueWithNotify(Map<CellPciPair, ArrayList<CellPciPair>> clusterMap) {
124 synchronized (queue) {
126 queue.put(clusterMap);
128 } catch (InterruptedException e) {
129 log.error(" The Thread is Interrupted", e);
130 Thread.currentThread().interrupt();
138 * Puts response in queue.
140 public static void putResponse(Long threadId, AsyncResponseBody obj) {
141 synchronized (responseMap) {
142 responseMap.put(threadId, obj);
// Static accessor typed as the response map (thread id -> AsyncResponseBody).
// NOTE(review): the body is not visible in this listing — presumably it returns the
// static responseMap itself. The calls to this accessor visible in this class
// (containsKey/get) are made without the monitor that putResponse() holds while writing.
147 public static Map<Long, AsyncResponseBody> getResponseMap() {
// Static accessor returning a Timestamp.
// NOTE(review): the body is not visible in this listing — presumably it returns the
// static startTime that is assigned when the OOF trigger timer is started; confirm.
151 public static Timestamp getLastInvokedOofTimeStamp() {
// Main routine of the worker. The enclosing method header and a number of short lines
// (try/else/closing braces, short statements) are not visible in this listing —
// NOTE(review): presumably this is Runnable.run(); confirm against the full file.
// Flow, as far as the visible statements show:
//   1. publish this thread's id through threadId and wake threads waiting on it;
//   2. resolve networkId and obtain the collision/confusion result for the cluster;
//   3. wait, then fold queued notifications into the cluster and re-run detection;
//   4. trigger OOF (optionally with ANR inputs) and pick up the response stored for
//      this thread id;
//   5. persist the proposed PCI changes, send the response to policy, report status;
//   6. fold buffered notifications into the cluster.
159 threadId.setChildThreadId(Thread.currentThread().getId());
// notifyAll() requires holding the monitor of the object it is called on.
160 synchronized (threadId) {
161 threadId.notifyAll();
// Tag this thread's log context with the thread name; cleanup() removes the key again.
164 MDC.put("logFileName", Thread.currentThread().getName());
165 log.info("Starting child thread");
167 StateOof oof = new StateOof(childStatusUpdate);
168 ClusterUtils clusterUtils = new ClusterUtils();
169 Detection detect = new Detection();
// Note: the utils object receives a brand-new HoMetricsComponent, not the
// hoMetricsComponent field passed to this class's constructor.
170 ChildThreadUtils childUtils = new ChildThreadUtils(ConfigPolicy.getInstance(), new PnfUtils(),
171 new PolicyDmaapClient(new DmaapUtils(), Configuration.getInstance()), new HoMetricsComponent());
// networkId is a static field, so this assignment is shared by all workers.
174 networkId = cluster.getNetworkId();
// With an empty neighbour map, networkId is instead read from the SDNR cell data of the
// first fixed-PCI cell.
175 if (cluster.getCellPciNeighbourMap().isEmpty()) {
176 FixedPciCellsRepository fixedPciCellsRepository = BeanUtil.getBean(FixedPciCellsRepository.class);
177 List<String> fixedPciCells = fixedPciCellsRepository.getFixedPciCells();
// get(0) throws if the list is empty — NOTE(review): confirm a fixed-PCI cell always exists.
178 String cellId = fixedPciCells.get(0);
179 JSONObject cellData = SdnrRestClient.getCellData(cellId);
180 networkId = cellData.getJSONObject("Cell").getString("networkId");
183 Boolean done = false;
185 Map<String, ArrayList<Integer>> collisionConfusionResult = new HashMap<String, ArrayList<Integer>>();
189 if (!cluster.getCellPciNeighbourMap().isEmpty()) {
// NOTE(review): the two assignments below are presumably the two arms of an if/else
// (detect when the cluster has no stored map, otherwise reuse the stored one) — the
// `else` line is not visible here.
191 if (cluster.getCollisionConfusionMap().isEmpty()) {
193 collisionConfusionResult = detect.detectCollisionConfusion(cluster);
195 collisionConfusionResult = cluster.getCollisionConfusionMap();
198 Boolean trigger = childUtils.triggerOrWait(collisionConfusionResult);
199 ConfigPolicy configPolicy = ConfigPolicy.getInstance();
// The timeout comes from the policy config; the unchecked cast requires the stored value
// to be a Double. The declaration/default of `timer` is not visible in this listing.
202 timer = (double) configPolicy.getConfig().get("PCI_NEIGHBOR_CHANGE_CLUSTER_TIMEOUT_IN_SECS");
203 } catch (NullPointerException e) {
204 log.info("Policy config not available. Using default timeout - 60 seconds");
// The cast binds tighter than the multiplication: timer is truncated to whole seconds
// first, then converted to milliseconds. Any condition guarding this wait is not visible.
208 Thread.sleep((long) timer * 1000);
209 } catch (InterruptedException e) {
210 log.error("Interrupted Exception while waiting for more notifications {}", e);
211 Thread.currentThread().interrupt();
// Drain everything that arrived meanwhile: apply each notification to the cluster,
// persist the cluster and recompute the collision/confusion result.
214 while (!queue.isEmpty()) {
215 Map<CellPciPair, ArrayList<CellPciPair>> newNotification;
216 newNotification = queue.poll();
217 log.info("New notification from SDNR {}", newNotification);
218 cluster = clusterUtils.modifyCluster(cluster, newNotification);
220 // update cluster in DB
221 clusterUtils.updateCluster(cluster);
222 collisionConfusionResult = detect.detectCollisionConfusion(cluster);
228 ArrayList<String> cellidList = new ArrayList<>();
229 ArrayList<String> cellIds = new ArrayList<>();
231 for (Map.Entry<String, ArrayList<Integer>> entry : collisionConfusionResult.entrySet()) {
232 String key = entry.getKey();
233 ArrayList<Integer> arr;
234 arr = entry.getValue();
235 if (!arr.isEmpty()) {
236 Set<Integer> set = new HashSet<>(arr);
// True unless the distinct values are exactly {0}, i.e. unless every entry of arr is 0.
// What is done for a matching entry is not visible in this listing.
237 if (((set.size() == 1) && !set.contains(0)) || (set.size() != 1)) {
245 for (String cell : cellIds) {
246 log.debug("cellidList entries: {}", cell);
247 cellidList.add(cell);
251 Flag policyTriggerFlag = BeanUtil.getBean(Flag.class);
// Loops for as long as the flag holder is "PM"; the loop body is not visible here.
252 while (policyTriggerFlag.getHolder().equals("PM")) {
// Take the flag for child threads and count this worker in. The get-then-set on numChilds
// is not guarded by any lock visible here — NOTE(review): confirm Flag is thread-safe.
255 policyTriggerFlag.setHolder("CHILD");
256 policyTriggerFlag.setNumChilds(policyTriggerFlag.getNumChilds() + 1);
258 FixedPciCellsRepository fixedPciCellsRepository = BeanUtil.getBean(FixedPciCellsRepository.class);
259 List<String> fixedPciCells = fixedPciCellsRepository.getFixedPciCells();
// Start the shared OOF trigger timer if it is not already running.
261 Timer timerOof = BeanUtil.getBean(Timer.class);
262 if (!timerOof.getIsTimer()) {
263 log.info("Starting timer");
264 timerOof.setIsTimer(true);
265 startTime = new Timestamp(System.currentTimeMillis());
266 timerOof.setStartTime(startTime);
267 timerOof.setCount(0);
268 log.info("startTime {}", startTime);
// The configured timer value is multiplied by 60000 and later compared with a millisecond
// difference — presumably it is configured in minutes; confirm.
271 int timerThreshold = (Configuration.getInstance().getOofTriggerCountTimer() * 60000);
272 int triggerCountThreshold = Configuration.getInstance().getOofTriggerCountThreshold();
273 log.info("Time threshold {}, triggerCountThreshold {}", timerThreshold, triggerCountThreshold);
274 log.info("oof trigger count {}", timerOof.getCount());
275 timerOof.setCount(timerOof.getCount() + 1);
276 Timestamp currentTime = new Timestamp(System.currentTimeMillis());
277 Long difference = currentTime.getTime() - timerOof.getStartTime().getTime();
// Still inside the time window and the trigger count exceeded the threshold: consult the
// ANR check before triggering OOF.
278 if (difference < timerThreshold && timerOof.getCount() > triggerCountThreshold) {
279 log.info("difference {}", difference);
281 Either<List<AnrInput>, Integer> anrTriggerResponse = checkAnrTrigger();
// Right side carries a status code (404 / 500 are the values logged below); OOF is then
// triggered with an empty ANR input list.
282 if (anrTriggerResponse.isRight()) {
283 log.info("ANR trigger response right {}", anrTriggerResponse.right().value());
284 if (anrTriggerResponse.right().value() == 404) {
285 log.info("No poor neighbors found");
286 } else if (anrTriggerResponse.right().value() == 500) {
287 log.info("Failed to fetch HO details from DB ");
289 transactionId = oof.triggerOof(cellidList, networkId, new ArrayList<>(),fixedPciCells);
// Left side carries the ANR inputs, which are passed along to OOF.
292 log.info("ANR trigger response left {}", anrTriggerResponse.left().value());
293 List<AnrInput> anrInputList = anrTriggerResponse.left().value();
294 log.info("Trigger oof for joint optimization");
295 transactionId = oof.triggerOof(cellidList, networkId, anrInputList,fixedPciCells);
// NOTE(review): presumably the `else` arm of the window/count check above (the `else`
// line is not visible): trigger OOF without ANR inputs.
301 transactionId = oof.triggerOof(cellidList, networkId, new ArrayList<>(),fixedPciCells);
// Window elapsed: stop the timer and reset the trigger count.
303 if (difference > timerThreshold) {
304 timerOof.setIsTimer(false);
305 timerOof.setCount(0);
309 long childThreadId = Thread.currentThread().getId();
310 childUtils.saveRequest(transactionId.toString(), childThreadId);
// Loop until a response keyed by this thread id appears in the static response map.
// The loop body is not visible here, and the map is read without the monitor that
// putResponse() holds while writing.
311 while (!ChildThread.getResponseMap().containsKey(childThreadId)) {
315 AsyncResponseBody asynResponseBody = ChildThread.getResponseMap().get(childThreadId);
317 List<PciSolutions> pciSolutionsList = asynResponseBody.getSolutions().getPciSolutions();
// For every proposed solution, store the cell id with its current PCI (read from SDNR)
// and the new PCI from the solution.
319 if (!pciSolutionsList.isEmpty())
320 for (PciSolutions pcisolutions : pciSolutionsList) {
322 String cellId = pcisolutions.getCellId();
323 int oldPci = SdnrRestClient.getPci(cellId);
324 int newPci = pcisolutions.getPci();
325 PciUpdate pciUpdate = new PciUpdate();
326 pciUpdate.setCellId(cellId);
327 pciUpdate.setOldPci(oldPci);
328 pciUpdate.setNewPci(newPci);
329 PciUpdateRepository pciUpdateRepository = BeanUtil.getBean(PciUpdateRepository.class);
330 pciUpdateRepository.save(pciUpdate);
// Forward the response to policy, count this worker out and release the flag when it
// was the last one.
334 childUtils.sendToPolicy(asynResponseBody);
335 policyTriggerFlag.setNumChilds(policyTriggerFlag.getNumChilds() - 1);
336 if (policyTriggerFlag.getNumChilds() == 0) {
337 policyTriggerFlag.setHolder("NONE");
340 } catch (ConfigDbNotFoundException e1) {
341 log.debug("Config DB is unreachable: {}", e1);
344 SonRequestsRepository sonRequestsRepository = BeanUtil.getBean(SonRequestsRepository.class);
345 sonRequestsRepository.deleteByChildThreadId(childThreadId);
// Report [thread id, "success"] on the status queue.
347 List<String> childStatus = new ArrayList<>();
348 childStatus.add(Long.toString(Thread.currentThread().getId()));
349 childStatus.add("success");
351 childStatusUpdate.put(childStatus);
352 } catch (InterruptedException e) {
353 log.debug("InterruptedException during childStatus update {}", e);
354 Thread.currentThread().interrupt();
// Apply notifications that were buffered for this cluster and store the updated
// neighbour JSON under the cluster's id.
358 if (!cluster.getCellPciNeighbourMap().isEmpty()) {
360 Either<List<String>, Integer> bufferedNotifications = getBufferedNotifications();
362 if (bufferedNotifications.isRight()) {
363 log.info("No buffered notifications");
366 List<Map<CellPciPair, ArrayList<CellPciPair>>> clusterMaps = getClusterMapsFromNotifications(
367 bufferedNotifications.left().value());
368 for (Map<CellPciPair, ArrayList<CellPciPair>> bufferedClusterMap : clusterMaps) {
369 cluster = clusterUtils.modifyCluster(cluster, bufferedClusterMap);
371 String cellPciNeighbourString = cluster.getPciNeighbourJson();
372 UUID clusterId = cluster.getGraphId();
373 ClusterDetailsRepository clusterDetailsRepository = BeanUtil
374 .getBean(ClusterDetailsRepository.class);
375 clusterDetailsRepository.updateCluster(cellPciNeighbourString, clusterId.toString());
// OOF unavailable: undo this worker's flag registration, mirroring the normal release.
383 } catch (OofNotFoundException e) {
384 log.error("OOF not found, Removing flag and cleaning up");
385 Flag policyTriggerFlag = BeanUtil.getBean(Flag.class);
386 policyTriggerFlag.setNumChilds(policyTriggerFlag.getNumChilds() - 1);
387 if (policyTriggerFlag.getNumChilds() == 0) {
388 policyTriggerFlag.setHolder("NONE");
// Catch-all for anything else; the handler body is not visible in this listing.
390 } catch (Exception e) {
398 private List<Map<CellPciPair, ArrayList<CellPciPair>>> getClusterMapsFromNotifications(List<String> notifications) {
399 ObjectMapper mapper = new ObjectMapper();
400 List<Map<CellPciPair, ArrayList<CellPciPair>>> clusterMaps = new ArrayList<>();
401 for (String notification : notifications) {
402 Map<CellPciPair, ArrayList<CellPciPair>> clusterMap = new HashMap<>();
403 ClusterMap clusterMapJson = new ClusterMap();
405 clusterMapJson = mapper.readValue(notification, ClusterMap.class);
406 clusterMap.put(clusterMapJson.getCell(), clusterMapJson.getNeighbourList());
408 log.debug("clusterMap{}", clusterMap);
409 clusterMaps.add(clusterMap);
410 } catch (IOException e) {
411 log.error("Error parsing the buffered notification, skipping {}", e);
417 private Either<List<String>, Integer> getBufferedNotifications() {
418 log.info("Check if notifications are buffered");
419 BufferNotificationComponent bufferNotificationComponent = new BufferNotificationComponent();
420 ClusterDetailsComponent clusterDetailsComponent = new ClusterDetailsComponent();
421 String clusterId = clusterDetailsComponent.getClusterId(Thread.currentThread().getId());
422 List<String> bufferedNotifications = bufferNotificationComponent.getBufferedNotification(clusterId);
423 if (bufferedNotifications == null || bufferedNotifications.isEmpty()) {
424 return Either.right(404);
426 return Either.left(bufferedNotifications);
434 private void cleanup() {
435 log.info("cleaning up database and killing child thread");
436 List<String> childStatus = new ArrayList<>();
437 childStatus.add(Long.toString(Thread.currentThread().getId()));
438 childStatus.add("done");
440 childStatusUpdate.put(childStatus);
441 } catch (InterruptedException e) {
442 log.debug("InterruptedException during cleanup{}", e);
443 Thread.currentThread().interrupt();
446 ClusterDetailsRepository clusterDetailsRepository = BeanUtil.getBean(ClusterDetailsRepository.class);
447 clusterDetailsRepository.deleteByChildThreadId(threadId.getChildThreadId());
448 log.info("Child thread :{} {}", Thread.currentThread().getId(), "completed");
449 MDC.remove("logFileName");
454 * Buffer Notification.
// Collects notifications from the queue while a time window is open and returns them.
// The window closes once the elapsed time (a difference of Timestamp.getTime() values,
// i.e. milliseconds) is no longer below the configured buffer time.
// NOTE(review): several short lines of this method are not visible in this listing
// (the `try {` and the statement inside it, closing braces), so the exact wait between
// checks and whether the queue poll sits inside the loop cannot be confirmed from here.
456 public List<Map<CellPciPair, ArrayList<CellPciPair>>> bufferNotification() {
458 // Processing Buffered notifications
460 List<Map<CellPciPair, ArrayList<CellPciPair>>> clusterMapList = new ArrayList<>();
462 Configuration config = Configuration.getInstance();
// Compared directly against a millisecond difference below.
464 int bufferTime = config.getBufferTime();
// currentTime marks the start of the window; laterTime is refreshed as time passes.
466 Timestamp currentTime = new Timestamp(System.currentTimeMillis());
467 log.debug("Current time {}", currentTime);
469 Timestamp laterTime = new Timestamp(System.currentTimeMillis());
470 log.debug("Later time {}", laterTime);
472 long difference = laterTime.getTime() - currentTime.getTime();
473 while (difference < bufferTime) {
476 } catch (InterruptedException e) {
477 log.error("InterruptedException {}", e);
// Restores the interrupt flag; the loop itself is not left because of the interrupt.
478 Thread.currentThread().interrupt();
481 laterTime = new Timestamp(System.currentTimeMillis());
482 difference = laterTime.getTime() - currentTime.getTime();
// The message says "seconds" but the logged value is a millisecond difference.
484 log.debug("Timer has run for seconds {}", difference);
// Takes at most one queued notification each time this check runs.
486 if (!queue.isEmpty()) {
487 Map<CellPciPair, ArrayList<CellPciPair>> clusterMap;
488 clusterMap = queue.poll();
489 clusterMapList.add(clusterMap);
493 return clusterMapList;
497 * Check if ANR to be triggered.
499 public Either<List<AnrInput>, Integer> checkAnrTrigger() {
501 List<AnrInput> anrInputList = new ArrayList<>();
502 Configuration configuration = Configuration.getInstance();
503 List<HoDetails> hoDetailsList;
504 Either<List<HandOverMetrics>, Integer> hoMetrics = hoMetricsComponent.getAll();
505 if (hoMetrics.isRight()) {
506 log.error("Error in getting HO details from db");
507 return Either.right(500);
509 List<HandOverMetrics> hoMetricsList = hoMetrics.left().value();
510 for (HandOverMetrics hoMetric : hoMetricsList) {
511 String hoDetailsListString = hoMetric.getHoDetails();
512 ObjectMapper mapper = new ObjectMapper();
514 hoDetailsList = mapper.readValue(hoDetailsListString, new TypeReference<ArrayList<HoDetails>>() {
516 } catch (Exception e) {
517 log.error("Exception in parsing HO metrics", hoDetailsListString, e);
520 List<String> removeableNeighbors = new ArrayList<>();
521 log.info("Checking poor count for src cell {}", hoMetric.getSrcCellId());
522 for (HoDetails hoDetail : hoDetailsList) {
523 if (hoDetail.getPoorCount() >= configuration.getPoorCountThreshold()) {
524 removeableNeighbors.add(hoDetail.getDstCellId());
528 if (!removeableNeighbors.isEmpty()) {
529 AnrInput anrInput = new AnrInput(hoMetric.getSrcCellId(), removeableNeighbors);
530 anrInputList.add(anrInput);
533 if (!anrInputList.isEmpty()) {
534 return Either.left(anrInputList);
536 return Either.right(404);