Add multi-thread support in FileSystemReceptionHander 99/84799/1
authorLianhao Lu <lianhao.lu@intel.com>
Wed, 10 Apr 2019 06:40:11 +0000 (14:40 +0800)
committerLianhao Lu <lianhao.lu@intel.com>
Wed, 10 Apr 2019 06:40:11 +0000 (14:40 +0800)
By adding multi-thread support in FileSystemReceptionHander, we can have
a more thorough s3p test to test performance & stablity in multithread
situations.

Change-Id: Id263435531e26dcbadfbda6f82b26ac54a72ba1a
Issue-ID: POLICY-1274
Signed-off-by: Lianhao Lu <lianhao.lu@intel.com>
plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java
plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java
plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterBuilder.java
plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterGroup.java
plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandler.java
plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandlerConfigurationParameterGroup.java
plugins/reception-plugins/src/test/resources/handling-filesystem.json

index fd4f69c..2072a0e 100644 (file)
@@ -36,15 +36,18 @@ public class FileClientHandler implements Runnable {
 
     private FileSystemReceptionHandler fileReceptionHandler;
     private String watchPath;
+    private int maxThread;
 
     /**
      * Constructs an instance of {@link FileClientHandler} class.
      *
      * @param fileReceptionHandler the fileReceptionHandler
      */
-    public FileClientHandler(final FileSystemReceptionHandler fileReceptionHandler, final String watchPath) {
+    public FileClientHandler(final FileSystemReceptionHandler fileReceptionHandler, final String watchPath,
+            final int maxThread) {
         this.fileReceptionHandler = fileReceptionHandler;
         this.watchPath = watchPath;
+        this.maxThread = maxThread;
     }
 
     /**
@@ -53,7 +56,7 @@ public class FileClientHandler implements Runnable {
     @Override
     public void run() {
         try {
-            fileReceptionHandler.initFileWatcher(watchPath);
+            fileReceptionHandler.initFileWatcher(watchPath, maxThread);
         } catch (final IOException ex) {
             LOGGER.error("Failed initializing file watcher thread", ex);
         }
index a3f6fab..0ddd2a9 100644 (file)
@@ -31,6 +31,8 @@ import java.nio.file.Paths;
 import java.nio.file.WatchEvent;
 import java.nio.file.WatchKey;
 import java.nio.file.WatchService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.zip.ZipFile;
 
@@ -59,7 +61,9 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler {
         try {
             final FileSystemReceptionHandlerConfigurationParameterGroup handlerParameters =
                     ParameterService.get(parameterGroupName);
-            final FileClientHandler fileClientHandler = new FileClientHandler(this, handlerParameters.getWatchPath());
+            final FileClientHandler fileClientHandler = new FileClientHandler(this,
+                    handlerParameters.getWatchPath(),
+                    handlerParameters.getMaxThread());
             final Thread fileWatcherThread = new Thread(fileClientHandler);
             fileWatcherThread.start();
         } catch (final Exception ex) {
@@ -89,13 +93,13 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler {
      *
      * @param watchPath Path to watch
      */
-    public void initFileWatcher(final String watchPath) throws IOException {
+    public void initFileWatcher(final String watchPath, final int maxThread) throws IOException {
         try (final WatchService watcher = FileSystems.getDefault().newWatchService()) {
             final Path dir = Paths.get(watchPath);
             dir.register(watcher, ENTRY_CREATE);
             LOGGER.debug("Watch Service registered for dir: {}", dir.getFileName());
-            startWatchService(watcher, dir);
-        } catch (final InterruptedException ex) {
+            startWatchService(watcher, dir, maxThread);
+        } catch (final Exception ex) {
             LOGGER.error("FileWatcher initialization failed", ex);
             Thread.currentThread().interrupt();
         }
@@ -106,11 +110,16 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler {
      *
      * @param watcher the watcher
      * @param dir the watch directory
+     * @param maxThread the max thread number
      * @throws InterruptedException if it occurs
      */
     @SuppressWarnings("unchecked")
-    protected void startWatchService(final WatchService watcher, final Path dir) throws InterruptedException {
+    protected void startWatchService(final WatchService watcher,
+            final Path dir,
+            int maxThread) throws InterruptedException, NullPointerException, IllegalArgumentException {
         WatchKey key;
+        ExecutorService pool = Executors.newFixedThreadPool(maxThread);
+
         running = true;
         while (running) {
             key = watcher.take();
@@ -118,12 +127,20 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler {
             for (final WatchEvent<?> event : key.pollEvents()) {
                 final WatchEvent<Path> ev = (WatchEvent<Path>) event;
                 final Path fileName = ev.context();
-                LOGGER.debug("new CSAR found: {}", fileName);
-                DistributionStatisticsManager.updateTotalDistributionCount();
-                final String fullFilePath = dir.toString() + File.separator + fileName.toString();
-                waitForFileToBeReady(fullFilePath);
-                createPolicyInputAndCallHandler(fullFilePath);
-                LOGGER.debug("CSAR complete: {}", fileName);
+                pool.execute(new Runnable() {
+                    public void run() {
+                        LOGGER.debug("new CSAR found: {}", fileName);
+                        DistributionStatisticsManager.updateTotalDistributionCount();
+                        final String fullFilePath = dir.toString() + File.separator + fileName.toString();
+                        try {
+                            waitForFileToBeReady(fullFilePath);
+                            createPolicyInputAndCallHandler(fullFilePath);
+                            LOGGER.debug("CSAR complete: {}", fileName);
+                        } catch (InterruptedException e) {
+                            LOGGER.error("waitForFileToBeReady interrupted", e);
+                        }
+                    }
+                });
             }
             final boolean valid = key.reset();
             if (!valid) {
@@ -131,6 +148,7 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler {
                 break;
             }
         }
+        pool.shutdown();
     }
 
     /**
index 693ff0e..415e0d9 100644 (file)
@@ -27,6 +27,7 @@ package org.onap.policy.distribution.reception.handling.file;
 public class FileSystemReceptionHandlerConfigurationParameterBuilder {
 
     private String watchPath;
+    private int maxThread = 1;
 
     /**
      * Set watchPath to this {@link FileSystemReceptionHandlerConfigurationParameterBuilder} instance.
@@ -38,6 +39,15 @@ public class FileSystemReceptionHandlerConfigurationParameterBuilder {
         return this;
     }
 
+    /**
+     * Set maxThread to this {@link FileSystemReceptionHandlerConfigurationParameterBuilder} instance.
+     *
+     * @param maxThread the max thread number in the thread pool
+     */
+    public FileSystemReceptionHandlerConfigurationParameterBuilder setMaxThread(final int maxThread) {
+        this.maxThread = maxThread;
+        return this;
+    }
 
     /**
      * Returns the watchPath of this {@link FileSystemReceptionHandlerConfigurationParameterBuilder} instance.
@@ -47,6 +57,15 @@ public class FileSystemReceptionHandlerConfigurationParameterBuilder {
     public String getWatchPath() {
         return watchPath;
     }
+
+    /**
+     * Returns the maxThread of this {@link FileSystemReceptionHandlerConfigurationParameterBuilder} instance.
+     *
+     * @return the maxThread
+     */
+    public int getMaxThread() {
+        return maxThread;
+    }
 }
 
 
index dd50dc7..f904581 100644 (file)
@@ -24,6 +24,7 @@ import java.io.File;
 
 import org.onap.policy.common.parameters.GroupValidationResult;
 import org.onap.policy.common.parameters.ValidationStatus;
+import org.onap.policy.common.utils.validation.ParameterValidationUtils;
 import org.onap.policy.distribution.reception.parameters.ReceptionHandlerConfigurationParameterGroup;
 
 /**
@@ -33,6 +34,7 @@ import org.onap.policy.distribution.reception.parameters.ReceptionHandlerConfigu
 public class FileSystemReceptionHandlerConfigurationParameterGroup extends ReceptionHandlerConfigurationParameterGroup {
 
     private String watchPath;
+    private int maxThread;
 
     /**
      * The constructor for instantiating {@link FileSystemReceptionHandlerConfigurationParameterGroup} class.
@@ -42,12 +44,17 @@ public class FileSystemReceptionHandlerConfigurationParameterGroup extends Recep
     public FileSystemReceptionHandlerConfigurationParameterGroup(
             final FileSystemReceptionHandlerConfigurationParameterBuilder builder) {
         watchPath = builder.getWatchPath();
+        maxThread = builder.getMaxThread();
     }
 
     public String getWatchPath() {
         return watchPath;
     }
 
+    public int getMaxThread() {
+        return maxThread;
+    }
+
     /**
      * {@inheritDoc}.
      */
@@ -55,6 +62,9 @@ public class FileSystemReceptionHandlerConfigurationParameterGroup extends Recep
     public GroupValidationResult validate() {
         final GroupValidationResult validationResult = new GroupValidationResult(this);
         validatePathElement(validationResult, watchPath, "watchPath");
+        if (!ParameterValidationUtils.validateIntParameter(maxThread)) {
+            validationResult.setResult("maxThread", ValidationStatus.INVALID, "must be a positive integer");
+        }
         return validationResult;
     }
 
index fdb0100..556b1d6 100644 (file)
@@ -93,7 +93,8 @@ public class TestFileSystemReceptionHandler {
     @Test
     public final void testInit() throws IOException, InterruptedException {
         final FileSystemReceptionHandler sypHandler = Mockito.spy(fileSystemHandler);
-        Mockito.doNothing().when(sypHandler).initFileWatcher(Mockito.isA(String.class));
+        Mockito.doNothing().when(sypHandler).initFileWatcher(Mockito.isA(String.class),
+                Mockito.anyInt());
         try {
             sypHandler.initializeReception(pssdConfigParameters.getName());
         } catch (final Exception exp) {
@@ -106,7 +107,8 @@ public class TestFileSystemReceptionHandler {
     public final void testDestroy() throws IOException {
         try {
             final FileSystemReceptionHandler sypHandler = Mockito.spy(fileSystemHandler);
-            Mockito.doNothing().when(sypHandler).initFileWatcher(Mockito.isA(String.class));
+            Mockito.doNothing().when(sypHandler).initFileWatcher(Mockito.isA(String.class),
+                    Mockito.anyInt());
             sypHandler.initializeReception(pssdConfigParameters.getName());
             sypHandler.destroy();
         } catch (final Exception exp) {
@@ -141,7 +143,7 @@ public class TestFileSystemReceptionHandler {
 
         final Thread th = new Thread(() -> {
             try {
-                sypHandler.initFileWatcher(watchPath);
+                sypHandler.initFileWatcher(watchPath, 2);
             } catch (final IOException ex) {
                 LOGGER.error("testMain failed", ex);
             }
index 92d9443..1d32b19 100644 (file)
@@ -53,7 +53,7 @@ public class TestFileSystemReceptionHandlerConfigurationParameterGroup {
             validPath = tempFolder.getRoot().getAbsolutePath();
 
             final FileSystemReceptionHandlerConfigurationParameterBuilder builder =
-                    new FileSystemReceptionHandlerConfigurationParameterBuilder().setWatchPath(validPath);
+                new FileSystemReceptionHandlerConfigurationParameterBuilder().setWatchPath(validPath).setMaxThread(2);
             configParameters = new FileSystemReceptionHandlerConfigurationParameterGroup(builder);
         } catch (final Exception e) {
             fail("test should not thrown an exception here: " + e.getMessage());
@@ -61,6 +61,7 @@ public class TestFileSystemReceptionHandlerConfigurationParameterGroup {
         final GroupValidationResult validationResult = configParameters.validate();
         assertTrue(validationResult.isValid());
         assertEquals(validPath, configParameters.getWatchPath());
+        assertEquals(2, configParameters.getMaxThread());
     }
 
     @Test