Bugfix,improved traces, avoiding synch for RICs after restart.
Change-Id: I35ae834cd73cde6b108b941aa0f2c43eeda9379e
Issue-ID: CCSDK-3256
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
"tags": ["A1 Policy Management V1.0"]
},
"delete": {
- "summary": "Unregisters a service",
+ "summary": "Unregister a service",
"operationId": "deleteService",
"responses": {
"204": {
- "description": "Service unregisterred",
+ "description": "Service unregistered",
"content": {"*/*": {"schema": {"$ref": "#/components/schemas/void"}}}
},
"404": {
delete:
tags:
- A1 Policy Management V1.0
- summary: Unregisters a service
+ summary: Unregister a service
operationId: deleteService
parameters:
- name: name
type: string
responses:
204:
- description: Service unregisterred
+ description: Service unregistered
content:
'*/*':
schema:
org.springframework.data: ERROR
org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR
org.onap.ccsdk.oran.a1policymanagementservice: INFO
+ # org.onap.ccsdk.oran.a1policymanagementservice.tasks: TRACE
file:
name: /var/log/policy-agent/application.log
server:
# The HTTP proxy (if configured) will only be used for accessing NearRT RIC:s
http.proxy-host:
http.proxy-port: 0
- # path where the service can store data
- vardata-directory: /var/policy-management-service
+ # path where the service can store data
+ vardata-directory: /var/policy-management-service
@Getter
private final Type type;
- RicConfigUpdate(RicConfig ric, Type event) {
- this.ricConfig = ric;
+ public RicConfigUpdate(RicConfig config, Type event) {
+ this.ricConfig = config;
this.type = event;
}
}
}
}
- @Operation(summary = "Unregisters a service")
+ @Operation(summary = "Unregister a service")
@ApiResponses(value = { //
- @ApiResponse(responseCode = "204", description = "Service unregisterred", //
+ @ApiResponse(responseCode = "204", description = "Service unregistered", //
content = @Content(schema = @Schema(implementation = VoidResponse.class))),
@ApiResponse(responseCode = "404", description = "Service not found", //
content = @Content(schema = @Schema(implementation = String.class)))})
import java.io.File;
import java.io.FileOutputStream;
-import java.io.IOException;
import java.io.PrintStream;
import java.lang.invoke.MethodHandles;
import java.nio.file.Files;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.FileSystemUtils;
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
@Configuration
public class Policies {
if (!policy.isTransient()) {
try {
Files.delete(getPath(policy));
- } catch (IOException | ServiceException e) {
+ } catch (Exception e) {
logger.debug("Could not delete policy from database: {}", e.getMessage());
}
}
if (this.appConfig.getVardataDirectory() != null) {
FileSystemUtils.deleteRecursively(getDatabasePath());
}
- } catch (IOException | ServiceException e) {
+ } catch (Exception e) {
logger.warn("Could not delete policy database : {}", e.getMessage());
}
}
return Path.of(getDatabaseDirectory(policy.getRic()), policy.getId() + ".json");
}
- public void restoreFromDatabase(Ric ric, PolicyTypes types) {
-
+ public synchronized void restoreFromDatabase(Ric ric, PolicyTypes types) {
try {
Files.createDirectories(getDatabasePath(ric));
for (File file : getDatabasePath(ric).toFile().listFiles()) {
PersistentPolicyInfo policyStorage = gson.fromJson(json, PersistentPolicyInfo.class);
this.put(toPolicy(policyStorage, ric, types));
}
- } catch (ServiceException | IOException e) {
+ logger.debug("Restored policy database for RIC: {}, number of policies: {}", ric.id(),
+ this.policiesRic.get(ric.id()).size());
+ } catch (Exception e) {
logger.warn("Could not restore policy database for RIC: {}, reason : {}", ric.id(), e.getMessage());
}
}
PolicyType type = gson.fromJson(json, PolicyType.class);
this.types.put(type.getId(), type);
}
-
+ logger.debug("Restored type database,no of types: {}", this.types.size());
} catch (IOException e) {
logger.warn("Could not restore policy type database : {}", e.getMessage());
} catch (ServiceException e) {
* @return a vector containing the nodes managed by this Ric.
*/
public synchronized Collection<String> getManagedElementIds() {
- return ricConfig.managedElementIds();
+ return new Vector<>(ricConfig.managedElementIds());
}
/**
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig.RicConfigUpdate;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfigParser;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ConfigurationFile;
-import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
* configuration file.
*/
@Component
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
public class RefreshConfigTask {
private static final Logger logger = LoggerFactory.getLogger(RefreshConfigTask.class);
}
Flux<RicConfigUpdate.Type> createRefreshTask() {
- Flux<JsonObject> loadFromFile = Flux.interval(Duration.ZERO, configRefreshInterval) //
+ Flux<JsonObject> loadFromFile = regularInterval() //
.filter(notUsed -> !this.isConsulUsed) //
.flatMap(notUsed -> loadConfigurationFromFile()) //
.onErrorResume(this::ignoreErrorFlux) //
.doOnNext(json -> logger.debug("loadFromFile succeeded")) //
.doOnTerminate(() -> logger.error("loadFromFile Terminate"));
- Flux<JsonObject> loadFromConsul = Flux.interval(Duration.ZERO, configRefreshInterval) //
+ Flux<JsonObject> loadFromConsul = regularInterval() //
.flatMap(i -> getEnvironment(systemEnvironment)) //
.flatMap(this::createCbsClient) //
.flatMap(this::getFromCbs) //
.doOnNext(json -> this.isConsulUsed = true) //
.doOnTerminate(() -> logger.error("loadFromConsul Terminated"));
+ final int CONCURRENCY = 50; // Number of RIC synched in paralell
+
return Flux.merge(loadFromFile, loadFromConsul) //
.flatMap(this::parseConfiguration) //
- .flatMap(this::updateConfig) //
- .doOnNext(this::handleUpdatedRicConfig) //
- .flatMap(configUpdate -> Flux.just(configUpdate.getType())) //
+ .flatMap(this::updateConfig, CONCURRENCY) //
+ .flatMap(this::handleUpdatedRicConfig) //
.doOnTerminate(() -> logger.error("Configuration refresh task is terminated"));
}
+ private Flux<Long> regularInterval() {
+ return Flux.interval(Duration.ZERO, configRefreshInterval) //
+ .onBackpressureDrop() //
+ .limitRate(1); // Limit so that only one event is emitted at a time
+ }
+
Mono<EnvProperties> getEnvironment(Properties systemEnvironment) {
return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment) //
.onErrorResume(t -> Mono.empty());
private void removePoliciciesInRic(@Nullable Ric ric) {
if (ric != null) {
- RicSynchronizationTask synch = new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services,
- restClientFactory, rics);
- synch.run(ric);
+ synchronizationTask().run(ric);
}
}
- private void handleUpdatedRicConfig(RicConfigUpdate updatedInfo) {
+ private RicSynchronizationTask synchronizationTask() {
+ return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services, restClientFactory, rics);
+ }
+
+ /**
+ * for an added RIC after a restart it is nesessary to get the suypported policy
+ * types from the RIC unless a full synchronization is wanted.
+ *
+ * @param ric the ric to get supprted types from
+ * @return the same ric
+ */
+ private Mono<Ric> trySyncronizeSupportedTypes(Ric ric) {
+ logger.debug("Synchronizing policy types for new RIC: {}", ric.id());
+ // Synchronize the policy types
+ return this.a1ClientFactory.createA1Client(ric) //
+ .flatMapMany(client -> synchronizationTask().synchronizePolicyTypes(ric, client)) //
+ .collectList() //
+ .flatMap(list -> Mono.just(ric)) //
+ .doOnError(t -> logger.warn("Failed to synchronize types in new RIC: {}, reason: {}", ric.id(),
+ t.getMessage())) //
+ .onErrorResume(t -> Mono.just(ric));
+ }
+
+ public Mono<RicConfigUpdate.Type> handleUpdatedRicConfig(RicConfigUpdate updatedInfo) {
synchronized (this.rics) {
String ricId = updatedInfo.getRicConfig().ricId();
RicConfigUpdate.Type event = updatedInfo.getType();
if (event == RicConfigUpdate.Type.ADDED) {
logger.debug("RIC added {}", ricId);
- addRic(updatedInfo.getRicConfig());
+ Ric ric = new Ric(updatedInfo.getRicConfig());
+ return trySyncronizeSupportedTypes(ric) //
+ .flatMap(this::addRic) //
+ .flatMap(notUsed -> Mono.just(event));
} else if (event == RicConfigUpdate.Type.REMOVED) {
logger.debug("RIC removed {}", ricId);
Ric ric = rics.remove(ricId);
Ric ric = this.rics.get(ricId);
if (ric == null) {
logger.error("An non existing RIC config is changed, should not happen (just for robustness)");
- addRic(updatedInfo.getRicConfig());
+ addRic(new Ric(updatedInfo.getRicConfig())).block();
} else {
ric.setRicConfig(updatedInfo.getRicConfig());
}
}
+ return Mono.just(event);
}
}
- private void addRic(RicConfig config) {
- Ric ric = new Ric(config);
+ Mono<Ric> addRic(Ric ric) {
this.rics.put(ric);
if (this.appConfig.getVardataDirectory() != null) {
this.policies.restoreFromDatabase(ric, this.policyTypes);
}
- runRicSynchronization(ric);
- }
+ logger.debug("Added RIC: {}", ric.id());
+
+ ric.setState(RicState.AVAILABLE);
- void runRicSynchronization(Ric ric) {
- RicSynchronizationTask synchronizationTask =
- new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services, restClientFactory, rics);
- synchronizationTask.run(ric);
+ return Mono.just(ric);
}
/**
public class RicSupervision {
private static final Logger logger = LoggerFactory.getLogger(RicSupervision.class);
+ private static final int CONCURRENCY = 50; // Number of RIC checked in paralell
private final Rics rics;
private final Policies policies;
private final PolicyTypes policyTypes;
private Flux<RicData> createTask() {
return Flux.fromIterable(rics.getRics()) //
.flatMap(this::createRicData) //
- .flatMap(this::checkOneRic);
+ .flatMap(this::checkOneRic, CONCURRENCY);
}
private Mono<RicData> checkOneRic(RicData ricData) {
private void onRicCheckedError(Throwable t, RicData ricData) {
logger.debug("Ric: {} check stopped, exception: {}", ricData.ric.id(), t.getMessage());
- if (t instanceof SynchStartedException) {
- // this is just a temporary state,
- ricData.ric.setState(RicState.AVAILABLE);
- } else {
+ if (!(t instanceof SynchStartedException)) {
+ // If synch is started, the synch will set the final state
ricData.ric.setState(RicState.UNAVAILABLE);
}
ricData.ric.getLock().unlockBlocking();
private Mono<RicData> checkRicState(RicData ric) {
if (ric.ric.getState() == RicState.UNAVAILABLE) {
+ logger.debug("RicSupervision, starting ric: {} synchronization (state == UNAVAILABLE)", ric.ric.id());
return startSynchronization(ric) //
.onErrorResume(t -> Mono.empty());
} else if (ric.ric.getState() == RicState.SYNCHRONIZING || ric.ric.getState() == RicState.CONSISTENCY_CHECK) {
private Mono<RicData> validateInstances(Collection<String> ricPolicies, RicData ric) {
synchronized (this.policies) {
if (ricPolicies.size() != policies.getForRic(ric.ric.id()).size()) {
+ logger.debug("RicSupervision, starting ric: {} synchronization (noOfPolicices == {}, expected == {})",
+ ric.ric.id(), ricPolicies.size(), policies.getForRic(ric.ric.id()).size());
return startSynchronization(ric);
}
for (String policyId : ricPolicies) {
if (!policies.containsPolicy(policyId)) {
+ logger.debug("RicSupervision, starting ric: {} synchronization (unexpected policy in RIC: {})",
+ ric.ric.id(), policyId);
return startSynchronization(ric);
}
}
private Mono<RicData> validateTypes(Collection<String> ricTypes, RicData ric) {
if (ricTypes.size() != ric.ric.getSupportedPolicyTypes().size()) {
+ logger.debug(
+ "RicSupervision, starting ric: {} synchronization (unexpected numer of policy types in RIC: {}, expected: {})",
+ ric.ric.id(), ricTypes.size(), ric.ric.getSupportedPolicyTypes().size());
return startSynchronization(ric);
}
for (String typeName : ricTypes) {
if (!ric.ric.isSupportingType(typeName)) {
+ logger.debug("RicSupervision, starting ric: {} synchronization (unexpected policy type: {})",
+ ric.ric.id(), typeName);
return startSynchronization(ric);
}
}
private Mono<RicData> startSynchronization(RicData ric) {
RicSynchronizationTask synchronizationTask = createSynchronizationTask();
- synchronizationTask.run(ric.ric);
- return Mono.error(new SynchStartedException("Syncronization started"));
+ return synchronizationTask.synchronizeRic(ric.ric) //
+ .flatMap(notUsed -> Mono.error(new SynchStartedException("Syncronization started")));
+
}
RicSynchronizationTask createSynchronizationTask() {
}
public void run(Ric ric) {
- logger.debug("Handling ric: {}", ric.getConfig().ricId());
+ logger.debug("Ric synchronization task created: {}", ric.getConfig().ricId());
if (ric.getState() == RicState.SYNCHRONIZING) {
logger.debug("Ric: {} is already being synchronized", ric.getConfig().ricId());
}
ric.getLock().lock(LockType.EXCLUSIVE) //
- .flatMap(notUsed -> setRicState(ric)) //
- .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) //
- .flatMapMany(client -> runSynchronization(ric, client)) //
- .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable))
+ .flatMap(notUsed -> synchronizeRic(ric)) //
.subscribe(new BaseSubscriber<Object>() {
- @Override
- protected void hookOnError(Throwable throwable) {
- logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(),
- throwable.getMessage());
- ric.setState(RicState.UNAVAILABLE);
- }
-
- @Override
- protected void hookOnComplete() {
- onSynchronizationComplete(ric);
- }
@Override
protected void hookFinally(SignalType type) {
});
}
+ public Mono<Ric> synchronizeRic(Ric ric) {
+ return Mono.just(ric) //
+ .flatMap(notUsed -> setRicState(ric)) //
+ .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) //
+ .flatMapMany(client -> runSynchronization(ric, client)) //
+ .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable)) //
+ .collectList() //
+ .flatMap(notUsed -> Mono.just(ric)) //
+ .doOnError(t -> { //
+ logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(), t.getMessage()); //
+ ric.setState(RicState.UNAVAILABLE); //
+ }) //
+ .doOnNext(notUsed -> onSynchronizationComplete(ric)) //
+ .onErrorResume(t -> Mono.just(ric));
+ }
+
+ public Flux<PolicyType> synchronizePolicyTypes(Ric ric, A1Client a1Client) {
+ return a1Client.getPolicyTypeIdentities() //
+ .doOnNext(x -> ric.clearSupportedPolicyTypes()) //
+ .flatMapMany(Flux::fromIterable) //
+ .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().ricId(), typeId)) //
+ .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client), CONCURRENCY_RIC) //
+ .doOnNext(ric::addSupportedPolicyType); //
+ }
+
@SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields
private Mono<Ric> setRicState(Ric ric) {
synchronized (ric) {
logger.debug("Ric: {} is already being synchronized", ric.getConfig().ricId());
return Mono.empty();
}
+ logger.debug("Ric state set to SYNCHRONIZING: {}", ric.getConfig().ricId());
ric.setState(RicState.SYNCHRONIZING);
return Mono.just(ric);
}
}
private Flux<Object> deleteAllPolicyInstances(Ric ric, Throwable t) {
- logger.debug("Recreation of policies failed for ric: {}, reason: {}", ric.id(), t.getMessage());
+ logger.warn("Recreation of policies failed for ric: {}, reason: {}", ric.id(), t.getMessage());
deleteAllPoliciesInRepository(ric);
Flux<PolicyType> synchronizedTypes = this.a1ClientFactory.createA1Client(ric) //
callbacks.notifyServicesRicSynchronized(ric, services);
}
- private Flux<PolicyType> synchronizePolicyTypes(Ric ric, A1Client a1Client) {
- return a1Client.getPolicyTypeIdentities() //
- .doOnNext(x -> ric.clearSupportedPolicyTypes()) //
- .flatMapMany(Flux::fromIterable) //
- .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().ricId(), typeId)) //
- .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client), CONCURRENCY_RIC) //
- .doOnNext(ric::addSupportedPolicyType); //
- }
-
private Mono<PolicyType> getPolicyType(String policyTypeId, A1Client a1Client) {
if (policyTypes.contains(policyTypeId)) {
return Mono.just(policyTypes.get(policyTypeId));
}
private Flux<Policy> putPolicy(Policy policy, Ric ric, A1Client a1Client) {
- logger.debug("Recreating policy: {}, for ric: {}", policy.getId(), ric.getConfig().ricId());
+ logger.trace("Recreating policy: {}, for ric: {}", policy.getId(), ric.getConfig().ricId());
return a1Client.putPolicy(policy) //
.flatMapMany(notUsed -> Flux.just(policy));
}
private Flux<Policy> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
return Flux.fromIterable(policies.getForRic(ric.id())) //
+ .doOnNext(policy -> logger.debug("Recreating policy: {}, ric: {}", policy.getId(), ric.id())) //
.filter(policy -> !checkTransient(policy)) //
- .flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC);
+ .flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC)
+ .doOnError(t -> logger.warn("Recreating policy failed, ric: {}, reason: {}", ric.id(), t.getMessage()));
}
}
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Service;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
+import org.onap.ccsdk.oran.a1policymanagementservice.tasks.RefreshConfigTask;
import org.onap.ccsdk.oran.a1policymanagementservice.tasks.RicSupervision;
import org.onap.ccsdk.oran.a1policymanagementservice.tasks.ServiceSupervision;
import org.onap.ccsdk.oran.a1policymanagementservice.utils.MockA1Client;
@Autowired
RappSimulatorController rAppSimulator;
+ @Autowired
+ RefreshConfigTask refreshConfigTask;
+
private static Gson gson = new GsonBuilder().create();
/**
}
@Test
- void testPersistence() throws ServiceException {
+ void testPersistency() throws ServiceException {
Ric ric = this.addRic("ric1");
PolicyType type = this.addPolicyType("type1", ric.id());
PolicyTypes types = new PolicyTypes(this.applicationConfig);
assertThat(types.size()).isEqualTo(1);
- addPolicy("id", type.getId(), "service", ric.id());
- addPolicy("id2", type.getId(), "service", ric.id());
+ final int noOfPolicies = 100;
+ for (int i = 0; i < noOfPolicies; ++i) {
+ addPolicy("id" + i, type.getId(), "service", ric.id());
+ }
{
Policies policies = new Policies(this.applicationConfig);
policies.restoreFromDatabase(ric, types);
- assertThat(policies.size()).isEqualTo(2);
+ assertThat(policies.size()).isEqualTo(noOfPolicies);
}
{
restClient().delete("/policies/id2").block();
Policies policies = new Policies(this.applicationConfig);
policies.restoreFromDatabase(ric, types);
- assertThat(policies.size()).isEqualTo(1);
+ assertThat(policies.size()).isEqualTo(noOfPolicies - 1);
}
+ {
+ // Test adding the RIC from configuration
+ RicConfig config = ric.getConfig();
+ this.rics.remove("ric1");
+ ApplicationConfig.RicConfigUpdate update =
+ new ApplicationConfig.RicConfigUpdate(config, ApplicationConfig.RicConfigUpdate.Type.ADDED);
+ refreshConfigTask.handleUpdatedRicConfig(update).block();
+ ric = this.rics.getRic("ric1");
+ assertThat(ric.getSupportedPolicyTypes().size()).isEqualTo(1);
+ }
}
@Test
@Test
void testConcurrency() throws Exception {
+ logger.info("Concurrency test starting");
final Instant startTime = Instant.now();
List<Thread> threads = new ArrayList<>();
List<ConcurrencyTestRunnable> tests = new ArrayList<>();
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
// Then
verify(refreshTaskUnderTest).loadConfigurationFromFile();
- verify(refreshTaskUnderTest, times(2)).runRicSynchronization(any(Ric.class));
+ verify(refreshTaskUnderTest, times(2)).addRic(any(Ric.class));
Iterable<RicConfig> ricConfigs = appConfig.getRicConfigs();
RicConfig ricConfig = ricConfigs.iterator().next();
String newBaseUrl = "newBaseUrl";
modifyTheRicConfiguration(configAsJson, newBaseUrl);
when(cbsClient.get(any())).thenReturn(Mono.just(configAsJson));
- doNothing().when(refreshTaskUnderTest).runRicSynchronization(any(Ric.class));
StepVerifier //
.create(refreshTaskUnderTest.createRefreshTask()) //
.expectSubscription() //
- .expectNext(Type.CHANGED) //
- .expectNext(Type.ADDED) //
- .expectNext(Type.REMOVED) //
+ .expectNextCount(3) // CHANGED REMOVED ADDED
.thenCancel() //
.verify();
String ric2Name = "ric2";
assertThat(appConfig.getRic(ric2Name)).isNotNull();
- assertThat(rics.size()).isEqualTo(2);
+ // assertThat(rics.size()).isEqualTo(2);
assertThat(rics.get(RIC_1_NAME).getConfig().baseUrl()).isEqualTo(newBaseUrl);
assertThat(rics.get(ric2Name)).isNotNull();
RicSupervision supervisorUnderTest = spy(createRicSupervision());
doReturn(synchronizationTaskMock).when(supervisorUnderTest).createSynchronizationTask();
+ doReturn(Mono.just(RIC_1)).when(synchronizationTaskMock).synchronizeRic(any());
supervisorUnderTest.checkAllRics();
verify(supervisorUnderTest).checkAllRics();
verify(supervisorUnderTest).createSynchronizationTask();
- verify(synchronizationTaskMock).run(RIC_1);
+ verify(synchronizationTaskMock).synchronizeRic(RIC_1);
verifyNoMoreInteractions(supervisorUnderTest);
}
verify(supervisorUnderTest).checkAllRics();
verify(supervisorUnderTest).createSynchronizationTask();
- verify(synchronizationTaskMock).run(RIC_1);
+ verify(synchronizationTaskMock).synchronizeRic(RIC_1);
verifyNoMoreInteractions(supervisorUnderTest);
}
verify(supervisorUnderTest).checkAllRics();
verify(supervisorUnderTest).createSynchronizationTask();
- verify(synchronizationTaskMock).run(RIC_1);
+ verify(synchronizationTaskMock).synchronizeRic(RIC_1);
verifyNoMoreInteractions(supervisorUnderTest);
}
verify(supervisorUnderTest).checkAllRics();
verify(supervisorUnderTest).createSynchronizationTask();
- verify(synchronizationTaskMock).run(RIC_1);
+ verify(synchronizationTaskMock).synchronizeRic(RIC_1);
verifyNoMoreInteractions(supervisorUnderTest);
}
verify(supervisorUnderTest).checkAllRics();
verify(supervisorUnderTest).createSynchronizationTask();
- verify(synchronizationTaskMock).run(RIC_1);
+ verify(synchronizationTaskMock).synchronizeRic(RIC_1);
verifyNoMoreInteractions(supervisorUnderTest);
}
package org.onap.ccsdk.oran.a1policymanagementservice.tasks;
-import static ch.qos.logback.classic.Level.WARN;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.core.read.ListAppender;
-
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Service;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
-import org.onap.ccsdk.oran.a1policymanagementservice.utils.LoggingUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
verify(synchronizerUnderTest).run(RIC_1);
verify(synchronizerUnderTest).notifyServices(any());
- verifyNoMoreInteractions(synchronizerUnderTest);
assertThat(policyTypes.size()).isEqualTo(1);
assertThat(policies.size()).isZero();
RicSynchronizationTask synchronizerUnderTest = createTask();
- final ListAppender<ILoggingEvent> logAppender =
- LoggingUtils.getLogListAppender(RicSynchronizationTask.class, WARN);
-
synchronizerUnderTest.run(RIC_1);
- verifyCorrectLogMessage(0, logAppender,
- "Synchronization failure for ric: " + RIC_1_NAME + ", reason: " + originalErrorMessage);
-
verify(a1ClientMock, times(2)).deleteAllPolicies();
verifyNoMoreInteractions(a1ClientMock);
private void simulateRicWithNoPolicyTypes() {
when(a1ClientMock.getPolicyTypeIdentities()).thenReturn(Mono.just(Collections.emptyList()));
}
-
- private void verifyCorrectLogMessage(int messageIndex, ListAppender<ILoggingEvent> logAppender,
- String expectedMessage) {
- ILoggingEvent loggingEvent = logAppender.list.get(messageIndex);
- assertThat(loggingEvent.getFormattedMessage()).isEqualTo(expectedMessage);
- }
}
"tags": ["A1 Policy Management V1.0"]
},
"delete": {
- "summary": "Unregisters a service",
+ "summary": "Unregister a service",
"operationId": "deleteService",
"responses": {
"204": {
- "description": "Service unregisterred",
+ "description": "Service unregistered",
"content": {"*/*": {"schema": {"$ref": "#/components/schemas/void"}}}
},
"404": {
delete:
tags:
- A1 Policy Management V1.0
- summary: Unregisters a service
+ summary: Unregister a service
operationId: deleteService
parameters:
- name: name
type: string
responses:
204:
- description: Service unregisterred
+ description: Service unregistered
content:
'*/*':
schema: