[SLICEANALYSIS] Bugfix
[dcaegen2/services.git] / components / slice-analysis-ms / src / main / java / org / onap / slice / analysis / ms / service / ccvpn / BandwidthEvaluator.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  slice-analysis-ms
4  *  ================================================================================
5  *   Copyright (C) 2022 Huawei Canada Limited.
6  *  ==============================================================================
7  *     Licensed under the Apache License, Version 2.0 (the "License");
8  *     you may not use this file except in compliance with the License.
9  *     You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *     Unless required by applicable law or agreed to in writing, software
14  *     distributed under the License is distributed on an "AS IS" BASIS,
15  *     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *     See the License for the specific language governing permissions and
17  *     limitations under the License.
18  *     ============LICENSE_END=========================================================
19  *
20  *******************************************************************************/
21 package org.onap.slice.analysis.ms.service.ccvpn;
22
23 import com.google.gson.JsonObject;
24 import lombok.NonNull;
25 import org.onap.slice.analysis.ms.aai.AaiService;
26
27 import org.onap.slice.analysis.ms.models.Configuration;
28 import org.onap.slice.analysis.ms.service.PolicyService;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import org.springframework.beans.factory.annotation.Autowired;
32 import org.springframework.stereotype.Component;
33
34 import javax.annotation.PostConstruct;
35 import javax.annotation.PreDestroy;
36 import java.util.ArrayList;
37 import java.util.Arrays;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.TreeMap;
41 import java.util.concurrent.BlockingQueue;
42 import java.util.concurrent.ConcurrentMap;
43 import java.util.concurrent.ExecutorService;
44 import java.util.concurrent.Executors;
45 import java.util.concurrent.Future;
46 import java.util.concurrent.LinkedBlockingQueue;
47 import java.util.concurrent.ScheduledExecutorService;
48 import java.util.concurrent.TimeUnit;
49 import java.util.stream.Collectors;
50
51
52 import static java.util.concurrent.Executors.newSingleThreadExecutor;
53
54 /**
55  * This class implements the CCVPN PM Closed-loop logical function.
56  * A simple actor model design is implemented here.
57  */
58 @Component
59 public class BandwidthEvaluator {
60     private static Logger log = LoggerFactory.getLogger(BandwidthEvaluator.class);
61     private Configuration configuration;
62
63     @Autowired
64     AaiService aaiService;
65
66     @Autowired
67     CCVPNPmDatastore ccvpnPmDatastore;
68
69     @Autowired
70     PolicyService policyService;
71
72     private Loop evaluationEventLoop;
73     private Loop aaiEventLoop;
74
75     private static final Event KILL_PILL = new SimpleEvent(null, 0);
76     private static final int DEFAULT_EVAL_INTERVAL = 5;
77     private static final String SERVICE_INSTANCE_LOCATION_ID = "service-instance-location-id";
78     private static final String BANDWIDTH_TOTAL = "bandwidth-total";
79
80     /**
81      * Interval of each round of evaluation, defined in config_all.json
82      */
83     private static int evaluationInterval;
84
85     /**
86      * Percentage threshold of bandwidth adjustment.
87      */
88     private static double threshold;
89
90     /**
91      * Precision of bandwidth evaluation and adjustment.
92      */
93     private static double precision; // in Mbps;
94     private final ScheduledExecutorService executorPool = Executors.newScheduledThreadPool(1);
95
96     /**
97      * Initialize and start the bandwidth evaluator process, schedule a periodic service bandwidth usage check
98      */
99     @PostConstruct
100     public void init() {
101         loadConfig();
102         /**
103          * Evalution main loop
104          */
105         evaluationEventLoop = new Loop("EvaluationLoop"){
106             @Override
107             public void process(Event event) {
108                 if (event.type() == SimpleEvent.Type.PERIODIC_CHECK && isPeriodicCheckOn()){
109                     log.info("=== Processing new periodic check request: {} ===", event.time());
110                     Map<Endpointkey, CCVPNPmDatastore.EvictingQueue<Integer>> usedBwMap = ccvpnPmDatastore.getUsedBwMap();
111                     Map<String, Integer> candidate = new TreeMap<>();
112                     for(Map.Entry<Endpointkey, CCVPNPmDatastore.EvictingQueue<Integer>> entry: usedBwMap.entrySet()) {
113                         String serviceId = entry.getKey().getCllId();
114                         Object[] usedBws = entry.getValue().tryReadToArray();
115
116                         if (usedBws == null) {
117                             // No enough data for evaluating
118                             log.debug("CCVPN Evaluator Output: service {}, not enough data to evaluate", serviceId);
119                             continue;
120                         }
121                         if (ccvpnPmDatastore.getMaxBwOfSvc(serviceId) == 0) {
122                             // Max bandwidth not cached yet
123                             log.debug("CCVPN Evaluator Output: service {}, max bandwidth not cached, wait for next round", serviceId);
124                             post(new SimpleEvent(SimpleEvent.Type.AAI_BW_REQ, serviceId));
125                             continue;
126                         }
127                         double avg = Arrays.stream(usedBws)
128                                 .mapToInt(o -> (int) o)
129                                 .summaryStatistics()
130                                 .getAverage();
131                         if (needAdjust(serviceId, avg, ccvpnPmDatastore.getMaxBwOfSvc(serviceId))) {
132                             log.debug("CCVPN Evaluator Output: service {}, need adjustment, putting into candidate list", serviceId);
133                             int newBw = (int) (Math.ceil((avg / threshold) * 1.2 / precision) * precision);
134                             candidate.put(serviceId, Math.max(candidate.getOrDefault(serviceId, 0), newBw));
135                         }
136                     }
137                     // check svc under maintenance
138                     Map<String , ServiceState> svcUnderMaintenance = getServicesUnderMaintenance();
139                     for (Map.Entry<String, ServiceState> entry: svcUnderMaintenance.entrySet()){
140                         candidate.putIfAbsent(entry.getKey(), 0);
141                     }
142                     // fetch the maxbandwidth info if underMaintenance; otherwise send modification request
143                     for(Map.Entry<String, Integer> entry: candidate.entrySet()) {
144                         if (isServiceUnderMaintenance(entry.getKey())) {
145                             if (entry.getValue() == 0){
146                                 log.debug("CCVPN Evaluator Output: service {}," +
147                                         " are in maintenance state, fetching bandwidth info from AAI", entry.getKey());
148                             } else {
149                                 log.debug("CCVPN Evaluator Output: candidate {}," +
150                                         " need adjustment, but skipped due to maintenance state", entry.getKey());
151                             }
152                             post(new SimpleEvent(SimpleEvent.Type.AAI_BW_REQ, entry.getKey()));
153                             continue;
154                         }
155                         log.debug("CCVPN Evaluator Output: candidate {}," +
156                                 " need adjustment, sending request to policy", entry.getKey());
157                         ccvpnPmDatastore.updateSvcState(entry.getKey(), ServiceState.UNDER_MAINTENANCE);
158                         sendModifyRequest(entry.getKey(), entry.getValue(), RequestOwner.DCAE);
159                     }
160                     log.info("=== Processing periodic check complete ===");
161
162                 } else if (event.type() == SimpleEvent.Type.ONDEMAND_CHECK && isOnDemandCheckOn()) {
163                     log.info("=== Processing new on-demand check request: {} ===", event.time());
164                     JsonObject payload = (JsonObject) event.subject();
165                     String serviceId = payload.get(SERVICE_INSTANCE_LOCATION_ID).getAsString();
166                     if (!isServiceUnderMaintenance(serviceId)){
167                         int newBandwidth = payload.get(BANDWIDTH_TOTAL).getAsInt();
168                         Map<String, Integer> maxBandwidthData = aaiService.fetchMaxBandwidthOfService(serviceId);
169                         if (maxBandwidthData.get("maxBandwidth") != null
170                         && maxBandwidthData.get("maxBandwidth") != newBandwidth){
171                             log.debug("CCVPN Evaluator Output: on-demand adjustment request for service: {} processed," +
172                                     " sending request to policy", serviceId);
173                             ccvpnPmDatastore.updateSvcState(serviceId, ServiceState.UNDER_MAINTENANCE);
174                             sendModifyRequest(serviceId, newBandwidth, RequestOwner.UUI);
175                         }
176                     } else {
177                         log.debug("CCVPN Evaluator Output: service {}," +
178                                 " received on-demand request, but skipped due to maintenance state", serviceId);
179                     }
180                     log.info("=== Processing on-demand check complete ===");
181                 }
182             }
183
184             private void sendModifyRequest(String cllId, Integer newBandwidth, RequestOwner owner) {
185                 log.info("Sending modification request to policy. RequestOwner: {} - Service: {} change to bw: {}",
186                         owner, cllId, newBandwidth);
187                 policyService.sendOnsetMessageToPolicy(
188                         policyService.formPolicyOnsetMessageForCCVPN(cllId, newBandwidth, owner)
189                 );
190             }
191
192             private boolean needAdjust(String serivceId, double currentAverageUsage, int maxBandwidth){
193                 log.debug("CCVPN Service Usage Analysis: usage: {}, threshold: {}, maxbw {}", currentAverageUsage, threshold, maxBandwidth);
194                 return currentAverageUsage > threshold * maxBandwidth;
195             }
196
197             private boolean isServiceUnderMaintenance(String serivceId) {
198                 return ccvpnPmDatastore.getStatusOfSvc(serivceId) == ServiceState.UNDER_MAINTENANCE;
199             }
200
201             private Map<String, ServiceState> getServicesUnderMaintenance(){
202                 return ccvpnPmDatastore.getSvcStatusMap().entrySet()
203                         .stream()
204                         .filter(e -> e.getValue() == ServiceState.UNDER_MAINTENANCE)
205                         .collect(Collectors.toMap(p -> p.getKey(), p -> p.getValue()));
206             }
207         };
208
209         /**
210          * AAI data consumer loop
211          */
212         aaiEventLoop = new Loop("AAIEventLoop"){
213             @Override
214             public void process(Event event) {
215                 if (event.type() == SimpleEvent.Type.AAI_BW_REQ){
216                     log.info("=== Processing new AAI network policy query at: {} ===", event.time());
217                     String serviceId = (String) event.subject();
218                     Map<String, Integer> maxBandwidthData = aaiService.fetchMaxBandwidthOfService(serviceId);
219                     if (maxBandwidthData.get("maxBandwidth") != null){
220                         log.debug("Successfully retrieved bandwidth info from AAI; service: {}, bandwidth: {}",
221                                 serviceId, maxBandwidthData.get("maxBandwidth"));
222                         int bwValue = maxBandwidthData.get("maxBandwidth").intValue();
223                         if (ccvpnPmDatastore.getMaxBwOfSvc(serviceId) == 0){
224                             ccvpnPmDatastore.updateMaxBw(serviceId, bwValue, true);
225                         } else if (ccvpnPmDatastore.getMaxBwOfSvc(serviceId) != bwValue) {
226                             log.debug("Service modification complete; serviceId: {} with new bandwidth: {}", serviceId, bwValue);
227                             ccvpnPmDatastore.updateMaxBw(serviceId, bwValue, true);
228                             ccvpnPmDatastore.updateSvcState(serviceId, ServiceState.RUNNING);
229                         }
230                     }
231                     log.info("=== Processing AAI network policy query complete ===");
232                 }
233             }
234         };
235         scheduleEvaluation();
236     }
237
238     /**
239      * Stop the bandwidth evaluator process including two actors and periodic usage check
240      */
241     @PreDestroy
242     public void stop(){
243         stopScheduleEvaluation();
244         aaiEventLoop.stop();
245         evaluationEventLoop.stop();
246     }
247
248     /**
249      * Start to schedule periodic usage check at fixed rate
250      */
251     private void scheduleEvaluation(){
252         executorPool.scheduleAtFixedRate(new Runnable() {
253                 @Override
254                 public void run() {
255                     post(new SimpleEvent(SimpleEvent.Type.PERIODIC_CHECK, 1));
256                 }
257             }, 0, (evaluationInterval == 0? DEFAULT_EVAL_INTERVAL : evaluationInterval), TimeUnit.SECONDS);
258     }
259
260     /**
261      * Stop periodic bandwidth usage check
262      */
263     private void stopScheduleEvaluation(){
264         executorPool.shutdownNow();
265     }
266
267     /**
268      * Post/broadcast event between Loops
269      * @param event event object
270      */
271     public void post(@NonNull Event event){
272         log.debug("A new event triggered, type: {}, subject: {}, at time: {}",
273                 event.type(), event.subject(), event.time());
274         if (event.type() == SimpleEvent.Type.AAI_BW_REQ) {
275             aaiEventLoop.add(event);
276         } else if (event.type() == SimpleEvent.Type.PERIODIC_CHECK) {
277             evaluationEventLoop.add(event);
278         } else if (event.type() == SimpleEvent.Type.ONDEMAND_CHECK) {
279             evaluationEventLoop.add(event);
280         }
281     }
282
283     private void loadConfig() {
284         configuration = Configuration.getInstance();
285         evaluationInterval = configuration.getCcvpnEvalInterval();
286         threshold = configuration.getCcvpnEvalThreshold();
287         precision = configuration.getCcvpnEvalPrecision(); // in Mbps;
288     }
289
290     private boolean isPeriodicCheckOn() {
291         configuration = Configuration.getInstance();
292         return configuration.isCcvpnEvalPeriodicCheckOn();
293     }
294
295     private boolean isOnDemandCheckOn() {
296         configuration = Configuration.getInstance();
297         return configuration.isCcvpnEvalOnDemandCheckOn();
298     }
299
300     /**
301      * Inner loop implementation. Each loop acts like an actor.
302      */
303     private abstract class Loop implements Runnable {
304         private final String name;
305         private volatile boolean running;
306         private final BlockingQueue<Event> eventsQueue;
307         private final ExecutorService executor;
308         private volatile Future<?> dispatchFuture;
309
310         /**
311          * Constructor that accepts a loop name
312          * @param name name of this loop
313          */
314         Loop(String name){
315             this.name = name;
316             executor = Executors.newSingleThreadExecutor();
317             eventsQueue = new LinkedBlockingQueue<>();
318             dispatchFuture = executor.submit(this);
319         }
320
321         /**
322          * Add new event to this loop
323          * @param evt Event
324          * @return true
325          */
326         public boolean add(Event evt) {
327             return eventsQueue.add(evt);
328         }
329
330         /**
331          * Running loop that process event accordingly
332          */
333         @Override
334         public void run(){
335             running = true;
336             log.info("BandwidthEvaluator -- {} initiated", this.name);
337             while (running){
338                 try{
339                     Event event = eventsQueue.take();
340                     if (event == KILL_PILL){
341                         break;
342                     }
343                     process(event);
344                 } catch (InterruptedException e){
345                     log.warn("Process loop interrupted");
346                 } catch (Exception | Error e){
347                     log.warn("Process loop hit an error {}", e.getMessage());
348                 }
349             }
350         }
351
352         /**
353          * Operation defined by subclass for different event processing
354          * @param event incoming event
355          */
356         abstract public void process(Event event);
357
358         /**
359          * Stop this loop
360          */
361         public void stop(){
362             running = false;
363             add(KILL_PILL);
364         }
365     }
366 }