1 /*******************************************************************************
2 * ============LICENSE_START========================================================================
3 * ONAP : ccsdk feature sdnr wt mountpoint-registrar
4 * =================================================================================================
5 * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6 * =================================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8 * in compliance with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software distributed under the License
13 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14 * or implied. See the License for the specific language governing permissions and limitations under
16 * ============LICENSE_END==========================================================================
17 ******************************************************************************/
19 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
21 import java.util.LinkedList;
22 import java.util.List;
24 import java.util.Properties;
26 import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
30 public class DMaaPVESMsgConsumerMain implements Runnable {
32 private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerMain.class);
34 boolean threadsRunning = false;
35 static List<DMaaPVESMsgConsumer> consumers = new LinkedList<>();
36 public GeneralConfig config;
37 public PNFRegistrationConfig pnfRegistrationConfig;
38 public FaultConfig faultConfig;
40 public DMaaPVESMsgConsumerMain(Map<String, Configuration> configMap) {
41 configMap.forEach((k, v) -> initialize(k, v));
44 public void initialize(String domain, Configuration domainConfig) {
45 LOG.debug("In initialize method : Domain = {} and domainConfig = {}",domain,domainConfig);
46 String consumerClass = null;
47 Properties consumerProperties = new Properties();
48 if (domain.equalsIgnoreCase("pnfregistration")) {
49 this.pnfRegistrationConfig = (PNFRegistrationConfig) domainConfig;
51 consumerClass = pnfRegistrationConfig.getConsumerClass();
52 LOG.debug("Consumer class = "+consumerClass);
54 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, pnfRegistrationConfig.getTransportType());
55 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, pnfRegistrationConfig.getHostPort());
56 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, pnfRegistrationConfig.getContenttype());
57 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_GROUP, pnfRegistrationConfig.getConsumerGroup());
58 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_ID, pnfRegistrationConfig.getConsumerId());
59 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TOPIC, pnfRegistrationConfig.getTopic());
60 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, pnfRegistrationConfig.getTimeout());
61 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_LIMIT, pnfRegistrationConfig.getLimit());
62 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, pnfRegistrationConfig.getFetchPause());
63 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, pnfRegistrationConfig.getProtocol());
64 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_USERNAME, pnfRegistrationConfig.getUsername());
65 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PASSWORD, pnfRegistrationConfig.getPassword());
66 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT, pnfRegistrationConfig.getClientReadTimeout());
67 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT, pnfRegistrationConfig.getClientConnectTimeout());
68 } else if (domain.equalsIgnoreCase("fault")) {
69 this.faultConfig = (FaultConfig) domainConfig;
70 consumerClass = faultConfig.getConsumerClass();
71 LOG.debug("Consumer class = {}",consumerClass);
72 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, faultConfig.getTransportType());
73 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, faultConfig.getHostPort());
74 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, faultConfig.getContenttype());
75 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_GROUP, faultConfig.getConsumerGroup());
76 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_ID, faultConfig.getConsumerId());
77 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TOPIC, faultConfig.getTopic());
78 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, faultConfig.getTimeout());
79 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_LIMIT, faultConfig.getLimit());
80 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, faultConfig.getFetchPause());
81 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, faultConfig.getProtocol());
82 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_USERNAME, faultConfig.getUsername());
83 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PASSWORD, faultConfig.getPassword());
84 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT, faultConfig.getClientReadTimeout());
85 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT, faultConfig.getClientConnectTimeout());
88 if (consumerClass != null) {
89 LOG.info("Calling createConsumer : {}",consumerClass);
90 threadsRunning = createConsumer(consumerClass, consumerProperties);
95 private static boolean updateThreadState(List<DMaaPVESMsgConsumer> consumers) {
96 boolean threadsRunning = false;
97 for (DMaaPVESMsgConsumer consumer : consumers) {
98 if (consumer.isRunning()) {
99 threadsRunning = true;
102 return threadsRunning;
105 static boolean createConsumer(String consumerClassName, Properties properties) {
106 Class<?> consumerClass = null;
109 consumerClass = Class.forName(consumerClassName);
110 } catch (Exception e) {
111 LOG.error("Could not find DMaap VES Message consumer class {}", consumerClassName, e);
114 if (consumerClass != null) {
115 LOG.debug("Calling handleConsumerClass");
116 handleConsumerClass(consumerClass, consumerClassName, properties, consumers);
118 return !consumers.isEmpty();
121 private static boolean handleConsumerClass(Class<?> consumerClass, String consumerClassName, Properties properties, List<DMaaPVESMsgConsumer> consumers) {
122 DMaaPVESMsgConsumer consumer = null;
125 consumer = (DMaaPVESMsgConsumer) consumerClass.newInstance();
126 LOG.debug("Successfully created an instance of consumerClass : {}",consumerClassName);
127 } catch (Exception e) {
128 LOG.error("Could not create consumer from class {}",consumerClassName, e);
131 if (consumer != null) {
132 LOG.info("Initializing consumer {}({})", consumerClassName, properties);
133 consumer.init(properties);
135 if (consumer.isReady()) {
136 Thread consumerThread = new Thread(consumer);
137 consumerThread.start();
138 consumers.add(consumer);
140 LOG.info("Started consumer thread ({} : {})", consumerClassName,
144 LOG.debug("Consumer {} is not ready", consumerClassName);
151 while (threadsRunning) {
152 threadsRunning = updateThreadState(consumers);
153 if (!threadsRunning) {
159 } catch (InterruptedException e) {
160 LOG.error(e.getLocalizedMessage(), e);
164 LOG.info("No listener threads running - exiting");
167 public static List<DMaaPVESMsgConsumer> getConsumers() {