Update ACM-r message handling architecture in ACM 44/140244/2
authorFrancescoFioraEst <francesco.fiora@est.tech>
Tue, 18 Feb 2025 14:11:52 +0000 (14:11 +0000)
committerFrancescoFioraEst <francesco.fiora@est.tech>
Tue, 18 Feb 2025 17:18:52 +0000 (17:18 +0000)
Issue-ID: POLICY-5286
Change-Id: I8e74bc1dcdfcbed258778e88542bdef5fe1e9149
Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
15 files changed:
models/src/main/java/org/onap/policy/clamp/models/acm/persistence/provider/AcDefinitionProvider.java
models/src/main/java/org/onap/policy/clamp/models/acm/persistence/provider/AutomationCompositionProvider.java
models/src/main/java/org/onap/policy/clamp/models/acm/persistence/provider/MessageProvider.java [new file with mode: 0644]
models/src/test/java/org/onap/policy/clamp/models/acm/persistence/provider/AcDefinitionProviderTest.java
models/src/test/java/org/onap/policy/clamp/models/acm/persistence/provider/AutomationCompositionProviderTest.java
models/src/test/java/org/onap/policy/clamp/models/acm/persistence/provider/MessageProviderTest.java [new file with mode: 0644]
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/scanner/SimpleScanner.java
runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandlerTest.java
runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandlerTest.java
runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandlerTest.java
runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScannerTest.java

index 6de27f7..bd99a75 100644 (file)
@@ -24,18 +24,16 @@ import jakarta.ws.rs.core.Response;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Collectors;
 import lombok.RequiredArgsConstructor;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
-import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState;
-import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
 import org.onap.policy.clamp.models.acm.document.base.ToscaServiceTemplateValidation;
 import org.onap.policy.clamp.models.acm.document.concepts.DocToscaServiceTemplate;
 import org.onap.policy.clamp.models.acm.persistence.concepts.JpaAutomationCompositionDefinition;
-import org.onap.policy.clamp.models.acm.persistence.concepts.JpaNodeTemplateState;
 import org.onap.policy.clamp.models.acm.persistence.repository.AutomationCompositionDefinitionRepository;
-import org.onap.policy.clamp.models.acm.persistence.repository.NodeTemplateStateRepository;
 import org.onap.policy.clamp.models.acm.utils.AcmUtils;
 import org.onap.policy.clamp.models.acm.utils.TimestampHelper;
 import org.onap.policy.common.parameters.BeanValidationResult;
@@ -43,7 +41,6 @@ import org.onap.policy.models.base.PfModelRuntimeException;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
 import org.springframework.data.domain.Example;
 import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Isolation;
 import org.springframework.transaction.annotation.Transactional;
 
 @Service
@@ -54,7 +51,6 @@ public class AcDefinitionProvider {
     private static final String NAME = "AutomationCompositionDefinition";
 
     private final AutomationCompositionDefinitionRepository acmDefinitionRepository;
-    private final NodeTemplateStateRepository nodeTemplateStateRepository;
 
     /**
      * Create Automation Composition Definition.
@@ -146,40 +142,6 @@ public class AcDefinitionProvider {
         acmDefinitionRepository.flush();
     }
 
-    /**
-     * Update Ac Definition AcTypeState, StateChangeResult and restarting.
-     *
-     * @param compositionId The UUID of the automation composition definition to update
-     * @param state the AcTypeState
-     * @param stateChangeResult the StateChangeResult
-     */
-    public void updateAcDefinitionState(UUID compositionId, AcTypeState state, StateChangeResult stateChangeResult) {
-        var jpaUpdate = acmDefinitionRepository.findById(compositionId.toString());
-        if (jpaUpdate.isEmpty()) {
-            String errorMessage = "update of Automation Composition Definition \"" + compositionId
-                + "\" failed, Automation Composition Definition does not exist";
-            throw new PfModelRuntimeException(Response.Status.NOT_FOUND, errorMessage);
-        }
-        var acDefinition = jpaUpdate.get();
-        acDefinition.setState(state);
-        acDefinition.setStateChangeResult(stateChangeResult);
-        acmDefinitionRepository.save(acDefinition);
-        acmDefinitionRepository.flush();
-    }
-
-    /**
-     * Update Ac DefinitionElement.
-     *
-     * @param nodeTemplateState the NodeTemplateState
-     * @param compositionId The UUID of the automation composition definition
-     */
-    public void updateAcDefinitionElement(NodeTemplateState nodeTemplateState, UUID compositionId) {
-        var jpaNodeTemplateState = new JpaNodeTemplateState(
-            nodeTemplateState.getNodeTemplateStateId().toString(), compositionId.toString());
-        jpaNodeTemplateState.fromAuthorative(nodeTemplateState);
-        nodeTemplateStateRepository.save(jpaNodeTemplateState);
-    }
-
     /**
      * Delete Automation Composition Definition.
      *
@@ -222,7 +184,7 @@ public class AcDefinitionProvider {
      * @param compositionId The UUID of the automation composition definition to delete
      * @return the automation composition definition
      */
-    @Transactional(readOnly = true, isolation = Isolation.READ_UNCOMMITTED)
+    @Transactional(readOnly = true)
     public Optional<AutomationCompositionDefinition> findAcDefinition(UUID compositionId) {
         var jpaGet = acmDefinitionRepository.findById(compositionId.toString());
         return jpaGet.stream().map(JpaAutomationCompositionDefinition::toAuthorative).findFirst();
@@ -234,9 +196,10 @@ public class AcDefinitionProvider {
      * @return the Automation Composition Definitions found
      */
     @Transactional(readOnly = true)
-    public List<AutomationCompositionDefinition> getAllAcDefinitionsInTransition() {
+    public Set<UUID> getAllAcDefinitionsInTransition() {
         var jpaList = acmDefinitionRepository.findByStateIn(List.of(AcTypeState.PRIMING, AcTypeState.DEPRIMING));
-        return ProviderUtils.asEntityList(jpaList);
+        return jpaList.stream().map(JpaAutomationCompositionDefinition::getCompositionId)
+                .map(UUID::fromString).collect(Collectors.toSet());
     }
 
     /**
index ab80bc2..3bfcf72 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- * Copyright (C) 2021-2024 Nordix Foundation.
+ * Copyright (C) 2021-2025 Nordix Foundation.
  * ================================================================================
  * Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
@@ -24,20 +24,19 @@ package org.onap.policy.clamp.models.acm.persistence.provider;
 
 import jakarta.ws.rs.core.Response;
 import jakarta.ws.rs.core.Response.Status;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Collectors;
 import lombok.AllArgsConstructor;
 import lombok.NonNull;
 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement;
-import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionInfo;
 import org.onap.policy.clamp.models.acm.concepts.DeployState;
 import org.onap.policy.clamp.models.acm.concepts.LockState;
 import org.onap.policy.clamp.models.acm.concepts.SubState;
 import org.onap.policy.clamp.models.acm.persistence.concepts.JpaAutomationComposition;
-import org.onap.policy.clamp.models.acm.persistence.concepts.JpaAutomationCompositionElement;
 import org.onap.policy.clamp.models.acm.persistence.repository.AutomationCompositionElementRepository;
 import org.onap.policy.clamp.models.acm.persistence.repository.AutomationCompositionRepository;
 import org.onap.policy.clamp.models.acm.utils.AcmUtils;
@@ -118,23 +117,6 @@ public class AutomationCompositionProvider {
         return result.toAuthorative();
     }
 
-
-    /**
-     * Update automation composition state.
-     *
-     * @param acSource the automation composition to update
-     * @return the updated automation composition
-     */
-    public AutomationComposition updateAcState(final AutomationComposition acSource) {
-        var automationComposition = automationCompositionRepository
-                .getReferenceById(acSource.getInstanceId().toString());
-        automationComposition.fromAuthorativeBase(acSource);
-        var result = automationCompositionRepository.save(automationComposition);
-        automationCompositionRepository.flush();
-        // Return the saved automation composition
-        return result.toAuthorative();
-    }
-
     /**
      * Update automation composition.
      *
@@ -168,14 +150,15 @@ public class AutomationCompositionProvider {
      * @return all automation compositions found
      */
     @Transactional(readOnly = true)
-    public List<AutomationComposition> getAcInstancesInTransition() {
+    public Set<UUID> getAcInstancesInTransition() {
         var jpaList = automationCompositionRepository.findByDeployStateIn(List.of(DeployState.DEPLOYING,
             DeployState.UNDEPLOYING, DeployState.DELETING, DeployState.UPDATING, DeployState.MIGRATING));
         jpaList.addAll(automationCompositionRepository.findByLockStateIn(
             List.of(LockState.LOCKING, LockState.UNLOCKING)));
         jpaList.addAll(automationCompositionRepository.findBySubStateIn(
                 List.of(SubState.PREPARING, SubState.MIGRATION_PRECHECKING, SubState.REVIEWING)));
-        return ProviderUtils.asEntityList(jpaList);
+        return jpaList.stream().map(JpaAutomationComposition::getInstanceId)
+                .map(UUID::fromString).collect(Collectors.toSet());
     }
 
     /**
@@ -226,49 +209,6 @@ public class AutomationCompositionProvider {
         return jpaDeleteAutomationComposition.get().toAuthorative();
     }
 
-    /**
-     * Upgrade States.
-     *
-     * @param automationCompositionInfoList list of AutomationCompositionInfo
-     */
-    public void upgradeStates(@NonNull final List<AutomationCompositionInfo> automationCompositionInfoList) {
-        if (automationCompositionInfoList.isEmpty()) {
-            return;
-        }
-        List<JpaAutomationCompositionElement> jpaList = new ArrayList<>();
-        for (var acInstance : automationCompositionInfoList) {
-            for (var element : acInstance.getElements()) {
-                var jpa = acElementRepository.getReferenceById(element.getAutomationCompositionElementId().toString());
-                jpa.setUseState(element.getUseState());
-                jpa.setOperationalState(element.getOperationalState());
-                jpa.setOutProperties(element.getOutProperties());
-                jpaList.add(jpa);
-            }
-        }
-        acElementRepository.saveAll(jpaList);
-    }
-
-    /**
-     * Update AutomationCompositionElement.
-     *
-     * @param element the AutomationCompositionElement
-     */
-    public void updateAutomationCompositionElement(@NonNull final AutomationCompositionElement element) {
-        var jpaAcElement = acElementRepository.getReferenceById(element.getId().toString());
-        jpaAcElement.setMessage(element.getMessage());
-        jpaAcElement.setOutProperties(element.getOutProperties());
-        jpaAcElement.setOperationalState(element.getOperationalState());
-        jpaAcElement.setUseState(element.getUseState());
-        jpaAcElement.setDeployState(element.getDeployState());
-        jpaAcElement.setLockState(element.getLockState());
-        jpaAcElement.setSubState(element.getSubState());
-        jpaAcElement.setStage(element.getStage());
-        jpaAcElement.setRestarting(element.getRestarting());
-
-        ProviderUtils.validate(element, jpaAcElement, "AutomationCompositionElement");
-        acElementRepository.save(jpaAcElement);
-    }
-
     /**
      * Delete AutomationCompositionElement.
      *
diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/persistence/provider/MessageProvider.java b/models/src/main/java/org/onap/policy/clamp/models/acm/persistence/provider/MessageProvider.java
new file mode 100644 (file)
index 0000000..c3e5543
--- /dev/null
@@ -0,0 +1,238 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.models.acm.persistence.provider;
+
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import lombok.AllArgsConstructor;
+import org.onap.policy.clamp.models.acm.document.concepts.DocMessage;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeployAck;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantAckMessage;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrimeAck;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatus;
+import org.onap.policy.clamp.models.acm.persistence.concepts.JpaMessage;
+import org.onap.policy.clamp.models.acm.persistence.concepts.JpaMessageJob;
+import org.onap.policy.clamp.models.acm.persistence.repository.MessageJobRepository;
+import org.onap.policy.clamp.models.acm.persistence.repository.MessageRepository;
+import org.onap.policy.clamp.models.acm.utils.AcmUtils;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+@Service
+@Transactional
+@AllArgsConstructor
+public class MessageProvider {
+
+    private final MessageRepository messageRepository;
+    private final MessageJobRepository messageJobRepository;
+
+    /**
+     * Save ParticipantPrimeAck message.
+     *
+     * @param message the ParticipantPrimeAck message
+     */
+    public void save(ParticipantPrimeAck message) {
+        var doc = from(message);
+        doc.setCompositionState(message.getCompositionState());
+        doc.setMessage(AcmUtils.validatedMessage(message.getMessage()));
+        var jpa = new JpaMessage(message.getCompositionId().toString(), doc);
+        ProviderUtils.validate(doc, jpa, "ParticipantPrimeAck message");
+        messageRepository.save(jpa);
+    }
+
+    /**
+     * Save AutomationCompositionDeployAck message.
+     *
+     * @param message the AutomationCompositionDeployAck message
+     */
+    public void save(AutomationCompositionDeployAck message) {
+        for (var entry : message.getAutomationCompositionResultMap().entrySet()) {
+            var doc = from(message);
+            doc.setStage(message.getStage());
+            doc.setInstanceElementId(entry.getKey());
+            doc.setInstanceId(message.getAutomationCompositionId());
+            doc.setMessage(AcmUtils.validatedMessage(entry.getValue().getMessage()));
+            doc.setDeployState(entry.getValue().getDeployState());
+            doc.setLockState(entry.getValue().getLockState());
+            var jpa = new JpaMessage(message.getAutomationCompositionId().toString(), doc);
+            ProviderUtils.validate(doc, jpa, "AutomationCompositionDeployAck message");
+            messageRepository.save(jpa);
+        }
+    }
+
+    /**
+     * Save ParticipantStatus message.
+     *
+     * @param message the ParticipantStatus message
+     */
+    public void save(ParticipantStatus message) {
+        if (!message.getAutomationCompositionInfoList().isEmpty()) {
+            saveInstanceOutProperties(message);
+        }
+        if (!message.getParticipantDefinitionUpdates().isEmpty()) {
+            saveCompositionOutProperties(message);
+        }
+    }
+
+    private void saveInstanceOutProperties(ParticipantStatus message) {
+        for (var instance : message.getAutomationCompositionInfoList()) {
+            for (var element : instance.getElements()) {
+                var jpa = new JpaMessage();
+                jpa.setIdentificationId(instance.getAutomationCompositionId().toString());
+                var doc = from(message);
+                doc.setInstanceId(instance.getAutomationCompositionId());
+                doc.setUseState(element.getUseState());
+                doc.setOperationalState(element.getOperationalState());
+                doc.setOutProperties(element.getOutProperties());
+                doc.setInstanceElementId(element.getAutomationCompositionElementId());
+                jpa.fromAuthorative(doc);
+                ProviderUtils.validate(doc, jpa, "ParticipantStatus instance message");
+                messageRepository.save(jpa);
+            }
+        }
+    }
+
+    private void saveCompositionOutProperties(ParticipantStatus message) {
+        for (var acDefinition : message.getParticipantDefinitionUpdates()) {
+            for (var element : acDefinition.getAutomationCompositionElementDefinitionList()) {
+                var jpa = new JpaMessage();
+                jpa.setIdentificationId(message.getCompositionId().toString());
+                var doc = from(message);
+                doc.setOutProperties(element.getOutProperties());
+                doc.setAcElementDefinitionId(element.getAcElementDefinitionId());
+                jpa.fromAuthorative(doc);
+                ProviderUtils.validate(doc, jpa, "ParticipantStatus composition message");
+                messageRepository.save(jpa);
+            }
+        }
+    }
+
+    private DocMessage from(ParticipantStatus message) {
+        var doc = new DocMessage();
+        doc.setMessageType(message.getMessageType());
+        doc.setCompositionId(message.getCompositionId());
+        doc.setParticipantId(message.getParticipantId());
+        doc.setReplicaId(message.getReplicaId());
+        doc.setMessageType(message.getMessageType());
+        return doc;
+    }
+
+    private DocMessage from(ParticipantAckMessage message) {
+        var doc = new DocMessage();
+        doc.setMessageType(message.getMessageType());
+        doc.setCompositionId(message.getCompositionId());
+        doc.setStateChangeResult(message.getStateChangeResult());
+        doc.setParticipantId(message.getParticipantId());
+        doc.setReplicaId(message.getReplicaId());
+        return doc;
+    }
+
+    @Transactional(readOnly = true)
+    public List<DocMessage> getAllMessages(UUID identificationId) {
+        var result = messageRepository.findByIdentificationIdOrderByLastMsgDesc(identificationId.toString());
+        return result.stream().map(JpaMessage::toAuthorative).toList();
+    }
+
+    /**
+     * Find all Composition ids from Messages.
+     *
+     * @return set of Composition ids
+     */
+    @Transactional(readOnly = true)
+    public Set<UUID> findCompositionMessages() {
+        var result = messageRepository.findAll();
+        return result.stream()
+                .map(JpaMessage::toAuthorative)
+                .filter(doc -> doc.getInstanceId() == null)
+                .map(DocMessage::getCompositionId)
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Find all Instance ids from Messages.
+     *
+     * @return set of Instance ids
+     */
+    @Transactional(readOnly = true)
+    public Set<UUID> findInstanceMessages() {
+        var result = messageRepository.findAll();
+        return result.stream()
+                .map(JpaMessage::toAuthorative)
+                .map(DocMessage::getInstanceId)
+                .filter(Objects::nonNull)
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Remove the message.
+     *
+     * @param messageId the messageId
+     */
+    public void removeMessage(String messageId) {
+        messageRepository.deleteById(messageId);
+    }
+
+    /**
+     * Remove old jobs.
+     */
+    public void removeOldJobs() {
+        var list = messageJobRepository.findAll();
+        var old = Timestamp.from(Instant.now().minusSeconds(200));
+        for (var job : list) {
+            if (job.getJobStarted().before(old)) {
+                messageJobRepository.deleteById(job.getJobId());
+            }
+        }
+    }
+
+    /**
+     * Create new Job related to the identificationId.
+     *
+     * @param identificationId the instanceId or compositionId
+     *
+     * @return the jobId if the job has been created
+     */
+    public Optional<String> createJob(UUID identificationId) {
+        var opt = messageJobRepository.findByIdentificationId(identificationId.toString());
+        if (opt.isPresent()) {
+            // already exist a job with this identificationId
+            return Optional.empty();
+        }
+        var job = new JpaMessageJob(identificationId.toString());
+        var result = messageJobRepository.save(job);
+        return Optional.of(result.getJobId());
+    }
+
+    /**
+     * Remove the job by jobId.
+     *
+     * @param jobId the jobId
+     */
+    public void removeJob(String jobId) {
+        messageJobRepository.deleteById(jobId);
+    }
+}
index b6fbe09..bc0a093 100644 (file)
@@ -23,7 +23,6 @@ package org.onap.policy.clamp.models.acm.persistence.provider;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -39,12 +38,9 @@ import org.mockito.Mockito;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
 import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState;
-import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
 import org.onap.policy.clamp.models.acm.document.concepts.DocToscaServiceTemplate;
 import org.onap.policy.clamp.models.acm.persistence.concepts.JpaAutomationCompositionDefinition;
-import org.onap.policy.clamp.models.acm.persistence.concepts.JpaNodeTemplateState;
 import org.onap.policy.clamp.models.acm.persistence.repository.AutomationCompositionDefinitionRepository;
-import org.onap.policy.clamp.models.acm.persistence.repository.NodeTemplateStateRepository;
 import org.onap.policy.clamp.models.acm.utils.CommonTestData;
 import org.onap.policy.clamp.models.acm.utils.TimestampHelper;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
@@ -68,7 +64,7 @@ class AcDefinitionProviderTest {
     @Test
     void testBadRequest() {
         var acmDefinitionRepository = mock(AutomationCompositionDefinitionRepository.class);
-        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository, null);
+        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository);
 
         var compositionId = UUID.randomUUID();
         var serviceTemplate = new ToscaServiceTemplate();
@@ -82,10 +78,6 @@ class AcDefinitionProviderTest {
         var acmDefinition = getAcDefinition(docServiceTemplate);
         assertThatThrownBy(() -> acDefinitionProvider.updateAcDefinition(acmDefinition, "CompositionName"))
                 .hasMessageStartingWith("\"AutomationCompositionDefinition\" INVALID, item has status INVALID");
-
-        assertThatThrownBy(() -> acDefinitionProvider.updateAcDefinitionState(compositionId, AcTypeState.PRIMED,
-                StateChangeResult.NO_ERROR))
-                .hasMessageStartingWith("update of Automation Composition Definition");
     }
 
     @BeforeAll
@@ -120,7 +112,7 @@ class AcDefinitionProviderTest {
         when(acmDefinitionRepository.save(any(JpaAutomationCompositionDefinition.class)))
                 .thenReturn(new JpaAutomationCompositionDefinition(acmDefinition));
 
-        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository, null);
+        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository);
         var result = acDefinitionProvider
                 .createAutomationCompositionDefinition(inputServiceTemplate, ELEMENT_NAME, NODE_TYPE);
 
@@ -132,7 +124,7 @@ class AcDefinitionProviderTest {
     void testToscaWithInvalidElement() {
         var acmDefinitionRepository = mock(AutomationCompositionDefinitionRepository.class);
 
-        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository, null);
+        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository);
 
         assertThatThrownBy(() -> acDefinitionProvider
                 .createAutomationCompositionDefinition(inputServiceTemplate, INVALID_ELEMENT_NAME, NODE_TYPE))
@@ -143,7 +135,7 @@ class AcDefinitionProviderTest {
     void testToscaWithInvalidNodeType() {
         var acmDefinitionRepository = mock(AutomationCompositionDefinitionRepository.class);
 
-        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository, null);
+        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository);
 
         assertThatThrownBy(() -> acDefinitionProvider
                 .createAutomationCompositionDefinition(inputServiceTemplate, ELEMENT_NAME, INVALID_NODE_TYPE))
@@ -160,7 +152,7 @@ class AcDefinitionProviderTest {
         when(acmDefinitionRepository.save(any(JpaAutomationCompositionDefinition.class)))
             .thenReturn(new JpaAutomationCompositionDefinition(acmDefinition));
 
-        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository, null);
+        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository);
         inputServiceTemplate.setMetadata(new HashMap<>());
         var result = acDefinitionProvider
                 .createAutomationCompositionDefinition(inputServiceTemplate, ELEMENT_NAME, NODE_TYPE);
@@ -172,7 +164,7 @@ class AcDefinitionProviderTest {
     @Test
     void testUpdateServiceTemplate() {
         var acmDefinitionRepository = mock(AutomationCompositionDefinitionRepository.class);
-        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository, null);
+        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository);
         acDefinitionProvider.updateServiceTemplate(UUID.randomUUID(), inputServiceTemplate, ELEMENT_NAME, NODE_TYPE);
         verify(acmDefinitionRepository).save(any(JpaAutomationCompositionDefinition.class));
     }
@@ -180,7 +172,7 @@ class AcDefinitionProviderTest {
     @Test
     void testUpdateAcDefinition() {
         var acmDefinitionRepository = mock(AutomationCompositionDefinitionRepository.class);
-        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository, null);
+        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository);
         var acmDefinition = getAcDefinition(new DocToscaServiceTemplate(inputServiceTemplate));
         acDefinitionProvider.updateAcDefinition(acmDefinition, NODE_TYPE);
         verify(acmDefinitionRepository).save(any(JpaAutomationCompositionDefinition.class));
@@ -189,40 +181,20 @@ class AcDefinitionProviderTest {
     @Test
     void testUpdateAcDefinitionState() {
         var acmDefinitionRepository = mock(AutomationCompositionDefinitionRepository.class);
-        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository, null);
+        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository);
         var acmDefinition = getAcDefinition(new DocToscaServiceTemplate(inputServiceTemplate));
         acmDefinition.setState(AcTypeState.PRIMING);
-        var jpa = new JpaAutomationCompositionDefinition(acmDefinition);
-        when(acmDefinitionRepository.findById(acmDefinition.getCompositionId().toString()))
-            .thenReturn(Optional.of(jpa));
-        acDefinitionProvider.updateAcDefinitionState(acmDefinition.getCompositionId(), AcTypeState.PRIMED,
-            StateChangeResult.NO_ERROR);
-        verify(acmDefinitionRepository).save(jpa);
-
-        clearInvocations(acmDefinitionRepository);
         acDefinitionProvider.updateAcDefinitionState(acmDefinition);
         verify(acmDefinitionRepository).save(any());
     }
 
-    @Test
-    void testUpdateAcDefinitionElement() {
-        var nodeTemplateState = new NodeTemplateState();
-        nodeTemplateState.setNodeTemplateId(new ToscaConceptIdentifier("name", "1.0.0"));
-        nodeTemplateState.setNodeTemplateStateId(UUID.randomUUID());
-        nodeTemplateState.setState(AcTypeState.PRIMED);
-        var nodeTemplateStateRepository = mock(NodeTemplateStateRepository.class);
-        var acDefinitionProvider = new AcDefinitionProvider(null, nodeTemplateStateRepository);
-        acDefinitionProvider.updateAcDefinitionElement(nodeTemplateState, UUID.randomUUID());
-        verify(nodeTemplateStateRepository).save(any(JpaNodeTemplateState.class));
-    }
-
     @Test
     void testGetAcDefinition() {
         var jpa = new JpaAutomationCompositionDefinition();
         jpa.fromAuthorative(getAcDefinition(new DocToscaServiceTemplate(inputServiceTemplate)));
         var acmDefinitionRepository = mock(AutomationCompositionDefinitionRepository.class);
         when(acmDefinitionRepository.findById(jpa.getCompositionId())).thenReturn(Optional.of(jpa));
-        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository, null);
+        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository);
         var result = acDefinitionProvider.getAcDefinition(UUID.fromString(jpa.getCompositionId()));
         assertThat(result).isNotNull();
     }
@@ -230,7 +202,7 @@ class AcDefinitionProviderTest {
     @Test
     void testGetAcDefinitionNotFound() {
         var acmDefinitionRepository = mock(AutomationCompositionDefinitionRepository.class);
-        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository, null);
+        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository);
         var compositionId = UUID.randomUUID();
         assertThatThrownBy(() -> acDefinitionProvider.getAcDefinition(compositionId))
                 .hasMessage("Get serviceTemplate \"" + compositionId + "\" failed, serviceTemplate does not exist");
@@ -242,7 +214,7 @@ class AcDefinitionProviderTest {
         jpa.fromAuthorative(getAcDefinition(new DocToscaServiceTemplate(inputServiceTemplate)));
         var acmDefinitionRepository = mock(AutomationCompositionDefinitionRepository.class);
         when(acmDefinitionRepository.findById(jpa.getCompositionId())).thenReturn(Optional.of(jpa));
-        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository, null);
+        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository);
         var compositionId = UUID.fromString(jpa.getCompositionId());
         var result = acDefinitionProvider.findAcDefinition(compositionId);
         assertThat(result).isNotEmpty();
@@ -257,7 +229,7 @@ class AcDefinitionProviderTest {
         var acmDefinitionRepository = mock(AutomationCompositionDefinitionRepository.class);
         when(acmDefinitionRepository.findByStateIn(List.of(AcTypeState.PRIMING, AcTypeState.DEPRIMING)))
             .thenReturn(List.of(jpa));
-        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository, null);
+        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository);
         var result = acDefinitionProvider.getAllAcDefinitionsInTransition();
         assertThat(result).isNotEmpty();
     }
@@ -271,7 +243,7 @@ class AcDefinitionProviderTest {
         when(acmDefinitionRepository.findById(acmDefinition.getCompositionId().toString()))
                 .thenReturn(Optional.of(new JpaAutomationCompositionDefinition(acmDefinition)));
 
-        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository, null);
+        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository);
         var result = acDefinitionProvider.deleteAcDefintion(acmDefinition.getCompositionId());
 
         assertThat(result).isEqualTo(docServiceTemplate.toAuthorative());
@@ -281,7 +253,7 @@ class AcDefinitionProviderTest {
     void testDeleteServiceTemplateEmpty() {
         var compositionId = UUID.randomUUID();
         var acmDefinitionRepository = mock(AutomationCompositionDefinitionRepository.class);
-        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository, null);
+        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository);
         assertThatThrownBy(() -> acDefinitionProvider.deleteAcDefintion(compositionId))
                 .hasMessage("delete of Automation Composition Definition \"" + compositionId
                         + "\" failed, Automation Composition Definition does not exist");
@@ -295,7 +267,7 @@ class AcDefinitionProviderTest {
         when(acmDefinitionRepository.findAll(Mockito.<Example<JpaAutomationCompositionDefinition>>any()))
                 .thenReturn(List.of(new JpaAutomationCompositionDefinition(acmDefinition)));
 
-        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository, null);
+        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository);
         var result = acDefinitionProvider.getServiceTemplateList(inputServiceTemplate.getName(),
                 inputServiceTemplate.getVersion());
 
@@ -311,7 +283,7 @@ class AcDefinitionProviderTest {
         when(acmDefinitionRepository.findAll(Mockito.<Example<JpaAutomationCompositionDefinition>>any()))
             .thenReturn(List.of(new JpaAutomationCompositionDefinition(acmDefinition)));
 
-        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository, null);
+        var acDefinitionProvider = new AcDefinitionProvider(acmDefinitionRepository);
         var result = acDefinitionProvider.getServiceTemplateList(null,
             inputServiceTemplate.getVersion());
 
index c2368fe..67a05be 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- * Copyright (C) 2021-2024 Nordix Foundation.
+ * Copyright (C) 2021-2025 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -22,11 +22,9 @@ package org.onap.policy.clamp.models.acm.persistence.provider;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyIterable;
-import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -38,8 +36,6 @@ import java.util.UUID;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
-import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementInfo;
-import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionInfo;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositions;
 import org.onap.policy.clamp.models.acm.concepts.DeployState;
 import org.onap.policy.clamp.models.acm.concepts.LockState;
@@ -55,7 +51,6 @@ import org.springframework.data.domain.Example;
 class AutomationCompositionProviderTest {
 
     private static final String AC_IS_NULL = "automationComposition is marked non-null but is null";
-    private static final String ACELEMENT_IS_NULL = "element is marked non-null but is null";
     private static final String ACELEMENT_ID_IS_NULL = "elementId is marked non-null but is null";
 
     private static final Coder CODER = new StandardCoder();
@@ -199,7 +194,9 @@ class AutomationCompositionProviderTest {
         when(automationCompositionRepository.findByLockStateIn(List.of(LockState.LOCKING, LockState.UNLOCKING)))
             .thenReturn(List.of(inputAutomationCompositionsJpa.get(1)));
         var acList = automationCompositionProvider.getAcInstancesInTransition();
-        assertEquals(inputAutomationCompositions.getAutomationCompositionList(), acList);
+        assertThat(acList).hasSize(2)
+                .contains(inputAutomationCompositions.getAutomationCompositionList().get(0).getInstanceId())
+                .contains(inputAutomationCompositions.getAutomationCompositionList().get(1).getInstanceId());
     }
 
     @Test
@@ -220,27 +217,6 @@ class AutomationCompositionProviderTest {
         assertEquals(automationComposition, deletedAc);
     }
 
-    @Test
-    void testAutomationCompositionElementUpdate() {
-        var acElementRepository = mock(AutomationCompositionElementRepository.class);
-        var automationCompositionProvider = new AutomationCompositionProvider(
-            mock(AutomationCompositionRepository.class), acElementRepository);
-
-        assertThatThrownBy(() -> automationCompositionProvider.updateAutomationCompositionElement(null))
-            .hasMessageMatching(ACELEMENT_IS_NULL);
-
-        var acElement = inputAutomationCompositions.getAutomationCompositionList().get(0).getElements().values()
-            .iterator().next();
-        var jpa = new JpaAutomationCompositionElement();
-        jpa.setElementId(acElement.getId().toString());
-        jpa.setInstanceId(UUID.randomUUID().toString());
-        jpa.fromAuthorative(acElement);
-        when(acElementRepository.getReferenceById(acElement.getId().toString())).thenReturn(jpa);
-
-        automationCompositionProvider.updateAutomationCompositionElement(acElement);
-        verify(acElementRepository).save(any());
-    }
-
     @Test
     void testDeleteElementById() {
         var acElementRepository = mock(AutomationCompositionElementRepository.class);
@@ -282,26 +258,4 @@ class AutomationCompositionProviderTest {
         result = automationCompositionProvider.validateElementIds(ac);
         assertThat(result.isValid()).isTrue();
     }
-
-    @Test
-    void testUpgradeStates() {
-        var acElementRepository = mock(AutomationCompositionElementRepository.class);
-        var automationCompositionProvider = new AutomationCompositionProvider(
-            mock(AutomationCompositionRepository.class), acElementRepository);
-
-        assertDoesNotThrow(() -> automationCompositionProvider.upgradeStates(List.of()));
-        var acElement = inputAutomationCompositions.getAutomationCompositionList().get(0).getElements().values()
-            .iterator().next();
-
-        var acInfo = new AutomationCompositionInfo();
-        var acElementInfo = new AutomationCompositionElementInfo();
-        acInfo.setElements(List.of(acElementInfo));
-        acElementInfo.setAutomationCompositionElementId(acElement.getId());
-
-        when(acElementRepository.getReferenceById(acElement.getId().toString()))
-            .thenReturn(new JpaAutomationCompositionElement(acElement));
-
-        automationCompositionProvider.upgradeStates(List.of(acInfo));
-        verify(acElementRepository).saveAll(anyList());
-    }
 }
diff --git a/models/src/test/java/org/onap/policy/clamp/models/acm/persistence/provider/MessageProviderTest.java b/models/src/test/java/org/onap/policy/clamp/models/acm/persistence/provider/MessageProviderTest.java
new file mode 100644 (file)
index 0000000..8276d56
--- /dev/null
@@ -0,0 +1,227 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.models.acm.persistence.provider;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.junit.jupiter.api.Test;
+import org.onap.policy.clamp.models.acm.concepts.AcElementDeployAck;
+import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
+import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementInfo;
+import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionInfo;
+import org.onap.policy.clamp.models.acm.concepts.DeployState;
+import org.onap.policy.clamp.models.acm.concepts.LockState;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
+import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
+import org.onap.policy.clamp.models.acm.document.concepts.DocMessage;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeployAck;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrimeAck;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatus;
+import org.onap.policy.clamp.models.acm.persistence.concepts.JpaMessage;
+import org.onap.policy.clamp.models.acm.persistence.concepts.JpaMessageJob;
+import org.onap.policy.clamp.models.acm.persistence.repository.MessageJobRepository;
+import org.onap.policy.clamp.models.acm.persistence.repository.MessageRepository;
+import org.onap.policy.clamp.models.acm.utils.CommonTestData;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
+
+class MessageProviderTest {
+
+    @Test
+    void testSaveParticipantPrimeAck() {
+        var message = new ParticipantPrimeAck();
+        message.setCompositionState(AcTypeState.PRIMED);
+        message.setCompositionId(UUID.randomUUID());
+        message.setParticipantId(UUID.randomUUID());
+        message.setReplicaId(UUID.randomUUID());
+        var messageRepository = mock(MessageRepository.class);
+        var messageProvider = new MessageProvider(messageRepository, mock(MessageJobRepository.class));
+        messageProvider.save(message);
+        verify(messageRepository).save(any());
+    }
+
+    @Test
+    void testSaveAutomationCompositionDeployAck() {
+        var message = new AutomationCompositionDeployAck(ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK);
+        message.setAutomationCompositionId(UUID.randomUUID());
+        message.setCompositionId(UUID.randomUUID());
+        message.setStateChangeResult(StateChangeResult.NO_ERROR);
+        message.setParticipantId(UUID.randomUUID());
+        message.setReplicaId(UUID.randomUUID());
+        var element = new AcElementDeployAck(DeployState.DEPLOYED,
+                LockState.LOCKED, null, null, Map.of(), true, "");
+        message.setAutomationCompositionResultMap(Map.of(UUID.randomUUID(), element));
+        var messageRepository = mock(MessageRepository.class);
+        var messageProvider = new MessageProvider(messageRepository, mock(MessageJobRepository.class));
+        messageProvider.save(message);
+        verify(messageRepository).save(any());
+    }
+
+    @Test
+    void testSaveParticipantStatusComposition() {
+        var message = new ParticipantStatus();
+        message.setCompositionId(UUID.randomUUID());
+        message.setParticipantId(UUID.randomUUID());
+        message.setReplicaId(UUID.randomUUID());
+        var participantDefinition = new ParticipantDefinition();
+        participantDefinition.setParticipantId(message.getParticipantId());
+        var element = CommonTestData.getAcElementDefinition(new ToscaConceptIdentifier("name", "1.0.0"));
+        element.setOutProperties(Map.of("compositionProperty", "value"));
+        participantDefinition.setAutomationCompositionElementDefinitionList(List.of(element));
+        message.setParticipantDefinitionUpdates(List.of(participantDefinition));
+        var messageRepository = mock(MessageRepository.class);
+        var messageProvider = new MessageProvider(messageRepository, mock(MessageJobRepository.class));
+        messageProvider.save(message);
+        verify(messageRepository).save(any());
+    }
+
+    @Test
+    void testSaveParticipantStatusInstance() {
+        var message = new ParticipantStatus();
+        message.setCompositionId(UUID.randomUUID());
+        message.setParticipantId(UUID.randomUUID());
+        message.setReplicaId(UUID.randomUUID());
+        var automationCompositionInfo = new AutomationCompositionInfo();
+        automationCompositionInfo.setAutomationCompositionId(UUID.randomUUID());
+        var element = new AutomationCompositionElementInfo();
+        element.setAutomationCompositionElementId(UUID.randomUUID());
+        element.setOutProperties(Map.of("instanceProperty", "value"));
+        automationCompositionInfo.setElements(List.of(element));
+        message.setAutomationCompositionInfoList(List.of(automationCompositionInfo));
+        var messageRepository = mock(MessageRepository.class);
+        var messageProvider = new MessageProvider(messageRepository, mock(MessageJobRepository.class));
+        messageProvider.save(message);
+        verify(messageRepository).save(any());
+    }
+
+    @Test
+    void testGetAllMessages() {
+        var messageRepository = mock(MessageRepository.class);
+        var instanceId = UUID.randomUUID();
+        var jpaMessage = new JpaMessage();
+        when(messageRepository.findByIdentificationIdOrderByLastMsgDesc(instanceId.toString()))
+                .thenReturn(List.of(jpaMessage));
+        var messageProvider = new MessageProvider(messageRepository, mock(MessageJobRepository.class));
+        var result = messageProvider.getAllMessages(instanceId);
+        assertThat(result).hasSize(1);
+        var doc = result.iterator().next();
+        assertEquals(jpaMessage.getMessageId(), doc.getMessageId());
+    }
+
+    @Test
+    void testFindCompositionMessages() {
+        var jpa1 = createJpaCompositionMessage();
+        var jpa2 = createJpaInstanceMessage();
+        var messageRepository = mock(MessageRepository.class);
+        when(messageRepository.findAll()).thenReturn(List.of(jpa1, jpa2));
+        var messageProvider = new MessageProvider(messageRepository, mock(MessageJobRepository.class));
+        var result = messageProvider.findCompositionMessages();
+        assertThat(result).hasSize(1);
+        var compositionId = result.iterator().next();
+        assertEquals(jpa1.getDocMessage().getCompositionId(), compositionId);
+    }
+
+    private JpaMessage createJpaCompositionMessage() {
+        var message = new DocMessage();
+        message.setCompositionId(UUID.randomUUID());
+        return new JpaMessage(message.getCompositionId().toString(), message);
+    }
+
+    private JpaMessage createJpaInstanceMessage() {
+        var message = new DocMessage();
+        message.setCompositionId(UUID.randomUUID());
+        message.setInstanceId(UUID.randomUUID());
+        return new JpaMessage(message.getInstanceId().toString(), message);
+    }
+
+    @Test
+    void testFindInstanceMessages() {
+        var jpa1 = createJpaCompositionMessage();
+        var jpa2 = createJpaInstanceMessage();
+        var messageRepository = mock(MessageRepository.class);
+        when(messageRepository.findAll()).thenReturn(List.of(jpa1, jpa2));
+        var messageProvider = new MessageProvider(messageRepository, mock(MessageJobRepository.class));
+        var result = messageProvider.findInstanceMessages();
+        assertThat(result).hasSize(1);
+        var instanceId = result.iterator().next();
+        assertEquals(jpa2.getDocMessage().getInstanceId(), instanceId);
+    }
+
+    @Test
+    void testRemoveMessage() {
+        var messageRepository = mock(MessageRepository.class);
+        var messageProvider = new MessageProvider(messageRepository, mock(MessageJobRepository.class));
+        var messageId = UUID.randomUUID();
+        messageProvider.removeMessage(messageId.toString());
+        verify(messageRepository).deleteById(messageId.toString());
+    }
+
+    @Test
+    void testRemoveOldJobs() {
+        var messageJobRepository = mock(MessageJobRepository.class);
+        var jpaJob1 = new JpaMessageJob(UUID.randomUUID().toString());
+        var jpaJob2 = new JpaMessageJob(UUID.randomUUID().toString());
+        var old = Timestamp.from(Instant.now().minusSeconds(200));
+        jpaJob2.setJobStarted(old);
+        when(messageJobRepository.findAll()).thenReturn(List.of(jpaJob1, jpaJob2));
+        var messageProvider = new MessageProvider(mock(MessageRepository.class), messageJobRepository);
+        messageProvider.removeOldJobs();
+        verify(messageJobRepository, times(0)).deleteById(jpaJob1.getJobId());
+        verify(messageJobRepository).deleteById(jpaJob2.getJobId());
+    }
+
+    @Test
+    void testCreateJob() {
+        var messageJobRepository = mock(MessageJobRepository.class);
+        var identificationId = UUID.randomUUID();
+        var jpaJob = new JpaMessageJob(identificationId.toString());
+        when(messageJobRepository.save(any())).thenReturn(jpaJob);
+        var messageProvider = new MessageProvider(mock(MessageRepository.class), messageJobRepository);
+        var opt = messageProvider.createJob(identificationId);
+        assertThat(opt).isNotEmpty();
+        assertEquals(jpaJob.getJobId(), opt.get());
+
+        when(messageJobRepository.findByIdentificationId(identificationId.toString())).thenReturn(Optional.of(jpaJob));
+        opt = messageProvider.createJob(identificationId);
+        assertThat(opt).isEmpty();
+    }
+
+    @Test
+    void testRemoveJob() {
+        var messageJobRepository = mock(MessageJobRepository.class);
+        var messageProvider = new MessageProvider(mock(MessageRepository.class), messageJobRepository);
+        var jobId = UUID.randomUUID().toString();
+        messageProvider.removeJob(jobId);
+        verify(messageJobRepository).deleteById(jobId);
+    }
+}
index 05a866e..126ffaf 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2023-2024 Nordix Foundation.
+ *  Copyright (C) 2023-2025 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -22,9 +22,8 @@ package org.onap.policy.clamp.acm.runtime.supervision;
 
 import io.micrometer.core.annotation.Timed;
 import io.opentelemetry.context.Context;
+import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import lombok.AllArgsConstructor;
@@ -33,7 +32,6 @@ import org.onap.policy.clamp.acm.runtime.supervision.comm.AcPreparePublisher;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionDeployPublisher;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionMigrationPublisher;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher;
-import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
 import org.onap.policy.clamp.models.acm.concepts.AcElementDeployAck;
 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
@@ -44,6 +42,7 @@ import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
 import org.onap.policy.clamp.models.acm.concepts.SubState;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeployAck;
 import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
+import org.onap.policy.clamp.models.acm.persistence.provider.MessageProvider;
 import org.onap.policy.clamp.models.acm.utils.AcmUtils;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
 import org.slf4j.Logger;
@@ -66,8 +65,8 @@ public class SupervisionAcHandler {
     private final AutomationCompositionStateChangePublisher automationCompositionStateChangePublisher;
     private final AcElementPropertiesPublisher acElementPropertiesPublisher;
     private final AutomationCompositionMigrationPublisher acCompositionMigrationPublisher;
-    private final ParticipantSyncPublisher participantSyncPublisher;
     private final AcPreparePublisher acPreparePublisher;
+    private final MessageProvider messageProvider;
 
     private final ExecutorService executor = Context.taskWrapping(Executors.newFixedThreadPool(1));
 
@@ -270,22 +269,24 @@ public class SupervisionAcHandler {
         if (automationCompositionAckMessage.getAutomationCompositionResultMap() == null
                 || automationCompositionAckMessage.getAutomationCompositionResultMap().isEmpty()) {
             if (DeployState.DELETING.equals(automationComposition.getDeployState())) {
-                deleteAcInstance(automationComposition, automationCompositionAckMessage.getParticipantId());
+                // scenario automationComposition has never deployed
+                automationCompositionAckMessage.setAutomationCompositionResultMap(new HashMap<>());
+                for (var element : automationComposition.getElements().values()) {
+                    if (element.getParticipantId().equals(automationCompositionAckMessage.getParticipantId())) {
+                        var acElement = new AcElementDeployAck(DeployState.DELETED, LockState.NONE,
+                                null, null, Map.of(), true, "");
+                        automationCompositionAckMessage.getAutomationCompositionResultMap()
+                                .put(element.getId(), acElement);
+                    }
+                }
             } else {
                 LOGGER.warn("Empty AutomationCompositionResultMap  {} {}",
                         automationCompositionAckMessage.getAutomationCompositionId(),
                         automationCompositionAckMessage.getMessage());
+                return;
             }
-            return;
-        }
-
-        var updated = updateState(automationComposition,
-                automationCompositionAckMessage.getAutomationCompositionResultMap().entrySet(),
-                automationCompositionAckMessage.getStateChangeResult(), automationCompositionAckMessage.getStage());
-        if (updated) {
-            automationComposition = automationCompositionProvider.updateAcState(automationComposition);
-            participantSyncPublisher.sendSync(automationComposition);
         }
+        messageProvider.save(automationCompositionAckMessage);
     }
 
     private boolean validateMessage(AutomationCompositionDeployAck acAckMessage) {
@@ -301,7 +302,8 @@ public class SupervisionAcHandler {
             return false;
         }
 
-        if (acAckMessage.getStage() == null) {
+        if ((acAckMessage.getStage() == null)
+            && (acAckMessage.getAutomationCompositionResultMap() != null)) {
             for (var el : acAckMessage.getAutomationCompositionResultMap().values()) {
                 if (AcmUtils.isInTransitionalState(el.getDeployState(), el.getLockState(), SubState.NONE)) {
                     LOGGER.error("Not valid AutomationCompositionDeployAck message, states are not valid");
@@ -312,43 +314,6 @@ public class SupervisionAcHandler {
         return true;
     }
 
-    private void deleteAcInstance(AutomationComposition automationComposition, UUID participantId) {
-        // scenario when Automation Composition instance has never been deployed
-        for (var element : automationComposition.getElements().values()) {
-            if (element.getParticipantId().equals(participantId)) {
-                element.setDeployState(DeployState.DELETED);
-                automationCompositionProvider.updateAutomationCompositionElement(element);
-            }
-        }
-    }
-
-    private boolean updateState(AutomationComposition automationComposition,
-            Set<Map.Entry<UUID, AcElementDeployAck>> automationCompositionResultSet,
-            StateChangeResult stateChangeResult, Integer stage) {
-        var updated = false;
-        boolean inProgress = !StateChangeResult.FAILED.equals(automationComposition.getStateChangeResult());
-        if (inProgress && !stateChangeResult.equals(automationComposition.getStateChangeResult())) {
-            automationComposition.setStateChangeResult(stateChangeResult);
-            updated = true;
-        }
-
-        for (var acElementAck : automationCompositionResultSet) {
-            var element = automationComposition.getElements().get(acElementAck.getKey());
-            if (element != null) {
-                element.setMessage(AcmUtils.validatedMessage(acElementAck.getValue().getMessage()));
-                if (stage == null) {
-                    element.setSubState(SubState.NONE);
-                }
-                element.setDeployState(acElementAck.getValue().getDeployState());
-                element.setLockState(acElementAck.getValue().getLockState());
-                element.setStage(stage);
-                automationCompositionProvider.updateAutomationCompositionElement(element);
-            }
-        }
-
-        return updated;
-    }
-
     /**
      * Handle Migration of an AutomationComposition instance to other ACM Definition.
      *
index f13f5da..276a26f 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2021-2024 Nordix Foundation.
+ *  Copyright (C) 2021-2025 Nordix Foundation.
  *  Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -23,14 +23,11 @@ package org.onap.policy.clamp.acm.runtime.supervision;
 
 import io.micrometer.core.annotation.Timed;
 import lombok.AllArgsConstructor;
-import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
-import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
-import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState;
 import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrimeAck;
 import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
-import org.onap.policy.clamp.models.acm.utils.AcmUtils;
+import org.onap.policy.clamp.models.acm.persistence.provider.MessageProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -45,13 +42,14 @@ public class SupervisionHandler {
     private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionHandler.class);
 
     private final AcDefinitionProvider acDefinitionProvider;
-    private final ParticipantSyncPublisher participantSyncPublisher;
+    private final MessageProvider messageProvider;
 
     /**
      * Handle a ParticipantPrimeAck message from a participant.
      *
      * @param participantPrimeAckMessage the ParticipantPrimeAck message received from a participant
      */
+    @MessageIntercept
     @Timed(value = "listener.participant_prime_ack", description = "PARTICIPANT_PRIME_ACK messages received")
     public void handleParticipantMessage(ParticipantPrimeAck participantPrimeAckMessage) {
         if (participantPrimeAckMessage.getCompositionId() == null
@@ -82,50 +80,6 @@ public class SupervisionHandler {
                     participantPrimeAckMessage.getCompositionId(), participantPrimeAckMessage.getParticipantId());
             return;
         }
-        handleParticipantPrimeAck(participantPrimeAckMessage, acDefinition);
-    }
-
-    private void handleParticipantPrimeAck(ParticipantPrimeAck participantPrimeAckMessage,
-            AutomationCompositionDefinition acDefinition) {
-        var finalState = AcTypeState.PRIMING.equals(acDefinition.getState())
-                || AcTypeState.PRIMED.equals(acDefinition.getState()) ? AcTypeState.PRIMED : AcTypeState.COMMISSIONED;
-        var msgInErrors = StateChangeResult.FAILED.equals(participantPrimeAckMessage.getStateChangeResult());
-        boolean inProgress = !StateChangeResult.FAILED.equals(acDefinition.getStateChangeResult());
-        boolean toUpdate = false;
-        if (inProgress && msgInErrors) {
-            acDefinition.setStateChangeResult(StateChangeResult.FAILED);
-            toUpdate = true;
-        }
-
-        boolean completed = true;
-        for (var element : acDefinition.getElementStateMap().values()) {
-            handlePrimeAckElement(participantPrimeAckMessage, element);
-            if (!finalState.equals(element.getState())) {
-                completed = false;
-            }
-        }
-
-        if (inProgress && !msgInErrors && completed) {
-            toUpdate = true;
-            acDefinition.setState(finalState);
-            if (StateChangeResult.TIMEOUT.equals(acDefinition.getStateChangeResult())) {
-                acDefinition.setStateChangeResult(StateChangeResult.NO_ERROR);
-            }
-        }
-        if (toUpdate) {
-            acDefinitionProvider.updateAcDefinitionState(acDefinition.getCompositionId(), acDefinition.getState(),
-                acDefinition.getStateChangeResult());
-            if (!participantPrimeAckMessage.getParticipantId().equals(participantPrimeAckMessage.getReplicaId())) {
-                participantSyncPublisher.sendSync(acDefinition, participantPrimeAckMessage.getReplicaId());
-            }
-        }
-    }
-
-    private void handlePrimeAckElement(ParticipantPrimeAck participantPrimeAckMessage, NodeTemplateState element) {
-        if (participantPrimeAckMessage.getParticipantId().equals(element.getParticipantId())) {
-            element.setMessage(AcmUtils.validatedMessage(participantPrimeAckMessage.getMessage()));
-            element.setState(participantPrimeAckMessage.getCompositionState());
-            acDefinitionProvider.updateAcDefinitionElement(element, participantPrimeAckMessage.getCompositionId());
-        }
+        messageProvider.save(participantPrimeAckMessage);
     }
 }
index 5de6a4c..d0b7b62 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2023-2024 Nordix Foundation.
+ *  Copyright (C) 2023-2025 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -27,16 +27,13 @@ import java.util.Map;
 import java.util.UUID;
 import lombok.AllArgsConstructor;
 import org.apache.commons.collections4.MapUtils;
-import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantDeregisterAckPublisher;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRegisterAckPublisher;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
-import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionInfo;
 import org.onap.policy.clamp.models.acm.concepts.Participant;
-import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
 import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica;
 import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
 import org.onap.policy.clamp.models.acm.concepts.ParticipantSupportedElementType;
@@ -45,6 +42,7 @@ import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRe
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatus;
 import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
 import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
+import org.onap.policy.clamp.models.acm.persistence.provider.MessageProvider;
 import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
 import org.onap.policy.clamp.models.acm.utils.TimestampHelper;
 import org.slf4j.Logger;
@@ -65,7 +63,7 @@ public class SupervisionParticipantHandler {
     private final AutomationCompositionProvider automationCompositionProvider;
     private final AcDefinitionProvider acDefinitionProvider;
     private final ParticipantSyncPublisher participantSyncPublisher;
-    private final AcRuntimeParameterGroup acRuntimeParameterGroup;
+    private final MessageProvider messageProvider;
 
     /**
      * Handle a ParticipantRegister message from a participant.
@@ -104,18 +102,18 @@ public class SupervisionParticipantHandler {
      *
      * @param participantStatusMsg the ParticipantStatus message received from a participant
      */
+    @MessageIntercept
     @Timed(value = "listener.participant_status", description = "PARTICIPANT_STATUS messages received")
     public void handleParticipantMessage(ParticipantStatus participantStatusMsg) {
         saveIfNotPresent(participantStatusMsg.getReplicaId(), participantStatusMsg.getParticipantId(),
                 participantStatusMsg.getParticipantSupportedElementType(), false);
 
         if (!participantStatusMsg.getAutomationCompositionInfoList().isEmpty()) {
-            updateAcOutProperties(participantStatusMsg.getAutomationCompositionInfoList());
+            messageProvider.save(participantStatusMsg);
         }
         if (!participantStatusMsg.getParticipantDefinitionUpdates().isEmpty()
                 && participantStatusMsg.getCompositionId() != null) {
-            updateAcDefinitionOutProperties(participantStatusMsg.getCompositionId(),
-                participantStatusMsg.getReplicaId(), participantStatusMsg.getParticipantDefinitionUpdates());
+            messageProvider.save(participantStatusMsg);
         }
     }
 
@@ -151,34 +149,6 @@ public class SupervisionParticipantHandler {
 
     }
 
-    private void updateAcOutProperties(List<AutomationCompositionInfo> automationCompositionInfoList) {
-        automationCompositionProvider.upgradeStates(automationCompositionInfoList);
-        for (var acInfo : automationCompositionInfoList) {
-            var ac = automationCompositionProvider.getAutomationComposition(acInfo.getAutomationCompositionId());
-            participantSyncPublisher.sendSync(ac);
-        }
-    }
-
-    private void updateAcDefinitionOutProperties(UUID compositionId, UUID replicaId, List<ParticipantDefinition> list) {
-        var acDefinitionOpt = acDefinitionProvider.findAcDefinition(compositionId);
-        if (acDefinitionOpt.isEmpty()) {
-            LOGGER.error("Ac Definition with id {} not found", compositionId);
-            return;
-        }
-        var acDefinition = acDefinitionOpt.get();
-        for (var acElements : list) {
-            for (var element : acElements.getAutomationCompositionElementDefinitionList()) {
-                var state = acDefinition.getElementStateMap().get(element.getAcElementDefinitionId().getName());
-                if (state != null) {
-                    state.setOutProperties(element.getOutProperties());
-                }
-            }
-        }
-        acDefinitionProvider.updateAcDefinition(acDefinition,
-                acRuntimeParameterGroup.getAcmParameters().getToscaCompositionName());
-        participantSyncPublisher.sendSync(acDefinition, replicaId);
-    }
-
     private void checkOnline(ParticipantReplica replica) {
         if (ParticipantState.OFF_LINE.equals(replica.getParticipantState())) {
             replica.setParticipantState(ParticipantState.ON_LINE);
index 3b17565..718bcce 100644 (file)
@@ -38,6 +38,7 @@ import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
 import org.onap.policy.clamp.models.acm.concepts.SubState;
 import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
 import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
+import org.onap.policy.clamp.models.acm.persistence.provider.MessageProvider;
 import org.onap.policy.clamp.models.acm.utils.AcmUtils;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
 import org.slf4j.Logger;
@@ -58,6 +59,7 @@ public class SupervisionScanner {
     private final StageScanner stageScanner;
     private final SimpleScanner simpleScanner;
     private final PhaseScanner phaseScanner;
+    private final MessageProvider messageProvider;
 
     /**
      * Run Scanning.
@@ -65,30 +67,54 @@ public class SupervisionScanner {
     public void run() {
         LOGGER.debug("Scanning automation compositions in the database . . .");
 
-        var acDefinitions = acDefinitionProvider.getAllAcDefinitionsInTransition();
-        for (var acDefinition : acDefinitions) {
-            scanAcDefinition(acDefinition.getCompositionId());
+        messageProvider.removeOldJobs();
+
+        var compositionIds = acDefinitionProvider.getAllAcDefinitionsInTransition();
+        compositionIds.addAll(messageProvider.findCompositionMessages());
+        for (var compositionId : compositionIds) {
+            scanAcDefinition(compositionId);
         }
 
-        var instances = automationCompositionProvider.getAcInstancesInTransition();
+        var instanceIds = automationCompositionProvider.getAcInstancesInTransition();
+        instanceIds.addAll(messageProvider.findInstanceMessages());
         Map<UUID, AutomationCompositionDefinition> acDefinitionMap = new HashMap<>();
-        for (var instance : instances) {
-            scanAutomationComposition(instance.getInstanceId(), acDefinitionMap);
+        for (var instanceId : instanceIds) {
+            scanAutomationComposition(instanceId, acDefinitionMap);
         }
         LOGGER.debug("Automation composition scan complete . . .");
     }
 
     private void scanAcDefinition(UUID compositionId) {
+        var optJobId = messageProvider.createJob(compositionId);
+        if (optJobId.isEmpty()) {
+            return;
+        }
+        var messages = messageProvider.getAllMessages(compositionId);
         var acDefinitionOpt = acDefinitionProvider.findAcDefinition(compositionId);
         var updateSync = new UpdateSync();
+        for (var message : messages) {
+            acDefinitionOpt.ifPresent(
+                    acDefinition -> updateSync.or(acDefinitionScanner.scanMessage(acDefinition, message)));
+            messageProvider.removeMessage(message.getMessageId());
+        }
         acDefinitionOpt.ifPresent(acDefinition ->
                 acDefinitionScanner.scanAutomationCompositionDefinition(acDefinition, updateSync));
+        messageProvider.removeJob(optJobId.get());
     }
 
     private void scanAutomationComposition(UUID instanceId,
             Map<UUID, AutomationCompositionDefinition> acDefinitionMap) {
+        var optJobId = messageProvider.createJob(instanceId);
+        if (optJobId.isEmpty()) {
+            return;
+        }
+        var messages = messageProvider.getAllMessages(instanceId);
         var automationCompositionOpt = automationCompositionProvider.findAutomationComposition(instanceId);
         var updateSync = new UpdateSync();
+        for (var message : messages) {
+            automationCompositionOpt.ifPresent(ac -> updateSync.or(simpleScanner.scanMessage(ac, message)));
+            messageProvider.removeMessage(message.getMessageId());
+        }
         if (automationCompositionOpt.isPresent()) {
             var automationComposition = automationCompositionOpt.get();
             var compositionId = automationComposition.getCompositionTargetId() != null
@@ -96,6 +122,8 @@ public class SupervisionScanner {
             var acDefinition = acDefinitionMap.computeIfAbsent(compositionId, acDefinitionProvider::getAcDefinition);
             scanAutomationComposition(automationComposition, acDefinition.getServiceTemplate(), updateSync);
         }
+
+        messageProvider.removeJob(optJobId.get());
     }
 
     private void scanAutomationComposition(final AutomationComposition automationComposition,
@@ -107,6 +135,7 @@ public class SupervisionScanner {
                 || StateChangeResult.FAILED.equals(automationComposition.getStateChangeResult())) {
             LOGGER.debug("automation composition {} scanned, OK", automationComposition.getInstanceId());
             simpleScanner.saveAndSync(automationComposition, updateSync);
+            return;
         }
 
         if (DeployState.MIGRATING.equals(automationComposition.getDeployState())) {
index e35d5f0..c102412 100644 (file)
@@ -80,10 +80,10 @@ public class SimpleScanner extends AbstractScanner {
         if (element != null) {
             element.setDeployState(message.getDeployState());
             element.setLockState(message.getLockState());
-            if (element.getStage() == null) {
+            if (message.getStage() == null) {
                 element.setSubState(SubState.NONE);
             }
-            element.setStage(element.getStage());
+            element.setStage(message.getStage());
             element.setMessage(message.getMessage());
             result.setUpdated(true);
         }
index c5345f2..448a96b 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2023-2024 Nordix Foundation.
+ *  Copyright (C) 2023-2025 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -40,30 +40,30 @@ import org.onap.policy.clamp.acm.runtime.supervision.comm.AcPreparePublisher;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionDeployPublisher;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionMigrationPublisher;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher;
-import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
 import org.onap.policy.clamp.acm.runtime.util.CommonTestData;
 import org.onap.policy.clamp.models.acm.concepts.AcElementDeployAck;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
-import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement;
 import org.onap.policy.clamp.models.acm.concepts.DeployState;
 import org.onap.policy.clamp.models.acm.concepts.LockState;
 import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeployAck;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType;
 import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
+import org.onap.policy.clamp.models.acm.persistence.provider.MessageProvider;
 
 class SupervisionAcHandlerTest {
     private static final String AC_INSTANTIATION_CREATE_JSON = "src/test/resources/rest/acm/AutomationComposition.json";
     private static final UUID IDENTIFIER = UUID.randomUUID();
 
     @Test
-    void testAutomationCompositionDeployAckNull() {
+    void testAutomationCompositionDeployAckValidation() {
         var automationCompositionProvider = mock(AutomationCompositionProvider.class);
+        var messageProvider = mock(MessageProvider.class);
         var handler = new SupervisionAcHandler(automationCompositionProvider,
                 mock(AutomationCompositionDeployPublisher.class), mock(AutomationCompositionStateChangePublisher.class),
-                mock(AcElementPropertiesPublisher.class), null,
-                mock(ParticipantSyncPublisher.class), null);
+                mock(AcElementPropertiesPublisher.class), mock(AutomationCompositionMigrationPublisher.class),
+                mock(AcPreparePublisher.class), messageProvider);
 
         var automationComposition =
                 InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_CREATE_JSON, "Crud");
@@ -89,7 +89,13 @@ class SupervisionAcHandlerTest {
                         automationComposition, DeployState.DEPLOYING, LockState.UNLOCKED);
         handler.handleAutomationCompositionStateChangeAckMessage(automationCompositionAckMessage);
 
-        verify(automationCompositionProvider, times(0)).updateAutomationCompositionElement(any());
+        verify(messageProvider, times(0)).save(any(AutomationCompositionDeployAck.class));
+
+        when(automationCompositionProvider.findAutomationComposition(IDENTIFIER))
+                .thenReturn(Optional.of(automationComposition));
+        automationCompositionAckMessage.setAutomationCompositionResultMap(null);
+        handler.handleAutomationCompositionStateChangeAckMessage(automationCompositionAckMessage);
+        verify(messageProvider, times(0)).save(any(AutomationCompositionDeployAck.class));
     }
 
     @Test
@@ -100,13 +106,12 @@ class SupervisionAcHandlerTest {
         var automationCompositionProvider = mock(AutomationCompositionProvider.class);
         when(automationCompositionProvider.findAutomationComposition(IDENTIFIER))
                 .thenReturn(Optional.of(automationComposition));
-        when(automationCompositionProvider.updateAcState(any(AutomationComposition.class)))
-                .thenReturn(automationComposition);
+        var messageProvider = mock(MessageProvider.class);
 
         var handler = new SupervisionAcHandler(automationCompositionProvider,
                 mock(AutomationCompositionDeployPublisher.class), mock(AutomationCompositionStateChangePublisher.class),
-                mock(AcElementPropertiesPublisher.class), null,
-                mock(ParticipantSyncPublisher.class), null);
+                mock(AcElementPropertiesPublisher.class), mock(AutomationCompositionMigrationPublisher.class),
+                mock(AcPreparePublisher.class), messageProvider);
 
         var automationCompositionAckMessage =
                 getAutomationCompositionDeployAck(ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK,
@@ -114,8 +119,7 @@ class SupervisionAcHandlerTest {
         automationCompositionAckMessage.setStage(1);
         handler.handleAutomationCompositionStateChangeAckMessage(automationCompositionAckMessage);
 
-        verify(automationCompositionProvider, times(3))
-                .updateAutomationCompositionElement(any(AutomationCompositionElement.class));
+        verify(messageProvider).save(any(AutomationCompositionDeployAck.class));
     }
 
     @Test
@@ -126,21 +130,19 @@ class SupervisionAcHandlerTest {
         var automationCompositionProvider = mock(AutomationCompositionProvider.class);
         when(automationCompositionProvider.findAutomationComposition(IDENTIFIER))
                 .thenReturn(Optional.of(automationComposition));
-        when(automationCompositionProvider.updateAcState(any(AutomationComposition.class)))
-                .thenReturn(automationComposition);
+        var messageProvider = mock(MessageProvider.class);
 
         var handler = new SupervisionAcHandler(automationCompositionProvider,
                 mock(AutomationCompositionDeployPublisher.class), mock(AutomationCompositionStateChangePublisher.class),
-                mock(AcElementPropertiesPublisher.class), null,
-                mock(ParticipantSyncPublisher.class), null);
+                mock(AcElementPropertiesPublisher.class), mock(AutomationCompositionMigrationPublisher.class),
+                mock(AcPreparePublisher.class), messageProvider);
 
         var automationCompositionAckMessage =
                 getAutomationCompositionDeployAck(ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK,
                         automationComposition, DeployState.DEPLOYED, LockState.UNLOCKED);
         handler.handleAutomationCompositionStateChangeAckMessage(automationCompositionAckMessage);
 
-        verify(automationCompositionProvider, times(3))
-            .updateAutomationCompositionElement(any(AutomationCompositionElement.class));
+        verify(messageProvider).save(any(AutomationCompositionDeployAck.class));
     }
 
     private AutomationCompositionDeployAck getAutomationCompositionDeployAck(ParticipantMessageType messageType,
@@ -165,8 +167,7 @@ class SupervisionAcHandlerTest {
         var automationCompositionProvider = mock(AutomationCompositionProvider.class);
         when(automationCompositionProvider.findAutomationComposition(IDENTIFIER))
                 .thenReturn(Optional.of(automationComposition));
-        when(automationCompositionProvider.updateAcState(any(AutomationComposition.class)))
-                .thenReturn(automationComposition);
+        var messageProvider = mock(MessageProvider.class);
 
         var automationCompositionAckMessage =
                 getAutomationCompositionDeployAck(ParticipantMessageType.AUTOMATION_COMPOSITION_DEPLOY_ACK,
@@ -175,12 +176,12 @@ class SupervisionAcHandlerTest {
 
         var handler = new SupervisionAcHandler(automationCompositionProvider,
                 mock(AutomationCompositionDeployPublisher.class), mock(AutomationCompositionStateChangePublisher.class),
-                mock(AcElementPropertiesPublisher.class), null,
-                mock(ParticipantSyncPublisher.class), null);
+                mock(AcElementPropertiesPublisher.class), mock(AutomationCompositionMigrationPublisher.class),
+                mock(AcPreparePublisher.class), messageProvider);
 
         handler.handleAutomationCompositionUpdateAckMessage(automationCompositionAckMessage);
 
-        verify(automationCompositionProvider).updateAcState(any(AutomationComposition.class));
+        verify(messageProvider).save(any(AutomationCompositionDeployAck.class));
     }
 
     @Test
@@ -210,15 +211,16 @@ class SupervisionAcHandlerTest {
         automationCompositionAckMessage.setAutomationCompositionId(IDENTIFIER);
 
         var automationCompositionStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class);
+        var messageProvider = mock(MessageProvider.class);
 
         var handler = new SupervisionAcHandler(automationCompositionProvider,
-                mock(AutomationCompositionDeployPublisher.class), automationCompositionStateChangePublisher, null,
-                null, mock(ParticipantSyncPublisher.class), null);
+                mock(AutomationCompositionDeployPublisher.class), automationCompositionStateChangePublisher,
+                mock(AcElementPropertiesPublisher.class), mock(AutomationCompositionMigrationPublisher.class),
+                mock(AcPreparePublisher.class), messageProvider);
 
         handler.handleAutomationCompositionUpdateAckMessage(automationCompositionAckMessage);
 
-        verify(automationCompositionProvider)
-            .updateAutomationCompositionElement(any(AutomationCompositionElement.class));
+        verify(messageProvider).save(any(AutomationCompositionDeployAck.class));
     }
 
     @Test
@@ -227,8 +229,8 @@ class SupervisionAcHandlerTest {
         var automationCompositionProvider = mock(AutomationCompositionProvider.class);
         var handler = new SupervisionAcHandler(automationCompositionProvider,
                 automationCompositionDeployPublisher, mock(AutomationCompositionStateChangePublisher.class),
-                mock(AcElementPropertiesPublisher.class), null,
-                mock(ParticipantSyncPublisher.class), null);
+                mock(AcElementPropertiesPublisher.class), mock(AutomationCompositionMigrationPublisher.class),
+                mock(AcPreparePublisher.class), mock(MessageProvider.class));
 
         var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
         var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED);
@@ -246,8 +248,8 @@ class SupervisionAcHandlerTest {
         var acStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class);
         var handler = new SupervisionAcHandler(automationCompositionProvider,
                 mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher,
-                mock(AcElementPropertiesPublisher.class), null,
-                mock(ParticipantSyncPublisher.class), null);
+                mock(AcElementPropertiesPublisher.class), mock(AutomationCompositionMigrationPublisher.class),
+                mock(AcPreparePublisher.class), mock(MessageProvider.class));
         var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
         var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED);
         var automationComposition =
@@ -264,8 +266,8 @@ class SupervisionAcHandlerTest {
         var automationCompositionProvider = mock(AutomationCompositionProvider.class);
         var handler = new SupervisionAcHandler(automationCompositionProvider,
                 mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher,
-                mock(AcElementPropertiesPublisher.class), null,
-                mock(ParticipantSyncPublisher.class), null);
+                mock(AcElementPropertiesPublisher.class), mock(AutomationCompositionMigrationPublisher.class),
+                mock(AcPreparePublisher.class), mock(MessageProvider.class));
 
         var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
         var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED);
@@ -285,8 +287,8 @@ class SupervisionAcHandlerTest {
         var acStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class);
         var handler = new SupervisionAcHandler(automationCompositionProvider,
                 mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher,
-                mock(AcElementPropertiesPublisher.class), null,
-                mock(ParticipantSyncPublisher.class), null);
+                mock(AcElementPropertiesPublisher.class), mock(AutomationCompositionMigrationPublisher.class),
+                mock(AcPreparePublisher.class), mock(MessageProvider.class));
         var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
         var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED);
         var automationComposition =
@@ -303,8 +305,8 @@ class SupervisionAcHandlerTest {
         var acStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class);
         var handler = new SupervisionAcHandler(automationCompositionProvider,
                 mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher,
-                mock(AcElementPropertiesPublisher.class), null,
-                mock(ParticipantSyncPublisher.class), null);
+                mock(AcElementPropertiesPublisher.class), mock(AutomationCompositionMigrationPublisher.class),
+                mock(AcPreparePublisher.class), mock(MessageProvider.class));
         var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
         var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED);
         var automationComposition =
@@ -323,8 +325,8 @@ class SupervisionAcHandlerTest {
         var acStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class);
         var handler = new SupervisionAcHandler(automationCompositionProvider,
                 mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher,
-                mock(AcElementPropertiesPublisher.class), null,
-                mock(ParticipantSyncPublisher.class), null);
+                mock(AcElementPropertiesPublisher.class), mock(AutomationCompositionMigrationPublisher.class),
+                mock(AcPreparePublisher.class), mock(MessageProvider.class));
         var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
         var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED);
         var automationComposition =
@@ -341,8 +343,8 @@ class SupervisionAcHandlerTest {
         var acStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class);
         var handler = new SupervisionAcHandler(automationCompositionProvider,
                 mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher,
-                mock(AcElementPropertiesPublisher.class), null,
-                mock(ParticipantSyncPublisher.class), null);
+                mock(AcElementPropertiesPublisher.class), mock(AutomationCompositionMigrationPublisher.class),
+                mock(AcPreparePublisher.class), mock(MessageProvider.class));
         var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
         var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED);
         var automationComposition =
@@ -370,16 +372,16 @@ class SupervisionAcHandlerTest {
                 .setParticipantId(automationComposition.getElements().values().iterator().next().getParticipantId());
         automationCompositionAckMessage.setAutomationCompositionId(IDENTIFIER);
         automationCompositionAckMessage.setStateChangeResult(StateChangeResult.NO_ERROR);
+        var messageProvider = mock(MessageProvider.class);
 
         var handler = new SupervisionAcHandler(automationCompositionProvider,
                 mock(AutomationCompositionDeployPublisher.class), mock(AutomationCompositionStateChangePublisher.class),
-                mock(AcElementPropertiesPublisher.class), null,
-                mock(ParticipantSyncPublisher.class), null);
+                mock(AcElementPropertiesPublisher.class), mock(AutomationCompositionMigrationPublisher.class),
+                mock(AcPreparePublisher.class), messageProvider);
 
         handler.handleAutomationCompositionUpdateAckMessage(automationCompositionAckMessage);
 
-        verify(automationCompositionProvider)
-            .updateAutomationCompositionElement(any(AutomationCompositionElement.class));
+        verify(messageProvider).save(any(AutomationCompositionDeployAck.class));
     }
 
     @Test
@@ -387,8 +389,9 @@ class SupervisionAcHandlerTest {
         var acElementPropertiesPublisher = mock(AcElementPropertiesPublisher.class);
         var handler = new SupervisionAcHandler(mock(AutomationCompositionProvider.class),
                 mock(AutomationCompositionDeployPublisher.class),
-                mock(AutomationCompositionStateChangePublisher.class), acElementPropertiesPublisher, null,
-                mock(ParticipantSyncPublisher.class), null);
+                mock(AutomationCompositionStateChangePublisher.class), acElementPropertiesPublisher,
+                mock(AutomationCompositionMigrationPublisher.class),
+                mock(AcPreparePublisher.class), mock(MessageProvider.class));
         var automationComposition =
                 InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_CREATE_JSON, "Lock");
         handler.update(automationComposition);
@@ -400,8 +403,9 @@ class SupervisionAcHandlerTest {
         var automationCompositionProvider = mock(AutomationCompositionProvider.class);
         var acCompositionMigrationPublisher = mock(AutomationCompositionMigrationPublisher.class);
         var handler = new SupervisionAcHandler(automationCompositionProvider,
-                null, null, null,
-                acCompositionMigrationPublisher, mock(ParticipantSyncPublisher.class), null);
+                mock(AutomationCompositionDeployPublisher.class), mock(AutomationCompositionStateChangePublisher.class),
+                mock(AcElementPropertiesPublisher.class), acCompositionMigrationPublisher,
+                mock(AcPreparePublisher.class), mock(MessageProvider.class));
         var automationComposition =
                 InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_CREATE_JSON, "Migrate");
         var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
@@ -413,8 +417,10 @@ class SupervisionAcHandlerTest {
     void testMigratePrecheck() {
         var automationCompositionProvider = mock(AutomationCompositionProvider.class);
         var acCompositionMigrationPublisher = mock(AutomationCompositionMigrationPublisher.class);
-        var handler = new SupervisionAcHandler(automationCompositionProvider, null, null,
-                null, acCompositionMigrationPublisher, null, null);
+        var handler = new SupervisionAcHandler(automationCompositionProvider,
+                mock(AutomationCompositionDeployPublisher.class), mock(AutomationCompositionStateChangePublisher.class),
+                mock(AcElementPropertiesPublisher.class), acCompositionMigrationPublisher,
+                mock(AcPreparePublisher.class), mock(MessageProvider.class));
         var automationComposition =
                 InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_CREATE_JSON, "Migrate");
         handler.migratePrecheck(automationComposition);
@@ -425,8 +431,10 @@ class SupervisionAcHandlerTest {
     void testPrepare() {
         var automationCompositionProvider = mock(AutomationCompositionProvider.class);
         var acPreparePublisher = mock(AcPreparePublisher.class);
-        var handler = new SupervisionAcHandler(automationCompositionProvider, null, null,
-                null, null, null, acPreparePublisher);
+        var handler = new SupervisionAcHandler(automationCompositionProvider,
+                mock(AutomationCompositionDeployPublisher.class), mock(AutomationCompositionStateChangePublisher.class),
+                mock(AcElementPropertiesPublisher.class), mock(AutomationCompositionMigrationPublisher.class),
+                acPreparePublisher, mock(MessageProvider.class));
         var automationComposition =
                 InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_CREATE_JSON, "Migrate");
         handler.prepare(automationComposition);
@@ -437,8 +445,10 @@ class SupervisionAcHandlerTest {
     void testReview() {
         var automationCompositionProvider = mock(AutomationCompositionProvider.class);
         var acPreparePublisher = mock(AcPreparePublisher.class);
-        var handler = new SupervisionAcHandler(automationCompositionProvider, null, null,
-                null, null, null, acPreparePublisher);
+        var handler = new SupervisionAcHandler(automationCompositionProvider,
+                mock(AutomationCompositionDeployPublisher.class), mock(AutomationCompositionStateChangePublisher.class),
+                mock(AcElementPropertiesPublisher.class), mock(AutomationCompositionMigrationPublisher.class),
+                acPreparePublisher, mock(MessageProvider.class));
         var automationComposition =
                 InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_CREATE_JSON, "Migrate");
         handler.review(automationComposition);
index 09a79d8..6e2e66f 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2021-2024 Nordix Foundation.
+ *  Copyright (C) 2021-2025 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -31,19 +31,19 @@ import java.util.Optional;
 import java.util.UUID;
 import org.junit.jupiter.api.Test;
 import org.onap.policy.clamp.acm.runtime.instantiation.InstantiationUtils;
-import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
 import org.onap.policy.clamp.acm.runtime.util.CommonTestData;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrimeAck;
 import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
+import org.onap.policy.clamp.models.acm.persistence.provider.MessageProvider;
 
 class SupervisionHandlerTest {
 
     @Test
     void testParticipantPrimeAckNull() {
         var acDefinitionProvider = mock(AcDefinitionProvider.class);
-        var handler = new SupervisionHandler(acDefinitionProvider, mock(ParticipantSyncPublisher.class));
+        var handler = new SupervisionHandler(acDefinitionProvider, mock(MessageProvider.class));
 
         var participantPrimeAckMessage = new ParticipantPrimeAck();
         participantPrimeAckMessage.setParticipantId(CommonTestData.getParticipantId());
@@ -66,8 +66,11 @@ class SupervisionHandlerTest {
         participantPrimeAckMessage.setCompositionState(AcTypeState.DEPRIMING);
         handler.handleParticipantMessage(participantPrimeAckMessage);
 
+        participantPrimeAckMessage.setCompositionState(AcTypeState.COMMISSIONED);
+        participantPrimeAckMessage.setStateChangeResult(StateChangeResult.TIMEOUT);
+        handler.handleParticipantMessage(participantPrimeAckMessage);
+
         verify(acDefinitionProvider, times(0)).findAcDefinition(any());
-        verify(acDefinitionProvider, times(0)).updateAcDefinitionElement(any(), any());
     }
 
     @Test
@@ -78,10 +81,9 @@ class SupervisionHandlerTest {
         participantPrimeAckMessage.setCompositionId(UUID.randomUUID());
         participantPrimeAckMessage.setCompositionState(AcTypeState.PRIMED);
         var acDefinitionProvider = mock(AcDefinitionProvider.class);
-        var handler = new SupervisionHandler(acDefinitionProvider, mock(ParticipantSyncPublisher.class));
+        var handler = new SupervisionHandler(acDefinitionProvider, mock(MessageProvider.class));
         handler.handleParticipantMessage(participantPrimeAckMessage);
         verify(acDefinitionProvider).findAcDefinition(participantPrimeAckMessage.getCompositionId());
-        verify(acDefinitionProvider, times(0)).updateAcDefinitionElement(any(), any());
     }
 
     @Test
@@ -98,7 +100,7 @@ class SupervisionHandlerTest {
         var acDefinitionProvider = mock(AcDefinitionProvider.class);
         when(acDefinitionProvider.findAcDefinition(acDefinition.getCompositionId()))
                 .thenReturn(Optional.of(acDefinition));
-        var handler = new SupervisionHandler(acDefinitionProvider, mock(ParticipantSyncPublisher.class));
+        var handler = new SupervisionHandler(acDefinitionProvider, mock(MessageProvider.class));
 
         handler.handleParticipantMessage(participantPrimeAckMessage);
         verify(acDefinitionProvider).findAcDefinition(any());
@@ -123,14 +125,10 @@ class SupervisionHandlerTest {
         when(acDefinitionProvider.findAcDefinition(acDefinition.getCompositionId()))
                 .thenReturn(Optional.of(acDefinition));
 
-        var handler = new SupervisionHandler(acDefinitionProvider, mock(ParticipantSyncPublisher.class));
+        var handler = new SupervisionHandler(acDefinitionProvider, mock(MessageProvider.class));
 
         handler.handleParticipantMessage(participantPrimeAckMessage);
         verify(acDefinitionProvider).findAcDefinition(any());
-        verify(acDefinitionProvider, times(acDefinition.getElementStateMap().size()))
-            .updateAcDefinitionElement(any(), any());
-        verify(acDefinitionProvider).updateAcDefinitionState(acDefinition.getCompositionId(), AcTypeState.PRIMED,
-            StateChangeResult.NO_ERROR);
     }
 
     @Test
@@ -150,12 +148,9 @@ class SupervisionHandlerTest {
         when(acDefinitionProvider.findAcDefinition(acDefinition.getCompositionId()))
                 .thenReturn(Optional.of(acDefinition));
 
-        var handler = new SupervisionHandler(acDefinitionProvider, mock(ParticipantSyncPublisher.class));
+        var handler = new SupervisionHandler(acDefinitionProvider, mock(MessageProvider.class));
 
         handler.handleParticipantMessage(participantPrimeAckMessage);
         verify(acDefinitionProvider).findAcDefinition(any());
-        verify(acDefinitionProvider).updateAcDefinitionElement(any(), any());
-        verify(acDefinitionProvider).updateAcDefinitionState(acDefinition.getCompositionId(), AcTypeState.PRIMING,
-            StateChangeResult.FAILED);
     }
 }
index e3387c6..315fb8c 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2023-2024 Nordix Foundation.
+ *  Copyright (C) 2023-2025 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -35,7 +35,6 @@ import java.util.Set;
 import java.util.UUID;
 import org.junit.jupiter.api.Test;
 import org.onap.policy.clamp.acm.runtime.instantiation.InstantiationUtils;
-import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantDeregisterAckPublisher;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRegisterAckPublisher;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
@@ -54,6 +53,7 @@ import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRe
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatus;
 import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
 import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
+import org.onap.policy.clamp.models.acm.persistence.provider.MessageProvider;
 import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
 
@@ -72,7 +72,7 @@ class SupervisionParticipantHandlerTest {
                 new SupervisionParticipantHandler(participantProvider, mock(ParticipantRegisterAckPublisher.class),
                         participantDeregisterAckPublisher, mock(AutomationCompositionProvider.class),
                         mock(AcDefinitionProvider.class), mock(ParticipantSyncPublisher.class),
-                        mock(AcRuntimeParameterGroup.class));
+                        mock(MessageProvider.class));
 
         handler.handleParticipantMessage(participantDeregisterMessage);
         verify(participantDeregisterAckPublisher).send(participantDeregisterMessage.getMessageId());
@@ -100,7 +100,7 @@ class SupervisionParticipantHandlerTest {
         var handler = new SupervisionParticipantHandler(participantProvider, participantRegisterAckPublisher,
                 mock(ParticipantDeregisterAckPublisher.class), mock(AutomationCompositionProvider.class),
                 mock(AcDefinitionProvider.class), mock(ParticipantSyncPublisher.class),
-                mock(AcRuntimeParameterGroup.class));
+                mock(MessageProvider.class));
         handler.handleParticipantMessage(participantRegisterMessage);
 
         verify(participantProvider).saveParticipant(any());
@@ -157,7 +157,7 @@ class SupervisionParticipantHandlerTest {
         var participantSyncPublisher = mock(ParticipantSyncPublisher.class);
         var handler = new SupervisionParticipantHandler(participantProvider, participantRegisterAckPublisher,
                 mock(ParticipantDeregisterAckPublisher.class), automationCompositionProvider, acDefinitionProvider,
-                participantSyncPublisher, CommonTestData.getTestParamaterGroup());
+                participantSyncPublisher, mock(MessageProvider.class));
         handler.handleParticipantMessage(participantRegisterMessage);
 
         verify(participantRegisterAckPublisher)
@@ -189,20 +189,19 @@ class SupervisionParticipantHandlerTest {
                 .thenReturn(Optional.of(replica));
 
         var automationCompositionProvider = mock(AutomationCompositionProvider.class);
+        var messageProvider = mock(MessageProvider.class);
         var handler =
                 new SupervisionParticipantHandler(participantProvider, mock(ParticipantRegisterAckPublisher.class),
                         mock(ParticipantDeregisterAckPublisher.class), automationCompositionProvider,
-                        acDefinitionProvider, mock(ParticipantSyncPublisher.class),
-                        mock(AcRuntimeParameterGroup.class));
+                        acDefinitionProvider, mock(ParticipantSyncPublisher.class), messageProvider);
         handler.handleParticipantMessage(participantStatusMessage);
 
-        verify(automationCompositionProvider).upgradeStates(any());
+        verify(messageProvider).save(any(ParticipantStatus.class));
     }
 
     @Test
     void testAcDefinitionOutProperties() {
         var participantStatusMessage = createParticipantStatus();
-        participantStatusMessage.setAutomationCompositionInfoList(List.of(new AutomationCompositionInfo()));
         var participantDefinition = new ParticipantDefinition();
         participantStatusMessage.setParticipantDefinitionUpdates(List.of(participantDefinition));
         participantDefinition.setParticipantId(participantStatusMessage.getParticipantId());
@@ -219,6 +218,7 @@ class SupervisionParticipantHandlerTest {
         acDefinition.setElementStateMap(
                 Map.of(acElementDefinition.getAcElementDefinitionId().getName(), nodeTemplateState));
         var acDefinitionProvider = mock(AcDefinitionProvider.class);
+        var messageProvider = mock(MessageProvider.class);
         when(acDefinitionProvider.findAcDefinition(compositionId)).thenReturn(Optional.of(acDefinition));
         when(acDefinitionProvider.getAcDefinition(compositionId)).thenReturn(acDefinition);
 
@@ -226,11 +226,33 @@ class SupervisionParticipantHandlerTest {
         var handler =
                 new SupervisionParticipantHandler(participantProvider, mock(ParticipantRegisterAckPublisher.class),
                         mock(ParticipantDeregisterAckPublisher.class), mock(AutomationCompositionProvider.class),
-                        acDefinitionProvider, mock(ParticipantSyncPublisher.class),
-                        CommonTestData.getTestParamaterGroup());
+                        acDefinitionProvider, mock(ParticipantSyncPublisher.class), messageProvider);
         handler.handleParticipantMessage(participantStatusMessage);
+        verify(messageProvider).save(participantStatusMessage);
+    }
+
+    @Test
+    void testAcOutProperties() {
+        var participantStatusMessage = createParticipantStatus();
+        participantStatusMessage.setAutomationCompositionInfoList(List.of(new AutomationCompositionInfo()));
+
+        var compositionId = UUID.randomUUID();
+        participantStatusMessage.setCompositionId(compositionId);
+        var acDefinition = new AutomationCompositionDefinition();
+        acDefinition.setState(AcTypeState.COMMISSIONED);
+        acDefinition.setCompositionId(compositionId);
+        var acDefinitionProvider = mock(AcDefinitionProvider.class);
+        var messageProvider = mock(MessageProvider.class);
+        when(acDefinitionProvider.findAcDefinition(compositionId)).thenReturn(Optional.of(acDefinition));
+        when(acDefinitionProvider.getAcDefinition(compositionId)).thenReturn(acDefinition);
 
-        verify(acDefinitionProvider).updateAcDefinition(acDefinition, CommonTestData.TOSCA_COMP_NAME);
+        var participantProvider = mock(ParticipantProvider.class);
+        var handler =
+                new SupervisionParticipantHandler(participantProvider, mock(ParticipantRegisterAckPublisher.class),
+                        mock(ParticipantDeregisterAckPublisher.class), mock(AutomationCompositionProvider.class),
+                        acDefinitionProvider, mock(ParticipantSyncPublisher.class), messageProvider);
+        handler.handleParticipantMessage(participantStatusMessage);
+        verify(messageProvider).save(participantStatusMessage);
     }
 
     @Test
@@ -244,7 +266,7 @@ class SupervisionParticipantHandlerTest {
                 new SupervisionParticipantHandler(participantProvider, mock(ParticipantRegisterAckPublisher.class),
                         mock(ParticipantDeregisterAckPublisher.class), mock(AutomationCompositionProvider.class),
                         mock(AcDefinitionProvider.class), mock(ParticipantSyncPublisher.class),
-                        mock(AcRuntimeParameterGroup.class));
+                        mock(MessageProvider.class));
         handler.handleParticipantMessage(participantStatusMessage);
 
         verify(participantProvider).saveParticipant(any());
@@ -263,18 +285,18 @@ class SupervisionParticipantHandlerTest {
 
         var participantProvider = mock(ParticipantProvider.class);
         var automationCompositionProvider = mock(AutomationCompositionProvider.class);
+        var messageProvider = mock(MessageProvider.class);
         var handler =
                 new SupervisionParticipantHandler(participantProvider, mock(ParticipantRegisterAckPublisher.class),
                         mock(ParticipantDeregisterAckPublisher.class), automationCompositionProvider,
-                        acDefinitionProvider, mock(ParticipantSyncPublisher.class),
-                        mock(AcRuntimeParameterGroup.class));
+                        acDefinitionProvider, mock(ParticipantSyncPublisher.class), messageProvider);
         var participant = CommonTestData.createParticipant(CommonTestData.getParticipantId());
         when(participantProvider.findParticipant(CommonTestData.getParticipantId()))
                 .thenReturn(Optional.of(participant));
         handler.handleParticipantMessage(participantStatusMessage);
 
         verify(participantProvider).saveParticipant(any());
-        verify(automationCompositionProvider).upgradeStates(any());
+        verify(messageProvider).save(any(ParticipantStatus.class));
     }
 
     private ParticipantStatus createParticipantStatus() {
index a555d82..ab1564b 100644 (file)
@@ -28,10 +28,12 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.onap.policy.clamp.acm.runtime.util.CommonTestData.TOSCA_SERVICE_TEMPLATE_YAML;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
 import org.junit.jupiter.api.Test;
 import org.onap.policy.clamp.acm.runtime.instantiation.InstantiationUtils;
@@ -48,23 +50,27 @@ import org.onap.policy.clamp.models.acm.concepts.LockState;
 import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState;
 import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
 import org.onap.policy.clamp.models.acm.concepts.SubState;
+import org.onap.policy.clamp.models.acm.document.concepts.DocMessage;
 import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
 import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
+import org.onap.policy.clamp.models.acm.persistence.provider.MessageProvider;
 import org.onap.policy.clamp.models.acm.utils.TimestampHelper;
 
 class SupervisionScannerTest {
 
     private static final String AC_JSON = "src/test/resources/rest/acm/AutomationCompositionSmoke.json";
 
-    private static final UUID compositionId = UUID.randomUUID();
+    private static final UUID COMPOSITION_ID = UUID.randomUUID();
+    private static final UUID INSTANCE_ID = UUID.randomUUID();
+    private static final String JOB_ID = "JOB_ID";
 
-    private AutomationCompositionDefinition createAutomationCompositionDefinition(AcTypeState acTypeState,
-                                                                                  StateChangeResult stateChangeResult) {
+    private AutomationCompositionDefinition createAutomationCompositionDefinition(
+            AcTypeState acTypeState, StateChangeResult stateChangeResult) {
         var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
         var acDefinition = new AutomationCompositionDefinition();
         acDefinition.setState(acTypeState);
         acDefinition.setStateChangeResult(stateChangeResult);
-        acDefinition.setCompositionId(compositionId);
+        acDefinition.setCompositionId(COMPOSITION_ID);
         acDefinition.setLastMsg(TimestampHelper.now());
         acDefinition.setServiceTemplate(Objects.requireNonNull(serviceTemplate));
         var node = new NodeTemplateState();
@@ -78,13 +84,15 @@ class SupervisionScannerTest {
         var acDefinitionProvider = mock(AcDefinitionProvider.class);
         var acTypeState = acDefinition.getState();
         if (AcTypeState.PRIMING.equals(acTypeState) || AcTypeState.DEPRIMING.equals(acTypeState)) {
-            when(acDefinitionProvider.getAllAcDefinitionsInTransition()).thenReturn(List.of(acDefinition));
+            Set<UUID> set = new HashSet<>();
+            set.add(acDefinition.getCompositionId());
+            when(acDefinitionProvider.getAllAcDefinitionsInTransition()).thenReturn(set);
             when(acDefinitionProvider.getAcDefinition(acDefinition.getCompositionId()))
                     .thenReturn(Objects.requireNonNull(acDefinition));
             when(acDefinitionProvider.findAcDefinition(acDefinition.getCompositionId()))
                     .thenReturn(Optional.of(Objects.requireNonNull(acDefinition)));
         }
-        when(acDefinitionProvider.getAcDefinition(compositionId)).thenReturn(acDefinition);
+        when(acDefinitionProvider.getAcDefinition(COMPOSITION_ID)).thenReturn(acDefinition);
         return acDefinitionProvider;
     }
 
@@ -101,36 +109,95 @@ class SupervisionScannerTest {
     void testAcDefinition() {
         var acDefinitionProvider = createAcDefinitionProvider(AcTypeState.PRIMING, StateChangeResult.NO_ERROR);
         var acDefinitionScanner = mock(AcDefinitionScanner.class);
+        when(acDefinitionScanner.scanMessage(any(), any())).thenReturn(new UpdateSync());
+        var messageProvider = mock(MessageProvider.class);
+        when(messageProvider.createJob(COMPOSITION_ID)).thenReturn(Optional.of(JOB_ID));
+        when(messageProvider.findCompositionMessages()).thenReturn(Set.of(COMPOSITION_ID));
+        var message = new DocMessage();
+        when(messageProvider.getAllMessages(COMPOSITION_ID)).thenReturn(List.of(message));
         var supervisionScanner = new SupervisionScanner(mock(AutomationCompositionProvider.class), acDefinitionProvider,
-                acDefinitionScanner, mock(StageScanner.class), mock(SimpleScanner.class), mock(PhaseScanner.class));
+                acDefinitionScanner, mock(StageScanner.class), mock(SimpleScanner.class), mock(PhaseScanner.class),
+                messageProvider);
         supervisionScanner.run();
         verify(acDefinitionScanner).scanAutomationCompositionDefinition(any(), any());
+        verify(messageProvider).removeMessage(message.getMessageId());
+        verify(messageProvider).removeJob(JOB_ID);
     }
 
     @Test
-    void testAcNotInTransitionOrFailed() {
-        var automationCompositionProvider = mock(AutomationCompositionProvider.class);
+    void testAcDefinitionJobExist() {
+        var acDefinitionProvider = createAcDefinitionProvider(AcTypeState.PRIMING, StateChangeResult.NO_ERROR);
+        var acDefinitionScanner = mock(AcDefinitionScanner.class);
+        var messageProvider = mock(MessageProvider.class);
+        when(messageProvider.createJob(COMPOSITION_ID)).thenReturn(Optional.empty());
+        when(messageProvider.findCompositionMessages()).thenReturn(Set.of());
+        var supervisionScanner = new SupervisionScanner(mock(AutomationCompositionProvider.class), acDefinitionProvider,
+                acDefinitionScanner, mock(StageScanner.class), mock(SimpleScanner.class), mock(PhaseScanner.class),
+                messageProvider);
+        supervisionScanner.run();
+        verify(acDefinitionScanner, times(0)).scanAutomationCompositionDefinition(any(), any());
+    }
 
+    @Test
+    void testAcNotInTransitionOrFailed() {
         var automationComposition = InstantiationUtils.getAutomationCompositionFromResource(AC_JSON, "Crud");
-        automationComposition.setCompositionId(Objects.requireNonNull(compositionId));
-        when(automationCompositionProvider.getAcInstancesInTransition()).thenReturn(List.of(automationComposition));
+        automationComposition.setInstanceId(Objects.requireNonNull(INSTANCE_ID));
+        automationComposition.setCompositionId(Objects.requireNonNull(COMPOSITION_ID));
+        Set<UUID> set = new HashSet<>();
+        set.add(automationComposition.getInstanceId());
+        var automationCompositionProvider = mock(AutomationCompositionProvider.class);
+        when(automationCompositionProvider.getAcInstancesInTransition()).thenReturn(set);
 
         var stageScanner = mock(StageScanner.class);
         var simpleScanner = mock(SimpleScanner.class);
         var phaseScanner = mock(PhaseScanner.class);
+        var messageProvider = mock(MessageProvider.class);
         var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(),
-                mock(AcDefinitionScanner.class), stageScanner, simpleScanner, phaseScanner);
+                mock(AcDefinitionScanner.class), stageScanner, simpleScanner, phaseScanner, messageProvider);
 
         // not in transition
         supervisionScanner.run();
-        verify(stageScanner, times(0)).scanStage(any(), any(), any());
-        verify(simpleScanner, times(0)).simpleScan(any(), any());
-        verify(phaseScanner, times(0)).scanWithPhase(any(), any(), any());
+        verifyNoInteraction(stageScanner, simpleScanner, phaseScanner);
 
+        // failed
         automationComposition.setDeployState(DeployState.DEPLOYING);
         automationComposition.setStateChangeResult(StateChangeResult.FAILED);
         supervisionScanner.run();
-        // failed
+        verifyNoInteraction(stageScanner, simpleScanner, phaseScanner);
+
+        // job already exist
+        automationComposition.setStateChangeResult(StateChangeResult.NO_ERROR);
+        when(messageProvider.createJob(automationComposition.getInstanceId())).thenReturn(Optional.empty());
+        supervisionScanner.run();
+        verifyNoInteraction(stageScanner, simpleScanner, phaseScanner);
+    }
+
+    @Test
+    void testAcRemoved() {
+        var automationComposition = InstantiationUtils.getAutomationCompositionFromResource(AC_JSON, "Crud");
+        automationComposition.setInstanceId(Objects.requireNonNull(INSTANCE_ID));
+        automationComposition.setCompositionId(Objects.requireNonNull(COMPOSITION_ID));
+        Set<UUID> set = new HashSet<>();
+        set.add(automationComposition.getInstanceId());
+        var automationCompositionProvider = mock(AutomationCompositionProvider.class);
+        when(automationCompositionProvider.getAcInstancesInTransition()).thenReturn(set);
+
+        var stageScanner = mock(StageScanner.class);
+        var simpleScanner = mock(SimpleScanner.class);
+        var phaseScanner = mock(PhaseScanner.class);
+        var messageProvider = mock(MessageProvider.class);
+        when(messageProvider.createJob(automationComposition.getInstanceId())).thenReturn(Optional.of(JOB_ID));
+        var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(),
+                mock(AcDefinitionScanner.class), stageScanner, simpleScanner, phaseScanner, messageProvider);
+
+        // automationComposition not present in DB
+        supervisionScanner.run();
+        verifyNoInteraction(stageScanner, simpleScanner, phaseScanner);
+        verify(messageProvider).removeJob(JOB_ID);
+    }
+
+    private void verifyNoInteraction(
+            StageScanner stageScanner, SimpleScanner simpleScanner, PhaseScanner phaseScanner) {
         verify(stageScanner, times(0)).scanStage(any(), any(), any());
         verify(simpleScanner, times(0)).simpleScan(any(), any());
         verify(phaseScanner, times(0)).scanWithPhase(any(), any(), any());
@@ -139,31 +206,44 @@ class SupervisionScannerTest {
     @Test
     void testScanner() {
         var automationComposition = new AutomationComposition();
-        automationComposition.setCompositionId(compositionId);
+        automationComposition.setInstanceId(INSTANCE_ID);
+        automationComposition.setCompositionId(COMPOSITION_ID);
         automationComposition.setDeployState(DeployState.DEPLOYING);
+        Set<UUID> set = new HashSet<>();
+        set.add(automationComposition.getInstanceId());
         var automationCompositionProvider = mock(AutomationCompositionProvider.class);
-        when(automationCompositionProvider.getAcInstancesInTransition()).thenReturn(List.of(automationComposition));
+        when(automationCompositionProvider.getAcInstancesInTransition()).thenReturn(set);
         when(automationCompositionProvider.findAutomationComposition(automationComposition.getInstanceId()))
                 .thenReturn(Optional.of(automationComposition));
 
         var stageScanner = mock(StageScanner.class);
         var simpleScanner = mock(SimpleScanner.class);
+        when(simpleScanner.scanMessage(any(), any())).thenReturn(new UpdateSync());
         var phaseScanner = mock(PhaseScanner.class);
 
+        var messageProvider = mock(MessageProvider.class);
+        when(messageProvider.createJob(automationComposition.getInstanceId())).thenReturn(Optional.of(JOB_ID));
+        var message = new  DocMessage();
+        when(messageProvider.getAllMessages(INSTANCE_ID)).thenReturn(List.of(message));
+        when(messageProvider.findInstanceMessages()).thenReturn(Set.of(INSTANCE_ID));
+
         var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(),
-                mock(AcDefinitionScanner.class), stageScanner, simpleScanner, phaseScanner);
+                mock(AcDefinitionScanner.class), stageScanner, simpleScanner, phaseScanner, messageProvider);
 
         supervisionScanner.run();
         verify(stageScanner, times(0)).scanStage(any(), any(), any());
         verify(simpleScanner, times(0)).simpleScan(any(), any());
         verify(phaseScanner).scanWithPhase(any(), any(), any());
+        verify(messageProvider).removeMessage(message.getMessageId());
+        verify(messageProvider).removeJob(JOB_ID);
     }
 
     @Test
     void testSendAutomationCompositionMigrate() {
         var automationComposition = InstantiationUtils.getAutomationCompositionFromResource(AC_JSON, "Crud");
         automationComposition.setDeployState(DeployState.MIGRATING);
-        automationComposition.setCompositionId(compositionId);
+        automationComposition.setInstanceId(INSTANCE_ID);
+        automationComposition.setCompositionId(COMPOSITION_ID);
         var compositionTargetId = UUID.randomUUID();
         automationComposition.setCompositionTargetId(compositionTargetId);
         automationComposition.setLockState(LockState.LOCKED);
@@ -175,7 +255,9 @@ class SupervisionScannerTest {
         }
 
         var automationCompositionProvider = mock(AutomationCompositionProvider.class);
-        when(automationCompositionProvider.getAcInstancesInTransition()).thenReturn(List.of(automationComposition));
+        Set<UUID> set = new HashSet<>();
+        set.add(automationComposition.getInstanceId());
+        when(automationCompositionProvider.getAcInstancesInTransition()).thenReturn(set);
         when(automationCompositionProvider.findAutomationComposition(automationComposition.getInstanceId()))
                 .thenReturn(Optional.of(automationComposition));
 
@@ -185,12 +267,17 @@ class SupervisionScannerTest {
         when(acDefinitionProvider.getAcDefinition(compositionTargetId)).thenReturn(definitionTarget);
         var stageScanner = mock(StageScanner.class);
 
+        var messageProvider = mock(MessageProvider.class);
+        when(messageProvider.createJob(automationComposition.getInstanceId())).thenReturn(Optional.of(JOB_ID));
+
         var supervisionScanner = new SupervisionScanner(automationCompositionProvider, acDefinitionProvider,
-                mock(AcDefinitionScanner.class), stageScanner, mock(SimpleScanner.class), mock(PhaseScanner.class));
+                mock(AcDefinitionScanner.class), stageScanner, mock(SimpleScanner.class),
+                mock(PhaseScanner.class), messageProvider);
 
         supervisionScanner.run();
         verify(stageScanner).scanStage(automationComposition, definitionTarget.getServiceTemplate(),
                 new UpdateSync());
+        verify(messageProvider).removeJob(JOB_ID);
     }
 
     @Test
@@ -200,35 +287,86 @@ class SupervisionScannerTest {
         automationComposition.setDeployState(DeployState.DEPLOYED);
         automationComposition.setSubState(SubState.MIGRATION_PRECHECKING);
         automationComposition.setLockState(LockState.NONE);
-        automationComposition.setCompositionId(compositionId);
+        automationComposition.setInstanceId(INSTANCE_ID);
+        automationComposition.setCompositionId(COMPOSITION_ID);
         automationComposition.setLastMsg(TimestampHelper.now());
         var automationCompositionProvider = mock(AutomationCompositionProvider.class);
-        when(automationCompositionProvider.getAcInstancesInTransition()).thenReturn(List.of(automationComposition));
+        Set<UUID> set = new HashSet<>();
+        set.add(automationComposition.getInstanceId());
+        when(automationCompositionProvider.getAcInstancesInTransition()).thenReturn(set);
         when(automationCompositionProvider.findAutomationComposition(automationComposition.getInstanceId()))
                 .thenReturn(Optional.of(automationComposition));
 
+        var messageProvider = mock(MessageProvider.class);
+        when(messageProvider.createJob(automationComposition.getInstanceId())).thenReturn(Optional.of(JOB_ID));
+
         var simpleScanner = mock(SimpleScanner.class);
         var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(),
-                mock(AcDefinitionScanner.class), mock(StageScanner.class), simpleScanner, mock(PhaseScanner.class));
+                mock(AcDefinitionScanner.class), mock(StageScanner.class), simpleScanner, mock(PhaseScanner.class),
+                messageProvider);
         supervisionScanner.run();
         verify(simpleScanner).simpleScan(automationComposition, new UpdateSync());
+        verify(messageProvider).removeJob(JOB_ID);
 
         clearInvocations(simpleScanner);
+        clearInvocations(messageProvider);
         automationComposition.setDeployState(DeployState.UNDEPLOYED);
         automationComposition.setSubState(SubState.PREPARING);
         supervisionScanner.run();
         verify(simpleScanner).simpleScan(automationComposition, new UpdateSync());
+        verify(messageProvider).removeJob(JOB_ID);
 
         clearInvocations(simpleScanner);
+        clearInvocations(messageProvider);
         automationComposition.setDeployState(DeployState.DEPLOYED);
         automationComposition.setSubState(SubState.REVIEWING);
         supervisionScanner.run();
         verify(simpleScanner).simpleScan(automationComposition, new UpdateSync());
+        verify(messageProvider).removeJob(JOB_ID);
 
         clearInvocations(simpleScanner);
+        clearInvocations(messageProvider);
         automationComposition.setDeployState(DeployState.UPDATING);
         automationComposition.setSubState(SubState.NONE);
         supervisionScanner.run();
         verify(simpleScanner).simpleScan(automationComposition, new UpdateSync());
+        verify(messageProvider).removeJob(JOB_ID);
+    }
+
+    @Test
+    void testSaveAcByMessageUpdate() {
+        var automationComposition = new AutomationComposition();
+        automationComposition.setInstanceId(INSTANCE_ID);
+        automationComposition.setCompositionId(COMPOSITION_ID);
+        automationComposition.setDeployState(DeployState.DEPLOYED);
+        automationComposition.setLockState(LockState.LOCKED);
+        automationComposition.setStateChangeResult(StateChangeResult.NO_ERROR);
+        var automationCompositionProvider = mock(AutomationCompositionProvider.class);
+        when(automationCompositionProvider.getAcInstancesInTransition()).thenReturn(new HashSet<>());
+        when(automationCompositionProvider.findAutomationComposition(automationComposition.getInstanceId()))
+                .thenReturn(Optional.of(automationComposition));
+
+        var simpleScanner = mock(SimpleScanner.class);
+        var updateSync = new UpdateSync();
+        updateSync.setUpdated(true);
+        when(simpleScanner.scanMessage(any(), any())).thenReturn(updateSync);
+
+        var messageProvider = mock(MessageProvider.class);
+        when(messageProvider.createJob(automationComposition.getInstanceId())).thenReturn(Optional.of(JOB_ID));
+        var message = new  DocMessage();
+        when(messageProvider.getAllMessages(INSTANCE_ID)).thenReturn(List.of(message));
+        when(messageProvider.findInstanceMessages()).thenReturn(Set.of(INSTANCE_ID));
+
+        var phaseScanner = mock(PhaseScanner.class);
+        var stageScanner = mock(StageScanner.class);
+        var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(),
+                mock(AcDefinitionScanner.class), stageScanner, simpleScanner, phaseScanner, messageProvider);
+
+        supervisionScanner.run();
+        verifyNoInteraction(stageScanner, simpleScanner, phaseScanner);
+        verify(simpleScanner).saveAndSync(any(), any());
+        verify(messageProvider).removeMessage(message.getMessageId());
+        verify(messageProvider).removeJob(JOB_ID);
     }
+
 }