2 * ============LICENSE_START========================================================================
3 * ONAP : ccsdk feature sdnr wt
4 * =================================================================================================
5 * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6 * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
7 * =================================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
9 * in compliance with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software distributed under the License
14 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
15 * or implied. See the License for the specific language governing permissions and limitations under
17 * ============LICENSE_END==========================================================================
20 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
22 import java.util.HashMap;
23 import java.util.List;
25 import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
26 import org.onap.ccsdk.features.sdnr.wt.common.configuration.filechange.IConfigChangedListener;
27 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.FaultConfig;
28 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
29 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.MessageConfig;
30 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.PNFRegistrationConfig;
31 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.ProvisioningConfig;
32 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StndDefinedFaultConfig;
33 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StrimziKafkaConfig;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
37 public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedListener {
39 private static final Logger LOG = LoggerFactory.getLogger(MountpointRegistrarImpl.class);
40 private static final String APPLICATION_NAME = "mountpoint-registrar";
41 private static final String CONFIGURATIONFILE = "etc/mountpoint-registrar.properties";
43 private Thread sKafkaVESMsgConsumerMain = null;
45 private GeneralConfig generalConfig;
46 private boolean strimziEnabled = false;
47 private Map<String, MessageConfig> configMap = new HashMap<>();
48 private StrimziKafkaVESMsgConsumerMain sKafkaConsumerMain = null;
49 private StrimziKafkaConfig strimziKafkaConfig;
/**
 * Creates the provider instance.
 *
 * <p>The constructor only writes a log entry; it does not read any configuration and does
 * not start any consumer.
 */
public MountpointRegistrarImpl() {
    LOG.info("Creating provider class for {}", APPLICATION_NAME);
}
57 LOG.info("Init call for {}", APPLICATION_NAME);
59 ConfigurationFileRepresentation configFileRepresentation =
60 new ConfigurationFileRepresentation(CONFIGURATIONFILE);
61 configFileRepresentation.registerConfigChangedListener(this);
63 generalConfig = new GeneralConfig(configFileRepresentation);
64 strimziKafkaConfig = new StrimziKafkaConfig(configFileRepresentation);
65 PNFRegistrationConfig pnfRegConfig = new PNFRegistrationConfig(configFileRepresentation);
66 FaultConfig faultConfig = new FaultConfig(configFileRepresentation);
67 ProvisioningConfig provisioningConfig = new ProvisioningConfig(configFileRepresentation);
68 StndDefinedFaultConfig stndFaultConfig = new StndDefinedFaultConfig(configFileRepresentation);
70 configMap.put("pnfRegistration", pnfRegConfig);
71 configMap.put("fault", faultConfig);
72 configMap.put("provisioning", provisioningConfig);
73 configMap.put("stndDefinedFault", stndFaultConfig);
75 strimziEnabled = strimziKafkaConfig.getEnabled();
76 if (strimziEnabled) { // start Kafka consumer thread only if strimziEnabled=true
77 LOG.info("Strimzi Kafka seems to be enabled, starting consumer(s)");
78 sKafkaConsumerMain = new StrimziKafkaVESMsgConsumerMain(configMap, generalConfig, strimziKafkaConfig);
79 sKafkaVESMsgConsumerMain = new Thread(sKafkaConsumerMain);
80 sKafkaVESMsgConsumerMain.start();
82 LOG.info("Strimzi Kafka seems to be disabled, not starting any consumer(s)");
87 * Reflect status for Unit Tests
89 * @return Text with status
91 public String isInitializationOk() {
92 return "No implemented";
96 public void onConfigChanged() {
97 if (generalConfig == null) { // Included as NullPointerException observed once in docker logs
98 LOG.warn("onConfigChange cannot be handled. Unexpected Null for generalConfig");
101 if (strimziKafkaConfig == null) { // Included as NullPointerException observed once in docker logs
102 LOG.warn("onConfigChange cannot be handled. Unexpected Null for strimziKafkaConfig");
105 LOG.info("Service configuration state changed. Enabled: {}", strimziKafkaConfig.getEnabled());
106 boolean strimziEnabledNewVal = strimziKafkaConfig.getEnabled();
107 if (!strimziEnabled && strimziEnabledNewVal) { // Strimzi kafka disabled earlier (or during bundle startup) but enabled later, start Consumer(s)
108 LOG.info("Strimzi Kafka is enabled, starting consumer(s)");
109 sKafkaConsumerMain = new StrimziKafkaVESMsgConsumerMain(configMap, generalConfig, strimziKafkaConfig);
110 sKafkaVESMsgConsumerMain = new Thread(sKafkaConsumerMain);
111 sKafkaVESMsgConsumerMain.start();
112 } else if (strimziEnabled && !strimziEnabledNewVal) { // Strimzi kafka enabled earlier (or during bundle startup) but disabled later, stop consumer(s)
113 LOG.info("Strimzi Kafka is disabled, stopping consumer(s)");
114 List<StrimziKafkaVESMsgConsumer> consumers = sKafkaConsumerMain.getConsumers();
115 for (StrimziKafkaVESMsgConsumer consumer : consumers) {
116 // stop all consumers
117 consumer.stopConsumer();
120 strimziEnabled = strimziEnabledNewVal;
124 public void close() throws Exception {
125 LOG.info("{} closing ...", this.getClass().getName());
126 LOG.info("{} closing done", APPLICATION_NAME);
130 * Used to close all Services, that should support AutoCloseable Pattern
135 @SuppressWarnings("unused")
136 private void close(AutoCloseable... toCloseList) throws Exception {
137 for (AutoCloseable element : toCloseList) {
138 if (element != null) {