Some further simplifications and added test.
Issue-ID: CCSDK-3683
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Change-Id: I1ec98017d63047a0036db5ea12f770db00b1152b
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig.RicConfigUpdate;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfigParser;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ConfigurationFile;
-import org.onap.ccsdk.oran.a1policymanagementservice.controllers.ServiceCallbacks;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric;
-import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
import org.slf4j.Logger;
.flatMap(this::parseConfiguration) //
.flatMap(this::updateConfig, CONCURRENCY) //
.flatMap(this::handleUpdatedRicConfig) //
- .doOnTerminate(() -> logger.error("Configuration refresh task is terminated"));
+ .doFinally(signal -> logger.error("Configuration refresh task is terminated: {}", signal));
}
private Flux<Long> regularInterval() {
return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services, restClientFactory, rics);
}
- /**
- * for an added RIC after a restart it is nesessary to get the suypported policy
- * types from the RIC unless a full synchronization is wanted.
- *
- * @param ric the ric to get supprted types from
- * @return the same ric
- */
- private Mono<Ric> trySyncronizeSupportedTypes(Ric ric) {
- logger.debug("Synchronizing policy types for new RIC: {}", ric.id());
- // Synchronize the policy types
- ric.setState(RicState.SYNCHRONIZING);
- return this.a1ClientFactory.createA1Client(ric) //
- .flatMapMany(client -> synchronizationTask().synchronizePolicyTypes(ric, client)) //
- .collectList() //
- .map(list -> ric) //
- .doOnNext(notUsed -> ric.setState(RicState.AVAILABLE)) //
- .doOnError(t -> {
- logger.warn("Failed to synchronize types in new RIC: {}, reason: {}", ric.id(), t.getMessage());
- ric.setState(RicState.UNAVAILABLE); //
- }) //
- .onErrorResume(t -> Mono.just(ric));
- }
-
public Mono<RicConfigUpdate.Type> handleUpdatedRicConfig(RicConfigUpdate updatedInfo) {
synchronized (this.rics) {
String ricId = updatedInfo.getRicConfig().getRicId();
RicConfigUpdate.Type event = updatedInfo.getType();
if (event == RicConfigUpdate.Type.ADDED) {
logger.debug("RIC added {}", ricId);
-
- return trySyncronizeSupportedTypes(new Ric(updatedInfo.getRicConfig())) //
- .doOnNext(this::addRic) //
- .flatMap(this::notifyServicesRicAvailable) //
- .flatMap(notUsed -> Mono.just(event));
+ Ric ric = new Ric(updatedInfo.getRicConfig());
+ this.addRic(ric);
+ return this.synchronizationTask().synchronizeRic(ric) //
+ .map(notUsed -> event);
} else if (event == RicConfigUpdate.Type.REMOVED) {
logger.debug("RIC removed {}", ricId);
Ric ric = rics.remove(ricId);
logger.debug("Added RIC: {}", ric.id());
}
- private Mono<Ric> notifyServicesRicAvailable(Ric ric) {
- if (ric.getState() == RicState.AVAILABLE) {
- ServiceCallbacks callbacks = new ServiceCallbacks(this.restClientFactory);
- return callbacks.notifyServicesRicAvailable(ric, services) //
- .collectList() //
- .map(list -> ric);
- } else {
- return Mono.just(ric);
- }
- }
-
/**
* Reads the configuration from file.
*/
createTask().subscribe(null, null, () -> logger.debug("Checking all RICs completed"));
}
- private Flux<RicData> createTask() {
+ private Flux<Ric> createTask() {
return Flux.fromIterable(rics.getRics()) //
.flatMap(this::createRicData) //
.onErrorResume(t -> Flux.empty()) //
- .flatMap(this::checkOneRic, CONCURRENCY);
+ .flatMap(this::checkOneRic, CONCURRENCY) //
+ .map(ricData -> ricData.ric);
}
private Mono<RicData> checkOneRic(RicData ricData) {
final Instant startTime = Instant.now();
List<Thread> threads = new ArrayList<>();
List<ConcurrencyTestRunnable> tests = new ArrayList<>();
- a1ClientFactory.setResponseDelay(Duration.ofMillis(1));
+ a1ClientFactory.setResponseDelay(Duration.ofMillis(2));
addRic("ric");
addPolicyType("type1", "ric");
addPolicyType("type2", "ric");
+ final String NON_RESPONDING_RIC = "NonRespondingRic";
+ Ric nonRespondingRic = addRic(NON_RESPONDING_RIC);
+ MockA1Client a1Client = a1ClientFactory.getOrCreateA1Client(NON_RESPONDING_RIC);
+ a1Client.setErrorInject("errorInject");
+
for (int i = 0; i < 10; ++i) {
AsyncRestClient restClient = restClient();
ConcurrencyTestRunnable test =
}
assertThat(policies.size()).isZero();
logger.info("Concurrency test took " + Duration.between(startTime, Instant.now()));
+
+ assertThat(nonRespondingRic.getState()).isEqualTo(RicState.UNAVAILABLE);
+ nonRespondingRic.setState(RicState.AVAILABLE);
}
private AsyncRestClient restClient(String baseUrl, boolean useTrustValidation) {
private RefreshConfigTask createTestObject(boolean configFileExists, Rics rics, Policies policies,
boolean stubConfigFileExists) {
SecurityContext secContext = new SecurityContext("");
+
RefreshConfigTask obj =
spy(new RefreshConfigTask(configurationFileMock, appConfig, rics, policies, new Services(appConfig),
new PolicyTypes(appConfig), new A1ClientFactory(appConfig, secContext), secContext));
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import java.time.Instant;
supervisorUnderTest.checkAllRics();
verify(supervisorUnderTest).checkAllRics();
- verifyNoMoreInteractions(supervisorUnderTest);
+ verify(synchronizationTaskMock, times(0)).synchronizeRic(RIC_1);
assertThat(RIC_1.getState()).isEqualTo(RicState.AVAILABLE);
}
doReturn(synchronizationTaskMock).when(supervisorUnderTest).createSynchronizationTask();
doReturn(Mono.just(RIC_1)).when(synchronizationTaskMock).synchronizeRic(any());
supervisorUnderTest.checkAllRics();
- verify(supervisorUnderTest).checkAllRics();
- verify(supervisorUnderTest).createSynchronizationTask();
verify(synchronizationTaskMock).synchronizeRic(RIC_1);
- verifyNoMoreInteractions(supervisorUnderTest);
+
assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE);
}
supervisorUnderTest.checkAllRics();
verify(supervisorUnderTest).checkAllRics();
- verifyNoMoreInteractions(supervisorUnderTest);
+ verify(synchronizationTaskMock, times(0)).synchronizeRic(RIC_1);
assertThat(RIC_1.getState()).isEqualTo(RicState.SYNCHRONIZING);
}
supervisorUnderTest.checkAllRics();
verify(supervisorUnderTest).checkAllRics();
- verifyNoMoreInteractions(supervisorUnderTest);
+ verify(synchronizationTaskMock, times(0)).synchronizeRic(RIC_1);
+
assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE);
}
supervisorUnderTest.checkAllRics();
verify(supervisorUnderTest).checkAllRics();
- verify(supervisorUnderTest).createSynchronizationTask();
verify(synchronizationTaskMock).synchronizeRic(RIC_1);
- verifyNoMoreInteractions(supervisorUnderTest);
+
assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE);
}
supervisorUnderTest.checkAllRics();
verify(supervisorUnderTest).checkAllRics();
- verify(supervisorUnderTest).createSynchronizationTask();
verify(synchronizationTaskMock).synchronizeRic(RIC_1);
- verifyNoMoreInteractions(supervisorUnderTest);
- assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE);
+ assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE);
}
@Test
supervisorUnderTest.checkAllRics();
verify(supervisorUnderTest).checkAllRics();
- verifyNoMoreInteractions(supervisorUnderTest);
+ verify(synchronizationTaskMock, times(0)).synchronizeRic(RIC_1);
+
assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE);
}
supervisorUnderTest.checkAllRics();
verify(supervisorUnderTest).checkAllRics();
- verify(supervisorUnderTest).createSynchronizationTask();
verify(synchronizationTaskMock).synchronizeRic(RIC_1);
- verifyNoMoreInteractions(supervisorUnderTest);
+
assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE);
}
supervisorUnderTest.checkAllRics();
verify(supervisorUnderTest).checkAllRics();
- verify(supervisorUnderTest).createSynchronizationTask();
verify(synchronizationTaskMock).synchronizeRic(RIC_1);
- verifyNoMoreInteractions(supervisorUnderTest);
assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE);
}
import java.util.List;
import java.util.Vector;
+import lombok.Setter;
+
import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1Client;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
public class MockA1Client implements A1Client {
Policies policies;
private final PolicyTypes policyTypes;
- private final Duration asynchDelay;
+
+ @Setter
+ private Duration asynchDelay;
+
+ @Setter
+ private String errorInject;
public MockA1Client(String ricId, ApplicationConfig appConfig, PolicyTypes policyTypes, Duration asynchDelay) {
this.policyTypes = policyTypes;
}
private <T> Mono<T> mono(T value) {
- if (this.asynchDelay.isZero()) {
- return Mono.just(value);
- } else {
- return Mono.create(monoSink -> asynchResponse(monoSink, value));
+ Mono<T> res = Mono.just(value);
+ if (!this.asynchDelay.isZero()) {
+ res = Mono.create(monoSink -> asynchResponse(monoSink, value));
}
+
+ if (this.errorInject != null) {
+ res = res.flatMap(x -> monoError(this.errorInject, HttpStatus.BAD_GATEWAY));
+ }
+
+ return res;
}
- public static Mono<String> monoError(String responseBody, HttpStatus status) {
+ public static <T> Mono<T> monoError(String responseBody, HttpStatus status) {
byte[] responseBodyBytes = responseBody.getBytes(StandardCharsets.UTF_8);
WebClientResponseException a1Exception = new WebClientResponseException(status.value(),
status.getReasonPhrase(), null, responseBodyBytes, StandardCharsets.UTF_8, null);