Add configuration number of threads in files processing. 90/117390/9
authorTomasz Wrobel <tomasz.wrobel@nokia.com>
Wed, 3 Feb 2021 08:32:09 +0000 (09:32 +0100)
committerTomasz Wrobel <tomasz.wrobel@nokia.com>
Tue, 9 Feb 2021 11:30:22 +0000 (12:30 +0100)
Issue-ID: DCAEGEN2-2600
Signed-off-by: Tomasz Wrobel <tomasz.wrobel@nokia.com>
Change-Id: I04feb27698ef21196ab218de4bdd97be1fc85284

src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
src/main/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfig.java
src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java
src/test/java/org/onap/dcaegen2/services/pmmapper/config/FilesProcessingConfigTest.java

index 7aab08a..4583df3 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2019-2020 Nordix Foundation.
+ *  Copyright (C) 2021 Nokia.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -30,6 +31,8 @@ import lombok.Data;
 import lombok.NonNull;
 import org.onap.dcaegen2.services.pmmapper.config.ConfigHandler;
 import org.onap.dcaegen2.services.pmmapper.config.DynamicConfiguration;
+import org.onap.dcaegen2.services.pmmapper.config.EnvironmentReader;
+import org.onap.dcaegen2.services.pmmapper.config.FilesProcessingConfig;
 import org.onap.dcaegen2.services.pmmapper.datarouter.DeliveryHandler;
 import org.onap.dcaegen2.services.pmmapper.exceptions.CBSServerError;
 import org.onap.dcaegen2.services.pmmapper.exceptions.EnvironmentConfigException;
@@ -65,6 +68,9 @@ import java.util.concurrent.TimeUnit;
 
 @Data
 public class App {
+
+    private static final int PREFETCH_ONE_PER_THREAD = 1;
+
     static {
         System.setProperty(ContextInitializer.CONFIG_FILE_PROPERTY, "/opt/app/pm-mapper/etc/logback.xml");
     }
@@ -77,6 +83,8 @@ public class App {
     private static Path templates = Paths.get("/opt/app/pm-mapper/etc/templates/");
     private static Path schemas = Paths.get("/opt/app/pm-mapper/etc/schemas/");
 
+    private final FilesProcessingConfig processingConfig;
+
     private MapperConfig mapperConfig;
     private MetadataFilter metadataFilter;
     private MeasConverter measConverter;
@@ -105,13 +113,16 @@ public class App {
      * @param httpsPort https port to start https server on.
      * @param configHandler instance of the ConfigurationHandler used to acquire config.
      */
-    public App(Path templatesDirectory, Path schemasDirectory, int httpPort, int httpsPort, ConfigHandler configHandler) {
+    public App(Path templatesDirectory, Path schemasDirectory, int httpPort, int httpsPort, ConfigHandler configHandler, FilesProcessingConfig filesProcessingConfig)
+        throws EnvironmentConfigException {
         try {
             this.mapperConfig = configHandler.getMapperConfig();
         } catch (EnvironmentConfigException | CBSServerError | MapperConfigException e) {
             logger.unwrap().error("Failed to acquire initial configuration, Application cannot start", e);
             throw new IllegalStateException("Config acquisition failed");
         }
+        this.processingConfig = filesProcessingConfig;
+
         this.httpPort = httpPort;
         this.httpsPort = httpsPort;
         this.metadataFilter = new MetadataFilter(mapperConfig);
@@ -124,11 +135,13 @@ public class App {
         this.flux = Flux.create(eventFluxSink -> this.fluxSink = eventFluxSink);
         this.configScheduler = Schedulers.newSingle("Config");
 
+        int processingThreads = processingConfig.getThreadsCount();
+
         this.flux.onBackpressureDrop(App::handleBackPressure)
                 .doOnNext(App::receiveRequest)
-                .limitRate(1)
-                .parallel()
-                .runOn(Schedulers.newParallel(""), 1)
+                .limitRate(processingConfig.getLimitRate())
+                .parallel(processingThreads)
+                .runOn(Schedulers.newParallel("Thread", processingThreads), PREFETCH_ONE_PER_THREAD)
                 .doOnNext(event -> MDC.setContextMap(event.getMdc()))
                 .filter(this.metadataFilter::filter)
                 .filter(event -> App.filterByFileType(this.filterHandler, event, this.mapperConfig))
@@ -191,8 +204,9 @@ public class App {
         }
     }
 
-    public static void main(String[] args) {
-        new App(templates, schemas, HTTP_PORT, HTTPS_PORT, new ConfigHandler()).start();
+    public static void main(String[] args) throws EnvironmentConfigException {
+        FilesProcessingConfig processingConfig = new FilesProcessingConfig(new EnvironmentReader());
+        new App(templates, schemas, HTTP_PORT, HTTPS_PORT, new ConfigHandler(), processingConfig).start();
     }
 
     public static boolean filterByFileType(MeasFilterHandler filterHandler,Event event, MapperConfig config) {
index ab549e4..c79f059 100644 (file)
@@ -33,8 +33,13 @@ public class FilesProcessingConfig {
 
     private static final String ENV_LIMIT_RATE = "PROCESSING_LIMIT_RATE";
     private static final int DEFAULT_LIMIT_RATE = 1;
-    private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(FilesProcessingConfig.class));
-    private EnvironmentReader environmentReader;
+    private static final String ENV_THREADS_MULTIPLIER = "THREADS_MULTIPLIER";
+    private static final String ENV_PROCESSING_THREADS_COUNT = "PROCESSING_THREADS_COUNT";
+    private static final int DEFAULT_MULTIPLIER = 1;
+
+    private static final ONAPLogAdapter logger = new ONAPLogAdapter(
+        LoggerFactory.getLogger(FilesProcessingConfig.class));
+    private final EnvironmentReader environmentReader;
 
     /**
      * Creates a FilesProcessingConfig
@@ -61,6 +66,26 @@ public class FilesProcessingConfig {
         }
     }
 
+    /**
+     * Provides reactor parallel threads count from environment variable.
+     *
+     * @throws EnvironmentConfigException
+     * @returns value of threads count
+     */
+    public int getThreadsCount() throws EnvironmentConfigException {
+        logger.unwrap().info("Attempt to read threads configuration");
+        int processingThreadsCount = getProcessingThreadsCount();
+        int threadsMultiplier = getThreadsMultiplier();
+        int processingThreadsAmount = processingThreadsCount * threadsMultiplier;
+
+        logger.unwrap().info(
+            "Processing threads configuration: Processing threads count - {}, Processing threads multiplier - {} ",
+            processingThreadsCount, threadsMultiplier);
+        logger.unwrap().info("Amount of files processing threads: {} ", processingThreadsAmount);
+
+        return processingThreadsAmount;
+    }
+
     private Integer parseIntegerValue(String val) throws NumberFormatException {
         Integer value = Integer.valueOf(val);
         logger.unwrap().info(ENV_LIMIT_RATE + " value is: " + value);
@@ -71,4 +96,39 @@ public class FilesProcessingConfig {
         logger.unwrap().info(ENV_LIMIT_RATE + " env not present. Setting limit rate to default value: " + DEFAULT_LIMIT_RATE);
         return DEFAULT_LIMIT_RATE;
     }
+
+    private int getThreadsMultiplier() throws EnvironmentConfigException {
+        try {
+            return Optional.ofNullable(environmentReader.getVariable(ENV_THREADS_MULTIPLIER))
+                .map(Integer::valueOf)
+                .orElseGet(this::getDefaultMultiplier);
+        } catch (NumberFormatException exception) {
+            throw new EnvironmentConfigException(
+                ENV_THREADS_MULTIPLIER + " environment variable has incorrect value.\n", exception);
+        }
+    }
+
+    private int getDefaultMultiplier() {
+        logger.unwrap().info(ENV_THREADS_MULTIPLIER +
+            " env not present. Setting multiplier to default value: " + DEFAULT_MULTIPLIER);
+        return DEFAULT_MULTIPLIER;
+    }
+
+    private int getProcessingThreadsCount() throws EnvironmentConfigException {
+        try {
+            return Optional.ofNullable(environmentReader.getVariable(ENV_PROCESSING_THREADS_COUNT))
+                .map(Integer::valueOf)
+                .orElseGet(this::getDefaultThreadsCount);
+        } catch (NumberFormatException exception) {
+            throw new EnvironmentConfigException(
+                ENV_PROCESSING_THREADS_COUNT + " environment variable has incorrect value.\n", exception);
+        }
+    }
+
+    private int getDefaultThreadsCount() {
+        int defaultThreadsCount = Runtime.getRuntime().availableProcessors();
+        logger.unwrap().info(ENV_PROCESSING_THREADS_COUNT +
+                " env not present. Setting threads count to available cores: " + defaultThreadsCount);
+        return defaultThreadsCount;
+    }
 }
index 0b8cdfc..db45029 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2019-2020 Nordix Foundation.
+ *  Copyright (C) 2021 Nokia.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -51,6 +52,7 @@ import org.onap.dcaegen2.services.pmmapper.exceptions.CBSServerError;
 import org.onap.dcaegen2.services.pmmapper.exceptions.EnvironmentConfigException;
 import org.onap.dcaegen2.services.pmmapper.exceptions.MapperConfigException;
 
+import org.onap.dcaegen2.services.pmmapper.config.FilesProcessingConfig;
 import org.onap.dcaegen2.services.pmmapper.utils.XMLValidator;
 import reactor.core.publisher.Flux;
 
@@ -76,6 +78,7 @@ import utils.EventUtils;
 @ExtendWith(MockitoExtension.class)
 class AppTest {
 
+    public static final int WANTED_NUMBER_OF_INVOCATIONS_1 = 1;
     static ClientAndServer mockServer;
     static MockServerClient client;
 
@@ -91,6 +94,7 @@ class AppTest {
 
     private App objUnderTest;
 
+    private final FilesProcessingConfig processingConfig = mock(FilesProcessingConfig.class);
 
     @BeforeAll
     static void setup() {
@@ -105,8 +109,10 @@ class AppTest {
     }
 
     @BeforeEach
-    void beforeEach() {
+    void beforeEach() throws EnvironmentConfigException {
         configHandler = mock(ConfigHandler.class);
+        when(this.processingConfig.getLimitRate()).thenReturn(1);
+        when(this.processingConfig.getThreadsCount()).thenReturn(1);
     }
 
     @Test
@@ -115,7 +121,7 @@ class AppTest {
         MapperConfig mockConfig = Mockito.spy(mapperConfig);
         when(mockConfig.getEnableHttp()).thenReturn(false);
         when(configHandler.getMapperConfig()).thenReturn(mockConfig);
-        objUnderTest = new App(template, schema, 0, 0, configHandler);
+        objUnderTest = new App(template, schema, 0, 0, configHandler, processingConfig);
         objUnderTest.start();
         assertEquals(1, objUnderTest.getApplicationServer().getListenerInfo().size());
         assertEquals("https", objUnderTest.getApplicationServer().getListenerInfo().get(0).getProtcol());
@@ -127,7 +133,7 @@ class AppTest {
         MapperConfig mockConfig = Mockito.spy(mapperConfig);
         when(mockConfig.getEnableHttp()).thenReturn(true);
         when(configHandler.getMapperConfig()).thenReturn(mockConfig);
-        objUnderTest = new App(template, schema, 0, 0, configHandler);
+        objUnderTest = new App(template, schema, 0, 0, configHandler, processingConfig);
         objUnderTest.start();
         assertEquals(2, objUnderTest.getApplicationServer().getListenerInfo().size());
         assertEquals("http", objUnderTest.getApplicationServer().getListenerInfo().get(0).getProtcol());
@@ -137,7 +143,7 @@ class AppTest {
     @Test
     void testConfigFailure() throws EnvironmentConfigException, CBSServerError, MapperConfigException {
         when(configHandler.getMapperConfig()).thenThrow(MapperConfigException.class);
-        assertThrows(IllegalStateException.class, () -> new App(template, schema, 0, 0, configHandler));
+        assertThrows(IllegalStateException.class, () -> new App(template, schema, 0, 0, configHandler, processingConfig));
 
     }
 
@@ -146,7 +152,7 @@ class AppTest {
         MapperConfig mockConfig = Mockito.spy(mapperConfig);
         when(mockConfig.getKeyStorePath()).thenReturn("not_a_file");
         when(configHandler.getMapperConfig()).thenReturn(mockConfig);
-        assertThrows(IllegalStateException.class, () -> new App(template, schema, 0, 0, configHandler));
+        assertThrows(IllegalStateException.class, () -> new App(template, schema, 0, 0, configHandler, processingConfig));
 
     }
 
@@ -319,4 +325,19 @@ class AppTest {
         Flux<List<Event>> mappingResult = App.map(new Mapper(mappingTemplate,mockMeasConverter), mockEvents, mockConfig);
         assertEquals(mappingResult, Flux.<List<Event>>empty());
     }
+
+    @Test
+    void filesProcessingConfiguration_IsReadInMainApp() throws Exception {
+        MapperConfig mockConfig = Mockito.spy(mapperConfig);
+        when(mockConfig.getEnableHttp()).thenReturn(true);
+        when(configHandler.getMapperConfig()).thenReturn(mockConfig);
+        objUnderTest = new App(template, schema, 0, 0, configHandler, processingConfig);
+        objUnderTest.start();
+
+        verify(processingConfig, times(WANTED_NUMBER_OF_INVOCATIONS_1)).getLimitRate();
+        verify(processingConfig, times(WANTED_NUMBER_OF_INVOCATIONS_1)).getThreadsCount();
+
+        objUnderTest.stop();
+    }
+
 }
index fd21a39..b8d0a1f 100644 (file)
@@ -32,7 +32,15 @@ public class FilesProcessingConfigTest {
 
     private static final String ENV_LIMIT_RATE = "PROCESSING_LIMIT_RATE";
 
-    private EnvironmentReader mockEnvironmentReader = mock(EnvironmentReader.class);
+    private static final String ENV_THREADS_MULTIPLIER = "THREADS_MULTIPLIER";
+    private static final String ENV_PROCESSING_THREADS_COUNT = "PROCESSING_THREADS_COUNT";
+    private static final String THREADS_4 = "4";
+    private static final String MULTIPLIER_3 = "3";
+
+    private static final int EXPECTED_4 = 4;
+    private static final int EXPECTED_12 = 12;
+
+    private final EnvironmentReader mockEnvironmentReader = mock(EnvironmentReader.class);
     private FilesProcessingConfig filesProcessingConfig;
 
     @Test
@@ -63,4 +71,71 @@ public class FilesProcessingConfigTest {
 
         assertEquals(1, limitRate);
     }
+
+    @Test
+    public void shouldReturnCorrectThreadsCount_whenVariableIsSet() throws EnvironmentConfigException {
+        when(mockEnvironmentReader.getVariable(ENV_PROCESSING_THREADS_COUNT)).thenReturn(THREADS_4);
+        when(mockEnvironmentReader.getVariable(ENV_THREADS_MULTIPLIER)).thenReturn(MULTIPLIER_3);
+
+        filesProcessingConfig = new FilesProcessingConfig(mockEnvironmentReader);
+        int threadsCount = filesProcessingConfig.getThreadsCount();
+
+        assertEquals(EXPECTED_12, threadsCount);
+    }
+
+    @Test
+    public void shouldReturnCorrectThreadsCount_whenVariableMultiplierIsNotSet() throws EnvironmentConfigException {
+
+        when(mockEnvironmentReader.getVariable(ENV_PROCESSING_THREADS_COUNT)).thenReturn(THREADS_4);
+
+        filesProcessingConfig = new FilesProcessingConfig(mockEnvironmentReader);
+        int threadsCount = filesProcessingConfig.getThreadsCount();
+
+        assertEquals(EXPECTED_4, threadsCount);
+    }
+
+    @Test
+    public void shouldReturnCorrectThreadsCount_whenVariableThreadsIsNotSet() throws EnvironmentConfigException {
+        when(mockEnvironmentReader.getVariable(ENV_THREADS_MULTIPLIER)).thenReturn(MULTIPLIER_3);
+
+        filesProcessingConfig = new FilesProcessingConfig(mockEnvironmentReader);
+        int threadsCount = filesProcessingConfig.getThreadsCount();
+        int expected = Runtime.getRuntime().availableProcessors() * Integer.parseInt(MULTIPLIER_3);
+
+        assertEquals(expected, threadsCount);
+    }
+
+    @Test
+    public void shouldReturnCorrectThreadsCount_whenVariableThreadsAndMultiplierIsNotSet()
+        throws EnvironmentConfigException {
+        filesProcessingConfig = new FilesProcessingConfig(mockEnvironmentReader);
+        int threadsCount = filesProcessingConfig.getThreadsCount();
+        int expected = Runtime.getRuntime().availableProcessors();
+
+        assertEquals(expected, threadsCount);
+    }
+
+    @Test
+    public void shouldThrowEnvironmentConfigException_whenProcessingThreadsVariableHasWrongValue() {
+        when(mockEnvironmentReader.getVariable(ENV_PROCESSING_THREADS_COUNT)).thenReturn("not-an-int");
+        filesProcessingConfig = new FilesProcessingConfig(mockEnvironmentReader);
+        String expectedMessage = "PROCESSING_THREADS_COUNT environment variable has incorrect value.\n";
+        String causeMessage = "For input string: \"not-an-int\"";
+
+        Throwable exception = assertThrows(EnvironmentConfigException.class, () -> filesProcessingConfig.getThreadsCount());
+        assertEquals(expectedMessage, exception.getMessage());
+        assertEquals(causeMessage, exception.getCause().getMessage());
+    }
+
+    @Test
+    public void shouldThrowEnvironmentConfigException_whenThreadsMultiplierVariableHasWrongValue() {
+        when(mockEnvironmentReader.getVariable(ENV_THREADS_MULTIPLIER)).thenReturn("not-an-int");
+        filesProcessingConfig = new FilesProcessingConfig(mockEnvironmentReader);
+        String expectedMessage = "THREADS_MULTIPLIER environment variable has incorrect value.\n";
+        String causeMessage = "For input string: \"not-an-int\"";
+
+        Throwable exception = assertThrows(EnvironmentConfigException.class, () -> filesProcessingConfig.getThreadsCount());
+        assertEquals(expectedMessage, exception.getMessage());
+        assertEquals(causeMessage, exception.getCause().getMessage());
+    }
 }