2 * ============LICENSE_START========================================================================
3 * ONAP : ccsdk feature sdnr wt mountpoint-registrar
4 * =================================================================================================
5 * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6 * =================================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8 * in compliance with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software distributed under the License
13 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14 * or implied. See the License for the specific language governing permissions and limitations under
16 * ============LICENSE_END==========================================================================
19 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
21 import java.util.LinkedList;
22 import java.util.List;
24 import java.util.Properties;
26 import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
30 public class DMaaPVESMsgConsumerMain implements Runnable {
/** Class-wide SLF4J logger. */
private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerMain.class);
/** Holds the result of the latest createConsumer call; refreshed from updateThreadState by the polling loop. */
boolean threadsRunning = false;
/** Consumers whose threads have been started; static, hence shared by every instance of this class. */
static List<DMaaPVESMsgConsumer> consumers = new LinkedList<>();
/** General configuration. NOTE(review): never assigned in the code visible here - confirm where it is set. */
public GeneralConfig config;
/** Assigned by initialize when it is called for the "pnfregistration" domain. */
public PNFRegistrationConfig pnfRegistrationConfig;
/** Assigned by initialize when it is called for the "fault" domain. */
public FaultConfig faultConfig;
/**
 * Builds the consumer manager from a per-domain configuration map.
 *
 * <p>Each entry (domain name, domain configuration) is passed to
 * {@link #initialize(String, Configuration)} in the map's iteration order.
 *
 * @param configMap domain name mapped to the configuration of that domain
 */
public DMaaPVESMsgConsumerMain(Map<String, Configuration> configMap) {
    configMap.forEach(this::initialize);
}
44 public void initialize(String domain, Configuration domainConfig) {
45 LOG.debug("In initialize method : Domain = {} and domainConfig = {}", domain, domainConfig);
46 String consumerClass = null;
47 Properties consumerProperties = new Properties();
48 if (domain.equalsIgnoreCase("pnfregistration")) {
49 this.pnfRegistrationConfig = (PNFRegistrationConfig) domainConfig;
51 consumerClass = pnfRegistrationConfig.getConsumerClass();
52 LOG.debug("Consumer class = " + consumerClass);
54 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE,
55 pnfRegistrationConfig.getTransportType());
56 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_HOST_PORT,
57 pnfRegistrationConfig.getHostPort());
58 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE,
59 pnfRegistrationConfig.getContenttype());
60 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_GROUP,
61 pnfRegistrationConfig.getConsumerGroup());
62 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_ID,
63 pnfRegistrationConfig.getConsumerId());
64 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TOPIC, pnfRegistrationConfig.getTopic());
65 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TIMEOUT,
66 pnfRegistrationConfig.getTimeout());
67 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_LIMIT, pnfRegistrationConfig.getLimit());
68 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
69 pnfRegistrationConfig.getFetchPause());
70 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PROTOCOL,
71 pnfRegistrationConfig.getProtocol());
72 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_USERNAME,
73 pnfRegistrationConfig.getUsername());
74 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PASSWORD,
75 pnfRegistrationConfig.getPassword());
76 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
77 pnfRegistrationConfig.getClientReadTimeout());
78 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
79 pnfRegistrationConfig.getClientConnectTimeout());
80 } else if (domain.equalsIgnoreCase("fault")) {
81 this.faultConfig = (FaultConfig) domainConfig;
82 consumerClass = faultConfig.getConsumerClass();
83 LOG.debug("Consumer class = {}", consumerClass);
84 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, faultConfig.getTransportType());
85 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, faultConfig.getHostPort());
86 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, faultConfig.getContenttype());
87 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_GROUP, faultConfig.getConsumerGroup());
88 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_ID, faultConfig.getConsumerId());
89 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TOPIC, faultConfig.getTopic());
90 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, faultConfig.getTimeout());
91 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_LIMIT, faultConfig.getLimit());
92 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, faultConfig.getFetchPause());
93 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, faultConfig.getProtocol());
94 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_USERNAME, faultConfig.getUsername());
95 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PASSWORD, faultConfig.getPassword());
96 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
97 faultConfig.getClientReadTimeout());
98 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
99 faultConfig.getClientConnectTimeout());
102 if (consumerClass != null) {
103 LOG.info("Calling createConsumer : {}", consumerClass);
104 threadsRunning = createConsumer(consumerClass, consumerProperties);
109 private static boolean updateThreadState(List<DMaaPVESMsgConsumer> consumers) {
110 boolean threadsRunning = false;
111 for (DMaaPVESMsgConsumer consumer : consumers) {
112 if (consumer.isRunning()) {
113 threadsRunning = true;
116 return threadsRunning;
119 static boolean createConsumer(String consumerClassName, Properties properties) {
120 Class<?> consumerClass = null;
123 consumerClass = Class.forName(consumerClassName);
124 } catch (Exception e) {
125 LOG.error("Could not find DMaap VES Message consumer class {}", consumerClassName, e);
128 if (consumerClass != null) {
129 LOG.debug("Calling handleConsumerClass");
130 handleConsumerClass(consumerClass, consumerClassName, properties, consumers);
132 return !consumers.isEmpty();
135 private static boolean handleConsumerClass(Class<?> consumerClass, String consumerClassName, Properties properties,
136 List<DMaaPVESMsgConsumer> consumers) {
137 DMaaPVESMsgConsumer consumer = null;
140 consumer = (DMaaPVESMsgConsumer) consumerClass.newInstance();
141 LOG.debug("Successfully created an instance of consumerClass : {}", consumerClassName);
142 } catch (Exception e) {
143 LOG.error("Could not create consumer from class {}", consumerClassName, e);
146 if (consumer != null) {
147 LOG.info("Initializing consumer {}({})", consumerClassName, properties);
148 consumer.init(properties);
150 if (consumer.isReady()) {
151 Thread consumerThread = new Thread(consumer);
152 consumerThread.start();
153 consumers.add(consumer);
155 LOG.info("Started consumer thread ({} : {})", consumerClassName, properties);
158 LOG.debug("Consumer {} is not ready", consumerClassName);
165 while (threadsRunning) {
166 threadsRunning = updateThreadState(consumers);
167 if (!threadsRunning) {
173 } catch (InterruptedException e) {
174 LOG.error(e.getLocalizedMessage(), e);
178 LOG.info("No listener threads running - exiting");
181 public static List<DMaaPVESMsgConsumer> getConsumers() {