58081a893c078d2c2043b7de865add5e7d759955
[dcaegen2/collectors/datafile.git] /
1 /*-
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16
17 package org.onap.dcaegen2.collectors.datafile.configuration;
18
19 import com.google.gson.GsonBuilder;
20 import com.google.gson.JsonElement;
21 import com.google.gson.JsonObject;
22 import com.google.gson.JsonParser;
23 import com.google.gson.JsonSyntaxException;
24 import com.google.gson.TypeAdapterFactory;
25
26 import java.io.BufferedInputStream;
27 import java.io.FileInputStream;
28 import java.io.IOException;
29 import java.io.InputStream;
30 import java.io.InputStreamReader;
31 import java.time.Duration;
32 import java.util.Map;
33 import java.util.Properties;
34 import java.util.ServiceLoader;
35
36 import javax.validation.constraints.NotEmpty;
37 import javax.validation.constraints.NotNull;
38
39 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
40 import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
41 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
42 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
43 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
44 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
45 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
46 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49 import org.springframework.beans.factory.annotation.Value;
50 import org.springframework.boot.context.properties.ConfigurationProperties;
51 import org.springframework.boot.context.properties.EnableConfigurationProperties;
52 import org.springframework.context.annotation.ComponentScan;
53 import org.springframework.stereotype.Component;
54
55 import reactor.core.Disposable;
56 import reactor.core.publisher.Flux;
57 import reactor.core.publisher.Mono;
58
59 /**
60  * Holds all configuration for the DFC.
61  *
62  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
63  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
64  */
65
66 @Component
67 @ComponentScan("org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers")
68 @EnableConfigurationProperties
69 @ConfigurationProperties("app")
70 public class AppConfig {
71     private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
72
73     private ConsumerConfiguration dmaapConsumerConfiguration;
74     private Map<String, PublisherConfiguration> publishingConfigurations;
75     private FtpesConfig ftpesConfiguration;
76     @Value("#{systemEnvironment}")
77     Properties systemEnvironment;
78     private Disposable refreshConfigTask = null;
79
80     @NotEmpty
81     private String filepath;
82
83     public synchronized void setFilepath(String filepath) {
84         this.filepath = filepath;
85     }
86
87     /**
88      * Reads the cloud configuration.
89      */
90     public void initialize() {
91         stop();
92         Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
93         loadConfigurationFromFile();
94
95         refreshConfigTask = createRefreshTask(context) //
96             .subscribe(e -> logger.info("Refreshed configuration data"),
97                 throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
98                 () -> logger.error("Configuration refresh terminated"));
99     }
100
101     Flux<AppConfig> createRefreshTask(Map<String, String> context) {
102         return getEnvironment(systemEnvironment, context) //
103             .flatMap(this::createCbsClient) //
104             .flatMapMany(this::periodicConfigurationUpdates) //
105             .map(this::parseCloudConfig) //
106             .onErrorResume(this::onErrorResume);
107     }
108
109     private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) {
110         final Duration initialDelay = Duration.ZERO;
111         final Duration refreshPeriod = Duration.ofMinutes(1);
112         final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
113         return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod);
114     }
115
116     /**
117      * Stops the refreshing of the configuration.
118      */
119     public void stop() {
120         if (refreshConfigTask != null) {
121             refreshConfigTask.dispose();
122             refreshConfigTask = null;
123         }
124     }
125
126     public synchronized ConsumerConfiguration getDmaapConsumerConfiguration() {
127         return dmaapConsumerConfiguration;
128     }
129
130     /**
131      * Checks if there is a configuration for the given feed.
132      *
133      * @param changeIdentifier the change identifier the feed is configured to belong to.
134      *
135      * @return true if a feed is configured for the given change identifier, false if not.
136      */
137     public synchronized boolean isFeedConfigured(String changeIdentifier) {
138         return publishingConfigurations.containsKey(changeIdentifier);
139     }
140
141     /**
142      * Gets the feed configuration for the given change identifier.
143      *
144      * @param changeIdentifier the change identifier the feed is configured to belong to.
145      * @return the <code>PublisherConfiguration</code> for the feed belonging to the given change identifier.
146      *
147      * @throws DatafileTaskException if no configuration has been loaded or the configuration is missing for the given
148      *         change identifier.
149      */
150     public synchronized PublisherConfiguration getPublisherConfiguration(String changeIdentifier)
151         throws DatafileTaskException {
152
153         if (publishingConfigurations == null) {
154             throw new DatafileTaskException("No PublishingConfiguration loaded, changeIdentifier: " + changeIdentifier);
155         }
156         PublisherConfiguration cfg = publishingConfigurations.get(changeIdentifier);
157         if (cfg == null) {
158             throw new DatafileTaskException(
159                 "Cannot find getPublishingConfiguration for changeIdentifier: " + changeIdentifier);
160         }
161         return cfg;
162     }
163
164     public synchronized FtpesConfig getFtpesConfiguration() {
165         return ftpesConfiguration;
166     }
167
168     private <R> Mono<R> onErrorResume(Throwable trowable) {
169         logger.error("Could not refresh application configuration {}", trowable.toString());
170         return Mono.empty();
171     }
172
173     Mono<EnvProperties> getEnvironment(Properties systemEnvironment, Map<String, String> context) {
174         return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context);
175     }
176
177     Mono<CbsClient> createCbsClient(EnvProperties env) {
178         return CbsClientFactory.createCbsClient(env);
179     }
180
181     /**
182      * Parse configuration.
183      *
184      * @param jsonObject the DFC service's configuration
185      * @return this which is updated if successful
186      */
187     private AppConfig parseCloudConfig(JsonObject jsonObject) {
188         try {
189             CloudConfigParser parser = new CloudConfigParser(jsonObject);
190             setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfigurations(),
191                 parser.getFtpesConfig());
192
193         } catch (DatafileTaskException e) {
194             logger.error("Could not parse configuration {}", e.toString(), e);
195         }
196         return this;
197     }
198
199     /**
200      * Reads the configuration from file.
201      */
202     void loadConfigurationFromFile() {
203         GsonBuilder gsonBuilder = new GsonBuilder();
204         ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
205
206         try (InputStream inputStream = createInputStream(filepath)) {
207             JsonParser parser = new JsonParser();
208             JsonObject rootObject = getJsonElement(parser, inputStream).getAsJsonObject();
209             if (rootObject == null) {
210                 throw new JsonSyntaxException("Root is not a json object");
211             }
212             parseCloudConfig(rootObject);
213             logger.info("Local configuration file loaded: {}", filepath);
214         } catch (JsonSyntaxException | IOException e) {
215             logger.trace("Local configuration file not loaded: {}", filepath, e);
216         }
217     }
218
219     private synchronized void setConfiguration(@NotNull ConsumerConfiguration consumerConfiguration,
220             @NotNull Map<String, PublisherConfiguration> publisherConfiguration, @NotNull FtpesConfig ftpesConfig) {
221         this.dmaapConsumerConfiguration = consumerConfiguration;
222         this.publishingConfigurations = publisherConfiguration;
223         this.ftpesConfiguration = ftpesConfig;
224     }
225
226     JsonElement getJsonElement(JsonParser parser, InputStream inputStream) {
227         return parser.parse(new InputStreamReader(inputStream));
228     }
229
230     InputStream createInputStream(@NotNull String filepath) throws IOException {
231         return new BufferedInputStream(new FileInputStream(filepath));
232     }
233
234 }