Bugfix, new RICs must also be locked before synch. Otherwise other activities may interfere.
Improved the synch. Previously, all policies were removed from the NearRT-RIC and eventually recreated.
After this fix, only unknwon policies are removed.
Change-Id: Ic6224aeb93ef91579cfb8894329538baf1829283
Issue-ID: CCSDK-3827
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
package org.onap.ccsdk.oran.a1policymanagementservice.clients;
+import java.util.Collections;
import java.util.List;
+import java.util.Set;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy;
public Mono<String> deletePolicy(Policy policy);
- public Flux<String> deleteAllPolicies();
+ public Flux<String> deleteAllPolicies(Set<String> excludePolicyIds);
+
+ default Flux<String> deleteAllPolicies() {
+ return deleteAllPolicies(Collections.emptySet());
+ }
public Mono<String> getPolicyStatus(Policy policy);
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import reactor.core.publisher.Mono;
/**
private final AsyncRestClientFactory restClientFactory;
- @Autowired
public A1ClientFactory(ApplicationConfig appConfig, SecurityContext securityContext) {
this.appConfig = appConfig;
this.restClientFactory = new AsyncRestClientFactory(appConfig.getWebClientConfig(), securityContext);
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import lombok.Getter;
}
@Override
- public Flux<String> deleteAllPolicies() {
+ public Flux<String> deleteAllPolicies(Set<String> excludePolicyIds) {
if (this.protocolType == A1ProtocolType.CCSDK_A1_ADAPTER_STD_V1_1) {
return getPolicyIds() //
+ .filter(policyId -> !excludePolicyIds.contains(policyId)) //
.flatMap(policyId -> deletePolicyById("", policyId), CONCURRENCY_RIC); //
} else {
A1UriBuilder uriBuilder = this.getUriBuilder();
return getPolicyTypeIdentities() //
.flatMapMany(Flux::fromIterable) //
- .flatMap(type -> deleteAllInstancesForType(uriBuilder, type), CONCURRENCY_RIC);
+ .flatMap(type -> deleteAllInstancesForType(uriBuilder, type, excludePolicyIds), CONCURRENCY_RIC);
}
}
.flatMapMany(A1AdapterJsonHelper::parseJsonArrayOfString);
}
- private Flux<String> deleteAllInstancesForType(A1UriBuilder uriBuilder, String type) {
+ private Flux<String> deleteAllInstancesForType(A1UriBuilder uriBuilder, String type, Set<String> excludePolicyIds) {
return getInstancesForType(uriBuilder, type) //
- .flatMap(instance -> deletePolicyById(type, instance), CONCURRENCY_RIC);
+ .filter(policyId -> !excludePolicyIds.contains(policyId)) //
+ .flatMap(policyId -> deletePolicyById(type, policyId), CONCURRENCY_RIC);
}
@Override
import java.lang.invoke.MethodHandles;
import java.util.List;
+import java.util.Set;
import org.json.JSONObject;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
}
@Override
- public Flux<String> deleteAllPolicies() {
+ public Flux<String> deleteAllPolicies(Set<String> excludePolicyIds) {
return getPolicyTypeIds() //
- .flatMap(this::deletePoliciesForType, CONCURRENCY_RIC);
+ .flatMap(typeId -> deletePoliciesForType(typeId, excludePolicyIds), CONCURRENCY_RIC);
}
@Override
return restClient.delete(policyUri);
}
- private Flux<String> deletePoliciesForType(String typeId) {
+ private Flux<String> deletePoliciesForType(String typeId, Set<String> excludePolicyIds) {
return getPolicyIdentitiesByType(typeId) //
+ .filter(policyId -> !excludePolicyIds.contains(policyId)) //
.flatMap(policyId -> deletePolicyById(typeId, policyId), CONCURRENCY_RIC);
}
}
import java.util.Arrays;
import java.util.List;
+import java.util.Set;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy;
}
@Override
- public Flux<String> deleteAllPolicies() {
+ public Flux<String> deleteAllPolicies(Set<String> excludePolicyIds) {
return getPolicyIds() //
+ .filter(policyId -> !excludePolicyIds.contains(policyId)) //
.flatMap(this::deletePolicyById); //
}
import java.lang.invoke.MethodHandles;
import java.util.List;
+import java.util.Set;
import org.json.JSONObject;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
}
@Override
- public Flux<String> deleteAllPolicies() {
+ public Flux<String> deleteAllPolicies(Set<String> excludePolicyIds) {
return getPolicyTypeIds() //
- .flatMap(this::deletePoliciesForType, CONCURRENCY_RIC);
+ .flatMap(typeId -> deleteAllPoliciesForType(typeId, excludePolicyIds), CONCURRENCY_RIC);
}
@Override
public Mono<String> getPolicyStatus(Policy policy) {
String statusUri = uriBuiler.createGetPolicyStatusUri(policy.getType().getId(), policy.getId());
return restClient.get(statusUri);
-
}
private Flux<String> getPolicyTypeIds() {
return restClient.delete(policyUri);
}
- private Flux<String> deletePoliciesForType(String typeId) {
+ private Flux<String> deleteAllPoliciesForType(String typeId, Set<String> excludePolicyIds) {
return getPolicyIdentitiesByType(typeId) //
+ .filter(policyId -> !excludePolicyIds.contains(policyId)) //
.flatMap(policyId -> deletePolicyById(typeId, policyId), CONCURRENCY_RIC);
}
}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.Vector;
/**
return new Vector<>(innerMap.values());
}
+ public Set<String> keySet(String key) {
+ Map<String, T> innerMap = this.map.get(key);
+ if (innerMap == null) {
+ return Collections.emptySet();
+ }
+ return innerMap.keySet();
+ }
+
public void clear() {
this.map.clear();
}
private MultiMap<Policy> policiesType = new MultiMap<>();
private final DataStore dataStore;
- private final ApplicationConfig appConfig;
private static Gson gson = new GsonBuilder().create();
public Policies(@Autowired ApplicationConfig appConfig) {
- this.appConfig = appConfig;
this.dataStore = DataStore.create(appConfig, "policies");
}
return policiesRic.get(ric);
}
+ public synchronized Set<String> getPolicyIdsForRic(String ricId) {
+ return policiesRic.keySet(ricId);
+ }
+
public synchronized Collection<Policy> getForType(String type) {
return policiesType.get(type);
}
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig.RicConfigUpdate;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfigParser;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ConfigurationFile;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock.LockType;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Getter(AccessLevel.PROTECTED)
private Disposable refreshTask = null;
- private final Rics rics;
+ final Rics rics;
private final A1ClientFactory a1ClientFactory;
private final Policies policies;
private final Services services;
private long fileLastModified = 0;
- @Autowired
public RefreshConfigTask(ConfigurationFile configurationFile, ApplicationConfig appConfig, Rics rics,
Policies policies, Services services, PolicyTypes policyTypes, A1ClientFactory a1ClientFactory,
SecurityContext securityContext) {
private void removePoliciciesInRic(@Nullable Ric ric) {
if (ric != null) {
- synchronizationTask().run(ric);
+ ric.getLock().lock(LockType.EXCLUSIVE, "removedRic") //
+ .flatMap(notUsed -> synchronizationTask().synchronizeRic(ric)) //
+ .doFinally(sig -> ric.getLock().unlockBlocking()) //
+ .subscribe();
}
}
if (event == RicConfigUpdate.Type.ADDED) {
logger.debug("RIC added {}", ricId);
Ric ric = new Ric(updatedInfo.getRicConfig());
- this.addRic(ric);
- return this.synchronizationTask().synchronizeRic(ric) //
- .map(notUsed -> event);
+
+ return ric.getLock().lock(LockType.EXCLUSIVE, "addedRic") //
+ .doOnNext(grant -> this.rics.put(ric)) //
+ .flatMapMany(grant -> this.policies.restoreFromDatabase(ric, this.policyTypes)) //
+ .collectList() //
+ .doOnNext(l -> logger.debug("Starting sycnhronization for new RIC: {}", ric.id())) //
+ .flatMap(grant -> synchronizationTask().synchronizeRic(ric)) //
+ .map(notUsed -> event) //
+ .doFinally(sig -> ric.getLock().unlockBlocking());
} else if (event == RicConfigUpdate.Type.REMOVED) {
logger.debug("RIC removed {}", ricId);
Ric ric = rics.remove(ricId);
Ric ric = this.rics.get(ricId);
if (ric == null) {
logger.error("An non existing RIC config is changed, should not happen (just for robustness)");
- addRic(new Ric(updatedInfo.getRicConfig()));
+ this.rics.put(new Ric(updatedInfo.getRicConfig()));
} else {
ric.setRicConfig(updatedInfo.getRicConfig());
}
}
}
- void addRic(Ric ric) {
- this.rics.put(ric);
- this.policies.restoreFromDatabase(ric, this.policyTypes).subscribe();
- logger.debug("Added RIC: {}", ric.id());
- }
-
/**
* Reads the configuration from file.
*/
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
private final A1Client a1Client;
}
- @Autowired
public RicSupervision(Rics rics, Policies policies, A1ClientFactory a1ClientFactory, PolicyTypes policyTypes,
Services services, ApplicationConfig config, SecurityContext securityContext) {
this.rics = rics;
import static org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState;
+import java.util.Set;
+
import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1Client;
import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1ClientFactory;
import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClientFactory;
import org.onap.ccsdk.oran.a1policymanagementservice.controllers.ServiceCallbacks;
-import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock.LockType;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyType;
this.rics = rics;
}
- public void run(Ric ric) {
- logger.debug("Ric synchronization task created: {}", ric.getConfig().getRicId());
-
- if (ric.getState() == RicState.SYNCHRONIZING) {
- logger.debug("Ric: {} is already being synchronized", ric.getConfig().getRicId());
- return;
- }
-
- ric.getLock().lock(LockType.EXCLUSIVE, "RicSynchronizationTask") //
- .flatMap(notUsed -> synchronizeRic(ric)) //
- .doFinally(sig -> ric.getLock().unlockBlocking()) //
- .subscribe();
- }
-
public Mono<Ric> synchronizeRic(Ric ric) {
+ if (ric.getLock().getLockCounter() != 1) {
+ logger.error("Exclusive lock is required to run synchronization, ric: {}", ric.id());
+ return Mono.empty();
+ }
return this.a1ClientFactory.createA1Client(ric) //
.doOnNext(client -> ric.setState(RicState.SYNCHRONIZING)) //
.flatMapMany(client -> runSynchronization(ric, client)) //
private Flux<Object> runSynchronization(Ric ric, A1Client a1Client) {
Flux<PolicyType> synchronizedTypes = synchronizePolicyTypes(ric, a1Client);
- Flux<?> policiesDeletedInRic = a1Client.deleteAllPolicies();
+ Set<String> excludeFromDelete = this.policies.getPolicyIdsForRic(ric.id());
+ Flux<?> policiesDeletedInRic = a1Client.deleteAllPolicies(excludeFromDelete);
Flux<Policy> policiesRecreatedInRic = recreateAllPoliciesInRic(ric, a1Client);
return Flux.concat(synchronizedTypes, policiesDeletedInRic, policiesRecreatedInRic);
.build();
private RefreshConfigTask createTestObject(boolean configFileExists) {
- return createTestObject(configFileExists, new Rics(), new Policies(appConfig), true);
+ return createTestObject(configFileExists, spy(new Rics()), new Policies(appConfig), true);
}
private RefreshConfigTask createTestObject(boolean configFileExists, Rics rics, Policies policies,
// Then
verify(refreshTaskUnderTest, atLeastOnce()).loadConfigurationFromFile();
- verify(refreshTaskUnderTest, times(2)).addRic(any(Ric.class));
+ verify(refreshTaskUnderTest.rics, times(2)).put(any(Ric.class));
Iterable<RicConfig> ricConfigs = appConfig.getRicConfigs();
RicConfig ricConfig = ricConfigs.iterator().next();
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock.LockType;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyType;
rics);
};
- @Test
- void ricAlreadySynchronizing_thenNoSynchronization() {
- ric1.setState(RicState.SYNCHRONIZING);
- ric1.addSupportedPolicyType(POLICY_TYPE_1);
-
- policyTypes.put(POLICY_TYPE_1);
- policies.put(policy1);
-
- RicSynchronizationTask synchronizerUnderTest = createTask();
-
- synchronizerUnderTest.run(ric1);
-
- verifyNoInteractions(a1ClientMock);
-
- assertThat(policyTypes.size()).isEqualTo(1);
- assertThat(policies.size()).isEqualTo(1);
- assertThat(ric1.getState()).isEqualTo(RicState.SYNCHRONIZING);
- assertThat(ric1.getSupportedPolicyTypeNames()).hasSize(1);
- }
-
@Test
void ricIdleAndErrorDeletingPoliciesAllTheTime_thenSynchronizationWithFailedRecovery() {
setUpCreationOfA1Client();
simulateRicWithNoPolicyTypes();
policies.put(policy1);
WebClientResponseException exception = new WebClientResponseException(404, "", null, null, null);
- when(a1ClientMock.deleteAllPolicies()).thenReturn(Flux.error(exception));
- RicSynchronizationTask synchronizerUnderTest = createTask();
+ when(a1ClientMock.deleteAllPolicies(anySet())).thenReturn(Flux.error(exception));
ric1.setState(RicState.AVAILABLE);
- synchronizerUnderTest.run(ric1);
+ runSynch(ric1);
await().untilAsserted(() -> RicState.UNAVAILABLE.equals(ric1.getState()));
assertThat(policies.size()).isZero();
assertThat(ric1.getState()).isEqualTo(RicState.UNAVAILABLE);
policies.put(policy1);
WebClientRequestException exception =
new WebClientRequestException(new ServiceException("x"), null, null, null);
- when(a1ClientMock.deleteAllPolicies()).thenReturn(Flux.error(exception));
- RicSynchronizationTask synchronizerUnderTest = createTask();
+ when(a1ClientMock.deleteAllPolicies(anySet())).thenReturn(Flux.error(exception));
ric1.setState(RicState.AVAILABLE);
- synchronizerUnderTest.run(ric1);
+ runSynch(ric1);
await().untilAsserted(() -> RicState.UNAVAILABLE.equals(ric1.getState()));
}
setUpCreationOfA1Client();
simulateRicWithOnePolicyType();
- RicSynchronizationTask synchronizerUnderTest = spy(createTask());
-
ric1.setState(RicState.UNAVAILABLE);
- synchronizerUnderTest.run(ric1);
+ runSynch(ric1);
await().untilAsserted(() -> RicState.AVAILABLE.equals(ric1.getState()));
verify(a1ClientMock, times(1)).getPolicyTypeIdentities();
verifyNoMoreInteractions(a1ClientMock);
- verify(synchronizerUnderTest).run(ric1);
-
assertThat(policyTypes.size()).isEqualTo(1);
assertThat(policies.size()).isZero();
assertThat(ric1.getState()).isEqualTo(RicState.AVAILABLE);
String typeSchema = "schema";
when(a1ClientMock.getPolicyTypeSchema(POLICY_TYPE_1_NAME)).thenReturn(Mono.just(typeSchema));
- RicSynchronizationTask synchronizerUnderTest = createTask();
-
ric1.setState(RicState.UNAVAILABLE);
- synchronizerUnderTest.run(ric1);
+ runSynch(ric1);
await().untilAsserted(() -> RicState.AVAILABLE.equals(ric1.getState()));
verify(a1ClientMock).getPolicyTypeIdentities();
setUpCreationOfA1Client();
simulateRicWithNoPolicyTypes();
- when(a1ClientMock.deleteAllPolicies()).thenReturn(Flux.just("OK"));
+ when(a1ClientMock.deleteAllPolicies(anySet())).thenReturn(Flux.just("OK"));
when(a1ClientMock.putPolicy(any(Policy.class))).thenReturn(Mono.just("OK"));
- RicSynchronizationTask synchronizerUnderTest = createTask();
-
ric1.setState(RicState.UNAVAILABLE);
- synchronizerUnderTest.run(ric1);
+ runSynch(ric1);
await().untilAsserted(() -> RicState.AVAILABLE.equals(ric1.getState()));
- verify(a1ClientMock).deleteAllPolicies();
+ verify(a1ClientMock).deleteAllPolicies(anySet());
verify(a1ClientMock).putPolicy(policy1);
verifyNoMoreInteractions(a1ClientMock);
assertThat(ric1.getState()).isEqualTo(RicState.AVAILABLE);
}
+ private void runSynch(Ric ric) {
+ RicSynchronizationTask synchronizerUnderTest = createTask();
+ ric.getLock().lock(LockType.EXCLUSIVE, "RicSynchronizationTask") //
+ .flatMap(notUsed -> synchronizerUnderTest.synchronizeRic(ric)) //
+ .doFinally(sig -> ric.getLock().unlockBlocking()) //
+ .block();
+ }
+
private void setUpCreationOfA1Client() {
when(a1ClientFactoryMock.createA1Client(any(Ric.class))).thenReturn(Mono.just(a1ClientMock));
- doReturn(Flux.empty()).when(a1ClientMock).deleteAllPolicies();
+ doReturn(Flux.empty()).when(a1ClientMock).deleteAllPolicies(anySet());
}
private void simulateRicWithOnePolicyType() {
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
+import java.util.Set;
import java.util.Vector;
import lombok.Setter;
}
@Override
- public Flux<String> deleteAllPolicies() {
+ public Flux<String> deleteAllPolicies(Set<String> excludePolicyId) {
this.policies.clear();
return mono("OK") //
.flatMapMany(Flux::just);