From: FrancescoFioraEst Date: Tue, 14 Oct 2025 07:59:59 +0000 (+0100) Subject: Make the participant intermediary threading logic robust X-Git-Url: https://gerrit.onap.org/r/gitweb?a=commitdiff_plain;h=1ba1d099a80a7d4211c4c35bf77903987c4a2851;p=policy%2Fclamp.git Make the participant intermediary threading logic robust Issue-ID: POLICY-5465 Change-Id: I2eb170c32bc830bc4e8f9cc7ada80fe8dbf39134 Signed-off-by: FrancescoFioraEst --- diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/AcmThreadFactory.java b/common/src/main/java/org/onap/policy/clamp/common/acm/utils/AcmThreadFactory.java similarity index 96% rename from runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/AcmThreadFactory.java rename to common/src/main/java/org/onap/policy/clamp/common/acm/utils/AcmThreadFactory.java index 12fcb7c64..28b5fd340 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/AcmThreadFactory.java +++ b/common/src/main/java/org/onap/policy/clamp/common/acm/utils/AcmThreadFactory.java @@ -18,7 +18,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.policy.clamp.acm.runtime.supervision; +package org.onap.policy.clamp.common.acm.utils; import java.util.concurrent.ThreadFactory; import lombok.NonNull; diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/AcmThreadFactoryTest.java b/common/src/test/java/org/onap/policy/clamp/common/acm/utils/AcmThreadFactoryTest.java similarity index 96% rename from runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/AcmThreadFactoryTest.java rename to common/src/test/java/org/onap/policy/clamp/common/acm/utils/AcmThreadFactoryTest.java index 18356becf..01e33eea9 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/AcmThreadFactoryTest.java +++ b/common/src/test/java/org/onap/policy/clamp/common/acm/utils/AcmThreadFactoryTest.java @@ -18,7 +18,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.policy.clamp.acm.runtime.supervision; +package org.onap.policy.clamp.common.acm.utils; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.spy; diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/MsgExecutor.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/MsgExecutor.java index 1ff4aabdc..2da33a9b4 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/MsgExecutor.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/MsgExecutor.java @@ -27,6 +27,7 @@ import lombok.RequiredArgsConstructor; import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessagePublisher; import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.AutomationCompositionMsg; import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider; +import org.onap.policy.clamp.common.acm.utils.AcmThreadFactory; import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantReqSync; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +37,8 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor public class MsgExecutor { - private final ExecutorService executor = Context.taskWrapping(Executors.newSingleThreadExecutor()); + private final ExecutorService executor = + Context.taskWrapping(Executors.newSingleThreadExecutor(new AcmThreadFactory())); private static final Logger LOGGER = LoggerFactory.getLogger(MsgExecutor.class); private final CacheProvider cacheProvider; diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java index 29377a923..1c177089e 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java @@ -36,11 +36,11 @@ import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi; import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider; import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters; +import org.onap.policy.clamp.common.acm.utils.AcmThreadFactory; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.DeployState; import org.onap.policy.clamp.models.acm.concepts.LockState; import org.onap.policy.clamp.models.acm.concepts.StateChangeResult; -import org.onap.policy.models.base.PfModelException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -71,7 +71,7 @@ public class ThreadHandler implements Closeable { this.intermediaryApi = intermediaryApi; this.cacheProvider = cacheProvider; executor = Context.taskWrapping(Executors.newFixedThreadPool( - parameters.getIntermediaryParameters().getThreadPoolSize())); + parameters.getIntermediaryParameters().getThreadPoolSize(), new AcmThreadFactory())); LOGGER.info("ThreadHandler started with thread pool size {}", parameters.getIntermediaryParameters().getThreadPoolSize()); } @@ -92,7 +92,7 @@ public class ThreadHandler implements Closeable { private void deployProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) { try { listener.deploy(compositionElement, instanceElement); - } catch (PfModelException e) { + } catch (Exception e) { LOGGER.error("Automation composition element deploy failed {} {}", instanceElement.elementId(), e.getMessage()); intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(), @@ -118,7 +118,7 @@ public class ThreadHandler implements Closeable { private void undeployProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) { try { listener.undeploy(compositionElement, instanceElement); - } catch (PfModelException e) { + } catch (Exception e) { LOGGER.error( "Automation composition element undeploy failed {} {}", instanceElement.elementId(), e.getMessage()); intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(), @@ -144,7 +144,7 @@ public class ThreadHandler implements Closeable { private void lockProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) { try { listener.lock(compositionElement, instanceElement); - } catch (PfModelException e) { + } catch (Exception e) { LOGGER.error("Automation composition element lock failed {} {}", instanceElement.elementId(), e.getMessage()); intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(), @@ -170,7 +170,7 @@ public class ThreadHandler implements Closeable { private void unlockProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) { try { listener.unlock(compositionElement, instanceElement); - } catch (PfModelException e) { + } catch (Exception e) { LOGGER.error("Automation composition element unlock failed {} {}", instanceElement.elementId(), e.getMessage()); intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(), @@ -196,7 +196,7 @@ public class ThreadHandler implements Closeable { private void deleteProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) { try { listener.delete(compositionElement, instanceElement); - } catch (PfModelException e) { + } catch (Exception e) { LOGGER.error("Automation composition element delete failed {} {}", instanceElement.elementId(), e.getMessage()); intermediaryApi.updateAutomationCompositionElementState( @@ -226,7 +226,7 @@ public class ThreadHandler implements Closeable { InstanceElementDto instanceElementUpdated) { try { listener.update(compositionElement, instanceElement, instanceElementUpdated); - } catch (PfModelException e) { + } catch (Exception e) { LOGGER.error("Automation composition element update failed {} {}", instanceElement.elementId(), e.getMessage()); intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(), @@ -271,7 +271,7 @@ public class ThreadHandler implements Closeable { try { listener.prime(composition); executionMap.remove(composition.compositionId()); - } catch (PfModelException e) { + } catch (Exception e) { LOGGER.error("Composition Defintion prime failed {} {}", composition.compositionId(), e.getMessage()); intermediaryApi.updateCompositionState(composition.compositionId(), AcTypeState.COMMISSIONED, StateChangeResult.FAILED, "Composition Defintion prime failed"); @@ -294,7 +294,7 @@ public class ThreadHandler implements Closeable { try { listener.deprime(composition); executionMap.remove(composition.compositionId()); - } catch (PfModelException e) { + } catch (Exception e) { LOGGER.error("Composition Defintion deprime failed {} {}", composition.compositionId(), e.getMessage()); intermediaryApi.updateCompositionState(composition.compositionId(), AcTypeState.PRIMED, StateChangeResult.FAILED, "Composition Defintion deprime failed"); @@ -339,7 +339,7 @@ public class ThreadHandler implements Closeable { try { listener.migrate(compositionElement, compositionElementTarget, instanceElement, instanceElementMigrate, stage); - } catch (PfModelException e) { + } catch (Exception e) { LOGGER.error("Automation composition element migrate failed {} {}", instanceElement.elementId(), e.getMessage()); intermediaryApi.updateAutomationCompositionElementState( @@ -374,7 +374,7 @@ public class ThreadHandler implements Closeable { try { listener.migratePrecheck(compositionElement, compositionElementTarget, instanceElement, instanceElementMigrate); - } catch (PfModelException e) { + } catch (Exception e) { LOGGER.error("Automation composition element migrate precheck failed {} {}", instanceElement.elementId(), e.getMessage()); intermediaryApi.updateAutomationCompositionElementState( @@ -401,7 +401,7 @@ public class ThreadHandler implements Closeable { private void reviewProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) { try { listener.review(compositionElement, instanceElement); - } catch (PfModelException e) { + } catch (Exception e) { LOGGER.error("Automation composition element Review failed {} {}", instanceElement.elementId(), e.getMessage()); intermediaryApi.updateAutomationCompositionElementState( @@ -430,7 +430,7 @@ public class ThreadHandler implements Closeable { int stage) { try { listener.prepare(compositionElement, instanceElement, stage); - } catch (PfModelException e) { + } catch (Exception e) { LOGGER.error("Automation composition element prepare Pre Deploy failed {} {}", instanceElement.elementId(), e.getMessage()); intermediaryApi.updateAutomationCompositionElementState( @@ -465,7 +465,7 @@ public class ThreadHandler implements Closeable { try { listener.rollbackMigration(compositionElement, compositionElementRollback, instanceElement, instanceElementRollback, stage); - } catch (PfModelException e) { + } catch (Exception e) { LOGGER.error("Automation composition element rollback failed {} {}", instanceElement.elementId(), e.getMessage()); intermediaryApi.updateAutomationCompositionElementState( diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java index b6382ac8d..5a0dacc54 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java @@ -30,8 +30,8 @@ import java.util.concurrent.Executors; import lombok.NonNull; import lombok.RequiredArgsConstructor; import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup; -import org.onap.policy.clamp.acm.runtime.supervision.AcmThreadFactory; import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantPrimePublisher; +import org.onap.policy.clamp.common.acm.utils.AcmThreadFactory; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; import org.onap.policy.clamp.models.acm.concepts.StateChangeResult; diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java index 18e80bc11..a7defea00 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java @@ -36,6 +36,7 @@ import org.onap.policy.clamp.acm.runtime.supervision.comm.AcPreparePublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionDeployPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionMigrationPublisher; import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher; +import org.onap.policy.clamp.common.acm.utils.AcmThreadFactory; import org.onap.policy.clamp.models.acm.concepts.AcElementDeployAck; import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java index 1dc5f12af..367e5f975 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import lombok.RequiredArgsConstructor; import org.aspectj.lang.annotation.After; import org.aspectj.lang.annotation.Aspect; +import org.onap.policy.clamp.common.acm.utils.AcmThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Scheduled;