/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021,2022,2024 Nordix Foundation.
+ * Copyright (C) 2021-2022,2024-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@Timed(value = "publisher.participant_status_req", description = "PARTICIPANT_STATUS_REQ messages published")
public void send(UUID participantId) {
var message = new ParticipantStatusReq();
- message.setParticipantId(participantId);
+ if (participantId != null) {
+ message.setParticipantId(participantId);
+ message.getParticipantIdList().add(participantId);
+ }
+
message.setTimestamp(Instant.now());
LOGGER.debug("Participant StatusReq sent {}", message.getMessageId());
var message = new ParticipantSync();
message.setParticipantId(participantId);
+ message.getParticipantIdList().add(participantId);
message.setReplicaId(replicaId);
message.setRestarting(true);
message.setCompositionId(acmDefinition.getCompositionId());
} else {
message.setParticipantDefinitionUpdates(AcmUtils.prepareParticipantRestarting(null, acDefinition,
acRuntimeParameterGroup.getAcmParameters().getToscaElementName()));
+ message.getParticipantDefinitionUpdates().forEach(participantDefinition
+ -> message.getParticipantIdList().add(participantDefinition.getParticipantId()));
}
LOGGER.debug("Participant AutomationCompositionDefinition Sync sent {}", message);
super.send(message);
*/
@Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published")
public void sendSync(AutomationComposition automationComposition) {
- var message = new ParticipantSync();
- message.setCompositionId(automationComposition.getCompositionId());
- message.setAutomationCompositionId(automationComposition.getInstanceId());
- message.setState(AcTypeState.PRIMED);
- message.setMessageId(UUID.randomUUID());
- message.setTimestamp(Instant.now());
var syncAc = new ParticipantRestartAc();
syncAc.setCompositionTargetId(automationComposition.getCompositionTargetId());
syncAc.setAutomationCompositionId(automationComposition.getInstanceId());
syncAc.setDeployState(automationComposition.getDeployState());
syncAc.setLockState(automationComposition.getLockState());
syncAc.setStateChangeResult(automationComposition.getStateChangeResult());
+ var message = createSyncMsg(automationComposition);
if (DeployState.DELETED.equals(automationComposition.getDeployState())) {
message.setDelete(true);
} else {
for (var element : automationComposition.getElements().values()) {
var acElementSync = AcmUtils.createAcElementRestart(element);
syncAc.getAcElementList().add(acElementSync);
+ message.getParticipantIdList().add(acElementSync.getParticipantId());
}
}
LOGGER.debug("Participant AutomationComposition Sync sent {}", message.getMessageId());
super.send(message);
}
+
+ /**
+ * Sync event for a participant with all elements deleted in Migration.
+ * @param automationComposition automationComposition
+ * @param participantId participantId
+ */
+ @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published")
+ public void sendDeleteSync(AutomationComposition automationComposition, UUID participantId) {
+ var syncAc = new ParticipantRestartAc();
+ syncAc.setAutomationCompositionId(automationComposition.getInstanceId());
+ syncAc.setRevisionId(automationComposition.getRevisionId());
+ syncAc.setDeployState(DeployState.DELETED);
+ syncAc.setLockState(automationComposition.getLockState());
+ syncAc.setStateChangeResult(automationComposition.getStateChangeResult());
+ var message = createSyncMsg(automationComposition);
+ message.setDelete(true);
+ message.setParticipantId(participantId);
+ message.getParticipantIdList().add(participantId);
+ message.getAutomationcompositionList().add(syncAc);
+
+ LOGGER.debug("Participant AutomationComposition Sync sent {}", message.getMessageId());
+ super.send(message);
+ }
+
+ private ParticipantSync createSyncMsg(AutomationComposition automationComposition) {
+ var message = new ParticipantSync();
+ message.setCompositionId(automationComposition.getCompositionId());
+ message.setAutomationCompositionId(automationComposition.getInstanceId());
+ message.setState(AcTypeState.PRIMED);
+ message.setMessageId(UUID.randomUUID());
+ message.setTimestamp(Instant.now());
+
+ return message;
+ }
}
protected final long maxOperationWaitMs;
protected final AutomationCompositionProvider acProvider;
- private final ParticipantSyncPublisher participantSyncPublisher;
+ protected final ParticipantSyncPublisher participantSyncPublisher;
private final EncryptionUtils encryptionUtils;
protected AbstractScanner(final AutomationCompositionProvider acProvider,
package org.onap.policy.clamp.acm.runtime.supervision.scanner;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
private void removeDeletedElements(AutomationComposition automationComposition, List<UUID> elementsDeleted,
UpdateSync updateSync) {
+ var participantIds = new HashSet<UUID>();
for (var elementId : elementsDeleted) {
LOGGER.info("Deleting element {} in Migration ", elementId);
+ participantIds.add(automationComposition.getElements().get(elementId).getParticipantId());
automationComposition.getElements().remove(elementId);
acProvider.deleteAutomationCompositionElement(elementId);
updateSync.setUpdated(true);
}
+ for (var participantId : participantIds) {
+ var hasElements = automationComposition.getElements().values().stream().anyMatch(element
+ -> element.getParticipantId().equals(participantId));
+ if (! hasElements) {
+ participantSyncPublisher.sendDeleteSync(automationComposition, participantId);
+ }
+
+ }
}
private void sendNextStage(final AutomationComposition automationComposition, int minStageNotCompleted,
var monitoringScanner = new MonitoringScanner(automationCompositionProvider, acDefinitionProvider,
mock(AcDefinitionScanner.class), stageScanner, mock(SimpleScanner.class), mock(PhaseScanner.class),
messageProvider);
+ when(automationCompositionProvider.getAutomationComposition(any())).thenReturn(automationComposition);
var supervisionScanner = new SupervisionScanner(automationCompositionProvider, acDefinitionProvider,
messageProvider, monitoringScanner);
automationComposition.setDeployState(DeployState.DELETED);
publisher.sendSync(automationComposition);
verify(topicSink).send(anyString());
+
+ clearInvocations(topicSink);
+ automationComposition.getElements().values().iterator().next().setDeployState(DeployState.DELETED);
+ publisher.sendDeleteSync(automationComposition, UUID.randomUUID());
+ verify(topicSink).send(anyString());
}
@Test
package org.onap.policy.clamp.acm.runtime.supervision.scanner;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.clearInvocations;
@Test
void testSendAutomationCompositionMigrate() {
var automationComposition = InstantiationUtils.getAutomationCompositionFromResource(AC_JSON, "Crud");
- automationComposition.setInstanceId(UUID.randomUUID());
- automationComposition.setDeployState(DeployState.MIGRATING);
+ assert automationComposition != null;
automationComposition.setCompositionId(COMPOSITION_ID);
var compositionTargetId = UUID.randomUUID();
automationComposition.setCompositionTargetId(compositionTargetId);
- automationComposition.setLockState(LockState.LOCKED);
- automationComposition.setLastMsg(TimestampHelper.now());
- automationComposition.setPhase(0);
- for (var element : automationComposition.getElements().values()) {
- element.setDeployState(DeployState.DEPLOYED);
- element.setLockState(LockState.LOCKED);
- }
+ CommonTestData.modifyAcState(automationComposition, DeployState.MIGRATING);
// first element is not migrated yet
var element = automationComposition.getElements().entrySet().iterator().next().getValue();
element.setDeployState(DeployState.MIGRATING);
when(acProvider.updateAutomationComposition(any())).thenReturn(automationComposition);
var acRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanner");
var encryptionUtils = new EncryptionUtils(acRuntimeParameterGroup);
- var supervisionScanner = new StageScanner(acProvider, mock(ParticipantSyncPublisher.class),
+ var participantSyncPublisher = mock(ParticipantSyncPublisher.class);
+ var supervisionScanner = new StageScanner(acProvider, participantSyncPublisher,
mock(AutomationCompositionMigrationPublisher.class), mock(AcPreparePublisher.class),
acRuntimeParameterGroup, encryptionUtils);
var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
assertEquals(DeployState.DEPLOYED, automationComposition.getDeployState());
assertEquals(compositionTargetId, automationComposition.getCompositionId());
+
+ // remove all element for a participant
+ clearInvocations(acProvider);
+ clearInvocations(participantSyncPublisher);
+ element.setDeployState(DeployState.DELETED);
+ supervisionScanner.scanStage(automationComposition, acDefinition, new UpdateSync(), UUID.randomUUID());
+ verify(acProvider).updateAutomationComposition(any(AutomationComposition.class));
+ assertThat(automationComposition.getElements()).doesNotContainKey(element.getId()); //element deleted
+ verify(participantSyncPublisher, times(1)).sendDeleteSync(automationComposition, element.getParticipantId());
+
+ // remove one element; participant retains other elements
+ clearInvocations(acProvider);
+ clearInvocations(participantSyncPublisher);
+ for (var e : automationComposition.getElements().values()) {
+ e.setParticipantId(element.getParticipantId());
+ }
+ automationComposition.getElements().put(element.getId(), element);
+ supervisionScanner.scanStage(automationComposition, acDefinition, new UpdateSync(), UUID.randomUUID());
+ verify(acProvider).updateAutomationComposition(any(AutomationComposition.class));
+ assertThat(automationComposition.getElements()).doesNotContainKey(element.getId()); //element deleted
+ verify(participantSyncPublisher, times(0)).sendDeleteSync(automationComposition,
+ element.getParticipantId());
}
@Test
void testSendAutomationCompositionMigrationReverting() {
var automationComposition = InstantiationUtils.getAutomationCompositionFromResource(AC_JSON, "Crud");
- automationComposition.setInstanceId(UUID.randomUUID());
- automationComposition.setDeployState(DeployState.MIGRATION_REVERTING);
+ assert automationComposition != null;
automationComposition.setCompositionId(COMPOSITION_ID);
- var compositionTargetId = UUID.randomUUID();
- automationComposition.setCompositionTargetId(compositionTargetId);
- automationComposition.setLockState(LockState.LOCKED);
- automationComposition.setLastMsg(TimestampHelper.now());
- automationComposition.setPhase(0);
- for (var element : automationComposition.getElements().values()) {
- element.setDeployState(DeployState.DEPLOYED);
- element.setLockState(LockState.LOCKED);
- }
+ automationComposition.setCompositionTargetId(UUID.randomUUID());
+ CommonTestData.modifyAcState(automationComposition, DeployState.MIGRATION_REVERTING);
+
// first element is not migrated yet
var element = automationComposition.getElements().entrySet().iterator().next().getValue();
element.setDeployState(DeployState.MIGRATION_REVERTING);
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021-2025 Nordix Foundation.
+ * Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import org.onap.policy.clamp.acm.runtime.main.parameters.AcmParameters;
import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
+import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
+import org.onap.policy.clamp.models.acm.concepts.DeployState;
+import org.onap.policy.clamp.models.acm.concepts.LockState;
import org.onap.policy.clamp.models.acm.concepts.Participant;
import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica;
import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
return acRuntimeParameterGroup;
}
+ /**
+ * Modify the state of the AutomationComposition.
+ * @param automationComposition automationComposition
+ * @param deployState deployState
+ */
+ public static void modifyAcState(AutomationComposition automationComposition,
+ DeployState deployState) {
+ automationComposition.setInstanceId(UUID.randomUUID());
+ automationComposition.setDeployState(deployState);
+ automationComposition.setLockState(LockState.LOCKED);
+ automationComposition.setLastMsg(TimestampHelper.now());
+ automationComposition.setPhase(0);
+ for (var element : automationComposition.getElements().values()) {
+ element.setDeployState(DeployState.DEPLOYED);
+ element.setLockState(LockState.LOCKED);
+ }
+ }
+
}