4339180991019bdd4e6b67afe887f1eec5ff24f0
[dcaegen2/services.git] /
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  slice-analysis-ms
4  *  ================================================================================
5  *   Copyright (C) 2022 Huawei Canada Limited.
6  *  ==============================================================================
7  *     Licensed under the Apache License, Version 2.0 (the "License");
8  *     you may not use this file except in compliance with the License.
9  *     You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *     Unless required by applicable law or agreed to in writing, software
14  *     distributed under the License is distributed on an "AS IS" BASIS,
15  *     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *     See the License for the specific language governing permissions and
17  *     limitations under the License.
18  *     ============LICENSE_END=========================================================
19  *
20  *******************************************************************************/
21 package org.onap.slice.analysis.ms.service.ccvpn;
22
23 import lombok.NonNull;
24 import org.onap.slice.analysis.ms.aai.AaiService;
25
26 import org.onap.slice.analysis.ms.models.Configuration;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29 import org.springframework.beans.factory.annotation.Autowired;
30 import org.springframework.stereotype.Component;
31
32 import javax.annotation.PostConstruct;
33 import javax.annotation.PreDestroy;
34 import java.util.Map;
35 import java.util.concurrent.BlockingQueue;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.Future;
39 import java.util.concurrent.LinkedBlockingQueue;
40 import java.util.concurrent.ScheduledExecutorService;
41 import java.util.concurrent.TimeUnit;
42
43 /**
44  * This class implements the CCVPN PM Closed-loop logical function.
45  * A simple actor model design is implemented here.
46  */
47 @Component
48 public class BandwidthEvaluator {
49     private static Logger log = LoggerFactory.getLogger(BandwidthEvaluator.class);
50     private Configuration configuration;
51
52     @Autowired
53     AaiService aaiService;
54
55     @Autowired
56     CCVPNPmDatastore ccvpnPmDatastore;
57
58     @Autowired
59     StrategyFactory strategyFactory;
60
61     private Loop evaluationEventLoop;
62     private Loop aaiEventLoop;
63
64     private static final Event KILL_PILL = new SimpleEvent(null, 0);
65     private static final int DEFAULT_EVAL_INTERVAL = 5;
66     private static final String DEFAULT_STRATEGY_NAME = "FixedUpperBoundStrategy";
67
68     /**
69      * Interval of each round of evaluation, defined in config_all.json
70      */
71     private static int evaluationInterval;
72
73     /**
74      * Bandwidth Evaluation and adjustment strategy.
75      */
76     private static String strategyName;
77
78     private final ScheduledExecutorService executorPool = Executors.newScheduledThreadPool(1);
79
80     /**
81      * Initialize and start the bandwidth evaluator process, schedule a periodic service bandwidth usage check
82      */
83     @PostConstruct
84     public void init() {
85         loadConfig();
86         strategyName = (strategyName != null)? strategyName : DEFAULT_STRATEGY_NAME;
87         evaluationInterval = (evaluationInterval == 0)? DEFAULT_EVAL_INTERVAL : evaluationInterval;
88         EvaluationStrategy strategy = strategyFactory.getStrategy(strategyName);
89
90         /**
91          * Evalution main loop
92          */
93         evaluationEventLoop = new Loop("EvaluationLoop"){
94             @Override
95             public void process(Event event) {
96                 strategy.execute(event);
97             }
98         };
99
100         /**
101          * AAI data consumer loop
102          */
103         aaiEventLoop = new Loop("AAIEventLoop"){
104             @Override
105             public void process(Event event) {
106                 if (event.type() == SimpleEvent.Type.AAI_BW_REQ){
107                     log.debug("=== Processing new AAI network policy query at: {} ===", event.time());
108                     String serviceId = (String) event.subject();
109                     Map<String, Integer> maxBandwidthData = aaiService.fetchMaxBandwidthOfService(serviceId);
110                     if (maxBandwidthData.get("maxBandwidth") != null){
111                         log.info("Successfully retrieved bandwidth info from AAI; service: {}, bandwidth: {}",
112                                 serviceId, maxBandwidthData.get("maxBandwidth"));
113                         int bwValue = maxBandwidthData.get("maxBandwidth").intValue();
114                         if (ccvpnPmDatastore.getProvBwOfSvc(serviceId) == 0){
115                             ccvpnPmDatastore.updateProvBw(serviceId, bwValue, true);
116                         } else if (ccvpnPmDatastore.getProvBwOfSvc(serviceId) != bwValue) {
117                             log.info("Service modification complete; serviceId: {} with new bandwidth: {}", serviceId, bwValue);
118                             ccvpnPmDatastore.updateProvBw(serviceId, bwValue, true);
119                             ccvpnPmDatastore.updateSvcState(serviceId, ServiceState.RUNNING);
120                         }
121                     }
122                     log.debug("=== Processing AAI network policy query complete ===");
123                 }
124             }
125         };
126         scheduleEvaluation();
127     }
128
129     /**
130      * Stop the bandwidth evaluator process including two actors and periodic usage check
131      */
132     @PreDestroy
133     public void stop(){
134         stopScheduleEvaluation();
135         aaiEventLoop.stop();
136         evaluationEventLoop.stop();
137     }
138
139     /**
140      * Start to schedule periodic usage check at fixed rate
141      */
142     private void scheduleEvaluation(){
143         executorPool.scheduleAtFixedRate(new Runnable() {
144                 @Override
145                 public void run() {
146                     post(new SimpleEvent(SimpleEvent.Type.PERIODIC_CHECK, 1));
147                 }
148             }, 0, (evaluationInterval == 0? DEFAULT_EVAL_INTERVAL : evaluationInterval), TimeUnit.SECONDS);
149     }
150
151     /**
152      * Stop periodic bandwidth usage check
153      */
154     private void stopScheduleEvaluation(){
155         executorPool.shutdownNow();
156     }
157
158     /**
159      * Post/broadcast event between Loops
160      * @param event event object
161      */
162     public void post(@NonNull Event event){
163         log.debug("A new event triggered, type: {}, subject: {}, at time: {}",
164                 event.type(), event.subject(), event.time());
165         if (event.type() == SimpleEvent.Type.AAI_BW_REQ) {
166             aaiEventLoop.add(event);
167         } else if (event.type() == SimpleEvent.Type.PERIODIC_CHECK) {
168             evaluationEventLoop.add(event);
169         } else if (event.type() == SimpleEvent.Type.ONDEMAND_CHECK) {
170             evaluationEventLoop.add(event);
171         }
172     }
173
174     private void loadConfig() {
175         configuration = Configuration.getInstance();
176         evaluationInterval = configuration.getCcvpnEvalInterval();
177         strategyName = configuration.getCcvpnEvalStrategy();
178     }
179
180     /**
181      * Inner loop implementation. Each loop acts like an actor.
182      */
183     private abstract class Loop implements Runnable {
184         private final String name;
185         private volatile boolean running;
186         private final BlockingQueue<Event> eventsQueue;
187         private final ExecutorService executor;
188         private volatile Future<?> dispatchFuture;
189
190         /**
191          * Constructor that accepts a loop name
192          * @param name name of this loop
193          */
194         Loop(String name){
195             this.name = name;
196             executor = Executors.newSingleThreadExecutor();
197             eventsQueue = new LinkedBlockingQueue<>();
198             dispatchFuture = executor.submit(this);
199         }
200
201         /**
202          * Add new event to this loop
203          * @param evt Event
204          * @return true
205          */
206         public boolean add(Event evt) {
207             return eventsQueue.add(evt);
208         }
209
210         /**
211          * Running loop that process event accordingly
212          */
213         @Override
214         public void run(){
215             running = true;
216             log.info("BandwidthEvaluator -- {} initiated", this.name);
217             while (running){
218                 try{
219                     Event event = eventsQueue.take();
220                     if (event == KILL_PILL){
221                         break;
222                     }
223                     process(event);
224                 } catch (InterruptedException e){
225                     log.warn("Process loop interrupted");
226                 } catch (Exception | Error e){
227                     log.warn("Process loop hit an error {}", e.getMessage());
228                 }
229             }
230         }
231
232         /**
233          * Operation defined by subclass for different event processing
234          * @param event incoming event
235          */
236         abstract public void process(Event event);
237
238         /**
239          * Stop this loop
240          */
241         public void stop(){
242             running = false;
243             add(KILL_PILL);
244         }
245     }
246 }