Add support for sync messages in ACM-runtime 79/138279/1
authorFrancescoFioraEst <francesco.fiora@est.tech>
Tue, 18 Jun 2024 15:32:30 +0000 (16:32 +0100)
committerFrancesco Fiora <francesco.fiora@est.tech>
Wed, 19 Jun 2024 08:16:09 +0000 (08:16 +0000)
Issue-ID: POLICY-5035
Change-Id: Ibcf1c6a414a7ba9d1cafd42041551bb0fb198088
Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
34 files changed:
models/src/main/java/org/onap/policy/clamp/models/acm/concepts/Participant.java
models/src/main/java/org/onap/policy/clamp/models/acm/concepts/ParticipantRestartAc.java
models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantRestart.java
models/src/main/java/org/onap/policy/clamp/models/acm/persistence/concepts/JpaParticipant.java
models/src/main/java/org/onap/policy/clamp/models/acm/utils/AcmUtils.java
models/src/test/java/org/onap/policy/clamp/models/acm/concepts/ParticipantInformationTest.java
models/src/test/java/org/onap/policy/clamp/models/acm/concepts/ParticipantTest.java
models/src/test/java/org/onap/policy/clamp/models/acm/persistence/concepts/JpaParticipantTest.java
models/src/test/java/org/onap/policy/clamp/models/acm/utils/AcmUtilsTest.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProvider.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProvider.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/participants/AcmParticipantProvider.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandler.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspect.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandler.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandler.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScanner.java [moved from runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionPartecipantScanner.java with 65% similarity]
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScanner.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantPrimePublisher.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantStatusReqPublisher.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java
runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/commissioning/CommissioningProviderTest.java
runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/instantiation/AutomationCompositionInstantiationProviderTest.java
runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/instantiation/rest/InstantiationControllerTest.java
runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAcHandlerTest.java
runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionAspectTest.java
runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionHandlerTest.java
runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantHandlerTest.java
runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionParticipantScannerTest.java
runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/SupervisionScannerTest.java
runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/comm/SupervisionMessagesTest.java
runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/util/CommonTestData.java

index 6ddec61..457eb65 100644 (file)
@@ -40,12 +40,6 @@ public class Participant {
     @NonNull
     private UUID participantId;
 
-    @NonNull
-    private ParticipantState participantState = ParticipantState.ON_LINE;
-
-    @NonNull
-    private String lastMsg;
-
     @NonNull
     private Map<UUID, ParticipantSupportedElementType> participantSupportedElementTypes = new HashMap<>();
 
@@ -58,9 +52,7 @@ public class Participant {
      * @param otherParticipant the participant to copy from
      */
     public Participant(Participant otherParticipant) {
-        this.participantState = otherParticipant.participantState;
         this.participantId = otherParticipant.participantId;
-        this.lastMsg = otherParticipant.lastMsg;
         this.participantSupportedElementTypes = PfUtils.mapMap(otherParticipant.getParticipantSupportedElementTypes(),
                 ParticipantSupportedElementType::new);
         this.replicas = PfUtils.mapMap(otherParticipant.replicas, ParticipantReplica::new);
index e5f4ad4..3312752 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2023 Nordix Foundation.
+ *  Copyright (C) 2023-2024 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -37,6 +37,9 @@ public class ParticipantRestartAc {
 
     private UUID automationCompositionId;
 
+    private DeployState deployState;
+    private LockState lockState;
+
     private List<AcElementRestart> acElementList = new ArrayList<>();
 
     /**
@@ -46,6 +49,8 @@ public class ParticipantRestartAc {
      */
     public ParticipantRestartAc(ParticipantRestartAc copyConstructor) {
         this.automationCompositionId = copyConstructor.automationCompositionId;
+        this.deployState = copyConstructor.deployState;
+        this.lockState = copyConstructor.lockState;
         this.acElementList = PfUtils.mapList(copyConstructor.acElementList, AcElementRestart::new);
     }
 }
index 98c7d10..ff9755e 100644 (file)
@@ -36,7 +36,7 @@ import org.onap.policy.models.base.PfUtils;
 public class ParticipantRestart extends ParticipantMessage {
 
     // composition state
-    AcTypeState state;
+    private AcTypeState state;
 
     // element definition
     private List<ParticipantDefinition> participantDefinitionUpdates = new ArrayList<>();
index f35fff9..5bc2fc4 100644 (file)
@@ -32,7 +32,6 @@ import jakarta.persistence.JoinColumn;
 import jakarta.persistence.OneToMany;
 import jakarta.persistence.Table;
 import java.io.Serializable;
-import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -44,8 +43,6 @@ import org.apache.commons.lang3.ObjectUtils;
 import org.hibernate.annotations.LazyCollection;
 import org.hibernate.annotations.LazyCollectionOption;
 import org.onap.policy.clamp.models.acm.concepts.Participant;
-import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
-import org.onap.policy.clamp.models.acm.utils.TimestampHelper;
 import org.onap.policy.common.parameters.annotations.NotNull;
 import org.onap.policy.common.parameters.annotations.Valid;
 import org.onap.policy.models.base.PfAuthorative;
@@ -69,52 +66,43 @@ public class JpaParticipant extends Validated
     @NotNull
     private String participantId;
 
-    @Column
-    @NotNull
-    private ParticipantState participantState;
-
     @Column
     private String description;
 
-    @Column
-    @NotNull
-    private Timestamp lastMsg;
-
     @NotNull
     @OneToMany(fetch = FetchType.EAGER, cascade = CascadeType.ALL)
     @JoinColumn(name = "participantId", referencedColumnName = "participantId",
         foreignKey = @ForeignKey(name = "supported_element_fk"))
+    @SuppressWarnings("squid:S1948")
     private List<@NotNull @Valid JpaParticipantSupportedElementType> supportedElements;
 
     @NotNull
-    @OneToMany
+    @OneToMany(fetch = FetchType.LAZY, cascade = CascadeType.ALL)
     @LazyCollection(LazyCollectionOption.FALSE)
     @JoinColumn(name = "participantId", referencedColumnName = "participantId",
             foreignKey = @ForeignKey(name = "participant_replica_fk"))
+    @SuppressWarnings("squid:S1948")
     private List<@NotNull @Valid JpaParticipantReplica> replicas;
 
     /**
      * The Default Constructor creates a {@link JpaParticipant} object with a null key.
      */
     public JpaParticipant() {
-        this(UUID.randomUUID().toString(), ParticipantState.ON_LINE, new ArrayList<>(), new ArrayList<>());
+        this(UUID.randomUUID().toString(), new ArrayList<>(), new ArrayList<>());
     }
 
     /**
      * The Key Constructor creates a {@link JpaParticipant} object with all mandatory fields.
      *
      * @param participantId the participant id
-     * @param participantState the state of the participant
      * @param supportedElements the list of supported Element Type
      * @param replicas the list of replica
      */
-    public JpaParticipant(@NonNull String participantId, @NonNull final ParticipantState participantState,
+    public JpaParticipant(@NonNull String participantId,
             @NonNull final List<JpaParticipantSupportedElementType> supportedElements,
             @NonNull final List<JpaParticipantReplica> replicas) {
         this.participantId = participantId;
-        this.participantState = participantState;
         this.supportedElements = supportedElements;
-        this.lastMsg = TimestampHelper.nowTimestamp();
         this.replicas = replicas;
     }
 
@@ -124,12 +112,10 @@ public class JpaParticipant extends Validated
      * @param copyConcept the concept to copy from
      */
     public JpaParticipant(@NonNull final JpaParticipant copyConcept) {
-        this.participantState = copyConcept.participantState;
         this.description = copyConcept.description;
         this.participantId = copyConcept.participantId;
         this.supportedElements = copyConcept.supportedElements;
         this.replicas = copyConcept.replicas;
-        this.lastMsg = copyConcept.lastMsg;
     }
 
     /**
@@ -145,9 +131,7 @@ public class JpaParticipant extends Validated
     public Participant toAuthorative() {
         var participant = new Participant();
 
-        participant.setParticipantState(participantState);
         participant.setParticipantId(UUID.fromString(participantId));
-        participant.setLastMsg(this.lastMsg.toString());
         participant.setParticipantSupportedElementTypes(new LinkedHashMap<>(this.supportedElements.size()));
         for (var element : this.supportedElements) {
             participant.getParticipantSupportedElementTypes()
@@ -161,9 +145,7 @@ public class JpaParticipant extends Validated
 
     @Override
     public void fromAuthorative(@NonNull final Participant participant) {
-        this.setParticipantState(participant.getParticipantState());
         this.participantId = participant.getParticipantId().toString();
-        this.lastMsg = TimestampHelper.toTimestamp(participant.getLastMsg());
 
         this.supportedElements = new ArrayList<>(participant.getParticipantSupportedElementTypes().size());
         for (var elementEntry : participant.getParticipantSupportedElementTypes().entrySet()) {
@@ -196,16 +178,6 @@ public class JpaParticipant extends Validated
             return result;
         }
 
-        result = lastMsg.compareTo(other.lastMsg);
-        if (result != 0) {
-            return result;
-        }
-
-        result = ObjectUtils.compare(participantState, other.participantState);
-        if (result != 0) {
-            return result;
-        }
-
         return ObjectUtils.compare(description, other.description);
     }
 }
index f19d5db..f90e5a8 100644 (file)
@@ -42,6 +42,7 @@ import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
 import org.onap.policy.clamp.models.acm.concepts.AcElementRestart;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
+import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementDefinition;
 import org.onap.policy.clamp.models.acm.concepts.DeployState;
@@ -49,6 +50,7 @@ import org.onap.policy.clamp.models.acm.concepts.LockState;
 import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState;
 import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
 import org.onap.policy.clamp.models.acm.concepts.ParticipantDeploy;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
 import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder;
 import org.onap.policy.clamp.models.acm.messages.rest.instantiation.LockOrder;
 import org.onap.policy.clamp.models.acm.persistence.concepts.StringToMapConverter;
@@ -115,6 +117,10 @@ public final class AcmUtils {
         return false;
     }
 
+    public static ToscaConceptIdentifier getType(ToscaNodeTemplate nodeTemplate) {
+        return new ToscaConceptIdentifier(nodeTemplate.getType(), nodeTemplate.getTypeVersion());
+    }
+
     /**
      * Prepare list of ParticipantDefinition for the Priming message.
      *
@@ -126,8 +132,7 @@ public final class AcmUtils {
 
         Map<UUID, List<AutomationCompositionElementDefinition>> map = new HashMap<>();
         for (var elementEntry : acElements) {
-            var type = new ToscaConceptIdentifier(elementEntry.getValue().getType(),
-                    elementEntry.getValue().getTypeVersion());
+            var type = getType(elementEntry.getValue());
             var participantId = supportedElementMap.get(type);
             if (participantId == null) {
                 throw new PfModelRuntimeException(Response.Status.BAD_REQUEST,
@@ -431,6 +436,30 @@ public final class AcmUtils {
         return participantDeploys;
     }
 
+    /**
+     * Create a new ParticipantRestartAc for restarting scenario.
+     *
+     * @param automationComposition the AutomationComposition
+     * @param participantId the participantId of the participant restarted
+     * @param serviceTemplateFragment the ToscaServiceTemplate with policies and policy types
+     * @return the ParticipantRestartAc
+     */
+    public static ParticipantRestartAc createAcRestart(AutomationComposition automationComposition,
+            UUID participantId, ToscaServiceTemplate serviceTemplateFragment) {
+        var syncAc = new ParticipantRestartAc();
+        syncAc.setDeployState(automationComposition.getDeployState());
+        syncAc.setLockState(automationComposition.getLockState());
+        syncAc.setAutomationCompositionId(automationComposition.getInstanceId());
+        for (var element : automationComposition.getElements().values()) {
+            if (participantId.equals(element.getParticipantId())) {
+                var acElementSync = createAcElementRestart(element);
+                acElementSync.setToscaServiceTemplateFragment(serviceTemplateFragment);
+                syncAc.getAcElementList().add(acElementSync);
+            }
+        }
+        return syncAc;
+    }
+
     /**
      * Create a new AcElementRestart from an AutomationCompositionElement.
      *
@@ -451,6 +480,42 @@ public final class AcmUtils {
         return acElementRestart;
     }
 
+    /**
+     * Prepare the list of ParticipantDefinition for Participant Restarting/Sync msg.
+     *
+     * @param participantId the participantId
+     * @param acmDefinition the AutomationCompositionDefinition
+     * @param toscaElementName the ElementName
+     * @return List of ParticipantDefinition
+     */
+    public static List<ParticipantDefinition> prepareParticipantRestarting(UUID participantId,
+            AutomationCompositionDefinition acmDefinition, String toscaElementName) {
+        var acElements = extractAcElementsFromServiceTemplate(acmDefinition.getServiceTemplate(),
+                toscaElementName);
+
+        // list of entry filtered by participantId
+        List<Entry<String, ToscaNodeTemplate>> elementList = new ArrayList<>();
+        Map<ToscaConceptIdentifier, UUID> supportedElementMap = new HashMap<>();
+        for (var elementEntry : acElements) {
+            var elementState = acmDefinition.getElementStateMap().get(elementEntry.getKey());
+            if (participantId == null || participantId.equals(elementState.getParticipantId())) {
+                supportedElementMap.put(getType(elementEntry.getValue()), elementState.getParticipantId());
+                elementList.add(elementEntry);
+            }
+        }
+        var list = prepareParticipantPriming(elementList, supportedElementMap);
+        for (var participantDefinition : list) {
+            for (var elementDe : participantDefinition.getAutomationCompositionElementDefinitionList()) {
+                var state = acmDefinition.getElementStateMap().get(elementDe.getAcElementDefinitionId().getName());
+                if (state != null) {
+                    elementDe.setOutProperties(state.getOutProperties());
+                }
+            }
+        }
+        return list;
+    }
+
+
     /**
      * Recursive Merge.
      *
index a843c82..1a7a419 100644 (file)
@@ -25,7 +25,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import java.util.HashMap;
 import java.util.UUID;
 import org.junit.jupiter.api.Test;
-import org.onap.policy.clamp.models.acm.utils.TimestampHelper;
 
 class ParticipantInformationTest {
 
@@ -33,8 +32,6 @@ class ParticipantInformationTest {
     void testCopyConstructor() {
         var participant = new Participant();
         participant.setParticipantId(UUID.randomUUID());
-        participant.setParticipantState(ParticipantState.ON_LINE);
-        participant.setLastMsg(TimestampHelper.now());
         participant.setParticipantSupportedElementTypes(new HashMap<>());
         var participantInfo1 = new ParticipantInformation();
         participantInfo1.setParticipant(participant);
index 7486d0d..2c6c60e 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2021-2023 Nordix Foundation.
+ *  Copyright (C) 2021-2024 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,7 +21,6 @@
 package org.onap.policy.clamp.models.acm.concepts;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -46,7 +45,6 @@ class ParticipantTest {
         var p1 = new Participant();
 
         p1.setParticipantId(CommonTestData.getParticipantId());
-        p1.setParticipantState(ParticipantState.ON_LINE);
 
         assertThat(p1.toString()).contains("Participant(");
         assertNotEquals(0, p1.hashCode());
@@ -56,11 +54,6 @@ class ParticipantTest {
         assertNotEquals(p1, p0);
 
         var p2 = new Participant();
-
-        // @formatter:off
-        assertThatThrownBy(() -> p2.setParticipantState(null)).isInstanceOf(NullPointerException.class);
-        // @formatter:on
-
         assertEquals(p2, p0);
     }
 
@@ -68,7 +61,6 @@ class ParticipantTest {
     void testCopyConstructor() {
         var p0 = new Participant();
         p0.setParticipantId(UUID.randomUUID());
-        p0.setParticipantState(ParticipantState.ON_LINE);
         var supportedElementType = new ParticipantSupportedElementType();
         supportedElementType.setId(UUID.randomUUID());
         supportedElementType.setTypeName("type");
index e0f2f55..d2d253e 100644 (file)
@@ -27,15 +27,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-import java.sql.Timestamp;
-import java.time.Instant;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.UUID;
 import org.junit.jupiter.api.Test;
 import org.onap.policy.clamp.models.acm.concepts.Participant;
-import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
-import org.onap.policy.clamp.models.acm.utils.TimestampHelper;
 
 /**
  * Test the {@link JpaParticipant} class.
@@ -47,30 +43,23 @@ class JpaParticipantTest {
     @Test
     void testJpaParticipantConstructor() {
         assertThatThrownBy(() -> new JpaParticipant((Participant) null))
-                .hasMessageMatching("authorativeConcept is marked .*ull but is null");
+            .hasMessageMatching("authorativeConcept is marked .*ull but is null");
 
         assertThatThrownBy(() -> new JpaParticipant((JpaParticipant) null))
             .hasMessageMatching("copyConcept is marked .*ull but is null");
 
-        assertThatThrownBy(() -> new JpaParticipant(null, ParticipantState.ON_LINE,
-                new ArrayList<>(), new ArrayList<>()))
+        assertThatThrownBy(() -> new JpaParticipant(null, new ArrayList<>(), new ArrayList<>()))
             .hasMessageMatching(NULL_KEY_ERROR);
 
-        assertThatThrownBy(() -> new JpaParticipant(UUID.randomUUID().toString(), null,
-                new ArrayList<>(), new ArrayList<>()))
-            .hasMessageMatching("participantState is marked .*ull but is null");
-
-        assertThatThrownBy(() -> new JpaParticipant(UUID.randomUUID().toString(), ParticipantState.ON_LINE,
-                null, new ArrayList<>()))
+        assertThatThrownBy(() -> new JpaParticipant(UUID.randomUUID().toString(), null, new ArrayList<>()))
             .hasMessageMatching("supportedElements is marked .*ull but is null");
 
-        assertThatThrownBy(() -> new JpaParticipant(UUID.randomUUID().toString(), ParticipantState.ON_LINE,
-                new ArrayList<>(), null))
-                .hasMessageMatching("replicas is marked .*ull but is null");
+        assertThatThrownBy(() -> new JpaParticipant(UUID.randomUUID().toString(), new ArrayList<>(), null))
+            .hasMessageMatching("replicas is marked .*ull but is null");
 
         assertDoesNotThrow(() -> new JpaParticipant());
         assertDoesNotThrow(() -> new JpaParticipant(UUID.randomUUID().toString(),
-            ParticipantState.ON_LINE, new ArrayList<>(), new ArrayList<>()));
+                new ArrayList<>(), new ArrayList<>()));
     }
 
     @Test
@@ -116,18 +105,6 @@ class JpaParticipantTest {
         assertEquals(0, testJpaParticipant.compareTo(testJpaParticipant));
         assertNotEquals(0, testJpaParticipant.compareTo(new DummyJpaParticipantChild()));
 
-        testJpaParticipant.setParticipantState(ParticipantState.OFF_LINE);
-        assertNotEquals(0, testJpaParticipant.compareTo(otherJpaParticipant));
-        testJpaParticipant.setParticipantState(ParticipantState.ON_LINE);
-        assertEquals(0, testJpaParticipant.compareTo(otherJpaParticipant));
-        assertEquals(testJpaParticipant, new JpaParticipant(testJpaParticipant));
-
-        testJpaParticipant.setLastMsg(Timestamp.from(Instant.EPOCH));
-        assertNotEquals(0, testJpaParticipant.compareTo(otherJpaParticipant));
-        testJpaParticipant.setLastMsg(otherJpaParticipant.getLastMsg());
-        assertEquals(0, testJpaParticipant.compareTo(otherJpaParticipant));
-        assertEquals(testJpaParticipant, new JpaParticipant(testJpaParticipant));
-
         var newJpaParticipant = new JpaParticipant(testJpaParticipant);
         newJpaParticipant.setParticipantId(testJpaParticipant.getParticipantId());
         assertEquals(testJpaParticipant, newJpaParticipant);
@@ -143,8 +120,6 @@ class JpaParticipantTest {
 
 
         var p1 = new JpaParticipant();
-        p1.setParticipantState(ParticipantState.ON_LINE);
-
         assertThat(p1.toString()).contains("Participant(");
         assertNotEquals(0, p1.hashCode());
         assertNotEquals(p1, p0);
@@ -154,14 +129,12 @@ class JpaParticipantTest {
 
         var p2 = new JpaParticipant();
         p2.setParticipantId(p0.getParticipantId());
-        p2.setLastMsg(p0.getLastMsg());
         assertEquals(p2, p0);
     }
 
     private Participant createParticipantInstance() {
         var testParticipant = new Participant();
         testParticipant.setParticipantId(UUID.randomUUID());
-        testParticipant.setLastMsg(TimestampHelper.now());
         testParticipant.setParticipantSupportedElementTypes(new LinkedHashMap<>());
         testParticipant.setReplicas(new LinkedHashMap<>());
 
index a5c93e8..024060f 100644 (file)
@@ -37,7 +37,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.junit.jupiter.api.Test;
+import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
+import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement;
 import org.onap.policy.clamp.models.acm.concepts.DeployState;
 import org.onap.policy.clamp.models.acm.concepts.LockState;
@@ -45,6 +47,7 @@ import org.onap.policy.clamp.models.acm.document.concepts.DocToscaServiceTemplat
 import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder;
 import org.onap.policy.clamp.models.acm.messages.rest.instantiation.LockOrder;
 import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.models.base.PfModelRuntimeException;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaDataType;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaNodeTemplate;
@@ -276,9 +279,7 @@ class AcmUtilsTest {
     }
 
     private Map<String, ToscaPolicyType> getDummyPolicyTypesMap() {
-        Map<String, ToscaPolicyType> policyTypes = new HashMap<>();
-        policyTypes.put("onap.policies.Match", new ToscaPolicyType());
-        return policyTypes;
+        return Map.of("onap.policies.Match", new ToscaPolicyType());
     }
 
     private Map<String, ToscaDataType> getDummyToscaDataTypeMap() {
@@ -290,11 +291,44 @@ class AcmUtilsTest {
     private Map<String, ToscaNodeTemplate> getDummyNodeTemplates() {
         Map<String, ToscaNodeTemplate> nodeTemplates = new HashMap<>();
         var nodeTemplate = new ToscaNodeTemplate();
-        nodeTemplate.setType("org.onap.policy.clamp.acm.AutomationCompositionElement");
+        nodeTemplate.setType(AUTOMATION_COMPOSITION_ELEMENT);
         nodeTemplates.put("org.onap.dcae.acm.DCAEMicroserviceAutomationCompositionParticipant", nodeTemplate);
         return nodeTemplates;
     }
 
+    @Test
+    void testcreateAcRestart() {
+        var automationComposition = getDummyAutomationComposition();
+        automationComposition.setInstanceId(UUID.randomUUID());
+        var toscaServiceTemplate = getDummyToscaServiceTemplate();
+        var participantId = automationComposition.getElements().values().iterator().next().getParticipantId();
+        var serviceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(toscaServiceTemplate);
+        var result = AcmUtils.createAcRestart(automationComposition, participantId, serviceTemplateFragment);
+        assertEquals(result.getAutomationCompositionId(), automationComposition.getInstanceId());
+        assertThat(result.getAcElementList()).hasSize(1);
+    }
+
+    @Test
+    void testPrepareParticipantRestarting() {
+        var serviceTemplate = CommonTestData.getToscaServiceTemplate(TOSCA_TEMPLATE_YAML);
+        var acmDefinition = new AutomationCompositionDefinition();
+        acmDefinition.setElementStateMap(Map.of());
+        acmDefinition.setServiceTemplate(serviceTemplate);
+        var acElements = AcmUtils.extractAcElementsFromServiceTemplate(serviceTemplate, AUTOMATION_COMPOSITION_ELEMENT);
+        acmDefinition.setElementStateMap(AcmUtils.createElementStateMap(acElements, AcTypeState.COMMISSIONED));
+        acmDefinition.getElementStateMap()
+                .values().forEach(element -> element.setParticipantId(UUID.randomUUID()));
+        var participantId = UUID.randomUUID();
+        var result = AcmUtils.prepareParticipantRestarting(participantId, acmDefinition,
+                AUTOMATION_COMPOSITION_ELEMENT);
+        assertThat(result).isEmpty();
+
+        participantId = acmDefinition.getElementStateMap().values().iterator().next().getParticipantId();
+        result = AcmUtils.prepareParticipantRestarting(participantId, acmDefinition,
+                AUTOMATION_COMPOSITION_ELEMENT);
+        assertThat(result).hasSize(1);
+    }
+
     @Test
     void testRecursiveMergeMap() {
         var oldProperties = """
index 7ac58ae..caa2c56 100644 (file)
@@ -218,7 +218,6 @@ public class ParticipantHandler {
     public void handleParticipantSync(ParticipantSync participantSyncMsg) {
         LOGGER.debug("ParticipantSync message received for participantId {}",
                 participantSyncMsg.getParticipantId());
-        acDefinitionHandler.handleParticipantRestart(participantSyncMsg);
     }
 
     /**
index 74ccb9c..8a56fbb 100644 (file)
@@ -29,7 +29,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import lombok.RequiredArgsConstructor;
 import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
-import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantPrimePublisher;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
@@ -39,6 +38,7 @@ import org.onap.policy.clamp.models.acm.messages.rest.commissioning.Commissionin
 import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
 import org.onap.policy.clamp.models.acm.persistence.provider.AcTypeStateResolver;
 import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
+import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
 import org.onap.policy.clamp.models.acm.utils.TimestampHelper;
 import org.onap.policy.models.base.PfModelRuntimeException;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
@@ -56,7 +56,7 @@ public class CommissioningProvider {
 
     private final AcDefinitionProvider acDefinitionProvider;
     private final AutomationCompositionProvider acProvider;
-    private final AcmParticipantProvider acmParticipantProvider;
+    private final ParticipantProvider participantProvider;
     private final AcTypeStateResolver acTypeStateResolver;
     private final ParticipantPrimePublisher participantPrimePublisher;
     private final AcRuntimeParameterGroup acRuntimeParameterGroup;
@@ -229,7 +229,7 @@ public class CommissioningProvider {
             }
         }
         if (!participantIds.isEmpty()) {
-            acmParticipantProvider.verifyParticipantState(participantIds);
+            participantProvider.verifyParticipantState(participantIds);
         }
         acmDefinition.setState(AcTypeState.DEPRIMING);
         acmDefinition.setLastMsg(TimestampHelper.now());
index 220636b..2bf0822 100644 (file)
@@ -29,7 +29,6 @@ import java.util.stream.Collectors;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
-import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider;
 import org.onap.policy.clamp.acm.runtime.supervision.SupervisionAcHandler;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
@@ -43,6 +42,7 @@ import org.onap.policy.clamp.models.acm.messages.rest.instantiation.Instantiatio
 import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
 import org.onap.policy.clamp.models.acm.persistence.provider.AcInstanceStateResolver;
 import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
+import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
 import org.onap.policy.clamp.models.acm.utils.AcmUtils;
 import org.onap.policy.common.parameters.BeanValidationResult;
 import org.onap.policy.common.parameters.ObjectValidationResult;
@@ -69,7 +69,7 @@ public class AutomationCompositionInstantiationProvider {
     private final AcDefinitionProvider acDefinitionProvider;
     private final AcInstanceStateResolver acInstanceStateResolver;
     private final SupervisionAcHandler supervisionAcHandler;
-    private final AcmParticipantProvider acmParticipantProvider;
+    private final ParticipantProvider participantProvider;
     private final AcRuntimeParameterGroup acRuntimeParameterGroup;
 
     /**
@@ -265,7 +265,7 @@ public class AutomationCompositionInstantiationProvider {
         var participantIds = acDefinitionOpt.get().getElementStateMap().values().stream()
                 .map(NodeTemplateState::getParticipantId).collect(Collectors.toSet());
 
-        acmParticipantProvider.verifyParticipantState(participantIds);
+        participantProvider.verifyParticipantState(participantIds);
 
         result.addResult(AcmUtils.validateAutomationComposition(automationComposition,
                 acDefinitionOpt.get().getServiceTemplate(),
@@ -331,7 +331,7 @@ public class AutomationCompositionInstantiationProvider {
         var acDefinition = acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId());
         var participantIds = acDefinition.getElementStateMap().values().stream()
             .map(NodeTemplateState::getParticipantId).collect(Collectors.toSet());
-        acmParticipantProvider.verifyParticipantState(participantIds);
+        participantProvider.verifyParticipantState(participantIds);
         supervisionAcHandler.delete(automationComposition, acDefinition);
         var response = new InstantiationResponse();
         response.setInstanceId(automationComposition.getInstanceId());
@@ -374,7 +374,7 @@ public class AutomationCompositionInstantiationProvider {
         var participantIds = acDefinition.getElementStateMap().values().stream()
                 .map(NodeTemplateState::getParticipantId).collect(Collectors.toSet());
 
-        acmParticipantProvider.verifyParticipantState(participantIds);
+        participantProvider.verifyParticipantState(participantIds);
         var result = acInstanceStateResolver.resolve(acInstanceStateUpdate.getDeployOrder(),
                 acInstanceStateUpdate.getLockOrder(), automationComposition.getDeployState(),
                 automationComposition.getLockState(), automationComposition.getStateChangeResult());
index 282389a..62ba7b0 100644 (file)
 
 package org.onap.policy.clamp.acm.runtime.participants;
 
-import jakarta.ws.rs.core.Response;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import lombok.RequiredArgsConstructor;
 import org.apache.commons.collections4.MapUtils;
@@ -33,9 +31,7 @@ import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantStatusReqPu
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement;
 import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState;
 import org.onap.policy.clamp.models.acm.concepts.ParticipantInformation;
-import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
 import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
-import org.onap.policy.models.base.PfModelRuntimeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
@@ -94,12 +90,11 @@ public class AcmParticipantProvider {
      * @param participantId The UUID of the participant to send request to
      */
     public void sendParticipantStatusRequest(UUID participantId) {
-        var participant = this.participantProvider.getParticipantById(participantId);
+        // check if participant is present
+        this.participantProvider.getParticipantById(participantId);
 
         LOGGER.debug("Requesting Participant Status Now ParticipantStatusReq");
         participantStatusReqPublisher.send(participantId);
-        participant.setParticipantState(ParticipantState.OFF_LINE);
-        participantProvider.saveParticipant(participant);
     }
 
     /**
@@ -110,22 +105,6 @@ public class AcmParticipantProvider {
         this.participantStatusReqPublisher.send((UUID) null);
     }
 
-    /**
-     * Verify Participant state.
-     *
-     * @param participantIds The list of UUIDs of the participants to get
-     * @throws  PfModelRuntimeException in case the participant is offline
-     */
-    public void verifyParticipantState(Set<UUID> participantIds) {
-        for (UUID participantId : participantIds) {
-            var participant = this.participantProvider.getParticipantById(participantId);
-            if (! participant.getParticipantState().equals(ParticipantState.ON_LINE)) {
-                throw new PfModelRuntimeException(Response.Status.CONFLICT,
-                        "Participant: " + participantId + " is OFFLINE");
-            }
-        }
-    }
-
     private Map<UUID, AutomationCompositionElement> getAutomationCompositionElementsForParticipant(UUID participantId) {
         var automationCompositionElements = participantProvider
             .getAutomationCompositionElements(participantId);
index 802c660..3e2057e 100644 (file)
@@ -33,6 +33,7 @@ import org.onap.policy.clamp.acm.runtime.supervision.comm.AcElementPropertiesPub
 import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionDeployPublisher;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionMigrationPublisher;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
 import org.onap.policy.clamp.models.acm.concepts.AcElementDeployAck;
 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
@@ -42,6 +43,7 @@ import org.onap.policy.clamp.models.acm.concepts.LockState;
 import org.onap.policy.clamp.models.acm.concepts.ParticipantUtils;
 import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeployAck;
+import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
 import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
 import org.onap.policy.clamp.models.acm.utils.AcmUtils;
 import org.slf4j.Logger;
@@ -58,12 +60,14 @@ public class SupervisionAcHandler {
     private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionAcHandler.class);
 
     private final AutomationCompositionProvider automationCompositionProvider;
+    private final AcDefinitionProvider acDefinitionProvider;
 
     // Publishers for participant communication
     private final AutomationCompositionDeployPublisher automationCompositionDeployPublisher;
     private final AutomationCompositionStateChangePublisher automationCompositionStateChangePublisher;
     private final AcElementPropertiesPublisher acElementPropertiesPublisher;
     private final AutomationCompositionMigrationPublisher acCompositionMigrationPublisher;
+    private final ParticipantSyncPublisher participantSyncPublisher;
 
     private final ExecutorService executor = Context.taskWrapping(Executors.newFixedThreadPool(1));
 
@@ -260,6 +264,8 @@ public class SupervisionAcHandler {
                 automationCompositionAckMessage.getStateChangeResult());
         if (updated) {
             automationCompositionProvider.updateAutomationComposition(automationComposition);
+            var acDefinition = acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId());
+            participantSyncPublisher.sendSync(acDefinition.getServiceTemplate(), automationComposition);
         }
     }
 
index 8f3a4c2..9ef979f 100644 (file)
@@ -41,9 +41,9 @@ public class SupervisionAspect implements Closeable {
     private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionAspect.class);
 
     private final SupervisionScanner supervisionScanner;
-    private final SupervisionPartecipantScanner partecipantScanner;
+    private final SupervisionParticipantScanner participantScanner;
 
-    private ThreadPoolExecutor executor =
+    private final ThreadPoolExecutor executor =
             new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
 
     @Scheduled(
@@ -56,7 +56,7 @@ public class SupervisionAspect implements Closeable {
 
     private void executeScan() {
         supervisionScanner.run();
-        partecipantScanner.run();
+        participantScanner.run();
     }
 
     /**
index 963e483..a4e4704 100644 (file)
@@ -23,9 +23,10 @@ package org.onap.policy.clamp.acm.runtime.supervision;
 
 import io.micrometer.core.annotation.Timed;
 import lombok.AllArgsConstructor;
-import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
+import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState;
 import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantPrimeAck;
 import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
@@ -43,7 +44,7 @@ public class SupervisionHandler {
     private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionHandler.class);
 
     private final AcDefinitionProvider acDefinitionProvider;
-    private final AcRuntimeParameterGroup acRuntimeParameterGroup;
+    private final ParticipantSyncPublisher participantSyncPublisher;
 
     /**
      * Handle a ParticipantPrimeAck message from a participant.
@@ -82,12 +83,7 @@ public class SupervisionHandler {
         boolean completed = true;
         boolean restarting = false;
         for (var element : acDefinition.getElementStateMap().values()) {
-            if (participantPrimeAckMessage.getParticipantId().equals(element.getParticipantId())) {
-                element.setMessage(participantPrimeAckMessage.getMessage());
-                element.setState(participantPrimeAckMessage.getCompositionState());
-                element.setRestarting(null);
-                acDefinitionProvider.updateAcDefinitionElement(element, acDefinition.getCompositionId());
-            }
+            handlePrimeAckElement(participantPrimeAckMessage, element);
             if (!finalState.equals(element.getState())) {
                 completed = false;
             }
@@ -110,6 +106,18 @@ public class SupervisionHandler {
         if (toUpdate) {
             acDefinitionProvider.updateAcDefinitionState(acDefinition.getCompositionId(), acDefinition.getState(),
                 acDefinition.getStateChangeResult(), acDefinition.getRestarting());
+            if (!participantPrimeAckMessage.getParticipantId().equals(participantPrimeAckMessage.getReplicaId())) {
+                participantSyncPublisher.sendSync(acDefinition, participantPrimeAckMessage.getReplicaId());
+            }
+        }
+    }
+
+    private void handlePrimeAckElement(ParticipantPrimeAck participantPrimeAckMessage, NodeTemplateState element) {
+        if (participantPrimeAckMessage.getParticipantId().equals(element.getParticipantId())) {
+            element.setMessage(participantPrimeAckMessage.getMessage());
+            element.setState(participantPrimeAckMessage.getCompositionState());
+            element.setRestarting(null);
+            acDefinitionProvider.updateAcDefinitionElement(element, participantPrimeAckMessage.getCompositionId());
         }
     }
 }
index 609e036..4c8c581 100644 (file)
@@ -21,7 +21,6 @@
 package org.onap.policy.clamp.acm.runtime.supervision;
 
 import io.micrometer.core.annotation.Timed;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -32,11 +31,13 @@ import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup
 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantDeregisterAckPublisher;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRegisterAckPublisher;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRestartPublisher;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
 import org.onap.policy.clamp.models.acm.concepts.Participant;
 import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica;
 import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
 import org.onap.policy.clamp.models.acm.concepts.ParticipantSupportedElementType;
 import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
@@ -65,6 +66,7 @@ public class SupervisionParticipantHandler {
     private final AutomationCompositionProvider automationCompositionProvider;
     private final AcDefinitionProvider acDefinitionProvider;
     private final ParticipantRestartPublisher participantRestartPublisher;
+    private final ParticipantSyncPublisher participantSyncPublisher;
     private final AcRuntimeParameterGroup acRuntimeParameterGroup;
 
     /**
@@ -72,21 +74,11 @@ public class SupervisionParticipantHandler {
      *
      * @param participantRegisterMsg the ParticipantRegister message received from a participant
      */
-    @MessageIntercept
     @Timed(value = "listener.participant_register", description = "PARTICIPANT_REGISTER messages received")
     public void handleParticipantMessage(ParticipantRegister participantRegisterMsg) {
-        var participantOpt = participantProvider.findParticipant(participantRegisterMsg.getParticipantId());
-
-        if (participantOpt.isPresent()) {
-            var participant = participantOpt.get();
-            checkOnline(participant);
-            handleRestart(participant.getParticipantId());
-        } else {
-            var participant = createParticipant(participantRegisterMsg.getParticipantId(),
-                    listToMap(participantRegisterMsg.getParticipantSupportedElementType()));
-            participantProvider.saveParticipant(participant);
-
-        }
+        saveIfNotPresent(participantRegisterMsg.getReplicaId(),
+                participantRegisterMsg.getParticipantId(),
+                participantRegisterMsg.getParticipantSupportedElementType(), true);
 
         participantRegisterAckPublisher.send(participantRegisterMsg.getMessageId(),
                 participantRegisterMsg.getParticipantId());
@@ -97,15 +89,13 @@ public class SupervisionParticipantHandler {
      *
      * @param participantDeregisterMsg the ParticipantDeregister message received from a participant
      */
-    @MessageIntercept
     @Timed(value = "listener.participant_deregister", description = "PARTICIPANT_DEREGISTER messages received")
     public void handleParticipantMessage(ParticipantDeregister participantDeregisterMsg) {
-        var participantOpt = participantProvider.findParticipant(participantDeregisterMsg.getParticipantId());
-
-        if (participantOpt.isPresent()) {
-            var participant = participantOpt.get();
-            participant.setParticipantState(ParticipantState.OFF_LINE);
-            participantProvider.saveParticipant(participant);
+        var replicaId = participantDeregisterMsg.getReplicaId() != null
+                ? participantDeregisterMsg.getReplicaId() : participantDeregisterMsg.getParticipantId();
+        var replicaOpt = participantProvider.findParticipantReplica(replicaId);
+        if (replicaOpt.isPresent()) {
+            participantProvider.deleteParticipantReplica(replicaId);
         }
 
         participantDeregisterAckPublisher.send(participantDeregisterMsg.getMessageId());
@@ -116,32 +106,57 @@ public class SupervisionParticipantHandler {
      *
      * @param participantStatusMsg the ParticipantStatus message received from a participant
      */
-    @MessageIntercept
     @Timed(value = "listener.participant_status", description = "PARTICIPANT_STATUS messages received")
     public void handleParticipantMessage(ParticipantStatus participantStatusMsg) {
+        saveIfNotPresent(participantStatusMsg.getReplicaId(), participantStatusMsg.getParticipantId(),
+                participantStatusMsg.getParticipantSupportedElementType(), false);
 
-        var participantOpt = participantProvider.findParticipant(participantStatusMsg.getParticipantId());
-        if (participantOpt.isEmpty()) {
-            var participant = createParticipant(participantStatusMsg.getParticipantId(),
-                    listToMap(participantStatusMsg.getParticipantSupportedElementType()));
-            participantProvider.saveParticipant(participant);
-        } else {
-            checkOnline(participantOpt.get());
-        }
         if (!participantStatusMsg.getAutomationCompositionInfoList().isEmpty()) {
             automationCompositionProvider.upgradeStates(participantStatusMsg.getAutomationCompositionInfoList());
         }
         if (!participantStatusMsg.getParticipantDefinitionUpdates().isEmpty()
                 && participantStatusMsg.getCompositionId() != null) {
             updateAcDefinitionOutProperties(participantStatusMsg.getCompositionId(),
-                    participantStatusMsg.getParticipantDefinitionUpdates());
+                participantStatusMsg.getReplicaId(), participantStatusMsg.getParticipantDefinitionUpdates());
         }
     }
 
-    private void updateAcDefinitionOutProperties(UUID composotionId, List<ParticipantDefinition> list) {
-        var acDefinitionOpt = acDefinitionProvider.findAcDefinition(composotionId);
+    private void saveIfNotPresent(UUID msgReplicaId, UUID participantId,
+            List<ParticipantSupportedElementType> participantSupportedElementType, boolean registration) {
+        var replicaId = msgReplicaId != null ? msgReplicaId : participantId;
+        var replicaOpt = participantProvider.findParticipantReplica(replicaId);
+        if (replicaOpt.isPresent()) {
+            var replica = replicaOpt.get();
+            checkOnline(replica);
+        } else {
+            var participant = getParticipant(participantId, listToMap(participantSupportedElementType));
+            participant.getReplicas().put(replicaId, createReplica(replicaId));
+            participantProvider.saveParticipant(participant);
+        }
+        if (registration) {
+            handleRestart(participantId, replicaId);
+        }
+    }
+
+    private Participant getParticipant(UUID participantId,
+            Map<UUID, ParticipantSupportedElementType> participantSupportedElementType) {
+        var participantOpt = participantProvider.findParticipant(participantId);
+        return participantOpt.orElseGet(() -> createParticipant(participantId, participantSupportedElementType));
+    }
+
+    private ParticipantReplica createReplica(UUID replicaId) {
+        var replica = new ParticipantReplica();
+        replica.setReplicaId(replicaId);
+        replica.setParticipantState(ParticipantState.ON_LINE);
+        replica.setLastMsg(TimestampHelper.now());
+        return replica;
+
+    }
+
+    private void updateAcDefinitionOutProperties(UUID compositionId, UUID replicaId, List<ParticipantDefinition> list) {
+        var acDefinitionOpt = acDefinitionProvider.findAcDefinition(compositionId);
         if (acDefinitionOpt.isEmpty()) {
-            LOGGER.error("Ac Definition with id {} not found", composotionId);
+            LOGGER.error("Ac Definition with id {} not found", compositionId);
             return;
         }
         var acDefinition = acDefinitionOpt.get();
@@ -155,26 +170,32 @@ public class SupervisionParticipantHandler {
         }
         acDefinitionProvider.updateAcDefinition(acDefinition,
                 acRuntimeParameterGroup.getAcmParameters().getToscaCompositionName());
+        participantSyncPublisher.sendSync(acDefinition, replicaId);
     }
 
-    private void checkOnline(Participant participant) {
-        if (ParticipantState.OFF_LINE.equals(participant.getParticipantState())) {
-            participant.setParticipantState(ParticipantState.ON_LINE);
+    private void checkOnline(ParticipantReplica replica) {
+        if (ParticipantState.OFF_LINE.equals(replica.getParticipantState())) {
+            replica.setParticipantState(ParticipantState.ON_LINE);
         }
-        participant.setLastMsg(TimestampHelper.now());
-        participantProvider.saveParticipant(participant);
+        replica.setLastMsg(TimestampHelper.now());
+        participantProvider.saveParticipantReplica(replica);
     }
 
-    private void handleRestart(UUID participantId) {
+    private void handleRestart(UUID participantId, UUID replicaId) {
         var compositionIds = participantProvider.getCompositionIds(participantId);
+        var oldParticipant = participantId.equals(replicaId);
         for (var compositionId : compositionIds) {
             var acDefinition = acDefinitionProvider.getAcDefinition(compositionId);
             LOGGER.debug("Scan Composition {} for restart", acDefinition.getCompositionId());
-            handleRestart(participantId, acDefinition);
+            if (oldParticipant) {
+                handleRestart(participantId, acDefinition);
+            } else {
+                handleSyncRestart(participantId, replicaId, acDefinition);
+            }
         }
     }
 
-    private void handleRestart(UUID participantId, AutomationCompositionDefinition acDefinition) {
+    private void handleRestart(final UUID participantId, AutomationCompositionDefinition acDefinition) {
         if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) {
             LOGGER.debug("Composition {} COMMISSIONED", acDefinition.getCompositionId());
             return;
@@ -185,14 +206,6 @@ public class SupervisionParticipantHandler {
                 elementState.setRestarting(true);
             }
         }
-        var automationCompositionList =
-                automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId());
-        List<AutomationComposition> automationCompositions = new ArrayList<>();
-        for (var automationComposition : automationCompositionList) {
-            if (isAcToBeRestarted(participantId, automationComposition)) {
-                automationCompositions.add(automationComposition);
-            }
-        }
         // expected final state
         if (StateChangeResult.TIMEOUT.equals(acDefinition.getStateChangeResult())) {
             acDefinition.setStateChangeResult(StateChangeResult.NO_ERROR);
@@ -200,6 +213,11 @@ public class SupervisionParticipantHandler {
         acDefinition.setRestarting(true);
         acDefinitionProvider.updateAcDefinition(acDefinition,
                 acRuntimeParameterGroup.getAcmParameters().getToscaCompositionName());
+
+        var automationCompositionList =
+                automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId());
+        var automationCompositions = automationCompositionList.stream()
+                .filter(ac -> isAcToBeRestarted(participantId, ac)).toList();
         participantRestartPublisher.send(participantId, acDefinition, automationCompositions);
     }
 
@@ -222,13 +240,34 @@ public class SupervisionParticipantHandler {
         return toAdd;
     }
 
+    private void handleSyncRestart(final UUID participantId, UUID replicaId,
+            AutomationCompositionDefinition acDefinition) {
+        if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) {
+            LOGGER.debug("Composition {} COMMISSIONED", acDefinition.getCompositionId());
+            return;
+        }
+        LOGGER.debug("Composition to be send in Restart message {}", acDefinition.getCompositionId());
+        var automationCompositionList =
+                automationCompositionProvider.getAcInstancesByCompositionId(acDefinition.getCompositionId());
+        var automationCompositions = automationCompositionList.stream()
+                .filter(ac -> isAcToBeSyncRestarted(participantId, ac)).toList();
+        participantSyncPublisher.sendRestartMsg(participantId, replicaId, acDefinition, automationCompositions);
+    }
+
+    private boolean isAcToBeSyncRestarted(UUID participantId, AutomationComposition automationComposition) {
+        for (var element : automationComposition.getElements().values()) {
+            if (participantId.equals(element.getParticipantId())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     private Participant createParticipant(UUID participantId,
             Map<UUID, ParticipantSupportedElementType> participantSupportedElementType) {
         var participant = new Participant();
         participant.setParticipantId(participantId);
         participant.setParticipantSupportedElementTypes(participantSupportedElementType);
-        participant.setParticipantState(ParticipantState.ON_LINE);
-        participant.setLastMsg(TimestampHelper.now());
         return participant;
     }
 
@@ -21,8 +21,7 @@
 package org.onap.policy.clamp.acm.runtime.supervision;
 
 import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
-import org.onap.policy.clamp.models.acm.concepts.Participant;
-import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica;
 import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
 import org.onap.policy.clamp.models.acm.utils.TimestampHelper;
 import org.slf4j.Logger;
@@ -33,20 +32,20 @@ import org.springframework.stereotype.Component;
  * This class is used to scan the automation compositions in the database and check if they are in the correct state.
  */
 @Component
-public class SupervisionPartecipantScanner {
-    private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionPartecipantScanner.class);
+public class SupervisionParticipantScanner {
+    private static final Logger LOGGER = LoggerFactory.getLogger(SupervisionParticipantScanner.class);
 
     private final long maxWaitMs;
 
     private final ParticipantProvider participantProvider;
 
     /**
-     * Constructor for instantiating SupervisionPartecipantScanner.
+     * Constructor for instantiating SupervisionParticipantScanner.
      *
      * @param participantProvider the Participant Provider
      * @param acRuntimeParameterGroup the parameters for the automation composition runtime
      */
-    public SupervisionPartecipantScanner(final ParticipantProvider participantProvider,
+    public SupervisionParticipantScanner(final ParticipantProvider participantProvider,
             final AcRuntimeParameterGroup acRuntimeParameterGroup) {
         this.participantProvider = participantProvider;
         this.maxWaitMs = acRuntimeParameterGroup.getParticipantParameters().getMaxStatusWaitMs();
@@ -56,27 +55,17 @@ public class SupervisionPartecipantScanner {
      * Run Scanning.
      */
     public void run() {
-        LOGGER.debug("Scanning participans in the database . . .");
-
-        for (var participant : participantProvider.getParticipants()) {
-            scanParticipantStatus(participant);
-        }
-
-        LOGGER.debug("Participans scan complete . . .");
+        LOGGER.debug("Scanning participants in the database . . .");
+        participantProvider.findReplicasOnLine().forEach(this::scanParticipantReplicaStatus);
+        LOGGER.debug("Participants scan complete . . .");
     }
 
-    private void scanParticipantStatus(Participant participant) {
-        var id = participant.getParticipantId();
-        if (ParticipantState.OFF_LINE.equals(participant.getParticipantState())) {
-            LOGGER.debug("report Participant is still OFF_LINE {}", id);
-            return;
-        }
+    private void scanParticipantReplicaStatus(ParticipantReplica replica) {
         var now = TimestampHelper.nowEpochMilli();
-        var lastMsg = TimestampHelper.toEpochMilli(participant.getLastMsg());
+        var lastMsg = TimestampHelper.toEpochMilli(replica.getLastMsg());
         if ((now - lastMsg) > maxWaitMs) {
-            LOGGER.debug("report Participant OFF_LINE {}", id);
-            participant.setParticipantState(ParticipantState.OFF_LINE);
-            participantProvider.saveParticipant(participant);
+            LOGGER.debug("Participant OFF_LINE {}", replica.getReplicaId());
+            participantProvider.deleteParticipantReplica(replica.getReplicaId());
         }
     }
 }
index 06d4646..75a2f05 100644 (file)
@@ -27,6 +27,7 @@ import java.util.UUID;
 import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionDeployPublisher;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
@@ -55,6 +56,7 @@ public class SupervisionScanner {
     private final AcDefinitionProvider acDefinitionProvider;
     private final AutomationCompositionStateChangePublisher automationCompositionStateChangePublisher;
     private final AutomationCompositionDeployPublisher automationCompositionDeployPublisher;
+    private final ParticipantSyncPublisher participantSyncPublisher;
 
     /**
      * Constructor for instantiating SupervisionScanner.
@@ -69,11 +71,13 @@ public class SupervisionScanner {
             final AcDefinitionProvider acDefinitionProvider,
             final AutomationCompositionStateChangePublisher automationCompositionStateChangePublisher,
             final AutomationCompositionDeployPublisher automationCompositionDeployPublisher,
+            final ParticipantSyncPublisher participantSyncPublisher,
             final AcRuntimeParameterGroup acRuntimeParameterGroup) {
         this.automationCompositionProvider = automationCompositionProvider;
         this.acDefinitionProvider = acDefinitionProvider;
         this.automationCompositionStateChangePublisher = automationCompositionStateChangePublisher;
         this.automationCompositionDeployPublisher = automationCompositionDeployPublisher;
+        this.participantSyncPublisher = participantSyncPublisher;
         this.maxStatusWaitMs = acRuntimeParameterGroup.getParticipantParameters().getMaxStatusWaitMs();
     }
 
@@ -118,6 +122,7 @@ public class SupervisionScanner {
         if (completed) {
             acDefinitionProvider.updateAcDefinitionState(acDefinition.getCompositionId(), finalState,
                 StateChangeResult.NO_ERROR, null);
+            participantSyncPublisher.sendSync(acDefinition, null);
         } else {
             handleTimeout(acDefinition);
         }
@@ -132,7 +137,6 @@ public class SupervisionScanner {
                 || StateChangeResult.FAILED.equals(automationComposition.getStateChangeResult())) {
             LOGGER.debug("automation composition {} scanned, OK", automationComposition.getInstanceId());
 
-            // Clear Timeout on automation composition
             return;
         }
 
@@ -158,7 +162,7 @@ public class SupervisionScanner {
             LOGGER.debug("automation composition scan: transition state {} {} completed",
                     automationComposition.getDeployState(), automationComposition.getLockState());
 
-            complete(automationComposition);
+            complete(automationComposition, serviceTemplate);
         } else {
             LOGGER.debug("automation composition scan: transition state {} {} not completed",
                     automationComposition.getDeployState(), automationComposition.getLockState());
@@ -183,7 +187,8 @@ public class SupervisionScanner {
         }
     }
 
-    private void complete(final AutomationComposition automationComposition) {
+    private void complete(final AutomationComposition automationComposition,
+            ToscaServiceTemplate serviceTemplate) {
         var deployState = automationComposition.getDeployState();
         if (DeployState.MIGRATING.equals(automationComposition.getDeployState())) {
             // migration scenario
@@ -201,6 +206,7 @@ public class SupervisionScanner {
         } else {
             automationCompositionProvider.updateAutomationComposition(automationComposition);
         }
+        participantSyncPublisher.sendSync(serviceTemplate, automationComposition);
     }
 
     private void handleTimeout(AutomationCompositionDefinition acDefinition) {
index 89763a2..b0848bd 100644 (file)
@@ -31,7 +31,6 @@ import java.util.Map;
 import java.util.UUID;
 import lombok.AllArgsConstructor;
 import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
-import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
 import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
@@ -55,7 +54,6 @@ public class ParticipantPrimePublisher extends AbstractParticipantPublisher<Part
     private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantPrimePublisher.class);
 
     private final ParticipantProvider participantProvider;
-    private final AcmParticipantProvider acmParticipantProvider;
     private final AcRuntimeParameterGroup acRuntimeParameterGroup;
 
     /**
@@ -99,9 +97,7 @@ public class ParticipantPrimePublisher extends AbstractParticipantPublisher<Part
                 var elementState = acmDefinition.getElementStateMap().get(elementEntry.getKey());
                 elementState.setState(AcTypeState.PRIMING);
                 participantIds.add(elementState.getParticipantId());
-                var type = new ToscaConceptIdentifier(elementEntry.getValue().getType(),
-                        elementEntry.getValue().getTypeVersion());
-                supportedElementMap.put(type, elementState.getParticipantId());
+                supportedElementMap.put(AcmUtils.getType(elementEntry.getValue()), elementState.getParticipantId());
             }
         } else {
             // scenario Prime participants not assigned yet
@@ -109,16 +105,14 @@ public class ParticipantPrimePublisher extends AbstractParticipantPublisher<Part
             for (var elementEntry : acElements) {
                 var elementState = acmDefinition.getElementStateMap().get(elementEntry.getKey());
                 elementState.setState(AcTypeState.PRIMING);
-                var type = new ToscaConceptIdentifier(elementEntry.getValue().getType(),
-                        elementEntry.getValue().getTypeVersion());
-                var participantId = supportedElementMap.get(type);
+                var participantId = supportedElementMap.get(AcmUtils.getType(elementEntry.getValue()));
                 if (participantId != null) {
                     elementState.setParticipantId(participantId);
                     participantIds.add(participantId);
                 }
             }
         }
-        acmParticipantProvider.verifyParticipantState(participantIds);
+        participantProvider.verifyParticipantState(participantIds);
         return AcmUtils.prepareParticipantPriming(acElements, supportedElementMap);
     }
 
index 4f28eab..3fe46a9 100644 (file)
@@ -22,22 +22,15 @@ package org.onap.policy.clamp.acm.runtime.supervision.comm;
 
 import io.micrometer.core.annotation.Timed;
 import java.time.Instant;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.UUID;
 import lombok.AllArgsConstructor;
 import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
-import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
 import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRestart;
 import org.onap.policy.clamp.models.acm.utils.AcmUtils;
-import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
-import org.onap.policy.models.tosca.authorative.concepts.ToscaNodeTemplate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -66,52 +59,18 @@ public class ParticipantRestartPublisher extends AbstractParticipantPublisher<Pa
         message.setMessageId(UUID.randomUUID());
         message.setTimestamp(Instant.now());
         message.setState(acmDefinition.getState());
-        message.setParticipantDefinitionUpdates(prepareParticipantRestarting(participantId, acmDefinition));
+        message.setParticipantDefinitionUpdates(
+                AcmUtils.prepareParticipantRestarting(participantId, acmDefinition,
+                        acRuntimeParameterGroup.getAcmParameters().getToscaElementName()));
         var toscaServiceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(acmDefinition.getServiceTemplate());
 
         for (var automationComposition : automationCompositions) {
-            var restartAc = new ParticipantRestartAc();
-            restartAc.setAutomationCompositionId(automationComposition.getInstanceId());
-            for (var element : automationComposition.getElements().values()) {
-                if (participantId.equals(element.getParticipantId())) {
-                    var acElementRestart = AcmUtils.createAcElementRestart(element);
-                    acElementRestart.setToscaServiceTemplateFragment(toscaServiceTemplateFragment);
-                    restartAc.getAcElementList().add(acElementRestart);
-                }
-            }
+            var restartAc = AcmUtils
+                    .createAcRestart(automationComposition, participantId, toscaServiceTemplateFragment);
             message.getAutomationcompositionList().add(restartAc);
         }
 
         LOGGER.debug("Participant Restart sent {}", message);
         super.send(message);
     }
-
-    protected List<ParticipantDefinition> prepareParticipantRestarting(UUID participantId,
-            AutomationCompositionDefinition acmDefinition) {
-        var acElements = AcmUtils.extractAcElementsFromServiceTemplate(acmDefinition.getServiceTemplate(),
-                acRuntimeParameterGroup.getAcmParameters().getToscaElementName());
-
-        // list of entry filtered by participantId
-        List<Entry<String, ToscaNodeTemplate>> elementList = new ArrayList<>();
-        Map<ToscaConceptIdentifier, UUID> supportedElementMap = new HashMap<>();
-        for (var elementEntry : acElements) {
-            var elementState = acmDefinition.getElementStateMap().get(elementEntry.getKey());
-            if (participantId.equals(elementState.getParticipantId())) {
-                var type = new ToscaConceptIdentifier(elementEntry.getValue().getType(),
-                        elementEntry.getValue().getTypeVersion());
-                supportedElementMap.put(type, participantId);
-                elementList.add(elementEntry);
-            }
-        }
-        var list = AcmUtils.prepareParticipantPriming(elementList, supportedElementMap);
-        for (var participantDefinition : list) {
-            for (var elementDe : participantDefinition.getAutomationCompositionElementDefinitionList()) {
-                var state = acmDefinition.getElementStateMap().get(elementDe.getAcElementDefinitionId().getName());
-                if (state != null) {
-                    elementDe.setOutProperties(state.getOutProperties());
-                }
-            }
-        }
-        return list;
-    }
 }
index 76feee7..2eb434b 100644 (file)
@@ -40,7 +40,7 @@ public class ParticipantStatusReqPublisher extends AbstractParticipantPublisher<
      */
     @Timed(value = "publisher.participant_status_req", description = "PARTICIPANT_STATUS_REQ messages published")
     public void send(UUID participantId) {
-        ParticipantStatusReq message = new ParticipantStatusReq();
+        var message = new ParticipantStatusReq();
         message.setParticipantId(participantId);
         message.setTimestamp(Instant.now());
 
index ae7eda1..b63bc0a 100644 (file)
@@ -23,69 +23,58 @@ package org.onap.policy.clamp.acm.runtime.supervision.comm;
 import io.micrometer.core.annotation.Timed;
 import java.time.Instant;
 import java.util.List;
-import java.util.Optional;
 import java.util.UUID;
+import lombok.AllArgsConstructor;
 import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
-import org.onap.policy.clamp.acm.runtime.main.parameters.Topics;
+import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
+import org.onap.policy.clamp.models.acm.concepts.DeployState;
 import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
 import org.onap.policy.clamp.models.acm.utils.AcmUtils;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
-
 @Component
-public class ParticipantSyncPublisher extends ParticipantRestartPublisher {
+@AllArgsConstructor
+public class ParticipantSyncPublisher extends AbstractParticipantPublisher<ParticipantSync> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantSyncPublisher.class);
-
     private final AcRuntimeParameterGroup acRuntimeParameterGroup;
 
-    public ParticipantSyncPublisher(AcRuntimeParameterGroup acRuntimeParameterGroup) {
-        super(acRuntimeParameterGroup);
-        this.acRuntimeParameterGroup = acRuntimeParameterGroup;
-    }
-
-
     /**
-     * Send sync msg to Participant.
+     * Send Restart sync msg to Participant by participantId.
      *
-     * @param participantId the ParticipantId
+     * @param participantId the participantId
+     * @param replicaId the replicaId
      * @param acmDefinition the AutomationComposition Definition
      * @param automationCompositions the list of automationCompositions
      */
-    @Override
     @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published")
-    public void send(UUID participantId, AutomationCompositionDefinition acmDefinition,
+    public void sendRestartMsg(UUID participantId, UUID replicaId, AutomationCompositionDefinition acmDefinition,
                      List<AutomationComposition> automationCompositions) {
 
         var message = new ParticipantSync();
         message.setParticipantId(participantId);
+        message.setReplicaId(replicaId);
+        message.setRestarting(true);
         message.setCompositionId(acmDefinition.getCompositionId());
         message.setMessageId(UUID.randomUUID());
         message.setTimestamp(Instant.now());
         message.setState(acmDefinition.getState());
-        message.setParticipantDefinitionUpdates(prepareParticipantRestarting(participantId, acmDefinition));
+        message.setParticipantDefinitionUpdates(AcmUtils.prepareParticipantRestarting(participantId, acmDefinition,
+                acRuntimeParameterGroup.getAcmParameters().getToscaElementName()));
         var toscaServiceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(acmDefinition.getServiceTemplate());
 
         for (var automationComposition : automationCompositions) {
-            var syncAc = new ParticipantRestartAc();
-            syncAc.setAutomationCompositionId(automationComposition.getInstanceId());
-            for (var element : automationComposition.getElements().values()) {
-                if (participantId.equals(element.getParticipantId())) {
-                    var acElementSync = AcmUtils.createAcElementRestart(element);
-                    acElementSync.setToscaServiceTemplateFragment(toscaServiceTemplateFragment);
-                    syncAc.getAcElementList().add(acElementSync);
-                }
-            }
+            var syncAc = AcmUtils.createAcRestart(automationComposition, participantId, toscaServiceTemplateFragment);
             message.getAutomationcompositionList().add(syncAc);
         }
 
-        LOGGER.debug("Participant Sync sent {}", message);
+        LOGGER.debug("Participant Restarting Sync sent {}", message);
         super.send(message);
     }
 
@@ -98,4 +87,64 @@ public class ParticipantSyncPublisher extends ParticipantRestartPublisher {
         return false;
     }
 
+    /**
+     * Send AutomationCompositionDefinition sync msg to all Participants.
+     *
+     * @param acDefinition the AutomationComposition Definition
+     * @param excludeReplicaId the replica to be excluded
+     */
+    @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published")
+    public void sendSync(AutomationCompositionDefinition acDefinition, UUID excludeReplicaId) {
+        var message = new ParticipantSync();
+        message.setCompositionId(acDefinition.getCompositionId());
+        if (excludeReplicaId != null) {
+            message.getExcludeReplicas().add(excludeReplicaId);
+        }
+        message.setState(acDefinition.getState());
+        message.setMessageId(UUID.randomUUID());
+        message.setTimestamp(Instant.now());
+        if (AcTypeState.COMMISSIONED.equals(acDefinition.getState())) {
+            message.setDelete(true);
+        } else {
+            message.setParticipantDefinitionUpdates(AcmUtils.prepareParticipantRestarting(null, acDefinition,
+                    acRuntimeParameterGroup.getAcmParameters().getToscaElementName()));
+        }
+        LOGGER.debug("Participant AutomationCompositionDefinition Sync sent {}", message);
+        super.send(message);
+    }
+
+    /**
+     * Send AutomationComposition sync msg to all Participants.
+     *
+     * @param serviceTemplate the ServiceTemplate
+     * @param automationComposition the automationComposition
+     */
+    @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published")
+    public void sendSync(ToscaServiceTemplate serviceTemplate, AutomationComposition automationComposition) {
+        var message = new ParticipantSync();
+        message.setCompositionId(automationComposition.getCompositionId());
+        message.setAutomationCompositionId(automationComposition.getInstanceId());
+        message.setState(AcTypeState.PRIMED);
+        message.setMessageId(UUID.randomUUID());
+        message.setTimestamp(Instant.now());
+        var syncAc = new ParticipantRestartAc();
+        syncAc.setAutomationCompositionId(automationComposition.getInstanceId());
+        syncAc.setDeployState(automationComposition.getDeployState());
+        syncAc.setLockState(automationComposition.getLockState());
+        if (DeployState.DELETED.equals(automationComposition.getDeployState())) {
+            message.setDelete(true);
+        } else {
+            var toscaServiceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(serviceTemplate);
+            for (var element : automationComposition.getElements().values()) {
+                var acElementSync = AcmUtils.createAcElementRestart(element);
+                acElementSync.setToscaServiceTemplateFragment(toscaServiceTemplateFragment);
+                syncAc.getAcElementList().add(acElementSync);
+
+            }
+        }
+        message.getAutomationcompositionList().add(syncAc);
+
+        LOGGER.debug("Participant AutomationComposition Sync sent {}", message);
+        super.send(message);
+    }
 }
index 5c26ea3..4137191 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2021-2023 Nordix Foundation.
+ *  Copyright (C) 2021-2024 Nordix Foundation.
  *  Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -36,7 +36,6 @@ import java.util.UUID;
 import org.junit.jupiter.api.Test;
 import org.onap.policy.clamp.acm.runtime.instantiation.InstantiationUtils;
 import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
-import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantPrimePublisher;
 import org.onap.policy.clamp.acm.runtime.util.CommonTestData;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
@@ -47,6 +46,7 @@ import org.onap.policy.clamp.models.acm.messages.rest.commissioning.PrimeOrder;
 import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
 import org.onap.policy.clamp.models.acm.persistence.provider.AcTypeStateResolver;
 import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
+import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
 
 class CommissioningProviderTest {
@@ -165,7 +165,7 @@ class CommissioningProviderTest {
 
         var participantPrimePublisher = mock(ParticipantPrimePublisher.class);
         var provider = new CommissioningProvider(acDefinitionProvider, mock(AutomationCompositionProvider.class),
-                mock(AcmParticipantProvider.class), new AcTypeStateResolver(), participantPrimePublisher,
+                mock(ParticipantProvider.class), new AcTypeStateResolver(), participantPrimePublisher,
                 CommonTestData.getTestParamaterGroup());
 
         var acTypeStateUpdate = new AcTypeStateUpdate();
@@ -184,15 +184,15 @@ class CommissioningProviderTest {
         when(acDefinitionProvider.getAcDefinition(compositionId)).thenReturn(acmDefinition);
 
         var participantPrimePublisher = mock(ParticipantPrimePublisher.class);
-        var acmParticipantProvider = mock(AcmParticipantProvider.class);
+        var participantProvider = mock(ParticipantProvider.class);
         var provider = new CommissioningProvider(acDefinitionProvider, mock(AutomationCompositionProvider.class),
-                acmParticipantProvider, new AcTypeStateResolver(), participantPrimePublisher,
+                participantProvider, new AcTypeStateResolver(), participantPrimePublisher,
                 CommonTestData.getTestParamaterGroup());
 
         var acTypeStateUpdate = new AcTypeStateUpdate();
         acTypeStateUpdate.setPrimeOrder(PrimeOrder.DEPRIME);
 
-        doNothing().when(acmParticipantProvider).verifyParticipantState(any());
+        doNothing().when(participantProvider).verifyParticipantState(any());
         provider.compositionDefinitionPriming(compositionId, acTypeStateUpdate);
         verify(participantPrimePublisher, timeout(1000).times(1)).sendDepriming(compositionId);
     }
@@ -201,7 +201,7 @@ class CommissioningProviderTest {
     void testBadRequest() {
         var acProvider = mock(AutomationCompositionProvider.class);
         var provider = new CommissioningProvider(mock(AcDefinitionProvider.class), acProvider,
-                mock(AcmParticipantProvider.class), new AcTypeStateResolver(), mock(ParticipantPrimePublisher.class),
+                mock(ParticipantProvider.class), new AcTypeStateResolver(), mock(ParticipantPrimePublisher.class),
                 mock(AcRuntimeParameterGroup.class));
 
         var compositionId = UUID.randomUUID();
@@ -225,7 +225,7 @@ class CommissioningProviderTest {
         when(acDefinitionProvider.getAcDefinition(compositionId)).thenReturn(acmDefinition);
 
         var provider = new CommissioningProvider(acDefinitionProvider, mock(AutomationCompositionProvider.class),
-                mock(AcmParticipantProvider.class), new AcTypeStateResolver(), mock(ParticipantPrimePublisher.class),
+                mock(ParticipantProvider.class), new AcTypeStateResolver(), mock(ParticipantPrimePublisher.class),
                 mock(AcRuntimeParameterGroup.class));
 
         assertThatThrownBy(() -> provider.updateCompositionDefinition(compositionId, toscaServiceTemplate))
@@ -245,7 +245,7 @@ class CommissioningProviderTest {
         when(acDefinitionProvider.getAcDefinition(compositionId)).thenReturn(acmDefinition);
 
         var provider = new CommissioningProvider(acDefinitionProvider, mock(AutomationCompositionProvider.class),
-                mock(AcmParticipantProvider.class), new AcTypeStateResolver(), mock(ParticipantPrimePublisher.class),
+                mock(ParticipantProvider.class), new AcTypeStateResolver(), mock(ParticipantPrimePublisher.class),
                 mock(AcRuntimeParameterGroup.class));
 
         var acTypeStateUpdate = new AcTypeStateUpdate();
index fbd8330..2ee6a15 100644 (file)
@@ -37,7 +37,6 @@ import java.util.UUID;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
-import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider;
 import org.onap.policy.clamp.acm.runtime.supervision.SupervisionAcHandler;
 import org.onap.policy.clamp.acm.runtime.util.CommonTestData;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
@@ -52,6 +51,7 @@ import org.onap.policy.clamp.models.acm.messages.rest.instantiation.LockOrder;
 import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
 import org.onap.policy.clamp.models.acm.persistence.provider.AcInstanceStateResolver;
 import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
+import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
 import org.onap.policy.clamp.models.acm.persistence.provider.ProviderUtils;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
 import org.onap.policy.models.tosca.simple.concepts.JpaToscaServiceTemplate;
@@ -101,9 +101,9 @@ class AutomationCompositionInstantiationProviderTest {
         when(acDefinitionProvider.getAcDefinition(compositionId)).thenReturn(acDefinition);
         var acProvider = mock(AutomationCompositionProvider.class);
         var supervisionAcHandler = mock(SupervisionAcHandler.class);
-        var acmParticipantProvider = mock(AcmParticipantProvider.class);
+        var participantProvider = mock(ParticipantProvider.class);
         var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider,
-                null, supervisionAcHandler, acmParticipantProvider,
+                null, supervisionAcHandler, participantProvider,
                 CommonTestData.getTestParamaterGroup());
         var automationCompositionCreate =
                 InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_CREATE_JSON, "Crud");
@@ -141,7 +141,7 @@ class AutomationCompositionInstantiationProviderTest {
 
         when(acProvider.deleteAutomationComposition(automationCompositionUpdate.getInstanceId()))
                 .thenReturn(automationCompositionUpdate);
-        doNothing().when(acmParticipantProvider).verifyParticipantState(any());
+        doNothing().when(participantProvider).verifyParticipantState(any());
         instantiationProvider.deleteAutomationComposition(automationCompositionCreate.getCompositionId(),
                 automationCompositionCreate.getInstanceId());
 
@@ -167,9 +167,9 @@ class AutomationCompositionInstantiationProviderTest {
         when(acProvider.updateAutomationComposition(acmFromDb)).thenReturn(acmFromDb);
 
         var supervisionAcHandler = mock(SupervisionAcHandler.class);
-        var acmParticipantProvider = mock(AcmParticipantProvider.class);
+        var participantProvider = mock(ParticipantProvider.class);
         var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider,
-                null, supervisionAcHandler, acmParticipantProvider,
+                null, supervisionAcHandler, participantProvider,
                 CommonTestData.getTestParamaterGroup());
         var instantiationResponse = instantiationProvider.updateAutomationComposition(
                 automationCompositionUpdate.getCompositionId(), automationCompositionUpdate);
@@ -201,7 +201,7 @@ class AutomationCompositionInstantiationProviderTest {
 
         var instantiationProvider =
                 new AutomationCompositionInstantiationProvider(acProvider, mock(AcDefinitionProvider.class), null,
-                        mock(SupervisionAcHandler.class), mock(AcmParticipantProvider.class),
+                        mock(SupervisionAcHandler.class), mock(ParticipantProvider.class),
                         mock(AcRuntimeParameterGroup.class));
 
         var compositionId = automationCompositionUpdate.getCompositionId();
@@ -232,7 +232,7 @@ class AutomationCompositionInstantiationProviderTest {
 
         var instantiationProvider =
                 new AutomationCompositionInstantiationProvider(acProvider, mock(AcDefinitionProvider.class), null,
-                        mock(SupervisionAcHandler.class), mock(AcmParticipantProvider.class),
+                        mock(SupervisionAcHandler.class), mock(ParticipantProvider.class),
                         mock(AcRuntimeParameterGroup.class));
 
         var compositionId = automationCompositionUpdate.getCompositionId();
@@ -273,9 +273,9 @@ class AutomationCompositionInstantiationProviderTest {
                 .thenReturn(automationCompositionUpdate);
 
         var supervisionAcHandler = mock(SupervisionAcHandler.class);
-        var acmParticipantProvider = mock(AcmParticipantProvider.class);
+        var participantProvider = mock(ParticipantProvider.class);
         var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider,
-                null, supervisionAcHandler, acmParticipantProvider,
+                null, supervisionAcHandler, participantProvider,
                 mock(AcRuntimeParameterGroup.class));
         assertThatThrownBy(
                 () -> instantiationProvider.updateAutomationComposition(compositionId, automationCompositionUpdate))
@@ -303,9 +303,9 @@ class AutomationCompositionInstantiationProviderTest {
         when(acProvider.updateAutomationComposition(automationComposition)).thenReturn(automationComposition);
 
         var supervisionAcHandler = mock(SupervisionAcHandler.class);
-        var acmParticipantProvider = mock(AcmParticipantProvider.class);
+        var participantProvider = mock(ParticipantProvider.class);
         var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider,
-                null, supervisionAcHandler, acmParticipantProvider, new AcRuntimeParameterGroup());
+                null, supervisionAcHandler, participantProvider, new AcRuntimeParameterGroup());
 
         assertThatThrownBy(() -> instantiationProvider
                 .updateAutomationComposition(automationComposition.getCompositionId(), automationComposition))
@@ -352,9 +352,9 @@ class AutomationCompositionInstantiationProviderTest {
         automationComposition.getElements().clear();
 
         var supervisionAcHandler = mock(SupervisionAcHandler.class);
-        var acmParticipantProvider = mock(AcmParticipantProvider.class);
+        var participantProvider = mock(ParticipantProvider.class);
         var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider,
-                null, supervisionAcHandler, acmParticipantProvider, new AcRuntimeParameterGroup());
+                null, supervisionAcHandler, participantProvider, new AcRuntimeParameterGroup());
 
         assertThatThrownBy(() -> instantiationProvider
                 .updateAutomationComposition(automationComposition.getCompositionId(), acMigrate))
@@ -373,11 +373,11 @@ class AutomationCompositionInstantiationProviderTest {
         when(acDefinitionProvider.getAcDefinition(compositionId)).thenReturn(acDefinition);
         automationComposition.setCompositionId(compositionId);
         var supervisionAcHandler = mock(SupervisionAcHandler.class);
-        var acmParticipantProvider = mock(AcmParticipantProvider.class);
+        var participantProvider = mock(ParticipantProvider.class);
         var acRuntimeParameterGroup = mock(AcRuntimeParameterGroup.class);
 
         var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider,
-                null, supervisionAcHandler, acmParticipantProvider, acRuntimeParameterGroup);
+                null, supervisionAcHandler, participantProvider, acRuntimeParameterGroup);
 
         when(acProvider.getAutomationComposition(automationComposition.getInstanceId()))
                 .thenReturn(automationComposition);
@@ -436,10 +436,10 @@ class AutomationCompositionInstantiationProviderTest {
         var acProvider = mock(AutomationCompositionProvider.class);
         when(acProvider.createAutomationComposition(automationCompositionCreate))
                 .thenReturn(automationCompositionCreate);
-        var acmParticipantProvider = mock(AcmParticipantProvider.class);
+        var participantProvider = mock(ParticipantProvider.class);
 
         var instantiationProvider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider,
-                null, null, acmParticipantProvider,
+                null, null, participantProvider,
                 CommonTestData.getTestParamaterGroup());
 
         var instantiationResponse = instantiationProvider.createAutomationComposition(
@@ -457,7 +457,7 @@ class AutomationCompositionInstantiationProviderTest {
     @Test
     void testCreateAutomationCompositions_CommissionedAcElementNotFound() {
         var acDefinitionProvider = mock(AcDefinitionProvider.class);
-        var acmParticipantProvider = mock(AcmParticipantProvider.class);
+        var participantProvider = mock(ParticipantProvider.class);
         var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED);
         var compositionId = acDefinition.getCompositionId();
         when(acDefinitionProvider.findAcDefinition(compositionId)).thenReturn(Optional.of(acDefinition));
@@ -467,7 +467,7 @@ class AutomationCompositionInstantiationProviderTest {
 
         var acProvider = mock(AutomationCompositionProvider.class);
         var provider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider, null, null,
-                acmParticipantProvider, CommonTestData.getTestParamaterGroup());
+                participantProvider, CommonTestData.getTestParamaterGroup());
 
         assertThatThrownBy(() -> provider.createAutomationComposition(compositionId, automationComposition))
                 .hasMessageMatching(AC_ELEMENT_NAME_NOT_FOUND);
@@ -572,9 +572,9 @@ class AutomationCompositionInstantiationProviderTest {
         when(acProvider.getAutomationComposition(instanceId)).thenReturn(automationComposition);
 
         var supervisionAcHandler = mock(SupervisionAcHandler.class);
-        var acmParticipantProvider = mock(AcmParticipantProvider.class);
+        var participantProvider = mock(ParticipantProvider.class);
         var provider = new AutomationCompositionInstantiationProvider(acProvider, acDefinitionProvider,
-                new AcInstanceStateResolver(), supervisionAcHandler, acmParticipantProvider,
+                new AcInstanceStateResolver(), supervisionAcHandler, participantProvider,
                 mock(AcRuntimeParameterGroup.class));
 
         var acInstanceStateUpdate = new AcInstanceStateUpdate();
index bcfdea1..ca58fad 100644 (file)
@@ -374,6 +374,9 @@ class InstantiationControllerTest extends CommonRestController {
     }
 
     private void saveDummyParticipantInDb() {
-        participantProvider.saveParticipant(CommonTestData.createParticipant(CommonTestData.getParticipantId()));
+        var participant = CommonTestData.createParticipant(CommonTestData.getParticipantId());
+        var replica = CommonTestData.createParticipantReplica(CommonTestData.getReplicaId());
+        participant.getReplicas().put(replica.getReplicaId(), replica);
+        participantProvider.saveParticipant(participant);
     }
 }
index 8f39c9e..0bec9d0 100644 (file)
@@ -39,16 +39,19 @@ import org.onap.policy.clamp.acm.runtime.supervision.comm.AcElementPropertiesPub
 import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionDeployPublisher;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionMigrationPublisher;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
 import org.onap.policy.clamp.acm.runtime.util.CommonTestData;
 import org.onap.policy.clamp.models.acm.concepts.AcElementDeployAck;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
+import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement;
 import org.onap.policy.clamp.models.acm.concepts.DeployState;
 import org.onap.policy.clamp.models.acm.concepts.LockState;
 import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.AutomationCompositionDeployAck;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType;
+import org.onap.policy.clamp.models.acm.persistence.provider.AcDefinitionProvider;
 import org.onap.policy.clamp.models.acm.persistence.provider.AutomationCompositionProvider;
 
 class SupervisionAcHandlerTest {
@@ -64,9 +67,14 @@ class SupervisionAcHandlerTest {
         when(automationCompositionProvider.findAutomationComposition(IDENTIFIER))
                 .thenReturn(Optional.of(automationComposition));
 
-        var handler = new SupervisionAcHandler(automationCompositionProvider,
+        var acDefinitionProvider = mock(AcDefinitionProvider.class);
+        when(acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId()))
+                .thenReturn(new AutomationCompositionDefinition());
+
+        var handler = new SupervisionAcHandler(automationCompositionProvider, acDefinitionProvider,
                 mock(AutomationCompositionDeployPublisher.class), mock(AutomationCompositionStateChangePublisher.class),
-                mock(AcElementPropertiesPublisher.class), null);
+                mock(AcElementPropertiesPublisher.class), null,
+                mock(ParticipantSyncPublisher.class));
 
         var automationCompositionAckMessage =
                 getAutomationCompositionDeployAck(ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK,
@@ -100,14 +108,19 @@ class SupervisionAcHandlerTest {
         when(automationCompositionProvider.findAutomationComposition(IDENTIFIER))
                 .thenReturn(Optional.of(automationComposition));
 
+        var acDefinitionProvider = mock(AcDefinitionProvider.class);
+        when(acDefinitionProvider.getAcDefinition(automationComposition.getCompositionId()))
+                .thenReturn(new AutomationCompositionDefinition());
+
         var automationCompositionAckMessage =
                 getAutomationCompositionDeployAck(ParticipantMessageType.AUTOMATION_COMPOSITION_DEPLOY_ACK,
                         automationComposition, DeployState.DEPLOYED, LockState.LOCKED);
         automationCompositionAckMessage.setParticipantId(CommonTestData.getParticipantId());
 
-        var handler = new SupervisionAcHandler(automationCompositionProvider,
+        var handler = new SupervisionAcHandler(automationCompositionProvider, acDefinitionProvider,
                 mock(AutomationCompositionDeployPublisher.class), mock(AutomationCompositionStateChangePublisher.class),
-                mock(AcElementPropertiesPublisher.class), null);
+                mock(AcElementPropertiesPublisher.class), null,
+                mock(ParticipantSyncPublisher.class));
 
         handler.handleAutomationCompositionUpdateAckMessage(automationCompositionAckMessage);
 
@@ -142,9 +155,9 @@ class SupervisionAcHandlerTest {
 
         var automationCompositionStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class);
 
-        var handler = new SupervisionAcHandler(automationCompositionProvider,
+        var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class),
                 mock(AutomationCompositionDeployPublisher.class), automationCompositionStateChangePublisher, null,
-                null);
+                null, mock(ParticipantSyncPublisher.class));
 
         handler.handleAutomationCompositionUpdateAckMessage(automationCompositionAckMessage);
 
@@ -156,8 +169,9 @@ class SupervisionAcHandlerTest {
     void testDeployFailed() {
         var automationCompositionDeployPublisher = mock(AutomationCompositionDeployPublisher.class);
         var automationCompositionProvider = mock(AutomationCompositionProvider.class);
-        var handler = new SupervisionAcHandler(automationCompositionProvider, automationCompositionDeployPublisher,
-                mock(AutomationCompositionStateChangePublisher.class), mock(AcElementPropertiesPublisher.class), null);
+        var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class),
+                automationCompositionDeployPublisher, mock(AutomationCompositionStateChangePublisher.class),
+                mock(AcElementPropertiesPublisher.class), null, mock(ParticipantSyncPublisher.class));
 
         var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
         var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED);
@@ -174,9 +188,10 @@ class SupervisionAcHandlerTest {
     void testUndeploy() {
         var automationCompositionProvider = mock(AutomationCompositionProvider.class);
         var acStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class);
-        var handler = new SupervisionAcHandler(automationCompositionProvider,
+        var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class),
                 mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher,
-                mock(AcElementPropertiesPublisher.class), null);
+                mock(AcElementPropertiesPublisher.class), null,
+                mock(ParticipantSyncPublisher.class));
         var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
         var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED);
         var automationComposition =
@@ -191,9 +206,10 @@ class SupervisionAcHandlerTest {
     void testUndeployFailed() {
         var acStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class);
         var automationCompositionProvider = mock(AutomationCompositionProvider.class);
-        var handler = new SupervisionAcHandler(automationCompositionProvider,
+        var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class),
                 mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher,
-                mock(AcElementPropertiesPublisher.class), null);
+                mock(AcElementPropertiesPublisher.class), null,
+                mock(ParticipantSyncPublisher.class));
 
         var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
         var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED);
@@ -211,9 +227,10 @@ class SupervisionAcHandlerTest {
     void testUnlock() {
         var automationCompositionProvider = mock(AutomationCompositionProvider.class);
         var acStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class);
-        var handler = new SupervisionAcHandler(automationCompositionProvider,
+        var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class),
                 mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher,
-                mock(AcElementPropertiesPublisher.class), null);
+                mock(AcElementPropertiesPublisher.class), null,
+                mock(ParticipantSyncPublisher.class));
         var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
         var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED);
         var automationComposition =
@@ -228,9 +245,10 @@ class SupervisionAcHandlerTest {
     void testUnlockFailed() {
         var automationCompositionProvider = mock(AutomationCompositionProvider.class);
         var acStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class);
-        var handler = new SupervisionAcHandler(automationCompositionProvider,
+        var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class),
                 mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher,
-                mock(AcElementPropertiesPublisher.class), null);
+                mock(AcElementPropertiesPublisher.class), null,
+                mock(ParticipantSyncPublisher.class));
         var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
         var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED);
         var automationComposition =
@@ -247,9 +265,10 @@ class SupervisionAcHandlerTest {
     void testLock() {
         var automationCompositionProvider = mock(AutomationCompositionProvider.class);
         var acStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class);
-        var handler = new SupervisionAcHandler(automationCompositionProvider,
+        var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class),
                 mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher,
-                mock(AcElementPropertiesPublisher.class), null);
+                mock(AcElementPropertiesPublisher.class), null,
+                mock(ParticipantSyncPublisher.class));
         var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
         var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED);
         var automationComposition =
@@ -264,9 +283,10 @@ class SupervisionAcHandlerTest {
     void testLockFailed() {
         var automationCompositionProvider = mock(AutomationCompositionProvider.class);
         var acStateChangePublisher = mock(AutomationCompositionStateChangePublisher.class);
-        var handler = new SupervisionAcHandler(automationCompositionProvider,
+        var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class),
                 mock(AutomationCompositionDeployPublisher.class), acStateChangePublisher,
-                mock(AcElementPropertiesPublisher.class), null);
+                mock(AcElementPropertiesPublisher.class), null,
+                mock(ParticipantSyncPublisher.class));
         var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
         var acDefinition = CommonTestData.createAcDefinition(serviceTemplate, AcTypeState.PRIMED);
         var automationComposition =
@@ -294,9 +314,10 @@ class SupervisionAcHandlerTest {
                 .setParticipantId(automationComposition.getElements().values().iterator().next().getParticipantId());
         automationCompositionAckMessage.setAutomationCompositionId(IDENTIFIER);
 
-        var handler = new SupervisionAcHandler(automationCompositionProvider,
+        var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class),
                 mock(AutomationCompositionDeployPublisher.class), mock(AutomationCompositionStateChangePublisher.class),
-                mock(AcElementPropertiesPublisher.class), null);
+                mock(AcElementPropertiesPublisher.class), null,
+                mock(ParticipantSyncPublisher.class));
 
         handler.handleAutomationCompositionUpdateAckMessage(automationCompositionAckMessage);
 
@@ -308,8 +329,9 @@ class SupervisionAcHandlerTest {
     void testUpdate() {
         var acElementPropertiesPublisher = mock(AcElementPropertiesPublisher.class);
         var handler = new SupervisionAcHandler(mock(AutomationCompositionProvider.class),
-                mock(AutomationCompositionDeployPublisher.class), mock(AutomationCompositionStateChangePublisher.class),
-                acElementPropertiesPublisher, null);
+                mock(AcDefinitionProvider.class), mock(AutomationCompositionDeployPublisher.class),
+                mock(AutomationCompositionStateChangePublisher.class), acElementPropertiesPublisher, null,
+                mock(ParticipantSyncPublisher.class));
         var automationComposition =
                 InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_CREATE_JSON, "Lock");
         handler.update(automationComposition);
@@ -320,8 +342,9 @@ class SupervisionAcHandlerTest {
     void testMigrate() {
         var automationCompositionProvider = mock(AutomationCompositionProvider.class);
         var acCompositionMigrationPublisher = mock(AutomationCompositionMigrationPublisher.class);
-        var handler = new SupervisionAcHandler(automationCompositionProvider, null, null, null,
-                acCompositionMigrationPublisher);
+        var handler = new SupervisionAcHandler(automationCompositionProvider, mock(AcDefinitionProvider.class),
+                null, null, null,
+                acCompositionMigrationPublisher, mock(ParticipantSyncPublisher.class));
         var automationComposition =
                 InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_CREATE_JSON, "Migrate");
         handler.migrate(automationComposition, UUID.randomUUID());
index f78344b..7a72e0e 100644 (file)
@@ -32,19 +32,19 @@ class SupervisionAspectTest {
     @Test
     void testSchedule() throws Exception {
         var supervisionScanner = mock(SupervisionScanner.class);
-        var partecipantScanner = mock(SupervisionPartecipantScanner.class);
-        try (var supervisionAspect = new SupervisionAspect(supervisionScanner, partecipantScanner)) {
+        var participantScanner = mock(SupervisionParticipantScanner.class);
+        try (var supervisionAspect = new SupervisionAspect(supervisionScanner, participantScanner)) {
             supervisionAspect.schedule();
             verify(supervisionScanner, timeout(500)).run();
-            verify(partecipantScanner, timeout(500)).run();
+            verify(participantScanner, timeout(500)).run();
         }
     }
 
     @Test
     void testDoCheck() throws Exception {
         var supervisionScanner = mock(SupervisionScanner.class);
-        var partecipantScanner = mock(SupervisionPartecipantScanner.class);
-        try (var supervisionAspect = new SupervisionAspect(supervisionScanner, partecipantScanner)) {
+        var participantScanner = mock(SupervisionParticipantScanner.class);
+        try (var supervisionAspect = new SupervisionAspect(supervisionScanner, participantScanner)) {
             supervisionAspect.doCheck();
             supervisionAspect.doCheck();
             verify(supervisionScanner, timeout(500).times(2)).run();
index 448666f..e8be3b6 100644 (file)
@@ -30,7 +30,7 @@ import static org.onap.policy.clamp.acm.runtime.util.CommonTestData.TOSCA_SERVIC
 import java.util.Optional;
 import org.junit.jupiter.api.Test;
 import org.onap.policy.clamp.acm.runtime.instantiation.InstantiationUtils;
-import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
 import org.onap.policy.clamp.acm.runtime.util.CommonTestData;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
@@ -46,8 +46,7 @@ class SupervisionHandlerTest {
         participantPrimeAckMessage.setParticipantId(CommonTestData.getParticipantId());
         participantPrimeAckMessage.setState(ParticipantState.ON_LINE);
         var acDefinitionProvider = mock(AcDefinitionProvider.class);
-        var acRuntimeParameterGroup = mock(AcRuntimeParameterGroup.class);
-        var handler = new SupervisionHandler(acDefinitionProvider, acRuntimeParameterGroup);
+        var handler = new SupervisionHandler(acDefinitionProvider, mock(ParticipantSyncPublisher.class));
 
         handler.handleParticipantMessage(participantPrimeAckMessage);
         verify(acDefinitionProvider).findAcDefinition(any());
@@ -66,9 +65,7 @@ class SupervisionHandlerTest {
         var acDefinitionProvider = mock(AcDefinitionProvider.class);
         when(acDefinitionProvider.findAcDefinition(acDefinition.getCompositionId()))
                 .thenReturn(Optional.of(acDefinition));
-        var acRuntimeParameterGroup = mock(AcRuntimeParameterGroup.class);
-
-        var handler = new SupervisionHandler(acDefinitionProvider, acRuntimeParameterGroup);
+        var handler = new SupervisionHandler(acDefinitionProvider, mock(ParticipantSyncPublisher.class));
 
         handler.handleParticipantMessage(participantPrimeAckMessage);
         verify(acDefinitionProvider).findAcDefinition(any());
@@ -93,7 +90,7 @@ class SupervisionHandlerTest {
         when(acDefinitionProvider.findAcDefinition(acDefinition.getCompositionId()))
                 .thenReturn(Optional.of(acDefinition));
 
-        var handler = new SupervisionHandler(acDefinitionProvider, CommonTestData.getTestParamaterGroup());
+        var handler = new SupervisionHandler(acDefinitionProvider, mock(ParticipantSyncPublisher.class));
 
         handler.handleParticipantMessage(participantPrimeAckMessage);
         verify(acDefinitionProvider).findAcDefinition(any());
@@ -120,7 +117,7 @@ class SupervisionHandlerTest {
         when(acDefinitionProvider.findAcDefinition(acDefinition.getCompositionId()))
                 .thenReturn(Optional.of(acDefinition));
 
-        var handler = new SupervisionHandler(acDefinitionProvider, CommonTestData.getTestParamaterGroup());
+        var handler = new SupervisionHandler(acDefinitionProvider, mock(ParticipantSyncPublisher.class));
 
         handler.handleParticipantMessage(participantPrimeAckMessage);
         verify(acDefinitionProvider).findAcDefinition(any());
@@ -150,7 +147,7 @@ class SupervisionHandlerTest {
         when(acDefinitionProvider.findAcDefinition(acDefinition.getCompositionId()))
             .thenReturn(Optional.of(acDefinition));
 
-        var handler = new SupervisionHandler(acDefinitionProvider, CommonTestData.getTestParamaterGroup());
+        var handler = new SupervisionHandler(acDefinitionProvider, mock(ParticipantSyncPublisher.class));
 
         handler.handleParticipantMessage(participantPrimeAckMessage);
         verify(acDefinitionProvider).findAcDefinition(any());
index e352d2f..bebaa33 100644 (file)
@@ -23,6 +23,7 @@ package org.onap.policy.clamp.acm.runtime.supervision;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -37,6 +38,7 @@ import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup
 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantDeregisterAckPublisher;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRegisterAckPublisher;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantRestartPublisher;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
 import org.onap.policy.clamp.acm.runtime.util.CommonTestData;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
@@ -45,6 +47,7 @@ import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionInfo;
 import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState;
 import org.onap.policy.clamp.models.acm.concepts.Participant;
 import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica;
 import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantDeregister;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRegister;
@@ -60,25 +63,26 @@ class SupervisionParticipantHandlerTest {
 
     @Test
     void testHandleParticipantDeregister() {
-        var participant = CommonTestData.createParticipant(CommonTestData.getParticipantId());
+        var replica = CommonTestData.createParticipantReplica(CommonTestData.getReplicaId());
 
         var participantProvider = mock(ParticipantProvider.class);
-        when(participantProvider.findParticipant(CommonTestData.getParticipantId()))
-                .thenReturn(Optional.of(participant));
+        when(participantProvider.findParticipantReplica(replica.getReplicaId()))
+                .thenReturn(Optional.of(replica));
 
         var participantDeregisterMessage = new ParticipantDeregister();
         participantDeregisterMessage.setMessageId(UUID.randomUUID());
         participantDeregisterMessage.setParticipantId(CommonTestData.getParticipantId());
+        participantDeregisterMessage.setReplicaId(replica.getReplicaId());
         var participantDeregisterAckPublisher = mock(ParticipantDeregisterAckPublisher.class);
         var handler =
                 new SupervisionParticipantHandler(participantProvider, mock(ParticipantRegisterAckPublisher.class),
                         participantDeregisterAckPublisher, mock(AutomationCompositionProvider.class),
                         mock(AcDefinitionProvider.class), mock(ParticipantRestartPublisher.class),
-                        mock(AcRuntimeParameterGroup.class));
+                        mock(ParticipantSyncPublisher.class), mock(AcRuntimeParameterGroup.class));
 
         handler.handleParticipantMessage(participantDeregisterMessage);
 
-        verify(participantProvider).saveParticipant(any());
+        verify(participantProvider).deleteParticipantReplica(CommonTestData.getReplicaId());
         verify(participantDeregisterAckPublisher).send(participantDeregisterMessage.getMessageId());
     }
 
@@ -95,7 +99,7 @@ class SupervisionParticipantHandlerTest {
         var handler = new SupervisionParticipantHandler(participantProvider, participantRegisterAckPublisher,
                 mock(ParticipantDeregisterAckPublisher.class), mock(AutomationCompositionProvider.class),
                 mock(AcDefinitionProvider.class), mock(ParticipantRestartPublisher.class),
-                mock(AcRuntimeParameterGroup.class));
+                mock(ParticipantSyncPublisher.class), mock(AcRuntimeParameterGroup.class));
         handler.handleParticipantMessage(participantRegisterMessage);
 
         verify(participantProvider).saveParticipant(any());
@@ -109,13 +113,18 @@ class SupervisionParticipantHandlerTest {
         participantRegisterMessage.setMessageId(UUID.randomUUID());
         var participantId = CommonTestData.getParticipantId();
         participantRegisterMessage.setParticipantId(participantId);
+        participantRegisterMessage.setReplicaId(participantId);
         var supportedElementType = CommonTestData.createParticipantSupportedElementType();
         participantRegisterMessage.setParticipantSupportedElementType(List.of(supportedElementType));
 
         var participant = new Participant();
+        var replica = new ParticipantReplica();
+        replica.setReplicaId(participantId);
         participant.setParticipantId(participantId);
+        participant.getReplicas().put(replica.getReplicaId(), replica);
         var participantProvider = mock(ParticipantProvider.class);
         when(participantProvider.findParticipant(participantId)).thenReturn(Optional.of(participant));
+        when(participantProvider.findParticipantReplica(participantId)).thenReturn(Optional.of(replica));
         var compositionId = UUID.randomUUID();
         var composition2Id = UUID.randomUUID();
         when(participantProvider.getCompositionIds(participantId)).thenReturn(Set.of(compositionId, composition2Id));
@@ -145,7 +154,8 @@ class SupervisionParticipantHandlerTest {
         var participantRestartPublisher = mock(ParticipantRestartPublisher.class);
         var handler = new SupervisionParticipantHandler(participantProvider, participantRegisterAckPublisher,
                 mock(ParticipantDeregisterAckPublisher.class), automationCompositionProvider, acDefinitionProvider,
-                participantRestartPublisher, CommonTestData.getTestParamaterGroup());
+                participantRestartPublisher, mock(ParticipantSyncPublisher.class),
+                CommonTestData.getTestParamaterGroup());
         handler.handleParticipantMessage(participantRegisterMessage);
 
         verify(participantRegisterAckPublisher).send(participantRegisterMessage.getMessageId(), participantId);
@@ -154,6 +164,65 @@ class SupervisionParticipantHandlerTest {
         verify(participantRestartPublisher).send(any(), any(AutomationCompositionDefinition.class), any());
     }
 
+    @Test
+    void testHandleParticipantSyncRestart() {
+        var participantRegisterMessage = new ParticipantRegister();
+        participantRegisterMessage.setMessageId(UUID.randomUUID());
+        var participantId = CommonTestData.getParticipantId();
+        participantRegisterMessage.setParticipantId(participantId);
+        var replicaId = CommonTestData.getReplicaId();
+        participantRegisterMessage.setReplicaId(replicaId);
+        var supportedElementType = CommonTestData.createParticipantSupportedElementType();
+        participantRegisterMessage.setParticipantSupportedElementType(List.of(supportedElementType));
+
+        var participant = new Participant();
+        var replica = new ParticipantReplica();
+        replica.setReplicaId(replicaId);
+        participant.setParticipantId(participantId);
+        participant.getReplicas().put(replica.getReplicaId(), replica);
+        var participantProvider = mock(ParticipantProvider.class);
+        when(participantProvider.findParticipant(participantId)).thenReturn(Optional.of(participant));
+        when(participantProvider.findParticipantReplica(replicaId)).thenReturn(Optional.of(replica));
+        var compositionId = UUID.randomUUID();
+        var composition2Id = UUID.randomUUID();
+        when(participantProvider.getCompositionIds(participantId)).thenReturn(Set.of(compositionId, composition2Id));
+
+        var acDefinitionProvider = mock(AcDefinitionProvider.class);
+        var acDefinition = new AutomationCompositionDefinition();
+        acDefinition.setState(AcTypeState.COMMISSIONED);
+        acDefinition.setCompositionId(composition2Id);
+        when(acDefinitionProvider.getAcDefinition(composition2Id)).thenReturn(acDefinition);
+
+        acDefinition = new AutomationCompositionDefinition();
+        acDefinition.setCompositionId(compositionId);
+        acDefinition.setState(AcTypeState.PRIMED);
+        var nodeTemplateState = new NodeTemplateState();
+        nodeTemplateState.setParticipantId(participantId);
+        acDefinition.setElementStateMap(Map.of("code", nodeTemplateState));
+        when(acDefinitionProvider.getAcDefinition(compositionId)).thenReturn(acDefinition);
+
+        var automationComposition =
+                InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_CREATE_JSON, "Crud");
+        automationComposition.getElements().values().iterator().next().setParticipantId(participantId);
+        var automationCompositionProvider = mock(AutomationCompositionProvider.class);
+        when(automationCompositionProvider.getAcInstancesByCompositionId(compositionId))
+                .thenReturn(List.of(automationComposition));
+
+        var participantRegisterAckPublisher = mock(ParticipantRegisterAckPublisher.class);
+        var participantSyncPublisher = mock(ParticipantSyncPublisher.class);
+        var handler = new SupervisionParticipantHandler(participantProvider, participantRegisterAckPublisher,
+                mock(ParticipantDeregisterAckPublisher.class), automationCompositionProvider, acDefinitionProvider,
+                mock(ParticipantRestartPublisher.class), participantSyncPublisher,
+                CommonTestData.getTestParamaterGroup());
+        handler.handleParticipantMessage(participantRegisterMessage);
+
+        verify(participantRegisterAckPublisher).send(participantRegisterMessage.getMessageId(), participantId);
+        verify(acDefinitionProvider, times(0)).updateAcDefinition(any(AutomationCompositionDefinition.class),
+                eq(CommonTestData.TOSCA_COMP_NAME));
+        verify(participantSyncPublisher)
+                .sendRestartMsg(any(), any(), any(AutomationCompositionDefinition.class), any());
+    }
+
     @Test
     void testHandleParticipantStatus() {
         var participantStatusMessage = createParticipantStatus();
@@ -165,7 +234,7 @@ class SupervisionParticipantHandlerTest {
                 new SupervisionParticipantHandler(participantProvider, mock(ParticipantRegisterAckPublisher.class),
                         mock(ParticipantDeregisterAckPublisher.class), automationCompositionProvider,
                         mock(AcDefinitionProvider.class), mock(ParticipantRestartPublisher.class),
-                        mock(AcRuntimeParameterGroup.class));
+                        mock(ParticipantSyncPublisher.class), mock(AcRuntimeParameterGroup.class));
         var participant = CommonTestData.createParticipant(CommonTestData.getParticipantId());
         when(participantProvider.findParticipant(CommonTestData.getParticipantId()))
                 .thenReturn(Optional.of(participant));
@@ -201,7 +270,7 @@ class SupervisionParticipantHandlerTest {
                 new SupervisionParticipantHandler(participantProvider, mock(ParticipantRegisterAckPublisher.class),
                         mock(ParticipantDeregisterAckPublisher.class), mock(AutomationCompositionProvider.class),
                         acDefinitionProvider, mock(ParticipantRestartPublisher.class),
-                       CommonTestData.getTestParamaterGroup());
+                        mock(ParticipantSyncPublisher.class), CommonTestData.getTestParamaterGroup());
         handler.handleParticipantMessage(participantStatusMessage);
 
         verify(acDefinitionProvider).updateAcDefinition(acDefinition, CommonTestData.TOSCA_COMP_NAME);
@@ -218,7 +287,7 @@ class SupervisionParticipantHandlerTest {
                 new SupervisionParticipantHandler(participantProvider, mock(ParticipantRegisterAckPublisher.class),
                         mock(ParticipantDeregisterAckPublisher.class), automationCompositionProvider,
                         mock(AcDefinitionProvider.class), mock(ParticipantRestartPublisher.class),
-                        mock(AcRuntimeParameterGroup.class));
+                        mock(ParticipantSyncPublisher.class), mock(AcRuntimeParameterGroup.class));
         handler.handleParticipantMessage(participantStatusMessage);
 
         verify(participantProvider).saveParticipant(any());
@@ -236,9 +305,8 @@ class SupervisionParticipantHandlerTest {
                 new SupervisionParticipantHandler(participantProvider, mock(ParticipantRegisterAckPublisher.class),
                         mock(ParticipantDeregisterAckPublisher.class), automationCompositionProvider,
                         mock(AcDefinitionProvider.class), mock(ParticipantRestartPublisher.class),
-                        mock(AcRuntimeParameterGroup.class));
+                        mock(ParticipantSyncPublisher.class), mock(AcRuntimeParameterGroup.class));
         var participant = CommonTestData.createParticipant(CommonTestData.getParticipantId());
-        participant.setParticipantState(ParticipantState.OFF_LINE);
         when(participantProvider.findParticipant(CommonTestData.getParticipantId()))
                 .thenReturn(Optional.of(participant));
         handler.handleParticipantMessage(participantStatusMessage);
index 690ad96..0ae1c1a 100644 (file)
@@ -29,28 +29,26 @@ import static org.mockito.Mockito.when;
 import java.util.List;
 import org.junit.jupiter.api.Test;
 import org.onap.policy.clamp.acm.runtime.util.CommonTestData;
-import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
 import org.onap.policy.clamp.models.acm.persistence.provider.ParticipantProvider;
 
 class SupervisionParticipantScannerTest {
 
     @Test
     void testScanParticipant() {
-        var acRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanParticipant");
-        acRuntimeParameterGroup.getParticipantParameters().setMaxStatusWaitMs(-1);
-
-        var participant = CommonTestData.createParticipant(CommonTestData.getParticipantId());
         var participantProvider = mock(ParticipantProvider.class);
-        when(participantProvider.getParticipants()).thenReturn(List.of(participant));
+        var replica = CommonTestData.createParticipantReplica(CommonTestData.getReplicaId());
+        when(participantProvider.findReplicasOnLine()).thenReturn(List.of(replica));
 
-        var supervisionScanner = new SupervisionPartecipantScanner(participantProvider, acRuntimeParameterGroup);
+        var acRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanParticipant");
+        var supervisionScanner = new SupervisionParticipantScanner(participantProvider, acRuntimeParameterGroup);
 
-        participant.setParticipantState(ParticipantState.OFF_LINE);
+        acRuntimeParameterGroup.getParticipantParameters().setMaxStatusWaitMs(100000);
         supervisionScanner.run();
-        verify(participantProvider, times(0)).saveParticipant(any());
+        verify(participantProvider, times(0)).saveParticipantReplica(any());
 
-        participant.setParticipantState(ParticipantState.ON_LINE);
+        acRuntimeParameterGroup.getParticipantParameters().setMaxStatusWaitMs(-1);
+        supervisionScanner = new SupervisionParticipantScanner(participantProvider, acRuntimeParameterGroup);
         supervisionScanner.run();
-        verify(participantProvider, times(1)).saveParticipant(any());
+        verify(participantProvider).deleteParticipantReplica(CommonTestData.getReplicaId());
     }
 }
index d5163be..fa5929f 100644 (file)
@@ -39,6 +39,7 @@ import org.junit.jupiter.api.Test;
 import org.onap.policy.clamp.acm.runtime.instantiation.InstantiationUtils;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionDeployPublisher;
 import org.onap.policy.clamp.acm.runtime.supervision.comm.AutomationCompositionStateChangePublisher;
+import org.onap.policy.clamp.acm.runtime.supervision.comm.ParticipantSyncPublisher;
 import org.onap.policy.clamp.acm.runtime.util.CommonTestData;
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
@@ -101,7 +102,7 @@ class SupervisionScannerTest {
         var acRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanner");
         var supervisionScanner = new SupervisionScanner(mock(AutomationCompositionProvider.class), acDefinitionProvider,
                 mock(AutomationCompositionStateChangePublisher.class), mock(AutomationCompositionDeployPublisher.class),
-                acRuntimeParameterGroup);
+                mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup);
         supervisionScanner.run();
         verify(acDefinitionProvider, times(0)).updateAcDefinitionState(any(), any(), any(), any());
     }
@@ -113,7 +114,7 @@ class SupervisionScannerTest {
         var acRuntimeParameterGroup = CommonTestData.geParameterGroup("dbScanner");
         var supervisionScanner = new SupervisionScanner(mock(AutomationCompositionProvider.class), acDefinitionProvider,
                 mock(AutomationCompositionStateChangePublisher.class), mock(AutomationCompositionDeployPublisher.class),
-                acRuntimeParameterGroup);
+                mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup);
         supervisionScanner.run();
         // Ac Definition in Priming state
         verify(acDefinitionProvider, times(0)).updateAcDefinitionState(any(), any(), any(), any());
@@ -121,7 +122,7 @@ class SupervisionScannerTest {
         acRuntimeParameterGroup.getParticipantParameters().setMaxStatusWaitMs(-1);
         supervisionScanner = new SupervisionScanner(mock(AutomationCompositionProvider.class), acDefinitionProvider,
                 mock(AutomationCompositionStateChangePublisher.class), mock(AutomationCompositionDeployPublisher.class),
-                acRuntimeParameterGroup);
+                mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup);
         supervisionScanner.run();
         // set Timeout
         verify(acDefinitionProvider).updateAcDefinitionState(acDefinition.getCompositionId(), acDefinition.getState(),
@@ -164,7 +165,7 @@ class SupervisionScannerTest {
 
         var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(),
                 automationCompositionStateChangePublisher, automationCompositionDeployPublisher,
-                acRuntimeParameterGroup);
+                mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup);
 
         // not in transition
         supervisionScanner.run();
@@ -192,7 +193,7 @@ class SupervisionScannerTest {
 
         var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(),
                 automationCompositionStateChangePublisher, automationCompositionDeployPublisher,
-                acRuntimeParameterGroup);
+                mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup);
         supervisionScanner.run();
 
         verify(automationCompositionProvider).updateAutomationComposition(any(AutomationComposition.class));
@@ -213,7 +214,7 @@ class SupervisionScannerTest {
 
         var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(),
                 automationCompositionStateChangePublisher, automationCompositionDeployPublisher,
-                acRuntimeParameterGroup);
+                mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup);
         supervisionScanner.run();
 
         verify(automationCompositionProvider).deleteAutomationComposition(automationComposition.getInstanceId());
@@ -232,7 +233,7 @@ class SupervisionScannerTest {
 
         var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(),
                 automationCompositionStateChangePublisher, automationCompositionDeployPublisher,
-                acRuntimeParameterGroup);
+                mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup);
 
         supervisionScanner.run();
         verify(automationCompositionProvider, times(0)).updateAutomationComposition(any(AutomationComposition.class));
@@ -263,7 +264,7 @@ class SupervisionScannerTest {
         // verify timeout scenario
         var scannerObj2 = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(),
                 automationCompositionStateChangePublisher, automationCompositionDeployPublisher,
-                acRuntimeParameterGroup);
+                mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup);
 
         automationComposition.setStateChangeResult(StateChangeResult.NO_ERROR);
         automationComposition.setLastMsg(TimestampHelper.now());
@@ -312,7 +313,7 @@ class SupervisionScannerTest {
 
         var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(),
                 automationCompositionStateChangePublisher, automationCompositionDeployPublisher,
-                acRuntimeParameterGroup);
+                mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup);
 
         supervisionScanner.run();
 
@@ -347,7 +348,7 @@ class SupervisionScannerTest {
 
         var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(),
                 automationCompositionStateChangePublisher, automationCompositionDeployPublisher,
-                acRuntimeParameterGroup);
+                mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup);
 
         supervisionScanner.run();
         verify(automationCompositionProvider, times(0)).updateAutomationComposition(any(AutomationComposition.class));
@@ -390,7 +391,7 @@ class SupervisionScannerTest {
 
         var supervisionScanner = new SupervisionScanner(automationCompositionProvider, createAcDefinitionProvider(),
                 automationCompositionStateChangePublisher, automationCompositionDeployPublisher,
-                acRuntimeParameterGroup);
+                mock(ParticipantSyncPublisher.class), acRuntimeParameterGroup);
 
         supervisionScanner.run();
 
index 766380a..cab5adf 100644 (file)
@@ -35,7 +35,6 @@ import java.util.UUID;
 import org.junit.jupiter.api.Test;
 import org.onap.policy.clamp.acm.runtime.instantiation.InstantiationUtils;
 import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
-import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider;
 import org.onap.policy.clamp.acm.runtime.supervision.SupervisionAcHandler;
 import org.onap.policy.clamp.acm.runtime.supervision.SupervisionHandler;
 import org.onap.policy.clamp.acm.runtime.supervision.SupervisionParticipantHandler;
@@ -149,7 +148,7 @@ class SupervisionMessagesTest {
     @Test
     void testParticipantPrimePublisherDecommissioning() {
         var publisher = new ParticipantPrimePublisher(mock(ParticipantProvider.class),
-                mock(AcmParticipantProvider.class), mock(AcRuntimeParameterGroup.class));
+                mock(AcRuntimeParameterGroup.class));
         var topicSink = mock(TopicSink.class);
         publisher.active(topicSink);
         publisher.sendDepriming(UUID.randomUUID());
@@ -170,8 +169,7 @@ class SupervisionMessagesTest {
                 participantId);
         var participantProvider = mock(ParticipantProvider.class);
         when(participantProvider.getSupportedElementMap()).thenReturn(supportedElementMap);
-        var publisher = new ParticipantPrimePublisher(participantProvider, mock(AcmParticipantProvider.class),
-                CommonTestData.getTestParamaterGroup());
+        var publisher = new ParticipantPrimePublisher(participantProvider, CommonTestData.getTestParamaterGroup());
         var topicSink = mock(TopicSink.class);
         publisher.active(topicSink);
         var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
@@ -262,29 +260,70 @@ class SupervisionMessagesTest {
     }
 
     @Test
-    void testParticipantSyncPublisher() {
+    void testParticipantSyncPublisherAutomationComposition() {
         var publisher = new ParticipantSyncPublisher(CommonTestData.getTestParamaterGroup());
         var topicSink = mock(TopicSink.class);
         publisher.active(topicSink);
 
         var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
-        var acmDefinition = new AutomationCompositionDefinition();
-        acmDefinition.setCompositionId(UUID.randomUUID());
-        acmDefinition.setServiceTemplate(serviceTemplate);
-        var acElements = AcmUtils
-                .extractAcElementsFromServiceTemplate(serviceTemplate, "");
-        acmDefinition.setElementStateMap(AcmUtils.createElementStateMap(acElements, AcTypeState.PRIMED));
-
         var automationComposition =
                 InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_UPDATE_JSON, "Crud");
+        publisher.sendSync(serviceTemplate, automationComposition);
+        verify(topicSink).send(anyString());
+    }
+
+    @Test
+    void testParticipantSyncPublisherAcDefinition() {
+        var publisher = new ParticipantSyncPublisher(CommonTestData.getTestParamaterGroup());
+        var topicSink = mock(TopicSink.class);
+        publisher.active(topicSink);
+
+        var acmDefinition = getAcmDefinition();
+        publisher.sendSync(acmDefinition, null);
+        verify(topicSink).send(anyString());
+    }
+
+    @Test
+    void testParticipantSyncPublisherAcDefinitionCommissioned() {
+        var publisher = new ParticipantSyncPublisher(CommonTestData.getTestParamaterGroup());
+        var topicSink = mock(TopicSink.class);
+        publisher.active(topicSink);
 
+        var acmDefinition = getAcmDefinition();
+        acmDefinition.setState(AcTypeState.COMMISSIONED);
+        publisher.sendSync(acmDefinition, UUID.randomUUID());
+        verify(topicSink).send(anyString());
+    }
+
+    @Test
+    void testParticipantSyncPublisherRestart() {
+        var publisher = new ParticipantSyncPublisher(CommonTestData.getTestParamaterGroup());
+        var topicSink = mock(TopicSink.class);
+        publisher.active(topicSink);
+
+        var automationComposition =
+                InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_UPDATE_JSON, "Crud");
         var participantId = automationComposition.getElements().values().iterator().next().getParticipantId();
+        var acmDefinition = getAcmDefinition();
         acmDefinition.getElementStateMap().values().iterator().next().setParticipantId(participantId);
-
-        publisher.send(participantId, acmDefinition, List.of(automationComposition));
+        var replicaId = UUID.randomUUID();
+        publisher.sendRestartMsg(participantId, replicaId, acmDefinition, List.of(automationComposition));
         verify(topicSink).send(anyString());
     }
 
+    private AutomationCompositionDefinition getAcmDefinition() {
+        var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
+        var acmDefinition = new AutomationCompositionDefinition();
+        acmDefinition.setCompositionId(UUID.randomUUID());
+        acmDefinition.setState(AcTypeState.PRIMED);
+        acmDefinition.setServiceTemplate(serviceTemplate);
+        var acElements = AcmUtils
+                .extractAcElementsFromServiceTemplate(serviceTemplate, TOSCA_ELEMENT_NAME);
+        acmDefinition.setElementStateMap(AcmUtils.createElementStateMap(acElements, AcTypeState.PRIMED));
+        acmDefinition.getElementStateMap().values().forEach(element -> element.setParticipantId(UUID.randomUUID()));
+        return acmDefinition;
+    }
+
     @Test
     void testParticipantRegisterListener() {
         final var participantRegister = new ParticipantRegister();
index e031e0f..c3b5ff9 100644 (file)
@@ -28,6 +28,7 @@ import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeEx
 import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
 import org.onap.policy.clamp.models.acm.concepts.Participant;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica;
 import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
 import org.onap.policy.clamp.models.acm.concepts.ParticipantSupportedElementType;
 import org.onap.policy.clamp.models.acm.utils.AcmUtils;
@@ -88,11 +89,23 @@ public class CommonTestData {
     public static Participant createParticipant(UUID participantId) {
         var participant = new Participant();
         participant.setParticipantId(participantId);
-        participant.setParticipantState(ParticipantState.ON_LINE);
-        participant.setLastMsg(TimestampHelper.now());
         return participant;
     }
 
+    /**
+     * Create a new ParticipantReplica.
+     *
+     * @param replicaId the replica id
+     * @return a new ParticipantReplica
+     */
+    public static ParticipantReplica createParticipantReplica(UUID replicaId) {
+        var replica = new ParticipantReplica();
+        replica.setReplicaId(replicaId);
+        replica.setParticipantState(ParticipantState.ON_LINE);
+        replica.setLastMsg(TimestampHelper.now());
+        return replica;
+    }
+
     /**
      * Create a new ParticipantSupportedElementType.
      *
@@ -105,6 +118,10 @@ public class CommonTestData {
         return supportedElementType;
     }
 
+    public static UUID getReplicaId() {
+        return UUID.fromString("201c62b3-8918-41b9-a747-d21eb79c6c09");
+    }
+
     public static UUID getParticipantId() {
         return UUID.fromString("101c62b3-8918-41b9-a747-d21eb79c6c03");
     }