Son-handler microservice seed code
[dcaegen2/services/son-handler.git] / src / main / java / com / wipro / www / sonhms / child / ChildThread.java
1 /*******************************************************************************
2  * ============LICENSE_START=======================================================
3  * pcims
4  *  ================================================================================
5  *  Copyright (C) 2018 Wipro Limited.
6  *  ==============================================================================
7  *   Licensed under the Apache License, Version 2.0 (the "License");
8  *   you may not use this file except in compliance with the License.
9  *   You may obtain a copy of the License at
10  *
11  *        http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *   Unless required by applicable law or agreed to in writing, software
14  *   distributed under the License is distributed on an "AS IS" BASIS,
15  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *   See the License for the specific language governing permissions and
17  *   limitations under the License.
18  *   ============LICENSE_END=========================================================
19  ******************************************************************************/
20
21 package com.wipro.www.sonhms.child;
22
23
24 import com.wipro.www.sonhms.Configuration;
25 import com.wipro.www.sonhms.dao.ClusterDetailsRepository;
26 import com.wipro.www.sonhms.model.FapServiceList;
27 import com.wipro.www.sonhms.model.ThreadId;
28 import com.wipro.www.sonhms.restclient.AsyncResponseBody;
29 import com.wipro.www.sonhms.utils.BeanUtil;
30
31 import java.sql.Timestamp;
32 import java.util.ArrayList;
33 import java.util.HashMap;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.UUID;
37 import java.util.concurrent.BlockingQueue;
38 import java.util.concurrent.LinkedBlockingQueue;
39 import org.slf4j.Logger;
40 import org.slf4j.MDC;
41
42
43 public class ChildThread implements Runnable {
44
45     private BlockingQueue<List<String>> childStatusUpdate;
46     private BlockingQueue<FapServiceList> queue = new LinkedBlockingQueue<>();
47     // static BlockingQueue<AsyncResponseBody> asynchronousResponse = new
48     // LinkedBlockingQueue<>();
49     private static Map<Long, AsyncResponseBody> responseMap = new HashMap<>();
50     private Graph cluster;
51     private ThreadId threadId;
52     FapServiceList fapServiceList = new FapServiceList();
53     private static final Logger log = org.slf4j.LoggerFactory.getLogger(ChildThread.class);
54
55     /**
56      * Constructor with parameters.
57      */
58     public ChildThread(BlockingQueue<List<String>> childStatusUpdate, Graph cluster,
59             BlockingQueue<FapServiceList> queue, ThreadId threadId) {
60         super();
61         this.childStatusUpdate = childStatusUpdate;
62         this.queue = queue;
63         this.threadId = threadId;
64         this.cluster = cluster;
65     }
66
67     /**
68      * Puts notification in queue.
69      */
70     public void putInQueue(FapServiceList fapserviceList) {
71         try {
72             queue.put(fapserviceList);
73         } catch (InterruptedException e) {
74             log.error(" The Thread is Interrupted", e);
75             Thread.currentThread().interrupt();
76         }
77     }
78
79     /**
80      * Puts notification in queue with notify.
81      */
82     public void putInQueueWithNotify(FapServiceList fapserviceList) {
83         synchronized (queue) {
84             try {
85                 queue.put(fapserviceList);
86                 queue.notifyAll();
87             } catch (InterruptedException e) {
88                 log.error(" The Thread is Interrupted", e);
89                 Thread.currentThread().interrupt();
90             }
91
92         }
93
94     }
95
96     /**
97      * Puts response in queue.
98      */
99     public static void putResponse(Long threadId, AsyncResponseBody obj) {
100         synchronized (responseMap) {
101             responseMap.put(threadId, obj);
102         }
103
104     }
105
106     public static Map<Long, AsyncResponseBody> getResponseMap() {
107         return responseMap;
108     }
109
110     @Override
111     public void run() {
112
113         threadId.setChildThreadId(Thread.currentThread().getId());
114         synchronized (threadId) {
115             threadId.notifyAll();
116         }
117
118         MDC.put("logFileName", Thread.currentThread().getName());
119         log.debug("Starting child thread");
120
121         try {
122             fapServiceList = queue.take();
123             if (log.isDebugEnabled()) {
124                 log.debug("fapServicelist: {}", fapServiceList);
125             }
126         } catch (InterruptedException e1) {
127             log.error("InterruptedException is {}", e1);
128             Thread.currentThread().interrupt();
129         }
130
131         ClusterFormation clusterFormation = new ClusterFormation(queue);
132         StateOof oof = new StateOof(childStatusUpdate);
133         ClusterModification clusterModification = new ClusterModification();
134         Detection detect = new Detection();
135
136         try {
137             String networkId = fapServiceList.getCellConfig().getLte().getRan().getNeighborListInUse()
138                     .getLteNeighborListInUseLteCell().get(0).getPlmnid();
139
140             Boolean done = false;
141
142             while (!done) {
143
144                 Map<String, ArrayList<Integer>> collisionConfusionResult = detect.detectCollisionConfusion(cluster);
145                 Boolean trigger = clusterFormation.triggerOrWait(collisionConfusionResult);
146
147                 if (!trigger) {
148                     collisionConfusionResult = clusterFormation.waitForNotification(collisionConfusionResult, cluster);
149                 }
150                 oof.triggerOof(collisionConfusionResult, networkId);
151
152                 if (isNotificationsBuffered()) {
153                     List<FapServiceList> fapServiceLists = bufferNotification();
154                     for (FapServiceList fapService : fapServiceLists) {
155                         cluster = clusterModification.clustermod(cluster, fapService);
156                     }
157                     String cellPciNeighbourString = cluster.getPciNeighbourJson();
158                     UUID clusterId = cluster.getGraphId();
159                     ClusterDetailsRepository clusterDetailsRepository = BeanUtil
160                             .getBean(ClusterDetailsRepository.class);
161                     clusterDetailsRepository.updateCluster(cellPciNeighbourString, clusterId.toString());
162
163                 } else {
164                     done = true;
165                 }
166
167             }
168
169         } catch (Exception e) {
170             log.error("{}", e);
171         }
172
173         cleanup();
174     }
175
176     private boolean isNotificationsBuffered() {
177         synchronized (queue) {
178
179             try {
180                 while (queue.isEmpty()) {
181                     queue.wait();
182                 }
183             } catch (InterruptedException e) {
184                 Thread.currentThread().interrupt();
185                 return false;
186             }
187         }
188         return true;
189     }
190
191     /**
192      * cleanup resources.
193      */
194     private void cleanup() {
195         log.debug("cleaning up database and killing child thread");
196         ClusterDetailsRepository clusterDetailsRepository = BeanUtil.getBean(ClusterDetailsRepository.class);
197         clusterDetailsRepository.deleteByChildThreadId(threadId.getChildThreadId());
198         log.debug("Child thread :{} {}", Thread.currentThread().getId(), "completed");
199         MDC.remove("logFileName");
200
201     }
202
203     /**
204      * Buffer Notification.
205      */
206     public List<FapServiceList> bufferNotification() {
207
208         // Processing Buffered notifications
209
210         List<FapServiceList> fapServiceLists = new ArrayList<>();
211
212         Configuration config = Configuration.getInstance();
213
214         int bufferTime = config.getBufferTime();
215
216         Timestamp currentTime = new Timestamp(System.currentTimeMillis());
217         log.debug("Current time {}", currentTime);
218
219         Timestamp laterTime = new Timestamp(System.currentTimeMillis());
220         log.debug("Later time {}", laterTime);
221
222         long difference = laterTime.getTime() - currentTime.getTime();
223         while (difference < bufferTime) {
224             try {
225                 Thread.sleep(1000);
226             } catch (InterruptedException e) {
227                 log.error("InterruptedException {}", e);
228                 Thread.currentThread().interrupt();
229
230             }
231             laterTime = new Timestamp(System.currentTimeMillis());
232             difference = laterTime.getTime() - currentTime.getTime();
233
234             log.debug("Timer has run for  seconds {}", difference);
235
236             if (!queue.isEmpty()) {
237                 FapServiceList fapService;
238                 fapService = queue.poll();
239                 fapServiceLists.add(fapService);
240             }
241         }
242         return fapServiceLists;
243     }
244
245 }