Adding configurable parameter for intermediary thread pool 83/140783/2
authorFrancescoFioraEst <francesco.fiora@est.tech>
Thu, 17 Apr 2025 08:23:34 +0000 (09:23 +0100)
committerFrancescoFioraEst <francesco.fiora@est.tech>
Thu, 24 Apr 2025 09:17:25 +0000 (10:17 +0100)
Issue-ID: POLICY-5345
Change-Id: I99ea5ef1484fe9aa36fa63f38380bb26b4b65c44
Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
packages/policy-clamp-tarball/src/main/resources/etc/SimulatorParticipantParameters.yaml
participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandlerTest.java

index 962dcf6..fd8efae 100644 (file)
@@ -20,6 +20,7 @@ participant:
     reportingTimeIntervalMs: 120000
     description: Participant Description
     participantId: ${participantId:101c62b3-8918-41b9-a747-d21eb79c6c90}
+    threadPoolSize: 10
     clampAutomationCompositionTopics:
       topicSources:
         - topic: ${participant.intermediaryParameters.topics.operationTopic}
index d87219b..63a46ef 100644 (file)
@@ -20,6 +20,7 @@ participant:
     reportingTimeIntervalMs: 120000
     description: Participant Description
     participantId: 101c62b3-8918-41b9-a747-d21eb79c6c90
+    threadPoolSize: 10
     topicValidation: true
     clampAdminTopics:
       servers:
index 027127a..590d45f 100644 (file)
@@ -29,12 +29,12 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import lombok.RequiredArgsConstructor;
 import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener;
 import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionDto;
 import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto;
 import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto;
 import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi;
+import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.DeployState;
 import org.onap.policy.clamp.models.acm.concepts.LockState;
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 @Component
-@RequiredArgsConstructor
 public class ThreadHandler implements Closeable {
     private static final Logger LOGGER = LoggerFactory.getLogger(ThreadHandler.class);
 
@@ -55,8 +54,24 @@ public class ThreadHandler implements Closeable {
 
     private final Map<UUID, Future<?>> executionMap = new ConcurrentHashMap<>();
 
-    private final ExecutorService executor =
-            Context.taskWrapping(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
+    private final ExecutorService executor;
+
+    /**
+     * Constructor.
+     *
+     * @param listener the AutomationComposition ElementListener
+     * @param intermediaryApi the intermediaryApi
+     * @param cacheProvider the CacheProvider
+     * @param parameters the parameters
+     */
+    public ThreadHandler(AutomationCompositionElementListener listener, ParticipantIntermediaryApi intermediaryApi,
+            CacheProvider cacheProvider, ParticipantParameters parameters) {
+        this.listener = listener;
+        this.intermediaryApi = intermediaryApi;
+        this.cacheProvider = cacheProvider;
+        executor = Context.taskWrapping(Executors.newFixedThreadPool(
+                parameters.getIntermediaryParameters().getThreadPoolSize()));
+    }
 
     /**
      * Handle a deploy on a automation composition element.
index d94dc5d..aea2d7d 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2021-2024-2025 Nordix Foundation.
+ *  Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -53,6 +53,10 @@ public class ParticipantIntermediaryParameters {
     @Positive
     private long reportingTimeIntervalMs;
 
+    @Valid
+    @Positive
+    private int threadPoolSize = 10;
+
     @NotNull
     @ParameterGroupConstraint
     private TopicParameterGroup clampAutomationCompositionTopics;
index 7e5ca57..957f6e7 100644 (file)
@@ -36,6 +36,7 @@ import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionDto;
 import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto;
 import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto;
 import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi;
+import org.onap.policy.clamp.acm.participant.intermediary.main.parameters.CommonTestData;
 import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.DeployState;
@@ -48,11 +49,17 @@ class ThreadHandlerTest {
 
     private static final int TIMEOUT = 400;
 
+    private ThreadHandler createThreadHandler(AutomationCompositionElementListener listener,
+            ParticipantIntermediaryApi intermediaryApi) {
+        return new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class),
+                CommonTestData.getParticipantParameters());
+    }
+
     @Test
     void testPrime() throws PfModelException, IOException {
         var listener = mock(AutomationCompositionElementListener.class);
         var intermediaryApi = mock(ParticipantIntermediaryApi.class);
-        try (var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class))) {
+        try (var threadHandler = createThreadHandler(listener, intermediaryApi)) {
 
             var compositionId = UUID.randomUUID();
             var messageId = UUID.randomUUID();
@@ -70,7 +77,7 @@ class ThreadHandlerTest {
     void testPrimeException() throws PfModelException, IOException {
         var listener = mock(AutomationCompositionElementListener.class);
         var intermediaryApi = mock(ParticipantIntermediaryApi.class);
-        try (var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class))) {
+        try (var threadHandler = createThreadHandler(listener, intermediaryApi)) {
 
             var compositionId = UUID.randomUUID();
             var composition = new CompositionDto(compositionId, Map.of(), Map.of());
@@ -92,7 +99,7 @@ class ThreadHandlerTest {
     void testDeploy() throws PfModelException, IOException {
         var listener = mock(AutomationCompositionElementListener.class);
         var intermediaryApi = mock(ParticipantIntermediaryApi.class);
-        try (var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class))) {
+        try (var threadHandler = createThreadHandler(listener, intermediaryApi)) {
 
             Map<String, Object> properties = Map.of("key", "value");
             var compositionElement = new CompositionElementDto(UUID.randomUUID(), new ToscaConceptIdentifier(),
@@ -135,7 +142,7 @@ class ThreadHandlerTest {
     void testDeployException() throws PfModelException, IOException {
         var listener = mock(AutomationCompositionElementListener.class);
         var intermediaryApi = mock(ParticipantIntermediaryApi.class);
-        try (var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class))) {
+        try (var threadHandler = createThreadHandler(listener, intermediaryApi)) {
 
             Map<String, Object> properties = Map.of("key", "value");
             var compositionElement = new CompositionElementDto(UUID.randomUUID(), new ToscaConceptIdentifier(),
@@ -197,7 +204,7 @@ class ThreadHandlerTest {
     void testLock() throws PfModelException, IOException {
         var listener = mock(AutomationCompositionElementListener.class);
         var intermediaryApi = mock(ParticipantIntermediaryApi.class);
-        try (var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class))) {
+        try (var threadHandler = createThreadHandler(listener, intermediaryApi)) {
 
             Map<String, Object> properties = Map.of("key", "value");
             var compositionElement = new CompositionElementDto(UUID.randomUUID(), new ToscaConceptIdentifier(),
@@ -222,7 +229,7 @@ class ThreadHandlerTest {
     void testLockException() throws PfModelException, IOException {
         var listener = mock(AutomationCompositionElementListener.class);
         var intermediaryApi = mock(ParticipantIntermediaryApi.class);
-        try (var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class))) {
+        try (var threadHandler = createThreadHandler(listener, intermediaryApi)) {
 
             Map<String, Object> properties = Map.of("key", "value");
             var compositionElement = new CompositionElementDto(UUID.randomUUID(), new ToscaConceptIdentifier(),
@@ -252,7 +259,7 @@ class ThreadHandlerTest {
     void testSubState() throws PfModelException, IOException {
         var listener = mock(AutomationCompositionElementListener.class);
         var intermediaryApi = mock(ParticipantIntermediaryApi.class);
-        try (var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class))) {
+        try (var threadHandler = createThreadHandler(listener, intermediaryApi)) {
 
             Map<String, Object> properties = Map.of("key", "value");
             var compositionElement = new CompositionElementDto(UUID.randomUUID(), new ToscaConceptIdentifier(),
@@ -284,7 +291,7 @@ class ThreadHandlerTest {
     void testSubStateException() throws PfModelException, IOException {
         var listener = mock(AutomationCompositionElementListener.class);
         var intermediaryApi = mock(ParticipantIntermediaryApi.class);
-        try (var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class))) {
+        try (var threadHandler = createThreadHandler(listener, intermediaryApi)) {
 
             Map<String, Object> properties = Map.of("key", "value");
             var compositionElement = new CompositionElementDto(UUID.randomUUID(), new ToscaConceptIdentifier(),