1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019 Wipro Limited.
6 * ==============================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 *******************************************************************************/
22 package org.onap.dcaegen2.services.sonhms.child;
24 import com.fasterxml.jackson.databind.ObjectMapper;
26 import fj.data.Either;
28 import java.io.IOException;
29 import java.sql.Timestamp;
30 import java.util.ArrayList;
31 import java.util.HashMap;
32 import java.util.HashSet;
33 import java.util.List;
36 import java.util.UUID;
37 import java.util.concurrent.BlockingQueue;
38 import java.util.concurrent.LinkedBlockingQueue;
40 import org.onap.dcaegen2.services.sonhms.BufferNotificationComponent;
41 import org.onap.dcaegen2.services.sonhms.ClusterDetailsComponent;
42 import org.onap.dcaegen2.services.sonhms.ConfigPolicy;
43 import org.onap.dcaegen2.services.sonhms.Configuration;
44 import org.onap.dcaegen2.services.sonhms.HoMetricsComponent;
45 import org.onap.dcaegen2.services.sonhms.dao.ClusterDetailsRepository;
46 import org.onap.dcaegen2.services.sonhms.dao.SonRequestsRepository;
47 import org.onap.dcaegen2.services.sonhms.dmaap.PolicyDmaapClient;
48 import org.onap.dcaegen2.services.sonhms.exceptions.ConfigDbNotFoundException;
49 import org.onap.dcaegen2.services.sonhms.model.AnrInput;
50 import org.onap.dcaegen2.services.sonhms.model.CellPciPair;
51 import org.onap.dcaegen2.services.sonhms.model.ClusterMap;
52 import org.onap.dcaegen2.services.sonhms.model.Flag;
53 import org.onap.dcaegen2.services.sonhms.model.HoDetails;
54 import org.onap.dcaegen2.services.sonhms.model.ThreadId;
55 import org.onap.dcaegen2.services.sonhms.restclient.AsyncResponseBody;
56 import org.onap.dcaegen2.services.sonhms.utils.BeanUtil;
57 import org.onap.dcaegen2.services.sonhms.utils.ClusterUtils;
58 import org.slf4j.Logger;
61 public class ChildThread implements Runnable {
63 private BlockingQueue<List<String>> childStatusUpdate;
64 private BlockingQueue<Map<CellPciPair, ArrayList<CellPciPair>>> queue = new LinkedBlockingQueue<>();
66 private static Map<Long, AsyncResponseBody> responseMap = new HashMap<>();
67 private Graph cluster;
68 private ThreadId threadId;
69 Map<CellPciPair, ArrayList<CellPciPair>> clusterMap;
70 HoMetricsComponent hoMetricsComponent;
71 private static final Logger log = org.slf4j.LoggerFactory.getLogger(ChildThread.class);
/**
 * Constructor with parameters.
 *
 * @param childStatusUpdate queue on which this thread publishes its status updates
 * @param cluster cluster graph this thread works on
 * @param queue queue from which this thread reads new notifications; when {@code null} the
 *        internally created queue is kept
 * @param threadId holder that receives this thread's id once the thread body starts
 * @param hoMetricsComponent component used to read handover metrics
 */
public ChildThread(BlockingQueue<List<String>> childStatusUpdate, Graph cluster,
        BlockingQueue<Map<CellPciPair, ArrayList<CellPciPair>>> queue, ThreadId threadId,
        HoMetricsComponent hoMetricsComponent) {
    this.childStatusUpdate = childStatusUpdate;
    // Store the caller's queue; without this assignment the argument is ignored and the thread
    // only ever sees its private default queue.
    if (queue != null) {
        this.queue = queue;
    }
    this.threadId = threadId;
    this.cluster = cluster;
    this.hoMetricsComponent = hoMetricsComponent;
}
87 public ChildThread() {
92 * Puts notification in queue.
94 // change this interface to send cell and neighbours to keep it generic for sdnr
97 public void putInQueue(Map<CellPciPair, ArrayList<CellPciPair>> clusterMap) {
99 queue.put(clusterMap);
100 } catch (InterruptedException e) {
101 log.error(" The Thread is Interrupted", e);
102 Thread.currentThread().interrupt();
107 * Puts notification in queue with notify.
109 public void putInQueueWithNotify(Map<CellPciPair, ArrayList<CellPciPair>> clusterMap) {
110 synchronized (queue) {
112 queue.put(clusterMap);
114 } catch (InterruptedException e) {
115 log.error(" The Thread is Interrupted", e);
116 Thread.currentThread().interrupt();
124 * Puts response in queue.
126 public static void putResponse(Long threadId, AsyncResponseBody obj) {
127 synchronized (responseMap) {
128 responseMap.put(threadId, obj);
133 public static Map<Long, AsyncResponseBody> getResponseMap() {
// Visible portion of the thread's main routine (its signature line is not part of this listing).
// NOTE(review): short lines - closing braces, try/else keywords, loop headers and a few
// declarations such as `timer` and `transactionId` - are absent from this listing, so the nesting
// described in the comments below is inferred from the visible statements; confirm against the
// complete file before relying on it.
//
// Store the current thread's id in threadId, then wake every thread waiting on the threadId monitor.
140 threadId.setChildThreadId(Thread.currentThread().getId());
141 synchronized (threadId) {
142 threadId.notifyAll();
// Put the thread name into the logging MDC under "logFileName"; cleanup() removes that key.
145 MDC.put("logFileName", Thread.currentThread().getName());
146 log.info("Starting child thread");
// Helper objects used by the statements below.
148 StateOof oof = new StateOof(childStatusUpdate);
149 ClusterUtils clusterUtils = new ClusterUtils();
150 Detection detect = new Detection();
151 ChildThreadUtils childUtils = new ChildThreadUtils(ConfigPolicy.getInstance(), new PnfUtils(),
152 new PolicyDmaapClient());
// Network id passed to oof.triggerOof further down.
155 String networkId = cluster.getNetworkId();
// NOTE(review): only the initialisation of `done` is visible; where it is read or set is not shown.
157 Boolean done = false;
159 Map<String, ArrayList<Integer>> collisionConfusionResult;
// Run detection when the cluster carries no collision/confusion map; the second assignment reuses
// the cluster's own map (presumably the else branch - the `else` line is not visible).
161 if (cluster.getCollisionConfusionMap().isEmpty()) {
163 collisionConfusionResult = detect.detectCollisionConfusion(cluster);
165 collisionConfusionResult = cluster.getCollisionConfusionMap();
// Result of triggerOrWait; the statement that tests `trigger` is not visible.
168 Boolean trigger = childUtils.triggerOrWait(collisionConfusionResult);
169 ConfigPolicy configPolicy = ConfigPolicy.getInstance();
// Read the wait time in seconds from the policy configuration. The NullPointerException handler
// only logs that a 60 second default applies; the declaration supplying that default is not visible.
172 timer = (int) configPolicy.getConfig().get("PCI_NEIGHBOR_CHANGE_CLUSTER_TIMEOUT_IN_SECS");
173 } catch (NullPointerException e) {
174 log.info("Policy config not available. Using default timeout - 60 seconds");
// Sleep for `timer` seconds; on interruption log and set the interrupt status again.
// NOTE(review): the exception is the only argument after the format string, so SLF4J takes it as
// the throwable and the "{}" placeholder stays unfilled. The same pattern recurs below.
178 Thread.sleep((long) timer * 1000);
179 } catch (InterruptedException e) {
180 log.error("Interrupted Exception while waiting for more notifications {}", e);
181 Thread.currentThread().interrupt();
// Drain the queue: merge each pending notification into the cluster, persist the cluster and
// re-run detection on the merged cluster.
184 while (!queue.isEmpty()) {
185 Map<CellPciPair, ArrayList<CellPciPair>> newNotification;
186 newNotification = queue.poll();
187 log.info("New notification from SDNR {}", newNotification);
188 cluster = clusterUtils.modifyCluster(cluster, newNotification);
190 // update cluster in DB
191 clusterUtils.updateCluster(cluster);
192 collisionConfusionResult = detect.detectCollisionConfusion(cluster);
// Build the list of cell ids handed to OOF.
197 ArrayList<String> cellidList = new ArrayList<>();
198 ArrayList<String> cellIds = new ArrayList<>();
// For each detection entry with a non-empty value list, the inner condition holds unless the
// distinct values are exactly {0}. The statement executed when it holds is not visible
// (presumably it adds the key to cellIds - confirm).
200 for (Map.Entry<String, ArrayList<Integer>> entry : collisionConfusionResult.entrySet()) {
201 String key = entry.getKey();
202 ArrayList<Integer> arr;
203 arr = entry.getValue();
204 if (!arr.isEmpty()) {
205 Set<Integer> set = new HashSet<>(arr);
206 if (((set.size() == 1) && !set.contains(0)) || (set.size() != 1)) {
// Copy every collected cell id into cellidList, logging each one.
214 for (String cell : cellIds) {
215 log.debug("cellidList entries: {}", cell);
216 cellidList.add(cell);
// Loop while the shared Flag bean's holder equals "PM" (loop body not visible), then set the
// holder to "CHILD" and increment the child count.
// NOTE(review): the count is updated with a separate get and set; whether Flag guards this
// against concurrent child threads cannot be seen from here.
220 Flag policyTriggerFlag = BeanUtil.getBean(Flag.class);
221 while (policyTriggerFlag.getHolder().equals("PM")) {
224 policyTriggerFlag.setHolder("CHILD");
225 policyTriggerFlag.setNumChilds(policyTriggerFlag.getNumChilds() + 1);
// Ask checkAnrTrigger for ANR inputs. A right value (404 or 500, each only logged) leads to
// triggerOof with an empty ANR list; a left value is passed to triggerOof for joint optimization.
227 Either<List<AnrInput>, Integer> anrTriggerResponse = checkAnrTrigger(cellidList);
228 if (anrTriggerResponse.isRight()) {
230 if (anrTriggerResponse.right().value() == 404) {
231 log.debug("No poor neighbors found");
232 } else if (anrTriggerResponse.right().value() == 500) {
233 log.debug("Failed to fetch HO details from DB ");
236 transactionId = oof.triggerOof(cellidList, networkId, new ArrayList<>());
238 List<AnrInput> anrInputList = anrTriggerResponse.left().value();
239 log.info("Trigger oof for joint optimization");
240 transactionId = oof.triggerOof(cellidList, networkId, anrInputList);
// Save the request under the transaction id and this thread's id, then loop until the shared
// response map contains an entry for this thread (loop body not visible) and read that entry.
242 long childThreadId = Thread.currentThread().getId();
243 childUtils.saveRequest(transactionId.toString(), childThreadId);
244 while (!ChildThread.getResponseMap().containsKey(childThreadId)) {
248 AsyncResponseBody asynResponseBody = ChildThread.getResponseMap().get(childThreadId);
// Pass the response to childUtils.sendToPolicy, decrement the child count and set the holder to
// "NONE" once the count reaches zero.
253 childUtils.sendToPolicy(asynResponseBody);
254 policyTriggerFlag.setNumChilds(policyTriggerFlag.getNumChilds() - 1);
255 if (policyTriggerFlag.getNumChilds() == 0) {
256 policyTriggerFlag.setHolder("NONE");
// ConfigDbNotFoundException is logged at debug level only.
260 } catch (ConfigDbNotFoundException e1) {
261 log.debug("Config DB is unreachable: {}", e1);
// Delete the stored request(s) for this child thread id.
264 SonRequestsRepository sonRequestsRepository = BeanUtil.getBean(SonRequestsRepository.class);
265 sonRequestsRepository.deleteByChildThreadId(childThreadId);
// Publish [thread id, "success"] on the status queue.
267 List<String> childStatus = new ArrayList<>();
268 childStatus.add(Long.toString(Thread.currentThread().getId()));
269 childStatus.add("success");
271 childStatusUpdate.put(childStatus);
272 } catch (InterruptedException e) {
273 log.debug("InterruptedException during childStatus update {}", e);
274 Thread.currentThread().interrupt();
// Check for buffered notifications. A right value is only logged; a left value is converted to
// cluster maps that are merged into the cluster, after which the PCI neighbour JSON is written
// to the cluster details repository under the cluster's graph id.
// NOTE(review): whether that repository update runs inside or after the for loop cannot be
// determined from the visible lines.
278 Either<List<String>, Integer> bufferedNotifications = getBufferedNotifications();
280 if (bufferedNotifications.isRight()) {
281 log.info("No buffered notifications");
284 List<Map<CellPciPair, ArrayList<CellPciPair>>> clusterMaps = getClusterMapsFromNotifications(
285 bufferedNotifications.left().value());
286 for (Map<CellPciPair, ArrayList<CellPciPair>> bufferedClusterMap : clusterMaps) {
287 cluster = clusterUtils.modifyCluster(cluster, bufferedClusterMap);
289 String cellPciNeighbourString = cluster.getPciNeighbourJson();
290 UUID clusterId = cluster.getGraphId();
291 ClusterDetailsRepository clusterDetailsRepository = BeanUtil
292 .getBean(ClusterDetailsRepository.class);
293 clusterDetailsRepository.updateCluster(cellPciNeighbourString, clusterId.toString());
// Catch-all handler; its body is not visible in this listing.
298 } catch (Exception e) {
305 private List<Map<CellPciPair, ArrayList<CellPciPair>>> getClusterMapsFromNotifications(List<String> notifications) {
306 ObjectMapper mapper = new ObjectMapper();
307 List<Map<CellPciPair, ArrayList<CellPciPair>>> clusterMaps = new ArrayList<>();
308 for (String notification : notifications) {
309 Map<CellPciPair, ArrayList<CellPciPair>> clusterMap = new HashMap<>();
310 ClusterMap clusterMapJson = new ClusterMap();
312 clusterMapJson = mapper.readValue(notification, ClusterMap.class);
313 clusterMap.put(clusterMapJson.getCell(), clusterMapJson.getNeighbourList());
315 log.debug("clusterMap{}", clusterMap);
316 clusterMaps.add(clusterMap);
317 } catch (IOException e) {
318 log.error("Error parsing the buffered notification, skipping {}", e);
324 private Either<List<String>, Integer> getBufferedNotifications() {
325 log.info("Check if notifications are buffered");
326 BufferNotificationComponent bufferNotificationComponent = new BufferNotificationComponent();
327 ClusterDetailsComponent clusterDetailsComponent = new ClusterDetailsComponent();
328 String clusterId = clusterDetailsComponent.getClusterId(Thread.currentThread().getId());
329 List<String> bufferedNotifications = bufferNotificationComponent.getBufferedNotification(clusterId);
330 if (bufferedNotifications == null || bufferedNotifications.isEmpty()) {
331 return Either.right(404);
333 return Either.left(bufferedNotifications);
341 private void cleanup() {
342 log.info("cleaning up database and killing child thread");
343 List<String> childStatus = new ArrayList<>();
344 childStatus.add(Long.toString(Thread.currentThread().getId()));
345 childStatus.add("done");
347 childStatusUpdate.put(childStatus);
348 } catch (InterruptedException e) {
349 log.debug("InterruptedException during cleanup{}", e);
350 Thread.currentThread().interrupt();
353 ClusterDetailsRepository clusterDetailsRepository = BeanUtil.getBean(ClusterDetailsRepository.class);
354 clusterDetailsRepository.deleteByChildThreadId(threadId.getChildThreadId());
355 log.info("Child thread :{} {}", Thread.currentThread().getId(), "completed");
356 MDC.remove("logFileName");
361 * Buffer Notification.
// Collects notifications from the queue while a timer runs and returns them as a list.
// The loop runs while the elapsed time in milliseconds (difference of two Timestamp.getTime()
// values) is below Configuration.getBufferTime(); each pass takes at most one queued notification.
// NOTE(review): the statement guarded by the InterruptedException handler (presumably a sleep or
// wait) and the closing braces are not part of this listing, so the pause per pass is unknown.
// NOTE(review): elapsed milliseconds are compared with bufferTime directly, while the log text
// says "seconds" - confirm the unit of bufferTime.
363 public List<Map<CellPciPair, ArrayList<CellPciPair>>> bufferNotification() {
365 // Processing Buffered notifications
367 List<Map<CellPciPair, ArrayList<CellPciPair>>> clusterMapList = new ArrayList<>();
369 Configuration config = Configuration.getInstance();
371 int bufferTime = config.getBufferTime();
// Start of the buffering window.
373 Timestamp currentTime = new Timestamp(System.currentTimeMillis());
374 log.debug("Current time {}", currentTime);
376 Timestamp laterTime = new Timestamp(System.currentTimeMillis());
377 log.debug("Later time {}", laterTime);
379 long difference = laterTime.getTime() - currentTime.getTime();
380 while (difference < bufferTime) {
// On interruption: log and set the interrupt status again; the loop itself is not left here.
// NOTE(review): the exception is the only argument after the format string, so SLF4J takes it as
// the throwable and the "{}" placeholder stays unfilled.
383 } catch (InterruptedException e) {
384 log.error("InterruptedException {}", e);
385 Thread.currentThread().interrupt();
// Recompute the elapsed time for the loop condition.
388 laterTime = new Timestamp(System.currentTimeMillis());
389 difference = laterTime.getTime() - currentTime.getTime();
391 log.debug("Timer has run for seconds {}", difference);
// Move one pending notification, if any, from the queue into the result list.
393 if (!queue.isEmpty()) {
394 Map<CellPciPair, ArrayList<CellPciPair>> clusterMap;
395 clusterMap = queue.poll();
396 clusterMapList.add(clusterMap);
400 return clusterMapList;
404 * Check if ANR to be triggered.
406 public Either<List<AnrInput>, Integer> checkAnrTrigger(List<String> cellidList) {
408 List<AnrInput> anrInputList = new ArrayList<>();
409 Configuration configuration = Configuration.getInstance();
410 int poorThreshold = configuration.getPoorThreshold();
411 List<HoDetails> hoDetailsList;
412 Either<List<HoDetails>, Integer> response;
413 for (String cellId : cellidList) {
414 response = hoMetricsComponent.getHoMetrics(cellId);
415 List<String> removeableNeighbors = new ArrayList<>();
416 if (response.isLeft()) {
417 hoDetailsList = response.left().value();
418 for (HoDetails hoDetail : hoDetailsList) {
419 if (hoDetail.getSuccessRate() < poorThreshold) {
420 removeableNeighbors.add(hoDetail.getDstCellId());
424 if (response.right().value() == 400) {
425 log.error("Error in getting HO details from db");
426 return Either.right(500);
428 log.info("no HO metrics found");
432 if (!removeableNeighbors.isEmpty()) {
433 AnrInput anrInput = new AnrInput(cellId, removeableNeighbors);
434 anrInputList.add(anrInput);
437 if (!anrInputList.isEmpty()) {
438 return Either.left(anrInputList);
440 return Either.right(404);