1 package org.opendaylight.mwtn.performancemanager.impl;
3 import java.util.Iterator;
4 import java.util.concurrent.ConcurrentHashMap;
5 import java.util.concurrent.Executors;
6 import java.util.concurrent.ScheduledExecutorService;
7 import java.util.concurrent.ScheduledFuture;
8 import java.util.concurrent.TimeUnit;
10 import org.opendaylight.mwtn.base.netconf.AllPm;
11 import org.opendaylight.mwtn.base.netconf.ONFCoreNetworkElementRepresentation;
12 import org.opendaylight.mwtn.performancemanager.impl.database.service.MicrowaveHistoricalPerformanceWriterService;
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
16 public class PerformanceManagerTask implements Runnable {
// Class-wide logger, and the marker passed as first argument to the log messages written while a tick is processed.
18 private static final Logger LOG = LoggerFactory.getLogger(PerformanceManagerTask.class);
19 private static final String LOGMARKER = "PMTick";
// Counter included in the per-tick log output. NOTE(review): where it is incremented is not visible in this listing.
21 private int tickCounter = 0;
// Registered network elements, keyed by mount point node name (filled by registration, emptied by deRegistration).
23 private final ConcurrentHashMap<String, ONFCoreNetworkElementRepresentation> queue = new ConcurrentHashMap<>();
// Service that receives the PM data read from the NE and the PM log entries written on failure.
24 private final MicrowaveHistoricalPerformanceWriterService databaseService;
// Executor that runs this task periodically; created in the constructor.
25 private final ScheduledExecutorService scheduler;
// Handle of the periodic schedule returned by scheduleAtFixedRate; used to cancel the task.
28 private ScheduledFuture<?> taskHandle = null;
// Iterator over queue.values(); null until getNextInterface obtains it.
29 private Iterator<ONFCoreNetworkElementRepresentation> neIterator = null;
// Network element whose PM data is read in the current tick; null when none is selected.
30 private ONFCoreNetworkElementRepresentation actualNE = null;
33 * Constructor of PM Task
34 * @param seconds period in seconds between two runs of the PM task
35 * @param databaseService DB Service to load PM data to
// Stores the collaborators, creates the scheduler and schedules this object as a periodic task.
// NOTE(review): the closing brace and original lines 44-50 are not part of this listing.
38 public PerformanceManagerTask(long seconds, MicrowaveHistoricalPerformanceWriterService databaseService) {
40 LOG.debug("Init task {}", PerformanceManagerTask.class.getSimpleName());
// NOTE(review): no declaration of a 'seconds' field is visible in this listing - confirm it exists in the full file.
41 this.seconds = seconds;
42 this.databaseService = databaseService;
// One scheduler thread is used for all runs of this task.
43 this.scheduler = Executors.newSingleThreadScheduledExecutor();
51 LOG.info("PM task created");
// Initial delay 0: the first run is requested immediately, then one run every 'seconds' seconds.
52 taskHandle = this.scheduler.scheduleAtFixedRate(this, 0, seconds, TimeUnit.SECONDS);
53 LOG.info("PM task scheduled");
// Shutdown sequence: cancels the periodic task and waits up to 10 seconds on the scheduler.
// The enclosing method header, the 'try {' line and the closing braces are not part of this listing.
// NOTE(review): the message names PerformanceManagerImpl although this class is PerformanceManagerTask - confirm intended.
60 LOG.info("Stop {}", PerformanceManagerImpl.class.getSimpleName());
61 if (taskHandle != null) {
// cancel(true) also requests interruption of a run that is in progress.
62 taskHandle.cancel(true);
// NOTE(review): no scheduler.shutdown() call is visible before this wait. awaitTermination returns early only
// after a shutdown request; without one it blocks for the full timeout - confirm against the full file.
64 scheduler.awaitTermination(10, TimeUnit.SECONDS);
65 } catch (InterruptedException e) {
// NOTE(review): in the visible lines the interrupt status is not restored (Thread.currentThread().interrupt()).
66 LOG.warn(e.toString());
72 * Add NE/Mountpoint to PM Processing
73 * @param mountPointNodeName to be added
74 * @param ne that is connected to the mountpoint
// Registers the NE under its mount point name so that it takes part in PM processing.
76 public void registration(String mountPointNodeName, ONFCoreNetworkElementRepresentation ne) {
// put() replaces an NE already stored under the same name; ConcurrentHashMap rejects a null key or value.
77 queue.put(mountPointNodeName, ne);
81 * Remove mountpoint/NE from PM process
82 * @param mountPointNodeName that has to be removed
// Removes the NE stored under the given mount point name and warns when nothing was stored.
// NOTE(review): the lines following the warning (original lines 90-93) are not part of this listing.
84 public void deRegistration(String mountPointNodeName) {
85 LOG.debug("Deregister {}",mountPointNodeName);
// remove() returns the NE that was stored, or null when no NE was registered under this name.
86 ONFCoreNetworkElementRepresentation removedNE = queue.remove(mountPointNodeName);
88 if ( removedNE == null) {
89 LOG.warn("Couldn't delete {}",mountPointNodeName);
94 /*--------------------------------------------------------------
95 * Task to read PM data from NE
99 * Task runner to read all performance data from Network Elements.
100 * Catch all exceptions so that the scheduled task is not stopped: a task scheduled at a fixed rate is not run again once one of its runs throws.
// Body of the periodic task: logs the tick, reads the historical PM data of the selected NE and writes it to the database.
// The method header, the 'try {' line, the closing braces and original lines 108-109 are not part of this listing.
105 LOG.debug("{} start {} Start with mountpoint {}",LOGMARKER, tickCounter, actualNE == null ? "No NE" : actualNE.getMountPointNodeName());
107 //Proceed to next NE/Interface
110 LOG.debug("{} {} Next interface to handle {}", LOGMARKER, tickCounter,
111 actualNE == null ? "No NE/IF" : actualNE.getMountPointNodeName() + " " + actualNE.pmStatusToString());
// Nothing is read in this tick when no NE is selected.
113 if (actualNE != null) {
115 LOG.debug("{} Start to read PM from NE ({})", LOGMARKER, tickCounter);
// Read the historical PM data from the selected NE and hand it to the database service.
116 AllPm allPm = actualNE.getHistoricalPM();
117 LOG.debug("{} {} Got PM list. Start write to DB", LOGMARKER, tickCounter);
118 databaseService.writePM(allPm);
119 LOG.debug("{} {} PM List end.", LOGMARKER, tickCounter);
// Every Exception of the read/write step is handled here instead of propagating out of the task.
120 } catch (Exception e) {
// NOTE(review): three placeholders with the exception as third argument - depending on the SLF4J version the
// exception is either formatted into the last placeholder or logged as a stack trace; confirm the intended output.
121 LOG.warn("{} {} PM read/write failed. Write log entry {}", LOGMARKER, tickCounter, e);
// Build a non-empty text for the PM log entry: exception message, else the cause, else a fixed fallback.
122 String msg = e.getMessage();
123 if (msg == null || msg.isEmpty()) {
124 if (e.getCause() != null) {
125 msg = e.getCause().toString();
127 if (msg == null || msg.isEmpty()){
128 msg = "No message or cause";
// Record the failure for this NE, together with its PM status, through the database service.
131 databaseService.writePMLog(actualNE.getMountPointNodeName(), actualNE.pmStatusToString(), msg);
135 LOG.debug("{} end {}",LOGMARKER, tickCounter);
140 * Reset queue to start from beginning
142 private void resetQueue() {
148 * Get the next interface in the list.
149 * First try to find a next on the actual NE.
150 * If not available search next interface at a NE
151 * Special Situations to handle: Empty queue, NEs, but no interfaces
153 private void getNextInterface() {
154 boolean started = false;
157 LOG.debug("{} {} getNextInterface enter. Queue size {} ", LOGMARKER, tickCounter, queue.size());
159 if ((actualNE != null) && !queue.containsValue(actualNE)) {
160 LOG.debug("{} {} NE Removed duringprocessing A",LOGMARKER, tickCounter);
166 if (loopCounter++ >= 1000) {
167 LOG.error("{} {} Problem in PM iteration. endless condition reached", LOGMARKER, tickCounter);
172 LOG.debug("{} {} Loop ne {}:neiterator {}:Interfaceiterator:{} Loop:{}",
175 actualNE == null? "null" : actualNE.getMountPointNodeName(),
176 neIterator == null ? "null" : neIterator.hasNext(),
177 actualNE == null ? "null" : actualNE.hasNext(),
180 if (actualNE != null && actualNE.hasNext()) {
181 // Yes, there is an interface, deliver back
182 LOG.debug("{} {} getNextInterface yes A",LOGMARKER, tickCounter);
187 // No element in neInterfaceIterator .. get next NE and try
188 if (neIterator != null && neIterator.hasNext()) {
190 LOG.debug("{} {} Next NE A",LOGMARKER, tickCounter);
191 actualNE = neIterator.next();
192 actualNE.resetPMIterator();
195 // Goto start condition 1) first entry 2) end of queue reached
196 LOG.debug("{} {} Reset",LOGMARKER, tickCounter);
199 if (queue.isEmpty()) {
200 LOG.debug("{} {} no nextInterfac. queue empty",LOGMARKER, tickCounter);
202 } else if (!started){
203 LOG.debug("{} {} getNextInterface start condition. Get interator.",LOGMARKER, tickCounter);
204 neIterator = queue.values().iterator();
207 LOG.debug("{} {} no nextInterface",LOGMARKER, tickCounter);
214 if ((actualNE != null) && !queue.containsValue(actualNE)) {
215 LOG.debug("{} {} NE Removed duringprocessing B",LOGMARKER, tickCounter);