1 /*******************************************************************************
 
   2  *  ============LICENSE_START=======================================================
 
   4  *  ================================================================================
 
   5  *   Copyright (C) 2022 Huawei Canada Limited.
 
   6  *   Copyright (C) 2022 Huawei Technologies Co., Ltd.
 
   7  *  ==============================================================================
 
   8  *     Licensed under the Apache License, Version 2.0 (the "License");
 
   9  *     you may not use this file except in compliance with the License.
 
  10  *     You may obtain a copy of the License at
 
  12  *          http://www.apache.org/licenses/LICENSE-2.0
 
  14  *     Unless required by applicable law or agreed to in writing, software
 
  15  *     distributed under the License is distributed on an "AS IS" BASIS,
 
  16  *     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  17  *     See the License for the specific language governing permissions and
 
  18  *     limitations under the License.
 
  19  *     ============LICENSE_END=========================================================
 
  21  *******************************************************************************/
 
  22 package org.onap.slice.analysis.ms.service.ccvpn;
 
  24 import lombok.NonNull;
 
  25 import org.onap.slice.analysis.ms.aai.AaiService;
 
  27 import org.onap.slice.analysis.ms.models.Configuration;
 
  28 import org.slf4j.Logger;
 
  29 import org.slf4j.LoggerFactory;
 
  30 import org.springframework.beans.factory.annotation.Autowired;
 
  31 import org.springframework.stereotype.Component;
 
  33 import javax.annotation.PostConstruct;
 
  34 import javax.annotation.PreDestroy;
 
  36 import java.util.concurrent.BlockingQueue;
 
  37 import java.util.concurrent.ExecutorService;
 
  38 import java.util.concurrent.Executors;
 
  39 import java.util.concurrent.Future;
 
  40 import java.util.concurrent.LinkedBlockingQueue;
 
  41 import java.util.concurrent.ScheduledExecutorService;
 
  42 import java.util.concurrent.TimeUnit;
 
  45  * This class implements the CCVPN PM Closed-loop logical function.
 
  46  * A simple actor model design is implemented here.
 
  49 public class BandwidthEvaluator {
 
  50     private static Logger log = LoggerFactory.getLogger(BandwidthEvaluator.class);
 
  51     private Configuration configuration;
 
  54     AaiService aaiService;
 
  57     CCVPNPmDatastore ccvpnPmDatastore;
 
  60     StrategyFactory strategyFactory;
 
  62     private Loop evaluationEventLoop;
 
  63     private Loop aaiEventLoop;
 
  65     private static final Event KILL_PILL = new SimpleEvent(null, 0);
 
  66     private static final int DEFAULT_EVAL_INTERVAL = 5;
 
  67     private static final String DEFAULT_STRATEGY_NAME = "FixedUpperBoundStrategy";
 
  69      * Interval of each round of evaluation, defined in config_all.json
 
  71     private static int evaluationInterval;
 
  74      * Bandwidth Evaluation and adjustment strategy.
 
  76     private static String strategyName;
 
  78     private final ScheduledExecutorService executorPool = Executors.newScheduledThreadPool(1);
 
  81      * Initialize and start the bandwidth evaluator process, schedule a periodic service bandwidth usage check
 
  86         strategyName = (strategyName != null)? strategyName : DEFAULT_STRATEGY_NAME;
 
  87         evaluationInterval = (evaluationInterval == 0)? DEFAULT_EVAL_INTERVAL : evaluationInterval;
 
  88         EvaluationStrategy strategy = strategyFactory.getStrategy(strategyName);
 
  89         log.info("{} is utilized as the bandwidth evaluatior strategy", strategyName);
 
  94         evaluationEventLoop = new Loop("EvaluationLoop"){
 
  96             public void process(Event event) {
 
  97                 strategy.execute(event);
 
 102          * AAI data consumer loop
 
 104         aaiEventLoop = new Loop("AAIEventLoop"){
 
 106             public void process(Event event) {
 
 107                 if (event.type() == SimpleEvent.Type.AAI_BW_REQ){
 
 108                     log.debug("=== Processing new AAI network policy query at: {} ===", event.time());
 
 109                     String serviceId = (String) event.subject();
 
 110                     Map<String, Integer> maxBandwidthData = aaiService.fetchMaxBandwidthOfService(serviceId);
 
 111                     if (maxBandwidthData.get("maxBandwidth") != null){
 
 112                         log.debug("Successfully retrieved bandwidth info from AAI; service: {}, bandwidth: {}",
 
 113                                 serviceId, maxBandwidthData.get("maxBandwidth"));
 
 114                         int bwValue = maxBandwidthData.get("maxBandwidth").intValue();
 
 115                         if (ccvpnPmDatastore.getProvBwOfSvc(serviceId) == 0){
 
 116                             ccvpnPmDatastore.updateProvBw(serviceId, bwValue, true);
 
 117                             log.debug("Provision bw of cll {} updated from 0 to {}, max bw is {}", serviceId, ccvpnPmDatastore.getProvBwOfSvc(serviceId), bwValue);
 
 118                         } else if (ccvpnPmDatastore.getProvBwOfSvc(serviceId) != bwValue) {
 
 119                             log.debug("Service modification complete; serviceId: {} update prov bw from {} to {}", serviceId, ccvpnPmDatastore.getProvBwOfSvc(serviceId), bwValue);
 
 120                             ccvpnPmDatastore.updateProvBw(serviceId, bwValue, true);
 
 121                             ccvpnPmDatastore.updateSvcState(serviceId, ServiceState.RUNNING);
 
 122                             log.debug("Service state of {} is changed to running, {}", serviceId, ccvpnPmDatastore.getStatusOfSvc(serviceId));
 
 125                     log.debug("=== Processing AAI network policy query complete ===");
 
 129         scheduleEvaluation();
 
 133      * Stop the bandwidth evaluator process including two actors and periodic usage check
 
 137         stopScheduleEvaluation();
 
 139         evaluationEventLoop.stop();
 
 143      * Start to schedule periodic usage check at fixed rate
 
 145     private void scheduleEvaluation(){
 
 146         executorPool.scheduleAtFixedRate(new Runnable() {
 
 149                     post(new SimpleEvent(SimpleEvent.Type.PERIODIC_CHECK, 1));
 
 151             }, 0, (evaluationInterval == 0? DEFAULT_EVAL_INTERVAL : evaluationInterval), TimeUnit.SECONDS);
 
 155      * Stop periodic bandwidth usage check
 
 157     private void stopScheduleEvaluation(){
 
 158         executorPool.shutdownNow();
 
 162      * Post/broadcast event between Loops
 
 163      * @param event event object
 
 165     public void post(@NonNull Event event){
 
 166         log.info("A new event triggered, type: {}, subject: {}, at time: {}",
 
 167                 event.type(), event.subject(), event.time());
 
 168         if (event.type() == SimpleEvent.Type.AAI_BW_REQ) {
 
 169             aaiEventLoop.add(event);
 
 170         } else if (event.type() == SimpleEvent.Type.PERIODIC_CHECK) {
 
 171             evaluationEventLoop.add(event);
 
 172         } else if (event.type() == SimpleEvent.Type.ONDEMAND_CHECK) {
 
 173             evaluationEventLoop.add(event);
 
 177     // update configuration
 
 178     private void loadConfig() {
 
 179         configuration = Configuration.getInstance();
 
 180         evaluationInterval = configuration.getCcvpnEvalInterval();
 
 181         strategyName = configuration.getCcvpnEvalStrategy();
 
 182         log.info("Evaluation loop configs has been loaded. Strategy {}.", strategyName);
 
 186      * Inner loop implementation. Each loop acts like an actor.
 
 188     private abstract class Loop implements Runnable {
 
 189         private final String name;
 
 190         private volatile boolean running;
 
 191         private final BlockingQueue<Event> eventsQueue;
 
 192         private final ExecutorService executor;
 
 193         private volatile Future<?> dispatchFuture;
 
 196          * Constructor that accepts a loop name
 
 197          * @param name name of this loop
 
 201             executor = Executors.newSingleThreadExecutor();
 
 202             eventsQueue = new LinkedBlockingQueue<>();
 
 203             dispatchFuture = executor.submit(this);
 
 207          * Add new event to this loop
 
 211         public boolean add(Event evt) {
 
 212             return eventsQueue.add(evt);
 
 216          * Running loop that process event accordingly
 
 221             log.info("BandwidthEvaluator -- {} initiated", this.name);
 
 224                     Event event = eventsQueue.take();
 
 225                     if (event == KILL_PILL){
 
 229                 } catch (InterruptedException e){
 
 230                     log.warn("Process loop interrupted");
 
 231                 } catch (Exception | Error e){
 
 232                     log.warn("Process loop hit an error {}", e.getMessage());
 
 238          * Operation defined by subclass for different event processing
 
 239          * @param event incoming event
 
 241         abstract public void process(Event event);