Add automatic sync up support in ACM-intermediary 77/141577/5
authorFrancescoFioraEst <francesco.fiora@est.tech>
Mon, 28 Jul 2025 08:53:38 +0000 (09:53 +0100)
committerFrancescoFioraEst <francesco.fiora@est.tech>
Mon, 28 Jul 2025 08:53:38 +0000 (09:53 +0100)
Issue-ID: POLICY-5419
Change-Id: I03f24c9ef5f8849beb088b9d2ccbb5f18d3f91b2
Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
24 files changed:
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantMessagePublisher.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandler.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcLockHandler.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcSubStateHandler.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandler.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/MsgExecutor.java [new file with mode: 0644]
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/cache/AcDefinition.java [new file with mode: 0644]
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/cache/AutomationCompositionMsg.java [new file with mode: 0644]
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/cache/CacheProvider.java [moved from participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java with 76% similarity]
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/api/impl/ParticipantIntermediaryApiImplTest.java
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantCommTest.java
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandlerTest.java
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcLockHandlerTest.java
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcSubStateHandlerTest.java
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandlerTest.java
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandlerTest.java
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/MsgExecutorTest.java [new file with mode: 0644]
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandlerTest.java
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandlerTest.java
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/cache/CacheProviderTest.java [moved from participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProviderTest.java with 77% similarity]

index 233b559..87a16b0 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2021-2024 Nordix Foundation.
+ *  Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved.
  *  Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -26,7 +26,7 @@ import java.util.UUID;
 import lombok.RequiredArgsConstructor;
 import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi;
 import org.onap.policy.clamp.acm.participant.intermediary.handler.AutomationCompositionOutHandler;
-import org.onap.policy.clamp.acm.participant.intermediary.handler.CacheProvider;
+import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement;
@@ -106,7 +106,7 @@ public class ParticipantIntermediaryApiImpl implements ParticipantIntermediaryAp
     @Override
     public Map<UUID, Map<ToscaConceptIdentifier, AutomationCompositionElementDefinition>> getAcElementsDefinitions() {
         return PfUtils.mapMap(cacheProvider.getAcElementsDefinitions(),
-                map -> PfUtils.mapMap(map, AutomationCompositionElementDefinition::new));
+            acDefinition -> PfUtils.mapMap(acDefinition.getElements(), AutomationCompositionElementDefinition::new));
     }
 
     @Override
@@ -116,17 +116,17 @@ public class ParticipantIntermediaryApiImpl implements ParticipantIntermediaryAp
         if (acElementDefinitions == null) {
             return Map.of();
         }
-        return PfUtils.mapMap(acElementDefinitions, AutomationCompositionElementDefinition::new);
+        return PfUtils.mapMap(acElementDefinitions.getElements(), AutomationCompositionElementDefinition::new);
     }
 
     @Override
     public AutomationCompositionElementDefinition getAcElementDefinition(UUID compositionId,
             ToscaConceptIdentifier elementId) {
-        var acElementDefinitions = cacheProvider.getAcElementsDefinitions().get(compositionId);
-        if (acElementDefinitions == null) {
+        var acDefinition = cacheProvider.getAcElementsDefinitions().get(compositionId);
+        if (acDefinition == null) {
             return null;
         }
-        var acElementDefinition = acElementDefinitions.get(elementId);
+        var acElementDefinition = acDefinition.getElements().get(elementId);
         return acElementDefinition != null ? new AutomationCompositionElementDefinition(acElementDefinition) : null;
     }
 }
index ecf4759..c64317d 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2021-2024 Nordix Foundation.
+ *  Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved.
  *  Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -31,6 +31,7 @@ import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCom
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantDeregister;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrimeAck;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRegister;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantReqSync;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatus;
 import org.onap.policy.common.message.bus.event.TopicSink;
 import org.onap.policy.common.message.bus.event.client.TopicSinkClient;
@@ -65,6 +66,18 @@ public class ParticipantMessagePublisher implements Publisher {
         active = true;
     }
 
+    /**
+     * Method to send Participant Request Sync message to clamp.
+     *
+     * @param participantReqSync the Participant Request Sync
+     */
+    @Timed(value = "publisher.participant_req_sync", description = "PARTICIPANT_REQ_SYNC_MSG messages published")
+    public void sendParticipantReqSync(final ParticipantReqSync participantReqSync) {
+        validate();
+        topicSinkClient.send(participantReqSync);
+        LOGGER.info("Sent Participant Request Sync to CLAMP - {}", participantReqSync);
+    }
+
     /**
      * Method to send Participant Status message to clamp on demand.
      *
index d624677..d1ded25 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2024 Nordix Foundation.
+ *  Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
  *  Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
 
 package org.onap.policy.clamp.acm.participant.intermediary.handler;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import lombok.RequiredArgsConstructor;
 import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionDto;
 import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessagePublisher;
+import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementDefinition;
 import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
@@ -56,7 +56,8 @@ public class AcDefinitionHandler {
             // prime
             var list = collectAcElementDefinition(participantPrimeMsg.getParticipantDefinitionUpdates());
             if (!list.isEmpty()) {
-                cacheProvider.addElementDefinition(participantPrimeMsg.getCompositionId(), list);
+                cacheProvider.addElementDefinition(participantPrimeMsg.getCompositionId(), list,
+                        participantPrimeMsg.getRevisionIdComposition());
                 prime(participantPrimeMsg.getMessageId(), participantPrimeMsg.getCompositionId(), list);
             }
         } else {
@@ -67,13 +68,12 @@ public class AcDefinitionHandler {
 
     private List<AutomationCompositionElementDefinition> collectAcElementDefinition(
             List<ParticipantDefinition> participantDefinitionList) {
-        List<AutomationCompositionElementDefinition> list = new ArrayList<>();
-        for (var participantDefinition : participantDefinitionList) {
-            if (participantDefinition.getParticipantId().equals(cacheProvider.getParticipantId())) {
-                list.addAll(participantDefinition.getAutomationCompositionElementDefinitionList());
-            }
-        }
-        return list;
+        return participantDefinitionList.stream()
+                .filter(participantDefinition -> participantDefinition.getParticipantId()
+                        .equals(cacheProvider.getParticipantId()))
+                .map(ParticipantDefinition::getAutomationCompositionElementDefinitionList)
+                .flatMap(List::stream)
+                .toList();
     }
 
     private void prime(UUID messageId, UUID compositionId, List<AutomationCompositionElementDefinition> list) {
@@ -87,8 +87,8 @@ public class AcDefinitionHandler {
     }
 
     private void deprime(UUID messageId, UUID compositionId) {
-        var acElementsDefinitions = cacheProvider.getAcElementsDefinitions().get(compositionId);
-        if (acElementsDefinitions == null) {
+        var acDefinition = cacheProvider.getAcElementsDefinitions().get(compositionId);
+        if (acDefinition == null) {
             // this participant does not handle this composition
             var participantPrimeAck = new ParticipantPrimeAck();
             participantPrimeAck.setCompositionId(compositionId);
@@ -103,7 +103,7 @@ public class AcDefinitionHandler {
             publisher.sendParticipantPrimeAck(participantPrimeAck);
             return;
         }
-        var list = new ArrayList<>(acElementsDefinitions.values());
+        var list = acDefinition.getElements().values();
         var inPropertiesMap = list.stream().collect(Collectors.toMap(
                 AutomationCompositionElementDefinition::getAcElementDefinitionId,
                 el -> el.getAutomationCompositionElementToscaNodeTemplate().getProperties()));
@@ -132,13 +132,14 @@ public class AcDefinitionHandler {
 
             var list = collectAcElementDefinition(participantSyncMsg.getParticipantDefinitionUpdates());
             if (!list.isEmpty()) {
-                cacheProvider.addElementDefinition(participantSyncMsg.getCompositionId(), list);
+                cacheProvider.addElementDefinition(participantSyncMsg.getCompositionId(), list,
+                        participantSyncMsg.getRevisionIdComposition());
             }
         }
 
         for (var automationcomposition : participantSyncMsg.getAutomationcompositionList()) {
-            cacheProvider
-                    .initializeAutomationComposition(participantSyncMsg.getCompositionId(), automationcomposition);
+            cacheProvider.initializeAutomationComposition(
+                    participantSyncMsg.getCompositionId(), automationcomposition);
             if (StateChangeResult.TIMEOUT.equals(automationcomposition.getStateChangeResult())) {
                 for (var element : automationcomposition.getAcElementList()) {
                     listener.cleanExecution(element.getId(), participantSyncMsg.getMessageId());
index 109bd3f..079b900 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2024 Nordix Foundation.
+ *  Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
  *  Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -24,6 +24,7 @@ package org.onap.policy.clamp.acm.participant.intermediary.handler;
 import java.util.UUID;
 import lombok.RequiredArgsConstructor;
 import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto;
+import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
 import org.onap.policy.clamp.models.acm.concepts.LockState;
 import org.onap.policy.clamp.models.acm.concepts.ParticipantUtils;
@@ -80,7 +81,7 @@ public class AcLockHandler {
                 element.setLockState(LockState.LOCKING);
                 element.setSubState(SubState.NONE);
                 var compositionElement = cacheProvider.createCompositionElementDto(
-                        automationComposition.getCompositionId(), element, compositionInProperties);
+                        automationComposition.getCompositionId(), element);
                 var instanceElement = new InstanceElementDto(automationComposition.getInstanceId(), element.getId(),
                         element.getProperties(), element.getOutProperties());
                 listener.lock(messageId, compositionElement, instanceElement);
@@ -99,7 +100,7 @@ public class AcLockHandler {
                 element.setLockState(LockState.UNLOCKING);
                 element.setSubState(SubState.NONE);
                 var compositionElement = cacheProvider.createCompositionElementDto(
-                        automationComposition.getCompositionId(), element, compositionInProperties);
+                        automationComposition.getCompositionId(), element);
                 var instanceElement = new InstanceElementDto(automationComposition.getInstanceId(), element.getId(),
                         element.getProperties(), element.getOutProperties());
                 listener.unlock(messageId, compositionElement, instanceElement);
index 5b33221..f00c87b 100644 (file)
@@ -27,6 +27,7 @@ import lombok.RequiredArgsConstructor;
 import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto;
 import org.onap.policy.clamp.acm.participant.intermediary.api.ElementState;
 import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto;
+import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
 import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement;
@@ -154,7 +155,7 @@ public class AcSubStateHandler {
                 if (cacheProvider.getParticipantId().equals(participantPrepare.getParticipantId())) {
                     cacheProvider.initializeAutomationComposition(acPrepareMsg.getCompositionId(),
                         acPrepareMsg.getAutomationCompositionId(), participantPrepare, DeployState.UNDEPLOYED,
-                        SubState.PREPARING);
+                        SubState.PREPARING, acPrepareMsg.getRevisionIdInstance());
                     callParticipanPrepare(acPrepareMsg.getMessageId(), participantPrepare.getAcElementList(),
                         acPrepareMsg.getStage(), acPrepareMsg.getAutomationCompositionId());
                 }
@@ -175,7 +176,7 @@ public class AcSubStateHandler {
             var compositionInProperties = cacheProvider
                 .getCommonProperties(automationComposition.getCompositionId(), element.getDefinition());
             var compositionElement = cacheProvider.createCompositionElementDto(automationComposition.getCompositionId(),
-                element, compositionInProperties);
+                element);
             var stageSet = ParticipantUtils.findStageSetPrepare(compositionInProperties);
             if (stageSet.contains(stageMsg)) {
                 var instanceElement =
@@ -188,11 +189,9 @@ public class AcSubStateHandler {
 
     private void callParticipanReview(UUID messageId, AutomationComposition automationComposition) {
         for (var element : automationComposition.getElements().values()) {
-            var compositionInProperties = cacheProvider
-                .getCommonProperties(automationComposition.getCompositionId(), element.getDefinition());
             element.setSubState(SubState.REVIEWING);
             var compositionElement = cacheProvider.createCompositionElementDto(automationComposition.getCompositionId(),
-                element, compositionInProperties);
+                element);
             var instanceElement = new InstanceElementDto(automationComposition.getInstanceId(), element.getId(),
                 element.getProperties(), element.getOutProperties());
             listener.review(messageId, compositionElement, instanceElement);
index 01c4428..3572945 100644 (file)
@@ -30,6 +30,7 @@ import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElement
 import org.onap.policy.clamp.acm.participant.intermediary.api.ElementState;
 import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto;
 import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessagePublisher;
+import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
 import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement;
@@ -146,15 +147,16 @@ public class AutomationCompositionHandler {
             if (cacheProvider.getParticipantId().equals(participantDeploy.getParticipantId())) {
                 if (deployMsg.isFirstStartPhase()) {
                     cacheProvider.initializeAutomationComposition(deployMsg.getCompositionId(),
-                            deployMsg.getAutomationCompositionId(), participantDeploy);
+                            deployMsg.getAutomationCompositionId(), participantDeploy,
+                            deployMsg.getRevisionIdInstance());
                 }
-                callParticipanDeploy(deployMsg.getMessageId(), participantDeploy.getAcElementList(),
+                callParticipantDeploy(deployMsg.getMessageId(), participantDeploy.getAcElementList(),
                         deployMsg.getStartPhase(), deployMsg.getAutomationCompositionId());
             }
         }
     }
 
-    private void callParticipanDeploy(UUID messageId, List<AcElementDeploy> acElementDeployList, Integer startPhaseMsg,
+    private void callParticipantDeploy(UUID messageId, List<AcElementDeploy> acElementDeployList, Integer startPhaseMsg,
             UUID instanceId) {
         var automationComposition = cacheProvider.getAutomationComposition(instanceId);
         automationComposition.setDeployState(DeployState.DEPLOYING);
@@ -165,8 +167,7 @@ public class AutomationCompositionHandler {
             int startPhase = ParticipantUtils.findStartPhase(compositionInProperties);
             if (startPhaseMsg.equals(startPhase)) {
                 var compositionElement =
-                        cacheProvider.createCompositionElementDto(automationComposition.getCompositionId(), element,
-                                compositionInProperties);
+                        cacheProvider.createCompositionElementDto(automationComposition.getCompositionId(), element);
                 var instanceElement =
                         new InstanceElementDto(instanceId, elementDeploy.getId(), elementDeploy.getProperties(),
                                 element.getOutProperties());
@@ -257,8 +258,7 @@ public class AutomationCompositionHandler {
             if (startPhaseMsg.equals(startPhase)) {
                 element.setDeployState(DeployState.UNDEPLOYING);
                 var compositionElement =
-                        cacheProvider.createCompositionElementDto(automationComposition.getCompositionId(), element,
-                                compositionInProperties);
+                        cacheProvider.createCompositionElementDto(automationComposition.getCompositionId(), element);
                 var instanceElement = new InstanceElementDto(automationComposition.getInstanceId(), element.getId(),
                         element.getProperties(), element.getOutProperties());
                 listener.undeploy(messageId, compositionElement, instanceElement);
@@ -277,8 +277,7 @@ public class AutomationCompositionHandler {
                 element.setDeployState(DeployState.DELETING);
                 element.setSubState(SubState.NONE);
                 var compositionElement =
-                        cacheProvider.createCompositionElementDto(automationComposition.getCompositionId(), element,
-                                compositionInProperties);
+                        cacheProvider.createCompositionElementDto(automationComposition.getCompositionId(), element);
                 var instanceElement = new InstanceElementDto(automationComposition.getInstanceId(), element.getId(),
                         element.getProperties(), element.getOutProperties());
                 listener.delete(messageId, compositionElement, instanceElement);
index 9fc6b1d..c95bf4c 100644 (file)
@@ -25,6 +25,8 @@ import java.util.Map;
 import java.util.UUID;
 import lombok.RequiredArgsConstructor;
 import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessagePublisher;
+import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.AcDefinition;
+import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
 import org.onap.policy.clamp.models.acm.concepts.AcElementDeployAck;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
@@ -156,9 +158,7 @@ public class AutomationCompositionOutHandler {
         }
 
         if (!SubState.NONE.equals(element.getSubState())) {
-            if (!StateChangeResult.NO_ERROR.equals(stateChangeResult)) {
-                handleSubState(automationComposition, element, stateChangeResult);
-            }
+            handleSubState(automationComposition, element, stateChangeResult);
         } else if (deployState != null) {
             handleDeployState(automationComposition, element, deployState, stateChangeResult);
         }
@@ -361,12 +361,12 @@ public class AutomationCompositionOutHandler {
         var statusMsg = createParticipantStatus();
         statusMsg.setCompositionId(compositionId);
         var acElementDefsMap = cacheProvider.getAcElementsDefinitions();
-        var acElementsDefinitions = acElementDefsMap.get(compositionId);
-        if (acElementsDefinitions == null) {
+        var acDefinition = acElementDefsMap.get(compositionId);
+        if (acDefinition == null) {
             LOGGER.error("Cannot send Composition outProperties, id {} is null", compositionId);
             return;
         }
-        var acElementDefinition = getAutomationCompositionElementDefinition(acElementsDefinitions, elementId);
+        var acElementDefinition = getAutomationCompositionElementDefinition(acDefinition, elementId);
         if (acElementDefinition == null) {
             LOGGER.error("Cannot send Composition outProperties, elementId {} not present", elementId);
             return;
@@ -380,16 +380,16 @@ public class AutomationCompositionOutHandler {
     }
 
     private AutomationCompositionElementDefinition getAutomationCompositionElementDefinition(
-            Map<ToscaConceptIdentifier, AutomationCompositionElementDefinition> acElementsDefinition,
+            AcDefinition acElementsDefinition,
             ToscaConceptIdentifier elementId) {
 
         if (elementId == null) {
-            if (acElementsDefinition.size() == 1) {
-                return acElementsDefinition.values().iterator().next();
+            if (acElementsDefinition.getElements().size() == 1) {
+                return acElementsDefinition.getElements().values().iterator().next();
             }
             return null;
         }
-        return acElementsDefinition.get(elementId);
+        return acElementsDefinition.getElements().get(elementId);
     }
 
     private ParticipantStatus createParticipantStatus() {
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/MsgExecutor.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/MsgExecutor.java
new file mode 100644 (file)
index 0000000..acd02a4
--- /dev/null
@@ -0,0 +1,109 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.participant.intermediary.handler;
+
+import io.opentelemetry.context.Context;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import lombok.RequiredArgsConstructor;
+import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessagePublisher;
+import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.AutomationCompositionMsg;
+import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantReqSync;
+import org.springframework.stereotype.Component;
+
+@Component
+@RequiredArgsConstructor
+public class MsgExecutor {
+
+    private final ExecutorService executor = Context.taskWrapping(Executors.newSingleThreadExecutor());
+
+    private final CacheProvider cacheProvider;
+    private final ParticipantMessagePublisher publisher;
+
+    /**
+     * Execute the message if all data are present or put on Hold if something is missing.
+     *
+     * @param message the message
+     */
+    public void execute(AutomationCompositionMsg<?> message) {
+        if (validExecution(message)) {
+            message.execute();
+        } else {
+            cacheProvider.getMessagesOnHold().put(message.getKey(), message);
+            var participantReqSync = new ParticipantReqSync();
+            participantReqSync.setParticipantId(cacheProvider.getParticipantId());
+            participantReqSync.setReplicaId(cacheProvider.getReplicaId());
+            participantReqSync.setCompositionId(message.getCompositionId());
+            participantReqSync.setAutomationCompositionId(message.getInstanceId());
+            participantReqSync.setCompositionTargetId(message.getCompositionTargetId());
+            publisher.sendParticipantReqSync(participantReqSync);
+        }
+    }
+
+    /**
+     * Check if messages on hold can be executed.
+     */
+    public void check() {
+        executor.submit(this::checkAndExecute);
+    }
+
+    private void checkAndExecute() {
+        var executable = cacheProvider.getMessagesOnHold().values().stream()
+                .filter(this::validExecution).toList();
+        executable.forEach(AutomationCompositionMsg::execute);
+        executable.forEach(msg -> cacheProvider.getMessagesOnHold().remove(msg.getKey()));
+    }
+
+    private boolean validExecution(AutomationCompositionMsg<?> message) {
+        var result = true;
+        if (message.getCompositionId() != null) {
+            var valid = cacheProvider.isCompositionDefinitionUpdated(message.getCompositionId(),
+                    message.getRevisionIdComposition());
+            if (valid) {
+                message.setCompositionId(null);
+                message.setRevisionIdComposition(null);
+            } else {
+                result = false;
+            }
+        }
+        if (message.getCompositionTargetId() != null) {
+            var valid = cacheProvider.isCompositionDefinitionUpdated(message.getCompositionTargetId(),
+                    message.getRevisionIdCompositionTarget());
+            if (valid) {
+                message.setCompositionTargetId(null);
+                message.setRevisionIdCompositionTarget(null);
+            } else {
+                result = false;
+            }
+        }
+        if (message.getInstanceId() != null) {
+            var valid = cacheProvider.isInstanceUpdated(message.getInstanceId(), message.getRevisionIdInstance());
+            if (valid) {
+                message.setInstanceId(null);
+                message.setRevisionIdInstance(null);
+            } else {
+                result = false;
+            }
+        }
+        return result;
+    }
+}
index 6615ae1..73a3291 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2021-2024 Nordix Foundation.
+ *  Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved.
  * ================================================================================
  * Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
@@ -25,6 +25,8 @@ package org.onap.policy.clamp.acm.participant.intermediary.handler;
 import io.micrometer.core.annotation.Timed;
 import lombok.RequiredArgsConstructor;
 import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessagePublisher;
+import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.AutomationCompositionMsg;
+import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
 import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeploy;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionMigration;
@@ -60,6 +62,7 @@ public class ParticipantHandler {
     private final AcDefinitionHandler acDefinitionHandler;
     private final ParticipantMessagePublisher publisher;
     private final CacheProvider cacheProvider;
+    private final MsgExecutor msgExecutor;
 
     /**
      * Method which handles a participant health check event from clamp.
@@ -80,7 +83,10 @@ public class ParticipantHandler {
             value = "listener.automation_composition_update",
             description = "AUTOMATION_COMPOSITION_UPDATE messages received")
     public void handleAutomationCompositionDeploy(AutomationCompositionDeploy updateMsg) {
-        automationCompositionHandler.handleAutomationCompositionDeploy(updateMsg);
+        var acMsg = new AutomationCompositionMsg<>(
+                automationCompositionHandler::handleAutomationCompositionDeploy, updateMsg);
+        setCompositionUpdate(updateMsg, acMsg);
+        msgExecutor.execute(acMsg);
     }
 
     /**
@@ -92,11 +98,16 @@ public class ParticipantHandler {
             value = "listener.automation_composition_state_change",
             description = "AUTOMATION_COMPOSITION_STATE_CHANGE messages received")
     public void handleAutomationCompositionStateChange(AutomationCompositionStateChange stateChangeMsg) {
-        if (DeployOrder.NONE.equals(stateChangeMsg.getDeployOrderedState())) {
-            acLockHandler.handleAutomationCompositionStateChange(stateChangeMsg);
-        } else {
-            automationCompositionHandler.handleAutomationCompositionStateChange(stateChangeMsg);
+        var acMsg = DeployOrder.NONE.equals(stateChangeMsg.getDeployOrderedState())
+                ? new AutomationCompositionMsg<>(
+                acLockHandler::handleAutomationCompositionStateChange, stateChangeMsg)
+                : new AutomationCompositionMsg<>(
+                automationCompositionHandler::handleAutomationCompositionStateChange, stateChangeMsg);
+        setCompositionUpdate(stateChangeMsg, acMsg);
+        if (!DeployOrder.DELETE.equals(stateChangeMsg.getDeployOrderedState())) {
+            setInstanceUpdate(stateChangeMsg, acMsg);
         }
+        msgExecutor.execute(acMsg);
     }
 
     /**
@@ -108,11 +119,15 @@ public class ParticipantHandler {
             value = "listener.automation_composition_migration",
             description = "AUTOMATION_COMPOSITION_MIGRATION messages received")
     public void handleAutomationCompositionMigration(AutomationCompositionMigration migrationMsg) {
-        if (Boolean.TRUE.equals(migrationMsg.getPrecheck())) {
-            acSubStateHandler.handleAcMigrationPrecheck(migrationMsg);
-        } else {
-            automationCompositionHandler.handleAutomationCompositionMigration(migrationMsg);
-        }
+        var acMsg = Boolean.TRUE.equals(migrationMsg.getPrecheck())
+                ? new AutomationCompositionMsg<>(acSubStateHandler::handleAcMigrationPrecheck, migrationMsg)
+                : new AutomationCompositionMsg<>(
+                        automationCompositionHandler::handleAutomationCompositionMigration, migrationMsg);
+        setCompositionUpdate(migrationMsg, acMsg);
+        setInstanceUpdate(migrationMsg, acMsg);
+        acMsg.setCompositionTargetId(migrationMsg.getCompositionTargetId());
+        acMsg.setRevisionIdCompositionTarget(migrationMsg.getRevisionIdCompositionTarget());
+        msgExecutor.execute(acMsg);
     }
 
     /**
@@ -122,12 +137,37 @@ public class ParticipantHandler {
      */
     @Timed(value = "listener.properties_update", description = "PROPERTIES_UPDATE message received")
     public void handleAcPropertyUpdate(PropertiesUpdate propertyUpdateMsg) {
-        automationCompositionHandler.handleAcPropertyUpdate(propertyUpdateMsg);
+        var acMsg = new AutomationCompositionMsg<>(
+                automationCompositionHandler::handleAcPropertyUpdate, propertyUpdateMsg);
+        setCompositionUpdate(propertyUpdateMsg, acMsg);
+        setInstanceUpdate(propertyUpdateMsg, acMsg);
+        msgExecutor.execute(acMsg);
     }
 
+    /**
+     * Handle a automation composition Prepare/Review message.
+     *
+     * @param acPrepareMsg the AutomationComposition Prepare/Review message
+     */
     @Timed(value = "listener.prepare", description = "AUTOMATION_COMPOSITION_PREPARE message received")
     public void handleAutomationCompositionPrepare(AutomationCompositionPrepare acPrepareMsg) {
-        acSubStateHandler.handleAcPrepare(acPrepareMsg);
+        var acMsg = new AutomationCompositionMsg<>(
+                acSubStateHandler::handleAcPrepare, acPrepareMsg);
+        setCompositionUpdate(acPrepareMsg, acMsg);
+        if (!acPrepareMsg.isPreDeploy()) {
+            setInstanceUpdate(acPrepareMsg, acMsg);
+        }
+        msgExecutor.execute(acMsg);
+    }
+
+    private void setCompositionUpdate(ParticipantMessage participantMsg, AutomationCompositionMsg<?> acMsg) {
+        acMsg.setCompositionId(participantMsg.getCompositionId());
+        acMsg.setRevisionIdComposition(participantMsg.getRevisionIdComposition());
+    }
+
+    private void setInstanceUpdate(ParticipantMessage participantMsg, AutomationCompositionMsg<?> acMsg) {
+        acMsg.setInstanceId(participantMsg.getAutomationCompositionId());
+        acMsg.setRevisionIdInstance(participantMsg.getRevisionIdInstance());
     }
 
     /**
@@ -220,6 +260,7 @@ public class ParticipantHandler {
         }
         LOGGER.debug("ParticipantSync message received for participantId {}", participantSyncMsg.getParticipantId());
         acDefinitionHandler.handleParticipantSync(participantSyncMsg);
+        msgExecutor.check();
     }
 
     /**
index 097f94a..45b2488 100644 (file)
@@ -34,6 +34,7 @@ import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionDto;
 import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto;
 import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto;
 import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi;
+import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
 import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.DeployState;
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/cache/AcDefinition.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/cache/AcDefinition.java
new file mode 100644 (file)
index 0000000..b6591c8
--- /dev/null
@@ -0,0 +1,35 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.participant.intermediary.handler.cache;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import lombok.Data;
+import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementDefinition;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
+
+@Data
+public class AcDefinition {
+    private UUID compositionId;
+    private UUID revisionId;
+    private Map<ToscaConceptIdentifier, AutomationCompositionElementDefinition> elements = new HashMap<>();
+}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/cache/AutomationCompositionMsg.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/cache/AutomationCompositionMsg.java
new file mode 100644 (file)
index 0000000..837b9db
--- /dev/null
@@ -0,0 +1,56 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.participant.intermediary.handler.cache;
+
+import java.util.UUID;
+import java.util.function.Consumer;
+import lombok.AccessLevel;
+import lombok.Data;
+import lombok.Setter;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessage;
+
+@Data
+public class AutomationCompositionMsg<T extends ParticipantMessage> {
+
+    private UUID key = UUID.randomUUID();
+
+    @Setter(AccessLevel.NONE)
+    private final Consumer<T> consumer;
+
+    @Setter(AccessLevel.NONE)
+    private final T message;
+
+    private UUID instanceId;
+    private UUID revisionIdInstance;
+    private UUID compositionId;
+    private UUID revisionIdComposition;
+    private UUID compositionTargetId;
+    private UUID revisionIdCompositionTarget;
+
+    public AutomationCompositionMsg(Consumer<T> consumer, T message) {
+        this.consumer = consumer;
+        this.message = message;
+    }
+
+    public void execute() {
+        consumer.accept(message);
+    }
+}
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2023-2024 Nordix Foundation.
+ *  Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -18,7 +18,7 @@
  * ============LICENSE_END=========================================================
  */
 
-package org.onap.policy.clamp.acm.participant.intermediary.handler;
+package org.onap.policy.clamp.acm.participant.intermediary.handler.cache;
 
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -67,12 +67,14 @@ public class CacheProvider {
     private final Map<UUID, AutomationComposition> automationCompositions = new ConcurrentHashMap<>();
 
     @Getter
-    private final Map<UUID, Map<ToscaConceptIdentifier, AutomationCompositionElementDefinition>> acElementsDefinitions =
-            new ConcurrentHashMap<>();
+    private final Map<UUID, AcDefinition> acElementsDefinitions = new ConcurrentHashMap<>();
 
     @Getter
     private final Map<UUID, UUID> msgIdentification = new ConcurrentHashMap<>();
 
+    @Getter
+    private final Map<UUID, AutomationCompositionMsg<?>> messagesOnHold = new HashMap<>();
+
     /**
      * Constructor.
      *
@@ -112,13 +114,23 @@ public class CacheProvider {
      *
      * @param compositionId the composition Id
      * @param list the list of AutomationCompositionElementDefinition to add
+     * @param revisionId the last Update
      */
-    public void addElementDefinition(@NonNull UUID compositionId, List<AutomationCompositionElementDefinition> list) {
-        Map<ToscaConceptIdentifier, AutomationCompositionElementDefinition> map = new HashMap<>();
+    public void addElementDefinition(@NonNull UUID compositionId, List<AutomationCompositionElementDefinition> list,
+            UUID revisionId) {
+        var acDefinition = new AcDefinition();
+        acDefinition.setCompositionId(compositionId);
+        acDefinition.setRevisionId(revisionId);
         for (var acElementDefinition : list) {
-            map.put(acElementDefinition.getAcElementDefinitionId(), acElementDefinition);
+            if (acElementDefinition.getAutomationCompositionElementToscaNodeTemplate() == null) {
+                acElementDefinition.setAutomationCompositionElementToscaNodeTemplate(new ToscaNodeTemplate());
+            }
+            if (acElementDefinition.getAutomationCompositionElementToscaNodeTemplate().getProperties() == null) {
+                acElementDefinition.getAutomationCompositionElementToscaNodeTemplate().setProperties(new HashMap<>());
+            }
+            acDefinition.getElements().put(acElementDefinition.getAcElementDefinitionId(), acElementDefinition);
         }
-        acElementsDefinitions.put(compositionId, map);
+        acElementsDefinitions.put(compositionId, acDefinition);
     }
 
     public void removeElementDefinition(@NonNull UUID compositionId) {
@@ -134,10 +146,8 @@ public class CacheProvider {
      */
     public Map<String, Object> getCommonProperties(@NonNull UUID instanceId, @NonNull UUID acElementId) {
         var automationComposition = automationCompositions.get(instanceId);
-        var map = acElementsDefinitions.get(automationComposition.getCompositionId());
         var element = automationComposition.getElements().get(acElementId);
-        return getAcElementDefinition(map, element.getDefinition())
-                .getAutomationCompositionElementToscaNodeTemplate().getProperties();
+        return getCommonProperties(automationComposition.getCompositionId(), element.getDefinition());
     }
 
     /**
@@ -149,20 +159,8 @@ public class CacheProvider {
      */
     public Map<String, Object> getCommonProperties(@NonNull UUID compositionId,
         @NonNull ToscaConceptIdentifier definition) {
-        return getAcElementDefinition(acElementsDefinitions.get(compositionId), definition)
-                .getAutomationCompositionElementToscaNodeTemplate().getProperties();
-    }
-
-    private AutomationCompositionElementDefinition getAcElementDefinition(
-            Map<ToscaConceptIdentifier, AutomationCompositionElementDefinition> map,
-            ToscaConceptIdentifier definition) {
-        var nodeTemplate = map.get(definition);
-        if (nodeTemplate == null) {
-            nodeTemplate = new AutomationCompositionElementDefinition();
-            nodeTemplate.setAutomationCompositionElementToscaNodeTemplate(new ToscaNodeTemplate());
-            nodeTemplate.getAutomationCompositionElementToscaNodeTemplate().setProperties(new HashMap<>());
-        }
-        return nodeTemplate;
+        var map = acElementsDefinitions.get(compositionId).getElements().get(definition);
+        return map != null ? map.getAutomationCompositionElementToscaNodeTemplate().getProperties() : new HashMap<>();
     }
 
     /**
@@ -171,11 +169,12 @@ public class CacheProvider {
      * @param compositionId the composition Id
      * @param instanceId the Automation Composition Id
      * @param participantDeploy the ParticipantDeploy
+     * @param revisionId the identification of the last update
      */
     public void initializeAutomationComposition(@NonNull UUID compositionId, @NonNull UUID instanceId,
-            ParticipantDeploy participantDeploy) {
+            ParticipantDeploy participantDeploy, UUID revisionId) {
         initializeAutomationComposition(compositionId, instanceId, participantDeploy,
-            DeployState.DEPLOYING, SubState.NONE);
+            DeployState.DEPLOYING, SubState.NONE, revisionId);
     }
 
     /**
@@ -186,9 +185,10 @@ public class CacheProvider {
      * @param participantDeploy the ParticipantDeploy
      * @param deployState the DeployState
      * @param subState the SubState
+     * @param revisionId the identification of the last update
      */
     public void initializeAutomationComposition(@NonNull UUID compositionId, @NonNull UUID instanceId,
-            ParticipantDeploy participantDeploy, DeployState deployState, SubState subState) {
+            ParticipantDeploy participantDeploy, DeployState deployState, SubState subState, UUID revisionId) {
         var acLast = automationCompositions.get(instanceId);
         Map<UUID, AutomationCompositionElement> acElementMap = new LinkedHashMap<>();
         for (var element : participantDeploy.getAcElementList()) {
@@ -210,6 +210,7 @@ public class CacheProvider {
         automationComposition.setElements(acElementMap);
         automationComposition.setDeployState(deployState);
         automationComposition.setSubState(subState);
+        automationComposition.setRevisionId(revisionId);
         automationCompositions.put(instanceId, automationComposition);
     }
 
@@ -247,6 +248,7 @@ public class CacheProvider {
         automationComposition.setInstanceId(participantRestartAc.getAutomationCompositionId());
         automationComposition.setElements(acElementMap);
         automationComposition.setStateChangeResult(participantRestartAc.getStateChangeResult());
+        automationComposition.setRevisionId(participantRestartAc.getRevisionId());
         automationCompositions.put(automationComposition.getInstanceId(), automationComposition);
     }
 
@@ -271,15 +273,17 @@ public class CacheProvider {
      *
      * @param compositionId the composition Id
      * @param element AutomationComposition Element
-     * @param compositionInProperties composition definition InProperties
      * @return the CompositionElementDto
      */
-    public CompositionElementDto createCompositionElementDto(UUID compositionId, AutomationCompositionElement element,
-            Map<String, Object> compositionInProperties) {
-        var compositionOutProperties = getAcElementDefinition(acElementsDefinitions
-                .get(compositionId), element.getDefinition()).getOutProperties();
-        return new CompositionElementDto(compositionId,
-                element.getDefinition(), compositionInProperties, compositionOutProperties);
+    public CompositionElementDto createCompositionElementDto(UUID compositionId, AutomationCompositionElement element) {
+        var acDefinition = acElementsDefinitions.get(compositionId);
+        var acDefinitionElement = acDefinition.getElements().get(element.getDefinition());
+
+        return (acDefinitionElement != null) ? new CompositionElementDto(compositionId, element.getDefinition(),
+                acDefinitionElement.getAutomationCompositionElementToscaNodeTemplate().getProperties(),
+                acDefinitionElement.getOutProperties()) :
+            new CompositionElementDto(compositionId, element.getDefinition(),
+                Map.of(), Map.of(), ElementState.NOT_PRESENT);
     }
 
     /**
@@ -291,14 +295,14 @@ public class CacheProvider {
      */
     public Map<UUID, CompositionElementDto> getCompositionElementDtoMap(AutomationComposition automationComposition,
             UUID compositionId) {
-        var definitions = acElementsDefinitions.get(compositionId);
+        var acDefinition = acElementsDefinitions.get(compositionId);
         Map<UUID, CompositionElementDto> map = new HashMap<>();
         for (var element : automationComposition.getElements().values()) {
-            var definition = definitions.get(element.getDefinition());
-            var compositionElement = (definition != null)
+            var acDefinitionElement = acDefinition.getElements().get(element.getDefinition());
+            var compositionElement = (acDefinitionElement != null)
                     ? new CompositionElementDto(compositionId, element.getDefinition(),
-                            definition.getAutomationCompositionElementToscaNodeTemplate().getProperties(),
-                            definition.getOutProperties()) :
+                    acDefinitionElement.getAutomationCompositionElementToscaNodeTemplate().getProperties(),
+                    acDefinitionElement.getOutProperties()) :
                     new CompositionElementDto(compositionId, element.getDefinition(),
                             Map.of(), Map.of(), ElementState.NOT_PRESENT);
             map.put(element.getId(), compositionElement);
@@ -347,4 +351,42 @@ public class CacheProvider {
         return new CompositionElementDto(compositionElement.compositionId(), compositionElement.elementDefinitionId(),
                 compositionElement.inProperties(), compositionElement.outProperties(), ElementState.NEW);
     }
+
+    /**
+     * Check composition is present and compare the last update.
+     *
+     * @param compositionId the instanceId
+     * @param revisionId the last Update
+     * @return true if the composition is updated
+     */
+    public boolean isCompositionDefinitionUpdated(UUID compositionId, UUID revisionId) {
+        if (revisionId == null) {
+            // old ACM-r
+            return true;
+        }
+        var acDefinition = acElementsDefinitions.get(compositionId);
+        if (acDefinition == null) {
+            return false;
+        }
+        return revisionId.equals(acDefinition.getRevisionId());
+    }
+
+    /**
+     * Check instance is present and compare the last update.
+     *
+     * @param instanceId the instanceId
+     * @param revisionId the last Update
+     * @return true if the instance is updated
+     */
+    public boolean isInstanceUpdated(UUID instanceId, UUID revisionId) {
+        if (revisionId == null) {
+            // old ACM-r
+            return true;
+        }
+        var automationComposition = automationCompositions.get(instanceId);
+        if (automationComposition == null) {
+            return false;
+        }
+        return revisionId.equals(automationComposition.getRevisionId());
+    }
 }
index 9efe2e2..c0bda78 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2021-2024 Nordix Foundation.
+ *  Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -31,7 +31,8 @@ import java.util.UUID;
 import org.junit.jupiter.api.Test;
 import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto;
 import org.onap.policy.clamp.acm.participant.intermediary.handler.AutomationCompositionOutHandler;
-import org.onap.policy.clamp.acm.participant.intermediary.handler.CacheProvider;
+import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.AcDefinition;
+import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement;
@@ -123,23 +124,27 @@ class ParticipantIntermediaryApiImplTest {
 
     @Test
     void testGetAcElementsDefinitions() {
-        var cacheProvider = mock(CacheProvider.class);
         var acElementDefinition = new AutomationCompositionElementDefinition();
         acElementDefinition.setAcElementDefinitionId(DEFINITION_ELEMENT_ID);
         acElementDefinition.setAutomationCompositionElementToscaNodeTemplate(new ToscaNodeTemplate());
-        var elementsDefinitions = Map.of(DEFINITION_ELEMENT_ID, acElementDefinition);
-        var map = Map.of(COMPOSITION_ID, elementsDefinitions);
+        var acDefinition = new AcDefinition();
+        acDefinition.setCompositionId(COMPOSITION_ID);
+        acDefinition.getElements().put(DEFINITION_ELEMENT_ID, acElementDefinition);
+        var map = Map.of(COMPOSITION_ID, acDefinition);
+        var cacheProvider = mock(CacheProvider.class);
         when(cacheProvider.getAcElementsDefinitions()).thenReturn(map);
         var automationComposiitonHandler = mock(AutomationCompositionOutHandler.class);
         var apiImpl = new ParticipantIntermediaryApiImpl(automationComposiitonHandler, cacheProvider);
         var mapResult = apiImpl.getAcElementsDefinitions();
-        assertEquals(map, mapResult);
+        assertThat(map).hasSameSizeAs(mapResult);
+        assertThat(mapResult.get(COMPOSITION_ID)).isNotEmpty();
+        assertEquals(mapResult.get(COMPOSITION_ID), acDefinition.getElements());
 
         var result = apiImpl.getAcElementsDefinitions(UUID.randomUUID());
         assertThat(result).isEmpty();
 
         result = apiImpl.getAcElementsDefinitions(COMPOSITION_ID);
-        assertEquals(elementsDefinitions, result);
+        assertEquals(acDefinition.getElements(), result);
 
         var element = apiImpl.getAcElementDefinition(UUID.randomUUID(), new ToscaConceptIdentifier("wrong", "0.0.1"));
         assertThat(element).isNull();
index 34c71d7..1fcd9fe 100644 (file)
@@ -40,6 +40,7 @@ import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMe
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrimeAck;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRegister;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRegisterAck;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantReqSync;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatus;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatusReq;
 import org.onap.policy.common.message.bus.event.TopicSink;
@@ -111,6 +112,9 @@ class ParticipantCommTest {
 
         var automationCompositionAck = mock(AutomationCompositionDeployAck.class);
         assertDoesNotThrow(() -> publisher.sendAutomationCompositionAck(automationCompositionAck));
+
+        var participantReqSync = mock(ParticipantReqSync.class);
+        assertDoesNotThrow(() -> publisher.sendParticipantReqSync(participantReqSync));
     }
 
     @Test
index a6fd103..64883e6 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2024 Nordix Foundation.
+ *  Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -32,6 +32,8 @@ import java.util.UUID;
 import org.junit.jupiter.api.Test;
 import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionDto;
 import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessagePublisher;
+import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.AcDefinition;
+import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
 import org.onap.policy.clamp.acm.participant.intermediary.main.parameters.CommonTestData;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
@@ -45,15 +47,16 @@ class AcDefinitionHandlerTest {
 
     @Test
     void handleCompositionPrimeTest() {
-        var listener = mock(ThreadHandler.class);
         var cacheProvider = mock(CacheProvider.class);
         when(cacheProvider.getParticipantId()).thenReturn(CommonTestData.getParticipantId());
-        var ach = new AcDefinitionHandler(cacheProvider, mock(ParticipantMessagePublisher.class), listener);
         var participantPrimeMsg = new ParticipantPrime();
         participantPrimeMsg.setCompositionId(UUID.randomUUID());
+        participantPrimeMsg.setRevisionIdComposition(UUID.randomUUID());
         participantPrimeMsg.setParticipantDefinitionUpdates(List.of(createParticipantDefinition()));
+        var listener = mock(ThreadHandler.class);
+        var ach = new AcDefinitionHandler(cacheProvider, mock(ParticipantMessagePublisher.class), listener);
         ach.handlePrime(participantPrimeMsg);
-        verify(cacheProvider).addElementDefinition(any(UUID.class), anyList());
+        verify(cacheProvider).addElementDefinition(any(UUID.class), anyList(), any(UUID.class));
         verify(listener).prime(any(UUID.class), any(CompositionDto.class));
     }
 
@@ -70,12 +73,15 @@ class AcDefinitionHandlerTest {
     void handleCompositionDeprimeTest() {
         var acElementDefinition = CommonTestData.createAutomationCompositionElementDefinition(
                 new ToscaConceptIdentifier("key", "1.0.0"));
+        var acDefinition = new AcDefinition();
         var compositionId = UUID.randomUUID();
+        acDefinition.setCompositionId(compositionId);
+        acDefinition.getElements().put(acElementDefinition.getAcElementDefinitionId(), acElementDefinition);
+
         var listener = mock(ThreadHandler.class);
         var cacheProvider = mock(CacheProvider.class);
         var ach = new AcDefinitionHandler(cacheProvider, mock(ParticipantMessagePublisher.class), listener);
-        when(cacheProvider.getAcElementsDefinitions())
-                .thenReturn(Map.of(compositionId, Map.of(new ToscaConceptIdentifier(), acElementDefinition)));
+        when(cacheProvider.getAcElementsDefinitions()).thenReturn(Map.of(compositionId, acDefinition));
         var participantPrimeMsg = new ParticipantPrime();
         participantPrimeMsg.setCompositionId(compositionId);
         ach.handlePrime(participantPrimeMsg);
@@ -101,6 +107,8 @@ class AcDefinitionHandlerTest {
         participantSyncMsg.setCompositionId(UUID.randomUUID());
         participantSyncMsg.getParticipantDefinitionUpdates().add(createParticipantDefinition());
         participantSyncMsg.setAutomationcompositionList(List.of(CommonTestData.createParticipantRestartAc()));
+        participantSyncMsg.setRevisionIdComposition(UUID.randomUUID());
+        participantSyncMsg.setRevisionIdInstance(UUID.randomUUID());
 
         var cacheProvider = mock(CacheProvider.class);
         when(cacheProvider.getParticipantId()).thenReturn(CommonTestData.getParticipantId());
@@ -108,7 +116,7 @@ class AcDefinitionHandlerTest {
         var ach = new AcDefinitionHandler(cacheProvider, mock(ParticipantMessagePublisher.class), listener);
         ach.handleParticipantSync(participantSyncMsg);
         verify(cacheProvider).initializeAutomationComposition(any(UUID.class), any());
-        verify(cacheProvider).addElementDefinition(any(), any());
+        verify(cacheProvider).addElementDefinition(any(UUID.class), any(), any(UUID.class));
     }
 
     @Test
@@ -118,6 +126,8 @@ class AcDefinitionHandlerTest {
         participantSyncMsg.setStateChangeResult(StateChangeResult.TIMEOUT);
         participantSyncMsg.setCompositionId(UUID.randomUUID());
         participantSyncMsg.getParticipantDefinitionUpdates().add(createParticipantDefinition());
+        participantSyncMsg.setRevisionIdComposition(UUID.randomUUID());
+        participantSyncMsg.setRevisionIdInstance(UUID.randomUUID());
         var participantRestartAc = CommonTestData.createParticipantRestartAc();
         participantRestartAc.setStateChangeResult(StateChangeResult.TIMEOUT);
         participantSyncMsg.setAutomationcompositionList(List.of(participantRestartAc));
@@ -128,7 +138,7 @@ class AcDefinitionHandlerTest {
         var ach = new AcDefinitionHandler(cacheProvider, mock(ParticipantMessagePublisher.class), listener);
         ach.handleParticipantSync(participantSyncMsg);
         verify(cacheProvider).initializeAutomationComposition(any(UUID.class), any());
-        verify(cacheProvider).addElementDefinition(any(), any());
+        verify(cacheProvider).addElementDefinition(any(UUID.class), any(), any(UUID.class));
         verify(listener).cleanExecution(participantSyncMsg.getCompositionId(), participantSyncMsg.getMessageId());
         var elementId = participantRestartAc.getAcElementList().get(0).getId();
         verify(listener).cleanExecution(elementId, participantSyncMsg.getMessageId());
index 9ea3f9f..d38ad6c 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2024 Nordix Foundation.
+ *  Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -29,10 +29,11 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import org.junit.jupiter.api.Test;
+import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.AcDefinition;
+import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
 import org.onap.policy.clamp.acm.participant.intermediary.main.parameters.CommonTestData;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementDefinition;
 import org.onap.policy.clamp.models.acm.concepts.DeployState;
@@ -40,7 +41,6 @@ import org.onap.policy.clamp.models.acm.concepts.LockState;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionStateChange;
 import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder;
 import org.onap.policy.clamp.models.acm.messages.rest.instantiation.LockOrder;
-import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
 
 class AcLockHandlerTest {
 
@@ -67,16 +67,17 @@ class AcLockHandlerTest {
                 .thenReturn(automationComposition);
         when(cacheProvider.getCommonProperties(any(UUID.class), any(UUID.class))).thenReturn(Map.of());
 
-        var listener = mock(ThreadHandler.class);
-        var ach = new AcLockHandler(cacheProvider, listener);
-        Map<ToscaConceptIdentifier, AutomationCompositionElementDefinition> map = new HashMap<>();
+        var acDefinition = new AcDefinition();
+        acDefinition.setCompositionId(automationComposition.getCompositionId());
         for (var element : automationComposition.getElements().values()) {
-            map.put(element.getDefinition(), new AutomationCompositionElementDefinition());
+            acDefinition.getElements().put(element.getDefinition(), new AutomationCompositionElementDefinition());
         }
         when(cacheProvider.getAcElementsDefinitions())
-                .thenReturn(Map.of(automationComposition.getCompositionId(), map));
+                .thenReturn(Map.of(automationComposition.getCompositionId(), acDefinition));
         var automationCompositionStateChange = CommonTestData.getStateChange(CommonTestData.getParticipantId(),
                 automationComposition.getInstanceId(), DeployOrder.NONE, LockOrder.LOCK);
+        var listener = mock(ThreadHandler.class);
+        var ach = new AcLockHandler(cacheProvider, listener);
         ach.handleAutomationCompositionStateChange(automationCompositionStateChange);
         verify(listener, times(automationComposition.getElements().size())).lock(any(), any(), any());
         for (var element : automationComposition.getElements().values()) {
@@ -99,16 +100,17 @@ class AcLockHandlerTest {
                 .thenReturn(automationComposition);
         when(cacheProvider.getCommonProperties(any(UUID.class), any(UUID.class))).thenReturn(Map.of());
 
-        var listener = mock(ThreadHandler.class);
-        var ach = new AcLockHandler(cacheProvider, listener);
-        Map<ToscaConceptIdentifier, AutomationCompositionElementDefinition> map = new HashMap<>();
+        var acDefinition = new AcDefinition();
+        acDefinition.setCompositionId(automationComposition.getCompositionId());
         for (var element : automationComposition.getElements().values()) {
-            map.put(element.getDefinition(), new AutomationCompositionElementDefinition());
+            acDefinition.getElements().put(element.getDefinition(), new AutomationCompositionElementDefinition());
         }
         when(cacheProvider.getAcElementsDefinitions())
-                .thenReturn(Map.of(automationComposition.getCompositionId(), map));
+                .thenReturn(Map.of(automationComposition.getCompositionId(), acDefinition));
         var automationCompositionStateChange = CommonTestData.getStateChange(CommonTestData.getParticipantId(),
                 automationComposition.getInstanceId(), DeployOrder.NONE, LockOrder.UNLOCK);
+        var listener = mock(ThreadHandler.class);
+        var ach = new AcLockHandler(cacheProvider, listener);
         ach.handleAutomationCompositionStateChange(automationCompositionStateChange);
         verify(listener, times(automationComposition.getElements().size())).unlock(any(), any(), any());
         for (var element : automationComposition.getElements().values()) {
index d9df764..b9b61b4 100644 (file)
@@ -28,11 +28,12 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.junit.jupiter.api.Test;
+import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.AcDefinition;
+import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
 import org.onap.policy.clamp.acm.participant.intermediary.main.parameters.CommonTestData;
 import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
@@ -74,12 +75,13 @@ class AcSubStateHandlerTest {
         var cacheProvider = new CacheProvider(CommonTestData.getParticipantParameters());
         var definitions =
                 CommonTestData.createAutomationCompositionElementDefinitionList(automationComposition);
-        cacheProvider.addElementDefinition(automationComposition.getCompositionId(), definitions);
-        cacheProvider.addElementDefinition(automationComposition.getCompositionTargetId(), definitions);
+        cacheProvider.addElementDefinition(automationComposition.getCompositionId(), definitions, UUID.randomUUID());
+        cacheProvider.addElementDefinition(
+                automationComposition.getCompositionTargetId(), definitions, UUID.randomUUID());
         var participantDeploy =
                 CommonTestData.createparticipantDeploy(cacheProvider.getParticipantId(), automationComposition);
         cacheProvider.initializeAutomationComposition(automationComposition.getCompositionId(),
-                automationComposition.getInstanceId(), participantDeploy);
+                automationComposition.getInstanceId(), participantDeploy, UUID.randomUUID());
         var migrationMsg = new AutomationCompositionMigration();
         migrationMsg.setStage(0);
         migrationMsg.setCompositionId(automationComposition.getCompositionId());
@@ -102,11 +104,11 @@ class AcSubStateHandlerTest {
         var cacheProvider = new CacheProvider(CommonTestData.getParticipantParameters());
         var definitions =
                 CommonTestData.createAutomationCompositionElementDefinitionList(automationComposition);
-        cacheProvider.addElementDefinition(automationComposition.getCompositionId(), definitions);
+        cacheProvider.addElementDefinition(automationComposition.getCompositionId(), definitions, UUID.randomUUID());
         var participantDeploy =
                 CommonTestData.createparticipantDeploy(cacheProvider.getParticipantId(), automationComposition);
         cacheProvider.initializeAutomationComposition(automationComposition.getCompositionId(),
-                automationComposition.getInstanceId(), participantDeploy);
+                automationComposition.getInstanceId(), participantDeploy, UUID.randomUUID());
 
         var acMigrate = new AutomationComposition(automationComposition);
         acMigrate.setCompositionTargetId(UUID.randomUUID());
@@ -118,7 +120,7 @@ class AcSubStateHandlerTest {
 
         var migrateDefinitions =
                 CommonTestData.createAutomationCompositionElementDefinitionList(acMigrate);
-        cacheProvider.addElementDefinition(acMigrate.getCompositionTargetId(), migrateDefinitions);
+        cacheProvider.addElementDefinition(acMigrate.getCompositionTargetId(), migrateDefinitions, UUID.randomUUID());
 
         var migrationMsg = new AutomationCompositionMigration();
         migrationMsg.setStage(0);
@@ -155,16 +157,17 @@ class AcSubStateHandlerTest {
         acPrepareMsg.setAutomationCompositionId(automationComposition.getInstanceId());
         when(cacheProvider.getAutomationComposition(automationComposition.getInstanceId()))
                 .thenReturn(automationComposition);
-        Map<ToscaConceptIdentifier, AutomationCompositionElementDefinition> map = new HashMap<>();
+        var acDefinition = new AcDefinition();
+        acDefinition.setCompositionId(automationComposition.getCompositionId());
         for (var element : automationComposition.getElements().values()) {
             var acElementDeploy = new AcElementDeploy();
             acElementDeploy.setProperties(Map.of());
             acElementDeploy.setId(element.getId());
             participantDeploy.getAcElementList().add(acElementDeploy);
-            map.put(element.getDefinition(), new AutomationCompositionElementDefinition());
+            acDefinition.getElements().put(element.getDefinition(), new AutomationCompositionElementDefinition());
         }
         when(cacheProvider.getAcElementsDefinitions())
-            .thenReturn(Map.of(automationComposition.getCompositionId(), map));
+            .thenReturn(Map.of(automationComposition.getCompositionId(), acDefinition));
 
         ach.handleAcPrepare(acPrepareMsg);
         verify(listener, times(automationComposition.getElements().size())).prepare(any(), any(), any(), anyInt());
@@ -183,15 +186,16 @@ class AcSubStateHandlerTest {
         acPrepareMsg.setAutomationCompositionId(automationComposition.getInstanceId());
         when(cacheProvider.getAutomationComposition(automationComposition.getInstanceId()))
             .thenReturn(automationComposition);
-        Map<ToscaConceptIdentifier, AutomationCompositionElementDefinition> map = new HashMap<>();
+        var acDefinition = new AcDefinition();
+        acDefinition.setCompositionId(automationComposition.getCompositionId());
         for (var element : automationComposition.getElements().values()) {
             var acElementDeploy = new AcElementDeploy();
             acElementDeploy.setProperties(Map.of());
             acElementDeploy.setId(element.getId());
-            map.put(element.getDefinition(), new AutomationCompositionElementDefinition());
+            acDefinition.getElements().put(element.getDefinition(), new AutomationCompositionElementDefinition());
         }
         when(cacheProvider.getAcElementsDefinitions())
-            .thenReturn(Map.of(automationComposition.getCompositionId(), map));
+            .thenReturn(Map.of(automationComposition.getCompositionId(), acDefinition));
 
         var listener = mock(ThreadHandler.class);
         var ach = new AcSubStateHandler(cacheProvider, listener);
index 3fe549f..d6d47c8 100644 (file)
@@ -30,12 +30,13 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.junit.jupiter.api.Test;
 import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessagePublisher;
+import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.AcDefinition;
+import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
 import org.onap.policy.clamp.acm.participant.intermediary.main.parameters.CommonTestData;
 import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
@@ -84,18 +85,19 @@ class AutomationCompositionHandlerTest {
                 .thenReturn(automationComposition);
         when(cacheProvider.getCommonProperties(any(UUID.class), any(UUID.class))).thenReturn(Map.of());
 
-        var participantMessagePublisher = mock(ParticipantMessagePublisher.class);
-        var listener = mock(ThreadHandler.class);
-        var ach = new AutomationCompositionHandler(cacheProvider, participantMessagePublisher, listener);
-        Map<ToscaConceptIdentifier, AutomationCompositionElementDefinition> map = new HashMap<>();
+        var acDefinition = new AcDefinition();
+        acDefinition.setCompositionId(automationComposition.getCompositionId());
         for (var element : automationComposition.getElements().values()) {
-            map.put(element.getDefinition(), new AutomationCompositionElementDefinition());
+            acDefinition.getElements().put(element.getDefinition(), new AutomationCompositionElementDefinition());
         }
         when(cacheProvider.getAcElementsDefinitions())
-            .thenReturn(Map.of(automationComposition.getCompositionId(), map));
+            .thenReturn(Map.of(automationComposition.getCompositionId(), acDefinition));
         var automationCompositionStateChange = CommonTestData.getStateChange(CommonTestData.getParticipantId(),
             automationComposition.getInstanceId(), DeployOrder.UNDEPLOY, LockOrder.NONE);
 
+        var participantMessagePublisher = mock(ParticipantMessagePublisher.class);
+        var listener = mock(ThreadHandler.class);
+        var ach = new AutomationCompositionHandler(cacheProvider, participantMessagePublisher, listener);
         ach.handleAutomationCompositionStateChange(automationCompositionStateChange);
         verify(listener, times(automationComposition.getElements().size())).undeploy(any(), any(), any());
         for (var element : automationComposition.getElements().values()) {
@@ -116,17 +118,18 @@ class AutomationCompositionHandlerTest {
                 .thenReturn(automationComposition);
         when(cacheProvider.getCommonProperties(any(UUID.class), any(UUID.class))).thenReturn(Map.of());
 
-        var participantMessagePublisher = mock(ParticipantMessagePublisher.class);
-        var listener = mock(ThreadHandler.class);
-        var ach = new AutomationCompositionHandler(cacheProvider, participantMessagePublisher, listener);
-        Map<ToscaConceptIdentifier, AutomationCompositionElementDefinition> map = new HashMap<>();
+        var acDefinition = new AcDefinition();
+        acDefinition.setCompositionId(automationComposition.getCompositionId());
         for (var element : automationComposition.getElements().values()) {
-            map.put(element.getDefinition(), new AutomationCompositionElementDefinition());
+            acDefinition.getElements().put(element.getDefinition(), new AutomationCompositionElementDefinition());
         }
         when(cacheProvider.getAcElementsDefinitions())
-            .thenReturn(Map.of(automationComposition.getCompositionId(), map));
+            .thenReturn(Map.of(automationComposition.getCompositionId(), acDefinition));
         var automationCompositionStateChange = CommonTestData.getStateChange(CommonTestData.getParticipantId(),
             automationComposition.getInstanceId(), DeployOrder.DELETE, LockOrder.NONE);
+        var participantMessagePublisher = mock(ParticipantMessagePublisher.class);
+        var listener = mock(ThreadHandler.class);
+        var ach = new AutomationCompositionHandler(cacheProvider, participantMessagePublisher, listener);
         ach.handleAutomationCompositionStateChange(automationCompositionStateChange);
         verify(listener, times(automationComposition.getElements().size())).delete(any(), any(), any());
         for (var element : automationComposition.getElements().values()) {
@@ -164,12 +167,13 @@ class AutomationCompositionHandlerTest {
         acElementDeploy.setId(automationComposition.getElements().values().iterator().next().getId());
         participantDeploy.getAcElementList().add(acElementDeploy);
 
-        Map<ToscaConceptIdentifier, AutomationCompositionElementDefinition> map = new HashMap<>();
+        var acDefinition = new AcDefinition();
+        acDefinition.setCompositionId(automationComposition.getCompositionId());
         for (var element : automationComposition.getElements().values()) {
-            map.put(element.getDefinition(), new AutomationCompositionElementDefinition());
+            acDefinition.getElements().put(element.getDefinition(), new AutomationCompositionElementDefinition());
         }
         when(cacheProvider.getAcElementsDefinitions())
-            .thenReturn(Map.of(automationComposition.getCompositionId(), map));
+            .thenReturn(Map.of(automationComposition.getCompositionId(), acDefinition));
         ach.handleAcPropertyUpdate(updateMsg);
         verify(listener).update(any(), any(), any(), any());
     }
@@ -194,16 +198,17 @@ class AutomationCompositionHandlerTest {
         deployMsg.setAutomationCompositionId(automationComposition.getInstanceId());
         when(cacheProvider.getAutomationComposition(automationComposition.getInstanceId()))
                 .thenReturn(automationComposition);
-        Map<ToscaConceptIdentifier, AutomationCompositionElementDefinition> map = new HashMap<>();
+        var acDefinition = new AcDefinition();
+        acDefinition.setCompositionId(automationComposition.getCompositionId());
         for (var element : automationComposition.getElements().values()) {
             var acElementDeploy = new AcElementDeploy();
             acElementDeploy.setProperties(Map.of());
             acElementDeploy.setId(element.getId());
             participantDeploy.getAcElementList().add(acElementDeploy);
-            map.put(element.getDefinition(), new AutomationCompositionElementDefinition());
+            acDefinition.getElements().put(element.getDefinition(), new AutomationCompositionElementDefinition());
         }
         when(cacheProvider.getAcElementsDefinitions())
-            .thenReturn(Map.of(automationComposition.getCompositionId(), map));
+            .thenReturn(Map.of(automationComposition.getCompositionId(), acDefinition));
 
         ach.handleAutomationCompositionDeploy(deployMsg);
         verify(listener, times(automationComposition.getElements().size())).deploy(any(), any(), any());
@@ -365,9 +370,9 @@ class AutomationCompositionHandlerTest {
             UUID compositionId, UUID instanceId, List<AutomationCompositionElementDefinition> definitions,
             UUID compositionTargetId, List<AutomationCompositionElementDefinition> migrateDefinitions) {
         var cacheProvider = new CacheProvider(CommonTestData.getParticipantParameters());
-        cacheProvider.addElementDefinition(compositionId, definitions);
-        cacheProvider.initializeAutomationComposition(compositionId, instanceId, participantDeploy);
-        cacheProvider.addElementDefinition(compositionTargetId, migrateDefinitions);
+        cacheProvider.addElementDefinition(compositionId, definitions, UUID.randomUUID());
+        cacheProvider.initializeAutomationComposition(compositionId, instanceId, participantDeploy, UUID.randomUUID());
+        cacheProvider.addElementDefinition(compositionTargetId, migrateDefinitions, UUID.randomUUID());
         return cacheProvider;
     }
 
index ce62fcb..25b5bf9 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2023-2024 Nordix Foundation.
+ *  Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -32,6 +32,8 @@ import java.util.Map;
 import java.util.UUID;
 import org.junit.jupiter.api.Test;
 import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessagePublisher;
+import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.AcDefinition;
+import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
 import org.onap.policy.clamp.acm.participant.intermediary.main.parameters.CommonTestData;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementDefinition;
@@ -46,6 +48,8 @@ import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
 
 class AutomationCompositionOutHandlerTest {
 
+    private static final ToscaConceptIdentifier ELEMENT_ID = new ToscaConceptIdentifier("code", "1.0.0");
+
     @Test
     void updateAutomationCompositionElementStateNullTest() {
         var cacheProvider = mock(CacheProvider.class);
@@ -150,6 +154,23 @@ class AutomationCompositionOutHandlerTest {
         verify(publisher).sendAutomationCompositionAck(any(AutomationCompositionDeployAck.class));
     }
 
+    @Test
+    void updateAcElementStatePrepareFailTest() {
+        var automationComposition = CommonTestData.getTestAutomationCompositionMap().values().iterator().next();
+        automationComposition.setSubState(SubState.PREPARING);
+        var cacheProvider = mock(CacheProvider.class);
+        when(cacheProvider.getAutomationComposition(automationComposition.getInstanceId()))
+                .thenReturn(automationComposition);
+        var element = automationComposition.getElements().values().iterator().next();
+        element.setSubState(SubState.PREPARING);
+        var elementId = element.getId();
+        var publisher = mock(ParticipantMessagePublisher.class);
+        var acOutHandler = new AutomationCompositionOutHandler(publisher, cacheProvider);
+        acOutHandler.updateAutomationCompositionElementState(automationComposition.getInstanceId(), elementId,
+                DeployState.DEPLOYED, null, StateChangeResult.FAILED, "Prepare failed");
+        verify(publisher).sendAutomationCompositionAck(any(AutomationCompositionDeployAck.class));
+    }
+
     @Test
     void updateAutomationCompositionElementStateLockTest() {
         var publisher = mock(ParticipantMessagePublisher.class);
@@ -267,10 +288,10 @@ class AutomationCompositionOutHandlerTest {
         var cacheProvider = mock(CacheProvider.class);
         when(cacheProvider.getParticipantId()).thenReturn(UUID.randomUUID());
         var compositionId = UUID.randomUUID();
-        var elementId = new ToscaConceptIdentifier("code", "1.0.0");
-        var mapAcElementsDefinitions =
-                Map.of(compositionId, Map.of(elementId, new AutomationCompositionElementDefinition()));
-        when(cacheProvider.getAcElementsDefinitions()).thenReturn(mapAcElementsDefinitions);
+        var acDefinition = new AcDefinition();
+        acDefinition.setCompositionId(compositionId);
+        acDefinition.getElements().put(ELEMENT_ID, new AutomationCompositionElementDefinition());
+        when(cacheProvider.getAcElementsDefinitions()).thenReturn(Map.of(compositionId, acDefinition));
         var publisher = mock(ParticipantMessagePublisher.class);
         var acOutHandler = new AutomationCompositionOutHandler(publisher, cacheProvider);
 
@@ -283,7 +304,24 @@ class AutomationCompositionOutHandlerTest {
         acOutHandler.sendAcDefinitionInfo(compositionId, new ToscaConceptIdentifier("wrong", "1.0.0"), Map.of());
         verify(publisher, times(0)).sendParticipantStatus(any(ParticipantStatus.class));
 
-        acOutHandler.sendAcDefinitionInfo(compositionId, elementId, Map.of());
+        acOutHandler.sendAcDefinitionInfo(compositionId, ELEMENT_ID, Map.of());
+        verify(publisher).sendParticipantStatus(any(ParticipantStatus.class));
+    }
+
+    @Test
+    void sendAcDefinitionInfoSingleTest() {
+        var cacheProvider = mock(CacheProvider.class);
+        when(cacheProvider.getParticipantId()).thenReturn(UUID.randomUUID());
+        var compositionId = UUID.randomUUID();
+        var acDefinition = new AcDefinition();
+        acDefinition.setCompositionId(compositionId);
+        acDefinition.getElements().put(ELEMENT_ID, new AutomationCompositionElementDefinition());
+        when(cacheProvider.getAcElementsDefinitions()).thenReturn(Map.of(compositionId, acDefinition));
+        var publisher = mock(ParticipantMessagePublisher.class);
+        var acOutHandler = new AutomationCompositionOutHandler(publisher, cacheProvider);
+
+        // if there is only one element
+        acOutHandler.sendAcDefinitionInfo(compositionId, null, Map.of());
         verify(publisher).sendParticipantStatus(any(ParticipantStatus.class));
     }
 
@@ -301,7 +339,7 @@ class AutomationCompositionOutHandlerTest {
         var compositionTarget = UUID.randomUUID();
         automationComposition.setCompositionTargetId(compositionTarget);
         automationComposition.setDeployState(DeployState.DEPLOYED);
-        when(cacheProvider.getAcElementsDefinitions()).thenReturn(Map.of(compositionTarget, Map.of()));
+        when(cacheProvider.getAcElementsDefinitions()).thenReturn(Map.of(compositionTarget, new AcDefinition()));
 
         for (var element : automationComposition.getElements().values()) {
             acOutHandler.updateAutomationCompositionElementState(automationComposition.getInstanceId(), element.getId(),
diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/MsgExecutorTest.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/MsgExecutorTest.java
new file mode 100644 (file)
index 0000000..0a7fcb4
--- /dev/null
@@ -0,0 +1,125 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.participant.intermediary.handler;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.UUID;
+import org.junit.jupiter.api.Test;
+import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessagePublisher;
+import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.AutomationCompositionMsg;
+import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
+import org.onap.policy.clamp.acm.participant.intermediary.main.parameters.CommonTestData;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeploy;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionStateChange;
+
+class MsgExecutorTest {
+
+    @Test
+    void testExecute() {
+        var parameters = CommonTestData.getParticipantParameters();
+        var cacheProvider = new CacheProvider(parameters);
+        var publisher = mock(ParticipantMessagePublisher.class);
+        var msgExecutor = new MsgExecutor(cacheProvider, publisher);
+        var automationCompositionHandler = mock(AutomationCompositionHandler.class);
+        var updateMsg = new AutomationCompositionDeploy();
+        var acMsg = new AutomationCompositionMsg<>(
+                automationCompositionHandler::handleAutomationCompositionDeploy, updateMsg);
+        msgExecutor.execute(acMsg);
+        verify(automationCompositionHandler).handleAutomationCompositionDeploy(updateMsg);
+    }
+
+    @Test
+    void testExecuteCompositionOutdated() {
+        var parameters = CommonTestData.getParticipantParameters();
+        var cacheProvider = new CacheProvider(parameters);
+        var publisher = mock(ParticipantMessagePublisher.class);
+        var msgExecutor = new MsgExecutor(cacheProvider, publisher);
+        var automationCompositionHandler = mock(AutomationCompositionHandler.class);
+        var updateMsg = new AutomationCompositionDeploy();
+        var acMsg = new AutomationCompositionMsg<>(
+                automationCompositionHandler::handleAutomationCompositionDeploy, updateMsg);
+        var compositionId = UUID.randomUUID();
+        acMsg.setCompositionId(compositionId);
+        var revisionIdComposition = UUID.randomUUID();
+        acMsg.setRevisionIdComposition(revisionIdComposition);
+        msgExecutor.execute(acMsg);
+        verify(automationCompositionHandler, times(0)).handleAutomationCompositionDeploy(updateMsg);
+        verify(publisher).sendParticipantReqSync(any());
+        assertThat(cacheProvider.getMessagesOnHold()).hasSize(1);
+
+        var automationComposition =
+                CommonTestData.getTestAutomationCompositions().getAutomationCompositionList().get(0);
+        automationComposition.setInstanceId(UUID.randomUUID());
+        automationComposition.setCompositionId(compositionId);
+        var definitions =
+                CommonTestData.createAutomationCompositionElementDefinitionList(automationComposition);
+        cacheProvider.addElementDefinition(compositionId, definitions, revisionIdComposition);
+        msgExecutor.check();
+        verify(automationCompositionHandler, timeout(100)).handleAutomationCompositionDeploy(updateMsg);
+        assertThat(cacheProvider.getMessagesOnHold()).isEmpty();
+    }
+
+    @Test
+    void testCheckAndExecuteInstance() {
+        var automationCompositionHandler = mock(AutomationCompositionHandler.class);
+        var stateChangeMsg = new AutomationCompositionStateChange();
+        var acMsg = new AutomationCompositionMsg<>(
+                automationCompositionHandler::handleAutomationCompositionStateChange, stateChangeMsg);
+        var compositionId = UUID.randomUUID();
+        acMsg.setCompositionId(compositionId);
+        var revisionIdComposition = UUID.randomUUID();
+        acMsg.setRevisionIdComposition(revisionIdComposition);
+        var instanceId = UUID.randomUUID();
+        acMsg.setInstanceId(instanceId);
+        acMsg.setRevisionIdInstance(UUID.randomUUID());
+
+        var automationComposition =
+                CommonTestData.getTestAutomationCompositions().getAutomationCompositionList().get(0);
+        automationComposition.setInstanceId(instanceId);
+        automationComposition.setCompositionId(compositionId);
+        var definitions =
+                CommonTestData.createAutomationCompositionElementDefinitionList(automationComposition);
+        var parameters = CommonTestData.getParticipantParameters();
+        var cacheProvider = new CacheProvider(parameters);
+        cacheProvider.addElementDefinition(compositionId, definitions, revisionIdComposition);
+
+        var publisher = mock(ParticipantMessagePublisher.class);
+        var msgExecutor = new MsgExecutor(cacheProvider, publisher);
+        msgExecutor.execute(acMsg);
+        verify(automationCompositionHandler, times(0)).handleAutomationCompositionStateChange(stateChangeMsg);
+        verify(publisher).sendParticipantReqSync(any());
+        assertThat(cacheProvider.getMessagesOnHold()).hasSize(1);
+
+        var participantDeploy =
+                CommonTestData.createparticipantDeploy(cacheProvider.getParticipantId(), automationComposition);
+        cacheProvider.initializeAutomationComposition(compositionId, automationComposition.getInstanceId(),
+                participantDeploy, acMsg.getRevisionIdInstance());
+        msgExecutor.check();
+        verify(automationCompositionHandler, timeout(100)).handleAutomationCompositionStateChange(stateChangeMsg);
+        assertThat(cacheProvider.getMessagesOnHold()).isEmpty();
+    }
+}
index 1fb7281..d036aa8 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2021-2024 Nordix Foundation.
+ *  Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -33,11 +34,13 @@ import java.util.List;
 import java.util.UUID;
 import org.junit.jupiter.api.Test;
 import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessagePublisher;
+import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
 import org.onap.policy.clamp.acm.participant.intermediary.main.parameters.CommonTestData;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.ParticipantSupportedElementType;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeploy;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionMigration;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionPrepare;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionStateChange;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantAckMessage;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantDeregister;
@@ -61,9 +64,10 @@ class ParticipantHandlerTest {
         var publisher = mock(ParticipantMessagePublisher.class);
         when(publisher.isActive()).thenReturn(true);
         var cacheProvider = mock(CacheProvider.class);
+        var msgExecutor = mock(MsgExecutor.class);
         var participantHandler = new ParticipantHandler(mock(AutomationCompositionHandler.class),
             mock(AcLockHandler.class), mock(AcSubStateHandler.class), mock(AcDefinitionHandler.class),
-            publisher, cacheProvider);
+            publisher, cacheProvider, msgExecutor);
         participantHandler.handleParticipantStatusReq(new ParticipantStatusReq());
         verify(publisher).sendParticipantRegister(any(ParticipantRegister.class));
 
@@ -75,25 +79,37 @@ class ParticipantHandlerTest {
 
     @Test
     void handleAutomationCompositionDeployTest() {
+        var cacheProvider = mock(CacheProvider.class);
+        var automationCompositionDeploy = new AutomationCompositionDeploy();
+        automationCompositionDeploy.setAutomationCompositionId(UUID.randomUUID());
+        automationCompositionDeploy.setRevisionIdInstance(UUID.randomUUID());
+        when(cacheProvider.isInstanceUpdated(automationCompositionDeploy.getAutomationCompositionId(),
+                automationCompositionDeploy.getRevisionIdInstance())).thenReturn(true);
         var acHandler = mock(AutomationCompositionHandler.class);
+        var msgExecutor = new MsgExecutor(cacheProvider, mock(ParticipantMessagePublisher.class));
         var participantHandler = new ParticipantHandler(acHandler, mock(AcLockHandler.class),
-            mock(AcSubStateHandler.class), mock(AcDefinitionHandler.class), mock(ParticipantMessagePublisher.class),
-            mock(CacheProvider.class));
-        var automationCompositionDeploy = new AutomationCompositionDeploy();
+                mock(AcSubStateHandler.class), mock(AcDefinitionHandler.class), mock(ParticipantMessagePublisher.class),
+                cacheProvider, msgExecutor);
         participantHandler.handleAutomationCompositionDeploy(automationCompositionDeploy);
         verify(acHandler).handleAutomationCompositionDeploy(automationCompositionDeploy);
     }
 
     @Test
     void handleAutomationCompositionStateChangeTest() {
-        var acHandler = mock(AutomationCompositionHandler.class);
-        var acLockHandler = mock(AcLockHandler.class);
-        var participantHandler = new ParticipantHandler(acHandler, acLockHandler, mock(AcSubStateHandler.class),
-            mock(AcDefinitionHandler.class), mock(ParticipantMessagePublisher.class), mock(CacheProvider.class));
         var acStateChange = new AutomationCompositionStateChange();
+        acStateChange.setCompositionId(UUID.randomUUID());
+        acStateChange.setRevisionIdComposition(UUID.randomUUID());
+        var cacheProvider = mock(CacheProvider.class);
+        when(cacheProvider.isCompositionDefinitionUpdated(acStateChange.getCompositionId(),
+                acStateChange.getRevisionIdComposition())).thenReturn(true);
 
         acStateChange.setDeployOrderedState(DeployOrder.DEPLOY);
         acStateChange.setLockOrderedState(LockOrder.NONE);
+        var acHandler = mock(AutomationCompositionHandler.class);
+        var acLockHandler = mock(AcLockHandler.class);
+        var msgExecutor = new MsgExecutor(cacheProvider, mock(ParticipantMessagePublisher.class));
+        var participantHandler = new ParticipantHandler(acHandler, acLockHandler, mock(AcSubStateHandler.class),
+                mock(AcDefinitionHandler.class), mock(ParticipantMessagePublisher.class), cacheProvider, msgExecutor);
         participantHandler.handleAutomationCompositionStateChange(acStateChange);
         verify(acHandler).handleAutomationCompositionStateChange(acStateChange);
 
@@ -105,12 +121,29 @@ class ParticipantHandlerTest {
 
     @Test
     void handleAutomationCompositionMigrationTest() {
+        var cacheProvider = mock(CacheProvider.class);
+        var migrationMsg = new AutomationCompositionMigration();
+        migrationMsg.setCompositionId(UUID.randomUUID());
+        migrationMsg.setRevisionIdComposition(UUID.randomUUID());
+        when(cacheProvider.isCompositionDefinitionUpdated(migrationMsg.getCompositionId(),
+                migrationMsg.getRevisionIdComposition())).thenReturn(true);
+
+        migrationMsg.setAutomationCompositionId(UUID.randomUUID());
+        migrationMsg.setRevisionIdInstance(UUID.randomUUID());
+        when(cacheProvider.isInstanceUpdated(migrationMsg.getAutomationCompositionId(),
+                migrationMsg.getRevisionIdInstance())).thenReturn(true);
+
+        migrationMsg.setCompositionTargetId(UUID.randomUUID());
+        migrationMsg.setRevisionIdCompositionTarget(UUID.randomUUID());
+        when(cacheProvider.isCompositionDefinitionUpdated(migrationMsg.getCompositionTargetId(),
+                migrationMsg.getRevisionIdCompositionTarget())).thenReturn(true);
+
         var acHandler = mock(AutomationCompositionHandler.class);
         var acSubStateHandler = mock(AcSubStateHandler.class);
+        var msgExecutor = new MsgExecutor(cacheProvider, mock(ParticipantMessagePublisher.class));
         var participantHandler = new ParticipantHandler(acHandler, mock(AcLockHandler.class),
-            acSubStateHandler, mock(AcDefinitionHandler.class), mock(ParticipantMessagePublisher.class),
-            mock(CacheProvider.class));
-        var migrationMsg = new AutomationCompositionMigration();
+                acSubStateHandler, mock(AcDefinitionHandler.class), mock(ParticipantMessagePublisher.class),
+                cacheProvider, msgExecutor);
         participantHandler.handleAutomationCompositionMigration(migrationMsg);
         verify(acHandler).handleAutomationCompositionMigration(migrationMsg);
 
@@ -121,23 +154,54 @@ class ParticipantHandlerTest {
 
     @Test
     void handleAcPropertyUpdateTest() {
+        var propertyUpdateMsg = new PropertiesUpdate();
+        propertyUpdateMsg.setCompositionId(UUID.randomUUID());
+        propertyUpdateMsg.setRevisionIdComposition(UUID.randomUUID());
+        var cacheProvider = mock(CacheProvider.class);
+        when(cacheProvider.isCompositionDefinitionUpdated(propertyUpdateMsg.getCompositionId(),
+                propertyUpdateMsg.getRevisionIdComposition())).thenReturn(true);
+
+        propertyUpdateMsg.setAutomationCompositionId(UUID.randomUUID());
+        propertyUpdateMsg.setRevisionIdInstance(UUID.randomUUID());
+        when(cacheProvider.isInstanceUpdated(propertyUpdateMsg.getAutomationCompositionId(),
+                propertyUpdateMsg.getRevisionIdInstance())).thenReturn(true);
+
         var acHandler = mock(AutomationCompositionHandler.class);
+        var msgExecutor = new MsgExecutor(cacheProvider, mock(ParticipantMessagePublisher.class));
         var participantHandler = new ParticipantHandler(acHandler, mock(AcLockHandler.class),
-            mock(AcSubStateHandler.class), mock(AcDefinitionHandler.class), mock(ParticipantMessagePublisher.class),
-            mock(CacheProvider.class));
-        var propertyUpdateMsg = new PropertiesUpdate();
+                mock(AcSubStateHandler.class), mock(AcDefinitionHandler.class), mock(ParticipantMessagePublisher.class),
+                cacheProvider, msgExecutor);
         participantHandler.handleAcPropertyUpdate(propertyUpdateMsg);
         verify(acHandler).handleAcPropertyUpdate(propertyUpdateMsg);
     }
 
+    @Test
+    void sendHandleAutomationCompositionPrepare() {
+        var acPrepareMsg = new AutomationCompositionPrepare();
+        acPrepareMsg.setParticipantId(UUID.randomUUID());
+        acPrepareMsg.setRevisionIdComposition(UUID.randomUUID());
+        acPrepareMsg.setPreDeploy(false);
+
+        var cacheProvider = mock(CacheProvider.class);
+        var acSubStateHandler = mock(AcSubStateHandler.class);
+        var msgExecutor = new MsgExecutor(cacheProvider, mock(ParticipantMessagePublisher.class));
+        var participantHandler = new ParticipantHandler(mock(AutomationCompositionHandler.class),
+                mock(AcLockHandler.class), acSubStateHandler, mock(AcDefinitionHandler.class),
+                mock(ParticipantMessagePublisher.class), cacheProvider, msgExecutor);
+
+        participantHandler.handleAutomationCompositionPrepare(acPrepareMsg);
+        verify(acSubStateHandler).handleAcPrepare(acPrepareMsg);
+    }
+
     @Test
     void appliesToTest() {
         var cacheProvider = mock(CacheProvider.class);
         when(cacheProvider.getParticipantId()).thenReturn(CommonTestData.getParticipantId());
         when(cacheProvider.getReplicaId()).thenReturn(CommonTestData.getReplicaId());
+        var msgExecutor = mock(MsgExecutor.class);
         var participantHandler = new ParticipantHandler(mock(AutomationCompositionHandler.class),
             mock(AcLockHandler.class), mock(AcSubStateHandler.class), mock(AcDefinitionHandler.class),
-            mock(ParticipantMessagePublisher.class), cacheProvider);
+            mock(ParticipantMessagePublisher.class), cacheProvider, msgExecutor);
 
         var participantAckMsg = new ParticipantAckMessage(ParticipantMessageType.AUTOMATION_COMPOSITION_DEPLOY);
         assertTrue(participantHandler.appliesTo(participantAckMsg));
@@ -155,9 +219,10 @@ class ParticipantHandlerTest {
         var cacheProvider = mock(CacheProvider.class);
         when(cacheProvider.getParticipantId()).thenReturn(CommonTestData.getParticipantId());
         when(cacheProvider.getSupportedAcElementTypes()).thenReturn(List.of(new ParticipantSupportedElementType()));
+        var msgExecutor = mock(MsgExecutor.class);
         var participantHandler = new ParticipantHandler(mock(AutomationCompositionHandler.class),
             mock(AcLockHandler.class), mock(AcSubStateHandler.class), mock(AcDefinitionHandler.class), publisher,
-            cacheProvider);
+            cacheProvider, msgExecutor);
 
         participantHandler.sendParticipantRegister();
         verify(publisher).sendParticipantRegister(any(ParticipantRegister.class));
@@ -168,9 +233,10 @@ class ParticipantHandlerTest {
         var publisher = mock(ParticipantMessagePublisher.class);
         var cacheProvider = mock(CacheProvider.class);
         when(cacheProvider.getParticipantId()).thenReturn(CommonTestData.getParticipantId());
+        var msgExecutor = mock(MsgExecutor.class);
         var participantHandler = new ParticipantHandler(mock(AutomationCompositionHandler.class),
             mock(AcLockHandler.class), mock(AcSubStateHandler.class), mock(AcDefinitionHandler.class), publisher,
-            cacheProvider);
+            cacheProvider, msgExecutor);
 
         participantHandler.handleParticipantRegisterAck(new ParticipantRegisterAck());
         verify(publisher).sendParticipantStatus(any(ParticipantStatus.class));
@@ -181,9 +247,10 @@ class ParticipantHandlerTest {
         var publisher = mock(ParticipantMessagePublisher.class);
         var cacheProvider = mock(CacheProvider.class);
         when(cacheProvider.getParticipantId()).thenReturn(CommonTestData.getParticipantId());
+        var msgExecutor = mock(MsgExecutor.class);
         var participantHandler = new ParticipantHandler(mock(AutomationCompositionHandler.class),
             mock(AcLockHandler.class), mock(AcSubStateHandler.class), mock(AcDefinitionHandler.class), publisher,
-            cacheProvider);
+            cacheProvider, msgExecutor);
 
         participantHandler.sendParticipantDeregister();
         verify(publisher).sendParticipantDeregister(any(ParticipantDeregister.class));
@@ -191,9 +258,10 @@ class ParticipantHandlerTest {
 
     @Test
     void handleParticipantDeregisterAckTest() {
+        var msgExecutor = mock(MsgExecutor.class);
         var participantHandler = new ParticipantHandler(mock(AutomationCompositionHandler.class),
             mock(AcLockHandler.class), mock(AcSubStateHandler.class), mock(AcDefinitionHandler.class),
-            mock(ParticipantMessagePublisher.class), mock(CacheProvider.class));
+            mock(ParticipantMessagePublisher.class), mock(CacheProvider.class), msgExecutor);
         var participantDeregisterAck = new ParticipantDeregisterAck();
         assertDoesNotThrow(() -> participantHandler.handleParticipantDeregisterAck(participantDeregisterAck));
     }
@@ -205,9 +273,10 @@ class ParticipantHandlerTest {
         participantPrime.setMessageId(UUID.randomUUID());
 
         var acHandler = mock(AcDefinitionHandler.class);
+        var msgExecutor = mock(MsgExecutor.class);
         var participantHandler = new ParticipantHandler(mock(AutomationCompositionHandler.class),
             mock(AcLockHandler.class), mock(AcSubStateHandler.class), acHandler,
-            mock(ParticipantMessagePublisher.class), mock(CacheProvider.class));
+            mock(ParticipantMessagePublisher.class), mock(CacheProvider.class), msgExecutor);
 
         participantHandler.handleParticipantPrime(participantPrime);
         verify(acHandler).handlePrime(participantPrime);
@@ -224,9 +293,16 @@ class ParticipantHandlerTest {
         when(cacheProvider.getReplicaId()).thenReturn(CommonTestData.getReplicaId());
         var publisher = mock(ParticipantMessagePublisher.class);
         var acHandler = mock(AcDefinitionHandler.class);
+        var msgExecutor = mock(MsgExecutor.class);
         var participantHandler = new ParticipantHandler(mock(AutomationCompositionHandler.class),
-            mock(AcLockHandler.class), mock(AcSubStateHandler.class), acHandler, publisher, cacheProvider);
+            mock(AcLockHandler.class), mock(AcSubStateHandler.class), acHandler, publisher, cacheProvider,
+            msgExecutor);
+
+        participantSyncMsg.getExcludeReplicas().add(cacheProvider.getReplicaId());
+        participantHandler.handleParticipantSync(participantSyncMsg);
+        verify(acHandler, times(0)).handleParticipantSync(participantSyncMsg);
 
+        participantSyncMsg.getExcludeReplicas().clear();
         participantHandler.handleParticipantSync(participantSyncMsg);
         verify(acHandler).handleParticipantSync(participantSyncMsg);
     }
@@ -240,8 +316,10 @@ class ParticipantHandlerTest {
         var publisher = mock(ParticipantMessagePublisher.class);
         when(publisher.isActive()).thenReturn(true);
         var acHandler = mock(AcDefinitionHandler.class);
+        var msgExecutor = mock(MsgExecutor.class);
         var participantHandler = new ParticipantHandler(mock(AutomationCompositionHandler.class),
-            mock(AcLockHandler.class), mock(AcSubStateHandler.class), acHandler, publisher, cacheProvider);
+            mock(AcLockHandler.class), mock(AcSubStateHandler.class), acHandler, publisher, cacheProvider,
+            msgExecutor);
         participantHandler.sendHeartbeat();
         verify(publisher).sendParticipantRegister(any(ParticipantRegister.class));
 
index 8846442..4ac0b43 100644 (file)
@@ -36,6 +36,7 @@ import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionDto;
 import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto;
 import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto;
 import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi;
+import org.onap.policy.clamp.acm.participant.intermediary.handler.cache.CacheProvider;
 import org.onap.policy.clamp.acm.participant.intermediary.main.parameters.CommonTestData;
 import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2023-2024 Nordix Foundation.
+ *  Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * ============LICENSE_END=========================================================
  */
 
-package org.onap.policy.clamp.acm.participant.intermediary.handler;
+package org.onap.policy.clamp.acm.participant.intermediary.handler.cache;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.Map;
 import java.util.UUID;
@@ -37,7 +39,7 @@ import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
 class CacheProviderTest {
 
     @Test
-    void testgetSupportedAcElementTypes() {
+    void testGetSupportedAcElementTypes() {
         var parameter = CommonTestData.getParticipantParameters();
         var cacheProvider = new CacheProvider(parameter);
         assertEquals(parameter.getIntermediaryParameters().getParticipantId(), cacheProvider.getParticipantId());
@@ -52,22 +54,23 @@ class CacheProviderTest {
         var instanceId = UUID.randomUUID();
         var participantDeploy = new ParticipantDeploy();
 
-        assertThatThrownBy(() -> cacheProvider.initializeAutomationComposition(null, instanceId, participantDeploy))
+        assertThatThrownBy(() -> cacheProvider
+                .initializeAutomationComposition(null, instanceId, participantDeploy, null))
                 .isInstanceOf(NullPointerException.class);
-        assertThatThrownBy(() -> cacheProvider.initializeAutomationComposition(instanceId, null, participantDeploy))
+        assertThatThrownBy(() -> cacheProvider
+                .initializeAutomationComposition(instanceId, null, participantDeploy, null))
                 .isInstanceOf(NullPointerException.class);
-        assertThatThrownBy(() -> cacheProvider.initializeAutomationComposition(instanceId, instanceId, null))
-                .isInstanceOf(NullPointerException.class);
-        assertThatThrownBy(() -> cacheProvider.initializeAutomationComposition(null, null))
+        assertThatThrownBy(() -> cacheProvider
+                .initializeAutomationComposition(instanceId, instanceId, null, null))
                 .isInstanceOf(NullPointerException.class);
 
         var deployState = DeployState.DEPLOYED;
         var subState = SubState.NONE;
 
         assertThatThrownBy(() -> cacheProvider.initializeAutomationComposition(null, instanceId, participantDeploy,
-                deployState, subState)).isInstanceOf(NullPointerException.class);
+                deployState, subState, null)).isInstanceOf(NullPointerException.class);
         assertThatThrownBy(() -> cacheProvider.initializeAutomationComposition(instanceId, null, participantDeploy,
-                deployState, subState)).isInstanceOf(NullPointerException.class);
+                deployState, subState, null)).isInstanceOf(NullPointerException.class);
     }
 
     @Test
@@ -76,9 +79,9 @@ class CacheProviderTest {
         var cacheProvider = new CacheProvider(parameter);
         var instanceId = UUID.randomUUID();
 
-        assertThatThrownBy(() -> cacheProvider.addElementDefinition(null, null))
+        assertThatThrownBy(() -> cacheProvider.addElementDefinition(null, null, null))
                 .isInstanceOf(NullPointerException.class);
-        assertThatThrownBy(() -> cacheProvider.addElementDefinition(instanceId, null))
+        assertThatThrownBy(() -> cacheProvider.addElementDefinition(instanceId, null, null))
                 .isInstanceOf(NullPointerException.class);
 
         assertThatThrownBy(() -> cacheProvider.getAutomationComposition(null)).isInstanceOf(NullPointerException.class);
@@ -100,7 +103,7 @@ class CacheProviderTest {
     }
 
     @Test
-    void testinitCommonProperties() {
+    void testInitCommonProperties() {
         var automationComposition =
                 CommonTestData.getTestAutomationCompositions().getAutomationCompositionList().get(0);
         automationComposition.setInstanceId(UUID.randomUUID());
@@ -109,12 +112,12 @@ class CacheProviderTest {
         var definitions =
                 CommonTestData.createAutomationCompositionElementDefinitionList(automationComposition);
         var cacheProvider = new CacheProvider(CommonTestData.getParticipantParameters());
-        cacheProvider.addElementDefinition(compositionId, definitions);
+        cacheProvider.addElementDefinition(compositionId, definitions, UUID.randomUUID());
 
         var participantDeploy =
                 CommonTestData.createparticipantDeploy(cacheProvider.getParticipantId(), automationComposition);
         cacheProvider.initializeAutomationComposition(compositionId, automationComposition.getInstanceId(),
-                participantDeploy);
+                participantDeploy, UUID.randomUUID());
 
         for (var element : automationComposition.getElements().values()) {
             var commonProperties =
@@ -138,7 +141,7 @@ class CacheProviderTest {
     }
 
     @Test
-    void testDeply() {
+    void testDeploy() {
         var automationComposition =
                 CommonTestData.getTestAutomationCompositions().getAutomationCompositionList().get(0);
         automationComposition.setInstanceId(UUID.randomUUID());
@@ -150,7 +153,7 @@ class CacheProviderTest {
         var participantDeploy =
                 CommonTestData.createparticipantDeploy(cacheProvider.getParticipantId(), automationComposition);
         cacheProvider.initializeAutomationComposition(compositionId, automationComposition.getInstanceId(),
-                participantDeploy);
+                participantDeploy, UUID.randomUUID());
 
         var ac = cacheProvider.getAutomationComposition(automationComposition.getInstanceId());
         for (var element : ac.getElements().values()) {
@@ -161,7 +164,7 @@ class CacheProviderTest {
 
         // deploy again
         cacheProvider.initializeAutomationComposition(compositionId, automationComposition.getInstanceId(),
-                participantDeploy);
+                participantDeploy, UUID.randomUUID());
 
         // check UseState, OperationalState and OutProperties have not changed
         ac = cacheProvider.getAutomationComposition(automationComposition.getInstanceId());
@@ -203,9 +206,10 @@ class CacheProviderTest {
                 CommonTestData.getTestAutomationCompositions().getAutomationCompositionList().get(0);
         automationComposition.setCompositionId(compositionId);
         cacheProvider.addElementDefinition(compositionId,
-                CommonTestData.createAutomationCompositionElementDefinitionList(automationComposition));
+                CommonTestData.createAutomationCompositionElementDefinitionList(automationComposition),
+                UUID.randomUUID());
         for (var element : automationComposition.getElements().values()) {
-            var result = cacheProvider.createCompositionElementDto(compositionId, element, Map.of());
+            var result = cacheProvider.createCompositionElementDto(compositionId, element);
             assertEquals(compositionId, result.compositionId());
             assertEquals(element.getDefinition(), result.elementDefinitionId());
         }
@@ -220,7 +224,8 @@ class CacheProviderTest {
                 CommonTestData.getTestAutomationCompositions().getAutomationCompositionList().get(0);
         automationComposition.setCompositionId(compositionId);
         cacheProvider.addElementDefinition(compositionId,
-                CommonTestData.createAutomationCompositionElementDefinitionList(automationComposition));
+                CommonTestData.createAutomationCompositionElementDefinitionList(automationComposition),
+                UUID.randomUUID());
         var result = cacheProvider.getCompositionElementDtoMap(automationComposition);
         for (var element : automationComposition.getElements().values()) {
             var compositionElementDto = result.get(element.getId());
@@ -247,4 +252,49 @@ class CacheProviderTest {
             assertEquals(element.getId(), compositionElementDto.elementId());
         }
     }
+
+    @Test
+    void testIsCompositionDefinitionUpdated() {
+        var parameter = CommonTestData.getParticipantParameters();
+        var cacheProvider = new CacheProvider(parameter);
+        var compositionId = UUID.randomUUID();
+        assertTrue(cacheProvider.isCompositionDefinitionUpdated(compositionId, null));
+
+        var revisionId = UUID.randomUUID();
+        assertFalse(cacheProvider.isCompositionDefinitionUpdated(compositionId, revisionId));
+
+        var automationComposition =
+                CommonTestData.getTestAutomationCompositions().getAutomationCompositionList().get(0);
+        automationComposition.setCompositionId(compositionId);
+        cacheProvider.addElementDefinition(compositionId,
+                CommonTestData.createAutomationCompositionElementDefinitionList(automationComposition),
+                revisionId);
+        assertTrue(cacheProvider.isCompositionDefinitionUpdated(compositionId, revisionId));
+
+        revisionId = UUID.randomUUID();
+        assertFalse(cacheProvider.isCompositionDefinitionUpdated(compositionId, revisionId));
+    }
+
+    @Test
+    void testIsInstanceUpdated() {
+        var parameter = CommonTestData.getParticipantParameters();
+        var cacheProvider = new CacheProvider(parameter);
+        var instanceId = UUID.randomUUID();
+        assertTrue(cacheProvider.isInstanceUpdated(instanceId, null));
+        var revisionId = UUID.randomUUID();
+        assertFalse(cacheProvider.isInstanceUpdated(instanceId, revisionId));
+
+        var automationComposition =
+                CommonTestData.getTestAutomationCompositions().getAutomationCompositionList().get(0);
+        automationComposition.setInstanceId(instanceId);
+
+        var participantDeploy =
+                CommonTestData.createparticipantDeploy(cacheProvider.getParticipantId(), automationComposition);
+        cacheProvider.initializeAutomationComposition(UUID.randomUUID(), automationComposition.getInstanceId(),
+                participantDeploy, revisionId);
+        assertTrue(cacheProvider.isInstanceUpdated(instanceId, revisionId));
+
+        revisionId = UUID.randomUUID();
+        assertFalse(cacheProvider.isInstanceUpdated(instanceId, revisionId));
+    }
 }