a7847b0eb10a90a0e724ca9d6e8bcf23ccb9ea4c
[dcaegen2/services.git] /
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  slice-analysis-ms
4  *  ================================================================================
5  *   Copyright (C) 2022 Huawei Canada Limited.
6  *  ==============================================================================
7  *     Licensed under the Apache License, Version 2.0 (the "License");
8  *     you may not use this file except in compliance with the License.
9  *     You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *     Unless required by applicable law or agreed to in writing, software
14  *     distributed under the License is distributed on an "AS IS" BASIS,
15  *     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *     See the License for the specific language governing permissions and
17  *     limitations under the License.
18  *     ============LICENSE_END=========================================================
19  *
20  *******************************************************************************/
21 package org.onap.slice.analysis.ms.service.ccvpn;
22
23 import com.google.gson.JsonObject;
24 import lombok.NonNull;
25 import org.onap.slice.analysis.ms.aai.AaiService;
26
27 import org.onap.slice.analysis.ms.models.Configuration;
28 import org.onap.slice.analysis.ms.service.PolicyService;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import org.springframework.beans.factory.annotation.Autowired;
32 import org.springframework.stereotype.Component;
33
34 import javax.annotation.PostConstruct;
35 import javax.annotation.PreDestroy;
36 import java.util.ArrayList;
37 import java.util.Arrays;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.TreeMap;
41 import java.util.concurrent.BlockingQueue;
42 import java.util.concurrent.ConcurrentMap;
43 import java.util.concurrent.ExecutorService;
44 import java.util.concurrent.Executors;
45 import java.util.concurrent.Future;
46 import java.util.concurrent.LinkedBlockingQueue;
47 import java.util.concurrent.ScheduledExecutorService;
48 import java.util.concurrent.TimeUnit;
49
50
51 import static java.util.concurrent.Executors.newSingleThreadExecutor;
52
53 /**
54  * This class implements the CCVPN PM Closed-loop logical function.
55  * A simple actor model design is implemented here.
56  */
57 @Component
58 public class BandwidthEvaluator {
59     private static Logger log = LoggerFactory.getLogger(BandwidthEvaluator.class);
60     private Configuration configuration;
61
62     @Autowired
63     AaiService aaiService;
64
65     @Autowired
66     CCVPNPmDatastore ccvpnPmDatastore;
67
68     @Autowired
69     PolicyService policyService;
70
71     private Loop evaluationEventLoop;
72     private Loop aaiEventLoop;
73
74     private static final Event KILL_PILL = new SimpleEvent(null, 0);
75     private static final int DEFAULT_EVAL_INTERVAL = 5;
76     private static final String SERVICE_INSTANCE_LOCATION_ID = "service-instance-location-id";
77     private static final String BANDWIDTH_TOTAL = "bandwidth-total";
78
79     /**
80      * Interval of each round of evaluation, defined in config_all.json
81      */
82     private static int evaluationInterval;
83
84     /**
85      * Percentage threshold of bandwidth adjustment.
86      */
87     private static double threshold;
88
89     /**
90      * Precision of bandwidth evaluation and adjustment.
91      */
92     private static double precision; // in Mbps;
93     private final ScheduledExecutorService executorPool = Executors.newScheduledThreadPool(1);
94
95     /**
96      * Initialize and start the bandwidth evaluator process, schedule a periodic service bandwidth usage check
97      */
98     @PostConstruct
99     public void init() {
100         loadConfig();
101         /**
102          * Evalution main loop
103          */
104         evaluationEventLoop = new Loop("EvaluationLoop"){
105             @Override
106             public void process(Event event) {
107                 if (event.type() == SimpleEvent.Type.PERIODIC_CHECK && isPeriodicCheckOn()){
108                     log.info("Received new periodic check request: {}", event.time());
109                     Map<Endpointkey, CCVPNPmDatastore.EvictingQueue<Integer>> usedBwMap = ccvpnPmDatastore.getUsedBwMap();
110                     Map<String, Integer> candidate = new TreeMap<>();
111                     for(Map.Entry<Endpointkey, CCVPNPmDatastore.EvictingQueue<Integer>> entry: usedBwMap.entrySet()) {
112                         String serviceId = entry.getKey().getCllId();
113                         Object[] usedBws = entry.getValue().tryReadToArray();
114
115                         if (usedBws == null) {
116                             // No enough data for evaluating
117                             continue;
118                         }
119                         if (ccvpnPmDatastore.getMaxBwOfSvc(serviceId) == 0) {
120                             // Max bandwidth not cached yet
121                             post(new SimpleEvent(SimpleEvent.Type.AAI_BW_REQ, serviceId));
122                             continue;
123                         }
124                         double avg = Arrays.stream(usedBws)
125                                 .mapToInt(o -> (int) o)
126                                 .summaryStatistics()
127                                 .getAverage();
128                         if (needAdjust(serviceId, avg, ccvpnPmDatastore.getMaxBwOfSvc(serviceId))) {
129                             int newBw = (int) (Math.ceil((avg / threshold) * 1.2 / precision) * precision);
130                             candidate.put(serviceId, Math.max(candidate.getOrDefault(serviceId, 0), newBw));
131                         }
132                     }
133                     for(Map.Entry<String, Integer> entry: candidate.entrySet()) {
134                         if (isServiceUnderMaintenance(entry.getKey())) {
135                             post(new SimpleEvent(SimpleEvent.Type.AAI_BW_REQ, entry.getKey()));
136                             continue;
137                         }
138                         ccvpnPmDatastore.updateSvcState(entry.getKey(), ServiceState.UNDER_MAINTENANCE);
139                         sendModifyRequest(entry.getKey(), entry.getValue(), RequestOwner.DCAE);
140                     }
141
142                 } else if (event.type() == SimpleEvent.Type.ONDEMAND_CHECK && isOnDemandCheckOn()) {
143                     log.info("Received new on-demand check request: {}", event.time());
144                     JsonObject payload = (JsonObject) event.subject();
145                     String serviceId = payload.get(SERVICE_INSTANCE_LOCATION_ID).getAsString();
146                     if (!isServiceUnderMaintenance(serviceId)){
147                         int newBandwidth = payload.get(BANDWIDTH_TOTAL).getAsInt();
148                         Map<String, Integer> maxBandwidthData = aaiService.fetchMaxBandwidthOfService(serviceId);
149                         int oldBandwidth = maxBandwidthData.get("maxBandwidth");
150                         if (newBandwidth != oldBandwidth) {
151                             ccvpnPmDatastore.updateSvcState(serviceId, ServiceState.UNDER_MAINTENANCE);
152                             sendModifyRequest(serviceId, newBandwidth, RequestOwner.UUI);
153                         }
154                     }
155                 }
156             }
157
158             private void sendModifyRequest(String cllId, Integer newBandwidth, RequestOwner owner) {
159                 policyService.sendOnsetMessageToPolicy(
160                         policyService.formPolicyOnsetMessageForCCVPN(cllId, newBandwidth, owner)
161                 );
162             }
163
164             private boolean needAdjust(String serivceId, double currentAverageUsage, int maxBandwidth){
165                 return currentAverageUsage > threshold * maxBandwidth;
166             }
167
168             private boolean isServiceUnderMaintenance(String serivceId) {
169                 return ccvpnPmDatastore.getStatusOfSvc(serivceId) == ServiceState.UNDER_MAINTENANCE;
170             }
171         };
172
173         /**
174          * AAI data consumer loop
175          */
176         aaiEventLoop = new Loop("AAIEventLoop"){
177             @Override
178             public void process(Event event) {
179                 if (event.type() == SimpleEvent.Type.AAI_BW_REQ){
180                     log.info("Received new AAI network policy query at: {}", event.time());
181                     String serviceId = (String) event.subject();
182                     Map<String, Integer> maxBandwidthData = aaiService.fetchMaxBandwidthOfService(serviceId);
183                     int bwVal = maxBandwidthData.get("maxBandwidth");
184                     if (maxBandwidthData != null){
185                         if (ccvpnPmDatastore.getMaxBwOfSvc(serviceId) == 0){
186                             ccvpnPmDatastore.updateMaxBw(serviceId, bwVal, true);
187                         } else if (ccvpnPmDatastore.getMaxBwOfSvc(serviceId) != bwVal) {
188                             ccvpnPmDatastore.updateMaxBw(serviceId, bwVal, true);
189                             ccvpnPmDatastore.updateSvcState(serviceId, ServiceState.RUNNING);
190                         }
191                     }
192                 }
193             }
194         };
195         scheduleEvaluation();
196     }
197
198     /**
199      * Stop the bandwidth evaluator process including two actors and periodic usage check
200      */
201     @PreDestroy
202     public void stop(){
203         stopScheduleEvaluation();
204         aaiEventLoop.stop();
205         evaluationEventLoop.stop();
206     }
207
208     /**
209      * Start to schedule periodic usage check at fixed rate
210      */
211     private void scheduleEvaluation(){
212         executorPool.scheduleAtFixedRate(new Runnable() {
213                 @Override
214                 public void run() {
215                     post(new SimpleEvent(SimpleEvent.Type.PERIODIC_CHECK, 1));
216                 }
217             }, 0, (evaluationInterval == 0? DEFAULT_EVAL_INTERVAL : evaluationInterval), TimeUnit.SECONDS);
218     }
219
220     /**
221      * Stop periodic bandwidth usage check
222      */
223     private void stopScheduleEvaluation(){
224         executorPool.shutdownNow();
225     }
226
227     /**
228      * Post/broadcast event between Loops
229      * @param event event object
230      */
231     public void post(@NonNull Event event){
232         if (event.type() == SimpleEvent.Type.AAI_BW_REQ) {
233             aaiEventLoop.add(event);
234         } else if (event.type() == SimpleEvent.Type.PERIODIC_CHECK) {
235             evaluationEventLoop.add(event);
236         } else if (event.type() == SimpleEvent.Type.ONDEMAND_CHECK) {
237             evaluationEventLoop.add(event);
238         }
239     }
240
241     private void loadConfig() {
242         configuration = Configuration.getInstance();
243         evaluationInterval = configuration.getCcvpnEvalInterval();
244         threshold = configuration.getCcvpnEvalThreshold();
245         precision = configuration.getCcvpnEvalPrecision(); // in Mbps;
246     }
247
248     private boolean isPeriodicCheckOn() {
249         configuration = Configuration.getInstance();
250         return configuration.isCcvpnEvalPeriodicCheckOn();
251     }
252
253     private boolean isOnDemandCheckOn() {
254         configuration = Configuration.getInstance();
255         return configuration.isCcvpnEvalOnDemandCheckOn();
256     }
257
258     /**
259      * Inner loop implementation. Each loop acts like an actor.
260      */
261     private abstract class Loop implements Runnable {
262         private final String name;
263         private volatile boolean running;
264         private final BlockingQueue<Event> eventsQueue;
265         private final ExecutorService executor;
266         private volatile Future<?> dispatchFuture;
267
268         /**
269          * Constructor that accepts a loop name
270          * @param name name of this loop
271          */
272         Loop(String name){
273             this.name = name;
274             executor = Executors.newSingleThreadExecutor();
275             eventsQueue = new LinkedBlockingQueue<>();
276             dispatchFuture = executor.submit(this);
277         }
278
279         /**
280          * Add new event to this loop
281          * @param evt Event
282          * @return true
283          */
284         public boolean add(Event evt) {
285             return eventsQueue.add(evt);
286         }
287
288         /**
289          * Running loop that process event accordingly
290          */
291         @Override
292         public void run(){
293             running = true;
294             log.info("BandwidthEvaluator -- {} initiated", this.name);
295             while (running){
296                 try{
297                     Event event = eventsQueue.take();
298                     if (event == KILL_PILL){
299                         break;
300                     }
301                     process(event);
302                 } catch (InterruptedException e){
303                     log.warn("Process loop interrupted");
304                 } catch (Exception | Error e){
305                     log.warn("Process loop hit an error {}", e.getMessage());
306                 }
307             }
308         }
309
310         /**
311          * Operation defined by subclass for different event processing
312          * @param event incoming event
313          */
314         abstract public void process(Event event);
315
316         /**
317          * Stop this loop
318          */
319         public void stop(){
320             running = false;
321             add(KILL_PILL);
322         }
323     }
324 }