1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 Wipro Limited.
6 * ==============================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 *******************************************************************************/
22 package org.onap.slice.analysis.ms.service;
24 import org.onap.slice.analysis.ms.configdb.IConfigDbService;
25 import org.onap.slice.analysis.ms.models.Configuration;
26 import org.onap.slice.analysis.ms.utils.BeanUtil;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
31 * This Thread class consumes message from pm data queue and sends onset message to policy
33 public class ConsumerThread extends Thread {
34 private static Logger log = LoggerFactory.getLogger(PmThread.class);
35 private PmDataQueue pmDataQueue;
36 private IConfigDbService configDbService;
37 private SnssaiSamplesProcessor snssaiSamplesProcessor;
38 private long initialDelaySec;
41 * Default constructor.
43 public ConsumerThread() {
45 this.pmDataQueue = BeanUtil.getBean(PmDataQueue.class);
46 this.configDbService = BeanUtil.getBean(IConfigDbService.class);
47 this.snssaiSamplesProcessor = BeanUtil.getBean(SnssaiSamplesProcessor.class);
48 this.initialDelaySec = Configuration.getInstance().getInitialDelaySeconds();
52 * Consumes data from PM data queue, process the data and sends onset message to policy if needed
60 Thread.sleep(initialDelaySec);
61 snssai = pmDataQueue.getSnnsaiFromQueue();
62 if (!snssai.equals("")) {
63 log.info("Consumer thread started for s-nssai {}",snssai);
64 snssaiSamplesProcessor.processSamplesOfSnnsai(snssai, configDbService.fetchNetworkFunctionsOfSnssai(snssai));
66 } catch (Exception e) {
67 log.error("Exception in Consumer Thread ", e);