2 * ============LICENSE_START=======================================================
3 * PNF-REGISTRATION-HANDLER
4 * ================================================================================
5 * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.dcaegen2.services.prh.configuration;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25 import org.springframework.cloud.context.environment.EnvironmentChangeEvent;
26 import org.springframework.cloud.context.refresh.ContextRefresher;
27 import org.springframework.context.event.EventListener;
28 import org.springframework.core.env.Environment;
29 import org.springframework.stereotype.Component;
30 import reactor.core.Disposable;
31 import reactor.core.publisher.Flux;
32 import reactor.core.scheduler.Scheduler;
33 import reactor.core.scheduler.Schedulers;
35 import javax.annotation.PostConstruct;
36 import javax.annotation.PreDestroy;
37 import java.time.Duration;
40 public class CbsConfigRefreshScheduler {
42 private static final Logger LOGGER = LoggerFactory.getLogger(CbsConfigRefreshScheduler.class);
43 private static final String CBS_UPDATES_INTERVAL_PROPERTY = "cbs.updates-interval";
44 private static final Duration NO_UPDATES = Duration.ZERO;
46 private final ContextRefresher contextRefresher;
47 private final Environment environment;
48 private final Scheduler scheduler;
49 private transient Disposable refreshEventsStreamHandle;
52 public CbsConfigRefreshScheduler(ContextRefresher contextRefresher, Environment environment) {
53 this.contextRefresher = contextRefresher;
54 this.environment = environment;
55 this.scheduler = Schedulers.newElastic("conf-updates");
59 public void startPollingForCbsUpdates() {
60 startPollingForCbsUpdates(getCbsUpdatesInterval());
63 private void startPollingForCbsUpdates(Duration updatesInterval) {
64 if (!updatesInterval.equals(NO_UPDATES)) {
65 LOGGER.info("Configuring pulling for CBS updates in every {}", updatesInterval);
66 refreshEventsStreamHandle = Flux.interval(updatesInterval, scheduler)
68 LOGGER.debug("Requesting context refresh");
69 contextRefresher.refresh();
71 .onErrorContinue((e, o) -> LOGGER.error("Failed fetching config updates from CBS", e))
77 public void onEnvironmentChanged(EnvironmentChangeEvent event) {
78 if (event.getKeys().contains(CBS_UPDATES_INTERVAL_PROPERTY)) {
79 LOGGER.info("CBS config polling interval changed to {}", environment.getProperty(CBS_UPDATES_INTERVAL_PROPERTY));
80 stopPollingForCbsUpdates();
81 startPollingForCbsUpdates(getCbsUpdatesInterval());
85 private Duration getCbsUpdatesInterval() {
86 return environment.getProperty(CBS_UPDATES_INTERVAL_PROPERTY, Duration.class, NO_UPDATES);
90 private void stopPollingForCbsUpdates() {
91 if(refreshEventsStreamHandle != null) {
92 LOGGER.debug("Stopping pulling for CBS updates");
93 refreshEventsStreamHandle.dispose();