Acmr sync event improvements 54/142454/4 master
authorrameshiyer27 <ramesh.murugan.iyer@est.tech>
Wed, 12 Nov 2025 17:12:25 +0000 (17:12 +0000)
committerrameshiyer27 <ramesh.murugan.iyer@est.tech>
Tue, 18 Nov 2025 14:25:07 +0000 (14:25 +0000)
Add a delete sync event for participant when no elements exist after migration

Add participantIdList in the sync events to avoid all participants
processing all the sync events.

Issue-ID: POLICY-5488
Signed-off-by: rameshiyer27 <ramesh.murugan.iyer@est.tech>
Change-Id: I7827268cb4c0c6d11a76eead555b1782ac74acf3

runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/scanner/AbstractScanner.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/scanner/StageScanner.java
runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScannerTest.java
runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/comm/SupervisionMessagesTest.java
runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/scanner/StageScannerTest.java
runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/util/CommonTestData.java

index 96abac4..1f0fa80 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2021,2022,2024 Nordix Foundation.
+ * Copyright (C) 2021-2022,2024-2025 OpenInfra Foundation Europe. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -41,7 +41,11 @@ public class ParticipantStatusReqPublisher extends AbstractParticipantPublisher<
     @Timed(value = "publisher.participant_status_req", description = "PARTICIPANT_STATUS_REQ messages published")
     public void send(UUID participantId) {
         var message = new ParticipantStatusReq();
-        message.setParticipantId(participantId);
+        if (participantId != null) {
+            message.setParticipantId(participantId);
+            message.getParticipantIdList().add(participantId);
+        }
+
         message.setTimestamp(Instant.now());
 
         LOGGER.debug("Participant StatusReq sent {}", message.getMessageId());
index ddbc06e..ab93c26 100644 (file)
@@ -58,6 +58,7 @@ public class ParticipantSyncPublisher extends AbstractParticipantPublisher<Parti
 
         var message = new ParticipantSync();
         message.setParticipantId(participantId);
+        message.getParticipantIdList().add(participantId);
         message.setReplicaId(replicaId);
         message.setRestarting(true);
         message.setCompositionId(acmDefinition.getCompositionId());
@@ -109,6 +110,8 @@ public class ParticipantSyncPublisher extends AbstractParticipantPublisher<Parti
         } else {
             message.setParticipantDefinitionUpdates(AcmUtils.prepareParticipantRestarting(null, acDefinition,
                     acRuntimeParameterGroup.getAcmParameters().getToscaElementName()));
+            message.getParticipantDefinitionUpdates().forEach(participantDefinition
+                    -> message.getParticipantIdList().add(participantDefinition.getParticipantId()));
         }
         LOGGER.debug("Participant AutomationCompositionDefinition Sync sent {}", message);
         super.send(message);
@@ -121,12 +124,6 @@ public class ParticipantSyncPublisher extends AbstractParticipantPublisher<Parti
      */
     @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published")
     public void sendSync(AutomationComposition automationComposition) {
-        var message = new ParticipantSync();
-        message.setCompositionId(automationComposition.getCompositionId());
-        message.setAutomationCompositionId(automationComposition.getInstanceId());
-        message.setState(AcTypeState.PRIMED);
-        message.setMessageId(UUID.randomUUID());
-        message.setTimestamp(Instant.now());
         var syncAc = new ParticipantRestartAc();
         syncAc.setCompositionTargetId(automationComposition.getCompositionTargetId());
         syncAc.setAutomationCompositionId(automationComposition.getInstanceId());
@@ -134,12 +131,14 @@ public class ParticipantSyncPublisher extends AbstractParticipantPublisher<Parti
         syncAc.setDeployState(automationComposition.getDeployState());
         syncAc.setLockState(automationComposition.getLockState());
         syncAc.setStateChangeResult(automationComposition.getStateChangeResult());
+        var message = createSyncMsg(automationComposition);
         if (DeployState.DELETED.equals(automationComposition.getDeployState())) {
             message.setDelete(true);
         } else {
             for (var element : automationComposition.getElements().values()) {
                 var acElementSync = AcmUtils.createAcElementRestart(element);
                 syncAc.getAcElementList().add(acElementSync);
+                message.getParticipantIdList().add(acElementSync.getParticipantId());
 
             }
         }
@@ -148,4 +147,38 @@ public class ParticipantSyncPublisher extends AbstractParticipantPublisher<Parti
         LOGGER.debug("Participant AutomationComposition Sync sent {}", message.getMessageId());
         super.send(message);
     }
+
+    /**
+     * Sync event for a participant with all elements deleted in Migration.
+     * @param automationComposition automationComposition
+     * @param participantId participantId
+     */
+    @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published")
+    public void sendDeleteSync(AutomationComposition automationComposition, UUID participantId) {
+        var syncAc = new ParticipantRestartAc();
+        syncAc.setAutomationCompositionId(automationComposition.getInstanceId());
+        syncAc.setRevisionId(automationComposition.getRevisionId());
+        syncAc.setDeployState(DeployState.DELETED);
+        syncAc.setLockState(automationComposition.getLockState());
+        syncAc.setStateChangeResult(automationComposition.getStateChangeResult());
+        var message = createSyncMsg(automationComposition);
+        message.setDelete(true);
+        message.setParticipantId(participantId);
+        message.getParticipantIdList().add(participantId);
+        message.getAutomationcompositionList().add(syncAc);
+
+        LOGGER.debug("Participant AutomationComposition Sync sent {}", message.getMessageId());
+        super.send(message);
+    }
+
+    private ParticipantSync createSyncMsg(AutomationComposition automationComposition) {
+        var message = new ParticipantSync();
+        message.setCompositionId(automationComposition.getCompositionId());
+        message.setAutomationCompositionId(automationComposition.getInstanceId());
+        message.setState(AcTypeState.PRIMED);
+        message.setMessageId(UUID.randomUUID());
+        message.setTimestamp(Instant.now());
+
+        return message;
+    }
 }
index 85f12b7..42df834 100644 (file)
@@ -43,7 +43,7 @@ public abstract class AbstractScanner {
     protected final long maxOperationWaitMs;
 
     protected final AutomationCompositionProvider acProvider;
-    private final ParticipantSyncPublisher participantSyncPublisher;
+    protected final ParticipantSyncPublisher participantSyncPublisher;
     private final EncryptionUtils encryptionUtils;
 
     protected AbstractScanner(final AutomationCompositionProvider acProvider,
index ae0250a..45672ae 100644 (file)
@@ -21,6 +21,7 @@
 package org.onap.policy.clamp.acm.runtime.supervision.scanner;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.UUID;
 import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
@@ -134,12 +135,22 @@ public class StageScanner extends AbstractScanner {
 
     private void removeDeletedElements(AutomationComposition automationComposition, List<UUID> elementsDeleted,
                                        UpdateSync updateSync) {
+        var participantIds = new HashSet<UUID>();
         for (var elementId : elementsDeleted) {
             LOGGER.info("Deleting element {} in Migration ", elementId);
+            participantIds.add(automationComposition.getElements().get(elementId).getParticipantId());
             automationComposition.getElements().remove(elementId);
             acProvider.deleteAutomationCompositionElement(elementId);
             updateSync.setUpdated(true);
         }
+        for (var participantId : participantIds) {
+            var hasElements = automationComposition.getElements().values().stream().anyMatch(element
+                    -> element.getParticipantId().equals(participantId));
+            if (! hasElements) {
+                participantSyncPublisher.sendDeleteSync(automationComposition, participantId);
+            }
+
+        }
     }
 
     private void sendNextStage(final AutomationComposition automationComposition, int minStageNotCompleted,
index 0a9ae36..0051238 100644 (file)
@@ -190,6 +190,7 @@ class SupervisionScannerTest {
         var monitoringScanner = new MonitoringScanner(automationCompositionProvider, acDefinitionProvider,
                 mock(AcDefinitionScanner.class), stageScanner, mock(SimpleScanner.class), mock(PhaseScanner.class),
                 messageProvider);
+        when(automationCompositionProvider.getAutomationComposition(any())).thenReturn(automationComposition);
         var supervisionScanner = new SupervisionScanner(automationCompositionProvider, acDefinitionProvider,
                 messageProvider, monitoringScanner);
 
index e5f01c8..776499a 100644 (file)
@@ -276,6 +276,11 @@ class SupervisionMessagesTest {
         automationComposition.setDeployState(DeployState.DELETED);
         publisher.sendSync(automationComposition);
         verify(topicSink).send(anyString());
+
+        clearInvocations(topicSink);
+        automationComposition.getElements().values().iterator().next().setDeployState(DeployState.DELETED);
+        publisher.sendDeleteSync(automationComposition, UUID.randomUUID());
+        verify(topicSink).send(anyString());
     }
 
     @Test
index 16ee7ac..42951b1 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.onap.policy.clamp.acm.runtime.supervision.scanner;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.clearInvocations;
@@ -54,18 +55,11 @@ class StageScannerTest {
     @Test
     void testSendAutomationCompositionMigrate() {
         var automationComposition = InstantiationUtils.getAutomationCompositionFromResource(AC_JSON, "Crud");
-        automationComposition.setInstanceId(UUID.randomUUID());
-        automationComposition.setDeployState(DeployState.MIGRATING);
+        assert automationComposition != null;
         automationComposition.setCompositionId(COMPOSITION_ID);
         var compositionTargetId = UUID.randomUUID();
         automationComposition.setCompositionTargetId(compositionTargetId);
-        automationComposition.setLockState(LockState.LOCKED);
-        automationComposition.setLastMsg(TimestampHelper.now());
-        automationComposition.setPhase(0);
-        for (var element : automationComposition.getElements().values()) {
-            element.setDeployState(DeployState.DEPLOYED);
-            element.setLockState(LockState.LOCKED);
-        }
+        CommonTestData.modifyAcState(automationComposition, DeployState.MIGRATING);
         // first element is not migrated yet
         var element = automationComposition.getElements().entrySet().iterator().next().getValue();
         element.setDeployState(DeployState.MIGRATING);
@@ -74,7 +68,8 @@ class StageScannerTest {
         when(acProvider.updateAutomationComposition(any())).thenReturn(automationComposition);
         var acRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanner");
         var encryptionUtils = new EncryptionUtils(acRuntimeParameterGroup);
-        var supervisionScanner = new StageScanner(acProvider, mock(ParticipantSyncPublisher.class),
+        var participantSyncPublisher = mock(ParticipantSyncPublisher.class);
+        var supervisionScanner = new StageScanner(acProvider, participantSyncPublisher,
                 mock(AutomationCompositionMigrationPublisher.class), mock(AcPreparePublisher.class),
                 acRuntimeParameterGroup, encryptionUtils);
         var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
@@ -102,23 +97,38 @@ class StageScannerTest {
 
         assertEquals(DeployState.DEPLOYED, automationComposition.getDeployState());
         assertEquals(compositionTargetId, automationComposition.getCompositionId());
+
+        // remove all element for a participant
+        clearInvocations(acProvider);
+        clearInvocations(participantSyncPublisher);
+        element.setDeployState(DeployState.DELETED);
+        supervisionScanner.scanStage(automationComposition, acDefinition, new UpdateSync(), UUID.randomUUID());
+        verify(acProvider).updateAutomationComposition(any(AutomationComposition.class));
+        assertThat(automationComposition.getElements()).doesNotContainKey(element.getId()); //element deleted
+        verify(participantSyncPublisher, times(1)).sendDeleteSync(automationComposition, element.getParticipantId());
+
+        // remove one element; participant retains other elements
+        clearInvocations(acProvider);
+        clearInvocations(participantSyncPublisher);
+        for (var e : automationComposition.getElements().values()) {
+            e.setParticipantId(element.getParticipantId());
+        }
+        automationComposition.getElements().put(element.getId(), element);
+        supervisionScanner.scanStage(automationComposition, acDefinition, new UpdateSync(), UUID.randomUUID());
+        verify(acProvider).updateAutomationComposition(any(AutomationComposition.class));
+        assertThat(automationComposition.getElements()).doesNotContainKey(element.getId()); //element deleted
+        verify(participantSyncPublisher, times(0)).sendDeleteSync(automationComposition,
+                element.getParticipantId());
     }
 
     @Test
     void testSendAutomationCompositionMigrationReverting() {
         var automationComposition = InstantiationUtils.getAutomationCompositionFromResource(AC_JSON, "Crud");
-        automationComposition.setInstanceId(UUID.randomUUID());
-        automationComposition.setDeployState(DeployState.MIGRATION_REVERTING);
+        assert automationComposition != null;
         automationComposition.setCompositionId(COMPOSITION_ID);
-        var compositionTargetId = UUID.randomUUID();
-        automationComposition.setCompositionTargetId(compositionTargetId);
-        automationComposition.setLockState(LockState.LOCKED);
-        automationComposition.setLastMsg(TimestampHelper.now());
-        automationComposition.setPhase(0);
-        for (var element : automationComposition.getElements().values()) {
-            element.setDeployState(DeployState.DEPLOYED);
-            element.setLockState(LockState.LOCKED);
-        }
+        automationComposition.setCompositionTargetId(UUID.randomUUID());
+        CommonTestData.modifyAcState(automationComposition, DeployState.MIGRATION_REVERTING);
+
         // first element is not migrated yet
         var element = automationComposition.getElements().entrySet().iterator().next().getValue();
         element.setDeployState(DeployState.MIGRATION_REVERTING);
index 4762681..566e234 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2021-2025 Nordix Foundation.
+ *  Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -26,7 +26,10 @@ import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup
 import org.onap.policy.clamp.acm.runtime.main.parameters.AcmParameters;
 import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
+import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
+import org.onap.policy.clamp.models.acm.concepts.DeployState;
+import org.onap.policy.clamp.models.acm.concepts.LockState;
 import org.onap.policy.clamp.models.acm.concepts.Participant;
 import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica;
 import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
@@ -176,4 +179,22 @@ public class CommonTestData {
         return acRuntimeParameterGroup;
     }
 
+    /**
+     * Modify the state of the AutomationComposition.
+     * @param automationComposition automationComposition
+     * @param deployState deployState
+     */
+    public static void modifyAcState(AutomationComposition automationComposition,
+                                                      DeployState deployState) {
+        automationComposition.setInstanceId(UUID.randomUUID());
+        automationComposition.setDeployState(deployState);
+        automationComposition.setLockState(LockState.LOCKED);
+        automationComposition.setLastMsg(TimestampHelper.now());
+        automationComposition.setPhase(0);
+        for (var element : automationComposition.getElements().values()) {
+            element.setDeployState(DeployState.DEPLOYED);
+            element.setLockState(LockState.LOCKED);
+        }
+    }
+
 }