ONAP PMS - new RICs must be locked before synch 38/132838/2
authorPatrikBuhr <patrik.buhr@est.tech>
Fri, 23 Dec 2022 14:44:27 +0000 (15:44 +0100)
committerPatrikBuhr <patrik.buhr@est.tech>
Wed, 28 Dec 2022 09:47:11 +0000 (10:47 +0100)
Bugfix, new RICs must also be locked before synch. Otherwise other activities may interfere.

Improved the synch. Previously, all policies were removed from the NearRT-RIC and eventually recreated.
After this fix, only unknwon policies are removed.

Change-Id: Ic6224aeb93ef91579cfb8894329538baf1829283
Issue-ID: CCSDK-3827
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
14 files changed:
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1Client.java
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1ClientFactory.java
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/CcsdkA1AdapterClient.java
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/OscA1Client.java
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion1.java
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion2.java
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/MultiMap.java
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java
a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTaskTest.java
a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/utils/MockA1Client.java

index a691ee1..21636a3 100644 (file)
@@ -20,7 +20,9 @@
 
 package org.onap.ccsdk.oran.a1policymanagementservice.clients;
 
+import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 
 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy;
@@ -61,7 +63,11 @@ public interface A1Client {
 
     public Mono<String> deletePolicy(Policy policy);
 
-    public Flux<String> deleteAllPolicies();
+    public Flux<String> deleteAllPolicies(Set<String> excludePolicyIds);
+
+    default Flux<String> deleteAllPolicies() {
+        return deleteAllPolicies(Collections.emptySet());
+    }
 
     public Mono<String> getPolicyStatus(Policy policy);
 
index 75ba251..45c0bc4 100644 (file)
@@ -30,7 +30,6 @@ import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import reactor.core.publisher.Mono;
 
 /**
@@ -45,7 +44,6 @@ public class A1ClientFactory {
 
     private final AsyncRestClientFactory restClientFactory;
 
-    @Autowired
     public A1ClientFactory(ApplicationConfig appConfig, SecurityContext securityContext) {
         this.appConfig = appConfig;
         this.restClientFactory = new AsyncRestClientFactory(appConfig.getWebClientConfig(), securityContext);
index df6faad..9c32d79 100644 (file)
@@ -28,6 +28,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 
 import lombok.Getter;
 
@@ -190,15 +191,16 @@ public class CcsdkA1AdapterClient implements A1Client {
     }
 
     @Override
-    public Flux<String> deleteAllPolicies() {
+    public Flux<String> deleteAllPolicies(Set<String> excludePolicyIds) {
         if (this.protocolType == A1ProtocolType.CCSDK_A1_ADAPTER_STD_V1_1) {
             return getPolicyIds() //
+                    .filter(policyId -> !excludePolicyIds.contains(policyId)) //
                     .flatMap(policyId -> deletePolicyById("", policyId), CONCURRENCY_RIC); //
         } else {
             A1UriBuilder uriBuilder = this.getUriBuilder();
             return getPolicyTypeIdentities() //
                     .flatMapMany(Flux::fromIterable) //
-                    .flatMap(type -> deleteAllInstancesForType(uriBuilder, type), CONCURRENCY_RIC);
+                    .flatMap(type -> deleteAllInstancesForType(uriBuilder, type, excludePolicyIds), CONCURRENCY_RIC);
         }
     }
 
@@ -207,9 +209,10 @@ public class CcsdkA1AdapterClient implements A1Client {
                 .flatMapMany(A1AdapterJsonHelper::parseJsonArrayOfString);
     }
 
-    private Flux<String> deleteAllInstancesForType(A1UriBuilder uriBuilder, String type) {
+    private Flux<String> deleteAllInstancesForType(A1UriBuilder uriBuilder, String type, Set<String> excludePolicyIds) {
         return getInstancesForType(uriBuilder, type) //
-                .flatMap(instance -> deletePolicyById(type, instance), CONCURRENCY_RIC);
+                .filter(policyId -> !excludePolicyIds.contains(policyId)) //
+                .flatMap(policyId -> deletePolicyById(type, policyId), CONCURRENCY_RIC);
     }
 
     @Override
index dfe33e7..1d567a8 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.ccsdk.oran.a1policymanagementservice.clients;
 
 import java.lang.invoke.MethodHandles;
 import java.util.List;
+import java.util.Set;
 
 import org.json.JSONObject;
 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
@@ -182,9 +183,9 @@ public class OscA1Client implements A1Client {
     }
 
     @Override
-    public Flux<String> deleteAllPolicies() {
+    public Flux<String> deleteAllPolicies(Set<String> excludePolicyIds) {
         return getPolicyTypeIds() //
-                .flatMap(this::deletePoliciesForType, CONCURRENCY_RIC);
+                .flatMap(typeId -> deletePoliciesForType(typeId, excludePolicyIds), CONCURRENCY_RIC);
     }
 
     @Override
@@ -209,8 +210,9 @@ public class OscA1Client implements A1Client {
         return restClient.delete(policyUri);
     }
 
-    private Flux<String> deletePoliciesForType(String typeId) {
+    private Flux<String> deletePoliciesForType(String typeId, Set<String> excludePolicyIds) {
         return getPolicyIdentitiesByType(typeId) //
+                .filter(policyId -> !excludePolicyIds.contains(policyId)) //
                 .flatMap(policyId -> deletePolicyById(typeId, policyId), CONCURRENCY_RIC);
     }
 }
index 5eae775..750008d 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.ccsdk.oran.a1policymanagementservice.clients;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Set;
 
 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy;
@@ -134,8 +135,9 @@ public class StdA1ClientVersion1 implements A1Client {
     }
 
     @Override
-    public Flux<String> deleteAllPolicies() {
+    public Flux<String> deleteAllPolicies(Set<String> excludePolicyIds) {
         return getPolicyIds() //
+                .filter(policyId -> !excludePolicyIds.contains(policyId)) //
                 .flatMap(this::deletePolicyById); //
     }
 
index 0022057..24990a1 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.ccsdk.oran.a1policymanagementservice.clients;
 
 import java.lang.invoke.MethodHandles;
 import java.util.List;
+import java.util.Set;
 
 import org.json.JSONObject;
 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
@@ -193,16 +194,15 @@ public class StdA1ClientVersion2 implements A1Client {
     }
 
     @Override
-    public Flux<String> deleteAllPolicies() {
+    public Flux<String> deleteAllPolicies(Set<String> excludePolicyIds) {
         return getPolicyTypeIds() //
-                .flatMap(this::deletePoliciesForType, CONCURRENCY_RIC);
+                .flatMap(typeId -> deleteAllPoliciesForType(typeId, excludePolicyIds), CONCURRENCY_RIC);
     }
 
     @Override
     public Mono<String> getPolicyStatus(Policy policy) {
         String statusUri = uriBuiler.createGetPolicyStatusUri(policy.getType().getId(), policy.getId());
         return restClient.get(statusUri);
-
     }
 
     private Flux<String> getPolicyTypeIds() {
@@ -220,8 +220,9 @@ public class StdA1ClientVersion2 implements A1Client {
         return restClient.delete(policyUri);
     }
 
-    private Flux<String> deletePoliciesForType(String typeId) {
+    private Flux<String> deleteAllPoliciesForType(String typeId, Set<String> excludePolicyIds) {
         return getPolicyIdentitiesByType(typeId) //
+                .filter(policyId -> !excludePolicyIds.contains(policyId)) //
                 .flatMap(policyId -> deletePolicyById(typeId, policyId), CONCURRENCY_RIC);
     }
 }
index ff09ba3..6f15287 100644 (file)
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.Vector;
 
 /**
@@ -58,6 +59,14 @@ public class MultiMap<T> {
         return new Vector<>(innerMap.values());
     }
 
+    public Set<String> keySet(String key) {
+        Map<String, T> innerMap = this.map.get(key);
+        if (innerMap == null) {
+            return Collections.emptySet();
+        }
+        return innerMap.keySet();
+    }
+
     public void clear() {
         this.map.clear();
     }
index 6161fbd..77ac0f4 100644 (file)
@@ -71,11 +71,9 @@ public class Policies {
     private MultiMap<Policy> policiesType = new MultiMap<>();
     private final DataStore dataStore;
 
-    private final ApplicationConfig appConfig;
     private static Gson gson = new GsonBuilder().create();
 
     public Policies(@Autowired ApplicationConfig appConfig) {
-        this.appConfig = appConfig;
         this.dataStore = DataStore.create(appConfig, "policies");
     }
 
@@ -139,6 +137,10 @@ public class Policies {
         return policiesRic.get(ric);
     }
 
+    public synchronized Set<String> getPolicyIdsForRic(String ricId) {
+        return policiesRic.keySet(ricId);
+    }
+
     public synchronized Collection<Policy> getForType(String type) {
         return policiesType.get(type);
     }
index 408a0d5..978ae1c 100644 (file)
@@ -36,6 +36,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationCo
 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig.RicConfigUpdate;
 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfigParser;
 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ConfigurationFile;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock.LockType;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric;
@@ -43,7 +44,6 @@ import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
@@ -76,7 +76,7 @@ public class RefreshConfigTask {
     @Getter(AccessLevel.PROTECTED)
     private Disposable refreshTask = null;
 
-    private final Rics rics;
+    final Rics rics;
     private final A1ClientFactory a1ClientFactory;
     private final Policies policies;
     private final Services services;
@@ -85,7 +85,6 @@ public class RefreshConfigTask {
 
     private long fileLastModified = 0;
 
-    @Autowired
     public RefreshConfigTask(ConfigurationFile configurationFile, ApplicationConfig appConfig, Rics rics,
             Policies policies, Services services, PolicyTypes policyTypes, A1ClientFactory a1ClientFactory,
             SecurityContext securityContext) {
@@ -161,7 +160,10 @@ public class RefreshConfigTask {
 
     private void removePoliciciesInRic(@Nullable Ric ric) {
         if (ric != null) {
-            synchronizationTask().run(ric);
+            ric.getLock().lock(LockType.EXCLUSIVE, "removedRic") //
+                    .flatMap(notUsed -> synchronizationTask().synchronizeRic(ric)) //
+                    .doFinally(sig -> ric.getLock().unlockBlocking()) //
+                    .subscribe();
         }
     }
 
@@ -176,9 +178,15 @@ public class RefreshConfigTask {
             if (event == RicConfigUpdate.Type.ADDED) {
                 logger.debug("RIC added {}", ricId);
                 Ric ric = new Ric(updatedInfo.getRicConfig());
-                this.addRic(ric);
-                return this.synchronizationTask().synchronizeRic(ric) //
-                        .map(notUsed -> event);
+
+                return ric.getLock().lock(LockType.EXCLUSIVE, "addedRic") //
+                        .doOnNext(grant -> this.rics.put(ric)) //
+                        .flatMapMany(grant -> this.policies.restoreFromDatabase(ric, this.policyTypes)) //
+                        .collectList() //
+                        .doOnNext(l -> logger.debug("Starting sycnhronization for new RIC: {}", ric.id())) //
+                        .flatMap(grant -> synchronizationTask().synchronizeRic(ric)) //
+                        .map(notUsed -> event) //
+                        .doFinally(sig -> ric.getLock().unlockBlocking());
             } else if (event == RicConfigUpdate.Type.REMOVED) {
                 logger.debug("RIC removed {}", ricId);
                 Ric ric = rics.remove(ricId);
@@ -189,7 +197,7 @@ public class RefreshConfigTask {
                 Ric ric = this.rics.get(ricId);
                 if (ric == null) {
                     logger.error("An non existing RIC config is changed, should not happen (just for robustness)");
-                    addRic(new Ric(updatedInfo.getRicConfig()));
+                    this.rics.put(new Ric(updatedInfo.getRicConfig()));
                 } else {
                     ric.setRicConfig(updatedInfo.getRicConfig());
                 }
@@ -198,12 +206,6 @@ public class RefreshConfigTask {
         }
     }
 
-    void addRic(Ric ric) {
-        this.rics.put(ric);
-        this.policies.restoreFromDatabase(ric, this.policyTypes).subscribe();
-        logger.debug("Added RIC: {}", ric.id());
-    }
-
     /**
      * Reads the configuration from file.
      */
index fdeb47e..1f612e4 100644 (file)
@@ -37,7 +37,6 @@ import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.EnableScheduling;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
@@ -86,7 +85,6 @@ public class RicSupervision {
         private final A1Client a1Client;
     }
 
-    @Autowired
     public RicSupervision(Rics rics, Policies policies, A1ClientFactory a1ClientFactory, PolicyTypes policyTypes,
             Services services, ApplicationConfig config, SecurityContext securityContext) {
         this.rics = rics;
index b3afa7c..d1bd433 100644 (file)
@@ -22,11 +22,12 @@ package org.onap.ccsdk.oran.a1policymanagementservice.tasks;
 
 import static org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState;
 
+import java.util.Set;
+
 import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1Client;
 import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1ClientFactory;
 import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClientFactory;
 import org.onap.ccsdk.oran.a1policymanagementservice.controllers.ServiceCallbacks;
-import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock.LockType;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyType;
@@ -77,21 +78,11 @@ public class RicSynchronizationTask {
         this.rics = rics;
     }
 
-    public void run(Ric ric) {
-        logger.debug("Ric synchronization task created: {}", ric.getConfig().getRicId());
-
-        if (ric.getState() == RicState.SYNCHRONIZING) {
-            logger.debug("Ric: {} is already being synchronized", ric.getConfig().getRicId());
-            return;
-        }
-
-        ric.getLock().lock(LockType.EXCLUSIVE, "RicSynchronizationTask") //
-                .flatMap(notUsed -> synchronizeRic(ric)) //
-                .doFinally(sig -> ric.getLock().unlockBlocking()) //
-                .subscribe();
-    }
-
     public Mono<Ric> synchronizeRic(Ric ric) {
+        if (ric.getLock().getLockCounter() != 1) {
+            logger.error("Exclusive lock is required to run synchronization, ric: {}", ric.id());
+            return Mono.empty();
+        }
         return this.a1ClientFactory.createA1Client(ric) //
                 .doOnNext(client -> ric.setState(RicState.SYNCHRONIZING)) //
                 .flatMapMany(client -> runSynchronization(ric, client)) //
@@ -143,7 +134,8 @@ public class RicSynchronizationTask {
 
     private Flux<Object> runSynchronization(Ric ric, A1Client a1Client) {
         Flux<PolicyType> synchronizedTypes = synchronizePolicyTypes(ric, a1Client);
-        Flux<?> policiesDeletedInRic = a1Client.deleteAllPolicies();
+        Set<String> excludeFromDelete = this.policies.getPolicyIdsForRic(ric.id());
+        Flux<?> policiesDeletedInRic = a1Client.deleteAllPolicies(excludeFromDelete);
         Flux<Policy> policiesRecreatedInRic = recreateAllPoliciesInRic(ric, a1Client);
 
         return Flux.concat(synchronizedTypes, policiesDeletedInRic, policiesRecreatedInRic);
index 2ecb9c2..45f2bcb 100644 (file)
@@ -81,7 +81,7 @@ class RefreshConfigTaskTest {
             .build();
 
     private RefreshConfigTask createTestObject(boolean configFileExists) {
-        return createTestObject(configFileExists, new Rics(), new Policies(appConfig), true);
+        return createTestObject(configFileExists, spy(new Rics()), new Policies(appConfig), true);
     }
 
     private RefreshConfigTask createTestObject(boolean configFileExists, Rics rics, Policies policies,
@@ -116,7 +116,7 @@ class RefreshConfigTaskTest {
         // Then
         verify(refreshTaskUnderTest, atLeastOnce()).loadConfigurationFromFile();
 
-        verify(refreshTaskUnderTest, times(2)).addRic(any(Ric.class));
+        verify(refreshTaskUnderTest.rics, times(2)).put(any(Ric.class));
 
         Iterable<RicConfig> ricConfigs = appConfig.getRicConfigs();
         RicConfig ricConfig = ricConfigs.iterator().next();
index 6386441..ae4e92b 100644 (file)
@@ -23,11 +23,10 @@ package org.onap.ccsdk.oran.a1policymanagementservice.tasks;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.awaitility.Awaitility.await;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anySet;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
@@ -48,6 +47,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.clients.SecurityContext;
 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
 import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock.LockType;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyType;
@@ -130,36 +130,15 @@ class RicSynchronizationTaskTest {
                 rics);
     };
 
-    @Test
-    void ricAlreadySynchronizing_thenNoSynchronization() {
-        ric1.setState(RicState.SYNCHRONIZING);
-        ric1.addSupportedPolicyType(POLICY_TYPE_1);
-
-        policyTypes.put(POLICY_TYPE_1);
-        policies.put(policy1);
-
-        RicSynchronizationTask synchronizerUnderTest = createTask();
-
-        synchronizerUnderTest.run(ric1);
-
-        verifyNoInteractions(a1ClientMock);
-
-        assertThat(policyTypes.size()).isEqualTo(1);
-        assertThat(policies.size()).isEqualTo(1);
-        assertThat(ric1.getState()).isEqualTo(RicState.SYNCHRONIZING);
-        assertThat(ric1.getSupportedPolicyTypeNames()).hasSize(1);
-    }
-
     @Test
     void ricIdleAndErrorDeletingPoliciesAllTheTime_thenSynchronizationWithFailedRecovery() {
         setUpCreationOfA1Client();
         simulateRicWithNoPolicyTypes();
         policies.put(policy1);
         WebClientResponseException exception = new WebClientResponseException(404, "", null, null, null);
-        when(a1ClientMock.deleteAllPolicies()).thenReturn(Flux.error(exception));
-        RicSynchronizationTask synchronizerUnderTest = createTask();
+        when(a1ClientMock.deleteAllPolicies(anySet())).thenReturn(Flux.error(exception));
         ric1.setState(RicState.AVAILABLE);
-        synchronizerUnderTest.run(ric1);
+        runSynch(ric1);
         await().untilAsserted(() -> RicState.UNAVAILABLE.equals(ric1.getState()));
         assertThat(policies.size()).isZero();
         assertThat(ric1.getState()).isEqualTo(RicState.UNAVAILABLE);
@@ -172,10 +151,9 @@ class RicSynchronizationTaskTest {
         policies.put(policy1);
         WebClientRequestException exception =
                 new WebClientRequestException(new ServiceException("x"), null, null, null);
-        when(a1ClientMock.deleteAllPolicies()).thenReturn(Flux.error(exception));
-        RicSynchronizationTask synchronizerUnderTest = createTask();
+        when(a1ClientMock.deleteAllPolicies(anySet())).thenReturn(Flux.error(exception));
         ric1.setState(RicState.AVAILABLE);
-        synchronizerUnderTest.run(ric1);
+        runSynch(ric1);
         await().untilAsserted(() -> RicState.UNAVAILABLE.equals(ric1.getState()));
     }
 
@@ -193,17 +171,13 @@ class RicSynchronizationTaskTest {
         setUpCreationOfA1Client();
         simulateRicWithOnePolicyType();
 
-        RicSynchronizationTask synchronizerUnderTest = spy(createTask());
-
         ric1.setState(RicState.UNAVAILABLE);
-        synchronizerUnderTest.run(ric1);
+        runSynch(ric1);
         await().untilAsserted(() -> RicState.AVAILABLE.equals(ric1.getState()));
 
         verify(a1ClientMock, times(1)).getPolicyTypeIdentities();
         verifyNoMoreInteractions(a1ClientMock);
 
-        verify(synchronizerUnderTest).run(ric1);
-
         assertThat(policyTypes.size()).isEqualTo(1);
         assertThat(policies.size()).isZero();
         assertThat(ric1.getState()).isEqualTo(RicState.AVAILABLE);
@@ -219,10 +193,8 @@ class RicSynchronizationTaskTest {
         String typeSchema = "schema";
         when(a1ClientMock.getPolicyTypeSchema(POLICY_TYPE_1_NAME)).thenReturn(Mono.just(typeSchema));
 
-        RicSynchronizationTask synchronizerUnderTest = createTask();
-
         ric1.setState(RicState.UNAVAILABLE);
-        synchronizerUnderTest.run(ric1);
+        runSynch(ric1);
         await().untilAsserted(() -> RicState.AVAILABLE.equals(ric1.getState()));
 
         verify(a1ClientMock).getPolicyTypeIdentities();
@@ -247,16 +219,14 @@ class RicSynchronizationTaskTest {
         setUpCreationOfA1Client();
         simulateRicWithNoPolicyTypes();
 
-        when(a1ClientMock.deleteAllPolicies()).thenReturn(Flux.just("OK"));
+        when(a1ClientMock.deleteAllPolicies(anySet())).thenReturn(Flux.just("OK"));
         when(a1ClientMock.putPolicy(any(Policy.class))).thenReturn(Mono.just("OK"));
 
-        RicSynchronizationTask synchronizerUnderTest = createTask();
-
         ric1.setState(RicState.UNAVAILABLE);
-        synchronizerUnderTest.run(ric1);
+        runSynch(ric1);
         await().untilAsserted(() -> RicState.AVAILABLE.equals(ric1.getState()));
 
-        verify(a1ClientMock).deleteAllPolicies();
+        verify(a1ClientMock).deleteAllPolicies(anySet());
         verify(a1ClientMock).putPolicy(policy1);
         verifyNoMoreInteractions(a1ClientMock);
 
@@ -265,9 +235,17 @@ class RicSynchronizationTaskTest {
         assertThat(ric1.getState()).isEqualTo(RicState.AVAILABLE);
     }
 
+    private void runSynch(Ric ric) {
+        RicSynchronizationTask synchronizerUnderTest = createTask();
+        ric.getLock().lock(LockType.EXCLUSIVE, "RicSynchronizationTask") //
+                .flatMap(notUsed -> synchronizerUnderTest.synchronizeRic(ric)) //
+                .doFinally(sig -> ric.getLock().unlockBlocking()) //
+                .block();
+    }
+
     private void setUpCreationOfA1Client() {
         when(a1ClientFactoryMock.createA1Client(any(Ric.class))).thenReturn(Mono.just(a1ClientMock));
-        doReturn(Flux.empty()).when(a1ClientMock).deleteAllPolicies();
+        doReturn(Flux.empty()).when(a1ClientMock).deleteAllPolicies(anySet());
     }
 
     private void simulateRicWithOnePolicyType() {
index b76f1e7..80bfff7 100644 (file)
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.when;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.List;
+import java.util.Set;
 import java.util.Vector;
 
 import lombok.Setter;
@@ -112,7 +113,7 @@ public class MockA1Client implements A1Client {
     }
 
     @Override
-    public Flux<String> deleteAllPolicies() {
+    public Flux<String> deleteAllPolicies(Set<String> excludePolicyId) {
         this.policies.clear();
         return mono("OK") //
                 .flatMapMany(Flux::just);