* ONAP : ccsdk feature sdnr wt
* =================================================================================================
* Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
* =================================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.filechange.IConfigChangedListener;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.FaultConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.MessageConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.PNFRegistrationConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.ProvisioningConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StndDefinedFaultConfig;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StrimziKafkaConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final String APPLICATION_NAME = "mountpoint-registrar";
private static final String CONFIGURATIONFILE = "etc/mountpoint-registrar.properties";
- private Thread dmaapVESMsgConsumerMain = null;
+ private Thread sKafkaVESMsgConsumerMain = null;
private GeneralConfig generalConfig;
- private boolean dmaapEnabled = false;
- private Map<String, Configuration> configMap = new HashMap<>();
- private DMaaPVESMsgConsumerMain dmaapConsumerMain = null;
+ private boolean strimziEnabled = false;
+ private Map<String, MessageConfig> configMap = new HashMap<>();
+ private StrimziKafkaVESMsgConsumerMain sKafkaConsumerMain = null;
+ private StrimziKafkaConfig strimziKafkaConfig;
// Blueprint 1
public MountpointRegistrarImpl() {
configFileRepresentation.registerConfigChangedListener(this);
generalConfig = new GeneralConfig(configFileRepresentation);
+ strimziKafkaConfig = new StrimziKafkaConfig(configFileRepresentation);
PNFRegistrationConfig pnfRegConfig = new PNFRegistrationConfig(configFileRepresentation);
FaultConfig faultConfig = new FaultConfig(configFileRepresentation);
+ ProvisioningConfig provisioningConfig = new ProvisioningConfig(configFileRepresentation);
+ StndDefinedFaultConfig stndFaultConfig = new StndDefinedFaultConfig(configFileRepresentation);
configMap.put("pnfRegistration", pnfRegConfig);
configMap.put("fault", faultConfig);
-
- dmaapEnabled = generalConfig.getEnabled();
- if (dmaapEnabled) { // start dmaap consumer thread only if dmaapEnabled=true
- LOG.info("DMaaP seems to be enabled, starting consumer(s)");
- dmaapConsumerMain = new DMaaPVESMsgConsumerMain(configMap, generalConfig);
- dmaapVESMsgConsumerMain = new Thread(dmaapConsumerMain);
- dmaapVESMsgConsumerMain.start();
+ configMap.put("provisioning", provisioningConfig);
+ configMap.put("stndDefinedFault", stndFaultConfig);
+
+ strimziEnabled = strimziKafkaConfig.getEnabled();
+ if (strimziEnabled) { // start Kafka consumer thread only if strimziEnabled=true
+ LOG.info("Strimzi Kafka seems to be enabled, starting consumer(s)");
+ sKafkaConsumerMain = new StrimziKafkaVESMsgConsumerMain(configMap, generalConfig, strimziKafkaConfig);
+ sKafkaVESMsgConsumerMain = new Thread(sKafkaConsumerMain);
+ sKafkaVESMsgConsumerMain.start();
} else {
- LOG.info("DMaaP seems to be disabled, not starting any consumer(s)");
+ LOG.info("Strimzi Kafka seems to be disabled, not starting any consumer(s)");
}
}
@Override
public void onConfigChanged() {
- LOG.info("Service configuration state changed. Enabled: {}", generalConfig.getEnabled());
- boolean dmaapEnabledNewVal = generalConfig.getEnabled();
- if (!dmaapEnabled && dmaapEnabledNewVal) { // Dmaap disabled earlier (or during bundle startup) but enabled later, start Consumer(s)
- LOG.info("DMaaP is enabled, starting consumer(s)");
- dmaapConsumerMain = new DMaaPVESMsgConsumerMain(configMap, generalConfig);
- dmaapVESMsgConsumerMain = new Thread(dmaapConsumerMain);
- dmaapVESMsgConsumerMain.start();
- } else if (dmaapEnabled && !dmaapEnabledNewVal) { // Dmaap enabled earlier (or during bundle startup) but disabled later, stop consumer(s)
- LOG.info("DMaaP is disabled, stopping consumer(s)");
- List<DMaaPVESMsgConsumer> consumers = dmaapConsumerMain.getConsumers();
- for (DMaaPVESMsgConsumer consumer : consumers) {
+ if (generalConfig == null) { // Included as NullPointerException observed once in docker logs
+ LOG.warn("onConfigChange cannot be handled. Unexpected Null for generalConfig");
+ return;
+ }
+ if (strimziKafkaConfig == null) { // Included as NullPointerException observed once in docker logs
+ LOG.warn("onConfigChange cannot be handled. Unexpected Null for strimziKafkaConfig");
+ return;
+ }
+ LOG.info("Service configuration state changed. Enabled: {}", strimziKafkaConfig.getEnabled());
+ boolean strimziEnabledNewVal = strimziKafkaConfig.getEnabled();
+ if (!strimziEnabled && strimziEnabledNewVal) { // Strimzi kafka disabled earlier (or during bundle startup) but enabled later, start Consumer(s)
+ LOG.info("Strimzi Kafka is enabled, starting consumer(s)");
+ sKafkaConsumerMain = new StrimziKafkaVESMsgConsumerMain(configMap, generalConfig, strimziKafkaConfig);
+ sKafkaVESMsgConsumerMain = new Thread(sKafkaConsumerMain);
+ sKafkaVESMsgConsumerMain.start();
+ } else if (strimziEnabled && !strimziEnabledNewVal) { // Strimzi kafka enabled earlier (or during bundle startup) but disabled later, stop consumer(s)
+ LOG.info("Strimzi Kafka is disabled, stopping consumer(s)");
+ List<StrimziKafkaVESMsgConsumer> consumers = sKafkaConsumerMain.getConsumers();
+ for (StrimziKafkaVESMsgConsumer consumer : consumers) {
// stop all consumers
consumer.stopConsumer();
}
}
- dmaapEnabled = dmaapEnabledNewVal;
+ strimziEnabled = strimziEnabledNewVal;
}
@Override
}
}
}
-
-
}