2 * ========================LICENSE_START=================================
4 * ======================================================================
5 * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
6 * ======================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.onap.ccsdk.oran.a1policymanagementservice.tasks;
23 import com.google.gson.JsonObject;
25 import java.time.Duration;
26 import java.util.Optional;
27 import java.util.Properties;
29 import lombok.AccessLevel;
32 import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1ClientFactory;
33 import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClientFactory;
34 import org.onap.ccsdk.oran.a1policymanagementservice.clients.SecurityContext;
35 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
36 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig.RicConfigUpdate;
37 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfigParser;
38 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ConfigurationFile;
39 import org.onap.ccsdk.oran.a1policymanagementservice.controllers.ServiceCallbacks;
40 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
41 import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes;
42 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric;
43 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState;
44 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
45 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48 import org.springframework.beans.factory.annotation.Autowired;
49 import org.springframework.beans.factory.annotation.Value;
50 import org.springframework.stereotype.Component;
52 import reactor.core.Disposable;
53 import reactor.core.publisher.Flux;
54 import reactor.core.publisher.Mono;
55 import reactor.util.annotation.Nullable;
58 * Regularly refreshes the component configuration from a configuration file.
61 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
62 public class RefreshConfigTask {
// Class-wide SLF4J logger.
64 private static final Logger logger = LoggerFactory.getLogger(RefreshConfigTask.class);
// Injected by Spring from the expression `#{systemEnvironment}`.
66 @Value("#{systemEnvironment}")
67 public Properties systemEnvironment;
70 * The time between refreshes of the configuration. Not final so tests can
73 private static Duration configRefreshInterval = Duration.ofMinutes(1);
// The file the configuration is read from, and the configuration object it is applied to.
75 final ConfigurationFile configurationFile;
76 final ApplicationConfig appConfig;
// Handle of the refresh pipeline; stays null until start() assigns it.
78 @Getter(AccessLevel.PROTECTED)
79 private Disposable refreshTask = null;
// Repositories and client factories used by the methods of this class, e.g. passed on to
// each RicSynchronizationTask that synchronizationTask() creates.
81 private final Rics rics;
82 private final A1ClientFactory a1ClientFactory;
83 private final Policies policies;
84 private final Services services;
85 private final PolicyTypes policyTypes;
86 private final AsyncRestClientFactory restClientFactory;
// Last-modified value recorded by loadConfigurationFromFile(); 0 until the first read.
// It is compared against the file's current value on each call.
88 private long fileLastModified = 0;
/**
 * Creates the task from its collaborators.
 *
 * Stores the configuration file, the application configuration, the policy, service and
 * policy type repositories and the A1 client factory, and builds a new
 * {@code AsyncRestClientFactory} from {@code appConfig.getWebClientConfig()} together with
 * {@code securityContext}.
 *
 * NOTE(review): no assignment of the {@code rics} parameter is visible in this extract;
 * confirm that the final field {@code rics} is initialized here.
 */
91 public RefreshConfigTask(ConfigurationFile configurationFile, ApplicationConfig appConfig, Rics rics,
92 Policies policies, Services services, PolicyTypes policyTypes, A1ClientFactory a1ClientFactory,
93 SecurityContext securityContext) {
94 this.configurationFile = configurationFile;
95 this.appConfig = appConfig;
97 this.policies = policies;
98 this.services = services;
99 this.policyTypes = policyTypes;
100 this.a1ClientFactory = a1ClientFactory;
// The REST client factory is created here rather than injected.
101 this.restClientFactory = new AsyncRestClientFactory(appConfig.getWebClientConfig(), securityContext);
/**
 * Starts the configuration refresh: builds the pipeline with createRefreshTask() and keeps
 * the resulting handle in {@code refreshTask}.
 */
104 public void start() {
105 logger.debug("Starting refreshConfigTask");
107 refreshTask = createRefreshTask() //
// The three callbacks log, in order: each emitted item (debug), a terminating error
// (error), and normal completion (error).
// NOTE(review): the call that receives these callbacks (presumably subscribe(...)) is not
// visible in this extract.
109 notUsed -> logger.debug("Refreshed configuration data"), throwable -> logger
110 .error("Configuration refresh terminated due to exception {}", throwable.toString()),
111 () -> logger.error("Configuration refresh terminated"));
// Disposes the refresh handle if one has been assigned; does nothing while it is still null.
// NOTE(review): the header of the enclosing method is not visible in this extract.
115 if (refreshTask != null) {
116 refreshTask.dispose();
/**
 * Builds the refresh pipeline without subscribing to it.
 *
 * Each tick from regularInterval() triggers loadConfigurationFromFile(). Every JSON object
 * that comes out is parsed by parseConfiguration(), applied through updateConfig(), and each
 * resulting RIC update is handled by handleUpdatedRicConfig().
 *
 * @return a flux emitting the type of every RIC configuration update that was handled
 */
120 Flux<RicConfigUpdate.Type> createRefreshTask() {
121 Flux<JsonObject> loadFromFile = regularInterval() //
122 .flatMap(notUsed -> loadConfigurationFromFile()) //
// NOTE(review): onErrorResume is applied to the outer, interval-driven flux, so an error
// reaching this point switches the whole sequence to the fallback returned by
// ignoreErrorFlux. Confirm that ending the periodic ticks in that case is intended.
123 .onErrorResume(this::ignoreErrorFlux) //
124 .doOnNext(json -> logger.debug("loadFromFile succeeded")) //
125 .doOnTerminate(() -> logger.error("loadFromFile Terminate"));
// NOTE(review): this value is passed as the concurrency argument of the updateConfig
// flatMap below, not of the handleUpdatedRicConfig flatMap. Verify that this matches the
// intent stated in the trailing comment.
127 final int CONCURRENCY = 50; // Number of RICs synchronized in parallel
129 return loadFromFile //
130 .flatMap(this::parseConfiguration) //
131 .flatMap(this::updateConfig, CONCURRENCY) //
132 .flatMap(this::handleUpdatedRicConfig) //
133 .doOnTerminate(() -> logger.error("Configuration refresh task is terminated"));
/**
 * Creates the tick source that drives the refresh.
 *
 * The first tick has an initial delay of {@code Duration.ZERO}; later ticks follow with a
 * period of {@code configRefreshInterval}.
 *
 * @return a flux of tick counters
 */
136 private Flux<Long> regularInterval() {
137 return Flux.interval(Duration.ZERO, configRefreshInterval) //
// Drop ticks that the downstream has not requested instead of signalling an overflow error.
138 .onBackpressureDrop() //
139 .limitRate(1); // Limit so that only one event is emitted at a time
/**
 * Error fallback used with onErrorResume: logs the given error as a warning.
 *
 * NOTE(review): the return statement is not visible in this extract; presumably an empty
 * flux is returned so that the error is ignored. Confirm.
 *
 * @param throwable the error that interrupted the configuration refresh
 */
142 private <R> Flux<R> ignoreErrorFlux(Throwable throwable) {
143 String errMsg = throwable.toString();
144 logger.warn("Could not refresh application configuration. {}", errMsg);
/**
 * Parses a configuration JSON object with a new ApplicationConfigParser created for the
 * current application configuration.
 *
 * @param jsonObject the configuration to parse
 * @return a Mono holding the parse result when parsing succeeds
 */
148 private Mono<ApplicationConfigParser.ConfigParserResult> parseConfiguration(JsonObject jsonObject) {
150 ApplicationConfigParser parser = new ApplicationConfigParser(this.appConfig);
151 return Mono.just(parser.parse(jsonObject));
// Any exception from parsing is logged, not rethrown.
// NOTE(review): what this branch returns is not visible in this extract.
152 } catch (Exception e) {
153 String str = e.toString();
154 logger.error("Could not parse configuration {}", str);
/**
 * Hands the parsed configuration to the application configuration.
 *
 * @param config the parsed configuration
 * @return the flux of RIC configuration updates returned by
 *         {@code appConfig.setConfiguration(config)}
 */
159 private Flux<RicConfigUpdate> updateConfig(ApplicationConfigParser.ConfigParserResult config) {
160 return this.appConfig.setConfiguration(config);
/**
 * Runs a new RicSynchronizationTask for the given RIC.
 *
 * NOTE(review): the parameter is declared {@code @Nullable}, but no null guard is visible
 * in this extract. Confirm that {@code run} is only reached for a non-null RIC.
 *
 * @param ric the RIC to run the synchronization task for, may be null
 */
163 private void removePoliciciesInRic(@Nullable Ric ric) {
165 synchronizationTask().run(ric);
/**
 * Creates a new RicSynchronizationTask wired with this object's client factories and
 * repositories. Every call returns a fresh instance.
 *
 * @return the new synchronization task
 */
169 private RicSynchronizationTask synchronizationTask() {
170 return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services, restClientFactory, rics);
174 * For an added RIC after a restart it is necessary to get the supported policy
175 * types from the RIC unless a full synchronization is wanted.
177 * @param ric the ric to get supported types from
178 * @return the same ric
180 private Mono<Ric> trySyncronizeSupportedTypes(Ric ric) {
181 logger.debug("Synchronizing policy types for new RIC: {}", ric.id());
182 // Synchronize the policy types
// The state is set right away, when this method is called, before the chain is returned.
183 ric.setState(RicState.SYNCHRONIZING);
184 return this.a1ClientFactory.createA1Client(ric) //
// A new synchronization task fetches the policy types through the created client.
185 .flatMapMany(client -> synchronizationTask().synchronizePolicyTypes(ric, client)) //
// NOTE(review): one or more operators between these two lines are not visible in this
// extract.
// An emitted element marks the RIC as available.
188 .doOnNext(notUsed -> ric.setState(RicState.AVAILABLE)) //
// Failure handling: log the reason and mark the RIC as unavailable.
// NOTE(review): the operator wrapping these two statements is not visible here
// (presumably doOnError). Confirm.
190 logger.warn("Failed to synchronize types in new RIC: {}, reason: {}", ric.id(), t.getMessage());
191 ric.setState(RicState.UNAVAILABLE); //
// Errors are not propagated: the same ric is emitted instead.
193 .onErrorResume(t -> Mono.just(ric));
/**
 * Applies one RIC configuration update, depending on its type.
 *
 * ADDED: creates a Ric from the new configuration, tries to synchronize its policy types,
 * passes it to addRic() and notifyServicesRicAvailable(), and returns that chain mapped to
 * the event type.
 * REMOVED: removes the RIC from {@code rics}, removes its policies from {@code policies}
 * and calls removePoliciciesInRic() with the removed RIC.
 * CHANGED: looks the RIC up in {@code rics}, then either updates its configuration or logs
 * an error and adds a new Ric built from the configuration.
 *
 * @param updatedInfo the update to apply
 * @return a Mono emitting the type of the handled update
 */
196 public Mono<RicConfigUpdate.Type> handleUpdatedRicConfig(RicConfigUpdate updatedInfo) {
// NOTE(review): the lock on rics is held while this method body runs. Whether the work
// inside the Mono returned for ADDED also runs under it depends on when that Mono is
// subscribed. Confirm the intended scope.
197 synchronized (this.rics) {
198 String ricId = updatedInfo.getRicConfig().getRicId();
199 RicConfigUpdate.Type event = updatedInfo.getType();
200 if (event == RicConfigUpdate.Type.ADDED) {
201 logger.debug("RIC added {}", ricId);
203 return trySyncronizeSupportedTypes(new Ric(updatedInfo.getRicConfig())) //
204 .doOnNext(this::addRic) //
205 .flatMap(this::notifyServicesRicAvailable) //
206 .flatMap(notUsed -> Mono.just(event));
207 } else if (event == RicConfigUpdate.Type.REMOVED) {
208 logger.debug("RIC removed {}", ricId);
209 Ric ric = rics.remove(ricId);
210 this.policies.removePoliciesForRic(ricId);
211 removePoliciciesInRic(ric);
212 } else if (event == RicConfigUpdate.Type.CHANGED) {
213 logger.debug("RIC config updated {}", ricId);
214 Ric ric = this.rics.get(ricId);
// NOTE(review): the condition choosing between the next two statements is not visible in
// this extract. Presumably the first runs when the lookup returned null and the second
// otherwise. Confirm.
216 logger.error("An non existing RIC config is changed, should not happen (just for robustness)");
217 addRic(new Ric(updatedInfo.getRicConfig()));
219 ric.setRicConfig(updatedInfo.getRicConfig());
// Branches that did not return earlier report the event type directly.
222 return Mono.just(event);
/**
 * Handles a newly added RIC: when a vardata directory is configured, restores the RIC's
 * policies from the database; then logs the addition.
 *
 * NOTE(review): no statement inserting the RIC into {@code rics} is visible in this
 * extract. Confirm that it is stored.
 *
 * @param ric the RIC to add
 */
226 void addRic(Ric ric) {
// Restoring is skipped when no vardata directory is configured.
228 if (this.appConfig.getVardataDirectory() != null) {
229 this.policies.restoreFromDatabase(ric, this.policyTypes);
231 logger.debug("Added RIC: {}", ric.id());
/**
 * Notifies the services that a RIC is available, provided the RIC is in state AVAILABLE.
 *
 * @param ric the RIC to announce
 * @return a Mono for the same ric
 */
234 private Mono<Ric> notifyServicesRicAvailable(Ric ric) {
235 if (ric.getState() == RicState.AVAILABLE) {
// The callbacks object is created per call, using this object's REST client factory.
236 ServiceCallbacks callbacks = new ServiceCallbacks(this.restClientFactory);
237 return callbacks.notifyServicesRicAvailable(ric, services) //
// NOTE(review): the rest of the chain above and the branch structure around the return
// below are not visible in this extract. Presumably the plain Mono.just(ric) applies when
// the RIC is not AVAILABLE. Confirm.
241 return Mono.just(ric);
246 * Reads the configuration from file.
248 Flux<JsonObject> loadConfigurationFromFile() {
249 if (configurationFile.getLastModified() == fileLastModified) {
252 fileLastModified = configurationFile.getLastModified();
253 Optional<JsonObject> readJson = configurationFile.readFile();
254 if (readJson.isPresent()) {
255 return Flux.just(readJson.get());