A1 Policy Management 83/125683/2
authorPatrikBuhr <patrik.buhr@est.tech>
Thu, 11 Nov 2021 08:40:14 +0000 (09:40 +0100)
committerPatrikBuhr <patrik.buhr@est.tech>
Thu, 11 Nov 2021 10:58:03 +0000 (11:58 +0100)
Minor simplifications.

Issue-ID: CCSDK-3495
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Change-Id: Id90cf9dfeb2b56561baf19391ddc858fc762888c

a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1AdapterJsonHelper.java
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v1/PolicyController.java
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java

index c72f196..d4c264d 100644 (file)
@@ -96,12 +96,11 @@ class A1AdapterJsonHelper {
 
     public static Mono<String> getValueFromResponse(String response, String key) {
         return getOutput(response) //
-                .flatMap(responseParams -> {
+                .map(responseParams -> {
                     if (!responseParams.has(key)) {
-                        return Mono.just("");
+                        return "";
                     }
-                    String value = responseParams.get(key).toString();
-                    return Mono.just(value);
+                    return responseParams.get(key).toString();
                 });
     }
 
index c3be9b4..4f2770b 100644 (file)
@@ -66,96 +66,89 @@ public class AsyncRestClient {
         logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} POST body: {}", traceTag, body);
         Mono<String> bodyProducer = body != null ? Mono.just(body) : Mono.empty();
-        return getWebClient() //
-                .flatMap(client -> {
-                    RequestHeadersSpec<?> request = client.post() //
-                            .uri(uri) //
-                            .contentType(MediaType.APPLICATION_JSON) //
-                            .body(bodyProducer, String.class);
-                    return retrieve(traceTag, request);
-                });
+
+        RequestHeadersSpec<?> request = getWebClient() //
+                .post() //
+                .uri(uri) //
+                .contentType(MediaType.APPLICATION_JSON) //
+                .body(bodyProducer, String.class);
+        return retrieve(traceTag, request);
     }
 
     public Mono<String> post(String uri, @Nullable String body) {
         return postForEntity(uri, body) //
-                .flatMap(this::toBody);
+                .map(this::toBody);
     }
 
     public Mono<String> postWithAuthHeader(String uri, String body, String username, String password) {
         Object traceTag = createTraceTag();
         logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} POST body: {}", traceTag, body);
-        return getWebClient() //
-                .flatMap(client -> {
-                    RequestHeadersSpec<?> request = client.post() //
-                            .uri(uri) //
-                            .headers(headers -> headers.setBasicAuth(username, password)) //
-                            .contentType(MediaType.APPLICATION_JSON) //
-                            .bodyValue(body);
-                    return retrieve(traceTag, request) //
-                            .flatMap(this::toBody);
-                });
+
+        RequestHeadersSpec<?> request = getWebClient() //
+                .post() //
+                .uri(uri) //
+                .headers(headers -> headers.setBasicAuth(username, password)) //
+                .contentType(MediaType.APPLICATION_JSON) //
+                .bodyValue(body);
+        return retrieve(traceTag, request) //
+                .map(this::toBody);
     }
 
     public Mono<ResponseEntity<String>> putForEntity(String uri, String body) {
         Object traceTag = createTraceTag();
         logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} PUT body: {}", traceTag, body);
-        return getWebClient() //
-                .flatMap(client -> {
-                    RequestHeadersSpec<?> request = client.put() //
-                            .uri(uri) //
-                            .contentType(MediaType.APPLICATION_JSON) //
-                            .bodyValue(body);
-                    return retrieve(traceTag, request);
-                });
+
+        RequestHeadersSpec<?> request = getWebClient() //
+                .put() //
+                .uri(uri) //
+                .contentType(MediaType.APPLICATION_JSON) //
+                .bodyValue(body);
+        return retrieve(traceTag, request);
     }
 
     public Mono<ResponseEntity<String>> putForEntity(String uri) {
         Object traceTag = createTraceTag();
         logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} PUT body: <empty>", traceTag);
-        return getWebClient() //
-                .flatMap(client -> {
-                    RequestHeadersSpec<?> request = client.put() //
-                            .uri(uri);
-                    return retrieve(traceTag, request);
-                });
+        RequestHeadersSpec<?> request = getWebClient() //
+                .put() //
+                .uri(uri);
+        return retrieve(traceTag, request);
     }
 
     public Mono<String> put(String uri, String body) {
         return putForEntity(uri, body) //
-                .flatMap(this::toBody);
+                .map(this::toBody);
     }
 
     public Mono<ResponseEntity<String>> getForEntity(String uri) {
         Object traceTag = createTraceTag();
         logger.debug("{} GET uri = '{}{}''", traceTag, baseUrl, uri);
-        return getWebClient() //
-                .flatMap(client -> {
-                    RequestHeadersSpec<?> request = client.get().uri(uri);
-                    return retrieve(traceTag, request);
-                });
+        RequestHeadersSpec<?> request = getWebClient() //
+                .get() //
+                .uri(uri);
+        return retrieve(traceTag, request);
     }
 
     public Mono<String> get(String uri) {
         return getForEntity(uri) //
-                .flatMap(this::toBody);
+                .map(this::toBody);
     }
 
     public Mono<ResponseEntity<String>> deleteForEntity(String uri) {
         Object traceTag = createTraceTag();
         logger.debug("{} DELETE uri = '{}{}''", traceTag, baseUrl, uri);
-        return getWebClient() //
-                .flatMap(client -> {
-                    RequestHeadersSpec<?> request = client.delete().uri(uri);
-                    return retrieve(traceTag, request);
-                });
+        RequestHeadersSpec<?> request = getWebClient() //
+                .delete() //
+                .uri(uri);
+        return retrieve(traceTag, request);
     }
 
     public Mono<String> delete(String uri) {
         return deleteForEntity(uri) //
-                .flatMap(this::toBody);
+                .map(this::toBody);
     }
 
     private Mono<ResponseEntity<String>> retrieve(Object traceTag, RequestHeadersSpec<?> request) {
@@ -184,11 +177,11 @@ public class AsyncRestClient {
         }
     }
 
-    private Mono<String> toBody(ResponseEntity<String> entity) {
+    private String toBody(ResponseEntity<String> entity) {
         if (entity.getBody() == null) {
-            return Mono.just("");
+            return "";
         } else {
-            return Mono.just(entity.getBody());
+            return entity.getBody();
         }
     }
 
@@ -229,11 +222,10 @@ public class AsyncRestClient {
                 .build();
     }
 
-    private Mono<WebClient> getWebClient() {
+    private WebClient getWebClient() {
         if (this.webClient == null) {
             this.webClient = buildWebClient(baseUrl);
         }
-        return Mono.just(buildWebClient(baseUrl));
+        return this.webClient;
     }
-
 }
index 3d2b55a..bead6d1 100644 (file)
@@ -205,7 +205,7 @@ public class PolicyController {
                 .flatMap(client -> client.deletePolicy(policy)) //
                 .doOnNext(notUsed -> ric.getLock().unlockBlocking()) //
                 .doOnError(notUsed -> ric.getLock().unlockBlocking()) //
-                .flatMap(notUsed -> Mono.just(new ResponseEntity<>(HttpStatus.NO_CONTENT)))
+                .map(notUsed -> new ResponseEntity<>(HttpStatus.NO_CONTENT)) //
                 .onErrorResume(this::handleException);
     }
 
@@ -273,7 +273,7 @@ public class PolicyController {
                 .doOnNext(notUsed -> policies.put(policy)) //
                 .doOnNext(notUsed -> ric.getLock().unlockBlocking()) //
                 .doOnError(trowable -> ric.getLock().unlockBlocking()) //
-                .flatMap(notUsed -> Mono.just(new ResponseEntity<>(isCreate ? HttpStatus.CREATED : HttpStatus.OK))) //
+                .map(notUsed -> new ResponseEntity<>(isCreate ? HttpStatus.CREATED : HttpStatus.OK)) //
                 .onErrorResume(this::handleException);
     }
 
@@ -406,7 +406,7 @@ public class PolicyController {
 
         return a1ClientFactory.createA1Client(policy.getRic()) //
                 .flatMap(client -> client.getPolicyStatus(policy)) //
-                .flatMap(status -> Mono.just(new ResponseEntity<>(status, HttpStatus.OK)))
+                .map(status -> new ResponseEntity<>(status, HttpStatus.OK)) //
                 .onErrorResume(this::handleException);
     }
 
index 036958d..134d6d7 100644 (file)
@@ -207,7 +207,7 @@ public class PolicyController {
                 .flatMap(client -> client.deletePolicy(policy)) //
                 .doOnNext(notUsed -> ric.getLock().unlockBlocking()) //
                 .doOnError(notUsed -> ric.getLock().unlockBlocking()) //
-                .flatMap(notUsed -> Mono.just(new ResponseEntity<>(HttpStatus.NO_CONTENT)))
+                .map(notUsed -> new ResponseEntity<>(HttpStatus.NO_CONTENT)) //
                 .onErrorResume(this::handleException);
     }
 
index 6177ee1..e488af5 100644 (file)
@@ -226,7 +226,7 @@ public class RefreshConfigTask {
         return this.a1ClientFactory.createA1Client(ric) //
                 .flatMapMany(client -> synchronizationTask().synchronizePolicyTypes(ric, client)) //
                 .collectList() //
-                .flatMap(list -> Mono.just(ric)) //
+                .map(list -> ric) //
                 .doOnNext(notUsed -> ric.setState(RicState.AVAILABLE)) //
                 .doOnError(t -> {
                     logger.warn("Failed to synchronize types in new RIC: {}, reason: {}", ric.id(), t.getMessage());
@@ -243,7 +243,7 @@ public class RefreshConfigTask {
                 logger.debug("RIC added {}", ricId);
 
                 return trySyncronizeSupportedTypes(new Ric(updatedInfo.getRicConfig())) //
-                        .flatMap(this::addRic) //
+                        .doOnNext(this::addRic) //
                         .flatMap(this::notifyServicesRicAvailable) //
                         .flatMap(notUsed -> Mono.just(event));
             } else if (event == RicConfigUpdate.Type.REMOVED) {
@@ -256,7 +256,7 @@ public class RefreshConfigTask {
                 Ric ric = this.rics.get(ricId);
                 if (ric == null) {
                     logger.error("An non existing RIC config is changed, should not happen (just for robustness)");
-                    addRic(new Ric(updatedInfo.getRicConfig())).block();
+                    addRic(new Ric(updatedInfo.getRicConfig()));
                 } else {
                     ric.setRicConfig(updatedInfo.getRicConfig());
                 }
@@ -265,14 +265,12 @@ public class RefreshConfigTask {
         }
     }
 
-    Mono<Ric> addRic(Ric ric) {
+    void addRic(Ric ric) {
         this.rics.put(ric);
         if (this.appConfig.getVardataDirectory() != null) {
             this.policies.restoreFromDatabase(ric, this.policyTypes);
         }
         logger.debug("Added RIC: {}", ric.id());
-
-        return Mono.just(ric);
     }
 
     private Mono<Ric> notifyServicesRicAvailable(Ric ric) {
@@ -280,7 +278,7 @@ public class RefreshConfigTask {
             ServiceCallbacks callbacks = new ServiceCallbacks(this.restClientFactory);
             return callbacks.notifyServicesRicAvailable(ric, services) //
                     .collectList() //
-                    .flatMap(list -> Mono.just(ric));
+                    .map(list -> ric);
         } else {
             return Mono.just(ric);
         }
index 8768901..b4c6595 100644 (file)
@@ -152,7 +152,7 @@ public class RicSupervision {
     private Mono<RicData> createRicData(Ric ric) {
         return Mono.just(ric) //
                 .flatMap(aRic -> this.a1ClientFactory.createA1Client(ric)) //
-                .flatMap(a1Client -> Mono.just(new RicData(ric, a1Client)));
+                .map(a1Client -> new RicData(ric, a1Client));
     }
 
     private Mono<RicData> checkRicState(RicData ric) {
index 0ccccb7..0552df6 100644 (file)
@@ -97,13 +97,12 @@ public class RicSynchronizationTask {
     }
 
     public Mono<Ric> synchronizeRic(Ric ric) {
-        return Mono.just(ric) //
-                .flatMap(notUsed -> setRicState(ric)) //
+        return setRicState(ric) //
                 .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) //
                 .flatMapMany(client -> runSynchronization(ric, client)) //
                 .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable)) //
                 .collectList() //
-                .flatMap(notUsed -> Mono.just(ric)) //
+                .map(notUsed -> ric) //
                 .doOnError(t -> { //
                     logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(), t.getMessage()); //
                     ric.setState(RicState.UNAVAILABLE); //
@@ -152,7 +151,7 @@ public class RicSynchronizationTask {
         ServiceCallbacks callbacks = new ServiceCallbacks(this.restClientFactory);
         return callbacks.notifyServicesRicAvailable(ric, services) //
                 .collectList() //
-                .flatMap(list -> Mono.just(ric));
+                .map(list -> ric);
     }
 
     private Flux<Object> deleteAllPolicyInstances(Ric ric, Throwable t) {
@@ -173,13 +172,13 @@ public class RicSynchronizationTask {
             return Mono.just(policyTypes.get(policyTypeId));
         }
         return a1Client.getPolicyTypeSchema(policyTypeId) //
-                .flatMap(schema -> createPolicyType(policyTypeId, schema));
+                .map(schema -> createPolicyType(policyTypeId, schema));
     }
 
-    private Mono<PolicyType> createPolicyType(String policyTypeId, String schema) {
+    private PolicyType createPolicyType(String policyTypeId, String schema) {
         PolicyType pt = PolicyType.builder().id(policyTypeId).schema(schema).build();
         policyTypes.put(pt);
-        return Mono.just(pt);
+        return pt;
     }
 
     private void deleteAllPoliciesInRepository(Ric ric) {