2 * ============LICENSE_START========================================================================
3 * ONAP : ccsdk feature sdnr wt mountpoint-registrar
4 * =================================================================================================
5 * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6 * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
7 * =================================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
9 * in compliance with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software distributed under the License
14 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
15 * or implied. See the License for the specific language governing permissions and limitations under
17 * ============LICENSE_END==========================================================================
20 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
22 import java.util.LinkedList;
23 import java.util.List;
25 import java.util.Properties;
26 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.FaultConfig;
27 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
28 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.MessageConfig;
29 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.PNFRegistrationConfig;
30 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.ProvisioningConfig;
31 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StndDefinedFaultConfig;
32 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StrimziKafkaConfig;
33 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm.StrimziKafkaCMVESMsgConsumer;
34 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.fault.StrimziKafkaFaultVESMsgConsumer;
35 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.pnfreg.StrimziKafkaPNFRegVESMsgConsumer;
36 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.stnddefined.StrimziKafkaStndDefinedFaultVESMsgConsumer;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
40 public class StrimziKafkaVESMsgConsumerMain implements Runnable {
// Logger used by every method of this class.
42 private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaVESMsgConsumerMain.class);
// Kafka connection properties; filled on first use by getStrimziKafkaProps and reused afterwards.
43 Properties strimziKafkaProperties = new Properties();
// Fully qualified class names, one per domain. In the code visible here they are only written to
// debug logs by initialize.
// NOTE(review): these name DMaaP* classes although createConsumer instantiates StrimziKafka*
// consumers - confirm whether the values are still intended.
44 private static final String _PNFREG_CLASS =
45 "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPPNFRegVESMsgConsumer";
46 private static final String _FAULT_CLASS =
47 "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPFaultVESMsgConsumer";
48 private static final String _CM_CLASS =
49 "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPCMVESMsgConsumer";
50 private static final String _STNDDEFINED_FAULT_CLASS =
51 "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPStndDefinedFaultVESMsgConsumer";
// Domain names that initialize and createConsumer compare their first argument against.
52 private static final String _PNFREG_DOMAIN = "pnfRegistration";
53 private static final String _FAULT_DOMAIN = "fault";
54 private static final String _CM_DOMAIN = "provisioning";
55 private static final String _STNDDEFINED_FAULT_DOMAIN = "stndDefinedFault";
// Loop condition of the monitoring loop; set from the result of createConsumer and refreshed by
// updateThreadState.
57 boolean threadsRunning = false;
// Consumers that were started by handleConsumer.
58 List<StrimziKafkaVESMsgConsumer> consumers = new LinkedList<>();
// Domain configurations; each one is assigned in initialize when its domain is processed.
59 private PNFRegistrationConfig pnfRegistrationConfig;
60 private FaultConfig faultConfig;
// General configuration handed to every consumer constructor in createConsumer.
61 private GeneralConfig generalConfig;
62 private ProvisioningConfig provisioningConfig;
63 private StndDefinedFaultConfig stndDefinedFaultConfig;
// Source of the Kafka connection settings; only assigned by the three-argument constructor.
64 private StrimziKafkaConfig strimziKafkaConfig;
/**
 * Creates the consumer manager from the per-domain configurations and immediately initialises
 * every entry of {@code configMap} through {@link #initialize(String, MessageConfig)}.
 *
 * <p>This constructor does not set {@code strimziKafkaConfig}. {@code initialize} passes that
 * field on to the Kafka property lookup for every supported domain, so a map that contains a
 * supported domain reaches that lookup with a {@code null} configuration.
 *
 * @param configMap domain name mapped to the configuration of that domain
 * @param generalConfig general configuration passed on to every consumer
 */
public StrimziKafkaVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig) {
    this.generalConfig = generalConfig;
    configMap.forEach((domainName, domainSettings) -> initialize(domainName, domainSettings));
}
/**
 * Creates the consumer manager with explicit Strimzi Kafka connection settings and immediately
 * initialises every entry of {@code configMap} through {@link #initialize(String, MessageConfig)}.
 *
 * @param configMap domain name mapped to the configuration of that domain
 * @param generalConfig general configuration passed on to every consumer
 * @param strimziKafkaConfig source of the Kafka connection properties
 */
public StrimziKafkaVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig,
        StrimziKafkaConfig strimziKafkaConfig) {
    // Both configurations must be in place before initialize runs, because it reads them.
    this.generalConfig = generalConfig;
    this.strimziKafkaConfig = strimziKafkaConfig;
    configMap.forEach((domainName, domainSettings) -> initialize(domainName, domainSettings));
}
78 public void initialize(String domain, MessageConfig domainConfig) {
79 LOG.debug("In initialize method : Domain = {} and domainConfig = {}", domain, domainConfig);
81 Properties consumerProperties = new Properties();
82 if (domain.equalsIgnoreCase(_PNFREG_DOMAIN)) {
83 this.pnfRegistrationConfig = (PNFRegistrationConfig) domainConfig;
84 consumerClass = _PNFREG_CLASS;
85 LOG.debug("Consumer class = {}", consumerClass);
87 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_GROUP,
88 pnfRegistrationConfig.getConsumerGroup());
89 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_ID,
90 pnfRegistrationConfig.getConsumerId());
91 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TOPIC, pnfRegistrationConfig.getTopic());
92 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TIMEOUT,
93 pnfRegistrationConfig.getTimeout());
94 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_LIMIT, pnfRegistrationConfig.getLimit());
95 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
96 pnfRegistrationConfig.getFetchPause());
99 createConsumer(_PNFREG_DOMAIN, consumerProperties, getStrimziKafkaProps(strimziKafkaConfig));
100 } else if (domain.equalsIgnoreCase(_FAULT_DOMAIN)) {
101 this.faultConfig = (FaultConfig) domainConfig;
102 consumerClass = _FAULT_CLASS;
103 LOG.debug("Consumer class = {}", consumerClass);
104 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_GROUP, faultConfig.getConsumerGroup());
105 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_ID, faultConfig.getConsumerId());
106 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TOPIC, faultConfig.getTopic());
107 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, faultConfig.getTimeout());
108 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_LIMIT, faultConfig.getLimit());
109 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, faultConfig.getFetchPause());
112 createConsumer(_FAULT_DOMAIN, consumerProperties, getStrimziKafkaProps(strimziKafkaConfig));
113 } else if (domain.equalsIgnoreCase(_CM_DOMAIN)) {
114 this.provisioningConfig = (ProvisioningConfig) domainConfig;
115 consumerClass = _CM_CLASS;
116 LOG.debug("Consumer class = {}", consumerClass);
117 consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_GROUP,
118 provisioningConfig.getConsumerGroup());
119 consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_ID, provisioningConfig.getConsumerId());
120 consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TOPIC, provisioningConfig.getTopic());
121 consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, provisioningConfig.getTimeout());
122 consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_LIMIT, provisioningConfig.getLimit());
123 consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
124 provisioningConfig.getFetchPause());
126 threadsRunning = createConsumer(_CM_DOMAIN, consumerProperties, getStrimziKafkaProps(strimziKafkaConfig));
127 } else if (domain.equalsIgnoreCase(_STNDDEFINED_FAULT_DOMAIN)) {
128 this.stndDefinedFaultConfig = (StndDefinedFaultConfig) domainConfig;
129 consumerClass = _STNDDEFINED_FAULT_CLASS;
130 LOG.debug("Consumer class = {}", consumerClass);
131 consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_GROUP,
132 stndDefinedFaultConfig.getConsumerGroup());
133 consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_ID,
134 stndDefinedFaultConfig.getConsumerId());
135 consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_TOPIC,
136 stndDefinedFaultConfig.getTopic());
137 consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT,
138 stndDefinedFaultConfig.getTimeout());
139 consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_LIMIT,
140 stndDefinedFaultConfig.getLimit());
141 consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
142 stndDefinedFaultConfig.getFetchPause());
144 threadsRunning = createConsumer(_STNDDEFINED_FAULT_DOMAIN, consumerProperties,
145 getStrimziKafkaProps(strimziKafkaConfig));
149 private Properties getStrimziKafkaProps(StrimziKafkaConfig strimziKafkaConfig) {
150 if (strimziKafkaProperties.size() == 0) {
151 strimziKafkaProperties.put("bootstrapServers", strimziKafkaConfig.getBootstrapServers());
152 strimziKafkaProperties.put("securityProtocol", strimziKafkaConfig.getSecurityProtocol());
153 strimziKafkaProperties.put("saslMechanism", strimziKafkaConfig.getSaslMechanism());
154 strimziKafkaProperties.put("saslJaasConfig", strimziKafkaConfig.getSaslJaasConfig());
156 return strimziKafkaProperties;
159 private boolean updateThreadState(List<StrimziKafkaVESMsgConsumer> consumers) {
160 boolean threadsRunning = false;
161 for (StrimziKafkaVESMsgConsumer consumer : consumers) {
162 if (consumer.isRunning()) {
163 threadsRunning = true;
166 return threadsRunning;
169 public boolean createConsumer(String consumerType, Properties consumerProperties, Properties strimziKafkaProps) {
170 StrimziKafkaVESMsgConsumerImpl consumer = null;
172 if (consumerType.equalsIgnoreCase(_PNFREG_DOMAIN))
173 consumer = new StrimziKafkaPNFRegVESMsgConsumer(generalConfig);
174 else if (consumerType.equalsIgnoreCase(_FAULT_DOMAIN))
175 consumer = new StrimziKafkaFaultVESMsgConsumer(generalConfig);
176 else if (consumerType.equalsIgnoreCase(_CM_DOMAIN))
177 consumer = new StrimziKafkaCMVESMsgConsumer(generalConfig);
178 else if (consumerType.equals(_STNDDEFINED_FAULT_DOMAIN))
179 consumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(generalConfig);
181 handleConsumer(consumer, consumerProperties, strimziKafkaProps, consumers);
182 return !consumers.isEmpty();
/**
 * Initialises the given consumer and, if it reports ready, starts it on a new thread and adds
 * it to the supplied list.
 *
 * A null consumer is skipped entirely.
 *
 * NOTE(review): the return statements and the line separating the "started" and "not ready"
 * log calls are not visible in this listing; confirm the returned values against the full file.
 *
 * @param consumer consumer to start, may be null
 * @param consumerProperties consumer settings passed to the consumer's init
 * @param strimziKafkaProps Kafka connection settings passed to the consumer's init
 * @param consumers list that receives the consumer once its thread has been started
 */
185 private boolean handleConsumer(StrimziKafkaVESMsgConsumer consumer, Properties consumerProperties,
186 Properties strimziKafkaProps, List<StrimziKafkaVESMsgConsumer> consumers) {
187 if (consumer != null) {
// Kafka settings go first, consumer settings second.
188 consumer.init(strimziKafkaProps, consumerProperties);
// Only a consumer that reports ready after init gets a thread.
190 if (consumer.isReady()) {
191 Thread consumerThread = new Thread(consumer);
192 consumerThread.start();
193 consumers.add(consumer);
195 LOG.info("Started consumer thread ({} : {})", consumer.getClass().getSimpleName(), consumerProperties);
// Logged for a consumer that is not ready; presumably the else branch of the check above.
198 LOG.debug("Consumer {} is not ready", consumer.getClass().getSimpleName());
// NOTE(review): the enclosing method header is not visible in this listing; presumably this is
// the body of run(), since the class implements Runnable. The body of the inner 'if' and the
// 'try' block that belongs to the 'catch' below are not visible either - confirm against the
// full file.
// Keep monitoring for as long as the flag says consumers are running.
206 while (threadsRunning) {
// Refresh the flag from the current state of all started consumers.
207 threadsRunning = updateThreadState(consumers);
208 if (!threadsRunning) {
214 } catch (InterruptedException e) {
215 LOG.error(e.getLocalizedMessage(), e);
// Restore the interrupt flag so that callers can still observe the interruption.
216 Thread.currentThread().interrupt();
220 LOG.info("No listener threads running - exiting");
223 public List<StrimziKafkaVESMsgConsumer> getConsumers() {