Fix bugs and formatting issues
[dcaegen2/services/son-handler.git] / src / main / java / org / onap / dcaegen2 / services / sonhms / child / ChildThread.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  son-handler
4  *  ================================================================================
5  *   Copyright (C) 2019 Wipro Limited.
6  *   ==============================================================================
7  *     Licensed under the Apache License, Version 2.0 (the "License");
8  *     you may not use this file except in compliance with the License.
9  *     You may obtain a copy of the License at
10  *  
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *  
13  *     Unless required by applicable law or agreed to in writing, software
14  *     distributed under the License is distributed on an "AS IS" BASIS,
15  *     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *     See the License for the specific language governing permissions and
17  *     limitations under the License.
18  *     ============LICENSE_END=========================================================
19  *  
20  *******************************************************************************/
21
22 package org.onap.dcaegen2.services.sonhms.child;
23
24 import com.fasterxml.jackson.databind.ObjectMapper;
25
26 import fj.data.Either;
27
28 import java.io.IOException;
29 import java.sql.Timestamp;
30 import java.util.ArrayList;
31 import java.util.HashMap;
32 import java.util.HashSet;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Set;
36 import java.util.UUID;
37 import java.util.concurrent.BlockingQueue;
38 import java.util.concurrent.LinkedBlockingQueue;
39
40 import org.onap.dcaegen2.services.sonhms.BufferNotificationComponent;
41 import org.onap.dcaegen2.services.sonhms.ClusterDetailsComponent;
42 import org.onap.dcaegen2.services.sonhms.ConfigPolicy;
43 import org.onap.dcaegen2.services.sonhms.Configuration;
44 import org.onap.dcaegen2.services.sonhms.HoMetricsComponent;
45 import org.onap.dcaegen2.services.sonhms.dao.ClusterDetailsRepository;
46 import org.onap.dcaegen2.services.sonhms.dao.SonRequestsRepository;
47 import org.onap.dcaegen2.services.sonhms.dmaap.PolicyDmaapClient;
48 import org.onap.dcaegen2.services.sonhms.exceptions.ConfigDbNotFoundException;
49 import org.onap.dcaegen2.services.sonhms.model.AnrInput;
50 import org.onap.dcaegen2.services.sonhms.model.CellPciPair;
51 import org.onap.dcaegen2.services.sonhms.model.ClusterMap;
52 import org.onap.dcaegen2.services.sonhms.model.Flag;
53 import org.onap.dcaegen2.services.sonhms.model.HoDetails;
54 import org.onap.dcaegen2.services.sonhms.model.ThreadId;
55 import org.onap.dcaegen2.services.sonhms.restclient.AsyncResponseBody;
56 import org.onap.dcaegen2.services.sonhms.utils.BeanUtil;
57 import org.onap.dcaegen2.services.sonhms.utils.ClusterUtils;
58 import org.slf4j.Logger;
59 import org.slf4j.MDC;
60
61 public class ChildThread implements Runnable {
62
63     private BlockingQueue<List<String>> childStatusUpdate;
64     private BlockingQueue<Map<CellPciPair, ArrayList<CellPciPair>>> queue = new LinkedBlockingQueue<>();
65
66     private static Map<Long, AsyncResponseBody> responseMap = new HashMap<>();
67     private Graph cluster;
68     private ThreadId threadId;
69     Map<CellPciPair, ArrayList<CellPciPair>> clusterMap;
70     HoMetricsComponent hoMetricsComponent;
71     private static final Logger log = org.slf4j.LoggerFactory.getLogger(ChildThread.class);
72
73     /**
74      * Constructor with parameters.
75      */
76     public ChildThread(BlockingQueue<List<String>> childStatusUpdate, Graph cluster,
77             BlockingQueue<Map<CellPciPair, ArrayList<CellPciPair>>> queue, ThreadId threadId,
78             HoMetricsComponent hoMetricsComponent) {
79         super();
80         this.childStatusUpdate = childStatusUpdate;
81         this.queue = queue;
82         this.threadId = threadId;
83         this.cluster = cluster;
84         this.hoMetricsComponent = hoMetricsComponent;
85     }
86
87     public ChildThread() {
88
89     }
90
91     /**
92      * Puts notification in queue.
93      */
94     // change this interface to send cell and neighbours to keep it generic for sdnr
95     // and fm
96
97     public void putInQueue(Map<CellPciPair, ArrayList<CellPciPair>> clusterMap) {
98         try {
99             queue.put(clusterMap);
100         } catch (InterruptedException e) {
101             log.error(" The Thread is Interrupted", e);
102             Thread.currentThread().interrupt();
103         }
104     }
105
106     /**
107      * Puts notification in queue with notify.
108      */
109     public void putInQueueWithNotify(Map<CellPciPair, ArrayList<CellPciPair>> clusterMap) {
110         synchronized (queue) {
111             try {
112                 queue.put(clusterMap);
113                 queue.notifyAll();
114             } catch (InterruptedException e) {
115                 log.error(" The Thread is Interrupted", e);
116                 Thread.currentThread().interrupt();
117             }
118
119         }
120
121     }
122
123     /**
124      * Puts response in queue.
125      */
126     public static void putResponse(Long threadId, AsyncResponseBody obj) {
127         synchronized (responseMap) {
128             responseMap.put(threadId, obj);
129         }
130
131     }
132
133     public static Map<Long, AsyncResponseBody> getResponseMap() {
134         return responseMap;
135     }
136
137     @Override
138     public void run() {
139
140         threadId.setChildThreadId(Thread.currentThread().getId());
141         synchronized (threadId) {
142             threadId.notifyAll();
143         }
144
145         MDC.put("logFileName", Thread.currentThread().getName());
146         log.info("Starting child thread");
147
148         StateOof oof = new StateOof(childStatusUpdate);
149         ClusterUtils clusterUtils = new ClusterUtils();
150         Detection detect = new Detection();
151         ChildThreadUtils childUtils = new ChildThreadUtils(ConfigPolicy.getInstance(), new PnfUtils(),
152                 new PolicyDmaapClient());
153
154         try {
155             String networkId = cluster.getNetworkId();
156
157             Boolean done = false;
158
159             Map<String, ArrayList<Integer>> collisionConfusionResult;
160             while (!done) {
161                 if (cluster.getCollisionConfusionMap().isEmpty()) {
162
163                     collisionConfusionResult = detect.detectCollisionConfusion(cluster);
164                 } else {
165                     collisionConfusionResult = cluster.getCollisionConfusionMap();
166                 }
167
168                 Boolean trigger = childUtils.triggerOrWait(collisionConfusionResult);
169                 ConfigPolicy configPolicy = ConfigPolicy.getInstance();
170                 int timer = 60;
171                 try {
172                     timer = (int) configPolicy.getConfig().get("PCI_NEIGHBOR_CHANGE_CLUSTER_TIMEOUT_IN_SECS");
173                 } catch (NullPointerException e) {
174                     log.info("Policy config not available. Using default timeout - 60 seconds");
175                 }
176                 if (!trigger) {
177                     try {
178                         Thread.sleep((long) timer * 1000);
179                     } catch (InterruptedException e) {
180                         log.error("Interrupted Exception while waiting for more notifications {}", e);
181                         Thread.currentThread().interrupt();
182                     }
183
184                     while (!queue.isEmpty()) {
185                         Map<CellPciPair, ArrayList<CellPciPair>> newNotification;
186                         newNotification = queue.poll();
187                         log.info("New notification from SDNR {}", newNotification);
188                         cluster = clusterUtils.modifyCluster(cluster, newNotification);
189
190                         // update cluster in DB
191                         clusterUtils.updateCluster(cluster);
192                         collisionConfusionResult = detect.detectCollisionConfusion(cluster);
193
194                     }
195
196                 }
197                 ArrayList<String> cellidList = new ArrayList<>();
198                 ArrayList<String> cellIds = new ArrayList<>();
199
200                 for (Map.Entry<String, ArrayList<Integer>> entry : collisionConfusionResult.entrySet()) {
201                     String key = entry.getKey();
202                     ArrayList<Integer> arr;
203                     arr = entry.getValue();
204                     if (!arr.isEmpty()) {
205                         Set<Integer> set = new HashSet<>(arr);
206                         if (((set.size() == 1) && !set.contains(0)) || (set.size() != 1)) {
207                             cellIds.add(key);
208
209                         }
210                     }
211
212                 }
213
214                 for (String cell : cellIds) {
215                     log.debug("cellidList entries: {}", cell);
216                     cellidList.add(cell);
217                 }
218                 UUID transactionId;
219                 
220                 Flag policyTriggerFlag = BeanUtil.getBean(Flag.class);
221                 while (policyTriggerFlag.getHolder().equals("PM")) {
222                     Thread.sleep(100);
223                 }
224                 policyTriggerFlag.setHolder("CHILD");
225                 policyTriggerFlag.setNumChilds(policyTriggerFlag.getNumChilds() + 1);
226                 
227                 Either<List<AnrInput>, Integer> anrTriggerResponse = checkAnrTrigger(cellidList);
228                 if (anrTriggerResponse.isRight()) {
229
230                     if (anrTriggerResponse.right().value() == 404) {
231                         log.debug("No poor neighbors found");
232                     } else if (anrTriggerResponse.right().value() == 500) {
233                         log.debug("Failed to fetch HO details from DB ");
234                     }
235
236                     transactionId = oof.triggerOof(cellidList, networkId, new ArrayList<>());
237                 } else {
238                     List<AnrInput> anrInputList = anrTriggerResponse.left().value();
239                     log.info("Trigger oof for joint optimization");
240                     transactionId = oof.triggerOof(cellidList, networkId, anrInputList);
241                 }
242                 long childThreadId = Thread.currentThread().getId();
243                 childUtils.saveRequest(transactionId.toString(), childThreadId);
244                 while (!ChildThread.getResponseMap().containsKey(childThreadId)) {
245                     Thread.sleep(100);
246                 }
247
248                 AsyncResponseBody asynResponseBody = ChildThread.getResponseMap().get(childThreadId);
249
250                 
251
252                 try {
253                     childUtils.sendToPolicy(asynResponseBody);
254                     policyTriggerFlag.setNumChilds(policyTriggerFlag.getNumChilds() - 1);
255                     if (policyTriggerFlag.getNumChilds() == 0) {
256                         policyTriggerFlag.setHolder("NONE");
257                     }
258                     
259
260                 } catch (ConfigDbNotFoundException e1) {
261                     log.debug("Config DB is unreachable: {}", e1);
262                 }
263
264                 SonRequestsRepository sonRequestsRepository = BeanUtil.getBean(SonRequestsRepository.class);
265                 sonRequestsRepository.deleteByChildThreadId(childThreadId);
266
267                 List<String> childStatus = new ArrayList<>();
268                 childStatus.add(Long.toString(Thread.currentThread().getId()));
269                 childStatus.add("success");
270                 try {
271                     childStatusUpdate.put(childStatus);
272                 } catch (InterruptedException e) {
273                     log.debug("InterruptedException during childStatus update {}", e);
274                     Thread.currentThread().interrupt();
275
276                 }
277
278                 Either<List<String>, Integer> bufferedNotifications = getBufferedNotifications();
279
280                 if (bufferedNotifications.isRight()) {
281                     log.info("No buffered notifications");
282                     done = true;
283                 } else {
284                     List<Map<CellPciPair, ArrayList<CellPciPair>>> clusterMaps = getClusterMapsFromNotifications(
285                             bufferedNotifications.left().value());
286                     for (Map<CellPciPair, ArrayList<CellPciPair>> bufferedClusterMap : clusterMaps) {
287                         cluster = clusterUtils.modifyCluster(cluster, bufferedClusterMap);
288                     }
289                     String cellPciNeighbourString = cluster.getPciNeighbourJson();
290                     UUID clusterId = cluster.getGraphId();
291                     ClusterDetailsRepository clusterDetailsRepository = BeanUtil
292                             .getBean(ClusterDetailsRepository.class);
293                     clusterDetailsRepository.updateCluster(cellPciNeighbourString, clusterId.toString());
294                 }
295
296             }
297
298         } catch (Exception e) {
299             log.error("{}", e);
300         }
301
302         cleanup();
303     }
304
305     private List<Map<CellPciPair, ArrayList<CellPciPair>>> getClusterMapsFromNotifications(List<String> notifications) {
306         ObjectMapper mapper = new ObjectMapper();
307         List<Map<CellPciPair, ArrayList<CellPciPair>>> clusterMaps = new ArrayList<>();
308         for (String notification : notifications) {
309             Map<CellPciPair, ArrayList<CellPciPair>> clusterMap = new HashMap<>();
310             ClusterMap clusterMapJson = new ClusterMap();
311             try {
312                 clusterMapJson = mapper.readValue(notification, ClusterMap.class);
313                 clusterMap.put(clusterMapJson.getCell(), clusterMapJson.getNeighbourList());
314
315                 log.debug("clusterMap{}", clusterMap);
316                 clusterMaps.add(clusterMap);
317             } catch (IOException e) {
318                 log.error("Error parsing the buffered notification, skipping {}", e);
319             }
320         }
321         return clusterMaps;
322     }
323
324     private Either<List<String>, Integer> getBufferedNotifications() {
325         log.info("Check if notifications are buffered");
326         BufferNotificationComponent bufferNotificationComponent = new BufferNotificationComponent();
327         ClusterDetailsComponent clusterDetailsComponent = new ClusterDetailsComponent();
328         String clusterId = clusterDetailsComponent.getClusterId(Thread.currentThread().getId());
329         List<String> bufferedNotifications = bufferNotificationComponent.getBufferedNotification(clusterId);
330         if (bufferedNotifications == null || bufferedNotifications.isEmpty()) {
331             return Either.right(404);
332         } else {
333             return Either.left(bufferedNotifications);
334         }
335
336     }
337
338     /**
339      * cleanup resources.
340      */
341     private void cleanup() {
342         log.info("cleaning up database and killing child thread");
343         List<String> childStatus = new ArrayList<>();
344         childStatus.add(Long.toString(Thread.currentThread().getId()));
345         childStatus.add("done");
346         try {
347             childStatusUpdate.put(childStatus);
348         } catch (InterruptedException e) {
349             log.debug("InterruptedException during cleanup{}", e);
350             Thread.currentThread().interrupt();
351
352         }
353         ClusterDetailsRepository clusterDetailsRepository = BeanUtil.getBean(ClusterDetailsRepository.class);
354         clusterDetailsRepository.deleteByChildThreadId(threadId.getChildThreadId());
355         log.info("Child thread :{} {}", Thread.currentThread().getId(), "completed");
356         MDC.remove("logFileName");
357
358     }
359
360     /**
361      * Buffer Notification.
362      */
363     public List<Map<CellPciPair, ArrayList<CellPciPair>>> bufferNotification() {
364
365         // Processing Buffered notifications
366
367         List<Map<CellPciPair, ArrayList<CellPciPair>>> clusterMapList = new ArrayList<>();
368
369         Configuration config = Configuration.getInstance();
370
371         int bufferTime = config.getBufferTime();
372
373         Timestamp currentTime = new Timestamp(System.currentTimeMillis());
374         log.debug("Current time {}", currentTime);
375
376         Timestamp laterTime = new Timestamp(System.currentTimeMillis());
377         log.debug("Later time {}", laterTime);
378
379         long difference = laterTime.getTime() - currentTime.getTime();
380         while (difference < bufferTime) {
381             try {
382                 Thread.sleep(1000);
383             } catch (InterruptedException e) {
384                 log.error("InterruptedException {}", e);
385                 Thread.currentThread().interrupt();
386
387             }
388             laterTime = new Timestamp(System.currentTimeMillis());
389             difference = laterTime.getTime() - currentTime.getTime();
390
391             log.debug("Timer has run for  seconds {}", difference);
392
393             if (!queue.isEmpty()) {
394                 Map<CellPciPair, ArrayList<CellPciPair>> clusterMap;
395                 clusterMap = queue.poll();
396                 clusterMapList.add(clusterMap);
397             }
398
399         }
400         return clusterMapList;
401     }
402
403     /**
404      * Check if ANR to be triggered.
405      */
406     public Either<List<AnrInput>, Integer> checkAnrTrigger(List<String> cellidList) {
407
408         List<AnrInput> anrInputList = new ArrayList<>();
409         Configuration configuration = Configuration.getInstance();
410         int poorThreshold = configuration.getPoorThreshold();
411         List<HoDetails> hoDetailsList;
412         Either<List<HoDetails>, Integer> response;
413         for (String cellId : cellidList) {
414             response = hoMetricsComponent.getHoMetrics(cellId);
415             List<String> removeableNeighbors = new ArrayList<>();
416             if (response.isLeft()) {
417                 hoDetailsList = response.left().value();
418                 for (HoDetails hoDetail : hoDetailsList) {
419                     if (hoDetail.getSuccessRate() < poorThreshold) {
420                         removeableNeighbors.add(hoDetail.getDstCellId());
421                     }
422                 }
423             } else {
424                 if (response.right().value() == 400) {
425                     log.error("Error in getting HO details from db");
426                     return Either.right(500);
427                 } else {
428                     log.info("no HO metrics found");
429                 }
430             }
431
432             if (!removeableNeighbors.isEmpty()) {
433                 AnrInput anrInput = new AnrInput(cellId, removeableNeighbors);
434                 anrInputList.add(anrInput);
435             }
436         }
437         if (!anrInputList.isEmpty()) {
438             return Either.left(anrInputList);
439         }
440         return Either.right(404);
441     }
442 }