X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=sdnr%2Fwt%2Fmountpoint-registrar%2Fprovider%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fccsdk%2Ffeatures%2Fsdnr%2Fwt%2Fmountpointregistrar%2Fimpl%2FMountpointRegistrarImpl.java;h=deff2e336bef71d39876f565ffe1728277dc1e2e;hb=HEAD;hp=53454ac03c669d5dc277fb2e18350298e4da420e;hpb=4277231cf397c2fbc12d4f5ca16d51c0f657bb4f;p=ccsdk%2Ffeatures.git diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java index 53454ac03..deff2e336 100644 --- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java @@ -3,6 +3,7 @@ * ONAP : ccsdk feature sdnr wt * ================================================================================================= * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved. + * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved. * ================================================================================================= * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -21,9 +22,15 @@ package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration; import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation; import org.onap.ccsdk.features.sdnr.wt.common.configuration.filechange.IConfigChangedListener; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.FaultConfig; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.MessageConfig; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.PNFRegistrationConfig; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.ProvisioningConfig; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StndDefinedFaultConfig; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StrimziKafkaConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,12 +40,13 @@ public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedLis private static final String APPLICATION_NAME = "mountpoint-registrar"; private static final String CONFIGURATIONFILE = "etc/mountpoint-registrar.properties"; - private Thread dmaapVESMsgConsumerMain = null; + private Thread sKafkaVESMsgConsumerMain = null; private GeneralConfig generalConfig; - private boolean dmaapEnabled = false; - private Map configMap = new HashMap<>(); - private DMaaPVESMsgConsumerMain dmaapConsumerMain = null; + private boolean strimziEnabled = false; + private Map configMap = new HashMap<>(); + private StrimziKafkaVESMsgConsumerMain sKafkaConsumerMain = null; + private StrimziKafkaConfig strimziKafkaConfig; // Blueprint 1 public MountpointRegistrarImpl() { @@ -53,20 +61,25 @@ public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedLis configFileRepresentation.registerConfigChangedListener(this); generalConfig = new GeneralConfig(configFileRepresentation); + strimziKafkaConfig = new StrimziKafkaConfig(configFileRepresentation); PNFRegistrationConfig pnfRegConfig = new PNFRegistrationConfig(configFileRepresentation); FaultConfig faultConfig = new FaultConfig(configFileRepresentation); + ProvisioningConfig provisioningConfig = new ProvisioningConfig(configFileRepresentation); + StndDefinedFaultConfig stndFaultConfig = new StndDefinedFaultConfig(configFileRepresentation); configMap.put("pnfRegistration", pnfRegConfig); configMap.put("fault", faultConfig); - - dmaapEnabled = generalConfig.getEnabled(); - if (dmaapEnabled) { // start dmaap consumer thread only if dmaapEnabled=true - LOG.info("DMaaP seems to be enabled, starting consumer(s)"); - dmaapConsumerMain = new DMaaPVESMsgConsumerMain(configMap, generalConfig); - dmaapVESMsgConsumerMain = new Thread(dmaapConsumerMain); - dmaapVESMsgConsumerMain.start(); + configMap.put("provisioning", provisioningConfig); + configMap.put("stndDefinedFault", stndFaultConfig); + + strimziEnabled = strimziKafkaConfig.getEnabled(); + if (strimziEnabled) { // start Kafka consumer thread only if strimziEnabled=true + LOG.info("Strimzi Kafka seems to be enabled, starting consumer(s)"); + sKafkaConsumerMain = new StrimziKafkaVESMsgConsumerMain(configMap, generalConfig, strimziKafkaConfig); + sKafkaVESMsgConsumerMain = new Thread(sKafkaConsumerMain); + sKafkaVESMsgConsumerMain.start(); } else { - LOG.info("DMaaP seems to be disabled, not starting any consumer(s)"); + LOG.info("Strimzi Kafka seems to be disabled, not starting any consumer(s)"); } } @@ -81,22 +94,30 @@ public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedLis @Override public void onConfigChanged() { - LOG.info("Service configuration state changed. Enabled: {}", generalConfig.getEnabled()); - boolean dmaapEnabledNewVal = generalConfig.getEnabled(); - if (!dmaapEnabled && dmaapEnabledNewVal) { // Dmaap disabled earlier (or during bundle startup) but enabled later, start Consumer(s) - LOG.info("DMaaP is enabled, starting consumer(s)"); - dmaapConsumerMain = new DMaaPVESMsgConsumerMain(configMap, generalConfig); - dmaapVESMsgConsumerMain = new Thread(dmaapConsumerMain); - dmaapVESMsgConsumerMain.start(); - } else if (dmaapEnabled && !dmaapEnabledNewVal) { // Dmaap enabled earlier (or during bundle startup) but disabled later, stop consumer(s) - LOG.info("DMaaP is disabled, stopping consumer(s)"); - List consumers = dmaapConsumerMain.getConsumers(); - for (DMaaPVESMsgConsumer consumer : consumers) { + if (generalConfig == null) { // Included as NullPointerException observed once in docker logs + LOG.warn("onConfigChange cannot be handled. Unexpected Null for generalConfig"); + return; + } + if (strimziKafkaConfig == null) { // Included as NullPointerException observed once in docker logs + LOG.warn("onConfigChange cannot be handled. Unexpected Null for strimziKafkaConfig"); + return; + } + LOG.info("Service configuration state changed. Enabled: {}", strimziKafkaConfig.getEnabled()); + boolean strimziEnabledNewVal = strimziKafkaConfig.getEnabled(); + if (!strimziEnabled && strimziEnabledNewVal) { // Strimzi kafka disabled earlier (or during bundle startup) but enabled later, start Consumer(s) + LOG.info("Strimzi Kafka is enabled, starting consumer(s)"); + sKafkaConsumerMain = new StrimziKafkaVESMsgConsumerMain(configMap, generalConfig, strimziKafkaConfig); + sKafkaVESMsgConsumerMain = new Thread(sKafkaConsumerMain); + sKafkaVESMsgConsumerMain.start(); + } else if (strimziEnabled && !strimziEnabledNewVal) { // Strimzi kafka enabled earlier (or during bundle startup) but disabled later, stop consumer(s) + LOG.info("Strimzi Kafka is disabled, stopping consumer(s)"); + List consumers = sKafkaConsumerMain.getConsumers(); + for (StrimziKafkaVESMsgConsumer consumer : consumers) { // stop all consumers consumer.stopConsumer(); } } - dmaapEnabled = dmaapEnabledNewVal; + strimziEnabled = strimziEnabledNewVal; } @Override @@ -119,6 +140,4 @@ public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedLis } } } - - }