1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2022 Huawei Canada Limited.
6 * Copyright (C) 2022 Huawei Technologies Co., Ltd.
7 * ==============================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 *******************************************************************************/
22 package org.onap.slice.analysis.ms.service.ccvpn;
24 import lombok.NonNull;
25 import org.onap.slice.analysis.ms.aai.AaiService;
27 import org.onap.slice.analysis.ms.models.Configuration;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import org.springframework.beans.factory.annotation.Autowired;
31 import org.springframework.stereotype.Component;
33 import javax.annotation.PostConstruct;
34 import javax.annotation.PreDestroy;
36 import java.util.concurrent.BlockingQueue;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.Executors;
39 import java.util.concurrent.Future;
40 import java.util.concurrent.LinkedBlockingQueue;
41 import java.util.concurrent.ScheduledExecutorService;
42 import java.util.concurrent.TimeUnit;
45 * This class implements the CCVPN PM Closed-loop logical function.
46 * A simple actor model design is implemented here.
49 public class BandwidthEvaluator {
50 private static Logger log = LoggerFactory.getLogger(BandwidthEvaluator.class);
51 private Configuration configuration;
54 AaiService aaiService;
57 CCVPNPmDatastore ccvpnPmDatastore;
60 StrategyFactory strategyFactory;
62 private Loop evaluationEventLoop;
63 private Loop aaiEventLoop;
65 private static final Event KILL_PILL = new SimpleEvent(null, 0);
66 private static final int DEFAULT_EVAL_INTERVAL = 5;
67 private static final String DEFAULT_STRATEGY_NAME = "FixedUpperBoundStrategy";
69 * Interval of each round of evaluation, defined in config_all.json
71 private static int evaluationInterval;
74 * Bandwidth Evaluation and adjustment strategy.
76 private static String strategyName;
78 private final ScheduledExecutorService executorPool = Executors.newScheduledThreadPool(1);
81 * Initialize and start the bandwidth evaluator process, schedule a periodic service bandwidth usage check
86 strategyName = (strategyName != null)? strategyName : DEFAULT_STRATEGY_NAME;
87 evaluationInterval = (evaluationInterval == 0)? DEFAULT_EVAL_INTERVAL : evaluationInterval;
88 EvaluationStrategy strategy = strategyFactory.getStrategy(strategyName);
89 log.info("{} is utilized as the bandwidth evaluatior strategy", strategyName);
94 evaluationEventLoop = new Loop("EvaluationLoop"){
96 public void process(Event event) {
97 strategy.execute(event);
102 * AAI data consumer loop
104 aaiEventLoop = new Loop("AAIEventLoop"){
106 public void process(Event event) {
107 if (event.type() == SimpleEvent.Type.AAI_BW_REQ){
108 log.debug("=== Processing new AAI network policy query at: {} ===", event.time());
109 String serviceId = (String) event.subject();
110 Map<String, Integer> maxBandwidthData = aaiService.fetchMaxBandwidthOfService(serviceId);
111 if (maxBandwidthData.get("maxBandwidth") != null){
112 log.debug("Successfully retrieved bandwidth info from AAI; service: {}, bandwidth: {}",
113 serviceId, maxBandwidthData.get("maxBandwidth"));
114 int bwValue = maxBandwidthData.get("maxBandwidth").intValue();
115 if (ccvpnPmDatastore.getProvBwOfSvc(serviceId) == 0){
116 ccvpnPmDatastore.updateProvBw(serviceId, bwValue, true);
117 log.debug("Provision bw of cll {} updated from 0 to {}, max bw is {}", serviceId, ccvpnPmDatastore.getProvBwOfSvc(serviceId), bwValue);
118 } else if (ccvpnPmDatastore.getProvBwOfSvc(serviceId) != bwValue) {
119 log.debug("Service modification complete; serviceId: {} update prov bw from {} to {}", serviceId, ccvpnPmDatastore.getProvBwOfSvc(serviceId), bwValue);
120 ccvpnPmDatastore.updateProvBw(serviceId, bwValue, true);
121 ccvpnPmDatastore.updateSvcState(serviceId, ServiceState.RUNNING);
122 log.debug("Service state of {} is changed to running, {}", serviceId, ccvpnPmDatastore.getStatusOfSvc(serviceId));
125 log.debug("=== Processing AAI network policy query complete ===");
129 scheduleEvaluation();
133 * Stop the bandwidth evaluator process including two actors and periodic usage check
137 stopScheduleEvaluation();
139 evaluationEventLoop.stop();
143 * Start to schedule periodic usage check at fixed rate
145 private void scheduleEvaluation(){
146 executorPool.scheduleAtFixedRate(new Runnable() {
149 post(new SimpleEvent(SimpleEvent.Type.PERIODIC_CHECK, 1));
151 }, 0, (evaluationInterval == 0? DEFAULT_EVAL_INTERVAL : evaluationInterval), TimeUnit.SECONDS);
155 * Stop periodic bandwidth usage check
157 private void stopScheduleEvaluation(){
158 executorPool.shutdownNow();
162 * Post/broadcast event between Loops
163 * @param event event object
165 public void post(@NonNull Event event){
166 log.info("A new event triggered, type: {}, subject: {}, at time: {}",
167 event.type(), event.subject(), event.time());
168 if (event.type() == SimpleEvent.Type.AAI_BW_REQ) {
169 aaiEventLoop.add(event);
170 } else if (event.type() == SimpleEvent.Type.PERIODIC_CHECK) {
171 evaluationEventLoop.add(event);
172 } else if (event.type() == SimpleEvent.Type.ONDEMAND_CHECK) {
173 evaluationEventLoop.add(event);
177 // update configuration
178 private void loadConfig() {
179 configuration = Configuration.getInstance();
180 evaluationInterval = configuration.getCcvpnEvalInterval();
181 strategyName = configuration.getCcvpnEvalStrategy();
182 log.info("Evaluation loop configs has been loaded. Strategy {}.", strategyName);
186 * Inner loop implementation. Each loop acts like an actor.
188 private abstract class Loop implements Runnable {
189 private final String name;
190 private volatile boolean running;
191 private final BlockingQueue<Event> eventsQueue;
192 private final ExecutorService executor;
193 private volatile Future<?> dispatchFuture;
196 * Constructor that accepts a loop name
197 * @param name name of this loop
201 executor = Executors.newSingleThreadExecutor();
202 eventsQueue = new LinkedBlockingQueue<>();
203 dispatchFuture = executor.submit(this);
207 * Add new event to this loop
211 public boolean add(Event evt) {
212 return eventsQueue.add(evt);
216 * Running loop that process event accordingly
221 log.info("BandwidthEvaluator -- {} initiated", this.name);
224 Event event = eventsQueue.take();
225 if (event == KILL_PILL){
229 } catch (InterruptedException e){
230 log.warn("Process loop interrupted");
231 } catch (Exception | Error e){
232 log.warn("Process loop hit an error {}", e.getMessage());
238 * Operation defined by subclass for different event processing
239 * @param event incoming event
241 abstract public void process(Event event);