1 /*******************************************************************************
 
   2  * ============LICENSE_START========================================================================
 
   3  * ONAP : ccsdk feature sdnr wt
 
   4  * =================================================================================================
 
   5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
 
   6  * =================================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 
   8  * in compliance with the License. You may obtain a copy of the License at
 
  10  * http://www.apache.org/licenses/LICENSE-2.0
 
  12  * Unless required by applicable law or agreed to in writing, software distributed under the License
 
  13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 
  14  * or implied. See the License for the specific language governing permissions and limitations under
 
  16  * ============LICENSE_END==========================================================================
 
  17  ******************************************************************************/
 
  18 package org.onap.ccsdk.features.sdnr.wt.devicemanager.performancemanager.impl;
 
  20 import java.util.Iterator;
 
  21 import java.util.concurrent.ConcurrentHashMap;
 
  22 import java.util.concurrent.Executors;
 
  23 import java.util.concurrent.ScheduledExecutorService;
 
  24 import java.util.concurrent.ScheduledFuture;
 
  25 import java.util.concurrent.TimeUnit;
 
  26 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.netconf.ONFCoreNetworkElementRepresentation;
 
  27 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.netconf.container.AllPm;
 
  28 import org.onap.ccsdk.features.sdnr.wt.devicemanager.performancemanager.impl.database.service.MicrowaveHistoricalPerformanceWriterService;
 
  29 import org.slf4j.Logger;
 
  30 import org.slf4j.LoggerFactory;
 
  32 public class PerformanceManagerTask implements Runnable {
 
  34     private static final Logger LOG = LoggerFactory.getLogger(PerformanceManagerTask.class);
 
  35     private static final String LOGMARKER = "PMTick";
 
  37     private int tickCounter = 0;
 
  39     private final ConcurrentHashMap<String, ONFCoreNetworkElementRepresentation> queue = new ConcurrentHashMap<>();
 
  40     private final MicrowaveHistoricalPerformanceWriterService databaseService;
 
  41     private final ScheduledExecutorService scheduler;
 
  42     private final long seconds;
 
  44     private ScheduledFuture<?> taskHandle = null;
 
  45     private Iterator<ONFCoreNetworkElementRepresentation> neIterator = null;
 
  46     private ONFCoreNetworkElementRepresentation actualNE = null;
 
  49      * Constructor of PM Task
 
  50      * @param seconds seconds to call PM Task
 
  51      * @param databaseService DB Service to load PM data to
 
  54     public PerformanceManagerTask(long seconds, MicrowaveHistoricalPerformanceWriterService databaseService) {
 
  56         LOG.debug("Init task {}", PerformanceManagerTask.class.getSimpleName());
 
  57         this.seconds = seconds;
 
  58         this.databaseService = databaseService;
 
  59         this.scheduler = Executors.newSingleThreadScheduledExecutor();
 
  67         LOG.info("PM task created");
 
  68         taskHandle = this.scheduler.scheduleAtFixedRate(this, 0, seconds, TimeUnit.SECONDS);
 
  69         LOG.info("PM task scheduled");
 
  76         LOG.info("Stop {}", PerformanceManagerImpl.class.getSimpleName());
 
  77         if (taskHandle != null) {
 
  78             taskHandle.cancel(true);
 
  80                 scheduler.awaitTermination(10, TimeUnit.SECONDS);
 
  81              } catch (InterruptedException e) {
 
  82                  LOG.debug("Schdule stopped.",e);
 
  83                  // Restore interrupted state...
 
  84                  Thread.currentThread().interrupt();
 
  90      * Add NE/Mountpoint to PM Processig
 
  91      * @param mountPointNodeName to be added
 
  92      * @param ne that is connected to the mountpoint
 
  94     public void registration(String mountPointNodeName, ONFCoreNetworkElementRepresentation ne) {
 
  95         queue.put(mountPointNodeName, ne);
 
  99      * Remove mountpoint/NE from PM process
 
 100      * @param mountPointNodeName that has to be removed
 
 102     public void deRegistration(String mountPointNodeName) {
 
 103         LOG.debug("Deregister {}",mountPointNodeName);
 
 104         ONFCoreNetworkElementRepresentation removedNE = queue.remove(mountPointNodeName);
 
 106         if ( removedNE == null) {
 
 107             LOG.warn("Couldn't delete {}",mountPointNodeName);
 
 112     /*--------------------------------------------------------------
 
 113      * Task to read PM data from NE
 
 117      * Task runner to read all performance data from Network Elements.
 
 118      * Catch exceptions to make sure, that the Task is not stopped.
 
 123         LOG.debug("{} start {} Start with mountpoint {}",LOGMARKER, tickCounter, actualNE == null ? "No NE" : actualNE.getMountPointNodeName());
 
 125         //Proceed to next NE/Interface
 
 128         LOG.debug("{} {} Next interface to handle {}", LOGMARKER, tickCounter,
 
 129                 actualNE == null ? "No NE/IF" : actualNE.getMountPointNodeName() + " " + actualNE.pmStatusToString());
 
 131         if (actualNE != null) {
 
 133                 LOG.debug("{} Start to read PM from NE ({})", LOGMARKER, tickCounter);
 
 134                 AllPm allPm = actualNE.getHistoricalPM();
 
 135                 LOG.debug("{} {} Got PM list. Start write to DB", LOGMARKER, tickCounter);
 
 136                 databaseService.writePM(allPm);
 
 137                 LOG.debug("{} {} PM List end.", LOGMARKER, tickCounter);
 
 138             } catch (Exception e) {
 
 139                 LOG.warn("{} {} PM read/write failed. Write log entry {}", LOGMARKER, tickCounter, e);
 
 140                 String msg = e.getMessage();
 
 141                 if (msg == null || msg.isEmpty()) {
 
 142                     if (e.getCause() != null) {
 
 143                         msg = e.getCause().toString();
 
 145                     if (msg == null || msg.isEmpty()){
 
 146                         msg = "No message or cause";
 
 149                 databaseService.writePMLog(actualNE.getMountPointNodeName(), actualNE.pmStatusToString(), msg);
 
 153         LOG.debug("{} end {}",LOGMARKER, tickCounter);
 
 158      * Reset queue to start from beginning
 
 160     private void resetQueue() {
 
 166      * Get then next interface in the list.
 
 167      * First try to find a next on the actual NE.
 
 168      * If not available search next interface at a NE
 
 169      * Special Situations to handle: Empty queue, NEs, but no interfaces
 
 171     private void getNextInterface() {
 
 172         boolean started = false;
 
 175         LOG.debug("{} {} getNextInterface enter. Queue size {} ", LOGMARKER, tickCounter, queue.size());
 
 177         if (actualNE != null && !queue.containsValue(actualNE)) {
 
 178             LOG.debug("{} {} NE Removed duringprocessing A",LOGMARKER, tickCounter);
 
 184             if (loopCounter++ >= 1000) {
 
 185                 LOG.error("{} {} Problem in PM iteration. endless condition reached", LOGMARKER, tickCounter);
 
 190             LOG.debug("{} {} Loop ne {}:neiterator {}:Interfaceiterator:{} Loop:{}",
 
 193                     actualNE == null? "null" : actualNE.getMountPointNodeName(),
 
 194                     neIterator == null ? "null" : neIterator.hasNext(),
 
 195                     actualNE == null ? "null" : actualNE.hasNext(),
 
 198             if (actualNE != null && actualNE.hasNext()) {
 
 199                 // Yes, there is an interface, deliver back
 
 200                 LOG.debug("{} {} getNextInterface yes A",LOGMARKER, tickCounter);
 
 205                 // No element in neInterfaceInterator .. get next NE and try
 
 206                 if (neIterator != null && neIterator.hasNext()) {
 
 208                     LOG.debug("{} {} Next NE A",LOGMARKER, tickCounter);
 
 209                     actualNE = neIterator.next();
 
 210                     actualNE.resetPMIterator();
 
 213                     // Goto start condition 1) first entry 2) end of queue reached
 
 214                     LOG.debug("{} {} Reset",LOGMARKER, tickCounter);
 
 217                     if (queue.isEmpty()) {
 
 218                         LOG.debug("{} {} no nextInterfac. queue empty",LOGMARKER, tickCounter);
 
 220                     } else if (!started){
 
 221                         LOG.debug("{} {} getNextInterface start condition. Get interator.",LOGMARKER, tickCounter);
 
 222                         neIterator = queue.values().iterator();
 
 225                         LOG.debug("{} {} no nextInterface",LOGMARKER, tickCounter);
 
 232         if (actualNE != null && !queue.containsValue(actualNE)) {
 
 233             LOG.debug("{} {} NE Removed duringprocessing B",LOGMARKER, tickCounter);