2  * ============LICENSE_START========================================================================
 
   3  * ONAP : ccsdk feature sdnr wt
 
   4  * =================================================================================================
 
   5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
 
   6  * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
 
   7  * =================================================================================================
 
   8  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 
   9  * in compliance with the License. You may obtain a copy of the License at
 
  11  * http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software distributed under the License
 
  14  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 
  15  * or implied. See the License for the specific language governing permissions and limitations under
 
  17  * ============LICENSE_END==========================================================================
 
  20 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
 
  22 import java.util.HashMap;
 
  23 import java.util.List;
 
  25 import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
 
  26 import org.onap.ccsdk.features.sdnr.wt.common.configuration.filechange.IConfigChangedListener;
 
  27 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.FaultConfig;
 
  28 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
 
  29 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.MessageConfig;
 
  30 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.PNFRegistrationConfig;
 
  31 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.ProvisioningConfig;
 
  32 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StndDefinedFaultConfig;
 
  33 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StrimziKafkaConfig;
 
  34 import org.slf4j.Logger;
 
  35 import org.slf4j.LoggerFactory;
 
  37 public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedListener {
 
  39     private static final Logger LOG = LoggerFactory.getLogger(MountpointRegistrarImpl.class);
 
  40     private static final String APPLICATION_NAME = "mountpoint-registrar";
 
  41     private static final String CONFIGURATIONFILE = "etc/mountpoint-registrar.properties";
 
  43     private Thread sKafkaVESMsgConsumerMain = null;
 
  45     private GeneralConfig generalConfig;
 
  46     private boolean strimziEnabled = false;
 
  47     private Map<String, MessageConfig> configMap = new HashMap<>();
 
  48     private StrimziKafkaVESMsgConsumerMain sKafkaConsumerMain = null;
 
  49     private StrimziKafkaConfig strimziKafkaConfig;
 
  52     public MountpointRegistrarImpl() {
 
  53         LOG.info("Creating provider class for {}", APPLICATION_NAME);
 
  57         LOG.info("Init call for {}", APPLICATION_NAME);
 
  59         ConfigurationFileRepresentation configFileRepresentation =
 
  60                 new ConfigurationFileRepresentation(CONFIGURATIONFILE);
 
  61         configFileRepresentation.registerConfigChangedListener(this);
 
  63         generalConfig = new GeneralConfig(configFileRepresentation);
 
  64         strimziKafkaConfig = new StrimziKafkaConfig(configFileRepresentation);
 
  65         PNFRegistrationConfig pnfRegConfig = new PNFRegistrationConfig(configFileRepresentation);
 
  66         FaultConfig faultConfig = new FaultConfig(configFileRepresentation);
 
  67         ProvisioningConfig provisioningConfig = new ProvisioningConfig(configFileRepresentation);
 
  68         StndDefinedFaultConfig stndFaultConfig = new StndDefinedFaultConfig(configFileRepresentation);
 
  70         configMap.put("pnfRegistration", pnfRegConfig);
 
  71         configMap.put("fault", faultConfig);
 
  72         configMap.put("provisioning", provisioningConfig);
 
  73         configMap.put("stndDefinedFault", stndFaultConfig);
 
  75         strimziEnabled = strimziKafkaConfig.getEnabled();
 
  76         if (strimziEnabled) { // start Kafka consumer thread only if strimziEnabled=true
 
  77             LOG.info("Strimzi Kafka seems to be enabled, starting consumer(s)");
 
  78             sKafkaConsumerMain = new StrimziKafkaVESMsgConsumerMain(configMap, generalConfig, strimziKafkaConfig);
 
  79             sKafkaVESMsgConsumerMain = new Thread(sKafkaConsumerMain);
 
  80             sKafkaVESMsgConsumerMain.start();
 
  82             LOG.info("Strimzi Kafka seems to be disabled, not starting any consumer(s)");
 
  87      * Reflect status for Unit Tests
 
  89      * @return Text with status
 
  91     public String isInitializationOk() {
 
  92         return "No implemented";
 
  96     public void onConfigChanged() {
 
  97         if (generalConfig == null) { // Included as NullPointerException observed once in docker logs
 
  98             LOG.warn("onConfigChange cannot be handled. Unexpected Null");
 
 101         LOG.info("Service configuration state changed. Enabled: {}", strimziKafkaConfig.getEnabled());
 
 102         boolean strimziEnabledNewVal = strimziKafkaConfig.getEnabled();
 
 103         if (!strimziEnabled && strimziEnabledNewVal) { // Strimzi kafka disabled earlier (or during bundle startup) but enabled later, start Consumer(s)
 
 104             LOG.info("Strimzi Kafka is enabled, starting consumer(s)");
 
 105             sKafkaConsumerMain = new StrimziKafkaVESMsgConsumerMain(configMap, generalConfig, strimziKafkaConfig);
 
 106             sKafkaVESMsgConsumerMain = new Thread(sKafkaConsumerMain);
 
 107             sKafkaVESMsgConsumerMain.start();
 
 108         } else if (strimziEnabled && !strimziEnabledNewVal) { // Strimzi kafka enabled earlier (or during bundle startup) but disabled later, stop consumer(s)
 
 109             LOG.info("Strimzi Kafka is disabled, stopping consumer(s)");
 
 110             List<StrimziKafkaVESMsgConsumer> consumers = sKafkaConsumerMain.getConsumers();
 
 111             for (StrimziKafkaVESMsgConsumer consumer : consumers) {
 
 112                 // stop all consumers
 
 113                 consumer.stopConsumer();
 
 116         strimziEnabled = strimziEnabledNewVal;
 
 120     public void close() throws Exception {
 
 121         LOG.info("{} closing ...", this.getClass().getName());
 
 122         LOG.info("{} closing done", APPLICATION_NAME);
 
 126      * Used to close all Services, that should support AutoCloseable Pattern
 
 131     @SuppressWarnings("unused")
 
 132     private void close(AutoCloseable... toCloseList) throws Exception {
 
 133         for (AutoCloseable element : toCloseList) {
 
 134             if (element != null) {