add periodic fetch configuration
[dcaegen2/analytics/tca-gen2.git] / dcae-analytics / dcae-analytics-web / src / main / java / org / onap / dcae / analytics / web / spring / ConfigBindingServiceEnvironmentPostProcessor.java
index 2073127..59fca64 100644 (file)
@@ -19,6 +19,7 @@
 
 package org.onap.dcae.analytics.web.spring;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
@@ -29,8 +30,16 @@ import java.util.stream.Collectors;
 import org.onap.dcae.analytics.model.AnalyticsProfile;
 import org.onap.dcae.analytics.model.configbindingservice.ConfigBindingServiceConstants;
 import org.onap.dcae.analytics.model.util.function.JsonStringToMapFunction;
-import org.onap.dcae.analytics.model.util.supplier.ConfigBindingServiceJsonSupplier;
+import org.onap.dcae.analytics.web.config.SystemConfig;
 import org.onap.dcae.analytics.web.exception.AnalyticsValidationException;
+import org.onap.dcae.analytics.web.exception.EnvironmentLoaderException;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.boot.SpringApplication;
@@ -44,6 +53,13 @@ import org.springframework.core.env.StandardEnvironment;
 import org.springframework.util.ClassUtils;
 import org.springframework.web.context.support.StandardServletEnvironment;
 
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
 /**
  * A custom spring framework environment post processor which can fetch and populate spring context with
  * Config Binding Service application properties.
@@ -60,63 +76,33 @@ public class ConfigBindingServiceEnvironmentPostProcessor implements Environment
 
     private static final int DEFAULT_ORDER = Ordered.HIGHEST_PRECEDENCE;
 
+    private Disposable refreshConfigTask = null;
+
+    private ConfigurableEnvironment env = null;
+
+    private Map<String, Object> filterKeyMap = null;
+
+    private String configServicePropertiesKey =
+            ConfigBindingServiceConstants.CONFIG_BINDING_SERVICE_PROPERTIES_KEY;
+
     @Override
-    public void postProcessEnvironment(final ConfigurableEnvironment environment, final SpringApplication application) {
+    public void postProcessEnvironment(final ConfigurableEnvironment environment,
+            final SpringApplication application) {
 
         final boolean isConfigServiceProfilePresent = Arrays.stream(environment.getActiveProfiles())
                 .anyMatch(p -> p.equalsIgnoreCase(AnalyticsProfile.CONFIG_BINDING_SERVICE_PROFILE_NAME));
 
         if (!isConfigServiceProfilePresent) {
-            logger.info("Config Binding Service Profile is not active. " +
-                    "Skipping Adding config binding service properties");
+            logger.info("Config Binding Service Profile is not active. "
+                    "Skipping Adding config binding service properties");
             return;
         }
 
-        logger.info("Config Binding Service Profile is active. " +
-                "Application properties will be fetched from config binding service");
-
-        // Fetch config binding service json
-        final Optional<String> configServiceJsonOptional = new ConfigBindingServiceJsonSupplier().get();
+        logger.info("Config Binding Service Profile is active. "
+                + "Application properties will be fetched from config binding service");
 
-        if (!configServiceJsonOptional.isPresent()) {
-            final String errorMessage = "Unable to get fetch application configuration from config binding service";
-            throw new AnalyticsValidationException(errorMessage, new IllegalStateException(errorMessage));
-        }
-
-        final String configServicePropertiesKey = ConfigBindingServiceConstants.CONFIG_BINDING_SERVICE_PROPERTIES_KEY;
-
-        // convert fetch config binding service json string to Map of property key and values
-        final Map<String, Object> configPropertiesMap = configServiceJsonOptional
-                .map(new JsonStringToMapFunction(configServicePropertiesKey))
-                .orElse(Collections.emptyMap());
-
-        if (configPropertiesMap.isEmpty()) {
-
-            logger.warn("No properties found in config binding service");
-
-        } else {
-
-            // remove config service key prefix on spring reserved property key prefixes
-            final Set<String> springKeyPrefixes = ConfigBindingServiceConstants.SPRING_RESERVED_PROPERTIES_KEY_PREFIXES;
-            final Set<String> springKeys = springKeyPrefixes.stream()
-                    .map(springKeyPrefix -> configServicePropertiesKey + "." + springKeyPrefix)
-                    .collect(Collectors.toSet());
-
-            final Map<String, Object> filterKeyMap = configPropertiesMap.entrySet()
-                    .stream()
-                    .collect(Collectors.toMap(
-                            (Map.Entry<String, Object> e) ->
-                                    springKeys.stream().anyMatch(springKey -> e.getKey().startsWith(springKey)) ?
-                                            e.getKey().substring(configServicePropertiesKey.toCharArray().length + 1) :
-                                            e.getKey(),
-                            Map.Entry::getValue)
-                    );
-
-            filterKeyMap.forEach((key, value) ->
-                    logger.info("Adding property from config service in spring context: {} -> {}", key, value));
-
-            addJsonPropertySource(environment, new MapPropertySource(configServicePropertiesKey, filterKeyMap));
-        }
+        env = environment;
+        initialize();
 
     }
 
@@ -125,9 +111,8 @@ public class ConfigBindingServiceEnvironmentPostProcessor implements Environment
         return DEFAULT_ORDER;
     }
 
-
-    private void addJsonPropertySource(final ConfigurableEnvironment environment, final PropertySource<?> source) {
-        final MutablePropertySources sources = environment.getPropertySources();
+    public synchronized void addJsonPropertySource(final MutablePropertySources sources,
+            final PropertySource<?> source) {
         final String name = findPropertySource(sources);
         if (sources.contains(name)) {
             sources.addBefore(name, source);
@@ -137,12 +122,164 @@ public class ConfigBindingServiceEnvironmentPostProcessor implements Environment
     }
 
     private String findPropertySource(final MutablePropertySources sources) {
-        if (ClassUtils.isPresent(SERVLET_ENVIRONMENT_CLASS, null) &&
-                sources.contains(StandardServletEnvironment.JNDI_PROPERTY_SOURCE_NAME)) {
+        if (ClassUtils.isPresent(SERVLET_ENVIRONMENT_CLASS, null)
+                && sources.contains(StandardServletEnvironment.JNDI_PROPERTY_SOURCE_NAME)) {
             return StandardServletEnvironment.JNDI_PROPERTY_SOURCE_NAME;
 
         }
         return StandardEnvironment.SYSTEM_PROPERTIES_PROPERTY_SOURCE_NAME;
     }
 
+    /**
+     *
+     * Fetch the configuration.
+     *
+     */
+    public void initialize() {
+        stop();
+
+        refreshConfigTask = createRefreshTask() //
+                .subscribe(e -> logger.info("Refreshed configuration data"),
+                        throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
+                        () -> logger.error("Configuration refresh terminated"));
+    }
+
+    /**
+     * Fetch the configuration task from CBS.
+     *
+     */
+    private Flux<String> createRefreshTask() {
+        return readEnvironmentVariables() //
+                .flatMap(this::createCbsClient) //
+                .flatMapMany(this::periodicConfigurationUpdates) //
+                .map(this::parseTcaConfig) //
+                .onErrorResume(this::onErrorResume);
+    }
+
+    /**
+     * periodicConfigurationUpdates.
+     *
+     * @param cbsClient cbsClient
+     * @return configuration refreshed
+     *
+     */
+    public Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) {
+        final Duration initialDelay = Duration.ZERO;
+        final Duration refreshPeriod =
+                 Duration.ofMinutes(ConfigBindingServiceConstants.CONFIG_SERVICE_REFRESHPERIOD);
+        final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
+        return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod);
+    }
+
+    /**
+     *
+     * get environment variables.
+     *
+     * @return environment properties.
+     *
+     */
+    public Mono<EnvProperties> readEnvironmentVariables() {
+        logger.trace("Loading configuration from system environment variables");
+        EnvProperties envProperties;
+        try {
+            envProperties = ImmutableEnvProperties.builder() //
+                    .consulHost(SystemConfig.getConsulHost()) //
+                    .consulPort(SystemConfig.getConsultPort()) //
+                    .cbsName(SystemConfig.getConfigBindingService()) //
+                    .appName(SystemConfig.getService()) //
+                    .build();
+        } catch (EnvironmentLoaderException e) {
+            return Mono.error(e);
+        }
+        logger.trace("Evaluated environment system variables {}", envProperties);
+        return Mono.just(envProperties);
+    }
+
+    /**
+     * Stops the refreshing of the configuration.
+     *
+     */
+    public void stop() {
+        if (refreshConfigTask != null) {
+            refreshConfigTask.dispose();
+            refreshConfigTask = null;
+        }
+    }
+
+    /**
+     * periodicConfigurationUpdates.
+     *
+     * @param throwable throwable
+     * @return Mono
+     *
+     */
+    private <R> Mono<R> onErrorResume(Throwable throwable) {
+        logger.error("Could not refresh application configuration {}", throwable.toString());
+        return Mono.empty();
+    }
+
+    /**
+     * create CbsClient.
+     *
+     * @param env environment properties
+     * @return cbsclient
+     *
+     */
+    public Mono<CbsClient> createCbsClient(EnvProperties env) {
+        return CbsClientFactory.createCbsClient(env);
+    }
+
+    /**
+     * Parse configuration.
+     *
+     * @param jsonObject the TCA service's configuration
+     * @return this
+     *
+     */
+    public String parseTcaConfig(JsonObject jsonObject) {
+
+        JsonElement jsonConfig = jsonObject.get(ConfigBindingServiceConstants.CONFIG);
+
+        Optional<String> configServiceJsonOptional = Optional.of(jsonConfig.toString());
+        if (!configServiceJsonOptional.isPresent()) {
+            final String errorMessage =
+                    "Unable to get fetch application configuration from config binding service";
+            throw new AnalyticsValidationException(errorMessage,
+                    new IllegalStateException(errorMessage));
+        }
+
+        // convert fetch config binding service json string to Map of property key and
+        // values
+        Map<String, Object> configPropertiesMap = configServiceJsonOptional
+                .map(new JsonStringToMapFunction(configServicePropertiesKey)).orElse(Collections.emptyMap());
+
+        if (configPropertiesMap.isEmpty()) {
+
+            logger.warn("No properties found in config binding service");
+
+        } else {
+
+            // remove config service key prefix on spring reserved property key prefixes
+            final Set<String> springKeyPrefixes =
+                    ConfigBindingServiceConstants.SPRING_RESERVED_PROPERTIES_KEY_PREFIXES;
+            final Set<String> springKeys = springKeyPrefixes.stream()
+                    .map(springKeyPrefix -> configServicePropertiesKey + "." + springKeyPrefix)
+                    .collect(Collectors.toSet());
+
+            filterKeyMap = configPropertiesMap.entrySet().stream()
+                    .collect(Collectors.toMap((Map.Entry<String, Object> e) -> springKeys.stream()
+                            .anyMatch(springKey -> e.getKey().startsWith(springKey))
+                                    ? e.getKey().substring(configServicePropertiesKey.toCharArray().length + 1)
+                                    : e.getKey(),
+                            Map.Entry::getValue));
+
+            filterKeyMap.forEach((key, value) -> logger
+                    .info("Adding property from config service in spring context: {} -> {}", key, value));
+            MutablePropertySources sources = env.getPropertySources();
+            addJsonPropertySource(sources, new MapPropertySource(configServicePropertiesKey, filterKeyMap));
+
+        }
+        return configServiceJsonOptional.get();
+    }
+
 }