From: rameshiyer27 Date: Wed, 12 Nov 2025 17:12:25 +0000 (+0000) Subject: Acmr sync event improvements X-Git-Url: https://gerrit.onap.org/r/gitweb?a=commitdiff_plain;h=ac3470b06e996d43b49b63aa197d93202028a112;p=policy%2Fclamp.git Acmr sync event improvements Add a delete sync event for participant when no elements exist after migration Add participantIdList in the sync events to avoid all participants processing all the sync events. Issue-ID: POLICY-5488 Signed-off-by: rameshiyer27 Change-Id: I7827268cb4c0c6d11a76eead555b1782ac74acf3 --- diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java index 96abac494..1f0fa805f 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2021,2022,2024 Nordix Foundation. + * Copyright (C) 2021-2022,2024-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,7 +41,11 @@ public class ParticipantStatusReqPublisher extends AbstractParticipantPublisher< @Timed(value = "publisher.participant_status_req", description = "PARTICIPANT_STATUS_REQ messages published") public void send(UUID participantId) { var message = new ParticipantStatusReq(); - message.setParticipantId(participantId); + if (participantId != null) { + message.setParticipantId(participantId); + message.getParticipantIdList().add(participantId); + } + message.setTimestamp(Instant.now()); LOGGER.debug("Participant StatusReq sent {}", message.getMessageId()); diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java index ddbc06e9d..ab93c26f0 100644 --- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java +++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java @@ -58,6 +58,7 @@ public class ParticipantSyncPublisher extends AbstractParticipantPublisher message.getParticipantIdList().add(participantDefinition.getParticipantId())); } LOGGER.debug("Participant AutomationCompositionDefinition Sync sent {}", message); super.send(message); @@ -121,12 +124,6 @@ public class ParticipantSyncPublisher extends AbstractParticipantPublisher elementsDeleted, UpdateSync updateSync) { + var participantIds = new HashSet(); for (var elementId : elementsDeleted) { LOGGER.info("Deleting element {} in Migration ", elementId); + participantIds.add(automationComposition.getElements().get(elementId).getParticipantId()); automationComposition.getElements().remove(elementId); acProvider.deleteAutomationCompositionElement(elementId); updateSync.setUpdated(true); } + for (var participantId : participantIds) { + var hasElements = automationComposition.getElements().values().stream().anyMatch(element + -> element.getParticipantId().equals(participantId)); + if (! hasElements) { + participantSyncPublisher.sendDeleteSync(automationComposition, participantId); + } + + } } private void sendNextStage(final AutomationComposition automationComposition, int minStageNotCompleted, diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScannerTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScannerTest.java index 0a9ae36d7..00512380e 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScannerTest.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScannerTest.java @@ -190,6 +190,7 @@ class SupervisionScannerTest { var monitoringScanner = new MonitoringScanner(automationCompositionProvider, acDefinitionProvider, mock(AcDefinitionScanner.class), stageScanner, mock(SimpleScanner.class), mock(PhaseScanner.class), messageProvider); + when(automationCompositionProvider.getAutomationComposition(any())).thenReturn(automationComposition); var supervisionScanner = new SupervisionScanner(automationCompositionProvider, acDefinitionProvider, messageProvider, monitoringScanner); diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/comm/SupervisionMessagesTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/comm/SupervisionMessagesTest.java index e5f01c86a..776499a36 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/comm/SupervisionMessagesTest.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/comm/SupervisionMessagesTest.java @@ -276,6 +276,11 @@ class SupervisionMessagesTest { automationComposition.setDeployState(DeployState.DELETED); publisher.sendSync(automationComposition); verify(topicSink).send(anyString()); + + clearInvocations(topicSink); + automationComposition.getElements().values().iterator().next().setDeployState(DeployState.DELETED); + publisher.sendDeleteSync(automationComposition, UUID.randomUUID()); + verify(topicSink).send(anyString()); } @Test diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/scanner/StageScannerTest.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/scanner/StageScannerTest.java index 16ee7acd5..42951b18c 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/scanner/StageScannerTest.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/scanner/StageScannerTest.java @@ -20,6 +20,7 @@ package org.onap.policy.clamp.acm.runtime.supervision.scanner; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.clearInvocations; @@ -54,18 +55,11 @@ class StageScannerTest { @Test void testSendAutomationCompositionMigrate() { var automationComposition = InstantiationUtils.getAutomationCompositionFromResource(AC_JSON, "Crud"); - automationComposition.setInstanceId(UUID.randomUUID()); - automationComposition.setDeployState(DeployState.MIGRATING); + assert automationComposition != null; automationComposition.setCompositionId(COMPOSITION_ID); var compositionTargetId = UUID.randomUUID(); automationComposition.setCompositionTargetId(compositionTargetId); - automationComposition.setLockState(LockState.LOCKED); - automationComposition.setLastMsg(TimestampHelper.now()); - automationComposition.setPhase(0); - for (var element : automationComposition.getElements().values()) { - element.setDeployState(DeployState.DEPLOYED); - element.setLockState(LockState.LOCKED); - } + CommonTestData.modifyAcState(automationComposition, DeployState.MIGRATING); // first element is not migrated yet var element = automationComposition.getElements().entrySet().iterator().next().getValue(); element.setDeployState(DeployState.MIGRATING); @@ -74,7 +68,8 @@ class StageScannerTest { when(acProvider.updateAutomationComposition(any())).thenReturn(automationComposition); var acRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanner"); var encryptionUtils = new EncryptionUtils(acRuntimeParameterGroup); - var supervisionScanner = new StageScanner(acProvider, mock(ParticipantSyncPublisher.class), + var participantSyncPublisher = mock(ParticipantSyncPublisher.class); + var supervisionScanner = new StageScanner(acProvider, participantSyncPublisher, mock(AutomationCompositionMigrationPublisher.class), mock(AcPreparePublisher.class), acRuntimeParameterGroup, encryptionUtils); var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML); @@ -102,23 +97,38 @@ class StageScannerTest { assertEquals(DeployState.DEPLOYED, automationComposition.getDeployState()); assertEquals(compositionTargetId, automationComposition.getCompositionId()); + + // remove all element for a participant + clearInvocations(acProvider); + clearInvocations(participantSyncPublisher); + element.setDeployState(DeployState.DELETED); + supervisionScanner.scanStage(automationComposition, acDefinition, new UpdateSync(), UUID.randomUUID()); + verify(acProvider).updateAutomationComposition(any(AutomationComposition.class)); + assertThat(automationComposition.getElements()).doesNotContainKey(element.getId()); //element deleted + verify(participantSyncPublisher, times(1)).sendDeleteSync(automationComposition, element.getParticipantId()); + + // remove one element; participant retains other elements + clearInvocations(acProvider); + clearInvocations(participantSyncPublisher); + for (var e : automationComposition.getElements().values()) { + e.setParticipantId(element.getParticipantId()); + } + automationComposition.getElements().put(element.getId(), element); + supervisionScanner.scanStage(automationComposition, acDefinition, new UpdateSync(), UUID.randomUUID()); + verify(acProvider).updateAutomationComposition(any(AutomationComposition.class)); + assertThat(automationComposition.getElements()).doesNotContainKey(element.getId()); //element deleted + verify(participantSyncPublisher, times(0)).sendDeleteSync(automationComposition, + element.getParticipantId()); } @Test void testSendAutomationCompositionMigrationReverting() { var automationComposition = InstantiationUtils.getAutomationCompositionFromResource(AC_JSON, "Crud"); - automationComposition.setInstanceId(UUID.randomUUID()); - automationComposition.setDeployState(DeployState.MIGRATION_REVERTING); + assert automationComposition != null; automationComposition.setCompositionId(COMPOSITION_ID); - var compositionTargetId = UUID.randomUUID(); - automationComposition.setCompositionTargetId(compositionTargetId); - automationComposition.setLockState(LockState.LOCKED); - automationComposition.setLastMsg(TimestampHelper.now()); - automationComposition.setPhase(0); - for (var element : automationComposition.getElements().values()) { - element.setDeployState(DeployState.DEPLOYED); - element.setLockState(LockState.LOCKED); - } + automationComposition.setCompositionTargetId(UUID.randomUUID()); + CommonTestData.modifyAcState(automationComposition, DeployState.MIGRATION_REVERTING); + // first element is not migrated yet var element = automationComposition.getElements().entrySet().iterator().next().getValue(); element.setDeployState(DeployState.MIGRATION_REVERTING); diff --git a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/util/CommonTestData.java b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/util/CommonTestData.java index 4762681a0..566e23421 100644 --- a/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/util/CommonTestData.java +++ b/runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/util/CommonTestData.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2021-2025 Nordix Foundation. + * Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,7 +26,10 @@ import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup import org.onap.policy.clamp.acm.runtime.main.parameters.AcmParameters; import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException; import org.onap.policy.clamp.models.acm.concepts.AcTypeState; +import org.onap.policy.clamp.models.acm.concepts.AutomationComposition; import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition; +import org.onap.policy.clamp.models.acm.concepts.DeployState; +import org.onap.policy.clamp.models.acm.concepts.LockState; import org.onap.policy.clamp.models.acm.concepts.Participant; import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica; import org.onap.policy.clamp.models.acm.concepts.ParticipantState; @@ -176,4 +179,22 @@ public class CommonTestData { return acRuntimeParameterGroup; } + /** + * Modify the state of the AutomationComposition. + * @param automationComposition automationComposition + * @param deployState deployState + */ + public static void modifyAcState(AutomationComposition automationComposition, + DeployState deployState) { + automationComposition.setInstanceId(UUID.randomUUID()); + automationComposition.setDeployState(deployState); + automationComposition.setLockState(LockState.LOCKED); + automationComposition.setLastMsg(TimestampHelper.now()); + automationComposition.setPhase(0); + for (var element : automationComposition.getElements().values()) { + element.setDeployState(DeployState.DEPLOYED); + element.setLockState(LockState.LOCKED); + } + } + }