import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessagePublisher;
import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.AutomationCompositionMsg;
import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
+import org.onap.policy.clamp.models.acm.concepts.DeployState;
+import org.onap.policy.clamp.models.acm.concepts.LockState;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeploy;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionMigration;
value = "listener.automation_composition_migration",
description = "AUTOMATION_COMPOSITION_MIGRATION messages received")
public void handleAutomationCompositionMigration(AutomationCompositionMigration migrationMsg) {
+ var stateNew = false;
+ var stateDefault = false;
+ var stateRemoved = false;
+ for (var ac : migrationMsg.getParticipantUpdatesList()) {
+ if (cacheProvider.getParticipantId().equals(ac.getParticipantId())) {
+ for (var element : ac.getAcElementList()) {
+ switch (element.getMigrationState()) {
+ case NEW -> stateNew = true;
+ case REMOVED -> stateRemoved = true;
+ default -> stateDefault = true;
+ }
+ }
+ }
+ }
var acMsg = Boolean.TRUE.equals(migrationMsg.getPrecheck())
? new AutomationCompositionMsg<>(acSubStateHandler::handleAcMigrationPrecheck, migrationMsg)
: new AutomationCompositionMsg<>(
- automationCompositionHandler::handleAutomationCompositionMigration, migrationMsg);
- setCompositionUpdate(migrationMsg, acMsg);
- setInstanceUpdate(migrationMsg, acMsg);
- acMsg.setCompositionTargetId(migrationMsg.getCompositionTargetId());
- acMsg.setRevisionIdCompositionTarget(migrationMsg.getRevisionIdCompositionTarget());
+ automationCompositionHandler::handleAutomationCompositionMigration, migrationMsg);
+ setUpdate(migrationMsg, acMsg, stateNew, stateDefault, stateRemoved);
msgExecutor.execute(acMsg);
}
+ private void setUpdate(AutomationCompositionMigration migrationMsg,
+ AutomationCompositionMsg<AutomationCompositionMigration> acMsg, boolean stateNew, boolean stateDefault,
+ boolean stateRemoved) {
+ if (stateDefault || stateRemoved) {
+ setCompositionUpdate(migrationMsg, acMsg);
+ setInstanceUpdate(migrationMsg, acMsg);
+ } else {
+ checkAutomationComposition(migrationMsg);
+ }
+ if (stateDefault || stateNew) {
+ acMsg.setCompositionTargetId(migrationMsg.getCompositionTargetId());
+ acMsg.setRevisionIdCompositionTarget(migrationMsg.getRevisionIdCompositionTarget());
+ }
+ }
+
+ private void checkAutomationComposition(AutomationCompositionMigration migrationMsg) {
+ var ac = cacheProvider.getAutomationComposition(migrationMsg.getAutomationCompositionId());
+ if (ac != null && migrationMsg.getRevisionIdInstance().equals(ac.getRevisionId())) {
+ return;
+ }
+ var restart = new ParticipantRestartAc();
+ restart.setCompositionTargetId(migrationMsg.getCompositionTargetId());
+ restart.setAutomationCompositionId(migrationMsg.getAutomationCompositionId());
+ restart.setRevisionId(migrationMsg.getRevisionIdInstance());
+ restart.setDeployState(Boolean.TRUE.equals(migrationMsg.getRollback())
+ ? DeployState.MIGRATION_REVERTING : DeployState.MIGRATING);
+ restart.setLockState(LockState.LOCKED);
+ cacheProvider.initializeAutomationComposition(migrationMsg.getCompositionId(), restart);
+ }
+
/**
* Handle a automation composition property update message.
*
import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessagePublisher;
import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
import org.onap.policy.clamp.acm.participant.intermediary.main.parameters.CommonTestData;
+import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
+import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
+import org.onap.policy.clamp.models.acm.concepts.MigrationState;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantDeploy;
import org.onap.policy.clamp.models.acm.concepts.ParticipantSupportedElementType;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeploy;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionMigration;
verify(acLockHandler).handleAutomationCompositionStateChange(acStateChange);
}
+ @Test
+ void handleMigrationAddTest() {
+ var migrationMsg = new AutomationCompositionMigration();
+ migrationMsg.setCompositionId(UUID.randomUUID());
+ migrationMsg.setRevisionIdComposition(UUID.randomUUID());
+ migrationMsg.setAutomationCompositionId(UUID.randomUUID());
+ migrationMsg.setRevisionIdInstance(UUID.randomUUID());
+
+ migrationMsg.setCompositionTargetId(UUID.randomUUID());
+ migrationMsg.setRevisionIdCompositionTarget(UUID.randomUUID());
+ var cacheProvider = mock(CacheProvider.class);
+ when(cacheProvider.isCompositionDefinitionUpdated(migrationMsg.getCompositionTargetId(),
+ migrationMsg.getRevisionIdCompositionTarget())).thenReturn(true);
+
+ when(cacheProvider.getParticipantId()).thenReturn(CommonTestData.getParticipantId());
+ var participantDeploy = new ParticipantDeploy();
+ participantDeploy.setParticipantId(CommonTestData.getParticipantId());
+ var acElementDeploy = new AcElementDeploy();
+ acElementDeploy.setMigrationState(MigrationState.NEW);
+ participantDeploy.setAcElementList(List.of(acElementDeploy));
+ migrationMsg.setParticipantUpdatesList(List.of(participantDeploy));
+
+ var acHandler = mock(AutomationCompositionHandler.class);
+ var acSubStateHandler = mock(AcSubStateHandler.class);
+ var msgExecutor = new MsgExecutor(cacheProvider, mock(ParticipantMessagePublisher.class));
+ var participantHandler = new ParticipantHandler(acHandler, mock(AcLockHandler.class),
+ acSubStateHandler, mock(AcDefinitionHandler.class), mock(ParticipantMessagePublisher.class),
+ cacheProvider, msgExecutor);
+ participantHandler.handleAutomationCompositionMigration(migrationMsg);
+ verify(acHandler).handleAutomationCompositionMigration(migrationMsg);
+ verify(cacheProvider).initializeAutomationComposition(any(), any());
+
+ clearInvocations(acHandler, cacheProvider);
+ migrationMsg.setRollback(true);
+ var automationComposition = new AutomationComposition();
+ automationComposition.setInstanceId(migrationMsg.getAutomationCompositionId());
+ when(cacheProvider.getAutomationComposition(migrationMsg.getAutomationCompositionId()))
+ .thenReturn(automationComposition);
+ participantHandler.handleAutomationCompositionMigration(migrationMsg);
+ verify(acHandler).handleAutomationCompositionMigration(migrationMsg);
+ verify(cacheProvider).initializeAutomationComposition(any(), any());
+
+ clearInvocations(acHandler, cacheProvider);
+ automationComposition.setRevisionId(migrationMsg.getRevisionIdInstance());
+ when(cacheProvider.getAutomationComposition(migrationMsg.getAutomationCompositionId()))
+ .thenReturn(automationComposition);
+ participantHandler.handleAutomationCompositionMigration(migrationMsg);
+ verify(acHandler).handleAutomationCompositionMigration(migrationMsg);
+ verify(cacheProvider, times(0)).initializeAutomationComposition(any(), any());
+ }
+
+ @Test
+ void handleMigrationRemovedTest() {
+ var migrationMsg = new AutomationCompositionMigration();
+ migrationMsg.setCompositionId(UUID.randomUUID());
+ migrationMsg.setRevisionIdComposition(UUID.randomUUID());
+ var cacheProvider = mock(CacheProvider.class);
+ when(cacheProvider.isCompositionDefinitionUpdated(migrationMsg.getCompositionId(),
+ migrationMsg.getRevisionIdComposition())).thenReturn(true);
+
+ migrationMsg.setAutomationCompositionId(UUID.randomUUID());
+ migrationMsg.setRevisionIdInstance(UUID.randomUUID());
+ when(cacheProvider.isInstanceUpdated(migrationMsg.getAutomationCompositionId(),
+ migrationMsg.getRevisionIdInstance())).thenReturn(true);
+
+ migrationMsg.setCompositionTargetId(UUID.randomUUID());
+ migrationMsg.setRevisionIdCompositionTarget(UUID.randomUUID());
+
+ when(cacheProvider.getParticipantId()).thenReturn(CommonTestData.getParticipantId());
+ var participantDeploy = new ParticipantDeploy();
+ participantDeploy.setParticipantId(CommonTestData.getParticipantId());
+ var acElementDeploy = new AcElementDeploy();
+ acElementDeploy.setMigrationState(MigrationState.REMOVED);
+ participantDeploy.setAcElementList(List.of(acElementDeploy));
+ migrationMsg.setParticipantUpdatesList(List.of(participantDeploy));
+
+ var acHandler = mock(AutomationCompositionHandler.class);
+ var acSubStateHandler = mock(AcSubStateHandler.class);
+ var msgExecutor = new MsgExecutor(cacheProvider, mock(ParticipantMessagePublisher.class));
+ var participantHandler = new ParticipantHandler(acHandler, mock(AcLockHandler.class),
+ acSubStateHandler, mock(AcDefinitionHandler.class), mock(ParticipantMessagePublisher.class),
+ cacheProvider, msgExecutor);
+ participantHandler.handleAutomationCompositionMigration(migrationMsg);
+ verify(acHandler).handleAutomationCompositionMigration(migrationMsg);
+ }
+
@Test
void handleAutomationCompositionMigrationTest() {
var cacheProvider = mock(CacheProvider.class);
when(cacheProvider.isCompositionDefinitionUpdated(migrationMsg.getCompositionTargetId(),
migrationMsg.getRevisionIdCompositionTarget())).thenReturn(true);
+ when(cacheProvider.getParticipantId()).thenReturn(CommonTestData.getParticipantId());
+ var participantDeploy = new ParticipantDeploy();
+ participantDeploy.setParticipantId(CommonTestData.getParticipantId());
+ var acElementDeploy = new AcElementDeploy();
+ acElementDeploy.setMigrationState(MigrationState.DEFAULT);
+ participantDeploy.setAcElementList(List.of(acElementDeploy));
+ var participantDeploy2 = new ParticipantDeploy();
+ participantDeploy2.setParticipantId(UUID.randomUUID());
+ migrationMsg.setParticipantUpdatesList(List.of(participantDeploy, participantDeploy2));
+
var acHandler = mock(AutomationCompositionHandler.class);
var acSubStateHandler = mock(AcSubStateHandler.class);
var msgExecutor = new MsgExecutor(cacheProvider, mock(ParticipantMessagePublisher.class));