Make the participant intermediary threading logic robust 51/142251/2 master
authorFrancescoFioraEst <francesco.fiora@est.tech>
Tue, 14 Oct 2025 07:59:59 +0000 (08:59 +0100)
committerFrancescoFioraEst <francesco.fiora@est.tech>
Tue, 14 Oct 2025 14:17:56 +0000 (15:17 +0100)
Issue-ID: POLICY-5465
Change-Id: I2eb170c32bc830bc4e8f9cc7ada80fe8dbf39134
Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
common/src/main/java/org/onap/policy/clamp/common/acm/utils/AcmThreadFactory.java [moved from runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/AcmThreadFactory.java with 96% similarity]
common/src/test/java/org/onap/policy/clamp/common/acm/utils/AcmThreadFactoryTest.java [moved from runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/AcmThreadFactoryTest.java with 96% similarity]
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/MsgExecutor.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java

@@ -18,7 +18,7 @@
  * ============LICENSE_END=========================================================
  */
 
-package org.onap.policy.clamp.acm.runtime.supervision;
+package org.onap.policy.clamp.common.acm.utils;
 
 import java.util.concurrent.ThreadFactory;
 import lombok.NonNull;
@@ -18,7 +18,7 @@
  * ============LICENSE_END=========================================================
  */
 
-package org.onap.policy.clamp.acm.runtime.supervision;
+package org.onap.policy.clamp.common.acm.utils;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.spy;
index 1ff4aab..2da33a9 100644 (file)
@@ -27,6 +27,7 @@ import lombok.RequiredArgsConstructor;
 import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessagePublisher;
 import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.AutomationCompositionMsg;
 import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
+import org.onap.policy.clamp.common.acm.utils.AcmThreadFactory;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantReqSync;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,7 +37,8 @@ import org.springframework.stereotype.Component;
 @RequiredArgsConstructor
 public class MsgExecutor {
 
-    private final ExecutorService executor = Context.taskWrapping(Executors.newSingleThreadExecutor());
+    private final ExecutorService executor =
+            Context.taskWrapping(Executors.newSingleThreadExecutor(new AcmThreadFactory()));
     private static final Logger LOGGER = LoggerFactory.getLogger(MsgExecutor.class);
 
     private final CacheProvider cacheProvider;
index 29377a9..1c17708 100644 (file)
@@ -36,11 +36,11 @@ import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto
 import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi;
 import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
 import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters;
+import org.onap.policy.clamp.common.acm.utils.AcmThreadFactory;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.DeployState;
 import org.onap.policy.clamp.models.acm.concepts.LockState;
 import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
-import org.onap.policy.models.base.PfModelException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -71,7 +71,7 @@ public class ThreadHandler implements Closeable {
         this.intermediaryApi = intermediaryApi;
         this.cacheProvider = cacheProvider;
         executor = Context.taskWrapping(Executors.newFixedThreadPool(
-                parameters.getIntermediaryParameters().getThreadPoolSize()));
+                parameters.getIntermediaryParameters().getThreadPoolSize(), new AcmThreadFactory()));
         LOGGER.info("ThreadHandler started with thread pool size {}",
                 parameters.getIntermediaryParameters().getThreadPoolSize());
     }
@@ -92,7 +92,7 @@ public class ThreadHandler implements Closeable {
     private void deployProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
         try {
             listener.deploy(compositionElement, instanceElement);
-        } catch (PfModelException e) {
+        } catch (Exception e) {
             LOGGER.error("Automation composition element deploy failed {} {}", instanceElement.elementId(),
                 e.getMessage());
             intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
@@ -118,7 +118,7 @@ public class ThreadHandler implements Closeable {
     private void undeployProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
         try {
             listener.undeploy(compositionElement, instanceElement);
-        } catch (PfModelException e) {
+        } catch (Exception e) {
             LOGGER.error(
                 "Automation composition element undeploy failed {} {}", instanceElement.elementId(), e.getMessage());
             intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
@@ -144,7 +144,7 @@ public class ThreadHandler implements Closeable {
     private void lockProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
         try {
             listener.lock(compositionElement, instanceElement);
-        } catch (PfModelException e) {
+        } catch (Exception e) {
             LOGGER.error("Automation composition element lock failed {} {}",
                 instanceElement.elementId(), e.getMessage());
             intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
@@ -170,7 +170,7 @@ public class ThreadHandler implements Closeable {
     private void unlockProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
         try {
             listener.unlock(compositionElement, instanceElement);
-        } catch (PfModelException e) {
+        } catch (Exception e) {
             LOGGER.error("Automation composition element unlock failed {} {}",
                 instanceElement.elementId(), e.getMessage());
             intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
@@ -196,7 +196,7 @@ public class ThreadHandler implements Closeable {
     private void deleteProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
         try {
             listener.delete(compositionElement, instanceElement);
-        } catch (PfModelException e) {
+        } catch (Exception e) {
             LOGGER.error("Automation composition element delete failed {} {}",
                 instanceElement.elementId(), e.getMessage());
             intermediaryApi.updateAutomationCompositionElementState(
@@ -226,7 +226,7 @@ public class ThreadHandler implements Closeable {
                                InstanceElementDto instanceElementUpdated) {
         try {
             listener.update(compositionElement, instanceElement, instanceElementUpdated);
-        } catch (PfModelException e) {
+        } catch (Exception e) {
             LOGGER.error("Automation composition element update failed {} {}",
                 instanceElement.elementId(), e.getMessage());
             intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
@@ -271,7 +271,7 @@ public class ThreadHandler implements Closeable {
         try {
             listener.prime(composition);
             executionMap.remove(composition.compositionId());
-        } catch (PfModelException e) {
+        } catch (Exception e) {
             LOGGER.error("Composition Defintion prime failed {} {}", composition.compositionId(), e.getMessage());
             intermediaryApi.updateCompositionState(composition.compositionId(), AcTypeState.COMMISSIONED,
                 StateChangeResult.FAILED, "Composition Defintion prime failed");
@@ -294,7 +294,7 @@ public class ThreadHandler implements Closeable {
         try {
             listener.deprime(composition);
             executionMap.remove(composition.compositionId());
-        } catch (PfModelException e) {
+        } catch (Exception e) {
             LOGGER.error("Composition Defintion deprime failed {} {}", composition.compositionId(), e.getMessage());
             intermediaryApi.updateCompositionState(composition.compositionId(), AcTypeState.PRIMED,
                 StateChangeResult.FAILED, "Composition Defintion deprime failed");
@@ -339,7 +339,7 @@ public class ThreadHandler implements Closeable {
         try {
             listener.migrate(compositionElement, compositionElementTarget,
                 instanceElement, instanceElementMigrate, stage);
-        } catch (PfModelException e) {
+        } catch (Exception e) {
             LOGGER.error("Automation composition element migrate failed {} {}",
                 instanceElement.elementId(), e.getMessage());
             intermediaryApi.updateAutomationCompositionElementState(
@@ -374,7 +374,7 @@ public class ThreadHandler implements Closeable {
         try {
             listener.migratePrecheck(compositionElement, compositionElementTarget, instanceElement,
                 instanceElementMigrate);
-        } catch (PfModelException e) {
+        } catch (Exception e) {
             LOGGER.error("Automation composition element migrate precheck failed {} {}",
                 instanceElement.elementId(), e.getMessage());
             intermediaryApi.updateAutomationCompositionElementState(
@@ -401,7 +401,7 @@ public class ThreadHandler implements Closeable {
     private void reviewProcess(CompositionElementDto compositionElement, InstanceElementDto instanceElement) {
         try {
             listener.review(compositionElement, instanceElement);
-        } catch (PfModelException e) {
+        } catch (Exception e) {
             LOGGER.error("Automation composition element Review failed {} {}",
                 instanceElement.elementId(), e.getMessage());
             intermediaryApi.updateAutomationCompositionElementState(
@@ -430,7 +430,7 @@ public class ThreadHandler implements Closeable {
         int stage) {
         try {
             listener.prepare(compositionElement, instanceElement, stage);
-        } catch (PfModelException e) {
+        } catch (Exception e) {
             LOGGER.error("Automation composition element prepare Pre Deploy failed {} {}",
                 instanceElement.elementId(), e.getMessage());
             intermediaryApi.updateAutomationCompositionElementState(
@@ -465,7 +465,7 @@ public class ThreadHandler implements Closeable {
         try {
             listener.rollbackMigration(compositionElement, compositionElementRollback, instanceElement,
                     instanceElementRollback, stage);
-        } catch (PfModelException e) {
+        } catch (Exception e) {
             LOGGER.error("Automation composition element rollback failed {} {}",
                     instanceElement.elementId(), e.getMessage());
             intermediaryApi.updateAutomationCompositionElementState(
index b6382ac..5a0dacc 100644 (file)
@@ -30,8 +30,8 @@ import java.util.concurrent.Executors;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
-import org.onap.policy.clamp.acm.runtime.supervision.AcmThreadFactory;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantPrimePublisher;
+import org.onap.policy.clamp.common.acm.utils.AcmThreadFactory;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
 import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
index 18e80bc..a7defea 100644 (file)
@@ -36,6 +36,7 @@ import org.onap.policy.clamp.acm.runtime.supervision.comm.AcPreparePublisher;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionDeployPublisher;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionMigrationPublisher;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher;
+import org.onap.policy.clamp.common.acm.utils.AcmThreadFactory;
 import org.onap.policy.clamp.models.acm.concepts.AcElementDeployAck;
 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
index 1dc5f12..367e5f9 100644 (file)
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import lombok.RequiredArgsConstructor;
 import org.aspectj.lang.annotation.After;
 import org.aspectj.lang.annotation.Aspect;
+import org.onap.policy.clamp.common.acm.utils.AcmThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.scheduling.annotation.Scheduled;