2  * ============LICENSE_START========================================================================
 
   3  * ONAP : ccsdk feature sdnr wt mountpoint-registrar
 
   4  * =================================================================================================
 
   5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
 
   6  * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
 
   7  * =================================================================================================
 
   8  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 
   9  * in compliance with the License. You may obtain a copy of the License at
 
  11  * http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software distributed under the License
 
  14  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 
  15  * or implied. See the License for the specific language governing permissions and limitations under
 
  17  * ============LICENSE_END==========================================================================
 
  20 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
 
  22 import java.util.LinkedList;
 
  23 import java.util.List;
 
  25 import java.util.Properties;
 
  26 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.FaultConfig;
 
  27 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
 
  28 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.MessageConfig;
 
  29 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.PNFRegistrationConfig;
 
  30 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.ProvisioningConfig;
 
  31 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StndDefinedFaultConfig;
 
  32 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StrimziKafkaConfig;
 
  33 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm.StrimziKafkaCMVESMsgConsumer;
 
  34 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.fault.StrimziKafkaFaultVESMsgConsumer;
 
  35 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.pnfreg.StrimziKafkaPNFRegVESMsgConsumer;
 
  36 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.stnddefined.StrimziKafkaStndDefinedFaultVESMsgConsumer;
 
  37 import org.slf4j.Logger;
 
  38 import org.slf4j.LoggerFactory;
 
  40 public class StrimziKafkaVESMsgConsumerMain implements Runnable {
 
  42     private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaVESMsgConsumerMain.class);
 
  43     Properties strimziKafkaProperties = new Properties();
 
  44     private static final String _PNFREG_CLASS =
 
  45             "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPPNFRegVESMsgConsumer";
 
  46     private static final String _FAULT_CLASS =
 
  47             "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPFaultVESMsgConsumer";
 
  48     private static final String _CM_CLASS =
 
  49             "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPCMVESMsgConsumer";
 
  50     private static final String _STNDDEFINED_FAULT_CLASS =
 
  51             "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPStndDefinedFaultVESMsgConsumer";
 
  52     private static final String _PNFREG_DOMAIN = "pnfRegistration";
 
  53     private static final String _FAULT_DOMAIN = "fault";
 
  54     private static final String _CM_DOMAIN = "provisioning";
 
  55     private static final String _STNDDEFINED_FAULT_DOMAIN = "stndDefinedFault";
 
  57     boolean threadsRunning = false;
 
  58     List<StrimziKafkaVESMsgConsumer> consumers = new LinkedList<>();
 
  59     private PNFRegistrationConfig pnfRegistrationConfig;
 
  60     private FaultConfig faultConfig;
 
  61     private GeneralConfig generalConfig;
 
  62     private ProvisioningConfig provisioningConfig;
 
  63     private StndDefinedFaultConfig stndDefinedFaultConfig;
 
  64     private StrimziKafkaConfig strimziKafkaConfig;
 
  66     public StrimziKafkaVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig) {
 
  67         this.generalConfig = generalConfig;
 
  68         configMap.forEach(this::initialize);
 
  71     public StrimziKafkaVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig,
 
  72             StrimziKafkaConfig strimziKafkaConfig) {
 
  73         this.generalConfig = generalConfig;
 
  74         this.strimziKafkaConfig = strimziKafkaConfig;
 
  75         configMap.forEach(this::initialize);
 
  78     public void initialize(String domain, MessageConfig domainConfig) {
 
  79         LOG.debug("In initialize method : Domain = {} and domainConfig = {}", domain, domainConfig);
 
  81         Properties consumerProperties = new Properties();
 
  82         if (domain.equalsIgnoreCase(_PNFREG_DOMAIN)) {
 
  83             this.pnfRegistrationConfig = (PNFRegistrationConfig) domainConfig;
 
  84             consumerClass = _PNFREG_CLASS;
 
  85             LOG.debug("Consumer class = {}", consumerClass);
 
  87             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_GROUP,
 
  88                     pnfRegistrationConfig.getConsumerGroup());
 
  89             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_ID,
 
  90                     pnfRegistrationConfig.getConsumerId());
 
  91             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TOPIC, pnfRegistrationConfig.getTopic());
 
  92             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TIMEOUT,
 
  93                     pnfRegistrationConfig.getTimeout());
 
  94             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_LIMIT, pnfRegistrationConfig.getLimit());
 
  95             consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
 
  96                     pnfRegistrationConfig.getFetchPause());
 
  99                     createConsumer(_PNFREG_DOMAIN, consumerProperties, getStrimziKafkaProps(strimziKafkaConfig));
 
 100         } else if (domain.equalsIgnoreCase(_FAULT_DOMAIN)) {
 
 101             this.faultConfig = (FaultConfig) domainConfig;
 
 102             consumerClass = _FAULT_CLASS;
 
 103             LOG.debug("Consumer class = {}", consumerClass);
 
 104             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_GROUP, faultConfig.getConsumerGroup());
 
 105             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_ID, faultConfig.getConsumerId());
 
 106             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TOPIC, faultConfig.getTopic());
 
 107             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, faultConfig.getTimeout());
 
 108             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_LIMIT, faultConfig.getLimit());
 
 109             consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, faultConfig.getFetchPause());
 
 112                     createConsumer(_FAULT_DOMAIN, consumerProperties, getStrimziKafkaProps(strimziKafkaConfig));
 
 113         } else if (domain.equalsIgnoreCase(_CM_DOMAIN)) {
 
 114             this.provisioningConfig = (ProvisioningConfig) domainConfig;
 
 115             consumerClass = _CM_CLASS;
 
 116             LOG.debug("Consumer class = {}", consumerClass);
 
 117             consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_GROUP,
 
 118                     provisioningConfig.getConsumerGroup());
 
 119             consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_ID, provisioningConfig.getConsumerId());
 
 120             consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TOPIC, provisioningConfig.getTopic());
 
 121             consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, provisioningConfig.getTimeout());
 
 122             consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_LIMIT, provisioningConfig.getLimit());
 
 123             consumerProperties.put(ProvisioningConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
 
 124                     provisioningConfig.getFetchPause());
 
 126             threadsRunning = createConsumer(_CM_DOMAIN, consumerProperties, getStrimziKafkaProps(strimziKafkaConfig));
 
 127         } else if (domain.equalsIgnoreCase(_STNDDEFINED_FAULT_DOMAIN)) {
 
 128             this.stndDefinedFaultConfig = (StndDefinedFaultConfig) domainConfig;
 
 129             consumerClass = _STNDDEFINED_FAULT_CLASS;
 
 130             LOG.debug("Consumer class = {}", consumerClass);
 
 131             consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_GROUP,
 
 132                     stndDefinedFaultConfig.getConsumerGroup());
 
 133             consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_ID,
 
 134                     stndDefinedFaultConfig.getConsumerId());
 
 135             consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_TOPIC,
 
 136                     stndDefinedFaultConfig.getTopic());
 
 137             consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT,
 
 138                     stndDefinedFaultConfig.getTimeout());
 
 139             consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_LIMIT,
 
 140                     stndDefinedFaultConfig.getLimit());
 
 141             consumerProperties.put(StndDefinedFaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
 
 142                     stndDefinedFaultConfig.getFetchPause());
 
 144             threadsRunning = createConsumer(_STNDDEFINED_FAULT_DOMAIN, consumerProperties,
 
 145                     getStrimziKafkaProps(strimziKafkaConfig));
 
 149     private Properties getStrimziKafkaProps(StrimziKafkaConfig strimziKafkaConfig) {
 
 150         if (strimziKafkaProperties.size() == 0) {
 
 151             strimziKafkaProperties.put("bootstrapServers", strimziKafkaConfig.getBootstrapServers());
 
 152             strimziKafkaProperties.put("securityProtocol", strimziKafkaConfig.getSecurityProtocol());
 
 153             strimziKafkaProperties.put("saslMechanism", strimziKafkaConfig.getSaslMechanism());
 
 154             strimziKafkaProperties.put("saslJaasConfig", strimziKafkaConfig.getSaslJaasConfig());
 
 156         return strimziKafkaProperties;
 
 159     private boolean updateThreadState(List<StrimziKafkaVESMsgConsumer> consumers) {
 
 160         boolean threadsRunning = false;
 
 161         for (StrimziKafkaVESMsgConsumer consumer : consumers) {
 
 162             if (consumer.isRunning()) {
 
 163                 threadsRunning = true;
 
 166         return threadsRunning;
 
 169     public boolean createConsumer(String consumerType, Properties consumerProperties, Properties strimziKafkaProps) {
 
 170         StrimziKafkaVESMsgConsumerImpl consumer = null;
 
 172         if (consumerType.equalsIgnoreCase(_PNFREG_DOMAIN))
 
 173             consumer = new StrimziKafkaPNFRegVESMsgConsumer(generalConfig);
 
 174         else if (consumerType.equalsIgnoreCase(_FAULT_DOMAIN))
 
 175             consumer = new StrimziKafkaFaultVESMsgConsumer(generalConfig);
 
 176         else if (consumerType.equalsIgnoreCase(_CM_DOMAIN))
 
 177             consumer = new StrimziKafkaCMVESMsgConsumer(generalConfig);
 
 178         else if (consumerType.equals(_STNDDEFINED_FAULT_DOMAIN))
 
 179             consumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(generalConfig);
 
 181         handleConsumer(consumer, consumerProperties, strimziKafkaProps, consumers);
 
 182         return !consumers.isEmpty();
 
 185     private boolean handleConsumer(StrimziKafkaVESMsgConsumer consumer, Properties consumerProperties,
 
 186             Properties strimziKafkaProps, List<StrimziKafkaVESMsgConsumer> consumers) {
 
 187         if (consumer != null) {
 
 188             consumer.init(strimziKafkaProps, consumerProperties);
 
 190             if (consumer.isReady()) {
 
 191                 Thread consumerThread = new Thread(consumer);
 
 192                 consumerThread.start();
 
 193                 consumers.add(consumer);
 
 195                 LOG.info("Started consumer thread ({} : {})", consumer.getClass().getSimpleName(), consumerProperties);
 
 198                 LOG.debug("Consumer {} is not ready", consumer.getClass().getSimpleName());
 
 206         while (threadsRunning) {
 
 207             threadsRunning = updateThreadState(consumers);
 
 208             if (!threadsRunning) {
 
 214             } catch (InterruptedException e) {
 
 215                 LOG.error(e.getLocalizedMessage(), e);
 
 216                 Thread.currentThread().interrupt();
 
 220         LOG.info("No listener threads running - exiting");
 
 223     public List<StrimziKafkaVESMsgConsumer> getConsumers() {