NONRTRIC PMS, Sporadic instability 81/129581/4
authorPatrikBuhr <patrik.buhr@est.tech>
Thu, 9 Jun 2022 08:47:17 +0000 (10:47 +0200)
committerPatrikBuhr <patrik.buhr@est.tech>
Thu, 9 Jun 2022 11:24:33 +0000 (13:24 +0200)
Attempt to stablize the synch.

Issue-ID: CCSDK-3683
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Change-Id: Ieda858e76082fd5224ac43f153e8967f871322d8

a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1AdapterJsonHelper.java
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java

index d4c264d..108424d 100644 (file)
@@ -31,7 +31,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.json.JSONArray;
-import org.json.JSONException;
 import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,7 +62,7 @@ class A1AdapterJsonHelper {
                 }
             }
             return Flux.fromIterable(arrayList);
-        } catch (JSONException ex) { // invalid json
+        } catch (Exception ex) { // invalid json
             logger.debug("Invalid json {}", ex.getMessage());
             return Flux.error(ex);
         }
@@ -88,7 +87,7 @@ class A1AdapterJsonHelper {
             JSONObject outputJson = new JSONObject(response);
             JSONObject responseParams = outputJson.getJSONObject(OUTPUT);
             return Mono.just(responseParams);
-        } catch (JSONException ex) { // invalid json
+        } catch (Exception ex) { // invalid json
             logger.debug("Invalid json {}", ex.getMessage());
             return Mono.error(ex);
         }
@@ -110,7 +109,7 @@ class A1AdapterJsonHelper {
             JSONObject schemaObject = jsonObject.getJSONObject("policySchema");
             String schemaString = schemaObject.toString();
             return Mono.just(schemaString);
-        } catch (JSONException ex) { // invalid json
+        } catch (Exception ex) { // invalid json
             logger.debug("Invalid json {}", ex.getMessage());
             return Mono.error(ex);
         }
index 6216a4d..764afc8 100644 (file)
@@ -285,17 +285,17 @@ public class CcsdkA1AdapterClient implements A1Client {
         return restClient
                 .postWithAuthHeader(controllerUrl(rpcName), inputJsonString, this.controllerConfig.getUserName(),
                         this.controllerConfig.getPassword()) //
-                .flatMap(this::extractResponseBody);
+                .flatMap(resp -> extractResponseBody(resp, ricUrl));
     }
 
-    private Mono<String> extractResponse(JSONObject responseOutput) {
+    private Mono<String> extractResponse(JSONObject responseOutput, String ricUrl) {
         AdapterOutput output = gson.fromJson(responseOutput.toString(), AdapterOutput.class);
 
         String body = output.body == null ? "" : output.body;
         if (HttpStatus.valueOf(output.httpStatus).is2xxSuccessful()) {
             return Mono.just(body);
         } else {
-            logger.debug("Error response: {} {}", output.httpStatus, body);
+            logger.debug("Error response: {} {}, from: {}", output.httpStatus, body, ricUrl);
             byte[] responseBodyBytes = body.getBytes(StandardCharsets.UTF_8);
             HttpStatus httpStatus = HttpStatus.valueOf(output.httpStatus);
             WebClientResponseException responseException = new WebClientResponseException(httpStatus.value(),
@@ -305,9 +305,9 @@ public class CcsdkA1AdapterClient implements A1Client {
         }
     }
 
-    private Mono<String> extractResponseBody(String responseStr) {
+    private Mono<String> extractResponseBody(String responseStr, String ricUrl) {
         return A1AdapterJsonHelper.getOutput(responseStr) //
-                .flatMap(this::extractResponse);
+                .flatMap(responseOutput -> extractResponse(responseOutput, ricUrl));
     }
 
     private String controllerUrl(String rpcName) {
index 6201842..500ddd2 100644 (file)
@@ -66,6 +66,7 @@ import org.springframework.web.bind.annotation.PutMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.reactive.function.client.WebClientException;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Mono;
 
@@ -277,6 +278,9 @@ public class PolicyController {
         if (throwable instanceof WebClientResponseException) {
             WebClientResponseException e = (WebClientResponseException) throwable;
             return ErrorResponse.createMono(e.getResponseBodyAsString(), e.getStatusCode());
+        } else if (throwable instanceof WebClientException) {
+            WebClientException e = (WebClientException) throwable;
+            return ErrorResponse.createMono(e.getMessage(), HttpStatus.BAD_GATEWAY);
         } else if (throwable instanceof RejectionException) {
             RejectionException e = (RejectionException) throwable;
             return ErrorResponse.createMono(e.getMessage(), e.getStatus());
index f90d462..177778b 100644 (file)
@@ -109,6 +109,7 @@ public class RicSupervision {
     private Flux<RicData> createTask() {
         return Flux.fromIterable(rics.getRics()) //
                 .flatMap(this::createRicData) //
+                .onErrorResume(t -> Flux.empty()) //
                 .flatMap(this::checkOneRic, CONCURRENCY);
     }
 
@@ -153,8 +154,9 @@ public class RicSupervision {
     }
 
     private Mono<RicData> createRicData(Ric ric) {
-        return Mono.just(ric) //
-                .flatMap(aRic -> this.a1ClientFactory.createA1Client(ric)) //
+        return this.a1ClientFactory.createA1Client(ric) //
+                .doOnError(t -> logger.debug("Could not create A1 client for ric: {}, reason: {}", ric.id(),
+                        t.getMessage())) //
                 .map(a1Client -> new RicData(ric, a1Client));
     }
 
index 2d282f3..b3afa7c 100644 (file)
@@ -40,6 +40,7 @@ import org.springframework.web.reactive.function.client.WebClientResponseExcepti
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.publisher.SignalType;
 
 /**
  * Synchronizes the content of a Near-RT RIC with the content in the repository.
@@ -96,12 +97,19 @@ public class RicSynchronizationTask {
                 .flatMapMany(client -> runSynchronization(ric, client)) //
                 .doOnError(t -> { //
                     logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(), t.getMessage()); //
-                    ric.setState(RicState.UNAVAILABLE); //
                     deletePoliciesIfNotRecreatable(t, ric);
                 }) //
                 .collectList() //
                 .flatMap(notUsed -> onSynchronizationComplete(ric)) //
-                .onErrorResume(t -> Mono.just(ric));
+                .onErrorResume(t -> Mono.just(ric)) //
+                .doFinally(signal -> onFinally(signal, ric));
+    }
+
+    private void onFinally(SignalType signal, Ric ric) {
+        if (ric.getState().equals(RicState.SYNCHRONIZING)) {
+            logger.debug("Resetting ric state after failed synch, ric: {}, signal: {}", ric.id(), signal);
+            ric.setState(RicState.UNAVAILABLE); //
+        }
     }
 
     /**
index 0ea0a8c..6386441 100644 (file)
@@ -47,6 +47,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClientFact
 import org.onap.ccsdk.oran.a1policymanagementservice.clients.SecurityContext;
 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
+import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyType;
@@ -56,6 +57,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Service;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
+import org.springframework.web.reactive.function.client.WebClientRequestException;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 
 import reactor.core.publisher.Flux;
@@ -163,6 +165,20 @@ class RicSynchronizationTaskTest {
         assertThat(ric1.getState()).isEqualTo(RicState.UNAVAILABLE);
     }
 
+    @Test
+    void testConnectionError() {
+        setUpCreationOfA1Client();
+        simulateRicWithNoPolicyTypes();
+        policies.put(policy1);
+        WebClientRequestException exception =
+                new WebClientRequestException(new ServiceException("x"), null, null, null);
+        when(a1ClientMock.deleteAllPolicies()).thenReturn(Flux.error(exception));
+        RicSynchronizationTask synchronizerUnderTest = createTask();
+        ric1.setState(RicState.AVAILABLE);
+        synchronizerUnderTest.run(ric1);
+        await().untilAsserted(() -> RicState.UNAVAILABLE.equals(ric1.getState()));
+    }
+
     @Test
     void ricIdlePolicyTypeInRepo_thenSynchronizationWithReuseOfTypeFromRepoAndCorrectServiceNotified() {
         rics.put(ric1);