ef2a0b8dbdac3d8f7440f3c5d294531705c991eb
[ccsdk/features.git] /
1 /*******************************************************************************
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17  ******************************************************************************/
18 package org.onap.ccsdk.features.sdnr.wt.devicemanager.performancemanager.impl;
19
20 import java.util.Iterator;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.ScheduledFuture;
25 import java.util.concurrent.TimeUnit;
26 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.netconf.ONFCoreNetworkElementRepresentation;
27 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.netconf.container.AllPm;
28 import org.onap.ccsdk.features.sdnr.wt.devicemanager.performancemanager.impl.database.service.MicrowaveHistoricalPerformanceWriterService;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 public class PerformanceManagerTask implements Runnable {
33
34     private static final Logger LOG = LoggerFactory.getLogger(PerformanceManagerTask.class);
35     private static final String LOGMARKER = "PMTick";
36
37     private int tickCounter = 0;
38
39     private final ConcurrentHashMap<String, ONFCoreNetworkElementRepresentation> queue = new ConcurrentHashMap<>();
40     private final MicrowaveHistoricalPerformanceWriterService databaseService;
41     private final ScheduledExecutorService scheduler;
42     private final long seconds;
43
44     private ScheduledFuture<?> taskHandle = null;
45     private Iterator<ONFCoreNetworkElementRepresentation> neIterator = null;
46     private ONFCoreNetworkElementRepresentation actualNE = null;
47
48     /**
49      * Constructor of PM Task
50      * @param seconds seconds to call PM Task
51      * @param databaseService DB Service to load PM data to
52      */
53
54     public PerformanceManagerTask(long seconds, MicrowaveHistoricalPerformanceWriterService databaseService) {
55
56         LOG.debug("Init task {}", PerformanceManagerTask.class.getSimpleName());
57         this.seconds = seconds;
58         this.databaseService = databaseService;
59         this.scheduler = Executors.newSingleThreadScheduledExecutor();
60
61     }
62
63     /**
64      * Start PM Task
65      */
66     public void start() {
67         LOG.info("PM task created");
68         taskHandle = this.scheduler.scheduleAtFixedRate(this, 0, seconds, TimeUnit.SECONDS);
69         LOG.info("PM task scheduled");
70     }
71
72     /**
73      * Stop everything
74      */
75     public void stop() {
76         LOG.info("Stop {}", PerformanceManagerImpl.class.getSimpleName());
77         if (taskHandle != null) {
78             taskHandle.cancel(true);
79             try {
80                 scheduler.awaitTermination(10, TimeUnit.SECONDS);
81              } catch (InterruptedException e) {
82                  LOG.debug("Schdule stopped.",e);
83                  // Restore interrupted state...
84                  Thread.currentThread().interrupt();
85              }
86         }
87     }
88
89     /**
90      * Add NE/Mountpoint to PM Processig
91      * @param mountPointNodeName to be added
92      * @param ne that is connected to the mountpoint
93      */
94     public void registration(String mountPointNodeName, ONFCoreNetworkElementRepresentation ne) {
95         queue.put(mountPointNodeName, ne);
96     }
97
98     /**
99      * Remove mountpoint/NE from PM process
100      * @param mountPointNodeName that has to be removed
101      */
102     public void deRegistration(String mountPointNodeName) {
103         LOG.debug("Deregister {}",mountPointNodeName);
104         ONFCoreNetworkElementRepresentation removedNE = queue.remove(mountPointNodeName);
105
106         if ( removedNE == null) {
107             LOG.warn("Couldn't delete {}",mountPointNodeName);
108         }
109     }
110
111
112     /*--------------------------------------------------------------
113      * Task to read PM data from NE
114      */
115
116     /**
117      * Task runner to read all performance data from Network Elements.
118      * Catch exceptions to make sure, that the Task is not stopped.
119      */
120     @Override
121     public void run() {
122
123         LOG.debug("{} start {} Start with mountpoint {}",LOGMARKER, tickCounter, actualNE == null ? "No NE" : actualNE.getMountPointNodeName());
124
125         //Proceed to next NE/Interface
126         getNextInterface();
127
128         LOG.debug("{} {} Next interface to handle {}", LOGMARKER, tickCounter,
129                 actualNE == null ? "No NE/IF" : actualNE.getMountPointNodeName() + " " + actualNE.pmStatusToString());
130
131         if (actualNE != null) {
132             try {
133                 LOG.debug("{} Start to read PM from NE ({})", LOGMARKER, tickCounter);
134                 AllPm allPm = actualNE.getHistoricalPM();
135                 LOG.debug("{} {} Got PM list. Start write to DB", LOGMARKER, tickCounter);
136                 databaseService.writePM(allPm);
137                 LOG.debug("{} {} PM List end.", LOGMARKER, tickCounter);
138             } catch (Exception e) {
139                 LOG.warn("{} {} PM read/write failed. Write log entry {}", LOGMARKER, tickCounter, e);
140                 String msg = e.getMessage();
141                 if (msg == null || msg.isEmpty()) {
142                     if (e.getCause() != null) {
143                         msg = e.getCause().toString();
144                     }
145                     if (msg == null || msg.isEmpty()){
146                         msg = "No message or cause";
147                     }
148                 }
149                 databaseService.writePMLog(actualNE.getMountPointNodeName(), actualNE.pmStatusToString(), msg);
150             }
151         }
152
153         LOG.debug("{} end {}",LOGMARKER, tickCounter);
154         tickCounter++;
155     }
156
157     /**
158      * Reset queue to start from beginning
159      */
160     private void resetQueue() {
161         actualNE = null;
162         neIterator = null;
163     }
164
165     /**
166      * Get then next interface in the list.
167      * First try to find a next on the actual NE.
168      * If not available search next interface at a NE
169      * Special Situations to handle: Empty queue, NEs, but no interfaces
170      */
171     private void getNextInterface() {
172         boolean started = false;
173         int loopCounter = 0;
174
175         LOG.debug("{} {} getNextInterface enter. Queue size {} ", LOGMARKER, tickCounter, queue.size());
176
177         if (actualNE != null && !queue.containsValue(actualNE)) {
178             LOG.debug("{} {} NE Removed duringprocessing A",LOGMARKER, tickCounter);
179             resetQueue();
180         }
181
182         while (true) {
183
184             if (loopCounter++ >= 1000) {
185                 LOG.error("{} {} Problem in PM iteration. endless condition reached", LOGMARKER, tickCounter);
186                 resetQueue();
187                 break;
188             }
189
190             LOG.debug("{} {} Loop ne {}:neiterator {}:Interfaceiterator:{} Loop:{}",
191                     LOGMARKER,
192                     tickCounter,
193                     actualNE == null? "null" : actualNE.getMountPointNodeName(),
194                     neIterator == null ? "null" : neIterator.hasNext(),
195                     actualNE == null ? "null" : actualNE.hasNext(),
196                     loopCounter);
197
198             if (actualNE != null && actualNE.hasNext()) {
199                 // Yes, there is an interface, deliver back
200                 LOG.debug("{} {} getNextInterface yes A",LOGMARKER, tickCounter);
201                 actualNE.next();
202                 break;
203
204             } else {
205                 // No element in neInterfaceInterator .. get next NE and try
206                 if (neIterator != null && neIterator.hasNext()) {
207                     // Set a new NE
208                     LOG.debug("{} {} Next NE A",LOGMARKER, tickCounter);
209                     actualNE = neIterator.next();
210                     actualNE.resetPMIterator();
211
212                 } else {
213                     // Goto start condition 1) first entry 2) end of queue reached
214                     LOG.debug("{} {} Reset",LOGMARKER, tickCounter);
215                     resetQueue();
216
217                     if (queue.isEmpty()) {
218                         LOG.debug("{} {} no nextInterfac. queue empty",LOGMARKER, tickCounter);
219                         break;
220                     } else if (!started){
221                         LOG.debug("{} {} getNextInterface start condition. Get interator.",LOGMARKER, tickCounter);
222                         neIterator = queue.values().iterator();
223                         started = true;
224                     } else {
225                         LOG.debug("{} {} no nextInterface",LOGMARKER, tickCounter);
226                         break;
227                     }
228                 }
229             }
230         } //while
231
232         if (actualNE != null && !queue.containsValue(actualNE)) {
233             LOG.debug("{} {} NE Removed duringprocessing B",LOGMARKER, tickCounter);
234             resetQueue();
235         }
236
237     }
238 }