2 * ============LICENSE_START========================================================================
3 * ONAP : ccsdk feature sdnr wt mountpoint-registrar
4 * =================================================================================================
5 * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6 * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
7 * =================================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
9 * in compliance with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software distributed under the License
14 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
15 * or implied. See the License for the specific language governing permissions and limitations under
17 * ============LICENSE_END==========================================================================
20 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
22 import java.util.LinkedList;
23 import java.util.List;
25 import java.util.Properties;
26 import org.apache.kafka.clients.admin.Admin;
27 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.FaultConfig;
28 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
29 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.MessageConfig;
30 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.PNFRegistrationConfig;
31 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.ProvisioningConfig;
32 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StndDefinedFaultConfig;
33 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StrimziKafkaConfig;
34 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm.StrimziKafkaCMVESMsgConsumer;
35 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.fault.StrimziKafkaFaultVESMsgConsumer;
36 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.pnfreg.StrimziKafkaPNFRegVESMsgConsumer;
37 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.stnddefined.StrimziKafkaStndDefinedFaultVESMsgConsumer;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
41 public class StrimziKafkaVESMsgConsumerMain implements Runnable {
43 private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaVESMsgConsumerMain.class);
44 Properties strimziKafkaProperties = new Properties();
45 private static final String _PNFREG_CLASS =
46 "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPPNFRegVESMsgConsumer";
47 private static final String _FAULT_CLASS =
48 "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPFaultVESMsgConsumer";
49 private static final String _CM_CLASS =
50 "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPCMVESMsgConsumer";
51 private static final String _STNDDEFINED_FAULT_CLASS =
52 "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPStndDefinedFaultVESMsgConsumer";
53 private static final String _PNFREG_DOMAIN = "pnfRegistration";
54 private static final String _FAULT_DOMAIN = "fault";
55 private static final String _CM_DOMAIN = "provisioning";
56 private static final String _STNDDEFINED_FAULT_DOMAIN = "stndDefinedFault";
58 boolean threadsRunning = false;
59 List<StrimziKafkaVESMsgConsumer> consumers = new LinkedList<>();
60 private PNFRegistrationConfig pnfRegistrationConfig;
61 private FaultConfig faultConfig;
62 private GeneralConfig generalConfig;
63 private ProvisioningConfig provisioningConfig;
64 private StndDefinedFaultConfig stndDefinedFaultConfig;
65 private StrimziKafkaConfig strimziKafkaConfig;
66 private Admin kafkaAdminClient = null;
/**
 * Creates the consumer manager without a Strimzi Kafka configuration and without an admin client.
 *
 * Every entry of {@code configMap} is passed to {@link #initialize(String, MessageConfig)}.
 *
 * @param configMap domain name to domain configuration
 * @param generalConfig general configuration handed to each consumer
 */
public StrimziKafkaVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig) {
    this.generalConfig = generalConfig;
    for (Map.Entry<String, MessageConfig> domainEntry : configMap.entrySet()) {
        initialize(domainEntry.getKey(), domainEntry.getValue());
    }
}
73 public StrimziKafkaVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig,
74 StrimziKafkaConfig strimziKafkaConfig) {
75 this.generalConfig = generalConfig;
76 this.strimziKafkaConfig = strimziKafkaConfig;
77 kafkaAdminClient = Admin.create(getStrimziKafkaProps(strimziKafkaConfig));
78 configMap.forEach(this::initialize);
81 public void initialize(String domain, MessageConfig domainConfig) {
82 LOG.debug("In initialize method : Domain = {} and domainConfig = {}", domain, domainConfig);
84 Properties consumerProperties = new Properties();
85 if (domain.equalsIgnoreCase(_PNFREG_DOMAIN)) {
86 this.pnfRegistrationConfig = (PNFRegistrationConfig) domainConfig;
87 consumerClass = _PNFREG_CLASS;
88 LOG.debug("Consumer class = {}", consumerClass);
90 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_GROUP,
91 pnfRegistrationConfig.getConsumerGroup());
92 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_ID,
93 pnfRegistrationConfig.getConsumerId());
94 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TOPIC, pnfRegistrationConfig.getTopic());
95 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TIMEOUT,
96 pnfRegistrationConfig.getTimeout());
97 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_LIMIT, pnfRegistrationConfig.getLimit());
98 consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
99 pnfRegistrationConfig.getFetchPause());
102 createConsumer(_PNFREG_DOMAIN, consumerProperties, getStrimziKafkaProps(strimziKafkaConfig));
103 } else if (domain.equalsIgnoreCase(_FAULT_DOMAIN)) {
104 this.faultConfig = (FaultConfig) domainConfig;
105 consumerClass = _FAULT_CLASS;
106 LOG.debug("Consumer class = {}", consumerClass);
107 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_GROUP, faultConfig.getConsumerGroup());
108 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_ID, faultConfig.getConsumerId());
109 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TOPIC, faultConfig.getTopic());
110 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, faultConfig.getTimeout());
111 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_LIMIT, faultConfig.getLimit());
112 consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, faultConfig.getFetchPause());
115 createConsumer(_FAULT_DOMAIN, consumerProperties, getStrimziKafkaProps(strimziKafkaConfig));
116 } else if (domain.equalsIgnoreCase(_CM_DOMAIN)) {
117 this.provisioningConfig = (ProvisioningConfig) domainConfig;
118 consumerClass = _CM_CLASS;
119 LOG.debug("Consumer class = {}", consumerClass);
120 consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_GROUP,
121 provisioningConfig.getConsumerGroup());
122 consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_ID, provisioningConfig.getConsumerId());
123 consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TOPIC, provisioningConfig.getTopic());
124 consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, provisioningConfig.getTimeout());
125 consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_LIMIT, provisioningConfig.getLimit());
126 consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
127 provisioningConfig.getFetchPause());
129 threadsRunning = createConsumer(_CM_DOMAIN, consumerProperties, getStrimziKafkaProps(strimziKafkaConfig));
130 } else if (domain.equalsIgnoreCase(_STNDDEFINED_FAULT_DOMAIN)) {
131 this.stndDefinedFaultConfig = (StndDefinedFaultConfig) domainConfig;
132 consumerClass = _STNDDEFINED_FAULT_CLASS;
133 LOG.debug("Consumer class = {}", consumerClass);
134 consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_GROUP,
135 stndDefinedFaultConfig.getConsumerGroup());
136 consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_ID,
137 stndDefinedFaultConfig.getConsumerId());
138 consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_TOPIC,
139 stndDefinedFaultConfig.getTopic());
140 consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT,
141 stndDefinedFaultConfig.getTimeout());
142 consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_LIMIT,
143 stndDefinedFaultConfig.getLimit());
144 consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
145 stndDefinedFaultConfig.getFetchPause());
147 threadsRunning = createConsumer(_STNDDEFINED_FAULT_DOMAIN, consumerProperties,
148 getStrimziKafkaProps(strimziKafkaConfig));
152 private Properties getStrimziKafkaProps(StrimziKafkaConfig strimziKafkaConfig) {
153 if (strimziKafkaProperties.size() == 0) {
154 strimziKafkaProperties.put("bootstrap.servers", strimziKafkaConfig.getBootstrapServers());
155 strimziKafkaProperties.put("security.protocol", strimziKafkaConfig.getSecurityProtocol());
156 strimziKafkaProperties.put("sasl.mechanism", strimziKafkaConfig.getSaslMechanism());
157 strimziKafkaProperties.put("sasl.jaas.config", strimziKafkaConfig.getSaslJaasConfig());
159 return strimziKafkaProperties;
162 private boolean updateThreadState(List<StrimziKafkaVESMsgConsumer> consumers) {
163 boolean threadsRunning = false;
164 for (StrimziKafkaVESMsgConsumer consumer : consumers) {
165 if (consumer.isRunning()) {
166 threadsRunning = true;
169 return threadsRunning;
172 public boolean createConsumer(String consumerType, Properties consumerProperties, Properties strimziKafkaProps) {
173 StrimziKafkaVESMsgConsumerImpl consumer = null;
175 if (consumerType.equalsIgnoreCase(_PNFREG_DOMAIN))
176 consumer = new StrimziKafkaPNFRegVESMsgConsumer(generalConfig, kafkaAdminClient);
177 else if (consumerType.equalsIgnoreCase(_FAULT_DOMAIN))
178 consumer = new StrimziKafkaFaultVESMsgConsumer(generalConfig, kafkaAdminClient);
179 else if (consumerType.equalsIgnoreCase(_CM_DOMAIN))
180 consumer = new StrimziKafkaCMVESMsgConsumer(generalConfig, kafkaAdminClient);
181 else if (consumerType.equals(_STNDDEFINED_FAULT_DOMAIN))
182 consumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(generalConfig, kafkaAdminClient);
184 handleConsumer(consumer, consumerProperties, strimziKafkaProps, consumers);
185 return !consumers.isEmpty();
188 private boolean handleConsumer(StrimziKafkaVESMsgConsumer consumer, Properties consumerProperties,
189 Properties strimziKafkaProps, List<StrimziKafkaVESMsgConsumer> consumers) {
190 if (consumer != null) {
191 consumer.init(strimziKafkaProps, consumerProperties);
193 if (consumer.isReady()) {
194 Thread consumerThread = new Thread(consumer);
195 consumerThread.start();
196 consumers.add(consumer);
198 LOG.info("Started consumer thread ({} : {})", consumer.getClass().getSimpleName(), consumerProperties);
201 LOG.debug("Consumer {} is not ready", consumer.getClass().getSimpleName());
209 while (threadsRunning) {
210 threadsRunning = updateThreadState(consumers);
211 if (!threadsRunning) {
217 } catch (InterruptedException e) {
218 LOG.error(e.getLocalizedMessage(), e);
219 Thread.currentThread().interrupt();
222 kafkaAdminClient.close();
223 LOG.info("No listener threads running - exiting");
226 public List<StrimziKafkaVESMsgConsumer> getConsumers() {