Purging of cached information 41/78741/4
authorPatrikBuhr <patrik.buhr@est.tech>
Wed, 27 Feb 2019 14:22:51 +0000 (14:22 +0000)
committerPatrikBuhr <patrik.buhr@est.tech>
Wed, 27 Feb 2019 14:22:51 +0000 (14:22 +0000)
The datafile collector has a cache will all previously published files.
The cache is on regular intevals purged so that non used entries
are removed so that it does not grow infinitely.

Added a unit test.

Change-Id: I8897fee4522c97031f735b1d6774803dcb73926b
Issue-ID: DCAEGEN2-1118
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java [new file with mode: 0644]
datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCacheTest.java [new file with mode: 0644]
datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java

index 12f303e..bc21f96 100644 (file)
@@ -21,7 +21,9 @@ import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ScheduledFuture;
+
 import javax.annotation.PostConstruct;
+
 import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Configuration;
@@ -29,6 +31,7 @@ import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.scheduling.TaskScheduler;
 import org.springframework.scheduling.annotation.EnableScheduling;
+
 import io.swagger.annotations.ApiOperation;
 import reactor.core.publisher.Mono;
 
@@ -39,8 +42,9 @@ import reactor.core.publisher.Mono;
 @EnableScheduling
 public class SchedulerConfig {
 
-    private static final int SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = 15;
-    private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5;
+    private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = Duration.ofSeconds(15);
+    private static final Duration SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = Duration.ofMinutes(5);
+    private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE = Duration.ofHours(1);
     private static volatile List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
 
     private final TaskScheduler taskScheduler;
@@ -77,11 +81,13 @@ public class SchedulerConfig {
     @ApiOperation(value = "Start task if possible")
     public synchronized boolean tryToStartTask() {
         if (scheduledFutureList.isEmpty()) {
-            scheduledFutureList.add(taskScheduler
-                .scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(),
-                    Duration.ofMinutes(SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY)));
+            scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(),
+                    SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY));
             scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(scheduledTask::scheduleMainDatafileEventTask,
-                Duration.ofSeconds(SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS)));
+                    SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
+            scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
+                   SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE));
+
             return true;
         } else {
             return false;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
new file mode 100644 (file)
index 0000000..2cb8411
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+package org.onap.dcaegen2.collectors.datafile.service;
+
+import java.nio.file.Path;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A cache of all files that already has been published. Key is the local file path and the value is
+ * a time stamp, when the key was last used.
+ */
+public class PublishedFileCache {
+    private final Map<Path, Instant> publishedFiles = Collections.synchronizedMap(new HashMap<Path, Instant>());
+
+    public Instant put(Path path) {
+        return publishedFiles.put(path, Instant.now());
+    }
+
+    public void remove(Path localFileName) {
+        publishedFiles.remove(localFileName);
+    }
+
+    public void purge(Instant now) {
+        for (Iterator<Map.Entry<Path, Instant>> it = publishedFiles.entrySet().iterator(); it.hasNext();) {
+            Map.Entry<Path, Instant> pair = it.next();
+            if (isCachedPublishedFileOutdated(now, pair.getValue())) {
+                it.remove();
+            }
+        }
+    }
+
+    public int size() {
+        return publishedFiles.size();
+    }
+
+    private boolean isCachedPublishedFileOutdated(Instant now, Instant then) {
+        final int timeToKeepInfoInSeconds = 60 * 60 * 24;
+        return now.getEpochSecond() - then.getEpochSecond() > timeToKeepInfoInSeconds;
+    }
+
+
+}
index 37b7a55..50f5431 100644 (file)
@@ -20,11 +20,9 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -34,6 +32,7 @@ import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -55,8 +54,9 @@ public class ScheduledTasks {
     /** Data needed for fetching of files from one PNF */
     private class FileCollectionData {
         final FileData fileData;
-        final FileCollector collectorTask; // Same object, ftp session etc. can be used for each file in one VES
-                                                  // event
+        final FileCollector collectorTask; // Same object, ftp session etc. can be used for each
+                                           // file in one VES
+                                           // event
         final MessageMetaData metaData;
 
         FileCollectionData(FileData fd, FileCollector collectorTask, MessageMetaData metaData) {
@@ -69,7 +69,8 @@ public class ScheduledTasks {
     private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
     private final AppConfig applicationConfiguration;
     private final AtomicInteger taskCounter = new AtomicInteger();
-    private final Set<Path> alreadyPublishedFiles = Collections.synchronizedSet(new HashSet<Path>());
+
+    PublishedFileCache alreadyPublishedFiles = new PublishedFileCache();
 
     /**
      * Constructor for task registration in Datafile Workflow.
@@ -84,7 +85,7 @@ public class ScheduledTasks {
     }
 
     /**
-     * Main function for scheduling Datafile Workflow.
+     * Main function for scheduling for the file collection Workflow.
      */
     public void scheduleMainDatafileEventTask() {
         logger.trace("Execution of tasks was registered");
@@ -105,6 +106,13 @@ public class ScheduledTasks {
         //@formatter:on
     }
 
+    /**
+     * called in regular intervals to remove out-dated cached information
+     */
+    public void purgeCachedInformation(Instant now) {
+        alreadyPublishedFiles.purge(now);
+    }
+
     private void onComplete() {
         logger.info("Datafile tasks have been completed");
     }
@@ -121,15 +129,15 @@ public class ScheduledTasks {
         List<FileCollectionData> fileCollects = new ArrayList<>();
 
         for (FileData fileData : availableFiles.files()) {
-            FileCollector task = new FileCollector(applicationConfiguration,
-                    new FtpsClient(fileData.fileServerData()), new SftpClient(fileData.fileServerData()));
+            FileCollector task = new FileCollector(applicationConfiguration, new FtpsClient(fileData.fileServerData()),
+                    new SftpClient(fileData.fileServerData()));
             fileCollects.add(new FileCollectionData(fileData, task, availableFiles.messageMetaData()));
         }
         return Flux.fromIterable(fileCollects);
     }
 
     private boolean shouldBePublished(FileCollectionData task) {
-        return alreadyPublishedFiles.add(task.fileData.getLocalFileName());
+        return alreadyPublishedFiles.put(task.fileData.getLocalFileName()) == null;
     }
 
     private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect) {
@@ -158,7 +166,6 @@ public class ScheduledTasks {
 
         return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout)
                 .onErrorResume(exception -> handlePublishFailure(model, exception));
-
     }
 
     private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) {
@@ -179,7 +186,7 @@ public class ScheduledTasks {
 
         final DMaaPMessageConsumerTask messageConsumerTask =
                 new DMaaPMessageConsumerTask(this.applicationConfiguration);
-        return messageConsumerTask.execute()
+        return messageConsumerTask.execute() //
                 .onErrorResume(this::handleConsumeMessageFailure);
     }
 
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCacheTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCacheTest.java
new file mode 100644 (file)
index 0000000..7b38ee4
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Instant;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class PublishedFileCacheTest {
+
+    private static PublishedFileCache testObject;
+
+    @BeforeAll
+    public static void setUp() {
+        testObject = new PublishedFileCache();
+    }
+
+    @Test
+    public void purgeFiles_timeNotExpired() {
+        Assertions.assertNull(testObject.put(Paths.get("A")));
+        Assertions.assertNotNull(testObject.put(Paths.get("A")));
+        testObject.put(Paths.get("B"));
+
+        testObject.purge(Instant.now());
+        Assertions.assertEquals(2, testObject.size());
+    }
+
+    @Test
+    public void purgeFiles_timeExpired() {
+        testObject.put(Paths.get("A"));
+        testObject.put(Paths.get("B"));
+        testObject.put(Paths.get("C"));
+
+        testObject.purge(Instant.MAX);
+        Assertions.assertEquals(0, testObject.size());
+    }
+
+    @Test
+    public void purgeFiles_remove() {
+        Path path = Paths.get("A");
+        testObject.put(path);
+        Assertions.assertEquals(1, testObject.size());
+        testObject.remove(path);
+        Assertions.assertEquals(0, testObject.size());
+    }
+}
index 102388e..90fb933 100644 (file)
@@ -19,6 +19,7 @@ package org.onap.dcaegen2.collectors.datafile.ftp;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.commons.io.IOUtils.toByteArray;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import com.github.stefanbirkner.fakesftpserver.rule.FakeSftpServerRule;
@@ -72,14 +73,9 @@ public class SftpClientTest {
         SftpClient sftpClient = new SftpClient(expectedFileServerData);
         sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8);
 
-        String errorMessage = "";
-        try {
-            sftpClient.collectFile(REMOTE_DUMMY_FILE, LOCAL_DUMMY_FILE);
-        } catch (Exception e) {
-            errorMessage = e.getMessage();
-        }
 
-        assertTrue(errorMessage.contains("Auth fail"));
+        assertThatThrownBy(() -> sftpClient.collectFile(REMOTE_DUMMY_FILE, LOCAL_DUMMY_FILE))
+                .hasMessageContaining("Unable to get file from xNF");
     }
 
     @Test