1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2022 Huawei Canada Limited.
6 * ==============================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 *******************************************************************************/
21 package org.onap.slice.analysis.ms.service.ccvpn;
23 import com.google.gson.JsonObject;
24 import lombok.NonNull;
25 import org.onap.slice.analysis.ms.aai.AaiService;
27 import org.onap.slice.analysis.ms.models.Configuration;
28 import org.onap.slice.analysis.ms.service.PolicyService;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import org.springframework.beans.factory.annotation.Autowired;
32 import org.springframework.stereotype.Component;
34 import javax.annotation.PostConstruct;
35 import javax.annotation.PreDestroy;
36 import java.util.ArrayList;
37 import java.util.Arrays;
38 import java.util.List;
40 import java.util.TreeMap;
41 import java.util.concurrent.BlockingQueue;
42 import java.util.concurrent.ConcurrentMap;
43 import java.util.concurrent.ExecutorService;
44 import java.util.concurrent.Executors;
45 import java.util.concurrent.Future;
46 import java.util.concurrent.LinkedBlockingQueue;
47 import java.util.concurrent.ScheduledExecutorService;
48 import java.util.concurrent.TimeUnit;
49 import java.util.stream.Collectors;
52 import static java.util.concurrent.Executors.newSingleThreadExecutor;
55 * This class implements the CCVPN PM Closed-loop logical function.
56 * A simple actor model design is implemented here.
59 public class BandwidthEvaluator {
60 private static Logger log = LoggerFactory.getLogger(BandwidthEvaluator.class);
61 private Configuration configuration;
64 AaiService aaiService;
67 CCVPNPmDatastore ccvpnPmDatastore;
70 PolicyService policyService;
72 private Loop evaluationEventLoop;
73 private Loop aaiEventLoop;
75 private static final Event KILL_PILL = new SimpleEvent(null, 0);
76 private static final int DEFAULT_EVAL_INTERVAL = 5;
77 private static final String SERVICE_INSTANCE_LOCATION_ID = "service-instance-location-id";
78 private static final String BANDWIDTH_TOTAL = "bandwidth-total";
81 * Interval of each round of evaluation, defined in config_all.json
83 private static int evaluationInterval;
86 * Percentage threshold of bandwidth adjustment.
88 private static double threshold;
91 * Precision of bandwidth evaluation and adjustment.
93 private static double precision; // in Mbps;
94 private final ScheduledExecutorService executorPool = Executors.newScheduledThreadPool(1);
97 * Initialize and start the bandwidth evaluator process, schedule a periodic service bandwidth usage check
103 * Evalution main loop
105 evaluationEventLoop = new Loop("EvaluationLoop"){
107 public void process(Event event) {
108 if (event.type() == SimpleEvent.Type.PERIODIC_CHECK && isPeriodicCheckOn()){
109 log.info("=== Processing new periodic check request: {} ===", event.time());
110 Map<Endpointkey, CCVPNPmDatastore.EvictingQueue<Integer>> usedBwMap = ccvpnPmDatastore.getUsedBwMap();
111 Map<String, Integer> candidate = new TreeMap<>();
112 for(Map.Entry<Endpointkey, CCVPNPmDatastore.EvictingQueue<Integer>> entry: usedBwMap.entrySet()) {
113 String serviceId = entry.getKey().getCllId();
114 Object[] usedBws = entry.getValue().tryReadToArray();
116 if (usedBws == null) {
117 // No enough data for evaluating
118 log.debug("CCVPN Evaluator Output: service {}, not enough data to evaluate", serviceId);
121 if (ccvpnPmDatastore.getMaxBwOfSvc(serviceId) == 0) {
122 // Max bandwidth not cached yet
123 log.debug("CCVPN Evaluator Output: service {}, max bandwidth not cached, wait for next round", serviceId);
124 post(new SimpleEvent(SimpleEvent.Type.AAI_BW_REQ, serviceId));
127 double avg = Arrays.stream(usedBws)
128 .mapToInt(o -> (int) o)
131 if (needAdjust(serviceId, avg, ccvpnPmDatastore.getMaxBwOfSvc(serviceId))) {
132 log.debug("CCVPN Evaluator Output: service {}, need adjustment, putting into candidate list", serviceId);
133 int newBw = (int) (Math.ceil((avg / threshold) * 1.2 / precision) * precision);
134 candidate.put(serviceId, Math.max(candidate.getOrDefault(serviceId, 0), newBw));
137 // check svc under maintenance
138 Map<String , ServiceState> svcUnderMaintenance = getServicesUnderMaintenance();
139 for (Map.Entry<String, ServiceState> entry: svcUnderMaintenance.entrySet()){
140 candidate.putIfAbsent(entry.getKey(), 0);
142 // fetch the maxbandwidth info if underMaintenance; otherwise send modification request
143 for(Map.Entry<String, Integer> entry: candidate.entrySet()) {
144 if (isServiceUnderMaintenance(entry.getKey())) {
145 if (entry.getValue() == 0){
146 log.debug("CCVPN Evaluator Output: service {}," +
147 " are in maintenance state, fetching bandwidth info from AAI", entry.getKey());
149 log.debug("CCVPN Evaluator Output: candidate {}," +
150 " need adjustment, but skipped due to maintenance state", entry.getKey());
152 post(new SimpleEvent(SimpleEvent.Type.AAI_BW_REQ, entry.getKey()));
155 log.debug("CCVPN Evaluator Output: candidate {}," +
156 " need adjustment, sending request to policy", entry.getKey());
157 ccvpnPmDatastore.updateSvcState(entry.getKey(), ServiceState.UNDER_MAINTENANCE);
158 sendModifyRequest(entry.getKey(), entry.getValue(), RequestOwner.DCAE);
160 log.info("=== Processing periodic check complete ===");
162 } else if (event.type() == SimpleEvent.Type.ONDEMAND_CHECK && isOnDemandCheckOn()) {
163 log.info("=== Processing new on-demand check request: {} ===", event.time());
164 JsonObject payload = (JsonObject) event.subject();
165 String serviceId = payload.get(SERVICE_INSTANCE_LOCATION_ID).getAsString();
166 if (!isServiceUnderMaintenance(serviceId)){
167 int newBandwidth = payload.get(BANDWIDTH_TOTAL).getAsInt();
168 Map<String, Integer> maxBandwidthData = aaiService.fetchMaxBandwidthOfService(serviceId);
169 if (maxBandwidthData.get("maxBandwidth") != null
170 && maxBandwidthData.get("maxBandwidth") != newBandwidth){
171 log.debug("CCVPN Evaluator Output: on-demand adjustment request for service: {} processed," +
172 " sending request to policy", serviceId);
173 ccvpnPmDatastore.updateSvcState(serviceId, ServiceState.UNDER_MAINTENANCE);
174 sendModifyRequest(serviceId, newBandwidth, RequestOwner.UUI);
177 log.debug("CCVPN Evaluator Output: service {}," +
178 " received on-demand request, but skipped due to maintenance state", serviceId);
180 log.info("=== Processing on-demand check complete ===");
184 private void sendModifyRequest(String cllId, Integer newBandwidth, RequestOwner owner) {
185 log.info("Sending modification request to policy. RequestOwner: {} - Service: {} change to bw: {}",
186 owner, cllId, newBandwidth);
187 policyService.sendOnsetMessageToPolicy(
188 policyService.formPolicyOnsetMessageForCCVPN(cllId, newBandwidth, owner)
192 private boolean needAdjust(String serivceId, double currentAverageUsage, int maxBandwidth){
193 log.debug("CCVPN Service Usage Analysis: usage: {}, threshold: {}, maxbw {}", currentAverageUsage, threshold, maxBandwidth);
194 return currentAverageUsage > threshold * maxBandwidth;
197 private boolean isServiceUnderMaintenance(String serivceId) {
198 return ccvpnPmDatastore.getStatusOfSvc(serivceId) == ServiceState.UNDER_MAINTENANCE;
201 private Map<String, ServiceState> getServicesUnderMaintenance(){
202 return ccvpnPmDatastore.getSvcStatusMap().entrySet()
204 .filter(e -> e.getValue() == ServiceState.UNDER_MAINTENANCE)
205 .collect(Collectors.toMap(p -> p.getKey(), p -> p.getValue()));
210 * AAI data consumer loop
212 aaiEventLoop = new Loop("AAIEventLoop"){
214 public void process(Event event) {
215 if (event.type() == SimpleEvent.Type.AAI_BW_REQ){
216 log.info("=== Processing new AAI network policy query at: {} ===", event.time());
217 String serviceId = (String) event.subject();
218 Map<String, Integer> maxBandwidthData = aaiService.fetchMaxBandwidthOfService(serviceId);
219 if (maxBandwidthData.get("maxBandwidth") != null){
220 log.debug("Successfully retrieved bandwidth info from AAI; service: {}, bandwidth: {}",
221 serviceId, maxBandwidthData.get("maxBandwidth"));
222 int bwValue = maxBandwidthData.get("maxBandwidth").intValue();
223 if (ccvpnPmDatastore.getMaxBwOfSvc(serviceId) == 0){
224 ccvpnPmDatastore.updateMaxBw(serviceId, bwValue, true);
225 } else if (ccvpnPmDatastore.getMaxBwOfSvc(serviceId) != bwValue) {
226 log.debug("Service modification complete; serviceId: {} with new bandwidth: {}", serviceId, bwValue);
227 ccvpnPmDatastore.updateMaxBw(serviceId, bwValue, true);
228 ccvpnPmDatastore.updateSvcState(serviceId, ServiceState.RUNNING);
231 log.info("=== Processing AAI network policy query complete ===");
235 scheduleEvaluation();
239 * Stop the bandwidth evaluator process including two actors and periodic usage check
243 stopScheduleEvaluation();
245 evaluationEventLoop.stop();
249 * Start to schedule periodic usage check at fixed rate
251 private void scheduleEvaluation(){
252 executorPool.scheduleAtFixedRate(new Runnable() {
255 post(new SimpleEvent(SimpleEvent.Type.PERIODIC_CHECK, 1));
257 }, 0, (evaluationInterval == 0? DEFAULT_EVAL_INTERVAL : evaluationInterval), TimeUnit.SECONDS);
261 * Stop periodic bandwidth usage check
263 private void stopScheduleEvaluation(){
264 executorPool.shutdownNow();
268 * Post/broadcast event between Loops
269 * @param event event object
271 public void post(@NonNull Event event){
272 log.debug("A new event triggered, type: {}, subject: {}, at time: {}",
273 event.type(), event.subject(), event.time());
274 if (event.type() == SimpleEvent.Type.AAI_BW_REQ) {
275 aaiEventLoop.add(event);
276 } else if (event.type() == SimpleEvent.Type.PERIODIC_CHECK) {
277 evaluationEventLoop.add(event);
278 } else if (event.type() == SimpleEvent.Type.ONDEMAND_CHECK) {
279 evaluationEventLoop.add(event);
283 private void loadConfig() {
284 configuration = Configuration.getInstance();
285 evaluationInterval = configuration.getCcvpnEvalInterval();
286 threshold = configuration.getCcvpnEvalThreshold();
287 precision = configuration.getCcvpnEvalPrecision(); // in Mbps;
290 private boolean isPeriodicCheckOn() {
291 configuration = Configuration.getInstance();
292 return configuration.isCcvpnEvalPeriodicCheckOn();
295 private boolean isOnDemandCheckOn() {
296 configuration = Configuration.getInstance();
297 return configuration.isCcvpnEvalOnDemandCheckOn();
301 * Inner loop implementation. Each loop acts like an actor.
303 private abstract class Loop implements Runnable {
304 private final String name;
305 private volatile boolean running;
306 private final BlockingQueue<Event> eventsQueue;
307 private final ExecutorService executor;
308 private volatile Future<?> dispatchFuture;
311 * Constructor that accepts a loop name
312 * @param name name of this loop
316 executor = Executors.newSingleThreadExecutor();
317 eventsQueue = new LinkedBlockingQueue<>();
318 dispatchFuture = executor.submit(this);
322 * Add new event to this loop
326 public boolean add(Event evt) {
327 return eventsQueue.add(evt);
331 * Running loop that process event accordingly
336 log.info("BandwidthEvaluator -- {} initiated", this.name);
339 Event event = eventsQueue.take();
340 if (event == KILL_PILL){
344 } catch (InterruptedException e){
345 log.warn("Process loop interrupted");
346 } catch (Exception | Error e){
347 log.warn("Process loop hit an error {}", e.getMessage());
353 * Operation defined by subclass for different event processing
354 * @param event incoming event
356 abstract public void process(Event event);