1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2022 Huawei Canada Limited.
6 * ==============================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 *******************************************************************************/
21 package org.onap.slice.analysis.ms.service.ccvpn;
23 import com.google.gson.JsonObject;
24 import lombok.NonNull;
25 import org.onap.slice.analysis.ms.aai.AaiService;
27 import org.onap.slice.analysis.ms.models.Configuration;
28 import org.onap.slice.analysis.ms.service.PolicyService;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import org.springframework.beans.factory.annotation.Autowired;
32 import org.springframework.stereotype.Component;
34 import javax.annotation.PostConstruct;
35 import javax.annotation.PreDestroy;
36 import java.util.ArrayList;
37 import java.util.Arrays;
38 import java.util.List;
40 import java.util.TreeMap;
41 import java.util.concurrent.BlockingQueue;
42 import java.util.concurrent.ConcurrentMap;
43 import java.util.concurrent.ExecutorService;
44 import java.util.concurrent.Executors;
45 import java.util.concurrent.Future;
46 import java.util.concurrent.LinkedBlockingQueue;
47 import java.util.concurrent.ScheduledExecutorService;
48 import java.util.concurrent.TimeUnit;
51 import static java.util.concurrent.Executors.newSingleThreadExecutor;
54 * This class implements the CCVPN PM Closed-loop logical function.
55 * A simple actor model design is implemented here.
58 public class BandwidthEvaluator {
59 private static Logger log = LoggerFactory.getLogger(BandwidthEvaluator.class);
60 private Configuration configuration;
63 AaiService aaiService;
66 CCVPNPmDatastore ccvpnPmDatastore;
69 PolicyService policyService;
71 private Loop evaluationEventLoop;
72 private Loop aaiEventLoop;
74 private static final Event KILL_PILL = new SimpleEvent(null, 0);
75 private static final int DEFAULT_EVAL_INTERVAL = 5;
76 private static final String SERVICE_INSTANCE_LOCATION_ID = "service-instance-location-id";
77 private static final String BANDWIDTH_TOTAL = "bandwidth-total";
80 * Interval of each round of evaluation, defined in config_all.json
82 private static int evaluationInterval;
85 * Percentage threshold of bandwidth adjustment.
87 private static double threshold;
90 * Precision of bandwidth evaluation and adjustment.
92 private static double precision; // in Mbps;
93 private final ScheduledExecutorService executorPool = Executors.newScheduledThreadPool(1);
96 * Initialize and start the bandwidth evaluator process, schedule a periodic service bandwidth usage check
102 * Evaluation main loop
104 evaluationEventLoop = new Loop("EvaluationLoop"){
106 public void process(Event event) {
107 if (event.type() == SimpleEvent.Type.PERIODIC_CHECK && isPeriodicCheckOn()){
108 log.info("Received new periodic check request: {}", event.time());
109 Map<Endpointkey, CCVPNPmDatastore.EvictingQueue<Integer>> usedBwMap = ccvpnPmDatastore.getUsedBwMap();
110 Map<String, Integer> candidate = new TreeMap<>();
111 for(Map.Entry<Endpointkey, CCVPNPmDatastore.EvictingQueue<Integer>> entry: usedBwMap.entrySet()) {
112 String serviceId = entry.getKey().getCllId();
113 Object[] usedBws = entry.getValue().tryReadToArray();
115 if (usedBws == null) {
116 // No enough data for evaluating
119 if (ccvpnPmDatastore.getMaxBwOfSvc(serviceId) == 0) {
120 // Max bandwidth not cached yet
121 post(new SimpleEvent(SimpleEvent.Type.AAI_BW_REQ, serviceId));
124 double avg = Arrays.stream(usedBws)
125 .mapToInt(o -> (int) o)
128 if (needAdjust(serviceId, avg, ccvpnPmDatastore.getMaxBwOfSvc(serviceId))) {
129 int newBw = (int) (Math.ceil((avg / threshold) * 1.2 / precision) * precision);
130 candidate.put(serviceId, Math.max(candidate.getOrDefault(serviceId, 0), newBw));
133 for(Map.Entry<String, Integer> entry: candidate.entrySet()) {
134 if (isServiceUnderMaintenance(entry.getKey())) {
135 post(new SimpleEvent(SimpleEvent.Type.AAI_BW_REQ, entry.getKey()));
138 ccvpnPmDatastore.updateSvcState(entry.getKey(), ServiceState.UNDER_MAINTENANCE);
139 sendModifyRequest(entry.getKey(), entry.getValue(), RequestOwner.DCAE);
142 } else if (event.type() == SimpleEvent.Type.ONDEMAND_CHECK && isOnDemandCheckOn()) {
143 log.info("Received new on-demand check request: {}", event.time());
144 JsonObject payload = (JsonObject) event.subject();
145 String serviceId = payload.get(SERVICE_INSTANCE_LOCATION_ID).getAsString();
146 if (!isServiceUnderMaintenance(serviceId)){
147 int newBandwidth = payload.get(BANDWIDTH_TOTAL).getAsInt();
148 Map<String, Integer> maxBandwidthData = aaiService.fetchMaxBandwidthOfService(serviceId);
149 int oldBandwidth = maxBandwidthData.get("maxBandwidth");
150 if (newBandwidth != oldBandwidth) {
151 ccvpnPmDatastore.updateSvcState(serviceId, ServiceState.UNDER_MAINTENANCE);
152 sendModifyRequest(serviceId, newBandwidth, RequestOwner.UUI);
158 private void sendModifyRequest(String cllId, Integer newBandwidth, RequestOwner owner) {
159 policyService.sendOnsetMessageToPolicy(
160 policyService.formPolicyOnsetMessageForCCVPN(cllId, newBandwidth, owner)
164 private boolean needAdjust(String serivceId, double currentAverageUsage, int maxBandwidth){
165 return currentAverageUsage > threshold * maxBandwidth;
168 private boolean isServiceUnderMaintenance(String serivceId) {
169 return ccvpnPmDatastore.getStatusOfSvc(serivceId) == ServiceState.UNDER_MAINTENANCE;
174 * AAI data consumer loop
176 aaiEventLoop = new Loop("AAIEventLoop"){
178 public void process(Event event) {
179 if (event.type() == SimpleEvent.Type.AAI_BW_REQ){
180 log.info("Received new AAI network policy query at: {}", event.time());
181 String serviceId = (String) event.subject();
182 Map<String, Integer> maxBandwidthData = aaiService.fetchMaxBandwidthOfService(serviceId);
183 int bwVal = maxBandwidthData.get("maxBandwidth");
184 if (maxBandwidthData != null){
185 if (ccvpnPmDatastore.getMaxBwOfSvc(serviceId) == 0){
186 ccvpnPmDatastore.updateMaxBw(serviceId, bwVal, true);
187 } else if (ccvpnPmDatastore.getMaxBwOfSvc(serviceId) != bwVal) {
188 ccvpnPmDatastore.updateMaxBw(serviceId, bwVal, true);
189 ccvpnPmDatastore.updateSvcState(serviceId, ServiceState.RUNNING);
195 scheduleEvaluation();
199 * Stop the bandwidth evaluator process including two actors and periodic usage check
203 stopScheduleEvaluation();
205 evaluationEventLoop.stop();
209 * Start to schedule periodic usage check at fixed rate
211 private void scheduleEvaluation(){
212 executorPool.scheduleAtFixedRate(new Runnable() {
215 post(new SimpleEvent(SimpleEvent.Type.PERIODIC_CHECK, 1));
217 }, 0, (evaluationInterval == 0? DEFAULT_EVAL_INTERVAL : evaluationInterval), TimeUnit.SECONDS);
221 * Stop periodic bandwidth usage check
223 private void stopScheduleEvaluation(){
224 executorPool.shutdownNow();
228 * Post/broadcast event between Loops
229 * @param event event object
231 public void post(@NonNull Event event){
232 if (event.type() == SimpleEvent.Type.AAI_BW_REQ) {
233 aaiEventLoop.add(event);
234 } else if (event.type() == SimpleEvent.Type.PERIODIC_CHECK) {
235 evaluationEventLoop.add(event);
236 } else if (event.type() == SimpleEvent.Type.ONDEMAND_CHECK) {
237 evaluationEventLoop.add(event);
241 private void loadConfig() {
242 configuration = Configuration.getInstance();
243 evaluationInterval = configuration.getCcvpnEvalInterval();
244 threshold = configuration.getCcvpnEvalThreshold();
245 precision = configuration.getCcvpnEvalPrecision(); // in Mbps;
248 private boolean isPeriodicCheckOn() {
249 configuration = Configuration.getInstance();
250 return configuration.isCcvpnEvalPeriodicCheckOn();
253 private boolean isOnDemandCheckOn() {
254 configuration = Configuration.getInstance();
255 return configuration.isCcvpnEvalOnDemandCheckOn();
259 * Inner loop implementation. Each loop acts like an actor.
261 private abstract class Loop implements Runnable {
262 private final String name;
263 private volatile boolean running;
264 private final BlockingQueue<Event> eventsQueue;
265 private final ExecutorService executor;
266 private volatile Future<?> dispatchFuture;
/**
 * Constructor that accepts a loop name.
 * Creates the event queue and a single-thread executor, then submits this loop to that
 * executor right away.
 *
 * @param name name of this loop
 */
Loop(String name) {
    this.name = name;
    // The queue must exist before the loop is submitted, because the submitted task reads it.
    eventsQueue = new LinkedBlockingQueue<>();
    executor = Executors.newSingleThreadExecutor();
    dispatchFuture = executor.submit(this);
}
280 * Add new event to this loop
284 public boolean add(Event evt) {
285 return eventsQueue.add(evt);
289 * Running loop that process event accordingly
294 log.info("BandwidthEvaluator -- {} initiated", this.name);
297 Event event = eventsQueue.take();
298 if (event == KILL_PILL){
302 } catch (InterruptedException e){
303 log.warn("Process loop interrupted");
304 } catch (Exception | Error e){
305 log.warn("Process loop hit an error {}", e.getMessage());
311 * Operation defined by subclass for different event processing
312 * @param event incoming event
314 abstract public void process(Event event);