32d68ee62198a5b55db87a6cfeb01e6f340d564b
[ccsdk/features.git] /
1 /*
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
7  * =================================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
9  * in compliance with the License. You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software distributed under the License
14  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
15  * or implied. See the License for the specific language governing permissions and limitations under
16  * the License.
17  * ============LICENSE_END==========================================================================
18  */
19
20 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
21
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.Map;
25 import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
26 import org.onap.ccsdk.features.sdnr.wt.common.configuration.filechange.IConfigChangedListener;
27 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.FaultConfig;
28 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
29 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.MessageConfig;
30 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.PNFRegistrationConfig;
31 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.ProvisioningConfig;
32 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StndDefinedFaultConfig;
33 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StrimziKafkaConfig;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedListener {
38
39     private static final Logger LOG = LoggerFactory.getLogger(MountpointRegistrarImpl.class);
40     private static final String APPLICATION_NAME = "mountpoint-registrar";
41     private static final String CONFIGURATIONFILE = "etc/mountpoint-registrar.properties";
42
43     private Thread sKafkaVESMsgConsumerMain = null;
44
45     private GeneralConfig generalConfig;
46     private boolean strimziEnabled = false;
47     private Map<String, MessageConfig> configMap = new HashMap<>();
48     private StrimziKafkaVESMsgConsumerMain sKafkaConsumerMain = null;
49     private StrimziKafkaConfig strimziKafkaConfig;
50
51     // Blueprint 1
52     public MountpointRegistrarImpl() {
53         LOG.info("Creating provider class for {}", APPLICATION_NAME);
54     }
55
56     public void init() {
57         LOG.info("Init call for {}", APPLICATION_NAME);
58
59         ConfigurationFileRepresentation configFileRepresentation =
60                 new ConfigurationFileRepresentation(CONFIGURATIONFILE);
61         configFileRepresentation.registerConfigChangedListener(this);
62
63         generalConfig = new GeneralConfig(configFileRepresentation);
64         strimziKafkaConfig = new StrimziKafkaConfig(configFileRepresentation);
65         PNFRegistrationConfig pnfRegConfig = new PNFRegistrationConfig(configFileRepresentation);
66         FaultConfig faultConfig = new FaultConfig(configFileRepresentation);
67         ProvisioningConfig provisioningConfig = new ProvisioningConfig(configFileRepresentation);
68         StndDefinedFaultConfig stndFaultConfig = new StndDefinedFaultConfig(configFileRepresentation);
69
70         configMap.put("pnfRegistration", pnfRegConfig);
71         configMap.put("fault", faultConfig);
72         configMap.put("provisioning", provisioningConfig);
73         configMap.put("stndDefinedFault", stndFaultConfig);
74
75         strimziEnabled = strimziKafkaConfig.getEnabled();
76         if (strimziEnabled) { // start Kafka consumer thread only if strimziEnabled=true
77             LOG.info("Strimzi Kafka seems to be enabled, starting consumer(s)");
78             sKafkaConsumerMain = new StrimziKafkaVESMsgConsumerMain(configMap, generalConfig, strimziKafkaConfig);
79             sKafkaVESMsgConsumerMain = new Thread(sKafkaConsumerMain);
80             sKafkaVESMsgConsumerMain.start();
81         } else {
82             LOG.info("Strimzi Kafka seems to be disabled, not starting any consumer(s)");
83         }
84     }
85
86     /**
87      * Reflect status for Unit Tests
88      *
89      * @return Text with status
90      */
91     public String isInitializationOk() {
92         return "No implemented";
93     }
94
95     @Override
96     public void onConfigChanged() {
97         if (generalConfig == null) { // Included as NullPointerException observed once in docker logs
98             LOG.warn("onConfigChange cannot be handled. Unexpected Null");
99             return;
100         }
101         LOG.info("Service configuration state changed. Enabled: {}", strimziKafkaConfig.getEnabled());
102         boolean strimziEnabledNewVal = strimziKafkaConfig.getEnabled();
103         if (!strimziEnabled && strimziEnabledNewVal) { // Strimzi kafka disabled earlier (or during bundle startup) but enabled later, start Consumer(s)
104             LOG.info("Strimzi Kafka is enabled, starting consumer(s)");
105             sKafkaConsumerMain = new StrimziKafkaVESMsgConsumerMain(configMap, generalConfig, strimziKafkaConfig);
106             sKafkaVESMsgConsumerMain = new Thread(sKafkaConsumerMain);
107             sKafkaVESMsgConsumerMain.start();
108         } else if (strimziEnabled && !strimziEnabledNewVal) { // Strimzi kafka enabled earlier (or during bundle startup) but disabled later, stop consumer(s)
109             LOG.info("Strimzi Kafka is disabled, stopping consumer(s)");
110             List<StrimziKafkaVESMsgConsumer> consumers = sKafkaConsumerMain.getConsumers();
111             for (StrimziKafkaVESMsgConsumer consumer : consumers) {
112                 // stop all consumers
113                 consumer.stopConsumer();
114             }
115         }
116         strimziEnabled = strimziEnabledNewVal;
117     }
118
119     @Override
120     public void close() throws Exception {
121         LOG.info("{} closing ...", this.getClass().getName());
122         LOG.info("{} closing done", APPLICATION_NAME);
123     }
124
125     /**
126      * Used to close all Services, that should support AutoCloseable Pattern
127      *
128      * @param toClose
129      * @throws Exception
130      */
131     @SuppressWarnings("unused")
132     private void close(AutoCloseable... toCloseList) throws Exception {
133         for (AutoCloseable element : toCloseList) {
134             if (element != null) {
135                 element.close();
136             }
137         }
138     }
139 }