logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri);
logger.trace("{} POST body: {}", traceTag, body);
Mono<String> bodyProducer = body != null ? Mono.just(body) : Mono.empty();
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.post() //
- .uri(uri) //
- .contentType(MediaType.APPLICATION_JSON) //
- .body(bodyProducer, String.class);
- return retrieve(traceTag, request);
- });
+
+ RequestHeadersSpec<?> request = getWebClient() //
+ .post() //
+ .uri(uri) //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .body(bodyProducer, String.class);
+ return retrieve(traceTag, request);
}
public Mono<String> post(String uri, @Nullable String body) {
return postForEntity(uri, body) //
- .flatMap(this::toBody);
+ .map(this::toBody);
}
public Mono<String> postWithAuthHeader(String uri, String body, String username, String password) {
Object traceTag = createTraceTag();
logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri);
logger.trace("{} POST body: {}", traceTag, body);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.post() //
- .uri(uri) //
- .headers(headers -> headers.setBasicAuth(username, password)) //
- .contentType(MediaType.APPLICATION_JSON) //
- .bodyValue(body);
- return retrieve(traceTag, request) //
- .flatMap(this::toBody);
- });
+
+ RequestHeadersSpec<?> request = getWebClient() //
+ .post() //
+ .uri(uri) //
+ .headers(headers -> headers.setBasicAuth(username, password)) //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .bodyValue(body);
+ return retrieve(traceTag, request) //
+ .map(this::toBody);
}
public Mono<ResponseEntity<String>> putForEntity(String uri, String body) {
Object traceTag = createTraceTag();
logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
logger.trace("{} PUT body: {}", traceTag, body);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.put() //
- .uri(uri) //
- .contentType(MediaType.APPLICATION_JSON) //
- .bodyValue(body);
- return retrieve(traceTag, request);
- });
+
+ RequestHeadersSpec<?> request = getWebClient() //
+ .put() //
+ .uri(uri) //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .bodyValue(body);
+ return retrieve(traceTag, request);
}
public Mono<ResponseEntity<String>> putForEntity(String uri) {
Object traceTag = createTraceTag();
logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
logger.trace("{} PUT body: <empty>", traceTag);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.put() //
- .uri(uri);
- return retrieve(traceTag, request);
- });
+ RequestHeadersSpec<?> request = getWebClient() //
+ .put() //
+ .uri(uri);
+ return retrieve(traceTag, request);
}
public Mono<String> put(String uri, String body) {
return putForEntity(uri, body) //
- .flatMap(this::toBody);
+ .map(this::toBody);
}
public Mono<ResponseEntity<String>> getForEntity(String uri) {
Object traceTag = createTraceTag();
logger.debug("{} GET uri = '{}{}''", traceTag, baseUrl, uri);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.get().uri(uri);
- return retrieve(traceTag, request);
- });
+ RequestHeadersSpec<?> request = getWebClient() //
+ .get() //
+ .uri(uri);
+ return retrieve(traceTag, request);
}
public Mono<String> get(String uri) {
return getForEntity(uri) //
- .flatMap(this::toBody);
+ .map(this::toBody);
}
public Mono<ResponseEntity<String>> deleteForEntity(String uri) {
Object traceTag = createTraceTag();
logger.debug("{} DELETE uri = '{}{}''", traceTag, baseUrl, uri);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.delete().uri(uri);
- return retrieve(traceTag, request);
- });
+ RequestHeadersSpec<?> request = getWebClient() //
+ .delete() //
+ .uri(uri);
+ return retrieve(traceTag, request);
}
public Mono<String> delete(String uri) {
return deleteForEntity(uri) //
- .flatMap(this::toBody);
+ .map(this::toBody);
}
private Mono<ResponseEntity<String>> retrieve(Object traceTag, RequestHeadersSpec<?> request) {
}
}
- private Mono<String> toBody(ResponseEntity<String> entity) {
+ private String toBody(ResponseEntity<String> entity) {
if (entity.getBody() == null) {
- return Mono.just("");
+ return "";
} else {
- return Mono.just(entity.getBody());
+ return entity.getBody();
}
}
.build();
}
- private Mono<WebClient> getWebClient() {
+ private WebClient getWebClient() {
if (this.webClient == null) {
this.webClient = buildWebClient(baseUrl);
}
- return Mono.just(buildWebClient(baseUrl));
+ return this.webClient;
}
-
}
return this.a1ClientFactory.createA1Client(ric) //
.flatMapMany(client -> synchronizationTask().synchronizePolicyTypes(ric, client)) //
.collectList() //
- .flatMap(list -> Mono.just(ric)) //
+ .map(list -> ric) //
.doOnNext(notUsed -> ric.setState(RicState.AVAILABLE)) //
.doOnError(t -> {
logger.warn("Failed to synchronize types in new RIC: {}, reason: {}", ric.id(), t.getMessage());
logger.debug("RIC added {}", ricId);
return trySyncronizeSupportedTypes(new Ric(updatedInfo.getRicConfig())) //
- .flatMap(this::addRic) //
+ .doOnNext(this::addRic) //
.flatMap(this::notifyServicesRicAvailable) //
.flatMap(notUsed -> Mono.just(event));
} else if (event == RicConfigUpdate.Type.REMOVED) {
Ric ric = this.rics.get(ricId);
if (ric == null) {
logger.error("An non existing RIC config is changed, should not happen (just for robustness)");
- addRic(new Ric(updatedInfo.getRicConfig())).block();
+ addRic(new Ric(updatedInfo.getRicConfig()));
} else {
ric.setRicConfig(updatedInfo.getRicConfig());
}
}
}
- Mono<Ric> addRic(Ric ric) {
+ void addRic(Ric ric) {
this.rics.put(ric);
if (this.appConfig.getVardataDirectory() != null) {
this.policies.restoreFromDatabase(ric, this.policyTypes);
}
logger.debug("Added RIC: {}", ric.id());
-
- return Mono.just(ric);
}
private Mono<Ric> notifyServicesRicAvailable(Ric ric) {
ServiceCallbacks callbacks = new ServiceCallbacks(this.restClientFactory);
return callbacks.notifyServicesRicAvailable(ric, services) //
.collectList() //
- .flatMap(list -> Mono.just(ric));
+ .map(list -> ric);
} else {
return Mono.just(ric);
}
}
public Mono<Ric> synchronizeRic(Ric ric) {
- return Mono.just(ric) //
- .flatMap(notUsed -> setRicState(ric)) //
+ return setRicState(ric) //
.flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) //
.flatMapMany(client -> runSynchronization(ric, client)) //
.onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable)) //
.collectList() //
- .flatMap(notUsed -> Mono.just(ric)) //
+ .map(notUsed -> ric) //
.doOnError(t -> { //
logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(), t.getMessage()); //
ric.setState(RicState.UNAVAILABLE); //
ServiceCallbacks callbacks = new ServiceCallbacks(this.restClientFactory);
return callbacks.notifyServicesRicAvailable(ric, services) //
.collectList() //
- .flatMap(list -> Mono.just(ric));
+ .map(list -> ric);
}
private Flux<Object> deleteAllPolicyInstances(Ric ric, Throwable t) {
return Mono.just(policyTypes.get(policyTypeId));
}
return a1Client.getPolicyTypeSchema(policyTypeId) //
- .flatMap(schema -> createPolicyType(policyTypeId, schema));
+ .map(schema -> createPolicyType(policyTypeId, schema));
}
- private Mono<PolicyType> createPolicyType(String policyTypeId, String schema) {
+ private PolicyType createPolicyType(String policyTypeId, String schema) {
PolicyType pt = PolicyType.builder().id(policyTypeId).schema(schema).build();
policyTypes.put(pt);
- return Mono.just(pt);
+ return pt;
}
private void deleteAllPoliciesInRepository(Ric ric) {