7ca100d138f473b73a6e603f775c54c4c51d50a4
[dcaegen2/services.git] /
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  slice-analysis-ms
4  *  ================================================================================
5  *   Copyright (C) 2022 Huawei Canada Limited.
6  *   Copyright (C) 2022 Huawei Technologies Co., Ltd.
7  *  ==============================================================================
8  *     Licensed under the Apache License, Version 2.0 (the "License");
9  *     you may not use this file except in compliance with the License.
10  *     You may obtain a copy of the License at
11  *
12  *          http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *     Unless required by applicable law or agreed to in writing, software
15  *     distributed under the License is distributed on an "AS IS" BASIS,
16  *     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *     See the License for the specific language governing permissions and
18  *     limitations under the License.
19  *     ============LICENSE_END=========================================================
20  *
21  *******************************************************************************/
22 package org.onap.slice.analysis.ms.service.ccvpn;
23
24 import lombok.NonNull;
25 import org.onap.slice.analysis.ms.aai.AaiService;
26
27 import org.onap.slice.analysis.ms.models.Configuration;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import org.springframework.beans.factory.annotation.Autowired;
31 import org.springframework.stereotype.Component;
32
33 import javax.annotation.PostConstruct;
34 import javax.annotation.PreDestroy;
35 import java.util.Map;
36 import java.util.concurrent.BlockingQueue;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.Executors;
39 import java.util.concurrent.Future;
40 import java.util.concurrent.LinkedBlockingQueue;
41 import java.util.concurrent.ScheduledExecutorService;
42 import java.util.concurrent.TimeUnit;
43
44 /**
45  * This class implements the CCVPN PM Closed-loop logical function.
46  * A simple actor model design is implemented here.
47  */
48 @Component
49 public class BandwidthEvaluator {
50     private static Logger log = LoggerFactory.getLogger(BandwidthEvaluator.class);
51     private Configuration configuration;
52
53     @Autowired
54     AaiService aaiService;
55
56     @Autowired
57     CCVPNPmDatastore ccvpnPmDatastore;
58
59     @Autowired
60     StrategyFactory strategyFactory;
61
62     private Loop evaluationEventLoop;
63     private Loop aaiEventLoop;
64
65     private static final Event KILL_PILL = new SimpleEvent(null, 0);
66     private static final int DEFAULT_EVAL_INTERVAL = 5;
67     private static final String DEFAULT_STRATEGY_NAME = "FixedUpperBoundStrategy";
68     /**
69      * Interval of each round of evaluation, defined in config_all.json
70      */
71     private static int evaluationInterval;
72
73     /**
74      * Bandwidth Evaluation and adjustment strategy.
75      */
76     private static String strategyName;
77
78     private final ScheduledExecutorService executorPool = Executors.newScheduledThreadPool(1);
79
80     /**
81      * Initialize and start the bandwidth evaluator process, schedule a periodic service bandwidth usage check
82      */
83     @PostConstruct
84     public void init() {
85         loadConfig();
86         strategyName = (strategyName != null)? strategyName : DEFAULT_STRATEGY_NAME;
87         evaluationInterval = (evaluationInterval == 0)? DEFAULT_EVAL_INTERVAL : evaluationInterval;
88         EvaluationStrategy strategy = strategyFactory.getStrategy(strategyName);
89         log.info("{} is utilized as the bandwidth evaluatior strategy", strategyName);
90
91         /**
92          * Evalution main loop
93          */
94         evaluationEventLoop = new Loop("EvaluationLoop"){
95             @Override
96             public void process(Event event) {
97                 strategy.execute(event);
98             }
99         };
100
101         /**
102          * AAI data consumer loop
103          */
104         aaiEventLoop = new Loop("AAIEventLoop"){
105             @Override
106             public void process(Event event) {
107                 if (event.type() == SimpleEvent.Type.AAI_BW_REQ){
108                     log.debug("=== Processing new AAI network policy query at: {} ===", event.time());
109                     String serviceId = (String) event.subject();
110                     Map<String, Integer> maxBandwidthData = aaiService.fetchMaxBandwidthOfService(serviceId);
111                     if (maxBandwidthData.get("maxBandwidth") != null){
112                         log.info("Successfully retrieved bandwidth info from AAI; service: {}, bandwidth: {}",
113                                 serviceId, maxBandwidthData.get("maxBandwidth"));
114                         int bwValue = maxBandwidthData.get("maxBandwidth").intValue();
115                         if (ccvpnPmDatastore.getProvBwOfSvc(serviceId) == 0){
116                             ccvpnPmDatastore.updateProvBw(serviceId, bwValue, true);
117                         } else if (ccvpnPmDatastore.getProvBwOfSvc(serviceId) != bwValue) {
118                             log.info("Service modification complete; serviceId: {} with new bandwidth: {}", serviceId, bwValue);
119                             ccvpnPmDatastore.updateProvBw(serviceId, bwValue, true);
120                             ccvpnPmDatastore.updateSvcState(serviceId, ServiceState.RUNNING);
121                             log.debug("Service state of {} is changed to running", serviceId);
122                         }
123                     }
124                     log.debug("=== Processing AAI network policy query complete ===");
125                 }
126             }
127         };
128         scheduleEvaluation();
129     }
130
131     /**
132      * Stop the bandwidth evaluator process including two actors and periodic usage check
133      */
134     @PreDestroy
135     public void stop(){
136         stopScheduleEvaluation();
137         aaiEventLoop.stop();
138         evaluationEventLoop.stop();
139     }
140
141     /**
142      * Start to schedule periodic usage check at fixed rate
143      */
144     private void scheduleEvaluation(){
145         executorPool.scheduleAtFixedRate(new Runnable() {
146                 @Override
147                 public void run() {
148                     post(new SimpleEvent(SimpleEvent.Type.PERIODIC_CHECK, 1));
149                 }
150             }, 0, (evaluationInterval == 0? DEFAULT_EVAL_INTERVAL : evaluationInterval), TimeUnit.SECONDS);
151     }
152
153     /**
154      * Stop periodic bandwidth usage check
155      */
156     private void stopScheduleEvaluation(){
157         executorPool.shutdownNow();
158     }
159
160     /**
161      * Post/broadcast event between Loops
162      * @param event event object
163      */
164     public void post(@NonNull Event event){
165         log.info("A new event triggered, type: {}, subject: {}, at time: {}",
166                 event.type(), event.subject(), event.time());
167         if (event.type() == SimpleEvent.Type.AAI_BW_REQ) {
168             aaiEventLoop.add(event);
169         } else if (event.type() == SimpleEvent.Type.PERIODIC_CHECK) {
170             evaluationEventLoop.add(event);
171         } else if (event.type() == SimpleEvent.Type.ONDEMAND_CHECK) {
172             evaluationEventLoop.add(event);
173         }
174     }
175
176     // update configuration
177     private void loadConfig() {
178         configuration = Configuration.getInstance();
179         evaluationInterval = configuration.getCcvpnEvalInterval();
180         strategyName = configuration.getCcvpnEvalStrategy();
181         log.info("Evaluation loop configs has been loaded. Strategy {}.", strategyName);
182     }
183
184     /**
185      * Inner loop implementation. Each loop acts like an actor.
186      */
187     private abstract class Loop implements Runnable {
188         private final String name;
189         private volatile boolean running;
190         private final BlockingQueue<Event> eventsQueue;
191         private final ExecutorService executor;
192         private volatile Future<?> dispatchFuture;
193
194         /**
195          * Constructor that accepts a loop name
196          * @param name name of this loop
197          */
198         Loop(String name){
199             this.name = name;
200             executor = Executors.newSingleThreadExecutor();
201             eventsQueue = new LinkedBlockingQueue<>();
202             dispatchFuture = executor.submit(this);
203         }
204
205         /**
206          * Add new event to this loop
207          * @param evt Event
208          * @return true
209          */
210         public boolean add(Event evt) {
211             return eventsQueue.add(evt);
212         }
213
214         /**
215          * Running loop that process event accordingly
216          */
217         @Override
218         public void run(){
219             running = true;
220             log.info("BandwidthEvaluator -- {} initiated", this.name);
221             while (running){
222                 try{
223                     Event event = eventsQueue.take();
224                     if (event == KILL_PILL){
225                         break;
226                     }
227                     process(event);
228                 } catch (InterruptedException e){
229                     log.warn("Process loop interrupted");
230                 } catch (Exception | Error e){
231                     log.warn("Process loop hit an error {}", e.getMessage());
232                 }
233             }
234         }
235
236         /**
237          * Operation defined by subclass for different event processing
238          * @param event incoming event
239          */
240         abstract public void process(Event event);
241
242         /**
243          * Stop this loop
244          */
245         public void stop(){
246             running = false;
247             add(KILL_PILL);
248         }
249     }
250 }