[DCAEGEN2] Release dcaegen2-services-kpi-computation-ms container
[dcaegen2/services.git] / components / slice-analysis-ms / src / main / java / org / onap / slice / analysis / ms / service / ConsumerThread.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  slice-analysis-ms
4  *  ================================================================================
5  *   Copyright (C) 2020-2022 Wipro Limited.
6  *   ==============================================================================
7  *     Licensed under the Apache License, Version 2.0 (the "License");
8  *     you may not use this file except in compliance with the License.
9  *     You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *     Unless required by applicable law or agreed to in writing, software
14  *     distributed under the License is distributed on an "AS IS" BASIS,
15  *     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *     See the License for the specific language governing permissions and
17  *     limitations under the License.
18  *     ============LICENSE_END=========================================================
19  *
20  *******************************************************************************/
21
22 package org.onap.slice.analysis.ms.service;
23
24 import java.util.List;
25 import java.util.Objects;
26
27 import org.onap.slice.analysis.ms.configdb.IConfigDbService;
28 import org.onap.slice.analysis.ms.cps.CpsInterface;
29 import org.onap.slice.analysis.ms.models.Configuration;
30 import org.onap.slice.analysis.ms.models.SubCounter;
31 import org.onap.slice.analysis.ms.utils.BeanUtil;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * This Thread class consumes message from pm data queue and sends onset message to policy
37  */
38 public class ConsumerThread extends Thread {
39     private static Logger log = LoggerFactory.getLogger(ConsumerThread.class);
40     private PmDataQueue pmDataQueue;
41     private IConfigDbService configDbService;
42     private SnssaiSamplesProcessor snssaiSamplesProcessor;
43     private CpsInterface cpsInterface;
44     private long initialDelaySec;
45     private int samples;
46
47     /**
48      * Default constructor.
49      */
50     public ConsumerThread() {
51         super();
52         this.pmDataQueue = BeanUtil.getBean(PmDataQueue.class);
53         this.configDbService = BeanUtil.getBean(IConfigDbService.class);
54         this.initialDelaySec = Configuration.getInstance().getInitialDelaySeconds();
55         this.samples = Configuration.getInstance().getSamples();
56         this.cpsInterface = BeanUtil.getBean(CpsInterface.class);
57     }
58
59     /**
60      * Consumes data from PM data queue, process the data and sends onset message to policy if needed
61      */
62     @Override
63     public void run() {
64         Boolean isConfigDbEnabled = (Objects.isNull(Configuration.getInstance().getConfigDbEnabled())) ? true
65                 : Configuration.getInstance().getConfigDbEnabled();
66         boolean done = false;
67         boolean result = false;
68         String snssai = "";
69         List<String> nfs = null;
70         while (!done) {
71             try {
72                 Thread.sleep(initialDelaySec);
73                 log.info("Starting Consumer Thread");
74                 snssai = pmDataQueue.getSnnsaiFromQueue();
75                 if (!snssai.equals("")) {
76                     log.info("Consumer thread processing data for s-nssai {}", snssai);
77                     try {
78                         if (isConfigDbEnabled) {
79                             nfs = configDbService.fetchNetworkFunctionsOfSnssai(snssai);
80                         } else {
81                             nfs = cpsInterface.fetchNetworkFunctionsOfSnssai(snssai);
82                         }
83                     } catch (Exception e) {
84                         pmDataQueue.putSnssaiToQueue(snssai);
85                         log.error("Exception caught while fetching nfs of snssai {}, {}", snssai, e.getMessage());
86                     }
87                     if (nfs != null && checkForEnoughSamples(nfs, snssai)) {
88                         this.snssaiSamplesProcessor = BeanUtil.getBean(SnssaiSamplesProcessor.class);
89                         result = snssaiSamplesProcessor.processSamplesOfSnnsai(snssai, nfs);
90                         if (!result) {
91                             log.info("Not enough samples to process for {}", snssai);
92                             pmDataQueue.putSnssaiToQueue(snssai);
93                         }
94                     }
95                 }
96             } catch (Exception e) {
97                 log.error("Exception in Consumer Thread, {}", e.getMessage());
98                 done = true;
99             }
100         }
101     }
102
103     /**
104      * Checks whether enough samples are available for the network functions
105      */
106     public boolean checkForEnoughSamples(List<String> nfs, String snssai) {
107         for (String nf : nfs) {
108             if (!pmDataQueue.checkSamplesInQueue(new SubCounter(nf, snssai), samples)) {
109                 log.info("Not enough samples to process for network function {} of snssai {}", nf, snssai);
110                 pmDataQueue.putSnssaiToQueue(snssai);
111                 return false;
112             }
113         }
114         return true;
115     }
116 }