Storing or registerred services to survice a restart.
Change-Id: If1b41d0a3c995b51bb93000caca5ecff9da6fbc1
Issue-ID: CCSDK-3256
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
                 "required": true
             },
             "description": "The URL to this call is registerred at Service registration.",
-            "operationId": "jobStatusCallback",
+            "operationId": "serviceCallback",
             "responses": {"200": {
                 "description": "OK",
                 "content": {"application/json": {"schema": {"$ref": "#/components/schemas/void"}}}
 
       - Callbacks
       summary: Callback for Near-RT RIC status
       description: The URL to this call is registerred at Service registration.
-      operationId: jobStatusCallback
+      operationId: serviceCallback
       requestBody:
         content:
           application/json:
 
     }
 
     @Bean
-    public Services getServices() {
-        return new Services();
+    public Services getServices(@Autowired ApplicationConfig applicationConfig) {
+        return new Services(applicationConfig);
     }
 
     @Bean
 
         this.restClient = restClientFactory.createRestClientNoHttpProxy("");
     }
 
-    public void notifyServicesRicSynchronized(Ric ric, Services services) {
-        createTask(ric, services).subscribe(numberOfServices -> logger.debug("Services {} notified", numberOfServices),
-                throwable -> logger.error("Service notification failed, cause: {}", throwable.getMessage()),
-                () -> logger.debug("All services notified"));
-
-    }
-
-    private Mono<Integer> createTask(Ric ric, Services services) {
+    public Flux<Service> notifyServicesRicAvailable(Ric ric, Services services) {
+        final int CONCURRENCY = 10;
         return Flux.fromIterable(services.getAll()) //
-                .flatMap(service -> notifyServiceRicSynchronized(ric, service)) //
-                .collectList() //
-                .flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); //
+                .flatMap(service -> notifyService(ric, service, ServiceCallbackInfo.EventType.AVAILABLE), CONCURRENCY); //
     }
 
-    private Mono<String> notifyServiceRicSynchronized(Ric ric, Service service) {
+    private Mono<Service> notifyService(Ric ric, Service service, ServiceCallbackInfo.EventType eventType) {
         if (service.getCallbackUrl().isEmpty()) {
             return Mono.empty();
         }
 
-        ServiceCallbackInfo request = new ServiceCallbackInfo(ric.id(), ServiceCallbackInfo.EventType.AVAILABLE);
+        ServiceCallbackInfo request = new ServiceCallbackInfo(ric.id(), eventType);
         String body = gson.toJson(request);
 
         return restClient.post(service.getCallbackUrl(), body)
                 .doOnNext(resp -> logger.debug("Invoking service {} callback,   ric: {}", service.getName(), ric.id()))
                 .onErrorResume(throwable -> {
-                    logger.error("Service: {}, callback: {} failed:  {}", service.getName(), service.getCallbackUrl(),
+                    logger.warn("Service: {}, callback: {} failed:  {}", service.getName(), service.getCallbackUrl(),
                             throwable.toString());
                     return Mono.empty();
-                });
+                }) //
+                .flatMap(resp -> Mono.just(service));
     }
 
 }
 
                 this.types.put(type.getId(), type);
             }
             logger.debug("Restored type database,no of types: {}", this.types.size());
-        } catch (IOException e) {
-            logger.warn("Could not restore policy type database : {}", e.getMessage());
         } catch (ServiceException e) {
             logger.debug("Could not restore policy type database : {}", e.getMessage());
+        } catch (Exception e) {
+            logger.warn("Could not restore policy type database : {}", e.getMessage());
         }
     }
 
 
 
 package org.onap.ccsdk.oran.a1policymanagementservice.repository;
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+
+import java.io.IOException;
 import java.time.Duration;
 import java.time.Instant;
 
 import lombok.Setter;
 
 public class Service {
+
+    static class InstantAdapter extends TypeAdapter<Instant> {
+        @Override
+        public Instant read(JsonReader reader) throws IOException {
+            reader.skipValue();
+            return Instant.now(); // Pretend that the last ping was now (after a restart)
+        }
+
+        @Override
+        public void write(JsonWriter writer, Instant value) throws IOException {
+            writer.value(value.toString());
+        }
+    }
+
+    static class DurationAdapter extends TypeAdapter<Duration> {
+        @Override
+        public Duration read(JsonReader reader) throws IOException {
+            long value = reader.nextLong();
+            return Duration.ofNanos(value);
+        }
+
+        @Override
+        public void write(JsonWriter writer, Duration value) throws IOException {
+            writer.value(value.toNanos());
+        }
+    }
+
+    public static Gson createGson() {
+        return new GsonBuilder() //
+                .registerTypeAdapter(Instant.class, new Service.InstantAdapter()) //
+                .registerTypeAdapter(Duration.class, new Service.DurationAdapter()) //
+                .create();
+    }
+
     @Getter
     private final String name;
 
 
 
 package org.onap.ccsdk.oran.a1policymanagementservice.repository;
 
+import com.google.gson.Gson;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Vector;
 
+import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
 import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.util.FileSystemUtils;
 
 public class Services {
     private static final Logger logger = LoggerFactory.getLogger(Services.class);
+    private static Gson gson = Service.createGson();
+    private final ApplicationConfig appConfig;
 
     private Map<String, Service> registeredServices = new HashMap<>();
 
+    public Services(@Autowired ApplicationConfig appConfig) {
+        this.appConfig = appConfig;
+        restoreFromDatabase();
+    }
+
     public synchronized Service getService(String name) throws ServiceException {
         Service s = registeredServices.get(name);
         if (s == null) {
         logger.debug("Put service: {}", service.getName());
         service.keepAlive();
         registeredServices.put(service.getName(), service);
+        store(service);
     }
 
     public synchronized Iterable<Service> getAll() {
     }
 
     public synchronized void remove(String name) {
-        registeredServices.remove(name);
+        Service service = registeredServices.remove(name);
+        if (service != null) {
+            try {
+                Files.delete(getPath(service));
+            } catch (Exception e) {
+
+            }
+        }
     }
 
     public synchronized int size() {
 
     public synchronized void clear() {
         registeredServices.clear();
+        try {
+            FileSystemUtils.deleteRecursively(getDatabasePath());
+        } catch (Exception e) {
+            logger.warn("Could not delete services database : {}", e.getMessage());
+        }
+    }
+
+    public void store(Service service) {
+        try {
+            Files.createDirectories(getDatabasePath());
+            try (PrintStream out = new PrintStream(new FileOutputStream(getFile(service)))) {
+                String str = gson.toJson(service);
+                out.print(str);
+            }
+        } catch (ServiceException e) {
+            logger.debug("Could not store service: {} {}", service.getName(), e.getMessage());
+        } catch (IOException e) {
+            logger.warn("Could not store pservice: {} {}", service.getName(), e.getMessage());
+        }
+    }
+
+    private File getFile(Service service) throws ServiceException {
+        return getPath(service).toFile();
+    }
+
+    private Path getPath(Service service) throws ServiceException {
+        return Path.of(getDatabaseDirectory(), service.getName() + ".json");
+    }
+
+    void restoreFromDatabase() {
+        try {
+            Files.createDirectories(getDatabasePath());
+            for (File file : getDatabasePath().toFile().listFiles()) {
+                String json = Files.readString(file.toPath());
+                Service service = gson.fromJson(json, Service.class);
+                this.registeredServices.put(service.getName(), service);
+            }
+            logger.debug("Restored type database,no of services: {}", this.registeredServices.size());
+        } catch (ServiceException e) {
+            logger.debug("Could not restore services database : {}", e.getMessage());
+        } catch (Exception e) {
+            logger.warn("Could not restore services database : {}", e.getMessage());
+        }
+    }
+
+    private String getDatabaseDirectory() throws ServiceException {
+        if (appConfig.getVardataDirectory() == null) {
+            throw new ServiceException("No storage provided");
+        }
+        return appConfig.getVardataDirectory() + "/database/services";
+    }
+
+    private Path getDatabasePath() throws ServiceException {
+        return Path.of(getDatabaseDirectory());
     }
 }
 
 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig.RicConfigUpdate;
 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfigParser;
 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ConfigurationFile;
+import org.onap.ccsdk.oran.a1policymanagementservice.controllers.ServiceCallbacks;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes;
 import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric;
             RicConfigUpdate.Type event = updatedInfo.getType();
             if (event == RicConfigUpdate.Type.ADDED) {
                 logger.debug("RIC added {}", ricId);
-                Ric ric = new Ric(updatedInfo.getRicConfig());
-                return trySyncronizeSupportedTypes(ric) //
+
+                return trySyncronizeSupportedTypes(new Ric(updatedInfo.getRicConfig())) //
                         .flatMap(this::addRic) //
+                        .flatMap(this::notifyServicesRicAvailable) //
+                        .doOnNext(ric -> ric.setState(RicState.AVAILABLE)) //
                         .flatMap(notUsed -> Mono.just(event));
             } else if (event == RicConfigUpdate.Type.REMOVED) {
                 logger.debug("RIC removed {}", ricId);
         }
         logger.debug("Added RIC: {}", ric.id());
 
-        ric.setState(RicState.AVAILABLE);
-
         return Mono.just(ric);
     }
 
+    private Mono<Ric> notifyServicesRicAvailable(Ric ric) {
+        ServiceCallbacks callbacks = new ServiceCallbacks(this.restClientFactory);
+        return callbacks.notifyServicesRicAvailable(ric, services) //
+                .collectList() //
+                .flatMap(list -> Mono.just(ric));
+    }
+
     /**
      * Reads the configuration from file.
      */
 
                     logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(), t.getMessage()); //
                     ric.setState(RicState.UNAVAILABLE); //
                 }) //
-                .doOnNext(notUsed -> onSynchronizationComplete(ric)) //
+                .flatMap(notUsed -> onSynchronizationComplete(ric)) //
                 .onErrorResume(t -> Mono.just(ric));
     }
 
         return Flux.concat(synchronizedTypes, policiesDeletedInRic, policiesRecreatedInRic);
     }
 
-    private void onSynchronizationComplete(Ric ric) {
+    private Mono<Ric> onSynchronizationComplete(Ric ric) {
         if (this.rics.get(ric.id()) == null) {
             logger.debug("Policies removed in removed ric: {}", ric.id());
-            return;
+            return Mono.empty();
         }
         logger.debug("Synchronization completed for: {}", ric.id());
         ric.setState(RicState.AVAILABLE);
-        notifyServices(ric);
+        ServiceCallbacks callbacks = new ServiceCallbacks(this.restClientFactory);
+        return callbacks.notifyServicesRicAvailable(ric, services) //
+                .collectList() //
+                .flatMap(list -> Mono.just(ric));
     }
 
     private Flux<Object> deleteAllPolicyInstances(Ric ric, Throwable t) {
         return Flux.concat(synchronizedTypes, deletePoliciesInRic);
     }
 
-    void notifyServices(Ric ric) {
-        ServiceCallbacks callbacks = new ServiceCallbacks(this.restClientFactory);
-        callbacks.notifyServicesRicSynchronized(ric, services);
-    }
-
     private Mono<PolicyType> getPolicyType(String policyTypeId, A1Client a1Client) {
         if (policyTypes.contains(policyTypeId)) {
             return Mono.just(policyTypes.get(policyTypeId));
 
         services.clear();
         a1ClientFactory.reset();
         this.rAppSimulator.getTestResults().clear();
+        this.a1ClientFactory.setPolicyTypes(policyTypes); // Default same types in RIC and in this app
     }
 
     @AfterEach
     }
 
     @Test
-    void testPersistency() throws ServiceException {
+    void testPersistencyPolicies() throws ServiceException {
         Ric ric = this.addRic("ric1");
         PolicyType type = this.addPolicyType("type1", ric.id());
-        PolicyTypes types = new PolicyTypes(this.applicationConfig);
-        assertThat(types.size()).isEqualTo(1);
 
         final int noOfPolicies = 100;
         for (int i = 0; i < noOfPolicies; ++i) {
 
         {
             Policies policies = new Policies(this.applicationConfig);
-            policies.restoreFromDatabase(ric, types);
+            policies.restoreFromDatabase(ric, this.policyTypes);
             assertThat(policies.size()).isEqualTo(noOfPolicies);
         }
 
         {
             restClient().delete("/policies/id2").block();
             Policies policies = new Policies(this.applicationConfig);
-            policies.restoreFromDatabase(ric, types);
+            policies.restoreFromDatabase(ric, this.policyTypes);
             assertThat(policies.size()).isEqualTo(noOfPolicies - 1);
         }
+    }
 
-        {
-            // Test adding the RIC from configuration
-            RicConfig config = ric.getConfig();
-            this.rics.remove("ric1");
-            ApplicationConfig.RicConfigUpdate update =
-                    new ApplicationConfig.RicConfigUpdate(config, ApplicationConfig.RicConfigUpdate.Type.ADDED);
-            refreshConfigTask.handleUpdatedRicConfig(update).block();
-            ric = this.rics.getRic("ric1");
-            assertThat(ric.getSupportedPolicyTypes().size()).isEqualTo(1);
-        }
+    @Test
+    void testPersistencyPolicyTypes() throws ServiceException {
+        Ric ric = this.addRic("ric1");
+        this.addPolicyType("type1", ric.id());
+        PolicyTypes types = new PolicyTypes(this.applicationConfig);
+        assertThat(types.size()).isEqualTo(1);
+    }
+
+    @Test
+    void testPersistencyService() throws ServiceException {
+        final String SERVICE = "serviceName";
+        putService(SERVICE, 1234, HttpStatus.CREATED);
+        assertThat(this.services.size()).isEqualTo(1);
+        Service service = this.services.getService(SERVICE);
+
+        Services servicesRestored = new Services(this.applicationConfig);
+        Service serviceRestored = servicesRestored.getService(SERVICE);
+        assertThat(servicesRestored.size()).isEqualTo(1);
+        assertThat(serviceRestored.getCallbackUrl()).isEqualTo(service.getCallbackUrl());
+        assertThat(serviceRestored.getKeepAliveInterval()).isEqualTo(service.getKeepAliveInterval());
+
+        // check that the service can be deleted
+        this.services.remove(SERVICE);
+        servicesRestored = new Services(this.applicationConfig);
+        assertThat(servicesRestored.size()).isEqualTo(0);
+    }
+
+    @Test
+    void testAddingRicFromConfiguration() throws Exception {
+        // Test adding the RIC from configuration
+
+        final String RIC = "ric1";
+        final String TYPE = "type123";
+        PolicyTypes nearRtRicPolicyTypes = new PolicyTypes(this.applicationConfig);
+        nearRtRicPolicyTypes.put(createPolicyType(TYPE));
+        this.a1ClientFactory.setPolicyTypes(nearRtRicPolicyTypes);
+
+        putService("service");
+
+        RicConfig config = ricConfig(RIC, "me1");
+        ApplicationConfig.RicConfigUpdate update =
+                new ApplicationConfig.RicConfigUpdate(config, ApplicationConfig.RicConfigUpdate.Type.ADDED);
+        refreshConfigTask.handleUpdatedRicConfig(update).block();
+        waitForRicState(RIC, RicState.AVAILABLE);
+
+        // Test that the type has been synched
+        Ric addedRic = this.rics.getRic(RIC);
+        assertThat(addedRic.getSupportedPolicyTypes().size()).isEqualTo(1);
+        assertThat(addedRic.getSupportedPolicyTypes().iterator().next().getId()).isEqualTo(TYPE);
+
+        // Check that a service callback for the AVAILABLE RIC is invoked
+        RappSimulatorController.TestResults receivedCallbacks = rAppSimulator.getTestResults();
+        assertThat(receivedCallbacks.getReceivedInfo().size()).isEqualTo(1);
+        ServiceCallbackInfo callbackInfo = receivedCallbacks.getReceivedInfo().get(0);
+        assertThat(callbackInfo.ricId).isEqualTo(RIC);
+        assertThat(callbackInfo.eventType).isEqualTo(ServiceCallbackInfo.EventType.AVAILABLE);
     }
 
     @Test
         return addRic(ricId, null);
     }
 
-    private Ric addRic(String ricId, String managedElement) {
-        if (rics.get(ricId) != null) {
-            return rics.get(ricId);
-        }
+    private RicConfig ricConfig(String ricId, String managedElement) {
         List<String> mes = new ArrayList<>();
         if (managedElement != null) {
             mes.add(managedElement);
         }
-        RicConfig conf = ImmutableRicConfig.builder() //
+        return ImmutableRicConfig.builder() //
                 .ricId(ricId) //
                 .baseUrl(ricId) //
                 .managedElementIds(mes) //
                 .controllerName("") //
                 .build();
+    }
+
+    private Ric addRic(String ricId, String managedElement) {
+        if (rics.get(ricId) != null) {
+            return rics.get(ricId);
+        }
+
+        RicConfig conf = ricConfig(ricId, managedElement);
         Ric ric = new Ric(conf);
         ric.setState(Ric.RicState.AVAILABLE);
         this.rics.put(ric);
 
                     content = @Content(schema = @Schema(implementation = VoidResponse.class)))} //
     )
 
-    public ResponseEntity<Object> jobStatusCallback( //
+    public ResponseEntity<Object> serviceCallback( //
             @RequestBody ServiceCallbackInfo body) {
         logger.info("R-App callback body: {}", gson.toJson(body));
         this.testResults.receivedInfo.add(body);
 
             boolean stubConfigFileExists) {
 
         RefreshConfigTask obj = spy(new RefreshConfigTask(configurationFileMock, appConfig, rics, policies,
-                new Services(), new PolicyTypes(appConfig), new A1ClientFactory(appConfig)));
+                new Services(appConfig), new PolicyTypes(appConfig), new A1ClientFactory(appConfig)));
         if (stubConfigFileExists) {
             when(configurationFileMock.readFile()).thenReturn(Optional.empty());
         }
 
     void init() {
         policyTypes = new PolicyTypes(appConfig);
         policies = new Policies(appConfig);
-        services = new Services();
+        services = new Services(appConfig);
         rics = new Rics();
         RIC_1.setState(RicState.UNAVAILABLE);
         RIC_1.clearSupportedPolicyTypes();
         verifyNoMoreInteractions(a1ClientMock);
 
         verify(synchronizerUnderTest).run(RIC_1);
-        verify(synchronizerUnderTest).notifyServices(any());
 
         assertThat(policyTypes.size()).isEqualTo(1);
         assertThat(policies.size()).isZero();
 
     }
 
     private void setUpRepositoryWithKeepAliveInterval(Duration keepAliveInterval) {
-        services = new Services();
+        ApplicationConfig appConfig = new ApplicationConfig();
+        services = new Services(appConfig);
         service = new Service(SERVICE_NAME, keepAliveInterval, "callbackUrl");
         services.put(service);
-
-        ApplicationConfig appConfig = new ApplicationConfig();
         policies = new Policies(appConfig);
         policies.put(policy);
     }
 
                 "required": true
             },
             "description": "The URL to this call is registerred at Service registration.",
-            "operationId": "jobStatusCallback",
+            "operationId": "serviceCallback",
             "responses": {"200": {
                 "description": "OK",
                 "content": {"application/json": {"schema": {"$ref": "#/components/schemas/void"}}}
 
       - Callbacks
       summary: Callback for Near-RT RIC status
       description: The URL to this call is registerred at Service registration.
-      operationId: jobStatusCallback
+      operationId: serviceCallback
       requestBody:
         content:
           application/json: