private FileSystemReceptionHandler fileReceptionHandler;
private String watchPath;
+ private int maxThread;
/**
* Constructs an instance of {@link FileClientHandler} class.
*
* @param fileReceptionHandler the fileReceptionHandler
*/
- public FileClientHandler(final FileSystemReceptionHandler fileReceptionHandler, final String watchPath) {
+ public FileClientHandler(final FileSystemReceptionHandler fileReceptionHandler, final String watchPath,
+ final int maxThread) {
this.fileReceptionHandler = fileReceptionHandler;
this.watchPath = watchPath;
+ this.maxThread = maxThread;
}
/**
@Override
public void run() {
try {
- fileReceptionHandler.initFileWatcher(watchPath);
+ fileReceptionHandler.initFileWatcher(watchPath, maxThread);
} catch (final IOException ex) {
LOGGER.error("Failed initializing file watcher thread", ex);
}
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.zip.ZipFile;
try {
final FileSystemReceptionHandlerConfigurationParameterGroup handlerParameters =
ParameterService.get(parameterGroupName);
- final FileClientHandler fileClientHandler = new FileClientHandler(this, handlerParameters.getWatchPath());
+ final FileClientHandler fileClientHandler = new FileClientHandler(this,
+ handlerParameters.getWatchPath(),
+ handlerParameters.getMaxThread());
final Thread fileWatcherThread = new Thread(fileClientHandler);
fileWatcherThread.start();
} catch (final Exception ex) {
*
* @param watchPath Path to watch
*/
- public void initFileWatcher(final String watchPath) throws IOException {
+ public void initFileWatcher(final String watchPath, final int maxThread) throws IOException {
try (final WatchService watcher = FileSystems.getDefault().newWatchService()) {
final Path dir = Paths.get(watchPath);
dir.register(watcher, ENTRY_CREATE);
LOGGER.debug("Watch Service registered for dir: {}", dir.getFileName());
- startWatchService(watcher, dir);
- } catch (final InterruptedException ex) {
+ startWatchService(watcher, dir, maxThread);
+ } catch (final Exception ex) {
LOGGER.error("FileWatcher initialization failed", ex);
Thread.currentThread().interrupt();
}
*
* @param watcher the watcher
* @param dir the watch directory
+ * @param maxThread the max thread number
* @throws InterruptedException if it occurs
*/
@SuppressWarnings("unchecked")
- protected void startWatchService(final WatchService watcher, final Path dir) throws InterruptedException {
+ protected void startWatchService(final WatchService watcher,
+ final Path dir,
+ int maxThread) throws InterruptedException, NullPointerException, IllegalArgumentException {
WatchKey key;
+ ExecutorService pool = Executors.newFixedThreadPool(maxThread);
+
running = true;
while (running) {
key = watcher.take();
for (final WatchEvent<?> event : key.pollEvents()) {
final WatchEvent<Path> ev = (WatchEvent<Path>) event;
final Path fileName = ev.context();
- LOGGER.debug("new CSAR found: {}", fileName);
- DistributionStatisticsManager.updateTotalDistributionCount();
- final String fullFilePath = dir.toString() + File.separator + fileName.toString();
- waitForFileToBeReady(fullFilePath);
- createPolicyInputAndCallHandler(fullFilePath);
- LOGGER.debug("CSAR complete: {}", fileName);
+ pool.execute(new Runnable() {
+ public void run() {
+ LOGGER.debug("new CSAR found: {}", fileName);
+ DistributionStatisticsManager.updateTotalDistributionCount();
+ final String fullFilePath = dir.toString() + File.separator + fileName.toString();
+ try {
+ waitForFileToBeReady(fullFilePath);
+ createPolicyInputAndCallHandler(fullFilePath);
+ LOGGER.debug("CSAR complete: {}", fileName);
+ } catch (InterruptedException e) {
+ LOGGER.error("waitForFileToBeReady interrupted", e);
+ }
+ }
+ });
}
final boolean valid = key.reset();
if (!valid) {
break;
}
}
+ pool.shutdown();
}
/**
public class FileSystemReceptionHandlerConfigurationParameterBuilder {
private String watchPath;
+ private int maxThread = 1;
/**
* Set watchPath to this {@link FileSystemReceptionHandlerConfigurationParameterBuilder} instance.
return this;
}
+ /**
+ * Set maxThread to this {@link FileSystemReceptionHandlerConfigurationParameterBuilder} instance.
+ *
+ * @param maxThread the max thread number in the thread pool
+ */
+ public FileSystemReceptionHandlerConfigurationParameterBuilder setMaxThread(final int maxThread) {
+ this.maxThread = maxThread;
+ return this;
+ }
/**
* Returns the watchPath of this {@link FileSystemReceptionHandlerConfigurationParameterBuilder} instance.
public String getWatchPath() {
return watchPath;
}
+
+ /**
+ * Returns the maxThread of this {@link FileSystemReceptionHandlerConfigurationParameterBuilder} instance.
+ *
+ * @return the maxThread
+ */
+ public int getMaxThread() {
+ return maxThread;
+ }
}
import org.onap.policy.common.parameters.GroupValidationResult;
import org.onap.policy.common.parameters.ValidationStatus;
+import org.onap.policy.common.utils.validation.ParameterValidationUtils;
import org.onap.policy.distribution.reception.parameters.ReceptionHandlerConfigurationParameterGroup;
/**
public class FileSystemReceptionHandlerConfigurationParameterGroup extends ReceptionHandlerConfigurationParameterGroup {
private String watchPath;
+ private int maxThread;
/**
* The constructor for instantiating {@link FileSystemReceptionHandlerConfigurationParameterGroup} class.
public FileSystemReceptionHandlerConfigurationParameterGroup(
final FileSystemReceptionHandlerConfigurationParameterBuilder builder) {
watchPath = builder.getWatchPath();
+ maxThread = builder.getMaxThread();
}
public String getWatchPath() {
return watchPath;
}
+ public int getMaxThread() {
+ return maxThread;
+ }
+
/**
* {@inheritDoc}.
*/
public GroupValidationResult validate() {
final GroupValidationResult validationResult = new GroupValidationResult(this);
validatePathElement(validationResult, watchPath, "watchPath");
+ if (!ParameterValidationUtils.validateIntParameter(maxThread)) {
+ validationResult.setResult("maxThread", ValidationStatus.INVALID, "must be a positive integer");
+ }
return validationResult;
}
@Test
public final void testInit() throws IOException, InterruptedException {
final FileSystemReceptionHandler sypHandler = Mockito.spy(fileSystemHandler);
- Mockito.doNothing().when(sypHandler).initFileWatcher(Mockito.isA(String.class));
+ Mockito.doNothing().when(sypHandler).initFileWatcher(Mockito.isA(String.class),
+ Mockito.anyInt());
try {
sypHandler.initializeReception(pssdConfigParameters.getName());
} catch (final Exception exp) {
public final void testDestroy() throws IOException {
try {
final FileSystemReceptionHandler sypHandler = Mockito.spy(fileSystemHandler);
- Mockito.doNothing().when(sypHandler).initFileWatcher(Mockito.isA(String.class));
+ Mockito.doNothing().when(sypHandler).initFileWatcher(Mockito.isA(String.class),
+ Mockito.anyInt());
sypHandler.initializeReception(pssdConfigParameters.getName());
sypHandler.destroy();
} catch (final Exception exp) {
final Thread th = new Thread(() -> {
try {
- sypHandler.initFileWatcher(watchPath);
+ sypHandler.initFileWatcher(watchPath, 2);
} catch (final IOException ex) {
LOGGER.error("testMain failed", ex);
}
validPath = tempFolder.getRoot().getAbsolutePath();
final FileSystemReceptionHandlerConfigurationParameterBuilder builder =
- new FileSystemReceptionHandlerConfigurationParameterBuilder().setWatchPath(validPath);
+ new FileSystemReceptionHandlerConfigurationParameterBuilder().setWatchPath(validPath).setMaxThread(2);
configParameters = new FileSystemReceptionHandlerConfigurationParameterGroup(builder);
} catch (final Exception e) {
fail("test should not thrown an exception here: " + e.getMessage());
final GroupValidationResult validationResult = configParameters.validate();
assertTrue(validationResult.isValid());
assertEquals(validPath, configParameters.getWatchPath());
+ assertEquals(2, configParameters.getMaxThread());
}
@Test
{
"name": "parameterConfig1",
- "watchPath": "/tmp"
+ "watchPath": "/tmp",
+ "maxThread": 2
}