From afd712ff28b45567495ad90b83fe8b1d2cc3a423 Mon Sep 17 00:00:00 2001 From: FrancescoFioraEst Date: Thu, 17 Apr 2025 09:23:34 +0100 Subject: [PATCH] Adding configurable parameter for intermediary thread pool Issue-ID: POLICY-5345 Change-Id: I99ea5ef1484fe9aa36fa63f38380bb26b4b65c44 Signed-off-by: FrancescoFioraEst --- .../etc/SimulatorParticipantParameters.yaml | 1 + .../src/main/resources/config/application.yaml | 1 + .../intermediary/handler/ThreadHandler.java | 23 ++++++++++++++++++---- .../ParticipantIntermediaryParameters.java | 6 +++++- .../intermediary/handler/ThreadHandlerTest.java | 23 ++++++++++++++-------- 5 files changed, 41 insertions(+), 13 deletions(-) diff --git a/packages/policy-clamp-tarball/src/main/resources/etc/SimulatorParticipantParameters.yaml b/packages/policy-clamp-tarball/src/main/resources/etc/SimulatorParticipantParameters.yaml index 962dcf6fa..fd8efaece 100644 --- a/packages/policy-clamp-tarball/src/main/resources/etc/SimulatorParticipantParameters.yaml +++ b/packages/policy-clamp-tarball/src/main/resources/etc/SimulatorParticipantParameters.yaml @@ -20,6 +20,7 @@ participant: reportingTimeIntervalMs: 120000 description: Participant Description participantId: ${participantId:101c62b3-8918-41b9-a747-d21eb79c6c90} + threadPoolSize: 10 clampAutomationCompositionTopics: topicSources: - topic: ${participant.intermediaryParameters.topics.operationTopic} diff --git a/participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml b/participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml index d87219bd6..63a46ef76 100644 --- a/participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml +++ b/participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml @@ -20,6 +20,7 @@ participant: reportingTimeIntervalMs: 120000 description: Participant Description participantId: 101c62b3-8918-41b9-a747-d21eb79c6c90 + threadPoolSize: 10 topicValidation: true clampAdminTopics: servers: diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java index 027127ac8..590d45f2a 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java @@ -29,12 +29,12 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import lombok.RequiredArgsConstructor; import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener; import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionDto; import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto; import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto; import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi; +import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.DeployState; import org.onap.policy.clamp.models.acm.concepts.LockState; @@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @Component -@RequiredArgsConstructor public class ThreadHandler implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(ThreadHandler.class); @@ -55,8 +54,24 @@ public class ThreadHandler implements Closeable { private final Map> executionMap = new ConcurrentHashMap<>(); - private final ExecutorService executor = - Context.taskWrapping(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); + private final ExecutorService executor; + + /** + * Constructor. + * + * @param listener the AutomationComposition ElementListener + * @param intermediaryApi the intermediaryApi + * @param cacheProvider the CacheProvider + * @param parameters the parameters + */ + public ThreadHandler(AutomationCompositionElementListener listener, ParticipantIntermediaryApi intermediaryApi, + CacheProvider cacheProvider, ParticipantParameters parameters) { + this.listener = listener; + this.intermediaryApi = intermediaryApi; + this.cacheProvider = cacheProvider; + executor = Context.taskWrapping(Executors.newFixedThreadPool( + parameters.getIntermediaryParameters().getThreadPoolSize())); + } /** * Handle a deploy on a automation composition element. diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java index d94dc5d39..aea2d7da3 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2021-2024-2025 Nordix Foundation. + * Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,6 +53,10 @@ public class ParticipantIntermediaryParameters { @Positive private long reportingTimeIntervalMs; + @Valid + @Positive + private int threadPoolSize = 10; + @NotNull @ParameterGroupConstraint private TopicParameterGroup clampAutomationCompositionTopics; diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandlerTest.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandlerTest.java index 7e5ca57c8..957f6e76e 100644 --- a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandlerTest.java +++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandlerTest.java @@ -36,6 +36,7 @@ import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionDto; import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto; import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto; import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi; +import org.onap.policy.clamp.acm.participant.intermediary.main.parameters.CommonTestData; import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.DeployState; @@ -48,11 +49,17 @@ class ThreadHandlerTest { private static final int TIMEOUT = 400; + private ThreadHandler createThreadHandler(AutomationCompositionElementListener listener, + ParticipantIntermediaryApi intermediaryApi) { + return new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class), + CommonTestData.getParticipantParameters()); + } + @Test void testPrime() throws PfModelException, IOException { var listener = mock(AutomationCompositionElementListener.class); var intermediaryApi = mock(ParticipantIntermediaryApi.class); - try (var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class))) { + try (var threadHandler = createThreadHandler(listener, intermediaryApi)) { var compositionId = UUID.randomUUID(); var messageId = UUID.randomUUID(); @@ -70,7 +77,7 @@ class ThreadHandlerTest { void testPrimeException() throws PfModelException, IOException { var listener = mock(AutomationCompositionElementListener.class); var intermediaryApi = mock(ParticipantIntermediaryApi.class); - try (var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class))) { + try (var threadHandler = createThreadHandler(listener, intermediaryApi)) { var compositionId = UUID.randomUUID(); var composition = new CompositionDto(compositionId, Map.of(), Map.of()); @@ -92,7 +99,7 @@ class ThreadHandlerTest { void testDeploy() throws PfModelException, IOException { var listener = mock(AutomationCompositionElementListener.class); var intermediaryApi = mock(ParticipantIntermediaryApi.class); - try (var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class))) { + try (var threadHandler = createThreadHandler(listener, intermediaryApi)) { Map properties = Map.of("key", "value"); var compositionElement = new CompositionElementDto(UUID.randomUUID(), new ToscaConceptIdentifier(), @@ -135,7 +142,7 @@ class ThreadHandlerTest { void testDeployException() throws PfModelException, IOException { var listener = mock(AutomationCompositionElementListener.class); var intermediaryApi = mock(ParticipantIntermediaryApi.class); - try (var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class))) { + try (var threadHandler = createThreadHandler(listener, intermediaryApi)) { Map properties = Map.of("key", "value"); var compositionElement = new CompositionElementDto(UUID.randomUUID(), new ToscaConceptIdentifier(), @@ -197,7 +204,7 @@ class ThreadHandlerTest { void testLock() throws PfModelException, IOException { var listener = mock(AutomationCompositionElementListener.class); var intermediaryApi = mock(ParticipantIntermediaryApi.class); - try (var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class))) { + try (var threadHandler = createThreadHandler(listener, intermediaryApi)) { Map properties = Map.of("key", "value"); var compositionElement = new CompositionElementDto(UUID.randomUUID(), new ToscaConceptIdentifier(), @@ -222,7 +229,7 @@ class ThreadHandlerTest { void testLockException() throws PfModelException, IOException { var listener = mock(AutomationCompositionElementListener.class); var intermediaryApi = mock(ParticipantIntermediaryApi.class); - try (var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class))) { + try (var threadHandler = createThreadHandler(listener, intermediaryApi)) { Map properties = Map.of("key", "value"); var compositionElement = new CompositionElementDto(UUID.randomUUID(), new ToscaConceptIdentifier(), @@ -252,7 +259,7 @@ class ThreadHandlerTest { void testSubState() throws PfModelException, IOException { var listener = mock(AutomationCompositionElementListener.class); var intermediaryApi = mock(ParticipantIntermediaryApi.class); - try (var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class))) { + try (var threadHandler = createThreadHandler(listener, intermediaryApi)) { Map properties = Map.of("key", "value"); var compositionElement = new CompositionElementDto(UUID.randomUUID(), new ToscaConceptIdentifier(), @@ -284,7 +291,7 @@ class ThreadHandlerTest { void testSubStateException() throws PfModelException, IOException { var listener = mock(AutomationCompositionElementListener.class); var intermediaryApi = mock(ParticipantIntermediaryApi.class); - try (var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class))) { + try (var threadHandler = createThreadHandler(listener, intermediaryApi)) { Map properties = Map.of("key", "value"); var compositionElement = new CompositionElementDto(UUID.randomUUID(), new ToscaConceptIdentifier(), -- 2.16.6