The datafile collector has a cache with all previously published files.
The cache is purged at regular intervals so that unused entries are
removed and the cache does not grow indefinitely.
Added a unit test.
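
For reference, a minimal usage sketch of the new PublishedFileCache
(the real wiring is in ScheduledTasks and SchedulerConfig below; the
file name is hypothetical):

    PublishedFileCache cache = new PublishedFileCache();
    Path localFile = Paths.get("someFile.xml"); // hypothetical local file name
    if (cache.put(localFile) == null) {
        // null means the file has not been published before
    }
    cache.purge(Instant.now()); // drops entries unused for more than 24 hours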
Change-Id: I8897fee4522c97031f735b1d6774803dcb73926b
Issue-ID: DCAEGEN2-1118
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
+
import javax.annotation.PostConstruct;
+
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
+
import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
@EnableScheduling
public class SchedulerConfig {
- private static final int SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = 15;
- private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5;
+ private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = Duration.ofSeconds(15);
+ private static final Duration SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = Duration.ofMinutes(5);
+ private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE = Duration.ofHours(1);
private static volatile List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
private final TaskScheduler taskScheduler;
@ApiOperation(value = "Start task if possible")
public synchronized boolean tryToStartTask() {
if (scheduledFutureList.isEmpty()) {
- scheduledFutureList.add(taskScheduler
- .scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(),
- Duration.ofMinutes(SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY)));
+ scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(),
+ SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY));
scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(scheduledTask::scheduleMainDatafileEventTask,
- Duration.ofSeconds(SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS)));
+ SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
+ scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
+ SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE));
+
return true;
} else {
return false;
--- /dev/null
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+package org.onap.dcaegen2.collectors.datafile.service;
+
+import java.nio.file.Path;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A cache of all files that have already been published. The key is the local file path and the
+ * value is a time stamp of when the key was last used.
+ */
+public class PublishedFileCache {
+ private final Map<Path, Instant> publishedFiles = Collections.synchronizedMap(new HashMap<Path, Instant>());
+
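+ /**
+ * Registers a file as published, or updates its time stamp if it is already present.
+ *
+ * @return the previous time stamp for the file, or null if the file was not already in the cache.
+ */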
+ public Instant put(Path path) {
+ return publishedFiles.put(path, Instant.now());
+ }
+
+ public void remove(Path localFileName) {
+ publishedFiles.remove(localFileName);
+ }
+
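+ /**
+ * Removes all entries that have not been used (put) within the time to keep, currently 24 hours.
+ */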
+ public void purge(Instant now) {
+ for (Iterator<Map.Entry<Path, Instant>> it = publishedFiles.entrySet().iterator(); it.hasNext();) {
+ Map.Entry<Path, Instant> pair = it.next();
+ if (isCachedPublishedFileOutdated(now, pair.getValue())) {
+ it.remove();
+ }
+ }
+ }
+
+ public int size() {
+ return publishedFiles.size();
+ }
+
+ private boolean isCachedPublishedFileOutdated(Instant now, Instant then) {
+ final int timeToKeepInfoInSeconds = 60 * 60 * 24;
+ return now.getEpochSecond() - then.getEpochSecond() > timeToKeepInfoInSeconds;
+ }
+}
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
/** Data needed for fetching of files from one PNF */
private class FileCollectionData {
final FileData fileData;
- final FileCollector collectorTask; // Same object, ftp session etc. can be used for each file in one VES
- // event
+ final FileCollector collectorTask; // Same object, ftp session etc. can be used for each
+ // file in one VES event
final MessageMetaData metaData;
FileCollectionData(FileData fd, FileCollector collectorTask, MessageMetaData metaData) {
private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
private final AppConfig applicationConfiguration;
private final AtomicInteger taskCounter = new AtomicInteger();
- private final Set<Path> alreadyPublishedFiles = Collections.synchronizedSet(new HashSet<Path>());
+
+ PublishedFileCache alreadyPublishedFiles = new PublishedFileCache();
/**
* Constructor for task registration in Datafile Workflow.
}
/**
- * Main function for scheduling Datafile Workflow.
+ * Main function for scheduling the file collection workflow.
*/
public void scheduleMainDatafileEventTask() {
logger.trace("Execution of tasks was registered");
//@formatter:on
}
+ /**
+ * Called at regular intervals to remove outdated cached information.
+ */
+ public void purgeCachedInformation(Instant now) {
+ alreadyPublishedFiles.purge(now);
+ }
+
private void onComplete() {
logger.info("Datafile tasks have been completed");
}
List<FileCollectionData> fileCollects = new ArrayList<>();
for (FileData fileData : availableFiles.files()) {
- FileCollector task = new FileCollector(applicationConfiguration,
- new FtpsClient(fileData.fileServerData()), new SftpClient(fileData.fileServerData()));
+ FileCollector task = new FileCollector(applicationConfiguration, new FtpsClient(fileData.fileServerData()),
+ new SftpClient(fileData.fileServerData()));
fileCollects.add(new FileCollectionData(fileData, task, availableFiles.messageMetaData()));
}
return Flux.fromIterable(fileCollects);
}
private boolean shouldBePublished(FileCollectionData task) {
- return alreadyPublishedFiles.add(task.fileData.getLocalFileName());
+ return alreadyPublishedFiles.put(task.fileData.getLocalFileName()) == null;
}
private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect) {
return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout)
.onErrorResume(exception -> handlePublishFailure(model, exception));
-
}
private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) {
final DMaaPMessageConsumerTask messageConsumerTask =
new DMaaPMessageConsumerTask(this.applicationConfiguration);
- return messageConsumerTask.execute()
+ return messageConsumerTask.execute() //
.onErrorResume(this::handleConsumeMessageFailure);
}
--- /dev/null
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Instant;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class PublishedFileCacheTest {
+
+ private PublishedFileCache testObject;
+
+ @BeforeEach
+ public void setUp() {
+ testObject = new PublishedFileCache();
+ }
+
+ @Test
+ public void purgeFiles_timeNotExpired() {
+ Assertions.assertNull(testObject.put(Paths.get("A")));
+ Assertions.assertNotNull(testObject.put(Paths.get("A")));
+ testObject.put(Paths.get("B"));
+
+ testObject.purge(Instant.now());
+ Assertions.assertEquals(2, testObject.size());
+ }
+
+ @Test
+ public void purgeFiles_timeExpired() {
+ testObject.put(Paths.get("A"));
+ testObject.put(Paths.get("B"));
+ testObject.put(Paths.get("C"));
+
+ testObject.purge(Instant.MAX);
+ Assertions.assertEquals(0, testObject.size());
+ }
+
+ @Test
+ public void purgeFiles_remove() {
+ Path path = Paths.get("A");
+ testObject.put(path);
+ Assertions.assertEquals(1, testObject.size());
+ testObject.remove(path);
+ Assertions.assertEquals(0, testObject.size());
+ }
+}
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.commons.io.IOUtils.toByteArray;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.github.stefanbirkner.fakesftpserver.rule.FakeSftpServerRule;
SftpClient sftpClient = new SftpClient(expectedFileServerData);
sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8);
- String errorMessage = "";
- try {
- sftpClient.collectFile(REMOTE_DUMMY_FILE, LOCAL_DUMMY_FILE);
- } catch (Exception e) {
- errorMessage = e.getMessage();
- }
- assertTrue(errorMessage.contains("Auth fail"));
+ assertThatThrownBy(() -> sftpClient.collectFile(REMOTE_DUMMY_FILE, LOCAL_DUMMY_FILE))
+ .hasMessageContaining("Unable to get file from xNF");
}
@Test