1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019-2020 Wipro Limited.
6 * ==============================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 *******************************************************************************/
22 package org.onap.dcaegen2.services.sonhms.child;
24 import com.fasterxml.jackson.core.type.TypeReference;
25 import com.fasterxml.jackson.databind.ObjectMapper;
27 import fj.data.Either;
29 import java.io.IOException;
30 import java.sql.Timestamp;
31 import java.util.ArrayList;
32 import java.util.HashMap;
33 import java.util.HashSet;
34 import java.util.List;
37 import java.util.UUID;
38 import java.util.concurrent.BlockingQueue;
39 import java.util.concurrent.LinkedBlockingQueue;
41 import org.onap.dcaegen2.services.sonhms.BufferNotificationComponent;
42 import org.onap.dcaegen2.services.sonhms.ClusterDetailsComponent;
43 import org.onap.dcaegen2.services.sonhms.ConfigPolicy;
44 import org.onap.dcaegen2.services.sonhms.Configuration;
45 import org.onap.dcaegen2.services.sonhms.HoMetricsComponent;
46 import org.onap.dcaegen2.services.sonhms.Timer;
47 import org.onap.dcaegen2.services.sonhms.dao.ClusterDetailsRepository;
48 import org.onap.dcaegen2.services.sonhms.dao.FixedPciCellsRepository;
49 import org.onap.dcaegen2.services.sonhms.dao.PciUpdateRepository;
50 import org.onap.dcaegen2.services.sonhms.dao.SonRequestsRepository;
51 import org.onap.dcaegen2.services.sonhms.dmaap.PolicyDmaapClient;
52 import org.onap.dcaegen2.services.sonhms.entity.HandOverMetrics;
53 import org.onap.dcaegen2.services.sonhms.entity.PciUpdate;
54 import org.onap.dcaegen2.services.sonhms.exceptions.ConfigDbNotFoundException;
55 import org.onap.dcaegen2.services.sonhms.exceptions.OofNotFoundException;
56 import org.onap.dcaegen2.services.sonhms.model.AnrInput;
57 import org.onap.dcaegen2.services.sonhms.model.CellPciPair;
58 import org.onap.dcaegen2.services.sonhms.model.ClusterMap;
59 import org.onap.dcaegen2.services.sonhms.model.Flag;
60 import org.onap.dcaegen2.services.sonhms.model.HoDetails;
61 import org.onap.dcaegen2.services.sonhms.model.ThreadId;
62 import org.onap.dcaegen2.services.sonhms.restclient.AsyncResponseBody;
63 import org.onap.dcaegen2.services.sonhms.restclient.PciSolutions;
64 import org.onap.dcaegen2.services.sonhms.restclient.SdnrRestClient;
65 import org.onap.dcaegen2.services.sonhms.utils.BeanUtil;
66 import org.onap.dcaegen2.services.sonhms.utils.ClusterUtils;
67 import org.onap.dcaegen2.services.sonhms.utils.DmaapUtils;
68 import org.slf4j.Logger;
/**
 * Runnable worker tied to one cluster graph. It receives cluster-map notifications
 * through a queue, triggers OOF through {@code StateOof}, and reports its status to
 * the creator through the {@code childStatusUpdate} queue.
 */
71 public class ChildThread implements Runnable {
// Queue on which this thread puts its status lists: the thread id followed by
// "success" or "done".
73 private BlockingQueue<List<String>> childStatusUpdate;
// Incoming cluster-map notifications; filled by putInQueue/putInQueueWithNotify
// and polled by this thread.
74 private BlockingQueue<Map<CellPciPair, ArrayList<CellPciPair>>> queue = new LinkedBlockingQueue<>();
// Responses keyed by child thread id. Static, so shared by every instance;
// putResponse writes to it while holding the map's monitor.
76 private static Map<Long, AsyncResponseBody> responseMap = new HashMap<>();
// Cluster graph this thread works on; reassigned when notifications are merged in.
77 private Graph cluster;
// Holder that receives this thread's id; its monitor is notified once the id is set.
78 private ThreadId threadId;
// Package-private; not referenced by the methods visible in this section.
79 Map<CellPciPair, ArrayList<CellPciPair>> clusterMap;
// Supplies the hand-over metrics read by checkAnrTrigger().
80 HoMetricsComponent hoMetricsComponent;
81 private static final Logger log = org.slf4j.LoggerFactory.getLogger(ChildThread.class);
// Time at which the OOF trigger timer was last started. Static, so shared by all
// child threads.
82 private static Timestamp startTime;
/**
 * Creates a child thread bound to a cluster and to the queues it shares with its creator.
 *
 * @param childStatusUpdate queue on which this thread reports its status
 * @param cluster cluster graph this thread works on
 * @param queue queue from which this thread receives cluster-map notifications
 * @param threadId holder that is filled in with this thread's id once it runs
 * @param hoMetricsComponent source of the hand-over metrics used by the ANR trigger check
 */
public ChildThread(BlockingQueue<List<String>> childStatusUpdate, Graph cluster,
        BlockingQueue<Map<CellPciPair, ArrayList<CellPciPair>>> queue, ThreadId threadId,
        HoMetricsComponent hoMetricsComponent) {
    this.childStatusUpdate = childStatusUpdate;
    // Without this assignment the thread would poll its own private default queue
    // and never see notifications placed on the queue supplied by the caller.
    this.queue = queue;
    this.threadId = threadId;
    this.cluster = cluster;
    this.hoMetricsComponent = hoMetricsComponent;
}
/**
 * No-argument constructor.
 *
 * NOTE(review): the body is not visible in this section. If it is empty, only
 * {@code queue} has a value (from its field initializer) and the other instance
 * fields stay null - confirm before using an instance created this way.
 */
99 public ChildThread() {
104 * Puts notification in queue.
106 // change this interface to send cell and neighbours to keep it generic for sdnr
109 public void putInQueue(Map<CellPciPair, ArrayList<CellPciPair>> clusterMap) {
111 queue.put(clusterMap);
112 } catch (InterruptedException e) {
113 log.error(" The Thread is Interrupted", e);
114 Thread.currentThread().interrupt();
119 * Puts notification in queue with notify.
121 public void putInQueueWithNotify(Map<CellPciPair, ArrayList<CellPciPair>> clusterMap) {
122 synchronized (queue) {
124 queue.put(clusterMap);
126 } catch (InterruptedException e) {
127 log.error(" The Thread is Interrupted", e);
128 Thread.currentThread().interrupt();
136 * Puts response in queue.
138 public static void putResponse(Long threadId, AsyncResponseBody obj) {
139 synchronized (responseMap) {
140 responseMap.put(threadId, obj);
/**
 * Accessor for the static response map that putResponse writes to.
 *
 * NOTE(review): the body is not visible in this section - presumably it returns
 * responseMap itself rather than a copy; confirm. In the visible lines, callers read
 * the returned map without holding the monitor that putResponse synchronizes on.
 */
145 public static Map<Long, AsyncResponseBody> getResponseMap() {
/**
 * Accessor for a timestamp related to the last OOF invocation.
 *
 * NOTE(review): the body is not visible in this section - presumably it returns the
 * static startTime field, which is assigned when the OOF trigger timer is started;
 * confirm. That field has no initializer, so the result may be null before then.
 */
149 public static Timestamp getLastInvokedOofTimeStamp() {
// Body of the thread's main routine. NOTE(review): the method header and a number of
// control-flow lines (try/else/closing braces, some declarations) are not visible in
// this section, so the comments below describe only the statements that are shown.

// Publish this thread's id through the shared holder and wake anything waiting on it.
157 threadId.setChildThreadId(Thread.currentThread().getId());
158 synchronized (threadId) {
159 threadId.notifyAll();
// Tag log output with the thread name; cleanup() removes this key again.
162 MDC.put("logFileName", Thread.currentThread().getName());
163 log.info("Starting child thread");
// Collaborators used for the rest of the routine.
165 StateOof oof = new StateOof(childStatusUpdate);
166 ClusterUtils clusterUtils = new ClusterUtils();
167 Detection detect = new Detection();
168 ChildThreadUtils childUtils = new ChildThreadUtils(ConfigPolicy.getInstance(), new PnfUtils(),
169 new PolicyDmaapClient(new DmaapUtils(), Configuration.getInstance()), new HoMetricsComponent());
172 String networkId = cluster.getNetworkId();
// NOTE(review): presumably the exit flag of an enclosing loop whose header is not visible.
174 Boolean done = false;
176 Map<String, ArrayList<Integer>> collisionConfusionResult = new HashMap<String, ArrayList<Integer>>();
// Only work on a cluster that actually has cell/neighbour data.
180 if (!cluster.getCellPciNeighbourMap().isEmpty()) {
// Run detection when the cluster carries no collision/confusion map; the second
// assignment below reuses the stored map (its `else` line is not visible here).
182 if (cluster.getCollisionConfusionMap().isEmpty()) {
184 collisionConfusionResult = detect.detectCollisionConfusion(cluster);
186 collisionConfusionResult = cluster.getCollisionConfusionMap();
// Decide whether to trigger now or wait for further notifications.
189 Boolean trigger = childUtils.triggerOrWait(collisionConfusionResult);
190 ConfigPolicy configPolicy = ConfigPolicy.getInstance();
// Wait time comes from the policy config. A NullPointerException here is treated as
// "policy config not available"; per the log message the default is 60 seconds
// (the declaration of `timer` is not visible in this section).
193 timer = (double) configPolicy.getConfig().get("PCI_NEIGHBOR_CHANGE_CLUSTER_TIMEOUT_IN_SECS");
194 } catch (NullPointerException e) {
195 log.info("Policy config not available. Using default timeout - 60 seconds");
// The cast binds tighter than the multiplication, so fractional seconds are
// truncated before converting to milliseconds.
199 Thread.sleep((long) timer * 1000);
200 } catch (InterruptedException e) {
201 log.error("Interrupted Exception while waiting for more notifications {}", e);
202 Thread.currentThread().interrupt();
// Drain every notification that arrived meanwhile: merge it into the cluster,
// persist the cluster and re-run detection.
205 while (!queue.isEmpty()) {
206 Map<CellPciPair, ArrayList<CellPciPair>> newNotification;
207 newNotification = queue.poll();
208 log.info("New notification from SDNR {}", newNotification);
209 cluster = clusterUtils.modifyCluster(cluster, newNotification);
211 // update cluster in DB
212 clusterUtils.updateCluster(cluster);
213 collisionConfusionResult = detect.detectCollisionConfusion(cluster);
// Build the list of cell ids to send to OOF from the detection result.
219 ArrayList<String> cellidList = new ArrayList<>();
220 ArrayList<String> cellIds = new ArrayList<>();
222 for (Map.Entry<String, ArrayList<Integer>> entry : collisionConfusionResult.entrySet()) {
223 String key = entry.getKey();
224 ArrayList<Integer> arr;
225 arr = entry.getValue();
226 if (!arr.isEmpty()) {
227 Set<Integer> set = new HashSet<>(arr);
// True for every non-empty value list except one whose distinct values are exactly {0}.
// NOTE(review): the guarded statements are not visible; presumably `key` is added
// to cellIds - confirm.
228 if (((set.size() == 1) && !set.contains(0)) || (set.size() != 1)) {
236 for (String cell : cellIds) {
237 log.debug("cellidList entries: {}", cell);
238 cellidList.add(cell);
// Wait until the shared flag is no longer held by "PM" (loop body not visible),
// then mark it as held by child threads and count this thread in.
242 Flag policyTriggerFlag = BeanUtil.getBean(Flag.class);
243 while (policyTriggerFlag.getHolder().equals("PM")) {
246 policyTriggerFlag.setHolder("CHILD");
247 policyTriggerFlag.setNumChilds(policyTriggerFlag.getNumChilds() + 1);
249 FixedPciCellsRepository fixedPciCellsRepository = BeanUtil.getBean(FixedPciCellsRepository.class);
250 List<String> fixedPciCells = fixedPciCellsRepository.getFixedPciCells();
// Start the shared OOF trigger timer if it is not already running.
252 Timer timerOof = BeanUtil.getBean(Timer.class);
253 if (!timerOof.getIsTimer()) {
254 log.info("Starting timer");
255 timerOof.setIsTimer(true);
256 startTime = new Timestamp(System.currentTimeMillis());
257 timerOof.setStartTime(startTime);
258 timerOof.setCount(0);
259 log.info("startTime {}", startTime);
// NOTE(review): the * 60000 suggests the configured timer is in minutes and is
// converted to milliseconds here - confirm against Configuration.
262 int timerThreshold = (Configuration.getInstance().getOofTriggerCountTimer() * 60000);
263 int triggerCountThreshold = Configuration.getInstance().getOofTriggerCountThreshold();
264 log.info("Time threshold {}, triggerCountThreshold {}", timerThreshold, triggerCountThreshold);
265 log.info("oof trigger count {}", timerOof.getCount());
266 timerOof.setCount(timerOof.getCount() + 1);
267 Timestamp currentTime = new Timestamp(System.currentTimeMillis());
268 Long difference = currentTime.getTime() - timerOof.getStartTime().getTime();
// Still inside the timer window and triggered more often than the threshold:
// consult the ANR check before calling OOF.
269 if (difference < timerThreshold && timerOof.getCount() > triggerCountThreshold) {
270 log.info("difference {}", difference);
272 Either<List<AnrInput>, Integer> anrTriggerResponse = checkAnrTrigger();
// Right = status code from checkAnrTrigger (404 no poor neighbours, 500 DB failure);
// OOF is then triggered with an empty ANR input list.
273 if (anrTriggerResponse.isRight()) {
274 log.info("ANR trigger response right {}", anrTriggerResponse.right().value());
275 if (anrTriggerResponse.right().value() == 404) {
276 log.info("No poor neighbors found");
277 } else if (anrTriggerResponse.right().value() == 500) {
278 log.info("Failed to fetch HO details from DB ");
280 transactionId = oof.triggerOof(cellidList, networkId, new ArrayList<>(),fixedPciCells);
// Left = ANR inputs found: trigger OOF with them (joint optimization).
283 log.info("ANR trigger response left {}", anrTriggerResponse.left().value());
284 List<AnrInput> anrInputList = anrTriggerResponse.left().value();
285 log.info("Trigger oof for joint optimization");
286 transactionId = oof.triggerOof(cellidList, networkId, anrInputList,fixedPciCells);
// NOTE(review): presumably the branch taken when the window/count condition above
// is false (its `else` line is not visible) - OOF without ANR inputs.
292 transactionId = oof.triggerOof(cellidList, networkId, new ArrayList<>(),fixedPciCells);
// Window elapsed: reset the shared timer.
294 if (difference > timerThreshold) {
295 timerOof.setIsTimer(false);
296 timerOof.setCount(0);
// Record the request, then wait until a response for this thread id shows up in
// the shared response map (loop body not visible).
300 long childThreadId = Thread.currentThread().getId();
301 childUtils.saveRequest(transactionId.toString(), childThreadId);
302 while (!ChildThread.getResponseMap().containsKey(childThreadId)) {
306 AsyncResponseBody asynResponseBody = ChildThread.getResponseMap().get(childThreadId);
308 List<PciSolutions> pciSolutionsList = asynResponseBody.getSolutions().getPciSolutions();
// Brace-less if: it guards only the for loop that follows. For each solution the
// current PCI is read via SdnrRestClient and the old/new pair is persisted.
310 if (!pciSolutionsList.isEmpty())
311 for (PciSolutions pcisolutions : pciSolutionsList) {
313 String cellId = pcisolutions.getCellId();
314 int oldPci = SdnrRestClient.getPci(cellId);
315 int newPci = pcisolutions.getPci();
316 PciUpdate pciUpdate = new PciUpdate();
317 pciUpdate.setCellId(cellId);
318 pciUpdate.setOldPci(oldPci);
319 pciUpdate.setNewPci(newPci);
320 PciUpdateRepository pciUpdateRepository = BeanUtil.getBean(PciUpdateRepository.class);
321 pciUpdateRepository.save(pciUpdate);
// Forward the response to policy, count this thread out and release the flag when
// no child thread is left.
325 childUtils.sendToPolicy(asynResponseBody);
326 policyTriggerFlag.setNumChilds(policyTriggerFlag.getNumChilds() - 1);
327 if (policyTriggerFlag.getNumChilds() == 0) {
328 policyTriggerFlag.setHolder("NONE");
331 } catch (ConfigDbNotFoundException e1) {
332 log.debug("Config DB is unreachable: {}", e1);
// Remove the stored request for this thread and report "success" to the creator.
335 SonRequestsRepository sonRequestsRepository = BeanUtil.getBean(SonRequestsRepository.class);
336 sonRequestsRepository.deleteByChildThreadId(childThreadId);
338 List<String> childStatus = new ArrayList<>();
339 childStatus.add(Long.toString(Thread.currentThread().getId()));
340 childStatus.add("success");
342 childStatusUpdate.put(childStatus);
343 } catch (InterruptedException e) {
344 log.debug("InterruptedException during childStatus update {}", e);
345 Thread.currentThread().interrupt();
// Pick up notifications that were buffered for this cluster, merge them into the
// cluster and persist the updated neighbour JSON.
349 if (!cluster.getCellPciNeighbourMap().isEmpty()) {
351 Either<List<String>, Integer> bufferedNotifications = getBufferedNotifications();
353 if (bufferedNotifications.isRight()) {
354 log.info("No buffered notifications");
357 List<Map<CellPciPair, ArrayList<CellPciPair>>> clusterMaps = getClusterMapsFromNotifications(
358 bufferedNotifications.left().value());
359 for (Map<CellPciPair, ArrayList<CellPciPair>> bufferedClusterMap : clusterMaps) {
360 cluster = clusterUtils.modifyCluster(cluster, bufferedClusterMap);
362 String cellPciNeighbourString = cluster.getPciNeighbourJson();
363 UUID clusterId = cluster.getGraphId();
364 ClusterDetailsRepository clusterDetailsRepository = BeanUtil
365 .getBean(ClusterDetailsRepository.class);
366 clusterDetailsRepository.updateCluster(cellPciNeighbourString, clusterId.toString());
// OOF unavailable: undo this thread's registration on the shared flag.
374 } catch (OofNotFoundException e) {
375 log.error("OOF not found, Removing flag and cleaning up");
376 Flag policyTriggerFlag = BeanUtil.getBean(Flag.class);
377 policyTriggerFlag.setNumChilds(policyTriggerFlag.getNumChilds() - 1);
378 if (policyTriggerFlag.getNumChilds() == 0) {
379 policyTriggerFlag.setHolder("NONE");
// Catch-all for anything else; its handler body is not visible in this section.
381 } catch (Exception e) {
389 private List<Map<CellPciPair, ArrayList<CellPciPair>>> getClusterMapsFromNotifications(List<String> notifications) {
390 ObjectMapper mapper = new ObjectMapper();
391 List<Map<CellPciPair, ArrayList<CellPciPair>>> clusterMaps = new ArrayList<>();
392 for (String notification : notifications) {
393 Map<CellPciPair, ArrayList<CellPciPair>> clusterMap = new HashMap<>();
394 ClusterMap clusterMapJson = new ClusterMap();
396 clusterMapJson = mapper.readValue(notification, ClusterMap.class);
397 clusterMap.put(clusterMapJson.getCell(), clusterMapJson.getNeighbourList());
399 log.debug("clusterMap{}", clusterMap);
400 clusterMaps.add(clusterMap);
401 } catch (IOException e) {
402 log.error("Error parsing the buffered notification, skipping {}", e);
408 private Either<List<String>, Integer> getBufferedNotifications() {
409 log.info("Check if notifications are buffered");
410 BufferNotificationComponent bufferNotificationComponent = new BufferNotificationComponent();
411 ClusterDetailsComponent clusterDetailsComponent = new ClusterDetailsComponent();
412 String clusterId = clusterDetailsComponent.getClusterId(Thread.currentThread().getId());
413 List<String> bufferedNotifications = bufferNotificationComponent.getBufferedNotification(clusterId);
414 if (bufferedNotifications == null || bufferedNotifications.isEmpty()) {
415 return Either.right(404);
417 return Either.left(bufferedNotifications);
425 private void cleanup() {
426 log.info("cleaning up database and killing child thread");
427 List<String> childStatus = new ArrayList<>();
428 childStatus.add(Long.toString(Thread.currentThread().getId()));
429 childStatus.add("done");
431 childStatusUpdate.put(childStatus);
432 } catch (InterruptedException e) {
433 log.debug("InterruptedException during cleanup{}", e);
434 Thread.currentThread().interrupt();
437 ClusterDetailsRepository clusterDetailsRepository = BeanUtil.getBean(ClusterDetailsRepository.class);
438 clusterDetailsRepository.deleteByChildThreadId(threadId.getChildThreadId());
439 log.info("Child thread :{} {}", Thread.currentThread().getId(), "completed");
440 MDC.remove("logFileName");
// Collects the cluster maps that arrive on the notification queue while a buffering
// window is open, and returns them in arrival order.
// NOTE(review): the statement inside the loop's try block is not visible in this
// section - presumably a Thread.sleep between polls; confirm its interval.
445 * Buffer Notification.
447 public List<Map<CellPciPair, ArrayList<CellPciPair>>> bufferNotification() {
449 // Processing Buffered notifications
451 List<Map<CellPciPair, ArrayList<CellPciPair>>> clusterMapList = new ArrayList<>();
453 Configuration config = Configuration.getInstance();
// Length of the buffering window. It is compared against a millisecond difference
// below, so it is presumably configured in milliseconds - TODO confirm.
455 int bufferTime = config.getBufferTime();
457 Timestamp currentTime = new Timestamp(System.currentTimeMillis());
458 log.debug("Current time {}", currentTime);
460 Timestamp laterTime = new Timestamp(System.currentTimeMillis());
461 log.debug("Later time {}", laterTime);
// Elapsed time in milliseconds (Timestamp.getTime() is in milliseconds).
463 long difference = laterTime.getTime() - currentTime.getTime();
464 while (difference < bufferTime) {
// An interrupt does not end the loop: the flag is restored and polling continues
// until the window has elapsed.
467 } catch (InterruptedException e) {
468 log.error("InterruptedException {}", e);
469 Thread.currentThread().interrupt();
472 laterTime = new Timestamp(System.currentTimeMillis());
473 difference = laterTime.getTime() - currentTime.getTime();
// Despite the wording of the message, the logged value is in milliseconds.
475 log.debug("Timer has run for seconds {}", difference);
// At most one notification is taken from the queue per iteration.
477 if (!queue.isEmpty()) {
478 Map<CellPciPair, ArrayList<CellPciPair>> clusterMap;
479 clusterMap = queue.poll();
480 clusterMapList.add(clusterMap);
484 return clusterMapList;
488 * Check if ANR to be triggered.
490 public Either<List<AnrInput>, Integer> checkAnrTrigger() {
492 List<AnrInput> anrInputList = new ArrayList<>();
493 Configuration configuration = Configuration.getInstance();
494 List<HoDetails> hoDetailsList;
495 Either<List<HandOverMetrics>, Integer> hoMetrics = hoMetricsComponent.getAll();
496 if (hoMetrics.isRight()) {
497 log.error("Error in getting HO details from db");
498 return Either.right(500);
500 List<HandOverMetrics> hoMetricsList = hoMetrics.left().value();
501 for (HandOverMetrics hoMetric : hoMetricsList) {
502 String hoDetailsListString = hoMetric.getHoDetails();
503 ObjectMapper mapper = new ObjectMapper();
505 hoDetailsList = mapper.readValue(hoDetailsListString, new TypeReference<ArrayList<HoDetails>>() {
507 } catch (Exception e) {
508 log.error("Exception in parsing HO metrics", hoDetailsListString, e);
511 List<String> removeableNeighbors = new ArrayList<>();
512 log.info("Checking poor count for src cell {}", hoMetric.getSrcCellId());
513 for (HoDetails hoDetail : hoDetailsList) {
514 if (hoDetail.getPoorCount() >= configuration.getPoorCountThreshold()) {
515 removeableNeighbors.add(hoDetail.getDstCellId());
519 if (!removeableNeighbors.isEmpty()) {
520 AnrInput anrInput = new AnrInput(hoMetric.getSrcCellId(), removeableNeighbors);
521 anrInputList.add(anrInput);
524 if (!anrInputList.isEmpty()) {
525 return Either.left(anrInputList);
527 return Either.right(404);