a85df63a32e648858ef3b1e7266fc70d3d06f827
[ccsdk/features.git] /
1 /*
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17  */
18 package org.onap.ccsdk.features.sdnr.wt.devicemanager.performancemanager.impl;
19
20 import java.util.Iterator;
21 import java.util.Optional;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.ScheduledFuture;
26 import java.util.concurrent.TimeUnit;
27 import org.onap.ccsdk.features.sdnr.wt.dataprovider.model.DataProvider;
28 import org.onap.ccsdk.features.sdnr.wt.devicemanager.ne.service.NetworkElement;
29 import org.onap.ccsdk.features.sdnr.wt.devicemanager.ne.service.PerformanceDataProvider;
30 import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.NetconfNetworkElementService;
31 import org.onap.ccsdk.features.sdnr.wt.devicemanager.types.PerformanceDataLtp;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 public class PerformanceManagerTask implements Runnable {
36
37     private static final Logger LOG = LoggerFactory.getLogger(PerformanceManagerTask.class);
38     private static final String LOGMARKER = "PMTick";
39
40     private int tickCounter = 0;
41
42     private final ConcurrentHashMap<String, PerformanceDataProvider> queue = new ConcurrentHashMap<>();
43     private final DataProvider databaseService;
44     private final ScheduledExecutorService scheduler;
45     private final long seconds;
46
47     private ScheduledFuture<?> taskHandle = null;
48     private Iterator<PerformanceDataProvider> neIterator = null;
49     private PerformanceDataProvider actualNE = null;
50     private final NetconfNetworkElementService netconfNetworkElementService;
51
52     /**
53      * Constructor of PM Task
54      * @param seconds seconds to call PM Task
55      * @param microwaveHistoricalPerformanceWriterService DB Service to load PM data to
56      * @param netconfNetworkElementService to write into log
57      */
58
59     public PerformanceManagerTask(long seconds,
60             DataProvider microwaveHistoricalPerformanceWriterService,
61             NetconfNetworkElementService netconfNetworkElementService) {
62
63         LOG.debug("Init task {}", PerformanceManagerTask.class.getSimpleName());
64         this.seconds = seconds;
65         this.databaseService = microwaveHistoricalPerformanceWriterService;
66         this.scheduler = Executors.newSingleThreadScheduledExecutor();
67         this.netconfNetworkElementService = netconfNetworkElementService;
68
69     }
70
71     /**
72      * Start PM Task
73      */
74     public void start() {
75         LOG.info("PM task created");
76         taskHandle = this.scheduler.scheduleAtFixedRate(this, 0, seconds, TimeUnit.SECONDS);
77         LOG.info("PM task scheduled");
78     }
79
80     /**
81      * Stop everything
82      */
83     public void stop() {
84         LOG.info("Stop {}", PerformanceManagerImpl.class.getSimpleName());
85         if (taskHandle != null) {
86             taskHandle.cancel(true);
87             try {
88                 scheduler.awaitTermination(10, TimeUnit.SECONDS);
89              } catch (InterruptedException e) {
90                  LOG.debug("Schdule stopped.",e);
91                  // Restore interrupted state...
92                  Thread.currentThread().interrupt();
93              }
94         }
95     }
96
97     /**
98      * Add NE/Mountpoint to PM Processig
99      * @param mountPointNodeName to be added
100      * @param ne that is connected to the mountpoint
101      */
102     public void registration(String mountPointNodeName, NetworkElement ne) {
103
104         Optional<PerformanceDataProvider> oPmNe = ne.getService(PerformanceDataProvider.class);
105         if (oPmNe.isPresent()) {
106             queue.put(mountPointNodeName, oPmNe.get());
107         }
108     }
109
110     /**
111      * Remove mountpoint/NE from PM process
112      * @param mountPointNodeName that has to be removed
113      */
114     public void deRegistration(String mountPointNodeName) {
115         LOG.debug("Deregister {}",mountPointNodeName);
116         PerformanceDataProvider removedNE = queue.remove(mountPointNodeName);
117
118         if ( removedNE == null) {
119             LOG.warn("Couldn't delete {}",mountPointNodeName);
120         }
121     }
122
123
124     /*--------------------------------------------------------------
125      * Task to read PM data from NE
126      */
127
128     /**
129      * Task runner to read all performance data from Network Elements.
130      * Catch exceptions to make sure, that the Task is not stopped.
131      */
132     @Override
133     public void run() {
134
135         String mountpointName = "No NE";
136         if (actualNE != null && actualNE.getAcessor().isPresent()) {
137             mountpointName = actualNE.getAcessor().get().getNodeId().getValue();
138         }
139         LOG.debug("{} start {} Start with mountpoint {}",LOGMARKER, tickCounter, mountpointName);
140
141         //Proceed to next NE/Interface
142         getNextInterface(mountpointName);
143
144         LOG.debug("{} {} Next interface to handle {}", LOGMARKER, tickCounter,
145                 actualNE == null ? "No NE/IF" : actualNE.pmStatusToString());
146
147         if (actualNE != null) {
148             try {
149                 LOG.debug("{} Start to read PM from NE ({})", LOGMARKER, tickCounter);
150                 Optional<PerformanceDataLtp> allPm = actualNE.getLtpHistoricalPerformanceData();
151                 if (allPm.isPresent()) {
152                     LOG.debug("{} {} Got PM list. Start write to DB", LOGMARKER, tickCounter);
153                     databaseService.doWritePerformanceData(allPm.get().getList());
154                 }
155                 LOG.debug("{} {} PM List end.", LOGMARKER, tickCounter);
156             } catch (Exception e) {
157                 StringBuffer msg = new StringBuffer();
158                 msg.append(e.getMessage());
159                 msg.append(" - ");
160                 msg.append(e.getCause().toString());
161                 msg.append(" - ");
162                 msg.append(actualNE.pmStatusToString());
163                 String msgString = msg.toString();
164                 LOG.warn("{} {} PM read/write failed. Write log entry {}", LOGMARKER, tickCounter, msgString);
165                 netconfNetworkElementService.writeToEventLog(mountpointName, "PM Problem", msgString);
166             }
167         }
168
169         LOG.debug("{} end {}",LOGMARKER, tickCounter);
170         tickCounter++;
171     }
172
173     /**
174      * Reset queue to start from beginning
175      */
176     private void resetQueue() {
177         actualNE = null;
178         neIterator = null;
179     }
180
181     /**
182      * Get then next interface in the list.
183      * First try to find a next on the actual NE.
184      * If not available search next interface at a NE
185      * Special Situations to handle: Empty queue, NEs, but no interfaces
186      */
187     private void getNextInterface(String mountpointName) {
188         boolean started = false;
189         int loopCounter = 0;
190
191         LOG.debug("{} {} getNextInterface enter. Queue size {} ", LOGMARKER, tickCounter, queue.size());
192
193         if (actualNE != null && !queue.containsValue(actualNE)) {
194             LOG.debug("{} {} NE Removed duringprocessing A",LOGMARKER, tickCounter);
195             resetQueue();
196         }
197
198         while (true) {
199
200             if (loopCounter++ >= 1000) {
201                 LOG.error("{} {} Problem in PM iteration. endless condition reached", LOGMARKER, tickCounter);
202                 resetQueue();
203                 break;
204             }
205
206             LOG.debug("{} {} Loop ne {}:neiterator {}:Interfaceiterator:{} Loop:{}",
207                     LOGMARKER,
208                     tickCounter,
209                     actualNE == null? "null" : mountpointName,
210                     neIterator == null ? "null" : neIterator.hasNext(),
211                     actualNE == null ? "null" : actualNE.hasNext(),
212                     loopCounter);
213
214             if (actualNE != null && actualNE.hasNext()) {
215                 // Yes, there is an interface, deliver back
216                 LOG.debug("{} {} getNextInterface yes A",LOGMARKER, tickCounter);
217                 actualNE.next();
218                 break;
219
220             } else {
221                 // No element in neInterfaceInterator .. get next NE and try
222                 if (neIterator != null && neIterator.hasNext()) {
223                     // Set a new NE
224                     LOG.debug("{} {} Next NE A",LOGMARKER, tickCounter);
225                     actualNE = neIterator.next();
226                     actualNE.resetPMIterator();
227
228                 } else {
229                     // Goto start condition 1) first entry 2) end of queue reached
230                     LOG.debug("{} {} Reset",LOGMARKER, tickCounter);
231                     resetQueue();
232
233                     if (queue.isEmpty()) {
234                         LOG.debug("{} {} no nextInterfac. queue empty",LOGMARKER, tickCounter);
235                         break;
236                     } else if (!started){
237                         LOG.debug("{} {} getNextInterface start condition. Get interator.",LOGMARKER, tickCounter);
238                         neIterator = queue.values().iterator();
239                         started = true;
240                     } else {
241                         LOG.debug("{} {} no nextInterface",LOGMARKER, tickCounter);
242                         break;
243                     }
244                 }
245             }
246         } //while
247
248         if (actualNE != null && !queue.containsValue(actualNE)) {
249             LOG.debug("{} {} NE Removed duringprocessing B",LOGMARKER, tickCounter);
250             resetQueue();
251         }
252
253     }
254 }