1 /*******************************************************************************
 
   2  *  ============LICENSE_START=======================================================
 
   4  *  ================================================================================
 
   5  *   Copyright (C) 2022 Huawei Canada Limited.
 
   6  *  ==============================================================================
 
   7  *     Licensed under the Apache License, Version 2.0 (the "License");
 
   8  *     you may not use this file except in compliance with the License.
 
   9  *     You may obtain a copy of the License at
 
  11  *          http://www.apache.org/licenses/LICENSE-2.0
 
  13  *     Unless required by applicable law or agreed to in writing, software
 
  14  *     distributed under the License is distributed on an "AS IS" BASIS,
 
  15  *     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  *     See the License for the specific language governing permissions and
 
  17  *     limitations under the License.
 
  18  *     ============LICENSE_END=========================================================
 
  20  *******************************************************************************/
 
  21 package org.onap.slice.analysis.ms.service.ccvpn;
 
  23 import com.google.gson.JsonObject;
 
  24 import lombok.NonNull;
 
  25 import org.onap.slice.analysis.ms.aai.AaiService;
 
  27 import org.onap.slice.analysis.ms.models.Configuration;
 
  28 import org.onap.slice.analysis.ms.service.PolicyService;
 
  29 import org.slf4j.Logger;
 
  30 import org.slf4j.LoggerFactory;
 
  31 import org.springframework.beans.factory.annotation.Autowired;
 
  32 import org.springframework.stereotype.Component;
 
  34 import javax.annotation.PostConstruct;
 
  35 import javax.annotation.PreDestroy;
 
  36 import java.util.ArrayList;
 
  37 import java.util.Arrays;
 
  38 import java.util.List;
 
  40 import java.util.TreeMap;
 
  41 import java.util.concurrent.BlockingQueue;
 
  42 import java.util.concurrent.ConcurrentMap;
 
  43 import java.util.concurrent.ExecutorService;
 
  44 import java.util.concurrent.Executors;
 
  45 import java.util.concurrent.Future;
 
  46 import java.util.concurrent.LinkedBlockingQueue;
 
  47 import java.util.concurrent.ScheduledExecutorService;
 
  48 import java.util.concurrent.TimeUnit;
 
  49 import java.util.stream.Collectors;
 
  52 import static java.util.concurrent.Executors.newSingleThreadExecutor;
 
  55  * This class implements the CCVPN PM Closed-loop logical function.
 
  56  * A simple actor model design is implemented here.
 
  59 public class BandwidthEvaluator {
 
  60     private static Logger log = LoggerFactory.getLogger(BandwidthEvaluator.class);
 
  61     private Configuration configuration;
 
  64     AaiService aaiService;
 
  67     CCVPNPmDatastore ccvpnPmDatastore;
 
  70     PolicyService policyService;
 
  72     private Loop evaluationEventLoop;
 
  73     private Loop aaiEventLoop;
 
  75     private static final Event KILL_PILL = new SimpleEvent(null, 0);
 
  76     private static final int DEFAULT_EVAL_INTERVAL = 5;
 
  77     private static final String SERVICE_INSTANCE_LOCATION_ID = "service-instance-location-id";
 
  78     private static final String BANDWIDTH_TOTAL = "bandwidth-total";
 
  81      * Interval of each round of evaluation, defined in config_all.json
 
  83     private static int evaluationInterval;
 
  86      * Percentage threshold of bandwidth adjustment.
 
  88     private static double threshold;
 
  91      * Precision of bandwidth evaluation and adjustment.
 
  93     private static double precision; // in Mbps;
 
  94     private final ScheduledExecutorService executorPool = Executors.newScheduledThreadPool(1);
 
  97      * Initialize and start the bandwidth evaluator process, schedule a periodic service bandwidth usage check
 
         * Evaluation main loop
 
 105         evaluationEventLoop = new Loop("EvaluationLoop"){
 
 107             public void process(Event event) {
 
 108                 if (event.type() == SimpleEvent.Type.PERIODIC_CHECK && isPeriodicCheckOn()){
 
 109                     log.info("=== Processing new periodic check request: {} ===", event.time());
 
 110                     Map<Endpointkey, CCVPNPmDatastore.EvictingQueue<Integer>> usedBwMap = ccvpnPmDatastore.getUsedBwMap();
 
 111                     Map<String, Integer> candidate = new TreeMap<>();
 
 112                     for(Map.Entry<Endpointkey, CCVPNPmDatastore.EvictingQueue<Integer>> entry: usedBwMap.entrySet()) {
 
 113                         String serviceId = entry.getKey().getCllId();
 
 114                         Object[] usedBws = entry.getValue().tryReadToArray();
 
 116                         if (usedBws == null) {
 
 117                             // No enough data for evaluating
 
 118                             log.debug("CCVPN Evaluator Output: service {}, not enough data to evaluate", serviceId);
 
 121                         if (ccvpnPmDatastore.getMaxBwOfSvc(serviceId) == 0) {
 
 122                             // Max bandwidth not cached yet
 
 123                             log.debug("CCVPN Evaluator Output: service {}, max bandwidth not cached, wait for next round", serviceId);
 
 124                             post(new SimpleEvent(SimpleEvent.Type.AAI_BW_REQ, serviceId));
 
 127                         double avg = Arrays.stream(usedBws)
 
 128                                 .mapToInt(o -> (int) o)
 
 131                         if (needAdjust(serviceId, avg, ccvpnPmDatastore.getMaxBwOfSvc(serviceId))) {
 
 132                             log.debug("CCVPN Evaluator Output: service {}, need adjustment, putting into candidate list", serviceId);
 
 133                             int newBw = (int) (Math.ceil((avg / threshold) * 1.2 / precision) * precision);
 
 134                             candidate.put(serviceId, Math.max(candidate.getOrDefault(serviceId, 0), newBw));
 
 137                     // check svc under maintenance
 
 138                     Map<String , ServiceState> svcUnderMaintenance = getServicesUnderMaintenance();
 
 139                     for (Map.Entry<String, ServiceState> entry: svcUnderMaintenance.entrySet()){
 
 140                         candidate.putIfAbsent(entry.getKey(), 0);
 
 142                     // fetch the maxbandwidth info if underMaintenance; otherwise send modification request
 
 143                     for(Map.Entry<String, Integer> entry: candidate.entrySet()) {
 
 144                         if (isServiceUnderMaintenance(entry.getKey())) {
 
 145                             if (entry.getValue() == 0){
 
 146                                 log.debug("CCVPN Evaluator Output: service {}," +
 
 147                                         " are in maintenance state, fetching bandwidth info from AAI", entry.getKey());
 
 149                                 log.debug("CCVPN Evaluator Output: candidate {}," +
 
 150                                         " need adjustment, but skipped due to maintenance state", entry.getKey());
 
 152                             post(new SimpleEvent(SimpleEvent.Type.AAI_BW_REQ, entry.getKey()));
 
 155                         log.debug("CCVPN Evaluator Output: candidate {}," +
 
 156                                 " need adjustment, sending request to policy", entry.getKey());
 
 157                         ccvpnPmDatastore.updateSvcState(entry.getKey(), ServiceState.UNDER_MAINTENANCE);
 
 158                         sendModifyRequest(entry.getKey(), entry.getValue(), RequestOwner.DCAE);
 
 160                     log.info("=== Processing periodic check complete ===");
 
 162                 } else if (event.type() == SimpleEvent.Type.ONDEMAND_CHECK && isOnDemandCheckOn()) {
 
 163                     log.info("=== Processing new on-demand check request: {} ===", event.time());
 
 164                     JsonObject payload = (JsonObject) event.subject();
 
 165                     String serviceId = payload.get(SERVICE_INSTANCE_LOCATION_ID).getAsString();
 
 166                     if (!isServiceUnderMaintenance(serviceId)){
 
 167                         int newBandwidth = payload.get(BANDWIDTH_TOTAL).getAsInt();
 
 168                         Map<String, Integer> maxBandwidthData = aaiService.fetchMaxBandwidthOfService(serviceId);
 
 169                         if (maxBandwidthData.get("maxBandwidth") != null
 
 170                         && maxBandwidthData.get("maxBandwidth") != newBandwidth){
 
 171                             log.debug("CCVPN Evaluator Output: on-demand adjustment request for service: {} processed," +
 
 172                                     " sending request to policy", serviceId);
 
 173                             ccvpnPmDatastore.updateSvcState(serviceId, ServiceState.UNDER_MAINTENANCE);
 
 174                             sendModifyRequest(serviceId, newBandwidth, RequestOwner.UUI);
 
 177                         log.debug("CCVPN Evaluator Output: service {}," +
 
 178                                 " received on-demand request, but skipped due to maintenance state", serviceId);
 
 180                     log.info("=== Processing on-demand check complete ===");
 
 184             private void sendModifyRequest(String cllId, Integer newBandwidth, RequestOwner owner) {
 
 185                 log.info("Sending modification request to policy. RequestOwner: {} - Service: {} change to bw: {}",
 
 186                         owner, cllId, newBandwidth);
 
 187                 policyService.sendOnsetMessageToPolicy(
 
 188                         policyService.formPolicyOnsetMessageForCCVPN(cllId, newBandwidth, owner)
 
 192             private boolean needAdjust(String serivceId, double currentAverageUsage, int maxBandwidth){
 
 193                 log.debug("CCVPN Service Usage Analysis: usage: {}, threshold: {}, maxbw {}", currentAverageUsage, threshold, maxBandwidth);
 
 194                 return currentAverageUsage > threshold * maxBandwidth;
 
 197             private boolean isServiceUnderMaintenance(String serivceId) {
 
 198                 return ccvpnPmDatastore.getStatusOfSvc(serivceId) == ServiceState.UNDER_MAINTENANCE;
 
 201             private Map<String, ServiceState> getServicesUnderMaintenance(){
 
 202                 return ccvpnPmDatastore.getSvcStatusMap().entrySet()
 
 204                         .filter(e -> e.getValue() == ServiceState.UNDER_MAINTENANCE)
 
 205                         .collect(Collectors.toMap(p -> p.getKey(), p -> p.getValue()));
 
 210          * AAI data consumer loop
 
 212         aaiEventLoop = new Loop("AAIEventLoop"){
 
 214             public void process(Event event) {
 
 215                 if (event.type() == SimpleEvent.Type.AAI_BW_REQ){
 
 216                     log.info("=== Processing new AAI network policy query at: {} ===", event.time());
 
 217                     String serviceId = (String) event.subject();
 
 218                     Map<String, Integer> maxBandwidthData = aaiService.fetchMaxBandwidthOfService(serviceId);
 
 219                     if (maxBandwidthData.get("maxBandwidth") != null){
 
 220                         log.debug("Successfully retrieved bandwidth info from AAI; service: {}, bandwidth: {}",
 
 221                                 serviceId, maxBandwidthData.get("maxBandwidth"));
 
 222                         int bwValue = maxBandwidthData.get("maxBandwidth").intValue();
 
 223                         if (ccvpnPmDatastore.getMaxBwOfSvc(serviceId) == 0){
 
 224                             ccvpnPmDatastore.updateMaxBw(serviceId, bwValue, true);
 
 225                         } else if (ccvpnPmDatastore.getMaxBwOfSvc(serviceId) != bwValue) {
 
 226                             log.debug("Service modification complete; serviceId: {} with new bandwidth: {}", serviceId, bwValue);
 
 227                             ccvpnPmDatastore.updateMaxBw(serviceId, bwValue, true);
 
 228                             ccvpnPmDatastore.updateSvcState(serviceId, ServiceState.RUNNING);
 
 231                     log.info("=== Processing AAI network policy query complete ===");
 
 235         scheduleEvaluation();
 
 239      * Stop the bandwidth evaluator process including two actors and periodic usage check
 
 243         stopScheduleEvaluation();
 
 245         evaluationEventLoop.stop();
 
 249      * Start to schedule periodic usage check at fixed rate
 
 251     private void scheduleEvaluation(){
 
 252         executorPool.scheduleAtFixedRate(new Runnable() {
 
 255                     post(new SimpleEvent(SimpleEvent.Type.PERIODIC_CHECK, 1));
 
 257             }, 0, (evaluationInterval == 0? DEFAULT_EVAL_INTERVAL : evaluationInterval), TimeUnit.SECONDS);
 
 261      * Stop periodic bandwidth usage check
 
 263     private void stopScheduleEvaluation(){
 
 264         executorPool.shutdownNow();
 
 268      * Post/broadcast event between Loops
 
 269      * @param event event object
 
 271     public void post(@NonNull Event event){
 
 272         log.debug("A new event triggered, type: {}, subject: {}, at time: {}",
 
 273                 event.type(), event.subject(), event.time());
 
 274         if (event.type() == SimpleEvent.Type.AAI_BW_REQ) {
 
 275             aaiEventLoop.add(event);
 
 276         } else if (event.type() == SimpleEvent.Type.PERIODIC_CHECK) {
 
 277             evaluationEventLoop.add(event);
 
 278         } else if (event.type() == SimpleEvent.Type.ONDEMAND_CHECK) {
 
 279             evaluationEventLoop.add(event);
 
 283     private void loadConfig() {
 
 284         configuration = Configuration.getInstance();
 
 285         evaluationInterval = configuration.getCcvpnEvalInterval();
 
 286         threshold = configuration.getCcvpnEvalThreshold();
 
 287         precision = configuration.getCcvpnEvalPrecision(); // in Mbps;
 
 290     private boolean isPeriodicCheckOn() {
 
 291         configuration = Configuration.getInstance();
 
 292         return configuration.isCcvpnEvalPeriodicCheckOn();
 
 295     private boolean isOnDemandCheckOn() {
 
 296         configuration = Configuration.getInstance();
 
 297         return configuration.isCcvpnEvalOnDemandCheckOn();
 
 301      * Inner loop implementation. Each loop acts like an actor.
 
 303     private abstract class Loop implements Runnable {
 
 304         private final String name;
 
 305         private volatile boolean running;
 
 306         private final BlockingQueue<Event> eventsQueue;
 
 307         private final ExecutorService executor;
 
 308         private volatile Future<?> dispatchFuture;
 
 311          * Constructor that accepts a loop name
 
 312          * @param name name of this loop
 
 316             executor = Executors.newSingleThreadExecutor();
 
 317             eventsQueue = new LinkedBlockingQueue<>();
 
 318             dispatchFuture = executor.submit(this);
 
 322          * Add new event to this loop
 
 326         public boolean add(Event evt) {
 
 327             return eventsQueue.add(evt);
 
 331          * Running loop that process event accordingly
 
 336             log.info("BandwidthEvaluator -- {} initiated", this.name);
 
 339                     Event event = eventsQueue.take();
 
 340                     if (event == KILL_PILL){
 
 344                 } catch (InterruptedException e){
 
 345                     log.warn("Process loop interrupted");
 
 346                 } catch (Exception | Error e){
 
 347                     log.warn("Process loop hit an error {}", e.getMessage());
 
 353          * Operation defined by subclass for different event processing
 
 354          * @param event incoming event
 
 356         abstract public void process(Event event);