NONRTRIC PMS, Sporadic instability 94/129594/1
authorPatrikBuhr <patrik.buhr@est.tech>
Fri, 10 Jun 2022 10:43:06 +0000 (12:43 +0200)
committerPatrikBuhr <patrik.buhr@est.tech>
Fri, 10 Jun 2022 11:11:04 +0000 (13:11 +0200)
Some further simplifications and added test.

Issue-ID: CCSDK-3683
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Change-Id: I1ec98017d63047a0036db5ea12f770db00b1152b

a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java
a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java
a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java
a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/utils/MockA1Client.java

index fea242a..983e92e 100644 (file)
@@ -36,11 +36,9 @@ import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationCo
 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig.RicConfigUpdate;
 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfigParser;
 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ConfigurationFile;
-import org.onap.ccsdk.oran.a1policymanagementservice.controllers.ServiceCallbacks;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric;
-import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
 import org.slf4j.Logger;
@@ -130,7 +128,7 @@ public class RefreshConfigTask {
                 .flatMap(this::parseConfiguration) //
                 .flatMap(this::updateConfig, CONCURRENCY) //
                 .flatMap(this::handleUpdatedRicConfig) //
-                .doOnTerminate(() -> logger.error("Configuration refresh task is terminated"));
+                .doFinally(signal -> logger.error("Configuration refresh task is terminated: {}", signal));
     }
 
     private Flux<Long> regularInterval() {
@@ -170,40 +168,16 @@ public class RefreshConfigTask {
         return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services, restClientFactory, rics);
     }
 
-    /**
-     * for an added RIC after a restart it is nesessary to get the suypported policy
-     * types from the RIC unless a full synchronization is wanted.
-     *
-     * @param ric the ric to get supprted types from
-     * @return the same ric
-     */
-    private Mono<Ric> trySyncronizeSupportedTypes(Ric ric) {
-        logger.debug("Synchronizing policy types for new RIC: {}", ric.id());
-        // Synchronize the policy types
-        ric.setState(RicState.SYNCHRONIZING);
-        return this.a1ClientFactory.createA1Client(ric) //
-                .flatMapMany(client -> synchronizationTask().synchronizePolicyTypes(ric, client)) //
-                .collectList() //
-                .map(list -> ric) //
-                .doOnNext(notUsed -> ric.setState(RicState.AVAILABLE)) //
-                .doOnError(t -> {
-                    logger.warn("Failed to synchronize types in new RIC: {}, reason: {}", ric.id(), t.getMessage());
-                    ric.setState(RicState.UNAVAILABLE); //
-                }) //
-                .onErrorResume(t -> Mono.just(ric));
-    }
-
     public Mono<RicConfigUpdate.Type> handleUpdatedRicConfig(RicConfigUpdate updatedInfo) {
         synchronized (this.rics) {
             String ricId = updatedInfo.getRicConfig().getRicId();
             RicConfigUpdate.Type event = updatedInfo.getType();
             if (event == RicConfigUpdate.Type.ADDED) {
                 logger.debug("RIC added {}", ricId);
-
-                return trySyncronizeSupportedTypes(new Ric(updatedInfo.getRicConfig())) //
-                        .doOnNext(this::addRic) //
-                        .flatMap(this::notifyServicesRicAvailable) //
-                        .flatMap(notUsed -> Mono.just(event));
+                Ric ric = new Ric(updatedInfo.getRicConfig());
+                this.addRic(ric);
+                return this.synchronizationTask().synchronizeRic(ric) //
+                        .map(notUsed -> event);
             } else if (event == RicConfigUpdate.Type.REMOVED) {
                 logger.debug("RIC removed {}", ricId);
                 Ric ric = rics.remove(ricId);
@@ -231,17 +205,6 @@ public class RefreshConfigTask {
         logger.debug("Added RIC: {}", ric.id());
     }
 
-    private Mono<Ric> notifyServicesRicAvailable(Ric ric) {
-        if (ric.getState() == RicState.AVAILABLE) {
-            ServiceCallbacks callbacks = new ServiceCallbacks(this.restClientFactory);
-            return callbacks.notifyServicesRicAvailable(ric, services) //
-                    .collectList() //
-                    .map(list -> ric);
-        } else {
-            return Mono.just(ric);
-        }
-    }
-
     /**
      * Reads the configuration from file.
      */
index 177778b..e3edaf4 100644 (file)
@@ -106,11 +106,12 @@ public class RicSupervision {
         createTask().subscribe(null, null, () -> logger.debug("Checking all RICs completed"));
     }
 
-    private Flux<RicData> createTask() {
+    private Flux<Ric> createTask() {
         return Flux.fromIterable(rics.getRics()) //
                 .flatMap(this::createRicData) //
                 .onErrorResume(t -> Flux.empty()) //
-                .flatMap(this::checkOneRic, CONCURRENCY);
+                .flatMap(this::checkOneRic, CONCURRENCY) //
+                .map(ricData -> ricData.ric);
     }
 
     private Mono<RicData> checkOneRic(RicData ricData) {
index dcb03a5..c689097 100644 (file)
@@ -920,11 +920,16 @@ class ApplicationTest {
         final Instant startTime = Instant.now();
         List<Thread> threads = new ArrayList<>();
         List<ConcurrencyTestRunnable> tests = new ArrayList<>();
-        a1ClientFactory.setResponseDelay(Duration.ofMillis(1));
+        a1ClientFactory.setResponseDelay(Duration.ofMillis(2));
         addRic("ric");
         addPolicyType("type1", "ric");
         addPolicyType("type2", "ric");
 
+        final String NON_RESPONDING_RIC = "NonRespondingRic";
+        Ric nonRespondingRic = addRic(NON_RESPONDING_RIC);
+        MockA1Client a1Client = a1ClientFactory.getOrCreateA1Client(NON_RESPONDING_RIC);
+        a1Client.setErrorInject("errorInject");
+
         for (int i = 0; i < 10; ++i) {
             AsyncRestClient restClient = restClient();
             ConcurrencyTestRunnable test =
@@ -942,6 +947,9 @@ class ApplicationTest {
         }
         assertThat(policies.size()).isZero();
         logger.info("Concurrency test took " + Duration.between(startTime, Instant.now()));
+
+        assertThat(nonRespondingRic.getState()).isEqualTo(RicState.UNAVAILABLE);
+        nonRespondingRic.setState(RicState.AVAILABLE);
     }
 
     private AsyncRestClient restClient(String baseUrl, boolean useTrustValidation) {
index 203cc3e..2ecb9c2 100644 (file)
@@ -87,6 +87,7 @@ class RefreshConfigTaskTest {
     private RefreshConfigTask createTestObject(boolean configFileExists, Rics rics, Policies policies,
             boolean stubConfigFileExists) {
         SecurityContext secContext = new SecurityContext("");
+
         RefreshConfigTask obj =
                 spy(new RefreshConfigTask(configurationFileMock, appConfig, rics, policies, new Services(appConfig),
                         new PolicyTypes(appConfig), new A1ClientFactory(appConfig, secContext), secContext));
index 396a406..b2bf58e 100644 (file)
@@ -24,8 +24,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 import java.time.Instant;
@@ -148,7 +148,7 @@ class RicSupervisionTest {
         supervisorUnderTest.checkAllRics();
 
         verify(supervisorUnderTest).checkAllRics();
-        verifyNoMoreInteractions(supervisorUnderTest);
+        verify(synchronizationTaskMock, times(0)).synchronizeRic(RIC_1);
         assertThat(RIC_1.getState()).isEqualTo(RicState.AVAILABLE);
     }
 
@@ -161,10 +161,8 @@ class RicSupervisionTest {
         doReturn(synchronizationTaskMock).when(supervisorUnderTest).createSynchronizationTask();
         doReturn(Mono.just(RIC_1)).when(synchronizationTaskMock).synchronizeRic(any());
         supervisorUnderTest.checkAllRics();
-        verify(supervisorUnderTest).checkAllRics();
-        verify(supervisorUnderTest).createSynchronizationTask();
         verify(synchronizationTaskMock).synchronizeRic(RIC_1);
-        verifyNoMoreInteractions(supervisorUnderTest);
+
         assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE);
     }
 
@@ -179,7 +177,7 @@ class RicSupervisionTest {
         supervisorUnderTest.checkAllRics();
 
         verify(supervisorUnderTest).checkAllRics();
-        verifyNoMoreInteractions(supervisorUnderTest);
+        verify(synchronizationTaskMock, times(0)).synchronizeRic(RIC_1);
         assertThat(RIC_1.getState()).isEqualTo(RicState.SYNCHRONIZING);
     }
 
@@ -196,7 +194,8 @@ class RicSupervisionTest {
         supervisorUnderTest.checkAllRics();
 
         verify(supervisorUnderTest).checkAllRics();
-        verifyNoMoreInteractions(supervisorUnderTest);
+        verify(synchronizationTaskMock, times(0)).synchronizeRic(RIC_1);
+
         assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE);
     }
 
@@ -218,9 +217,8 @@ class RicSupervisionTest {
         supervisorUnderTest.checkAllRics();
 
         verify(supervisorUnderTest).checkAllRics();
-        verify(supervisorUnderTest).createSynchronizationTask();
         verify(synchronizationTaskMock).synchronizeRic(RIC_1);
-        verifyNoMoreInteractions(supervisorUnderTest);
+
         assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE);
     }
 
@@ -242,11 +240,9 @@ class RicSupervisionTest {
         supervisorUnderTest.checkAllRics();
 
         verify(supervisorUnderTest).checkAllRics();
-        verify(supervisorUnderTest).createSynchronizationTask();
         verify(synchronizationTaskMock).synchronizeRic(RIC_1);
-        verifyNoMoreInteractions(supervisorUnderTest);
-        assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE);
 
+        assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE);
     }
 
     @Test
@@ -263,7 +259,8 @@ class RicSupervisionTest {
         supervisorUnderTest.checkAllRics();
 
         verify(supervisorUnderTest).checkAllRics();
-        verifyNoMoreInteractions(supervisorUnderTest);
+        verify(synchronizationTaskMock, times(0)).synchronizeRic(RIC_1);
+
         assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE);
     }
 
@@ -286,9 +283,8 @@ class RicSupervisionTest {
         supervisorUnderTest.checkAllRics();
 
         verify(supervisorUnderTest).checkAllRics();
-        verify(supervisorUnderTest).createSynchronizationTask();
         verify(synchronizationTaskMock).synchronizeRic(RIC_1);
-        verifyNoMoreInteractions(supervisorUnderTest);
+
         assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE);
     }
 
@@ -315,9 +311,7 @@ class RicSupervisionTest {
         supervisorUnderTest.checkAllRics();
 
         verify(supervisorUnderTest).checkAllRics();
-        verify(supervisorUnderTest).createSynchronizationTask();
         verify(synchronizationTaskMock).synchronizeRic(RIC_1);
-        verifyNoMoreInteractions(supervisorUnderTest);
         assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE);
     }
 
index 2a3b28e..b76f1e7 100644 (file)
@@ -28,6 +28,8 @@ import java.time.Duration;
 import java.util.List;
 import java.util.Vector;
 
+import lombok.Setter;
+
 import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1Client;
 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
@@ -44,7 +46,12 @@ import reactor.core.publisher.MonoSink;
 public class MockA1Client implements A1Client {
     Policies policies;
     private final PolicyTypes policyTypes;
-    private final Duration asynchDelay;
+
+    @Setter
+    private Duration asynchDelay;
+
+    @Setter
+    private String errorInject;
 
     public MockA1Client(String ricId, ApplicationConfig appConfig, PolicyTypes policyTypes, Duration asynchDelay) {
         this.policyTypes = policyTypes;
@@ -117,14 +124,19 @@ public class MockA1Client implements A1Client {
     }
 
     private <T> Mono<T> mono(T value) {
-        if (this.asynchDelay.isZero()) {
-            return Mono.just(value);
-        } else {
-            return Mono.create(monoSink -> asynchResponse(monoSink, value));
+        Mono<T> res = Mono.just(value);
+        if (!this.asynchDelay.isZero()) {
+            res = Mono.create(monoSink -> asynchResponse(monoSink, value));
         }
+
+        if (this.errorInject != null) {
+            res = res.flatMap(x -> monoError(this.errorInject, HttpStatus.BAD_GATEWAY));
+        }
+
+        return res;
     }
 
-    public static Mono<String> monoError(String responseBody, HttpStatus status) {
+    public static <T> Mono<T> monoError(String responseBody, HttpStatus status) {
         byte[] responseBodyBytes = responseBody.getBytes(StandardCharsets.UTF_8);
         WebClientResponseException a1Exception = new WebClientResponseException(status.value(),
                 status.getReasonPhrase(), null, responseBodyBytes, StandardCharsets.UTF_8, null);