fix oauth code
[ccsdk/features.git] / sdnr / wt / mountpoint-registrar / provider / src / main / java / org / onap / ccsdk / features / sdnr / wt / mountpointregistrar / impl / StrimziKafkaVESMsgConsumerMain.java
1 /*
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt mountpoint-registrar
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
7  * =================================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
9  * in compliance with the License. You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software distributed under the License
14  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
15  * or implied. See the License for the specific language governing permissions and limitations under
16  * the License.
17  * ============LICENSE_END==========================================================================
18  */
19
20 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
21
22 import java.util.LinkedList;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Properties;
26 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.FaultConfig;
27 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
28 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.MessageConfig;
29 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.PNFRegistrationConfig;
30 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.ProvisioningConfig;
31 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StndDefinedFaultConfig;
32 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StrimziKafkaConfig;
33 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm.StrimziKafkaCMVESMsgConsumer;
34 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.fault.StrimziKafkaFaultVESMsgConsumer;
35 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.pnfreg.StrimziKafkaPNFRegVESMsgConsumer;
36 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.stnddefined.StrimziKafkaStndDefinedFaultVESMsgConsumer;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 public class StrimziKafkaVESMsgConsumerMain implements Runnable {
41
42     private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaVESMsgConsumerMain.class);
43     Properties strimziKafkaProperties = new Properties();
44     private static final String _PNFREG_CLASS =
45             "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPPNFRegVESMsgConsumer";
46     private static final String _FAULT_CLASS =
47             "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPFaultVESMsgConsumer";
48     private static final String _CM_CLASS =
49             "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPCMVESMsgConsumer";
50     private static final String _STNDDEFINED_FAULT_CLASS =
51             "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPStndDefinedFaultVESMsgConsumer";
52     private static final String _PNFREG_DOMAIN = "pnfRegistration";
53     private static final String _FAULT_DOMAIN = "fault";
54     private static final String _CM_DOMAIN = "provisioning";
55     private static final String _STNDDEFINED_FAULT_DOMAIN = "stndDefinedFault";
56
57     boolean threadsRunning = false;
58     List<StrimziKafkaVESMsgConsumer> consumers = new LinkedList<>();
59     private PNFRegistrationConfig pnfRegistrationConfig;
60     private FaultConfig faultConfig;
61     private GeneralConfig generalConfig;
62     private ProvisioningConfig provisioningConfig;
63     private StndDefinedFaultConfig stndDefinedFaultConfig;
64     private StrimziKafkaConfig strimziKafkaConfig;
65
66     public StrimziKafkaVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig) {
67         this.generalConfig = generalConfig;
68         configMap.forEach(this::initialize);
69     }
70
71     public StrimziKafkaVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig,
72             StrimziKafkaConfig strimziKafkaConfig) {
73         this.generalConfig = generalConfig;
74         this.strimziKafkaConfig = strimziKafkaConfig;
75         configMap.forEach(this::initialize);
76     }
77
78     public void initialize(String domain, MessageConfig domainConfig) {
79         LOG.debug("In initialize method : Domain = {} and domainConfig = {}", domain, domainConfig);
80         String consumerClass;
81         Properties consumerProperties = new Properties();
82         if (domain.equalsIgnoreCase(_PNFREG_DOMAIN)) {
83             this.pnfRegistrationConfig = (PNFRegistrationConfig) domainConfig;
84             consumerClass = _PNFREG_CLASS;
85             LOG.debug("Consumer class = {}", consumerClass);
86
87             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_GROUP,
88                     pnfRegistrationConfig.getConsumerGroup());
89             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_ID,
90                     pnfRegistrationConfig.getConsumerId());
91             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TOPIC, pnfRegistrationConfig.getTopic());
92             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TIMEOUT,
93                     pnfRegistrationConfig.getTimeout());
94             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_LIMIT, pnfRegistrationConfig.getLimit());
95             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
96                     pnfRegistrationConfig.getFetchPause());
97
98             threadsRunning =
99                     createConsumer(_PNFREG_DOMAIN, consumerProperties, getStrimziKafkaProps(strimziKafkaConfig));
100         } else if (domain.equalsIgnoreCase(_FAULT_DOMAIN)) {
101             this.faultConfig = (FaultConfig) domainConfig;
102             consumerClass = _FAULT_CLASS;
103             LOG.debug("Consumer class = {}", consumerClass);
104             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_GROUP, faultConfig.getConsumerGroup());
105             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_ID, faultConfig.getConsumerId());
106             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TOPIC, faultConfig.getTopic());
107             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, faultConfig.getTimeout());
108             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_LIMIT, faultConfig.getLimit());
109             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, faultConfig.getFetchPause());
110
111             threadsRunning =
112                     createConsumer(_FAULT_DOMAIN, consumerProperties, getStrimziKafkaProps(strimziKafkaConfig));
113         } else if (domain.equalsIgnoreCase(_CM_DOMAIN)) {
114             this.provisioningConfig = (ProvisioningConfig) domainConfig;
115             consumerClass = _CM_CLASS;
116             LOG.debug("Consumer class = {}", consumerClass);
117             consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_GROUP,
118                     provisioningConfig.getConsumerGroup());
119             consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_ID, provisioningConfig.getConsumerId());
120             consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TOPIC, provisioningConfig.getTopic());
121             consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, provisioningConfig.getTimeout());
122             consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_LIMIT, provisioningConfig.getLimit());
123             consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
124                     provisioningConfig.getFetchPause());
125
126             threadsRunning = createConsumer(_CM_DOMAIN, consumerProperties, getStrimziKafkaProps(strimziKafkaConfig));
127         } else if (domain.equalsIgnoreCase(_STNDDEFINED_FAULT_DOMAIN)) {
128             this.stndDefinedFaultConfig = (StndDefinedFaultConfig) domainConfig;
129             consumerClass = _STNDDEFINED_FAULT_CLASS;
130             LOG.debug("Consumer class = {}", consumerClass);
131             consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_GROUP,
132                     stndDefinedFaultConfig.getConsumerGroup());
133             consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_ID,
134                     stndDefinedFaultConfig.getConsumerId());
135             consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_TOPIC,
136                     stndDefinedFaultConfig.getTopic());
137             consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT,
138                     stndDefinedFaultConfig.getTimeout());
139             consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_LIMIT,
140                     stndDefinedFaultConfig.getLimit());
141             consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
142                     stndDefinedFaultConfig.getFetchPause());
143
144             threadsRunning = createConsumer(_STNDDEFINED_FAULT_DOMAIN, consumerProperties,
145                     getStrimziKafkaProps(strimziKafkaConfig));
146         }
147     }
148
149     private Properties getStrimziKafkaProps(StrimziKafkaConfig strimziKafkaConfig) {
150         if (strimziKafkaProperties.size() == 0) {
151             strimziKafkaProperties.put("bootstrapServers", strimziKafkaConfig.getBootstrapServers());
152             strimziKafkaProperties.put("securityProtocol", strimziKafkaConfig.getSecurityProtocol());
153             strimziKafkaProperties.put("saslMechanism", strimziKafkaConfig.getSaslMechanism());
154             strimziKafkaProperties.put("saslJaasConfig", strimziKafkaConfig.getSaslJaasConfig());
155         }
156         return strimziKafkaProperties;
157     }
158
159     private boolean updateThreadState(List<StrimziKafkaVESMsgConsumer> consumers) {
160         boolean threadsRunning = false;
161         for (StrimziKafkaVESMsgConsumer consumer : consumers) {
162             if (consumer.isRunning()) {
163                 threadsRunning = true;
164             }
165         }
166         return threadsRunning;
167     }
168
169     public boolean createConsumer(String consumerType, Properties consumerProperties, Properties strimziKafkaProps) {
170         StrimziKafkaVESMsgConsumerImpl consumer = null;
171
172         if (consumerType.equalsIgnoreCase(_PNFREG_DOMAIN))
173             consumer = new StrimziKafkaPNFRegVESMsgConsumer(generalConfig);
174         else if (consumerType.equalsIgnoreCase(_FAULT_DOMAIN))
175             consumer = new StrimziKafkaFaultVESMsgConsumer(generalConfig);
176         else if (consumerType.equalsIgnoreCase(_CM_DOMAIN))
177             consumer = new StrimziKafkaCMVESMsgConsumer(generalConfig);
178         else if (consumerType.equals(_STNDDEFINED_FAULT_DOMAIN))
179             consumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(generalConfig);
180
181         handleConsumer(consumer, consumerProperties, strimziKafkaProps, consumers);
182         return !consumers.isEmpty();
183     }
184
185     private boolean handleConsumer(StrimziKafkaVESMsgConsumer consumer, Properties consumerProperties,
186             Properties strimziKafkaProps, List<StrimziKafkaVESMsgConsumer> consumers) {
187         if (consumer != null) {
188             consumer.init(strimziKafkaProps, consumerProperties);
189
190             if (consumer.isReady()) {
191                 Thread consumerThread = new Thread(consumer);
192                 consumerThread.start();
193                 consumers.add(consumer);
194
195                 LOG.info("Started consumer thread ({} : {})", consumer.getClass().getSimpleName(), consumerProperties);
196                 return true;
197             } else {
198                 LOG.debug("Consumer {} is not ready", consumer.getClass().getSimpleName());
199             }
200         }
201         return false;
202     }
203
204     @Override
205     public void run() {
206         while (threadsRunning) {
207             threadsRunning = updateThreadState(consumers);
208             if (!threadsRunning) {
209                 break;
210             }
211
212             try {
213                 Thread.sleep(10000);
214             } catch (InterruptedException e) {
215                 LOG.error(e.getLocalizedMessage(), e);
216                 Thread.currentThread().interrupt();
217             }
218         }
219
220         LOG.info("No listener threads running - exiting");
221     }
222
223     public List<StrimziKafkaVESMsgConsumer> getConsumers() {
224         return consumers;
225     }
226
227 }