Add Synchronization topic in acm runtime 56/138156/2
authorrameshiyer27 <ramesh.murugan.iyer@est.tech>
Mon, 10 Jun 2024 16:08:04 +0000 (17:08 +0100)
committerrameshiyer27 <ramesh.murugan.iyer@est.tech>
Tue, 11 Jun 2024 08:57:17 +0000 (09:57 +0100)
New sync topic for acm-ppnt synchronization
Added publisher for the sync topic
Refactor MessageDispatcherActivator for processing more than one topic
parameter.

Issue-ID: POLICY-5030
Change-Id: Id765b433beaf3f51fad9a9c66403a93d21c33797
Signed-off-by: zrrmmua <ramesh.murugan.iyer@est.tech>
19 files changed:
models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantMessageType.java
models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantRestart.java
models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSync.java [new file with mode: 0644]
models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantRestartTest.java
models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSyncTest.java [new file with mode: 0644]
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivator.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/Publisher.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/AcRuntimeParameterGroup.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/Topics.java [new file with mode: 0644]
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantAckPublisher.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantPublisher.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java
runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java [new file with mode: 0644]
runtime-acm/src/main/resources/application.yaml
runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivatorTest.java
runtime-acm/src/test/java/org/onap/policy/clamp/acm/runtime/supervision/comm/SupervisionMessagesTest.java
runtime-acm/src/test/resources/application-prometheus-noauth.yaml
runtime-acm/src/test/resources/application-test.yaml
runtime-acm/src/test/resources/parameters/TestParameters.json

index 29c2c01..e6e42e8 100644 (file)
@@ -110,5 +110,10 @@ public enum ParticipantMessageType {
      * Used by acm runtime to migrate from a composition to another one in participants, triggers a
      * AUTOMATION_COMPOSITION_MIGRATION message with result of AUTOMATION_COMPOSITION_STATE_CHANGE operation.
      */
-    AUTOMATION_COMPOSITION_MIGRATION
+    AUTOMATION_COMPOSITION_MIGRATION,
+
+    /**
+     * Used by runtime to send composition and instances to sync participant replicas.
+     */
+    PARTICIPANT_SYNC_MSG
 }
index 103be68..119cdf0 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2023,2024 Nordix Foundation.
+ *  Copyright (C) 2023-2024 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -41,7 +41,7 @@ public class ParticipantRestart extends ParticipantMessage {
     // element definition
     private List<ParticipantDefinition> participantDefinitionUpdates = new ArrayList<>();
 
-    // automationcomposition instances list
+    // automation composition instances list
     private List<ParticipantRestartAc> automationcompositionList = new ArrayList<>();
 
     /**
@@ -51,6 +51,14 @@ public class ParticipantRestart extends ParticipantMessage {
         super(ParticipantMessageType.PARTICIPANT_RESTART);
     }
 
+    /**
+     * Constructor with message type.
+     * @param messageType messageType
+     */
+    public ParticipantRestart(ParticipantMessageType messageType) {
+        super(messageType);
+    }
+
     /**
      * Constructs the object, making a deep copy.
      *
diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSync.java b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSync.java
new file mode 100644 (file)
index 0000000..33a7309
--- /dev/null
@@ -0,0 +1,47 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.models.acm.messages.kafka.participant;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+@Getter
+@Setter
+@ToString(callSuper = true)
+public class ParticipantSync extends ParticipantRestart {
+
+    /**
+     * Constructor.
+     */
+    public ParticipantSync() {
+        super(ParticipantMessageType.PARTICIPANT_SYNC_MSG);
+    }
+
+    /**
+     * Constructs the object, making a deep copy.
+     *
+     * @param source source from which to copy
+     */
+    public ParticipantSync(ParticipantSync source) {
+        super(source);
+    }
+}
index 3353de6..95b718e 100644 (file)
@@ -20,7 +20,6 @@
 
 package org.onap.policy.clamp.models.acm.messages.kafka.participant;
 
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageUtils.assertSerializable;
 import static org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageUtils.removeVariableFields;
@@ -43,7 +42,6 @@ class ParticipantRestartTest {
 
     @Test
     void testCopyConstructor() throws CoderException {
-        assertThatThrownBy(() -> new ParticipantRestart(null)).isInstanceOf(NullPointerException.class);
 
         final var orig = new ParticipantRestart();
         // verify with null values
diff --git a/models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSyncTest.java b/models/src/test/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSyncTest.java
new file mode 100644 (file)
index 0000000..970b948
--- /dev/null
@@ -0,0 +1,86 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.models.acm.messages.kafka.participant;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageUtils.assertSerializable;
+import static org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageUtils.removeVariableFields;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.junit.jupiter.api.Test;
+import org.onap.policy.clamp.models.acm.concepts.AcElementRestart;
+import org.onap.policy.clamp.models.acm.concepts.DeployState;
+import org.onap.policy.clamp.models.acm.concepts.LockState;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantDefinition;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
+import org.onap.policy.clamp.models.acm.utils.CommonTestData;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
+
+
+
+public class ParticipantSyncTest {
+
+    @Test
+    void testCopyConstructor() throws CoderException {
+
+        final var orig = new ParticipantSync();
+        // verify with null values
+        assertEquals(removeVariableFields(orig.toString()),
+                removeVariableFields(new ParticipantSync(orig).toString()));
+
+        orig.setMessageId(UUID.randomUUID());
+        orig.setCompositionId(UUID.randomUUID());
+        orig.setTimestamp(Instant.ofEpochMilli(3000));
+        orig.setParticipantId(CommonTestData.getParticipantId());
+
+        var participantDefinitionUpdate = new ParticipantDefinition();
+        var type = new ToscaConceptIdentifier("id", "1.2.3");
+        var acDefinition = CommonTestData.getAcElementDefinition(type);
+        participantDefinitionUpdate.setAutomationCompositionElementDefinitionList(List.of(acDefinition));
+        orig.setParticipantDefinitionUpdates(List.of(participantDefinitionUpdate));
+
+        var acElement = new AcElementRestart();
+        acElement.setId(UUID.randomUUID());
+        var id = new ToscaConceptIdentifier("id", "1.2.3");
+        acElement.setDefinition(id);
+        acElement.setDeployState(DeployState.DEPLOYED);
+        acElement.setLockState(LockState.LOCKED);
+        acElement.setOperationalState("OperationalState");
+        acElement.setUseState("UseState");
+        acElement.setProperties(Map.of("key", "value"));
+        acElement.setOutProperties(Map.of("keyOut", "valueOut"));
+
+        var acRestart = new ParticipantRestartAc();
+        acRestart.setAcElementList(List.of(acElement));
+        acRestart.setAutomationCompositionId(UUID.randomUUID());
+
+        orig.setAutomationcompositionList(List.of(acRestart));
+
+        assertEquals(removeVariableFields(orig.toString()),
+                removeVariableFields(new ParticipantSync(orig).toString()));
+
+        assertSerializable(orig, ParticipantSync.class);
+    }
+}
index 0d9de20..a3e55c3 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2021 Nordix Foundation.
+ *  Copyright (C) 2021,2024 Nordix Foundation.
  *  Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -24,9 +24,12 @@ package org.onap.policy.clamp.acm.runtime.config.messaging;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
 import lombok.Getter;
 import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
 import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException;
+import org.onap.policy.common.endpoints.event.comm.Topic;
 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
 import org.onap.policy.common.endpoints.event.comm.TopicSink;
 import org.onap.policy.common.endpoints.event.comm.TopicSource;
@@ -65,8 +68,14 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen
         topicSources = TopicEndpointManager.getManager()
                 .addTopicSources(acRuntimeParameterGroup.getTopicParameterGroup().getTopicSources());
 
+        var topics = acRuntimeParameterGroup.getTopics();
+
         msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
 
+        var topicMap = topicSinks.stream()
+                .collect(Collectors.toMap(Topic::getTopic, UnaryOperator.identity()));
+
+
         // @formatter:off
         addAction("Topic endpoint management",
             () -> TopicEndpointManager.getManager().start(),
@@ -74,7 +83,8 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen
 
         publishers.forEach(publisher ->
             addAction("Publisher " + publisher.getClass().getSimpleName(),
-                () -> publisher.active(topicSinks),
+                () -> publisher.active(publisher.isDefaultTopic() ? topicMap.get(topics.getOperationTopic())
+                        : topicMap.get(topics.getSyncTopic())),
                 publisher::stop));
 
         listeners.forEach(listener ->
@@ -90,7 +100,7 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen
      * Registers the dispatcher with the topic source(s).
      */
     private void registerMsgDispatcher() {
-        for (final TopicSource source : topicSources) {
+        for (final var source : topicSources) {
             source.register(msgDispatcher);
         }
     }
@@ -99,7 +109,7 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen
      * Unregisters the dispatcher from the topic source(s).
      */
     private void unregisterMsgDispatcher() {
-        for (final TopicSource source : topicSources) {
+        for (final var source : topicSources) {
             source.unregister(msgDispatcher);
         }
     }
index a7acc47..a76a09d 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2021 Nordix Foundation.
+ *  Copyright (C) 2021,2024 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
 package org.onap.policy.clamp.acm.runtime.config.messaging;
 
 import java.util.List;
+import org.onap.policy.clamp.acm.runtime.main.parameters.Topics;
 import org.onap.policy.common.endpoints.event.comm.TopicSink;
 
 /**
@@ -28,7 +29,9 @@ import org.onap.policy.common.endpoints.event.comm.TopicSink;
  */
 public interface Publisher {
 
-    void active(List<TopicSink> topicSinks);
+    void active(TopicSink topicSink);
 
     void stop();
+
+    boolean isDefaultTopic();
 }
index a30b531..a0b6fe1 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2021,2023 Nordix Foundation.
+ *  Copyright (C) 2021,2023-2024 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -50,4 +50,8 @@ public class AcRuntimeParameterGroup {
     @Valid
     @NotNull
     private AcmParameters acmParameters = new AcmParameters();
+
+    @Valid
+    @NotNull
+    private Topics topics;
 }
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/Topics.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/Topics.java
new file mode 100644 (file)
index 0000000..d485a24
--- /dev/null
@@ -0,0 +1,34 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.runtime.main.parameters;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.validation.annotation.Validated;
+
+@Getter
+@Setter
+@Validated
+@AllArgsConstructor
+public class Topics {
+
+    private String operationTopic;
+    private String syncTopic;
+}
index 246d1c1..5014f7d 100644 (file)
@@ -22,7 +22,9 @@ package org.onap.policy.clamp.acm.runtime.supervision.comm;
 
 import jakarta.ws.rs.core.Response.Status;
 import java.util.List;
+import java.util.Optional;
 import org.onap.policy.clamp.acm.runtime.config.messaging.Publisher;
+import org.onap.policy.clamp.acm.runtime.main.parameters.Topics;
 import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantAckMessage;
 import org.onap.policy.common.endpoints.event.comm.TopicSink;
@@ -47,11 +49,8 @@ public abstract class AbstractParticipantAckPublisher<E extends ParticipantAckMe
 
 
     @Override
-    public void active(List<TopicSink> topicSinks) {
-        if (topicSinks.size() != 1) {
-            throw new IllegalArgumentException("Topic Sink must be one");
-        }
-        this.topicSinkClient = new TopicSinkClient(topicSinks.get(0));
+    public void active(TopicSink topicSink) {
+        this.topicSinkClient = new TopicSinkClient(topicSink);
         active = true;
     }
 
@@ -59,4 +58,13 @@ public abstract class AbstractParticipantAckPublisher<E extends ParticipantAckMe
     public void stop() {
         active = false;
     }
+
+    /**
+     * Is default topic.
+     * @return true if default
+     */
+    @Override
+    public boolean isDefaultTopic() {
+        return true;
+    }
 }
index d17cd73..5afb7eb 100644 (file)
@@ -22,12 +22,15 @@ package org.onap.policy.clamp.acm.runtime.supervision.comm;
 
 import jakarta.ws.rs.core.Response.Status;
 import java.util.List;
+import java.util.Optional;
 import org.onap.policy.clamp.acm.runtime.config.messaging.Publisher;
+import org.onap.policy.clamp.acm.runtime.main.parameters.Topics;
 import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessage;
 import org.onap.policy.common.endpoints.event.comm.TopicSink;
 import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
 
+
 public abstract class AbstractParticipantPublisher<E extends ParticipantMessage> implements Publisher {
 
     private TopicSinkClient topicSinkClient;
@@ -47,11 +50,8 @@ public abstract class AbstractParticipantPublisher<E extends ParticipantMessage>
 
 
     @Override
-    public void active(List<TopicSink> topicSinks) {
-        if (topicSinks.size() != 1) {
-            throw new IllegalArgumentException("Topic Sink must be one");
-        }
-        this.topicSinkClient = new TopicSinkClient(topicSinks.get(0));
+    public void active(TopicSink topicSink) {
+        this.topicSinkClient = new TopicSinkClient(topicSink);
         active = true;
     }
 
@@ -59,4 +59,13 @@ public abstract class AbstractParticipantPublisher<E extends ParticipantMessage>
     public void stop() {
         active = false;
     }
+
+    /**
+     * Is default topic.
+     * @return true if default
+     */
+    @Override
+    public boolean isDefaultTopic() {
+        return true;
+    }
 }
index 50fa6d1..4f28eab 100644 (file)
@@ -86,12 +86,12 @@ public class ParticipantRestartPublisher extends AbstractParticipantPublisher<Pa
         super.send(message);
     }
 
-    private List<ParticipantDefinition> prepareParticipantRestarting(UUID participantId,
+    protected List<ParticipantDefinition> prepareParticipantRestarting(UUID participantId,
             AutomationCompositionDefinition acmDefinition) {
         var acElements = AcmUtils.extractAcElementsFromServiceTemplate(acmDefinition.getServiceTemplate(),
                 acRuntimeParameterGroup.getAcmParameters().getToscaElementName());
 
-        // list of entry entry filtered by participantId
+        // list of entry filtered by participantId
         List<Entry<String, ToscaNodeTemplate>> elementList = new ArrayList<>();
         Map<ToscaConceptIdentifier, UUID> supportedElementMap = new HashMap<>();
         for (var elementEntry : acElements) {
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java
new file mode 100644 (file)
index 0000000..ae7eda1
--- /dev/null
@@ -0,0 +1,101 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.runtime.supervision.comm;
+
+import io.micrometer.core.annotation.Timed;
+import java.time.Instant;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
+import org.onap.policy.clamp.acm.runtime.main.parameters.Topics;
+import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
+import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
+import org.onap.policy.clamp.models.acm.utils.AcmUtils;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+
+@Component
+public class ParticipantSyncPublisher extends ParticipantRestartPublisher {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantSyncPublisher.class);
+
+    private final AcRuntimeParameterGroup acRuntimeParameterGroup;
+
+    public ParticipantSyncPublisher(AcRuntimeParameterGroup acRuntimeParameterGroup) {
+        super(acRuntimeParameterGroup);
+        this.acRuntimeParameterGroup = acRuntimeParameterGroup;
+    }
+
+
+    /**
+     * Send sync msg to Participant.
+     *
+     * @param participantId the ParticipantId
+     * @param acmDefinition the AutomationComposition Definition
+     * @param automationCompositions the list of automationCompositions
+     */
+    @Override
+    @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published")
+    public void send(UUID participantId, AutomationCompositionDefinition acmDefinition,
+                     List<AutomationComposition> automationCompositions) {
+
+        var message = new ParticipantSync();
+        message.setParticipantId(participantId);
+        message.setCompositionId(acmDefinition.getCompositionId());
+        message.setMessageId(UUID.randomUUID());
+        message.setTimestamp(Instant.now());
+        message.setState(acmDefinition.getState());
+        message.setParticipantDefinitionUpdates(prepareParticipantRestarting(participantId, acmDefinition));
+        var toscaServiceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(acmDefinition.getServiceTemplate());
+
+        for (var automationComposition : automationCompositions) {
+            var syncAc = new ParticipantRestartAc();
+            syncAc.setAutomationCompositionId(automationComposition.getInstanceId());
+            for (var element : automationComposition.getElements().values()) {
+                if (participantId.equals(element.getParticipantId())) {
+                    var acElementSync = AcmUtils.createAcElementRestart(element);
+                    acElementSync.setToscaServiceTemplateFragment(toscaServiceTemplateFragment);
+                    syncAc.getAcElementList().add(acElementSync);
+                }
+            }
+            message.getAutomationcompositionList().add(syncAc);
+        }
+
+        LOGGER.debug("Participant Sync sent {}", message);
+        super.send(message);
+    }
+
+    /**
+     * Is default topic.
+     * @return true if default
+     */
+    @Override
+    public boolean isDefaultTopic() {
+        return false;
+    }
+
+}
index d93418e..58e590b 100644 (file)
@@ -40,20 +40,29 @@ server:
     path: /error
 
 runtime:
+  topics:
+    operationTopic: policy-acruntime-participant
+    syncTopic: acm-ppnt-sync
   participantParameters:
     heartBeatMs: 20000
     maxStatusWaitMs: 200000
   topicParameterGroup:
     topicSources:
       -
-        topic: policy-acruntime-participant
+        topic: ${runtime.topics.operationTopic}
         servers:
           - ${topicServer:kafka:9092}
         topicCommInfrastructure: NOOP
         fetchTimeout: 15000
     topicSinks:
       -
-        topic: policy-acruntime-participant
+        topic: ${runtime.topics.operationTopic}
+        servers:
+          - ${topicServer:kafka:9092}
+        topicCommInfrastructure: NOOP
+
+      -
+        topic: ${runtime.topics.syncTopic}
         servers:
           - ${topicServer:kafka:9092}
         topicCommInfrastructure: NOOP
index 899e35f..66595c8 100644 (file)
@@ -78,8 +78,8 @@ class MessageDispatcherActivatorTest {
             // repeat start - should throw an exception
             assertThatIllegalStateException().isThrownBy(activator::start);
             assertTrue(activator.isAlive());
-            verify(publisherFirst, times(1)).active(anyList());
-            verify(publisherSecond, times(1)).active(anyList());
+            verify(publisherFirst, times(1)).active(any());
+            verify(publisherSecond, times(1)).active(any());
 
             var sco = CODER.decode("{messageType:" + TOPIC_FIRST + "}", StandardCoderObject.class);
             activator.getMsgDispatcher().onTopicEvent(null, "msg", sco);
index 295d2d7..31cd659 100644 (file)
@@ -36,6 +36,7 @@ import java.util.UUID;
 import org.junit.jupiter.api.Test;
 import org.onap.policy.clamp.acm.runtime.instantiation.InstantiationUtils;
 import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
+import org.onap.policy.clamp.acm.runtime.main.parameters.Topics;
 import org.onap.policy.clamp.acm.runtime.participants.AcmParticipantProvider;
 import org.onap.policy.clamp.acm.runtime.supervision.SupervisionAcHandler;
 import org.onap.policy.clamp.acm.runtime.supervision.SupervisionHandler;
@@ -74,7 +75,7 @@ class SupervisionMessagesTest {
     void testSendParticipantRegisterAck() {
         var acRegisterAckPublisher = new ParticipantRegisterAckPublisher();
         var topicSink = mock(TopicSink.class);
-        acRegisterAckPublisher.active(List.of(topicSink));
+        acRegisterAckPublisher.active(topicSink);
         acRegisterAckPublisher.send(new ParticipantRegisterAck());
         verify(topicSink).send(anyString());
         acRegisterAckPublisher.stop();
@@ -100,7 +101,7 @@ class SupervisionMessagesTest {
     void testSendParticipantDeregisterAck() {
         var acDeregisterAckPublisher = new ParticipantDeregisterAckPublisher();
         var topicSink = mock(TopicSink.class);
-        acDeregisterAckPublisher.active(Collections.singletonList(topicSink));
+        acDeregisterAckPublisher.active(topicSink);
         acDeregisterAckPublisher.send(new ParticipantDeregisterAck());
         verify(topicSink).send(anyString());
         acDeregisterAckPublisher.stop();
@@ -140,7 +141,7 @@ class SupervisionMessagesTest {
     void testSendAutomationCompositionStateChangePublisher() {
         var publisher = new AutomationCompositionStateChangePublisher();
         var topicSink = mock(TopicSink.class);
-        publisher.active(List.of(topicSink));
+        publisher.active(topicSink);
         publisher.send(getAutomationComposition(), 0, true);
         verify(topicSink).send(anyString());
         publisher.stop();
@@ -151,7 +152,7 @@ class SupervisionMessagesTest {
         var publisher = new ParticipantPrimePublisher(mock(ParticipantProvider.class),
                 mock(AcmParticipantProvider.class), mock(AcRuntimeParameterGroup.class));
         var topicSink = mock(TopicSink.class);
-        publisher.active(List.of(topicSink));
+        publisher.active(topicSink);
         publisher.sendDepriming(UUID.randomUUID());
         verify(topicSink).send(anyString());
     }
@@ -173,7 +174,7 @@ class SupervisionMessagesTest {
         var publisher = new ParticipantPrimePublisher(participantProvider, mock(AcmParticipantProvider.class),
                 CommonTestData.getTestParamaterGroup());
         var topicSink = mock(TopicSink.class);
-        publisher.active(List.of(topicSink));
+        publisher.active(topicSink);
         var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
         serviceTemplate.setName("Name");
         serviceTemplate.setVersion("1.0.0");
@@ -192,7 +193,7 @@ class SupervisionMessagesTest {
     void testParticipantStatusReqPublisher() {
         var publisher = new ParticipantStatusReqPublisher();
         var topicSink = mock(TopicSink.class);
-        publisher.active(List.of(topicSink));
+        publisher.active(topicSink);
         publisher.send(CommonTestData.getParticipantId());
         verify(topicSink).send(anyString());
     }
@@ -201,7 +202,7 @@ class SupervisionMessagesTest {
     void testParticipantRegisterAckPublisher() {
         var publisher = new ParticipantRegisterAckPublisher();
         var topicSink = mock(TopicSink.class);
-        publisher.active(List.of(topicSink));
+        publisher.active(topicSink);
         publisher.send(UUID.randomUUID(), CommonTestData.getParticipantId());
         verify(topicSink).send(anyString());
     }
@@ -210,7 +211,7 @@ class SupervisionMessagesTest {
     void testParticipantDeregisterAckPublisher() {
         var publisher = new ParticipantDeregisterAckPublisher();
         var topicSink = mock(TopicSink.class);
-        publisher.active(List.of(topicSink));
+        publisher.active(topicSink);
         publisher.send(UUID.randomUUID());
         verify(topicSink).send(anyString());
     }
@@ -219,7 +220,7 @@ class SupervisionMessagesTest {
     void testAcElementPropertiesPublisher() {
         var publisher = new AcElementPropertiesPublisher();
         var topicSink = mock(TopicSink.class);
-        publisher.active(List.of(topicSink));
+        publisher.active(topicSink);
         var automationComposition =
                 InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_UPDATE_JSON, "Crud");
         publisher.send(automationComposition);
@@ -230,7 +231,7 @@ class SupervisionMessagesTest {
     void testAutomationCompositionMigrationPublisher() {
         var publisher = new AutomationCompositionMigrationPublisher();
         var topicSink = mock(TopicSink.class);
-        publisher.active(List.of(topicSink));
+        publisher.active(topicSink);
         var automationComposition =
                 InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_UPDATE_JSON, "Crud");
         publisher.send(automationComposition, UUID.randomUUID());
@@ -241,7 +242,31 @@ class SupervisionMessagesTest {
     void testParticipantRestartPublisher() {
         var publisher = new ParticipantRestartPublisher(CommonTestData.getTestParamaterGroup());
         var topicSink = mock(TopicSink.class);
-        publisher.active(List.of(topicSink));
+        publisher.active(topicSink);
+
+        var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
+        var acmDefinition = new AutomationCompositionDefinition();
+        acmDefinition.setCompositionId(UUID.randomUUID());
+        acmDefinition.setServiceTemplate(serviceTemplate);
+        var acElements = AcmUtils
+                .extractAcElementsFromServiceTemplate(serviceTemplate, "");
+        acmDefinition.setElementStateMap(AcmUtils.createElementStateMap(acElements, AcTypeState.PRIMED));
+
+        var automationComposition =
+                InstantiationUtils.getAutomationCompositionFromResource(AC_INSTANTIATION_UPDATE_JSON, "Crud");
+
+        var participantId = automationComposition.getElements().values().iterator().next().getParticipantId();
+        acmDefinition.getElementStateMap().values().iterator().next().setParticipantId(participantId);
+
+        publisher.send(participantId, acmDefinition, List.of(automationComposition));
+        verify(topicSink).send(anyString());
+    }
+
+    @Test
+    void testParticipantSyncPublisher() {
+        var publisher = new ParticipantSyncPublisher(CommonTestData.getTestParamaterGroup());
+        var topicSink = mock(TopicSink.class);
+        publisher.active(topicSink);
 
         var serviceTemplate = InstantiationUtils.getToscaServiceTemplate(TOSCA_SERVICE_TEMPLATE_YAML);
         var acmDefinition = new AutomationCompositionDefinition();
index 1c71252..620e753 100644 (file)
@@ -19,13 +19,16 @@ server:
     context-path: /onap/policy/clamp/acm
 
 runtime:
+  topics:
+    operationTopic: policy-acruntime-participant
+    syncTopic: acm-ppnt-sync
   participantParameters:
     updateParameters:
       maxRetryCount: 3
   topicParameterGroup:
     topicSources:
       -
-        topic: POLICY-ACRUNTIME-PARTICIPANT
+        topic: ${runtime.topics.operationTopic}
         servers:
           - localhost
         topicCommInfrastructure: noop
@@ -35,7 +38,12 @@ runtime:
         topicCommInfrastructure: noop
         servers:
           - localhost
-        topic: POLICY-ACRUNTIME-PARTICIPANT
+        topic: ${runtime.topics.operationTopic}
+
+      - topic: ${runtime.topics.syncTopic}
+        servers:
+          - ${topicServer:kafka:9092}
+        topicCommInfrastructure: noop
 
 tracing:
   enabled: true
index 13b1f78..5d616d5 100644 (file)
@@ -19,13 +19,16 @@ server:
     context-path: /onap/policy/clamp/acm
 
 runtime:
+  topics:
+    operationTopic: policy-acruntime-participant
+    syncTopic: acm-ppnt-sync
   participantParameters:
     updateParameters:
       maxRetryCount: 3
   topicParameterGroup:
     topicSources:
       -
-        topic: policy-acruntime-participant
+        topic: ${runtime.topics.operationTopic}
         servers:
           - kafka:9092
         topicCommInfrastructure: NOOP
@@ -35,7 +38,12 @@ runtime:
         topicCommInfrastructure: NOOP
         servers:
           - kafka:9092
-        topic: policy-acruntime-participant
+        topic: ${runtime.topics.operationTopic}
+      -
+        topic: ${runtime.topics.syncTopic}
+        servers:
+            - ${topicServer:kafka:9092}
+        topicCommInfrastructure: NOOP
   acmParameters:
     acElementName: org.onap.policy.clamp.acm.AutomationCompositionElement
     acNodeType: org.onap.policy.clamp.acm.AutomationComposition
index 8192b72..1558abc 100644 (file)
         "databasePassword": "P01icY",
         "persistenceUnit": "InstantiationTests"
     },
+    "topics":{
+        "operationTopic": "policy-acruntime-participant",
+        "syncTopic": "acm-ppnt-sync"
+    },
     "topicParameterGroup": {
+
         "topicSources": [
             {
-                "topic": "POLICY-ACRUNTIME-PARTICIPANT",
+                "topic": "${topics.operationTopic}",
                 "servers": [
                     "localhost"
                 ],
         ],
         "topicSinks": [
             {
-                "topic": "POLICY-ACRUNTIME-PARTICIPANT",
+                "topic": "${topics.operationTopic}",
+                "servers": [
+                    "localhost"
+                ],
+                "topicCommInfrastructure": "NOOP"
+            },
+            {
+                "topic": "${topics.syncTopic}",
                 "servers": [
                     "localhost"
                 ],