import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import lombok.RequiredArgsConstructor;
import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener;
import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionDto;
import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto;
import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto;
import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi;
+import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.DeployState;
import org.onap.policy.clamp.models.acm.concepts.LockState;
import org.springframework.stereotype.Component;
@Component
-@RequiredArgsConstructor
public class ThreadHandler implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(ThreadHandler.class);
private final Map<UUID, Future<?>> executionMap = new ConcurrentHashMap<>();
- private final ExecutorService executor =
- Context.taskWrapping(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
+ private final ExecutorService executor;
+
+ /**
+ * Constructor.
+ *
+ * @param listener the AutomationComposition ElementListener
+ * @param intermediaryApi the intermediaryApi
+ * @param cacheProvider the CacheProvider
+ * @param parameters the parameters
+ */
+ public ThreadHandler(AutomationCompositionElementListener listener, ParticipantIntermediaryApi intermediaryApi,
+ CacheProvider cacheProvider, ParticipantParameters parameters) {
+ this.listener = listener;
+ this.intermediaryApi = intermediaryApi;
+ this.cacheProvider = cacheProvider;
+ executor = Context.taskWrapping(Executors.newFixedThreadPool(
+ parameters.getIntermediaryParameters().getThreadPoolSize()));
+ }
/**
* Handle a deploy on a automation composition element.
import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto;
import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto;
import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi;
+import org.onap.policy.clamp.acm.participant.intermediary.main.parameters.CommonTestData;
import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
import org.onap.policy.clamp.models.acm.concepts.DeployState;
private static final int TIMEOUT = 400;
+ private ThreadHandler createThreadHandler(AutomationCompositionElementListener listener,
+ ParticipantIntermediaryApi intermediaryApi) {
+ return new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class),
+ CommonTestData.getParticipantParameters());
+ }
+
@Test
void testPrime() throws PfModelException, IOException {
var listener = mock(AutomationCompositionElementListener.class);
var intermediaryApi = mock(ParticipantIntermediaryApi.class);
- try (var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class))) {
+ try (var threadHandler = createThreadHandler(listener, intermediaryApi)) {
var compositionId = UUID.randomUUID();
var messageId = UUID.randomUUID();
void testPrimeException() throws PfModelException, IOException {
var listener = mock(AutomationCompositionElementListener.class);
var intermediaryApi = mock(ParticipantIntermediaryApi.class);
- try (var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class))) {
+ try (var threadHandler = createThreadHandler(listener, intermediaryApi)) {
var compositionId = UUID.randomUUID();
var composition = new CompositionDto(compositionId, Map.of(), Map.of());
void testDeploy() throws PfModelException, IOException {
var listener = mock(AutomationCompositionElementListener.class);
var intermediaryApi = mock(ParticipantIntermediaryApi.class);
- try (var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class))) {
+ try (var threadHandler = createThreadHandler(listener, intermediaryApi)) {
Map<String, Object> properties = Map.of("key", "value");
var compositionElement = new CompositionElementDto(UUID.randomUUID(), new ToscaConceptIdentifier(),
void testDeployException() throws PfModelException, IOException {
var listener = mock(AutomationCompositionElementListener.class);
var intermediaryApi = mock(ParticipantIntermediaryApi.class);
- try (var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class))) {
+ try (var threadHandler = createThreadHandler(listener, intermediaryApi)) {
Map<String, Object> properties = Map.of("key", "value");
var compositionElement = new CompositionElementDto(UUID.randomUUID(), new ToscaConceptIdentifier(),
void testLock() throws PfModelException, IOException {
var listener = mock(AutomationCompositionElementListener.class);
var intermediaryApi = mock(ParticipantIntermediaryApi.class);
- try (var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class))) {
+ try (var threadHandler = createThreadHandler(listener, intermediaryApi)) {
Map<String, Object> properties = Map.of("key", "value");
var compositionElement = new CompositionElementDto(UUID.randomUUID(), new ToscaConceptIdentifier(),
void testLockException() throws PfModelException, IOException {
var listener = mock(AutomationCompositionElementListener.class);
var intermediaryApi = mock(ParticipantIntermediaryApi.class);
- try (var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class))) {
+ try (var threadHandler = createThreadHandler(listener, intermediaryApi)) {
Map<String, Object> properties = Map.of("key", "value");
var compositionElement = new CompositionElementDto(UUID.randomUUID(), new ToscaConceptIdentifier(),
void testSubState() throws PfModelException, IOException {
var listener = mock(AutomationCompositionElementListener.class);
var intermediaryApi = mock(ParticipantIntermediaryApi.class);
- try (var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class))) {
+ try (var threadHandler = createThreadHandler(listener, intermediaryApi)) {
Map<String, Object> properties = Map.of("key", "value");
var compositionElement = new CompositionElementDto(UUID.randomUUID(), new ToscaConceptIdentifier(),
void testSubStateException() throws PfModelException, IOException {
var listener = mock(AutomationCompositionElementListener.class);
var intermediaryApi = mock(ParticipantIntermediaryApi.class);
- try (var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class))) {
+ try (var threadHandler = createThreadHandler(listener, intermediaryApi)) {
Map<String, Object> properties = Map.of("key", "value");
var compositionElement = new CompositionElementDto(UUID.randomUUID(), new ToscaConceptIdentifier(),