NONRTRIC PMS, Sporadic instability 23/129423/4
authorPatrikBuhr <patrik.buhr@est.tech>
Mon, 30 May 2022 12:22:46 +0000 (14:22 +0200)
committerPatrikBuhr <patrik.buhr@est.tech>
Tue, 7 Jun 2022 07:54:09 +0000 (09:54 +0200)
Attempt to stablize the synch.

Issue-ID: CCSDK-3683
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Change-Id: Ie0f266e2eef23e91dcf6f5925a577bb930b6d9e8

a1-policy-management/config/application.yaml
a1-policy-management/pom.xml
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java
a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java

index 7628624..f0f5337 100644 (file)
@@ -56,6 +56,8 @@ server:
       key-store: /opt/app/policy-agent/etc/cert/keystore.jks
       key-password: policy_agent
       key-alias: policy_agent
+      # trust-store-password:
+      # trust-store:
 app:
   # Location of the component configuration file.
   filepath: /opt/app/policy-agent/data/application_configuration.json
@@ -80,4 +82,3 @@ app:
   # A file containing an authorization token, which shall be inserted in each HTTP header (authorization).
   # If the file name is empty, no authorization token is sent.
   auth-token-file:
-
index 8808771..04d65ce 100644 (file)
@@ -35,7 +35,7 @@
         <java.version.source>11</java.version.source>
         <java.version.target>11</java.version.target>
         <springfox.version>3.0.0</springfox.version>
-        <immutable.version>2.9.0</immutable.version>
+        <gson.version>2.9.0</gson.version>
         <json.version>20220320</json.version>
         <formatter-maven-plugin.version>2.13.0</formatter-maven-plugin.version>
         <spotless-maven-plugin.version>2.5.0</spotless-maven-plugin.version>
             <artifactId>guava</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.immutables</groupId>
-            <artifactId>value</artifactId>
-            <version>${immutable.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.immutables</groupId>
+            <groupId>com.google.code.gson</groupId>
             <artifactId>gson</artifactId>
-            <version>${immutable.version}</version>
+            <version>${gson.version}</version>
         </dependency>
         <dependency>
             <groupId>org.json</groupId>
             </plugin>
         </plugins>
     </build>
-</project>
+</project>
\ No newline at end of file
index cb4e2eb..6216a4d 100644 (file)
@@ -34,6 +34,7 @@ import lombok.Getter;
 import org.json.JSONObject;
 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ControllerConfig;
 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
+import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -132,9 +133,9 @@ public class CcsdkA1AdapterClient implements A1Client {
             this.controllerConfig = controllerConfig;
             logger.debug("CcsdkA1AdapterClient for ric: {}, a1Controller: {}", ricConfig.getRicId(), controllerConfig);
         } else {
+            logger.error("Not supported protocoltype: {}", protocolType);
             throw new IllegalArgumentException("Not handeled protocolversion: " + protocolType);
         }
-
     }
 
     @Override
@@ -146,7 +147,6 @@ public class CcsdkA1AdapterClient implements A1Client {
                     .flatMapMany(A1AdapterJsonHelper::parseJsonArrayOfString) //
                     .collectList();
         }
-
     }
 
     @Override
@@ -173,7 +173,7 @@ public class CcsdkA1AdapterClient implements A1Client {
         } else if (this.protocolType == A1ProtocolType.CCSDK_A1_ADAPTER_STD_V2_0_0) {
             return StdA1ClientVersion2.extractPolicySchema(controllerResponse, policyTypeId);
         } else {
-            throw new NullPointerException("Not supported");
+            return Mono.error(new ServiceException("Not supported " + this.protocolType));
         }
     }
 
@@ -234,6 +234,7 @@ public class CcsdkA1AdapterClient implements A1Client {
         } else if (protocolType == A1ProtocolType.CCSDK_A1_ADAPTER_OSC_V1) {
             return new OscA1Client.UriBuilder(ricConfig);
         }
+        logger.error("Not supported protocoltype: {}", protocolType);
         throw new NullPointerException();
     }
 
index e4711c9..f90d462 100644 (file)
@@ -114,11 +114,12 @@ public class RicSupervision {
 
     private Mono<RicData> checkOneRic(RicData ricData) {
         if (ricData.ric.getState() == RicState.CONSISTENCY_CHECK || ricData.ric.getState() == RicState.SYNCHRONIZING) {
+            logger.debug("Skipping check ric: {}, state: {}", ricData.ric.id(), ricData.ric.getState());
             return Mono.empty(); // Skip, already in progress
         }
         return ricData.ric.getLock().lock(LockType.EXCLUSIVE, "checkOneRic") //
                 .flatMap(lock -> synchIfUnavailable(ricData)) //
-                .doOnNext(ric -> ric.ric.setState(RicState.CONSISTENCY_CHECK)) //
+                .doOnNext(ric -> ricData.ric.setState(RicState.CONSISTENCY_CHECK)) //
                 .flatMap(x -> checkRicPolicies(ricData)) //
                 .flatMap(x -> checkRicPolicyTypes(ricData)) //
                 .doOnNext(x -> onRicCheckedOk(ricData)) //
@@ -127,6 +128,14 @@ public class RicSupervision {
                 .onErrorResume(throwable -> Mono.empty());
     }
 
+    private Mono<RicData> synchIfUnavailable(RicData ric) {
+        if (ric.ric.getState() == RicState.UNAVAILABLE) {
+            return Mono.error(new SynchNeededException(ric));
+        } else {
+            return Mono.just(ric);
+        }
+    }
+
     private Mono<RicData> onRicCheckedError(Throwable t, RicData ricData) {
         logger.debug("Ric: {} check stopped, exception: {}", ricData.ric.id(), t.getMessage());
         ricData.ric.setState(RicState.UNAVAILABLE);
@@ -149,14 +158,6 @@ public class RicSupervision {
                 .map(a1Client -> new RicData(ric, a1Client));
     }
 
-    private Mono<RicData> synchIfUnavailable(RicData ric) {
-        if (ric.ric.getState() == RicState.UNAVAILABLE) {
-            return Mono.error(new SynchNeededException(ric));
-        } else {
-            return Mono.just(ric);
-        }
-    }
-
     private Mono<RicData> checkRicPolicies(RicData ric) {
         return ric.getClient().getPolicyIdentities() //
                 .flatMap(ricP -> validateInstances(ricP, ric));
index 94e873e..2d282f3 100644 (file)
@@ -36,6 +36,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -93,17 +94,36 @@ public class RicSynchronizationTask {
         return this.a1ClientFactory.createA1Client(ric) //
                 .doOnNext(client -> ric.setState(RicState.SYNCHRONIZING)) //
                 .flatMapMany(client -> runSynchronization(ric, client)) //
-                .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable)) //
-                .collectList() //
-                .map(notUsed -> ric) //
                 .doOnError(t -> { //
                     logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(), t.getMessage()); //
                     ric.setState(RicState.UNAVAILABLE); //
+                    deletePoliciesIfNotRecreatable(t, ric);
                 }) //
+                .collectList() //
                 .flatMap(notUsed -> onSynchronizationComplete(ric)) //
                 .onErrorResume(t -> Mono.just(ric));
     }
 
+    /**
+     * If a 4xx error is received, allpolicies are deleted. This is just to avoid
+     * cyclical receovery due to that the NearRT RIC cannot accept a previously
+     * policy.
+     */
+    private void deletePoliciesIfNotRecreatable(Throwable throwable, Ric ric) {
+        if (throwable instanceof WebClientResponseException) {
+            WebClientResponseException responseException = (WebClientResponseException) throwable;
+            if (responseException.getStatusCode().is4xxClientError()) {
+                deleteAllPoliciesInRepository(ric);
+            }
+        }
+    }
+
+    private void deleteAllPoliciesInRepository(Ric ric) {
+        for (Policy policy : policies.getForRic(ric.id())) {
+            this.policies.remove(policy);
+        }
+    }
+
     public Flux<PolicyType> synchronizePolicyTypes(Ric ric, A1Client a1Client) {
         return a1Client.getPolicyTypeIdentities() //
                 .doOnNext(x -> ric.clearSupportedPolicyTypes()) //
@@ -134,19 +154,6 @@ public class RicSynchronizationTask {
                 .map(list -> ric);
     }
 
-    private Flux<Object> deleteAllPolicyInstances(Ric ric, Throwable t) {
-        logger.warn("Recreation of policies failed for ric: {}, reason: {}", ric.id(), t.getMessage());
-        deleteAllPoliciesInRepository(ric);
-
-        Flux<PolicyType> synchronizedTypes = this.a1ClientFactory.createA1Client(ric) //
-                .flatMapMany(a1Client -> synchronizePolicyTypes(ric, a1Client));
-        Flux<?> deletePoliciesInRic = this.a1ClientFactory.createA1Client(ric) //
-                .flatMapMany(A1Client::deleteAllPolicies) //
-                .doOnComplete(() -> deleteAllPoliciesInRepository(ric));
-
-        return Flux.concat(synchronizedTypes, deletePoliciesInRic);
-    }
-
     private Mono<PolicyType> getPolicyType(String policyTypeId, A1Client a1Client) {
         if (policyTypes.contains(policyTypeId)) {
             return Mono.just(policyTypes.get(policyTypeId));
@@ -161,12 +168,6 @@ public class RicSynchronizationTask {
         return pt;
     }
 
-    private void deleteAllPoliciesInRepository(Ric ric) {
-        for (Policy policy : policies.getForRic(ric.id())) {
-            this.policies.remove(policy);
-        }
-    }
-
     private Flux<Policy> putPolicy(Policy policy, Ric ric, A1Client a1Client) {
         logger.trace("Recreating policy: {}, for ric: {}", policy.getId(), ric.getConfig().getRicId());
         return a1Client.putPolicy(policy) //
index 875698a..396a406 100644 (file)
@@ -157,14 +157,10 @@ class RicSupervisionTest {
         doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
         RIC_1.setState(RicState.UNAVAILABLE);
         rics.put(RIC_1);
-
         RicSupervision supervisorUnderTest = spy(createRicSupervision());
-
         doReturn(synchronizationTaskMock).when(supervisorUnderTest).createSynchronizationTask();
         doReturn(Mono.just(RIC_1)).when(synchronizationTaskMock).synchronizeRic(any());
-
         supervisorUnderTest.checkAllRics();
-
         verify(supervisorUnderTest).checkAllRics();
         verify(supervisorUnderTest).createSynchronizationTask();
         verify(synchronizationTaskMock).synchronizeRic(RIC_1);
index 2e5424d..0ea0a8c 100644 (file)
@@ -56,6 +56,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Service;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -147,6 +148,21 @@ class RicSynchronizationTaskTest {
         assertThat(ric1.getSupportedPolicyTypeNames()).hasSize(1);
     }
 
+    @Test
+    void ricIdleAndErrorDeletingPoliciesAllTheTime_thenSynchronizationWithFailedRecovery() {
+        setUpCreationOfA1Client();
+        simulateRicWithNoPolicyTypes();
+        policies.put(policy1);
+        WebClientResponseException exception = new WebClientResponseException(404, "", null, null, null);
+        when(a1ClientMock.deleteAllPolicies()).thenReturn(Flux.error(exception));
+        RicSynchronizationTask synchronizerUnderTest = createTask();
+        ric1.setState(RicState.AVAILABLE);
+        synchronizerUnderTest.run(ric1);
+        await().untilAsserted(() -> RicState.UNAVAILABLE.equals(ric1.getState()));
+        assertThat(policies.size()).isZero();
+        assertThat(ric1.getState()).isEqualTo(RicState.UNAVAILABLE);
+    }
+
     @Test
     void ricIdlePolicyTypeInRepo_thenSynchronizationWithReuseOfTypeFromRepoAndCorrectServiceNotified() {
         rics.put(ric1);
@@ -233,58 +249,6 @@ class RicSynchronizationTaskTest {
         assertThat(ric1.getState()).isEqualTo(RicState.AVAILABLE);
     }
 
-    @Test
-    void ricIdleAndErrorDeletingPoliciesFirstTime_thenSynchronizationWithDeletionOfPolicies() {
-        ric1.setState(RicState.AVAILABLE);
-        rics.put(ric1);
-
-        policies.put(policy1);
-
-        setUpCreationOfA1Client();
-        simulateRicWithNoPolicyTypes();
-
-        when(a1ClientMock.deleteAllPolicies()) //
-                .thenReturn(Flux.error(new Exception("Exception"))) //
-                .thenReturn(Flux.just("OK"));
-
-        RicSynchronizationTask synchronizerUnderTest = createTask();
-
-        ric1.setState(RicState.UNAVAILABLE);
-        synchronizerUnderTest.run(ric1);
-        await().untilAsserted(() -> RicState.AVAILABLE.equals(ric1.getState()));
-
-        verify(a1ClientMock, times(2)).deleteAllPolicies();
-        verifyNoMoreInteractions(a1ClientMock);
-
-        assertThat(policyTypes.size()).isZero();
-        assertThat(policies.size()).isZero();
-        assertThat(ric1.getState()).isEqualTo(RicState.AVAILABLE);
-    }
-
-    @Test
-    void ricIdleAndErrorDeletingPoliciesAllTheTime_thenSynchronizationWithFailedRecovery() {
-        setUpCreationOfA1Client();
-        simulateRicWithNoPolicyTypes();
-
-        policies.put(policy1);
-
-        String originalErrorMessage = "Exception";
-        when(a1ClientMock.deleteAllPolicies()).thenReturn(Flux.error(new Exception(originalErrorMessage)));
-
-        RicSynchronizationTask synchronizerUnderTest = createTask();
-
-        ric1.setState(RicState.AVAILABLE);
-        synchronizerUnderTest.run(ric1);
-        await().untilAsserted(() -> RicState.UNAVAILABLE.equals(ric1.getState()));
-
-        verify(a1ClientMock, times(2)).deleteAllPolicies();
-        verifyNoMoreInteractions(a1ClientMock);
-
-        assertThat(policyTypes.size()).isZero();
-        assertThat(policies.size()).isZero();
-        assertThat(ric1.getState()).isEqualTo(RicState.UNAVAILABLE);
-    }
-
     private void setUpCreationOfA1Client() {
         when(a1ClientFactoryMock.createA1Client(any(Ric.class))).thenReturn(Mono.just(a1ClientMock));
         doReturn(Flux.empty()).when(a1ClientMock).deleteAllPolicies();