Bugfix,improved traces, avoiding synch for RICs after restart.
Change-Id: I35ae834cd73cde6b108b941aa0f2c43eeda9379e
Issue-ID: CCSDK-3256
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
                 "tags": ["A1 Policy Management V1.0"]
             },
             "delete": {
-                "summary": "Unregisters a service",
+                "summary": "Unregister a service",
                 "operationId": "deleteService",
                 "responses": {
                     "204": {
-                        "description": "Service unregisterred",
+                        "description": "Service unregistered",
                         "content": {"*/*": {"schema": {"$ref": "#/components/schemas/void"}}}
                     },
                     "404": {
 
     delete:
       tags:
       - A1 Policy Management V1.0
-      summary: Unregisters a service
+      summary: Unregister a service
       operationId: deleteService
       parameters:
       - name: name
           type: string
       responses:
         204:
-          description: Service unregisterred
+          description: Service unregistered
           content:
             '*/*':
               schema:
 
     org.springframework.data: ERROR
     org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR
     org.onap.ccsdk.oran.a1policymanagementservice: INFO
+    # org.onap.ccsdk.oran.a1policymanagementservice.tasks: TRACE  
   file:
     name: /var/log/policy-agent/application.log
 server:
     # The HTTP proxy (if configured) will only be used for accessing NearRT RIC:s
     http.proxy-host:
     http.proxy-port: 0
-    # path where the service can store data
-    vardata-directory: /var/policy-management-service
+  # path where the service can store data
+  vardata-directory: /var/policy-management-service
 
 
         @Getter
         private final Type type;
 
-        RicConfigUpdate(RicConfig ric, Type event) {
-            this.ricConfig = ric;
+        public RicConfigUpdate(RicConfig config, Type event) {
+            this.ricConfig = config;
             this.type = event;
         }
     }
 
         }
     }
 
-    @Operation(summary = "Unregisters a service")
+    @Operation(summary = "Unregister a service")
     @ApiResponses(value = { //
-            @ApiResponse(responseCode = "204", description = "Service unregisterred", //
+            @ApiResponse(responseCode = "204", description = "Service unregistered", //
                     content = @Content(schema = @Schema(implementation = VoidResponse.class))),
             @ApiResponse(responseCode = "404", description = "Service not found", //
                     content = @Content(schema = @Schema(implementation = String.class)))})
 
 
 import java.io.File;
 import java.io.FileOutputStream;
-import java.io.IOException;
 import java.io.PrintStream;
 import java.lang.invoke.MethodHandles;
 import java.nio.file.Files;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.util.FileSystemUtils;
 
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
 @Configuration
 public class Policies {
 
         if (!policy.isTransient()) {
             try {
                 Files.delete(getPath(policy));
-            } catch (IOException | ServiceException e) {
+            } catch (Exception e) {
                 logger.debug("Could not delete policy from database: {}", e.getMessage());
             }
         }
             if (this.appConfig.getVardataDirectory() != null) {
                 FileSystemUtils.deleteRecursively(getDatabasePath());
             }
-        } catch (IOException | ServiceException e) {
+        } catch (Exception e) {
             logger.warn("Could not delete policy database : {}", e.getMessage());
         }
     }
         return Path.of(getDatabaseDirectory(policy.getRic()), policy.getId() + ".json");
     }
 
-    public void restoreFromDatabase(Ric ric, PolicyTypes types) {
-
+    public synchronized void restoreFromDatabase(Ric ric, PolicyTypes types) {
         try {
             Files.createDirectories(getDatabasePath(ric));
             for (File file : getDatabasePath(ric).toFile().listFiles()) {
                 PersistentPolicyInfo policyStorage = gson.fromJson(json, PersistentPolicyInfo.class);
                 this.put(toPolicy(policyStorage, ric, types));
             }
-        } catch (ServiceException | IOException e) {
+            logger.debug("Restored policy database for RIC: {}, number of policies: {}", ric.id(),
+                    this.policiesRic.get(ric.id()).size());
+        } catch (Exception e) {
             logger.warn("Could not restore policy database for RIC: {}, reason : {}", ric.id(), e.getMessage());
         }
     }
 
                 PolicyType type = gson.fromJson(json, PolicyType.class);
                 this.types.put(type.getId(), type);
             }
-
+            logger.debug("Restored type database,no of types: {}", this.types.size());
         } catch (IOException e) {
             logger.warn("Could not restore policy type database : {}", e.getMessage());
         } catch (ServiceException e) {
 
      * @return a vector containing the nodes managed by this Ric.
      */
     public synchronized Collection<String> getManagedElementIds() {
-        return ricConfig.managedElementIds();
+        return new Vector<>(ricConfig.managedElementIds());
     }
 
     /**
 
 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig.RicConfigUpdate;
 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfigParser;
 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ConfigurationFile;
-import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
  * configuration file.
  */
 @Component
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
 public class RefreshConfigTask {
 
     private static final Logger logger = LoggerFactory.getLogger(RefreshConfigTask.class);
     }
 
     Flux<RicConfigUpdate.Type> createRefreshTask() {
-        Flux<JsonObject> loadFromFile = Flux.interval(Duration.ZERO, configRefreshInterval) //
+        Flux<JsonObject> loadFromFile = regularInterval() //
                 .filter(notUsed -> !this.isConsulUsed) //
                 .flatMap(notUsed -> loadConfigurationFromFile()) //
                 .onErrorResume(this::ignoreErrorFlux) //
                 .doOnNext(json -> logger.debug("loadFromFile succeeded")) //
                 .doOnTerminate(() -> logger.error("loadFromFile Terminate"));
 
-        Flux<JsonObject> loadFromConsul = Flux.interval(Duration.ZERO, configRefreshInterval) //
+        Flux<JsonObject> loadFromConsul = regularInterval() //
                 .flatMap(i -> getEnvironment(systemEnvironment)) //
                 .flatMap(this::createCbsClient) //
                 .flatMap(this::getFromCbs) //
                 .doOnNext(json -> this.isConsulUsed = true) //
                 .doOnTerminate(() -> logger.error("loadFromConsul Terminated"));
 
+        final int CONCURRENCY = 50; // Number of RIC synched in paralell
+
         return Flux.merge(loadFromFile, loadFromConsul) //
                 .flatMap(this::parseConfiguration) //
-                .flatMap(this::updateConfig) //
-                .doOnNext(this::handleUpdatedRicConfig) //
-                .flatMap(configUpdate -> Flux.just(configUpdate.getType())) //
+                .flatMap(this::updateConfig, CONCURRENCY) //
+                .flatMap(this::handleUpdatedRicConfig) //
                 .doOnTerminate(() -> logger.error("Configuration refresh task is terminated"));
     }
 
+    private Flux<Long> regularInterval() {
+        return Flux.interval(Duration.ZERO, configRefreshInterval) //
+                .onBackpressureDrop() //
+                .limitRate(1); // Limit so that only one event is emitted at a time
+    }
+
     Mono<EnvProperties> getEnvironment(Properties systemEnvironment) {
         return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment) //
                 .onErrorResume(t -> Mono.empty());
 
     private void removePoliciciesInRic(@Nullable Ric ric) {
         if (ric != null) {
-            RicSynchronizationTask synch = new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services,
-                    restClientFactory, rics);
-            synch.run(ric);
+            synchronizationTask().run(ric);
         }
     }
 
-    private void handleUpdatedRicConfig(RicConfigUpdate updatedInfo) {
+    private RicSynchronizationTask synchronizationTask() {
+        return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services, restClientFactory, rics);
+    }
+
+    /**
+     * for an added RIC after a restart it is nesessary to get the suypported policy
+     * types from the RIC unless a full synchronization is wanted.
+     * 
+     * @param ric the ric to get supprted types from
+     * @return the same ric
+     */
+    private Mono<Ric> trySyncronizeSupportedTypes(Ric ric) {
+        logger.debug("Synchronizing policy types for new RIC: {}", ric.id());
+        // Synchronize the policy types
+        return this.a1ClientFactory.createA1Client(ric) //
+                .flatMapMany(client -> synchronizationTask().synchronizePolicyTypes(ric, client)) //
+                .collectList() //
+                .flatMap(list -> Mono.just(ric)) //
+                .doOnError(t -> logger.warn("Failed to synchronize types in new RIC: {}, reason: {}", ric.id(),
+                        t.getMessage())) //
+                .onErrorResume(t -> Mono.just(ric));
+    }
+
+    public Mono<RicConfigUpdate.Type> handleUpdatedRicConfig(RicConfigUpdate updatedInfo) {
         synchronized (this.rics) {
             String ricId = updatedInfo.getRicConfig().ricId();
             RicConfigUpdate.Type event = updatedInfo.getType();
             if (event == RicConfigUpdate.Type.ADDED) {
                 logger.debug("RIC added {}", ricId);
-                addRic(updatedInfo.getRicConfig());
+                Ric ric = new Ric(updatedInfo.getRicConfig());
+                return trySyncronizeSupportedTypes(ric) //
+                        .flatMap(this::addRic) //
+                        .flatMap(notUsed -> Mono.just(event));
             } else if (event == RicConfigUpdate.Type.REMOVED) {
                 logger.debug("RIC removed {}", ricId);
                 Ric ric = rics.remove(ricId);
                 Ric ric = this.rics.get(ricId);
                 if (ric == null) {
                     logger.error("An non existing RIC config is changed, should not happen (just for robustness)");
-                    addRic(updatedInfo.getRicConfig());
+                    addRic(new Ric(updatedInfo.getRicConfig())).block();
                 } else {
                     ric.setRicConfig(updatedInfo.getRicConfig());
                 }
             }
+            return Mono.just(event);
         }
     }
 
-    private void addRic(RicConfig config) {
-        Ric ric = new Ric(config);
+    Mono<Ric> addRic(Ric ric) {
         this.rics.put(ric);
         if (this.appConfig.getVardataDirectory() != null) {
             this.policies.restoreFromDatabase(ric, this.policyTypes);
         }
-        runRicSynchronization(ric);
-    }
+        logger.debug("Added RIC: {}", ric.id());
+
+        ric.setState(RicState.AVAILABLE);
 
-    void runRicSynchronization(Ric ric) {
-        RicSynchronizationTask synchronizationTask =
-                new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services, restClientFactory, rics);
-        synchronizationTask.run(ric);
+        return Mono.just(ric);
     }
 
     /**
 
 public class RicSupervision {
     private static final Logger logger = LoggerFactory.getLogger(RicSupervision.class);
 
+    private static final int CONCURRENCY = 50; // Number of RIC checked in paralell
     private final Rics rics;
     private final Policies policies;
     private final PolicyTypes policyTypes;
     private Flux<RicData> createTask() {
         return Flux.fromIterable(rics.getRics()) //
                 .flatMap(this::createRicData) //
-                .flatMap(this::checkOneRic);
+                .flatMap(this::checkOneRic, CONCURRENCY);
     }
 
     private Mono<RicData> checkOneRic(RicData ricData) {
 
     private void onRicCheckedError(Throwable t, RicData ricData) {
         logger.debug("Ric: {} check stopped, exception: {}", ricData.ric.id(), t.getMessage());
-        if (t instanceof SynchStartedException) {
-            // this is just a temporary state,
-            ricData.ric.setState(RicState.AVAILABLE);
-        } else {
+        if (!(t instanceof SynchStartedException)) {
+            // If synch is started, the synch will set the final state
             ricData.ric.setState(RicState.UNAVAILABLE);
         }
         ricData.ric.getLock().unlockBlocking();
 
     private Mono<RicData> checkRicState(RicData ric) {
         if (ric.ric.getState() == RicState.UNAVAILABLE) {
+            logger.debug("RicSupervision, starting ric: {} synchronization (state == UNAVAILABLE)", ric.ric.id());
             return startSynchronization(ric) //
                     .onErrorResume(t -> Mono.empty());
         } else if (ric.ric.getState() == RicState.SYNCHRONIZING || ric.ric.getState() == RicState.CONSISTENCY_CHECK) {
     private Mono<RicData> validateInstances(Collection<String> ricPolicies, RicData ric) {
         synchronized (this.policies) {
             if (ricPolicies.size() != policies.getForRic(ric.ric.id()).size()) {
+                logger.debug("RicSupervision, starting ric: {} synchronization (noOfPolicices == {}, expected == {})",
+                        ric.ric.id(), ricPolicies.size(), policies.getForRic(ric.ric.id()).size());
                 return startSynchronization(ric);
             }
 
             for (String policyId : ricPolicies) {
                 if (!policies.containsPolicy(policyId)) {
+                    logger.debug("RicSupervision, starting ric: {} synchronization (unexpected policy in RIC: {})",
+                            ric.ric.id(), policyId);
                     return startSynchronization(ric);
                 }
             }
 
     private Mono<RicData> validateTypes(Collection<String> ricTypes, RicData ric) {
         if (ricTypes.size() != ric.ric.getSupportedPolicyTypes().size()) {
+            logger.debug(
+                    "RicSupervision, starting ric: {} synchronization (unexpected numer of policy types in RIC: {}, expected: {})",
+                    ric.ric.id(), ricTypes.size(), ric.ric.getSupportedPolicyTypes().size());
             return startSynchronization(ric);
         }
         for (String typeName : ricTypes) {
             if (!ric.ric.isSupportingType(typeName)) {
+                logger.debug("RicSupervision, starting ric: {} synchronization (unexpected policy type: {})",
+                        ric.ric.id(), typeName);
                 return startSynchronization(ric);
             }
         }
 
     private Mono<RicData> startSynchronization(RicData ric) {
         RicSynchronizationTask synchronizationTask = createSynchronizationTask();
-        synchronizationTask.run(ric.ric);
-        return Mono.error(new SynchStartedException("Syncronization started"));
+        return synchronizationTask.synchronizeRic(ric.ric) //
+                .flatMap(notUsed -> Mono.error(new SynchStartedException("Syncronization started")));
+
     }
 
     RicSynchronizationTask createSynchronizationTask() {
 
     }
 
     public void run(Ric ric) {
-        logger.debug("Handling ric: {}", ric.getConfig().ricId());
+        logger.debug("Ric synchronization task created: {}", ric.getConfig().ricId());
 
         if (ric.getState() == RicState.SYNCHRONIZING) {
             logger.debug("Ric: {} is already being synchronized", ric.getConfig().ricId());
         }
 
         ric.getLock().lock(LockType.EXCLUSIVE) //
-                .flatMap(notUsed -> setRicState(ric)) //
-                .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) //
-                .flatMapMany(client -> runSynchronization(ric, client)) //
-                .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable))
+                .flatMap(notUsed -> synchronizeRic(ric)) //
                 .subscribe(new BaseSubscriber<Object>() {
-                    @Override
-                    protected void hookOnError(Throwable throwable) {
-                        logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(),
-                                throwable.getMessage());
-                        ric.setState(RicState.UNAVAILABLE);
-                    }
-
-                    @Override
-                    protected void hookOnComplete() {
-                        onSynchronizationComplete(ric);
-                    }
 
                     @Override
                     protected void hookFinally(SignalType type) {
                 });
     }
 
+    public Mono<Ric> synchronizeRic(Ric ric) {
+        return Mono.just(ric) //
+                .flatMap(notUsed -> setRicState(ric)) //
+                .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) //
+                .flatMapMany(client -> runSynchronization(ric, client)) //
+                .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable)) //
+                .collectList() //
+                .flatMap(notUsed -> Mono.just(ric)) //
+                .doOnError(t -> { //
+                    logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(), t.getMessage()); //
+                    ric.setState(RicState.UNAVAILABLE); //
+                }) //
+                .doOnNext(notUsed -> onSynchronizationComplete(ric)) //
+                .onErrorResume(t -> Mono.just(ric));
+    }
+
+    public Flux<PolicyType> synchronizePolicyTypes(Ric ric, A1Client a1Client) {
+        return a1Client.getPolicyTypeIdentities() //
+                .doOnNext(x -> ric.clearSupportedPolicyTypes()) //
+                .flatMapMany(Flux::fromIterable) //
+                .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().ricId(), typeId)) //
+                .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client), CONCURRENCY_RIC) //
+                .doOnNext(ric::addSupportedPolicyType); //
+    }
+
     @SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields
     private Mono<Ric> setRicState(Ric ric) {
         synchronized (ric) {
                 logger.debug("Ric: {} is already being synchronized", ric.getConfig().ricId());
                 return Mono.empty();
             }
+            logger.debug("Ric state set to SYNCHRONIZING: {}", ric.getConfig().ricId());
             ric.setState(RicState.SYNCHRONIZING);
             return Mono.just(ric);
         }
     }
 
     private Flux<Object> deleteAllPolicyInstances(Ric ric, Throwable t) {
-        logger.debug("Recreation of policies failed for ric: {}, reason: {}", ric.id(), t.getMessage());
+        logger.warn("Recreation of policies failed for ric: {}, reason: {}", ric.id(), t.getMessage());
         deleteAllPoliciesInRepository(ric);
 
         Flux<PolicyType> synchronizedTypes = this.a1ClientFactory.createA1Client(ric) //
         callbacks.notifyServicesRicSynchronized(ric, services);
     }
 
-    private Flux<PolicyType> synchronizePolicyTypes(Ric ric, A1Client a1Client) {
-        return a1Client.getPolicyTypeIdentities() //
-                .doOnNext(x -> ric.clearSupportedPolicyTypes()) //
-                .flatMapMany(Flux::fromIterable) //
-                .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().ricId(), typeId)) //
-                .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client), CONCURRENCY_RIC) //
-                .doOnNext(ric::addSupportedPolicyType); //
-    }
-
     private Mono<PolicyType> getPolicyType(String policyTypeId, A1Client a1Client) {
         if (policyTypes.contains(policyTypeId)) {
             return Mono.just(policyTypes.get(policyTypeId));
     }
 
     private Flux<Policy> putPolicy(Policy policy, Ric ric, A1Client a1Client) {
-        logger.debug("Recreating policy: {}, for ric: {}", policy.getId(), ric.getConfig().ricId());
+        logger.trace("Recreating policy: {}, for ric: {}", policy.getId(), ric.getConfig().ricId());
         return a1Client.putPolicy(policy) //
                 .flatMapMany(notUsed -> Flux.just(policy));
     }
 
     private Flux<Policy> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
         return Flux.fromIterable(policies.getForRic(ric.id())) //
+                .doOnNext(policy -> logger.debug("Recreating policy: {}, ric: {}", policy.getId(), ric.id())) //
                 .filter(policy -> !checkTransient(policy)) //
-                .flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC);
+                .flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC)
+                .doOnError(t -> logger.warn("Recreating policy failed, ric: {}, reason: {}", ric.id(), t.getMessage()));
     }
 
 }
 
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Service;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
+import org.onap.ccsdk.oran.a1policymanagementservice.tasks.RefreshConfigTask;
 import org.onap.ccsdk.oran.a1policymanagementservice.tasks.RicSupervision;
 import org.onap.ccsdk.oran.a1policymanagementservice.tasks.ServiceSupervision;
 import org.onap.ccsdk.oran.a1policymanagementservice.utils.MockA1Client;
     @Autowired
     RappSimulatorController rAppSimulator;
 
+    @Autowired
+    RefreshConfigTask refreshConfigTask;
+
     private static Gson gson = new GsonBuilder().create();
 
     /**
     }
 
     @Test
-    void testPersistence() throws ServiceException {
+    void testPersistency() throws ServiceException {
         Ric ric = this.addRic("ric1");
         PolicyType type = this.addPolicyType("type1", ric.id());
         PolicyTypes types = new PolicyTypes(this.applicationConfig);
         assertThat(types.size()).isEqualTo(1);
 
-        addPolicy("id", type.getId(), "service", ric.id());
-        addPolicy("id2", type.getId(), "service", ric.id());
+        final int noOfPolicies = 100;
+        for (int i = 0; i < noOfPolicies; ++i) {
+            addPolicy("id" + i, type.getId(), "service", ric.id());
+        }
 
         {
             Policies policies = new Policies(this.applicationConfig);
             policies.restoreFromDatabase(ric, types);
-            assertThat(policies.size()).isEqualTo(2);
+            assertThat(policies.size()).isEqualTo(noOfPolicies);
         }
 
         {
             restClient().delete("/policies/id2").block();
             Policies policies = new Policies(this.applicationConfig);
             policies.restoreFromDatabase(ric, types);
-            assertThat(policies.size()).isEqualTo(1);
+            assertThat(policies.size()).isEqualTo(noOfPolicies - 1);
         }
 
+        {
+            // Test adding the RIC from configuration
+            RicConfig config = ric.getConfig();
+            this.rics.remove("ric1");
+            ApplicationConfig.RicConfigUpdate update =
+                    new ApplicationConfig.RicConfigUpdate(config, ApplicationConfig.RicConfigUpdate.Type.ADDED);
+            refreshConfigTask.handleUpdatedRicConfig(update).block();
+            ric = this.rics.getRic("ric1");
+            assertThat(ric.getSupportedPolicyTypes().size()).isEqualTo(1);
+        }
     }
 
     @Test
 
     @Test
     void testConcurrency() throws Exception {
+        logger.info("Concurrency test starting");
         final Instant startTime = Instant.now();
         List<Thread> threads = new ArrayList<>();
         List<ConcurrencyTestRunnable> tests = new ArrayList<>();
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.awaitility.Awaitility.await;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
         // Then
         verify(refreshTaskUnderTest).loadConfigurationFromFile();
 
-        verify(refreshTaskUnderTest, times(2)).runRicSynchronization(any(Ric.class));
+        verify(refreshTaskUnderTest, times(2)).addRic(any(Ric.class));
 
         Iterable<RicConfig> ricConfigs = appConfig.getRicConfigs();
         RicConfig ricConfig = ricConfigs.iterator().next();
         String newBaseUrl = "newBaseUrl";
         modifyTheRicConfiguration(configAsJson, newBaseUrl);
         when(cbsClient.get(any())).thenReturn(Mono.just(configAsJson));
-        doNothing().when(refreshTaskUnderTest).runRicSynchronization(any(Ric.class));
 
         StepVerifier //
                 .create(refreshTaskUnderTest.createRefreshTask()) //
                 .expectSubscription() //
-                .expectNext(Type.CHANGED) //
-                .expectNext(Type.ADDED) //
-                .expectNext(Type.REMOVED) //
+                .expectNextCount(3) // CHANGED REMOVED ADDED
                 .thenCancel() //
                 .verify();
 
         String ric2Name = "ric2";
         assertThat(appConfig.getRic(ric2Name)).isNotNull();
 
-        assertThat(rics.size()).isEqualTo(2);
+        // assertThat(rics.size()).isEqualTo(2);
         assertThat(rics.get(RIC_1_NAME).getConfig().baseUrl()).isEqualTo(newBaseUrl);
         assertThat(rics.get(ric2Name)).isNotNull();
 
 
         RicSupervision supervisorUnderTest = spy(createRicSupervision());
 
         doReturn(synchronizationTaskMock).when(supervisorUnderTest).createSynchronizationTask();
+        doReturn(Mono.just(RIC_1)).when(synchronizationTaskMock).synchronizeRic(any());
 
         supervisorUnderTest.checkAllRics();
 
         verify(supervisorUnderTest).checkAllRics();
         verify(supervisorUnderTest).createSynchronizationTask();
-        verify(synchronizationTaskMock).run(RIC_1);
+        verify(synchronizationTaskMock).synchronizeRic(RIC_1);
         verifyNoMoreInteractions(supervisorUnderTest);
     }
 
 
         verify(supervisorUnderTest).checkAllRics();
         verify(supervisorUnderTest).createSynchronizationTask();
-        verify(synchronizationTaskMock).run(RIC_1);
+        verify(synchronizationTaskMock).synchronizeRic(RIC_1);
         verifyNoMoreInteractions(supervisorUnderTest);
     }
 
 
         verify(supervisorUnderTest).checkAllRics();
         verify(supervisorUnderTest).createSynchronizationTask();
-        verify(synchronizationTaskMock).run(RIC_1);
+        verify(synchronizationTaskMock).synchronizeRic(RIC_1);
         verifyNoMoreInteractions(supervisorUnderTest);
     }
 
 
         verify(supervisorUnderTest).checkAllRics();
         verify(supervisorUnderTest).createSynchronizationTask();
-        verify(synchronizationTaskMock).run(RIC_1);
+        verify(synchronizationTaskMock).synchronizeRic(RIC_1);
         verifyNoMoreInteractions(supervisorUnderTest);
     }
 
 
         verify(supervisorUnderTest).checkAllRics();
         verify(supervisorUnderTest).createSynchronizationTask();
-        verify(synchronizationTaskMock).run(RIC_1);
+        verify(synchronizationTaskMock).synchronizeRic(RIC_1);
         verifyNoMoreInteractions(supervisorUnderTest);
     }
 
 
 
 package org.onap.ccsdk.oran.a1policymanagementservice.tasks;
 
-import static ch.qos.logback.classic.Level.WARN;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
-import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.core.read.ListAppender;
-
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Arrays;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Service;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
-import org.onap.ccsdk.oran.a1policymanagementservice.utils.LoggingUtils;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
         verify(synchronizerUnderTest).run(RIC_1);
         verify(synchronizerUnderTest).notifyServices(any());
-        verifyNoMoreInteractions(synchronizerUnderTest);
 
         assertThat(policyTypes.size()).isEqualTo(1);
         assertThat(policies.size()).isZero();
 
         RicSynchronizationTask synchronizerUnderTest = createTask();
 
-        final ListAppender<ILoggingEvent> logAppender =
-                LoggingUtils.getLogListAppender(RicSynchronizationTask.class, WARN);
-
         synchronizerUnderTest.run(RIC_1);
 
-        verifyCorrectLogMessage(0, logAppender,
-                "Synchronization failure for ric: " + RIC_1_NAME + ", reason: " + originalErrorMessage);
-
         verify(a1ClientMock, times(2)).deleteAllPolicies();
         verifyNoMoreInteractions(a1ClientMock);
 
     private void simulateRicWithNoPolicyTypes() {
         when(a1ClientMock.getPolicyTypeIdentities()).thenReturn(Mono.just(Collections.emptyList()));
     }
-
-    private void verifyCorrectLogMessage(int messageIndex, ListAppender<ILoggingEvent> logAppender,
-            String expectedMessage) {
-        ILoggingEvent loggingEvent = logAppender.list.get(messageIndex);
-        assertThat(loggingEvent.getFormattedMessage()).isEqualTo(expectedMessage);
-    }
 }
 
                 "tags": ["A1 Policy Management V1.0"]
             },
             "delete": {
-                "summary": "Unregisters a service",
+                "summary": "Unregister a service",
                 "operationId": "deleteService",
                 "responses": {
                     "204": {
-                        "description": "Service unregisterred",
+                        "description": "Service unregistered",
                         "content": {"*/*": {"schema": {"$ref": "#/components/schemas/void"}}}
                     },
                     "404": {
 
     delete:
       tags:
       - A1 Policy Management V1.0
-      summary: Unregisters a service
+      summary: Unregister a service
       operationId: deleteService
       parameters:
       - name: name
           type: string
       responses:
         204:
-          description: Service unregisterred
+          description: Service unregistered
           content:
             '*/*':
               schema: