</dependency>
<dependency>
- <groupId>org.functionaljava</groupId>
- <artifactId>functionaljava</artifactId>
- <version>3.0</version>
- </dependency>
+ <groupId>org.functionaljava</groupId>
+ <artifactId>functionaljava</artifactId>
+ <version>3.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-core</artifactId>
+ <version>5.0.9.RELEASE</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-expression</artifactId>
+ <version>5.0.9.RELEASE</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-web</artifactId>
+ <version>5.0.9.RELEASE</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-webmvc</artifactId>
+ <version>5.0.9.RELEASE</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tomcat.embed</groupId>
+ <artifactId>tomcat-embed-core</artifactId>
+ <version>9.0.14</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.5.7</version>
+ </dependency>
</dependencies>
import com.google.gson.reflect.TypeToken;
import java.lang.reflect.Type;
+import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ConfigFetchFromCbs {
+import reactor.core.Disposable;
+
+public class ConfigFetchFromCbs implements Runnable {
private static Logger log = LoggerFactory.getLogger(ConfigFetchFromCbs.class);
+ private Duration interval;
+
+ public ConfigFetchFromCbs() {
+
+ }
+
+ public ConfigFetchFromCbs(Duration interval) {
+ this.interval = interval;
+ }
+
/**
* Gets app config from CBS.
*/
- public void getAppConfig() {
+ private Disposable getAppConfig() {
// Generate RequestID and InvocationID which will be used when logging and in
// HTTP requests
log.debug("environments {}", env);
ConfigPolicy configPolicy = ConfigPolicy.getInstance();
+ // Polling properties
+ final Duration initialDelay = Duration.ofSeconds(5);
+ final Duration period = interval;
+
// Create the client and use it to get the configuration
final CbsRequest request = CbsRequests.getAll(diagnosticContext);
- CbsClientFactory.createCbsClient(env).flatMap(cbsClient -> cbsClient.get(request)).subscribe(jsonObject -> {
- log.info("configuration and policy from CBS {}", jsonObject);
- JsonObject config = jsonObject.getAsJsonObject("config");
-
- updateConfigurationFromJsonObject(config);
-
- Type mapType = new TypeToken<Map<String, Object>>() {
- }.getType();
- JsonObject policyJson = jsonObject.getAsJsonObject("policies").getAsJsonArray("items").get(0)
- .getAsJsonObject().getAsJsonObject("config");
- Map<String, Object> policy = new Gson().fromJson(policyJson, mapType);
- configPolicy.setConfig(policy);
- log.info("Config policy {}", configPolicy);
- }, throwable -> log.warn("Ooops", throwable));
-
+ return CbsClientFactory.createCbsClient(env)
+ .flatMapMany(cbsClient -> cbsClient.updates(request, initialDelay, period)).subscribe(jsonObject -> {
+ log.info("configuration and policy from CBS {}", jsonObject);
+ JsonObject config = jsonObject.getAsJsonObject("config");
+ Duration newPeriod = Duration.ofSeconds(config.get("cbsPollingInterval").getAsInt());
+ if (!newPeriod.equals(period)) {
+ interval = newPeriod;
+ synchronized (this) {
+ this.notifyAll();
+ }
+
+ }
+ updateConfigurationFromJsonObject(config);
+
+ Type mapType = new TypeToken<Map<String, Object>>() {
+ }.getType();
+ if (jsonObject.getAsJsonObject("policies") != null) {
+ JsonObject policyJson = jsonObject.getAsJsonObject("policies").getAsJsonArray("items").get(0)
+ .getAsJsonObject().getAsJsonObject("config");
+ Map<String, Object> policy = new Gson().fromJson(policyJson, mapType);
+ configPolicy.setConfig(policy);
+ log.info("Config policy {}", configPolicy);
+ }
+ }, throwable -> log.warn("Ooops", throwable));
}
private void updateConfigurationFromJsonObject(JsonObject jsonObject) {
configuration.setOofTriggerCountTimer(oofTriggerCountTimer);
configuration.setOofTriggerCountThreshold(oofTriggerCountThreshold);
configuration.setPolicyRespTimer(policyRespTimer);
-
- log.info("configuration from CBS {}", configuration.toString());
+ log.info("configuration from CBS {}", configuration);
+
+ }
+
+ @Override
+ public void run() {
+ Boolean done = false;
+ while (!done) {
+ try {
+ Disposable disp = getAppConfig();
+ synchronized (this) {
+ this.wait();
+ }
+ log.info("Polling interval changed");
+ disp.dispose();
+ } catch (Exception e) {
+ done = true;
+ }
+ }
}
}