Add sync messages support in ACM-intermediary 87/138287/3
authorFrancescoFioraEst <francesco.fiora@est.tech>
Wed, 19 Jun 2024 13:04:57 +0000 (14:04 +0100)
committerFrancescoFioraEst <francesco.fiora@est.tech>
Thu, 20 Jun 2024 10:30:13 +0000 (11:30 +0100)
Issue-ID: POLICY-5048
Change-Id: I4d3a362251931820e1a481f780586afb9e2c60ed
Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
22 files changed:
models/src/test/java/org/onap/policy/clamp/models/acm/utils/AcmUtilsTest.java
participant/participant-impl/participant-impl-kserve/src/test/java/org/onap/policy/clamp/acm/participant/kserve/handler/AcElementHandlerTest.java
participant/participant-impl/participant-impl-simulator/src/main/java/org/onap/policy/clamp/acm/participant/sim/main/handler/AutomationCompositionElementHandlerV1.java
participant/participant-impl/participant-impl-simulator/src/main/java/org/onap/policy/clamp/acm/participant/sim/main/handler/AutomationCompositionElementHandlerV2.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/AutomationCompositionElementListener.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/impl/AcElementListenerV1.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/impl/AcElementListenerV2.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/impl/AutomationCompositionElementListenerV1.java [new file with mode: 0644]
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantRestartListener.java [deleted file]
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandler.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/api/impl/AcElementListenerV1Test.java [changed mode: 0755->0644]
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/api/impl/AcElementListenerV2Test.java [changed mode: 0755->0644]
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantCommTest.java
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AcDefinitionHandlerTest.java
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/DummyAcElementListener.java
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandlerTest.java
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandlerTest.java
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/main/parameters/CommonTestData.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java

index 024060f..f17eff3 100644 (file)
@@ -47,7 +47,6 @@ import org.onap.policy.clamp.models.acm.document.concepts.DocToscaServiceTemplat
 import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder;
 import org.onap.policy.clamp.models.acm.messages.rest.instantiation.LockOrder;
 import org.onap.policy.common.utils.coder.StandardCoder;
-import org.onap.policy.models.base.PfModelRuntimeException;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaDataType;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaNodeTemplate;
index ccdb31f..919562a 100644 (file)
@@ -42,7 +42,6 @@ import org.onap.policy.clamp.acm.participant.kserve.exception.KserveException;
 import org.onap.policy.clamp.acm.participant.kserve.k8s.KserveClient;
 import org.onap.policy.clamp.acm.participant.kserve.utils.CommonTestData;
 import org.onap.policy.clamp.acm.participant.kserve.utils.ToscaUtils;
-import org.onap.policy.models.base.PfModelException;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
 
 class AcElementHandlerTest {
index 03a0517..aa9d90e 100644 (file)
@@ -26,9 +26,7 @@ import java.util.UUID;
 import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi;
 import org.onap.policy.clamp.acm.participant.intermediary.api.impl.AcElementListenerV1;
 import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
-import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementDefinition;
-import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
 import org.onap.policy.models.base.PfModelException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
index 28bade2..eaad5d9 100644 (file)
@@ -25,11 +25,6 @@ import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElement
 import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto;
 import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi;
 import org.onap.policy.clamp.acm.participant.intermediary.api.impl.AcElementListenerV2;
-import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
-import org.onap.policy.clamp.models.acm.concepts.DeployState;
-import org.onap.policy.clamp.models.acm.concepts.LockState;
-import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
-import org.onap.policy.clamp.models.acm.utils.AcmUtils;
 import org.onap.policy.models.base.PfModelException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
index 505f515..6f40392 100644 (file)
@@ -20,9 +20,6 @@
 
 package org.onap.policy.clamp.acm.participant.intermediary.api;
 
-import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
-import org.onap.policy.clamp.models.acm.concepts.DeployState;
-import org.onap.policy.clamp.models.acm.concepts.LockState;
 import org.onap.policy.models.base.PfModelException;
 
 /**
@@ -89,11 +86,6 @@ public interface AutomationCompositionElementListener {
 
     void deprime(CompositionDto composition) throws PfModelException;
 
-    void handleRestartComposition(CompositionDto composition, AcTypeState state) throws PfModelException;
-
-    void handleRestartInstance(CompositionElementDto compositionElement, InstanceElementDto instanceElement,
-            DeployState deployState, LockState lockState) throws PfModelException;
-
     /**
      * Handle an update on a automation composition element.
      *
index 5d4e1fe..cf5ac41 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.onap.policy.clamp.acm.participant.intermediary.api.impl;
 
+import jakarta.ws.rs.core.Response;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -44,9 +45,12 @@ import org.onap.policy.models.tosca.authorative.concepts.ToscaNodeTemplate;
  * Wrapper of AutomationCompositionElementListener.
  * Valid since 7.1.0 release.
  */
-public abstract class AcElementListenerV1 implements AutomationCompositionElementListener {
+public abstract class AcElementListenerV1
+        implements AutomationCompositionElementListener, AutomationCompositionElementListenerV1 {
     protected final ParticipantIntermediaryApi intermediaryApi;
 
+    private static final String NOT_SUPPORTED = "not supported!";
+
     protected AcElementListenerV1(ParticipantIntermediaryApi intermediaryApi) {
         this.intermediaryApi = intermediaryApi;
     }
@@ -64,23 +68,19 @@ public abstract class AcElementListenerV1 implements AutomationCompositionElemen
         deploy(instanceElement.instanceId(), element, properties);
     }
 
-    public abstract void deploy(UUID instanceId, AcElementDeploy element, Map<String, Object> properties)
-        throws PfModelException;
-
     @Override
     public void undeploy(CompositionElementDto compositionElement, InstanceElementDto instanceElement)
         throws PfModelException {
         undeploy(instanceElement.instanceId(), instanceElement.elementId());
     }
 
-    public abstract void undeploy(UUID instanceId, UUID elementId) throws PfModelException;
-
     @Override
     public void lock(CompositionElementDto compositionElement, InstanceElementDto instanceElement)
         throws PfModelException {
         lock(instanceElement.instanceId(), instanceElement.elementId());
     }
 
+    @Override
     public void lock(UUID instanceId, UUID elementId) throws PfModelException {
         intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, null, LockState.LOCKED,
             StateChangeResult.NO_ERROR, "Locked");
@@ -92,6 +92,7 @@ public abstract class AcElementListenerV1 implements AutomationCompositionElemen
         unlock(instanceElement.instanceId(), instanceElement.elementId());
     }
 
+    @Override
     public void unlock(UUID instanceId, UUID elementId) throws PfModelException {
         intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, null, LockState.UNLOCKED,
             StateChangeResult.NO_ERROR, "Unlocked");
@@ -103,6 +104,7 @@ public abstract class AcElementListenerV1 implements AutomationCompositionElemen
         delete(instanceElement.instanceId(), instanceElement.elementId());
     }
 
+    @Override
     public void delete(UUID instanceId, UUID elementId) throws PfModelException {
         intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, DeployState.DELETED, null,
             StateChangeResult.NO_ERROR, "Deleted");
@@ -150,6 +152,7 @@ public abstract class AcElementListenerV1 implements AutomationCompositionElemen
         prime(composition.compositionId(), createAcElementDefinitionList(composition));
     }
 
+    @Override
     public void prime(UUID compositionId, List<AutomationCompositionElementDefinition> elementDefinitionList)
         throws PfModelException {
         intermediaryApi.updateCompositionState(compositionId, AcTypeState.PRIMED, StateChangeResult.NO_ERROR, "Primed");
@@ -160,14 +163,14 @@ public abstract class AcElementListenerV1 implements AutomationCompositionElemen
         deprime(composition.compositionId());
     }
 
+    @Override
     public void deprime(UUID compositionId) throws PfModelException {
         intermediaryApi.updateCompositionState(compositionId, AcTypeState.COMMISSIONED, StateChangeResult.NO_ERROR,
             "Deprimed");
     }
 
-    @Override
     public void handleRestartComposition(CompositionDto composition, AcTypeState state) throws PfModelException {
-        handleRestartComposition(composition.compositionId(), createAcElementDefinitionList(composition), state);
+        throw new PfModelException(Response.Status.BAD_REQUEST, NOT_SUPPORTED);
     }
 
     /**
@@ -180,24 +183,12 @@ public abstract class AcElementListenerV1 implements AutomationCompositionElemen
      */
     public void handleRestartComposition(UUID compositionId,
         List<AutomationCompositionElementDefinition> elementDefinitionList, AcTypeState state) throws PfModelException {
-        switch (state) {
-            case PRIMING -> prime(compositionId, elementDefinitionList);
-            case DEPRIMING -> deprime(compositionId);
-            default ->
-                intermediaryApi.updateCompositionState(compositionId, state, StateChangeResult.NO_ERROR, "Restarted");
-        }
+        throw new PfModelException(Response.Status.BAD_REQUEST, NOT_SUPPORTED);
     }
 
-    @Override
     public void handleRestartInstance(CompositionElementDto compositionElement, InstanceElementDto instanceElement,
         DeployState deployState, LockState lockState) throws PfModelException {
-        var element = new  AcElementDeploy();
-        element.setId(instanceElement.elementId());
-        element.setDefinition(compositionElement.elementDefinitionId());
-        element.setProperties(instanceElement.inProperties());
-        Map<String, Object> properties = new HashMap<>(instanceElement.inProperties());
-        properties.putAll(compositionElement.inProperties());
-        handleRestartInstance(instanceElement.instanceId(), element, properties, deployState, lockState);
+        throw new PfModelException(Response.Status.BAD_REQUEST, NOT_SUPPORTED);
     }
 
     /**
@@ -212,33 +203,8 @@ public abstract class AcElementListenerV1 implements AutomationCompositionElemen
      */
     public void handleRestartInstance(UUID instanceId, AcElementDeploy element,
         Map<String, Object> properties, DeployState deployState, LockState lockState) throws PfModelException {
+        throw new PfModelException(Response.Status.BAD_REQUEST, NOT_SUPPORTED);
 
-        if (DeployState.DEPLOYING.equals(deployState)) {
-            deploy(instanceId, element, properties);
-            return;
-        }
-        if (DeployState.UNDEPLOYING.equals(deployState)) {
-            undeploy(instanceId, element.getId());
-            return;
-        }
-        if (DeployState.UPDATING.equals(deployState)) {
-            update(instanceId, element, properties);
-            return;
-        }
-        if (DeployState.DELETING.equals(deployState)) {
-            delete(instanceId, element.getId());
-            return;
-        }
-        if (LockState.LOCKING.equals(lockState)) {
-            lock(instanceId, element.getId());
-            return;
-        }
-        if (LockState.UNLOCKING.equals(lockState)) {
-            unlock(instanceId, element.getId());
-            return;
-        }
-        intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(),
-            deployState, lockState, StateChangeResult.NO_ERROR, "Restarted");
     }
 
     @Override
@@ -252,6 +218,7 @@ public abstract class AcElementListenerV1 implements AutomationCompositionElemen
             element.getProperties());
     }
 
+    @Override
     public void migrate(UUID instanceId, AcElementDeploy element, UUID compositionTargetId,
                         Map<String, Object> properties) throws PfModelException {
         intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(),
index daf9d6e..3fe3319 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.onap.policy.clamp.acm.participant.intermediary.api.impl;
 
+import jakarta.ws.rs.core.Response;
 import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener;
 import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionDto;
 import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto;
@@ -38,6 +39,8 @@ import org.onap.policy.models.base.PfModelException;
 public abstract class AcElementListenerV2 implements AutomationCompositionElementListener {
     protected final ParticipantIntermediaryApi intermediaryApi;
 
+    private static final String NOT_SUPPORTED = "not supported!";
+
     protected AcElementListenerV2(ParticipantIntermediaryApi intermediaryApi) {
         this.intermediaryApi = intermediaryApi;
     }
@@ -84,46 +87,13 @@ public abstract class AcElementListenerV2 implements AutomationCompositionElemen
             StateChangeResult.NO_ERROR, "Deprimed");
     }
 
-    @Override
     public void handleRestartComposition(CompositionDto composition, AcTypeState state) throws PfModelException {
-        switch (state) {
-            case PRIMING -> prime(composition);
-            case DEPRIMING -> deprime(composition);
-            default -> intermediaryApi
-                .updateCompositionState(composition.compositionId(), state, StateChangeResult.NO_ERROR, "Restarted");
-        }
+        throw new PfModelException(Response.Status.BAD_REQUEST, NOT_SUPPORTED);
     }
 
-    @Override
     public void handleRestartInstance(CompositionElementDto compositionElement, InstanceElementDto instanceElement,
                                       DeployState deployState, LockState lockState) throws PfModelException {
-
-        if (DeployState.DEPLOYING.equals(deployState)) {
-            deploy(compositionElement, instanceElement);
-            return;
-        }
-        if (DeployState.UNDEPLOYING.equals(deployState)) {
-            undeploy(compositionElement, instanceElement);
-            return;
-        }
-        if (DeployState.UPDATING.equals(deployState)) {
-            update(compositionElement, instanceElement, instanceElement);
-            return;
-        }
-        if (DeployState.DELETING.equals(deployState)) {
-            delete(compositionElement, instanceElement);
-            return;
-        }
-        if (LockState.LOCKING.equals(lockState)) {
-            lock(compositionElement, instanceElement);
-            return;
-        }
-        if (LockState.UNLOCKING.equals(lockState)) {
-            unlock(compositionElement, instanceElement);
-            return;
-        }
-        intermediaryApi.updateAutomationCompositionElementState(instanceElement.instanceId(),
-            instanceElement.elementId(), deployState, lockState, StateChangeResult.NO_ERROR, "Restarted");
+        throw new PfModelException(Response.Status.BAD_REQUEST, NOT_SUPPORTED);
     }
 
     @Override
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/impl/AutomationCompositionElementListenerV1.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/impl/AutomationCompositionElementListenerV1.java
new file mode 100644 (file)
index 0000000..007ba3d
--- /dev/null
@@ -0,0 +1,53 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.participant.intermediary.api.impl;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
+import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementDefinition;
+import org.onap.policy.models.base.PfModelException;
+
+public interface AutomationCompositionElementListenerV1 {
+
+    void undeploy(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
+
+    void deploy(UUID automationCompositionId, AcElementDeploy element, Map<String, Object> properties)
+            throws PfModelException;
+
+    void lock(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
+
+    void unlock(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
+
+    void delete(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
+
+    void update(UUID automationCompositionId, AcElementDeploy element, Map<String, Object> properties)
+            throws PfModelException;
+
+    void prime(UUID compositionId, List<AutomationCompositionElementDefinition> elementDefinitionList)
+            throws PfModelException;
+
+    void deprime(UUID compositionId) throws PfModelException;
+
+    void migrate(UUID instanceId, AcElementDeploy element, UUID compositionTargetId,
+            Map<String, Object> properties) throws PfModelException;
+}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantRestartListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantRestartListener.java
deleted file mode 100644 (file)
index fd59b02..0000000
+++ /dev/null
@@ -1,44 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- *  Copyright (C) 2023-2024 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.clamp.acm.participant.intermediary.comm;
-
-import org.onap.policy.clamp.acm.participant.intermediary.handler.ParticipantHandler;
-import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType;
-import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRestart;
-import org.springframework.stereotype.Component;
-
-@Component
-public class ParticipantRestartListener extends ParticipantListener<ParticipantRestart> {
-
-    /**
-     * Constructs the object.
-     *
-     * @param participantHandler the handler for managing the state of the participant
-     */
-    public ParticipantRestartListener(ParticipantHandler participantHandler) {
-        super(ParticipantRestart.class, participantHandler, participantHandler::handleParticipantRestart);
-    }
-
-    @Override
-    public String getType() {
-        return ParticipantMessageType.PARTICIPANT_RESTART.name();
-    }
-}
index d3ad4cf..b38df51 100644 (file)
@@ -34,7 +34,7 @@ import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
 import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrime;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrimeAck;
-import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRestart;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
 import org.springframework.stereotype.Component;
 
 @Component
@@ -107,32 +107,33 @@ public class AcDefinitionHandler {
     }
 
     /**
-     * Handle a ParticipantRestart message.
+     * Handle a Participant Sync message.
      *
-     * @param participantRestartMsg the participantRestart message
+     * @param participantSyncMsg the participantRestart message
      */
-    public void handleParticipantRestart(ParticipantRestart participantRestartMsg) {
-        List<AutomationCompositionElementDefinition> list = new ArrayList<>();
-        for (var participantDefinition : participantRestartMsg.getParticipantDefinitionUpdates()) {
-            list.addAll(participantDefinition.getAutomationCompositionElementDefinitionList());
+    public void handleParticipantSync(ParticipantSync participantSyncMsg) {
+
+        if (participantSyncMsg.isDelete()) {
+            if (AcTypeState.COMMISSIONED.equals(participantSyncMsg.getState())) {
+                cacheProvider.removeElementDefinition(participantSyncMsg.getCompositionId());
+            }
+            for (var automationcomposition : participantSyncMsg.getAutomationcompositionList()) {
+                cacheProvider.removeAutomationComposition(automationcomposition.getAutomationCompositionId());
+            }
+            return;
         }
-        if (!AcTypeState.COMMISSIONED.equals(participantRestartMsg.getState())) {
-            cacheProvider.addElementDefinition(participantRestartMsg.getCompositionId(), list);
+
+        if (!participantSyncMsg.getParticipantDefinitionUpdates().isEmpty()) {
+            List<AutomationCompositionElementDefinition> list = new ArrayList<>();
+            for (var participantDefinition : participantSyncMsg.getParticipantDefinitionUpdates()) {
+                list.addAll(participantDefinition.getAutomationCompositionElementDefinitionList());
+            }
+            cacheProvider.addElementDefinition(participantSyncMsg.getCompositionId(), list);
         }
 
-        for (var automationcomposition : participantRestartMsg.getAutomationcompositionList()) {
+        for (var automationcomposition : participantSyncMsg.getAutomationcompositionList()) {
             cacheProvider
-                    .initializeAutomationComposition(participantRestartMsg.getCompositionId(), automationcomposition);
+                    .initializeAutomationComposition(participantSyncMsg.getCompositionId(), automationcomposition);
         }
-        var inPropertiesMap = list.stream().collect(Collectors.toMap(
-                AutomationCompositionElementDefinition::getAcElementDefinitionId,
-                el -> el.getAutomationCompositionElementToscaNodeTemplate().getProperties()));
-        var outPropertiesMap = list.stream().collect(Collectors.toMap(
-                AutomationCompositionElementDefinition::getAcElementDefinitionId,
-                AutomationCompositionElementDefinition::getOutProperties));
-        var composition =
-                new CompositionDto(participantRestartMsg.getCompositionId(), inPropertiesMap, outPropertiesMap);
-        listener.restarted(participantRestartMsg.getMessageId(), composition, participantRestartMsg.getState(),
-                participantRestartMsg.getAutomationcompositionList());
     }
 }
index b85a3c3..7a00e08 100644 (file)
@@ -191,6 +191,9 @@ public class CacheProvider {
             ParticipantRestartAc participantRestartAc) {
         Map<UUID, AutomationCompositionElement> acElementMap = new LinkedHashMap<>();
         for (var element : participantRestartAc.getAcElementList()) {
+            if (!getParticipantId().equals(element.getParticipantId())) {
+                continue;
+            }
             var acElement = new AutomationCompositionElement();
             acElement.setId(element.getId());
             acElement.setParticipantId(getParticipantId());
@@ -201,12 +204,13 @@ public class CacheProvider {
             acElement.setUseState(element.getUseState());
             acElement.setProperties(element.getProperties());
             acElement.setOutProperties(element.getOutProperties());
-            acElement.setRestarting(true);
             acElementMap.put(element.getId(), acElement);
         }
 
         var automationComposition = new AutomationComposition();
         automationComposition.setCompositionId(compositionId);
+        automationComposition.setDeployState(participantRestartAc.getDeployState());
+        automationComposition.setLockState(participantRestartAc.getLockState());
         automationComposition.setInstanceId(participantRestartAc.getAutomationCompositionId());
         automationComposition.setElements(acElementMap);
         automationCompositions.put(automationComposition.getInstanceId(), automationComposition);
index caa2c56..5ae8f04 100644 (file)
@@ -36,7 +36,6 @@ import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMe
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrime;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRegister;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRegisterAck;
-import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRestart;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatus;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatusReq;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
@@ -197,18 +196,6 @@ public class ParticipantHandler {
         acDefinitionHandler.handlePrime(participantPrimeMsg);
     }
 
-    /**
-     * Handle a ParticipantRestart message.
-     *
-     * @param participantRestartMsg the participantRestart message
-     */
-    @Timed(value = "listener.participant_restart", description = "PARTICIPANT_RESTART messages received")
-    public void handleParticipantRestart(ParticipantRestart participantRestartMsg) {
-        LOGGER.debug("ParticipantRestart message received for participantId {}",
-                participantRestartMsg.getParticipantId());
-        acDefinitionHandler.handleParticipantRestart(participantRestartMsg);
-    }
-
     /**
      * Handle a ParticipantSync message.
      *
@@ -216,8 +203,12 @@ public class ParticipantHandler {
      */
     @Timed(value = "listener.participant_sync_msg", description = "PARTICIPANT_SYNC messages received")
     public void handleParticipantSync(ParticipantSync participantSyncMsg) {
-        LOGGER.debug("ParticipantSync message received for participantId {}",
-                participantSyncMsg.getParticipantId());
+        if (participantSyncMsg.getExcludeReplicas().contains(cacheProvider.getReplicaId())) {
+            LOGGER.debug("Ignore ParticipantSync message {}", participantSyncMsg.getMessageId());
+            return;
+        }
+        LOGGER.debug("ParticipantSync message received for participantId {}", participantSyncMsg.getParticipantId());
+        acDefinitionHandler.handleParticipantSync(participantSyncMsg);
     }
 
     /**
index 9f3e167..00e0044 100644 (file)
@@ -23,7 +23,6 @@ package org.onap.policy.clamp.acm.participant.intermediary.handler;
 import io.opentelemetry.context.Context;
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -39,7 +38,6 @@ import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantInterme
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.DeployState;
 import org.onap.policy.clamp.models.acm.concepts.LockState;
-import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
 import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
 import org.onap.policy.models.base.PfModelException;
 import org.slf4j.Logger;
@@ -277,55 +275,6 @@ public class ThreadHandler implements Closeable {
         }
     }
 
-    /**
-     * Handles restarted scenario.
-     *
-     * @param messageId the messageId
-     * @param composition the composition
-     * @param state the state of the composition
-     * @param automationCompositionList list of ParticipantRestartAc
-     */
-    public void restarted(UUID messageId, CompositionDto composition,
-            AcTypeState state, List<ParticipantRestartAc> automationCompositionList) {
-        try {
-            listener.handleRestartComposition(composition, state);
-        } catch (PfModelException e) {
-            LOGGER.error("Composition Defintion restarted failed {} {}", composition.compositionId(), e.getMessage());
-            intermediaryApi.updateCompositionState(composition.compositionId(), state, StateChangeResult.FAILED,
-                    "Composition Defintion restarted failed");
-        }
-
-        for (var automationComposition : automationCompositionList) {
-            for (var element : automationComposition.getAcElementList()) {
-                var compositionElement = new CompositionElementDto(composition.compositionId(),
-                    element.getDefinition(), composition.inPropertiesMap().get(element.getDefinition()),
-                    composition.outPropertiesMap().get(element.getDefinition()));
-                var instanceElementDto = new InstanceElementDto(automationComposition.getAutomationCompositionId(),
-                    element.getId(), element.getToscaServiceTemplateFragment(),
-                    element.getProperties(), element.getOutProperties());
-                cleanExecution(element.getId(), messageId);
-                var result = executor.submit(() ->
-                    this.restartedInstanceProcess(compositionElement, instanceElementDto,
-                        element.getDeployState(), element.getLockState()));
-                executionMap.put(element.getId(), result);
-            }
-        }
-    }
-
-    private void restartedInstanceProcess(CompositionElementDto compositionElement,
-        InstanceElementDto instanceElementDto, DeployState deployState, LockState lockState) {
-        try {
-            listener.handleRestartInstance(compositionElement, instanceElementDto, deployState, lockState);
-            executionMap.remove(instanceElementDto.elementId());
-        } catch (PfModelException e) {
-            LOGGER.error("Automation composition element deploy failed {} {}",
-                instanceElementDto.elementId(), e.getMessage());
-            intermediaryApi.updateAutomationCompositionElementState(instanceElementDto.instanceId(),
-                instanceElementDto.elementId(), deployState, lockState, StateChangeResult.FAILED,
-                    "Automation composition element restart failed");
-        }
-    }
-
     /**
      * Closes this stream and releases any system resources associated
      * with it. If the stream is already closed then invoking this
old mode 100755 (executable)
new mode 100644 (file)
index a60e1b8..7355b03
@@ -20,8 +20,8 @@
 
 package org.onap.policy.clamp.acm.participant.intermediary.api.impl;
 
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
@@ -51,11 +51,6 @@ class AcElementListenerV1Test {
         var instanceElement = new InstanceElementDto(UUID.randomUUID(), UUID.randomUUID(), null, Map.of(), Map.of());
         acElementListenerV1.deploy(compositionElement, instanceElement);
         verify(acElementListenerV1).deploy(any(), any(), any());
-
-        clearInvocations(acElementListenerV1);
-        acElementListenerV1.handleRestartInstance(compositionElement, instanceElement,
-            DeployState.DEPLOYING, LockState.NONE);
-        verify(acElementListenerV1).deploy(any(), any(), any());
     }
 
     @Test
@@ -66,11 +61,6 @@ class AcElementListenerV1Test {
         var instanceElement = new InstanceElementDto(UUID.randomUUID(), UUID.randomUUID(), null, Map.of(), Map.of());
         acElementListenerV1.undeploy(compositionElement, instanceElement);
         verify(acElementListenerV1).undeploy(instanceElement.instanceId(), instanceElement.elementId());
-
-        clearInvocations(acElementListenerV1);
-        acElementListenerV1.handleRestartInstance(compositionElement, instanceElement,
-            DeployState.UNDEPLOYING, LockState.NONE);
-        verify(acElementListenerV1).undeploy(instanceElement.instanceId(), instanceElement.elementId());
     }
 
     @Test
@@ -147,66 +137,17 @@ class AcElementListenerV1Test {
     }
 
     @Test
-    void handleRestartComposition() throws PfModelException {
-        var intermediaryApi = mock(ParticipantIntermediaryApi.class);
-        var acElementListenerV1 = createAcElementListenerV1(intermediaryApi);
-        var compositionId = UUID.randomUUID();
-        var toscaConceptIdentifier = new ToscaConceptIdentifier();
-        var composition = new CompositionDto(compositionId, Map.of(toscaConceptIdentifier, Map.of()), Map.of());
-
-        acElementListenerV1.handleRestartComposition(composition, AcTypeState.PRIMED);
-        verify(intermediaryApi)
-            .updateCompositionState(compositionId, AcTypeState.PRIMED, StateChangeResult.NO_ERROR, "Restarted");
-
-        clearInvocations(intermediaryApi);
-        acElementListenerV1.handleRestartComposition(composition, AcTypeState.PRIMING);
-        verify(intermediaryApi)
-            .updateCompositionState(compositionId, AcTypeState.PRIMED, StateChangeResult.NO_ERROR, "Primed");
-
-        clearInvocations(intermediaryApi);
-        acElementListenerV1.handleRestartComposition(composition, AcTypeState.DEPRIMING);
-        verify(intermediaryApi)
-            .updateCompositionState(compositionId, AcTypeState.COMMISSIONED, StateChangeResult.NO_ERROR, "Deprimed");
+    void handleRestartComposition() {
+        var acElementListenerV1 = createAcElementListenerV1(mock(ParticipantIntermediaryApi.class));
+        assertThatThrownBy(() -> acElementListenerV1.handleRestartComposition(null, null))
+                .isInstanceOf(PfModelException.class);
     }
 
     @Test
-    void handleRestartInstance() throws PfModelException {
-        var intermediaryApi = mock(ParticipantIntermediaryApi.class);
-        var acElementListenerV1 = createAcElementListenerV1(intermediaryApi);
-        var compositionElement = new CompositionElementDto(UUID.randomUUID(), new ToscaConceptIdentifier(),
-            Map.of(), Map.of());
-        var instanceElement = new InstanceElementDto(UUID.randomUUID(), UUID.randomUUID(), null, Map.of(), Map.of());
-
-        acElementListenerV1.handleRestartInstance(compositionElement, instanceElement,
-            DeployState.DEPLOYED, LockState.LOCKED);
-        verify(intermediaryApi).updateAutomationCompositionElementState(instanceElement.instanceId(),
-            instanceElement.elementId(), DeployState.DEPLOYED, LockState.LOCKED,
-            StateChangeResult.NO_ERROR, "Restarted");
-
-        clearInvocations(intermediaryApi);
-        acElementListenerV1.handleRestartInstance(compositionElement, instanceElement,
-            DeployState.DEPLOYED, LockState.LOCKING);
-        verify(intermediaryApi).updateAutomationCompositionElementState(instanceElement.instanceId(),
-            instanceElement.elementId(), null, LockState.LOCKED, StateChangeResult.NO_ERROR, "Locked");
-
-        clearInvocations(intermediaryApi);
-        acElementListenerV1.handleRestartInstance(compositionElement, instanceElement,
-            DeployState.DEPLOYED, LockState.UNLOCKING);
-        verify(intermediaryApi).updateAutomationCompositionElementState(instanceElement.instanceId(),
-            instanceElement.elementId(), null, LockState.UNLOCKED, StateChangeResult.NO_ERROR, "Unlocked");
-
-        clearInvocations(intermediaryApi);
-        acElementListenerV1.handleRestartInstance(compositionElement, instanceElement,
-            DeployState.UPDATING, LockState.LOCKED);
-        verify(intermediaryApi).updateAutomationCompositionElementState(instanceElement.instanceId(),
-            instanceElement.elementId(), DeployState.DEPLOYED, null,
-            StateChangeResult.NO_ERROR, "Update not supported");
-
-        clearInvocations(intermediaryApi);
-        acElementListenerV1.handleRestartInstance(compositionElement, instanceElement,
-            DeployState.DELETING, LockState.NONE);
-        verify(intermediaryApi).updateAutomationCompositionElementState(instanceElement.instanceId(),
-            instanceElement.elementId(), DeployState.DELETED, null, StateChangeResult.NO_ERROR, "Deleted");
+    void handleRestartInstance() {
+        var acElementListenerV1 = createAcElementListenerV1(mock(ParticipantIntermediaryApi.class));
+        assertThatThrownBy(() -> acElementListenerV1.handleRestartInstance(null, null,
+                null, null)).isInstanceOf(PfModelException.class);
     }
 
     @Test
old mode 100755 (executable)
new mode 100644 (file)
index c36e11d..c8ab9e2
 
 package org.onap.policy.clamp.acm.participant.intermediary.api.impl;
 
-import static org.mockito.Mockito.clearInvocations;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
 import java.util.Map;
 import java.util.UUID;
 import org.junit.jupiter.api.Test;
-import org.mockito.Answers;
 import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionDto;
 import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto;
 import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto;
@@ -41,28 +40,6 @@ import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
 
 class AcElementListenerV2Test {
 
-    @Test
-    void deployTest() throws PfModelException {
-        var acElementListenerV2 = mock(AcElementListenerV2.class, Answers.CALLS_REAL_METHODS);
-        var compositionElement = new CompositionElementDto(UUID.randomUUID(), new ToscaConceptIdentifier(),
-            Map.of(), Map.of());
-        var instanceElement = new InstanceElementDto(UUID.randomUUID(), UUID.randomUUID(), null, Map.of(), Map.of());
-        acElementListenerV2.handleRestartInstance(compositionElement, instanceElement,
-            DeployState.DEPLOYING, LockState.NONE);
-        verify(acElementListenerV2).deploy(compositionElement, instanceElement);
-    }
-
-    @Test
-    void undeployTest() throws PfModelException {
-        var acElementListenerV2 = mock(AcElementListenerV2.class, Answers.CALLS_REAL_METHODS);
-        var compositionElement = new CompositionElementDto(UUID.randomUUID(), new ToscaConceptIdentifier(),
-            Map.of(), Map.of());
-        var instanceElement = new InstanceElementDto(UUID.randomUUID(), UUID.randomUUID(), null, Map.of(), Map.of());
-        acElementListenerV2.handleRestartInstance(compositionElement, instanceElement,
-            DeployState.UNDEPLOYING, LockState.NONE);
-        verify(acElementListenerV2).undeploy(compositionElement, instanceElement);
-    }
-
     @Test
     void lockTest() throws PfModelException {
         var intermediaryApi = mock(ParticipantIntermediaryApi.class);
@@ -137,66 +114,17 @@ class AcElementListenerV2Test {
     }
 
     @Test
-    void handleRestartComposition() throws PfModelException {
-        var intermediaryApi = mock(ParticipantIntermediaryApi.class);
-        var acElementListenerV2 = createAcElementListenerV2(intermediaryApi);
-        var compositionId = UUID.randomUUID();
-        var toscaConceptIdentifier = new ToscaConceptIdentifier();
-        var composition = new CompositionDto(compositionId, Map.of(toscaConceptIdentifier, Map.of()), Map.of());
-
-        acElementListenerV2.handleRestartComposition(composition, AcTypeState.PRIMED);
-        verify(intermediaryApi)
-            .updateCompositionState(compositionId, AcTypeState.PRIMED, StateChangeResult.NO_ERROR, "Restarted");
-
-        clearInvocations(intermediaryApi);
-        acElementListenerV2.handleRestartComposition(composition, AcTypeState.PRIMING);
-        verify(intermediaryApi)
-            .updateCompositionState(compositionId, AcTypeState.PRIMED, StateChangeResult.NO_ERROR, "Primed");
-
-        clearInvocations(intermediaryApi);
-        acElementListenerV2.handleRestartComposition(composition, AcTypeState.DEPRIMING);
-        verify(intermediaryApi)
-            .updateCompositionState(compositionId, AcTypeState.COMMISSIONED, StateChangeResult.NO_ERROR, "Deprimed");
+    void handleRestartComposition() {
+        var acElementListenerV2 = createAcElementListenerV2(mock(ParticipantIntermediaryApi.class));
+        assertThatThrownBy(() -> acElementListenerV2.handleRestartComposition(null, null))
+                .isInstanceOf(PfModelException.class);
     }
 
     @Test
-    void handleRestartInstance() throws PfModelException {
-        var intermediaryApi = mock(ParticipantIntermediaryApi.class);
-        var acElementListenerV2 = createAcElementListenerV2(intermediaryApi);
-        var compositionElement = new CompositionElementDto(UUID.randomUUID(), new ToscaConceptIdentifier(),
-            Map.of(), Map.of());
-        var instanceElement = new InstanceElementDto(UUID.randomUUID(), UUID.randomUUID(), null, Map.of(), Map.of());
-
-        acElementListenerV2.handleRestartInstance(compositionElement, instanceElement,
-            DeployState.DEPLOYED, LockState.LOCKED);
-        verify(intermediaryApi).updateAutomationCompositionElementState(instanceElement.instanceId(),
-            instanceElement.elementId(), DeployState.DEPLOYED, LockState.LOCKED,
-            StateChangeResult.NO_ERROR, "Restarted");
-
-        clearInvocations(intermediaryApi);
-        acElementListenerV2.handleRestartInstance(compositionElement, instanceElement,
-            DeployState.DEPLOYED, LockState.LOCKING);
-        verify(intermediaryApi).updateAutomationCompositionElementState(instanceElement.instanceId(),
-            instanceElement.elementId(), null, LockState.LOCKED, StateChangeResult.NO_ERROR, "Locked");
-
-        clearInvocations(intermediaryApi);
-        acElementListenerV2.handleRestartInstance(compositionElement, instanceElement,
-            DeployState.DEPLOYED, LockState.UNLOCKING);
-        verify(intermediaryApi).updateAutomationCompositionElementState(instanceElement.instanceId(),
-            instanceElement.elementId(), null, LockState.UNLOCKED, StateChangeResult.NO_ERROR, "Unlocked");
-
-        clearInvocations(intermediaryApi);
-        acElementListenerV2.handleRestartInstance(compositionElement, instanceElement,
-            DeployState.UPDATING, LockState.LOCKED);
-        verify(intermediaryApi).updateAutomationCompositionElementState(instanceElement.instanceId(),
-            instanceElement.elementId(), DeployState.DEPLOYED, null,
-            StateChangeResult.NO_ERROR, "Update not supported");
-
-        clearInvocations(intermediaryApi);
-        acElementListenerV2.handleRestartInstance(compositionElement, instanceElement,
-            DeployState.DELETING, LockState.NONE);
-        verify(intermediaryApi).updateAutomationCompositionElementState(instanceElement.instanceId(),
-            instanceElement.elementId(), DeployState.DELETED, null, StateChangeResult.NO_ERROR, "Deleted");
+    void handleRestartInstance() {
+        var acElementListenerV2 = createAcElementListenerV2(mock(ParticipantIntermediaryApi.class));
+        assertThatThrownBy(() -> acElementListenerV2.handleRestartInstance(null, null,
+                null, null)).isInstanceOf(PfModelException.class);
     }
 
     @Test
index 3332512..cfc5c6f 100644 (file)
@@ -77,10 +77,6 @@ class ParticipantCommTest {
         assertEquals(ParticipantMessageType.AUTOMATION_COMPOSITION_STATE_CHANGE.name(),
                 automationCompositionStateChangeListener.getType());
 
-        var participantRestartListener = new ParticipantRestartListener(participantHandler);
-        assertEquals(ParticipantMessageType.PARTICIPANT_RESTART.name(),
-                participantRestartListener.getType());
-
         var participantSyncListener = new ParticipantSyncListener(participantHandler);
         assertEquals(ParticipantMessageType.PARTICIPANT_SYNC_MSG.name(),
                 participantSyncListener.getType());
index 0c73e87..c6259a2 100644 (file)
@@ -37,7 +37,7 @@ import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrime;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrimeAck;
-import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRestart;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
 
 class AcDefinitionHandlerTest {
@@ -94,18 +94,36 @@ class AcDefinitionHandlerTest {
     }
 
     @Test
-    void restartedTest() {
-        var participantRestartMsg = new ParticipantRestart();
-        participantRestartMsg.setState(AcTypeState.PRIMED);
-        participantRestartMsg.setCompositionId(UUID.randomUUID());
-        participantRestartMsg.getParticipantDefinitionUpdates().add(createParticipantDefinition());
-        participantRestartMsg.setAutomationcompositionList(List.of(CommonTestData.createParticipantRestartAc()));
+    void syncTest() {
+        var participantSyncMsg = new ParticipantSync();
+        participantSyncMsg.setState(AcTypeState.PRIMED);
+        participantSyncMsg.setCompositionId(UUID.randomUUID());
+        participantSyncMsg.getParticipantDefinitionUpdates().add(createParticipantDefinition());
+        participantSyncMsg.setAutomationcompositionList(List.of(CommonTestData.createParticipantRestartAc()));
 
         var cacheProvider = mock(CacheProvider.class);
         var listener = mock(ThreadHandler.class);
         var ach = new AcDefinitionHandler(cacheProvider, mock(ParticipantMessagePublisher.class), listener);
-        ach.handleParticipantRestart(participantRestartMsg);
+        ach.handleParticipantSync(participantSyncMsg);
         verify(cacheProvider).initializeAutomationComposition(any(UUID.class), any());
         verify(cacheProvider).addElementDefinition(any(), any());
     }
+
+    @Test
+    void syncDeleteTest() {
+        var participantSyncMsg = new ParticipantSync();
+        participantSyncMsg.setState(AcTypeState.COMMISSIONED);
+        participantSyncMsg.setDelete(true);
+        participantSyncMsg.setCompositionId(UUID.randomUUID());
+        participantSyncMsg.getParticipantDefinitionUpdates().add(createParticipantDefinition());
+        var restartAc = CommonTestData.createParticipantRestartAc();
+        participantSyncMsg.setAutomationcompositionList(List.of(restartAc));
+
+        var cacheProvider = mock(CacheProvider.class);
+        var listener = mock(ThreadHandler.class);
+        var ach = new AcDefinitionHandler(cacheProvider, mock(ParticipantMessagePublisher.class), listener);
+        ach.handleParticipantSync(participantSyncMsg);
+        verify(cacheProvider).removeElementDefinition(participantSyncMsg.getCompositionId());
+        verify(cacheProvider).removeAutomationComposition(restartAc.getAutomationCompositionId());
+    }
 }
index 24935c1..173ed03 100644 (file)
@@ -24,9 +24,6 @@ import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationComposit
 import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionDto;
 import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElementDto;
 import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto;
-import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
-import org.onap.policy.clamp.models.acm.concepts.DeployState;
-import org.onap.policy.clamp.models.acm.concepts.LockState;
 import org.onap.policy.models.base.PfModelException;
 
 public class DummyAcElementListener implements AutomationCompositionElementListener {
@@ -73,15 +70,6 @@ public class DummyAcElementListener implements AutomationCompositionElementListe
     public void deprime(CompositionDto composition) throws PfModelException {
     }
 
-    @Override
-    public void handleRestartComposition(CompositionDto composition, AcTypeState state) throws PfModelException {
-    }
-
-    @Override
-    public void handleRestartInstance(CompositionElementDto compositionElement, InstanceElementDto instanceElement,
-        DeployState deployState, LockState lockState) throws PfModelException {
-    }
-
     @Override
     public void migrate(CompositionElementDto compositionElement, CompositionElementDto compositionElementTarget,
                         InstanceElementDto instanceElement, InstanceElementDto instanceElementMigrate)
index eb1db47..8c2b247 100644 (file)
@@ -47,9 +47,9 @@ import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMe
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrime;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRegister;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRegisterAck;
-import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRestart;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatus;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatusReq;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.PropertiesUpdate;
 import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder;
 import org.onap.policy.clamp.models.acm.messages.rest.instantiation.LockOrder;
@@ -203,18 +203,20 @@ class ParticipantHandlerTest {
 
     @Test
     void handleParticipantRestartTest() {
-        var participantRestartMsg = new ParticipantRestart();
-        participantRestartMsg.setState(AcTypeState.PRIMED);
-        participantRestartMsg.setCompositionId(UUID.randomUUID());
+        var participantSyncMsg = new ParticipantSync();
+        participantSyncMsg.setState(AcTypeState.PRIMED);
+        participantSyncMsg.setCompositionId(UUID.randomUUID());
+        participantSyncMsg.setReplicaId(CommonTestData.getReplicaId());
 
         var cacheProvider = mock(CacheProvider.class);
+        when(cacheProvider.getReplicaId()).thenReturn(CommonTestData.getReplicaId());
         var publisher = mock(ParticipantMessagePublisher.class);
         var acHandler = mock(AcDefinitionHandler.class);
         var participantHandler = new ParticipantHandler(mock(AutomationCompositionHandler.class),
                 mock(AcLockHandler.class), acHandler, publisher, cacheProvider);
 
-        participantHandler.handleParticipantRestart(participantRestartMsg);
-        verify(acHandler).handleParticipantRestart(participantRestartMsg);
+        participantHandler.handleParticipantSync(participantSyncMsg);
+        verify(acHandler).handleParticipantSync(participantSyncMsg);
     }
 
     @Test
index e4c3484..f3471e6 100644 (file)
@@ -20,7 +20,6 @@
 
 package org.onap.policy.clamp.acm.participant.intermediary.handler;
 
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -29,7 +28,6 @@ import static org.mockito.Mockito.verify;
 
 import jakarta.ws.rs.core.Response.Status;
 import java.io.IOException;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.junit.jupiter.api.Test;
@@ -39,11 +37,9 @@ import org.onap.policy.clamp.acm.participant.intermediary.api.CompositionElement
 import org.onap.policy.clamp.acm.participant.intermediary.api.InstanceElementDto;
 import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi;
 import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
-import org.onap.policy.clamp.models.acm.concepts.AcElementRestart;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.DeployState;
 import org.onap.policy.clamp.models.acm.concepts.LockState;
-import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
 import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
 import org.onap.policy.models.base.PfModelException;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
@@ -189,30 +185,6 @@ class ThreadHandlerTest {
             threadHandler.deprime(messageId, composition);
             verify(intermediaryApi, timeout(TIMEOUT)).updateCompositionState(compositionId, AcTypeState.PRIMED,
                     StateChangeResult.FAILED, "Composition Defintion deprime failed");
-
-            clearInvocations(listener);
-            doThrow(new PfModelException(Status.INTERNAL_SERVER_ERROR, "Error")).when(listener)
-                    .handleRestartComposition(composition, AcTypeState.PRIMING);
-            threadHandler.restarted(messageId, composition, AcTypeState.PRIMING, List.of());
-            verify(intermediaryApi).updateCompositionState(compositionId, AcTypeState.PRIMED, StateChangeResult.FAILED,
-                    "Composition Defintion deprime failed");
-        }
-    }
-
-    @Test
-    void testRestarted() throws IOException, PfModelException {
-        var listener = mock(AutomationCompositionElementListener.class);
-        var intermediaryApi = mock(ParticipantIntermediaryApi.class);
-        var cacheProvider = mock(CacheProvider.class);
-        try (var threadHandler = new ThreadHandler(listener, intermediaryApi, cacheProvider)) {
-            var messageId = UUID.randomUUID();
-            var compositionId = UUID.randomUUID();
-            var participantRestartAc = new ParticipantRestartAc();
-            participantRestartAc.setAutomationCompositionId(UUID.randomUUID());
-            participantRestartAc.getAcElementList().add(new AcElementRestart());
-            var composition = new CompositionDto(compositionId, Map.of(), Map.of());
-            threadHandler.restarted(messageId, composition, AcTypeState.PRIMED, List.of(participantRestartAc));
-            verify(listener, timeout(TIMEOUT)).handleRestartInstance(any(), any(), any(), any());
         }
     }
 }
index e8cafa9..b6b95ba 100644 (file)
@@ -184,10 +184,6 @@ public class CommonTestData {
         return REPLICA_ID;
     }
 
-    public static UUID getRndParticipantId() {
-        return UUID.randomUUID();
-    }
-
     public static ToscaConceptIdentifier getDefinition() {
         return new ToscaConceptIdentifier("org.onap.domain.pmsh.PMSH_DCAEMicroservice", "1.2.3");
     }
@@ -196,8 +192,6 @@ public class CommonTestData {
      * Returns a Map of ToscaConceptIdentifier and AutomationComposition for test cases.
      *
      * @return automationCompositionMap
-     *
-     * @throws CoderException if there is an error with .json file.
      */
     public static Map<UUID, AutomationComposition> getTestAutomationCompositionMap() {
         var automationCompositions = getTestAutomationCompositions();
@@ -211,8 +205,6 @@ public class CommonTestData {
      * Returns List of AutomationComposition for test cases.
      *
      * @return AutomationCompositions
-     *
-     * @throws CoderException if there is an error with .json file.
      */
     public static AutomationCompositions getTestAutomationCompositions() {
         try {
@@ -257,8 +249,11 @@ public class CommonTestData {
     public static ParticipantRestartAc createParticipantRestartAc() {
         var participantRestartAc = new ParticipantRestartAc();
         participantRestartAc.setAutomationCompositionId(AC_ID_0);
+        participantRestartAc.setDeployState(DeployState.DEPLOYED);
+        participantRestartAc.setLockState(LockState.LOCKED);
         var acElementRestart = new AcElementRestart();
         acElementRestart.setDefinition(getDefinition());
+        acElementRestart.setParticipantId(PARTCICIPANT_ID);
         acElementRestart.setDeployState(DeployState.DEPLOYED);
         acElementRestart.setLockState(LockState.LOCKED);
         acElementRestart.setOperationalState("OperationalState");
index 3fe46a9..96d6338 100644 (file)
@@ -28,7 +28,6 @@ import lombok.AllArgsConstructor;
 import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
-import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRestart;
 import org.onap.policy.clamp.models.acm.utils.AcmUtils;
 import org.slf4j.Logger;