Merge "fix oauth code"
[ccsdk/features.git] / sdnr / wt / mountpoint-registrar / provider / src / main / java / org / onap / ccsdk / features / sdnr / wt / mountpointregistrar / impl / MountpointRegistrarImpl.java
1 /*
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved.
7  * =================================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
9  * in compliance with the License. You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software distributed under the License
14  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
15  * or implied. See the License for the specific language governing permissions and limitations under
16  * the License.
17  * ============LICENSE_END==========================================================================
18  */
19
20 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
21
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.Map;
25 import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
26 import org.onap.ccsdk.features.sdnr.wt.common.configuration.filechange.IConfigChangedListener;
27 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.FaultConfig;
28 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
29 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.MessageConfig;
30 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.PNFRegistrationConfig;
31 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.ProvisioningConfig;
32 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StndDefinedFaultConfig;
33 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StrimziKafkaConfig;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedListener {
38
39     private static final Logger LOG = LoggerFactory.getLogger(MountpointRegistrarImpl.class);
40     private static final String APPLICATION_NAME = "mountpoint-registrar";
41     private static final String CONFIGURATIONFILE = "etc/mountpoint-registrar.properties";
42
43     private Thread sKafkaVESMsgConsumerMain = null;
44
45     private GeneralConfig generalConfig;
46     private boolean strimziEnabled = false;
47     private Map<String, MessageConfig> configMap = new HashMap<>();
48     private StrimziKafkaVESMsgConsumerMain sKafkaConsumerMain = null;
49     private StrimziKafkaConfig strimziKafkaConfig;
50
51     // Blueprint 1
52     public MountpointRegistrarImpl() {
53         LOG.info("Creating provider class for {}", APPLICATION_NAME);
54     }
55
56     public void init() {
57         LOG.info("Init call for {}", APPLICATION_NAME);
58
59         ConfigurationFileRepresentation configFileRepresentation =
60                 new ConfigurationFileRepresentation(CONFIGURATIONFILE);
61         configFileRepresentation.registerConfigChangedListener(this);
62
63         generalConfig = new GeneralConfig(configFileRepresentation);
64         strimziKafkaConfig = new StrimziKafkaConfig(configFileRepresentation);
65         PNFRegistrationConfig pnfRegConfig = new PNFRegistrationConfig(configFileRepresentation);
66         FaultConfig faultConfig = new FaultConfig(configFileRepresentation);
67         ProvisioningConfig provisioningConfig = new ProvisioningConfig(configFileRepresentation);
68         StndDefinedFaultConfig stndFaultConfig = new StndDefinedFaultConfig(configFileRepresentation);
69
70         configMap.put("pnfRegistration", pnfRegConfig);
71         configMap.put("fault", faultConfig);
72         configMap.put("provisioning", provisioningConfig);
73         configMap.put("stndDefinedFault", stndFaultConfig);
74
75         strimziEnabled = strimziKafkaConfig.getEnabled();
76         if (strimziEnabled) { // start Kafka consumer thread only if strimziEnabled=true
77             LOG.info("Strimzi Kafka seems to be enabled, starting consumer(s)");
78             sKafkaConsumerMain = new StrimziKafkaVESMsgConsumerMain(configMap, generalConfig, strimziKafkaConfig);
79             sKafkaVESMsgConsumerMain = new Thread(sKafkaConsumerMain);
80             sKafkaVESMsgConsumerMain.start();
81         } else {
82             LOG.info("Strimzi Kafka seems to be disabled, not starting any consumer(s)");
83         }
84     }
85
86     /**
87      * Reflect status for Unit Tests
88      *
89      * @return Text with status
90      */
91     public String isInitializationOk() {
92         return "No implemented";
93     }
94
95     @Override
96     public void onConfigChanged() {
97         if (generalConfig == null) { // Included as NullPointerException observed once in docker logs
98             LOG.warn("onConfigChange cannot be handled. Unexpected Null for generalConfig");
99             return;
100         }
101         if (strimziKafkaConfig == null) { // Included as NullPointerException observed once in docker logs
102             LOG.warn("onConfigChange cannot be handled. Unexpected Null for strimziKafkaConfig");
103             return;
104         }
105         LOG.info("Service configuration state changed. Enabled: {}", strimziKafkaConfig.getEnabled());
106         boolean strimziEnabledNewVal = strimziKafkaConfig.getEnabled();
107         if (!strimziEnabled && strimziEnabledNewVal) { // Strimzi kafka disabled earlier (or during bundle startup) but enabled later, start Consumer(s)
108             LOG.info("Strimzi Kafka is enabled, starting consumer(s)");
109             sKafkaConsumerMain = new StrimziKafkaVESMsgConsumerMain(configMap, generalConfig, strimziKafkaConfig);
110             sKafkaVESMsgConsumerMain = new Thread(sKafkaConsumerMain);
111             sKafkaVESMsgConsumerMain.start();
112         } else if (strimziEnabled && !strimziEnabledNewVal) { // Strimzi kafka enabled earlier (or during bundle startup) but disabled later, stop consumer(s)
113             LOG.info("Strimzi Kafka is disabled, stopping consumer(s)");
114             List<StrimziKafkaVESMsgConsumer> consumers = sKafkaConsumerMain.getConsumers();
115             for (StrimziKafkaVESMsgConsumer consumer : consumers) {
116                 // stop all consumers
117                 consumer.stopConsumer();
118             }
119         }
120         strimziEnabled = strimziEnabledNewVal;
121     }
122
123     @Override
124     public void close() throws Exception {
125         LOG.info("{} closing ...", this.getClass().getName());
126         LOG.info("{} closing done", APPLICATION_NAME);
127     }
128
129     /**
130      * Used to close all Services, that should support AutoCloseable Pattern
131      *
132      * @param toClose
133      * @throws Exception
134      */
135     @SuppressWarnings("unused")
136     private void close(AutoCloseable... toCloseList) throws Exception {
137         for (AutoCloseable element : toCloseList) {
138             if (element != null) {
139                 element.close();
140             }
141         }
142     }
143 }