Merge "fix oauth code"
[ccsdk/features.git] / sdnr / wt / mountpoint-registrar / provider / src / main / java / org / onap / ccsdk / features / sdnr / wt / mountpointregistrar / impl / StrimziKafkaVESMsgConsumerMain.java
1 /*
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt mountpoint-registrar
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
7  * =================================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
9  * in compliance with the License. You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software distributed under the License
14  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
15  * or implied. See the License for the specific language governing permissions and limitations under
16  * the License.
17  * ============LICENSE_END==========================================================================
18  */
19
20 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
21
22 import java.util.LinkedList;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Properties;
26 import org.apache.kafka.clients.admin.Admin;
27 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.FaultConfig;
28 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
29 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.MessageConfig;
30 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.PNFRegistrationConfig;
31 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.ProvisioningConfig;
32 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StndDefinedFaultConfig;
33 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StrimziKafkaConfig;
34 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm.StrimziKafkaCMVESMsgConsumer;
35 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.fault.StrimziKafkaFaultVESMsgConsumer;
36 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.pnfreg.StrimziKafkaPNFRegVESMsgConsumer;
37 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.stnddefined.StrimziKafkaStndDefinedFaultVESMsgConsumer;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 public class StrimziKafkaVESMsgConsumerMain implements Runnable {
42
43     private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaVESMsgConsumerMain.class);
44     Properties strimziKafkaProperties = new Properties();
45     private static final String _PNFREG_CLASS =
46             "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPPNFRegVESMsgConsumer";
47     private static final String _FAULT_CLASS =
48             "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPFaultVESMsgConsumer";
49     private static final String _CM_CLASS =
50             "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPCMVESMsgConsumer";
51     private static final String _STNDDEFINED_FAULT_CLASS =
52             "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPStndDefinedFaultVESMsgConsumer";
53     private static final String _PNFREG_DOMAIN = "pnfRegistration";
54     private static final String _FAULT_DOMAIN = "fault";
55     private static final String _CM_DOMAIN = "provisioning";
56     private static final String _STNDDEFINED_FAULT_DOMAIN = "stndDefinedFault";
57
58     boolean threadsRunning = false;
59     List<StrimziKafkaVESMsgConsumer> consumers = new LinkedList<>();
60     private PNFRegistrationConfig pnfRegistrationConfig;
61     private FaultConfig faultConfig;
62     private GeneralConfig generalConfig;
63     private ProvisioningConfig provisioningConfig;
64     private StndDefinedFaultConfig stndDefinedFaultConfig;
65     private StrimziKafkaConfig strimziKafkaConfig;
66     private Admin kafkaAdminClient = null;
67
68     public StrimziKafkaVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig) {
69         this.generalConfig = generalConfig;
70         configMap.forEach(this::initialize);
71     }
72
73     public StrimziKafkaVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig,
74             StrimziKafkaConfig strimziKafkaConfig) {
75         this.generalConfig = generalConfig;
76         this.strimziKafkaConfig = strimziKafkaConfig;
77         kafkaAdminClient = Admin.create(getStrimziKafkaProps(strimziKafkaConfig));
78         configMap.forEach(this::initialize);
79     }
80
81     public void initialize(String domain, MessageConfig domainConfig) {
82         LOG.debug("In initialize method : Domain = {} and domainConfig = {}", domain, domainConfig);
83         String consumerClass;
84         Properties consumerProperties = new Properties();
85         if (domain.equalsIgnoreCase(_PNFREG_DOMAIN)) {
86             this.pnfRegistrationConfig = (PNFRegistrationConfig) domainConfig;
87             consumerClass = _PNFREG_CLASS;
88             LOG.debug("Consumer class = {}", consumerClass);
89
90             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_GROUP,
91                     pnfRegistrationConfig.getConsumerGroup());
92             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_ID,
93                     pnfRegistrationConfig.getConsumerId());
94             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TOPIC, pnfRegistrationConfig.getTopic());
95             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TIMEOUT,
96                     pnfRegistrationConfig.getTimeout());
97             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_LIMIT, pnfRegistrationConfig.getLimit());
98             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
99                     pnfRegistrationConfig.getFetchPause());
100
101             threadsRunning =
102                     createConsumer(_PNFREG_DOMAIN, consumerProperties, getStrimziKafkaProps(strimziKafkaConfig));
103         } else if (domain.equalsIgnoreCase(_FAULT_DOMAIN)) {
104             this.faultConfig = (FaultConfig) domainConfig;
105             consumerClass = _FAULT_CLASS;
106             LOG.debug("Consumer class = {}", consumerClass);
107             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_GROUP, faultConfig.getConsumerGroup());
108             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_ID, faultConfig.getConsumerId());
109             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TOPIC, faultConfig.getTopic());
110             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, faultConfig.getTimeout());
111             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_LIMIT, faultConfig.getLimit());
112             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, faultConfig.getFetchPause());
113
114             threadsRunning =
115                     createConsumer(_FAULT_DOMAIN, consumerProperties, getStrimziKafkaProps(strimziKafkaConfig));
116         } else if (domain.equalsIgnoreCase(_CM_DOMAIN)) {
117             this.provisioningConfig = (ProvisioningConfig) domainConfig;
118             consumerClass = _CM_CLASS;
119             LOG.debug("Consumer class = {}", consumerClass);
120             consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_GROUP,
121                     provisioningConfig.getConsumerGroup());
122             consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_ID, provisioningConfig.getConsumerId());
123             consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TOPIC, provisioningConfig.getTopic());
124             consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, provisioningConfig.getTimeout());
125             consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_LIMIT, provisioningConfig.getLimit());
126             consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
127                     provisioningConfig.getFetchPause());
128
129             threadsRunning = createConsumer(_CM_DOMAIN, consumerProperties, getStrimziKafkaProps(strimziKafkaConfig));
130         } else if (domain.equalsIgnoreCase(_STNDDEFINED_FAULT_DOMAIN)) {
131             this.stndDefinedFaultConfig = (StndDefinedFaultConfig) domainConfig;
132             consumerClass = _STNDDEFINED_FAULT_CLASS;
133             LOG.debug("Consumer class = {}", consumerClass);
134             consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_GROUP,
135                     stndDefinedFaultConfig.getConsumerGroup());
136             consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_ID,
137                     stndDefinedFaultConfig.getConsumerId());
138             consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_TOPIC,
139                     stndDefinedFaultConfig.getTopic());
140             consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT,
141                     stndDefinedFaultConfig.getTimeout());
142             consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_LIMIT,
143                     stndDefinedFaultConfig.getLimit());
144             consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
145                     stndDefinedFaultConfig.getFetchPause());
146
147             threadsRunning = createConsumer(_STNDDEFINED_FAULT_DOMAIN, consumerProperties,
148                     getStrimziKafkaProps(strimziKafkaConfig));
149         }
150     }
151
152     private Properties getStrimziKafkaProps(StrimziKafkaConfig strimziKafkaConfig) {
153         if (strimziKafkaProperties.size() == 0) {
154             strimziKafkaProperties.put("bootstrap.servers", strimziKafkaConfig.getBootstrapServers());
155             strimziKafkaProperties.put("security.protocol", strimziKafkaConfig.getSecurityProtocol());
156             strimziKafkaProperties.put("sasl.mechanism", strimziKafkaConfig.getSaslMechanism());
157             strimziKafkaProperties.put("sasl.jaas.config", strimziKafkaConfig.getSaslJaasConfig());
158         }
159         return strimziKafkaProperties;
160     }
161
162     private boolean updateThreadState(List<StrimziKafkaVESMsgConsumer> consumers) {
163         boolean threadsRunning = false;
164         for (StrimziKafkaVESMsgConsumer consumer : consumers) {
165             if (consumer.isRunning()) {
166                 threadsRunning = true;
167             }
168         }
169         return threadsRunning;
170     }
171
172     public boolean createConsumer(String consumerType, Properties consumerProperties, Properties strimziKafkaProps) {
173         StrimziKafkaVESMsgConsumerImpl consumer = null;
174
175         if (consumerType.equalsIgnoreCase(_PNFREG_DOMAIN))
176             consumer = new StrimziKafkaPNFRegVESMsgConsumer(generalConfig, kafkaAdminClient);
177         else if (consumerType.equalsIgnoreCase(_FAULT_DOMAIN))
178             consumer = new StrimziKafkaFaultVESMsgConsumer(generalConfig, kafkaAdminClient);
179         else if (consumerType.equalsIgnoreCase(_CM_DOMAIN))
180             consumer = new StrimziKafkaCMVESMsgConsumer(generalConfig, kafkaAdminClient);
181         else if (consumerType.equals(_STNDDEFINED_FAULT_DOMAIN))
182             consumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(generalConfig, kafkaAdminClient);
183
184         handleConsumer(consumer, consumerProperties, strimziKafkaProps, consumers);
185         return !consumers.isEmpty();
186     }
187
188     private boolean handleConsumer(StrimziKafkaVESMsgConsumer consumer, Properties consumerProperties,
189             Properties strimziKafkaProps, List<StrimziKafkaVESMsgConsumer> consumers) {
190         if (consumer != null) {
191             consumer.init(strimziKafkaProps, consumerProperties);
192
193             if (consumer.isReady()) {
194                 Thread consumerThread = new Thread(consumer);
195                 consumerThread.start();
196                 consumers.add(consumer);
197
198                 LOG.info("Started consumer thread ({} : {})", consumer.getClass().getSimpleName(), consumerProperties);
199                 return true;
200             } else {
201                 LOG.debug("Consumer {} is not ready", consumer.getClass().getSimpleName());
202             }
203         }
204         return false;
205     }
206
207     @Override
208     public void run() {
209         while (threadsRunning) {
210             threadsRunning = updateThreadState(consumers);
211             if (!threadsRunning) {
212                 break;
213             }
214
215             try {
216                 Thread.sleep(10000);
217             } catch (InterruptedException e) {
218                 LOG.error(e.getLocalizedMessage(), e);
219                 Thread.currentThread().interrupt();
220             }
221         }
222         kafkaAdminClient.close();
223         LOG.info("No listener threads running - exiting");
224     }
225
226     public List<StrimziKafkaVESMsgConsumer> getConsumers() {
227         return consumers;
228     }
229
230 }