Add Sync topic for participant Intermediary 60/138160/2
authorrameshiyer27 <ramesh.murugan.iyer@est.tech>
Tue, 11 Jun 2024 10:23:44 +0000 (11:23 +0100)
committerrameshiyer27 <ramesh.murugan.iyer@est.tech>
Tue, 11 Jun 2024 12:46:56 +0000 (13:46 +0100)
Add new sync topic config for Intermediary
Add sync topic listener
Refactor IntermediaryActivator for processing multiple topic source

Issue-ID: POLICY-5030
Change-Id: Idce9839a85571a92048e589bd82ce33699add640
Signed-off-by: zrrmmua <ramesh.murugan.iyer@est.tech>
20 files changed:
participant/participant-impl/participant-impl-a1pms/src/main/resources/config/application.yaml
participant/participant-impl/participant-impl-http/src/main/resources/config/application.yaml
participant/participant-impl/participant-impl-kserve/src/main/resources/config/application.yaml
participant/participant-impl/participant-impl-kubernetes/src/main/resources/config/application.yaml
participant/participant-impl/participant-impl-kubernetes/src/test/java/org/onap/policy/clamp/acm/participant/kubernetes/parameters/CommonTestData.java
participant/participant-impl/participant-impl-policy/src/main/resources/config/application.yaml
participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/acm/participant/policy/main/parameters/CommonTestData.java
participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml
participant/participant-impl/participant-impl-simulator/src/test/java/org/onap/policy/clamp/acm/participant/sim/comm/CommonTestData.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantAckListener.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantListener.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantSyncListener.java [new file with mode: 0644]
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/Listener.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java
participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/Topics.java [new file with mode: 0644]
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantCommTest.java
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivatorTest.java
participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/main/parameters/CommonTestData.java

index 011fafe..18ffde6 100644 (file)
@@ -23,18 +23,26 @@ a1pms:
 
 participant:
   intermediaryParameters:
+    topics:
+      operationTopic: policy-acruntime-participant
+      syncTopic: acm-ppnt-sync
     reportingTimeIntervalMs: 120000
     description: Participant Description
     participantId: 101c62b3-8918-41b9-a747-d21eb79c6c00
     clampAutomationCompositionTopics:
       topicSources:
-        - topic: policy-acruntime-participant
+        - topic: ${participant.intermediaryparameters.topics.operationTopic}
+          servers:
+            - ${topicServer:kafka:9092}
+          topicCommInfrastructure: NOOP
+          fetchTimeout: 15000
+        - topic: ${participant.intermediaryparameters.topics.syncTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
           fetchTimeout: 15000
       topicSinks:
-        - topic: policy-acruntime-participant
+        - topic: ${participant.intermediaryparameters.topics.operationTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
index ed68a46..9e86d49 100644 (file)
@@ -14,18 +14,26 @@ security:
   enable-csrf: false
 participant:
   intermediaryParameters:
+    topics:
+      operationTopic: policy-acruntime-participant
+      syncTopic: acm-ppnt-sync
     reportingTimeIntervalMs: 120000
     description: Participant Description
     participantId: 101c62b3-8918-41b9-a747-d21eb79c6c01
     clampAutomationCompositionTopics:
       topicSources:
-        - topic: policy-acruntime-participant
+        - topic: ${participant.intermediaryparameters.topics.operationTopic}
+          servers:
+            - ${topicServer:kafka:9092}
+          topicCommInfrastructure: NOOP
+          fetchTimeout: 15000
+        - topic: ${participant.intermediaryparameters.topics.syncTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
           fetchTimeout: 15000
       topicSinks:
-        - topic: policy-acruntime-participant
+        - topic: ${participant.intermediaryparameters.topics.operationTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
index aadad18..6ccd2fc 100644 (file)
@@ -22,18 +22,26 @@ security:
 
 participant:
   intermediaryParameters:
+    topics:
+      operationTopic: policy-acruntime-participant
+      syncTopic: acm-ppnt-sync
     reportingTimeIntervalMs: 120000
     description: Participant Description
     participantId: 101c62b3-8918-41b9-a747-d21eb79c6c04
     clampAutomationCompositionTopics:
       topicSources:
-        - topic: policy-acruntime-participant
+        - topic: ${participant.intermediaryparameters.topics.operationTopic}
+          servers:
+            - ${topicServer:kafka:9092}
+          topicCommInfrastructure: NOOP
+          fetchTimeout: 15000
+        - topic: ${participant.intermediaryparameters.topics.syncTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
           fetchTimeout: 15000
       topicSinks:
-        - topic: policy-acruntime-participant
+        - topic: ${participant.intermediaryparameters.topics.operationTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
index 969d77a..d66faee 100644 (file)
@@ -23,20 +23,26 @@ participant:
   localChartDirectory: /home/policy/local-charts
   infoFileName: CHART_INFO.json
   intermediaryParameters:
+    topics:
+      operationTopic: policy-acruntime-participant
+      syncTopic: acm-ppnt-sync
     reportingTimeIntervalMs: 120000
     description: Participant Description
     participantId: 101c62b3-8918-41b9-a747-d21eb79c6c02
     clampAutomationCompositionTopics:
       topicSources:
-        -
-          topic: policy-acruntime-participant
+        - topic: ${participant.intermediaryparameters.topics.operationTopic}
+          servers:
+            - ${topicServer:kafka:9092}
+          topicCommInfrastructure: NOOP
+          fetchTimeout: 15000
+        - topic: ${participant.intermediaryparameters.topics.syncTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
           fetchTimeout: 15000
       topicSinks:
-        -
-          topic: policy-acruntime-participant
+        - topic: ${participant.intermediaryparameters.topics.operationTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
index b806cdb..3bb6009 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2021-2023 Nordix Foundation.
+ *  Copyright (C) 2021-2024 Nordix Foundation.
  *  Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
+import org.onap.policy.clamp.acm.participant.intermediary.parameters.Topics;
 import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
 import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder;
 import org.onap.policy.common.endpoints.parameters.TopicParameters;
@@ -39,7 +40,8 @@ public class CommonTestData {
     public static final String PARTICIPANT_GROUP_NAME = "AutomationCompositionParticipantGroup";
     public static final String DESCRIPTION = "Participant description";
     public static final long TIME_INTERVAL = 2000;
-    public static final List<TopicParameters> TOPIC_PARAMS = List.of(getTopicParams());
+    public static final List<TopicParameters> SINK_TOPIC_PARAMS = List.of(getTopicParams());
+    public static final List<TopicParameters> SOURCE_TOPIC_PARAMS = List.of(getTopicParams(), getSyncTopicParams());
     public static final Coder CODER = new StandardCoder();
     private static final UUID AC_ID = UUID.randomUUID();
     private static final String KEY_NAME =
@@ -110,6 +112,7 @@ public class CommonTestData {
             map.put("participantId", getParticipantId());
             map.put("clampAutomationCompositionTopics", getTopicParametersMap(false));
             map.put("participantSupportedElementTypes", new ArrayList<>());
+            map.put("topics", new Topics("policy-acruntime-participant", "acm-ppnt-sync"));
         }
 
         return map;
@@ -133,8 +136,8 @@ public class CommonTestData {
     public Map<String, Object> getTopicParametersMap(final boolean isEmpty) {
         final Map<String, Object> map = new TreeMap<>();
         if (!isEmpty) {
-            map.put("topicSources", TOPIC_PARAMS);
-            map.put("topicSinks", TOPIC_PARAMS);
+            map.put("topicSources", SOURCE_TOPIC_PARAMS);
+            map.put("topicSinks", SINK_TOPIC_PARAMS);
         }
         return map;
     }
@@ -152,6 +155,19 @@ public class CommonTestData {
         return topicParams;
     }
 
+    /**
+     * Returns sync topic parameters for test cases.
+     *
+     * @return topic parameters
+     */
+    public static TopicParameters getSyncTopicParams() {
+        final TopicParameters topicParams = new TopicParameters();
+        topicParams.setTopic("acm-ppnt-sync");
+        topicParams.setTopicCommInfrastructure("NOOP");
+        topicParams.setServers(List.of("localhost"));
+        return topicParams;
+    }
+
     /**
      * Get automation composition id.
      * @return UUID automationCompositionId
index 6a357dd..7a0ef8d 100644 (file)
@@ -30,20 +30,26 @@ participant:
     useHttps: true
     allowSelfSignedCerts: true
   intermediaryParameters:
+    topics:
+      operationTopic: policy-acruntime-participant
+      syncTopic: acm-ppnt-sync
     reportingTimeIntervalMs: 120000
     description: Participant Description
     participantId: 101c62b3-8918-41b9-a747-d21eb79c6c03
     clampAutomationCompositionTopics:
       topicSources:
-        -
-          topic: policy-acruntime-participant
+        - topic: ${participant.intermediaryparameters.topics.operationTopic}
+          servers:
+            - ${topicServer:kafka:9092}
+          topicCommInfrastructure: NOOP
+          fetchTimeout: 15000
+        - topic: ${participant.intermediaryparameters.topics.syncTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
           fetchTimeout: 15000
       topicSinks:
-        -
-          topic: policy-acruntime-participant
+        - topic: ${participant.intermediaryparameters.topics.operationTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
index 3b2550d..555383b 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2021-2023 Nordix Foundation.
+ *  Copyright (C) 2021-2024 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
+import org.onap.policy.clamp.acm.participant.intermediary.parameters.Topics;
 import org.onap.policy.common.endpoints.parameters.TopicParameters;
 import org.onap.policy.common.utils.coder.Coder;
 import org.onap.policy.common.utils.coder.CoderException;
@@ -37,7 +38,8 @@ public class CommonTestData {
     public static final String PARTICIPANT_GROUP_NAME = "AutomationCompositionParticipantGroup";
     public static final String DESCRIPTION = "Participant description";
     public static final long TIME_INTERVAL = 2000;
-    public static final List<TopicParameters> TOPIC_PARAMS = List.of(getTopicParams());
+    public static final List<TopicParameters> SINK_TOPIC_PARAMS = List.of(getSinkTopicParams());
+    public static final List<TopicParameters> SOURCE_TOPIC_PARAMS = List.of(getSinkTopicParams(), getSyncTopicParams());
 
     public static final Coder CODER = new StandardCoder();
 
@@ -124,6 +126,7 @@ public class CommonTestData {
             map.put("participantId", getParticipantId());
             map.put("clampAutomationCompositionTopics", getTopicParametersMap(false));
             map.put("participantSupportedElementTypes", new ArrayList<>());
+            map.put("topics", new Topics("policy-acruntime-participant", "acm-ppnt-sync"));
         }
 
         return map;
@@ -138,8 +141,8 @@ public class CommonTestData {
     public Map<String, Object> getTopicParametersMap(final boolean isEmpty) {
         final Map<String, Object> map = new TreeMap<>();
         if (!isEmpty) {
-            map.put("topicSources", TOPIC_PARAMS);
-            map.put("topicSinks", TOPIC_PARAMS);
+            map.put("topicSources", SOURCE_TOPIC_PARAMS);
+            map.put("topicSinks", SINK_TOPIC_PARAMS);
         }
         return map;
     }
@@ -149,7 +152,7 @@ public class CommonTestData {
      *
      * @return topic parameters
      */
-    public static TopicParameters getTopicParams() {
+    public static TopicParameters getSinkTopicParams() {
         final TopicParameters topicParams = new TopicParameters();
         topicParams.setTopic("policy-acruntime-participant");
         topicParams.setTopicCommInfrastructure("NOOP");
@@ -157,6 +160,19 @@ public class CommonTestData {
         return topicParams;
     }
 
+    /**
+     * Returns sync topic parameters for test cases.
+     *
+     * @return topic parameters
+     */
+    public static TopicParameters getSyncTopicParams() {
+        final TopicParameters topicParams = new TopicParameters();
+        topicParams.setTopic("acm-ppnt-sync");
+        topicParams.setTopicCommInfrastructure("NOOP");
+        topicParams.setServers(List.of("localhost"));
+        return topicParams;
+    }
+
     /**
      * Returns participantId for test cases.
      *
index f3731d7..77a3ef4 100644 (file)
@@ -14,18 +14,26 @@ security:
   enable-csrf: false
 participant:
   intermediaryParameters:
+    topics:
+      operationTopic: policy-acruntime-participant
+      syncTopic: acm-ppnt-sync
     reportingTimeIntervalMs: 120000
     description: Participant Description
     participantId: 101c62b3-8918-41b9-a747-d21eb79c6c90
     clampAutomationCompositionTopics:
       topicSources:
-        - topic: policy-acruntime-participant
+        - topic: ${participant.intermediaryparameters.topics.operationTopic}
+          servers:
+            - ${topicServer:kafka:9092}
+          topicCommInfrastructure: NOOP
+          fetchTimeout: 15000
+        - topic: ${participant.intermediaryparameters.topics.syncTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
           fetchTimeout: 15000
       topicSinks:
-        - topic: policy-acruntime-participant
+        - topic: ${participant.intermediaryparameters.topics.operationTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
index 54f1930..5499931 100644 (file)
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
+import org.onap.policy.clamp.acm.participant.intermediary.parameters.Topics;
 import org.onap.policy.clamp.acm.participant.sim.parameters.ParticipantSimParameters;
 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement;
@@ -37,7 +38,8 @@ public class CommonTestData {
     public static final Coder CODER = new StandardCoder();
     public static final String DESCRIPTION = "Participant description";
     public static final long TIME_INTERVAL = 2000;
-    public static final List<TopicParameters> TOPIC_PARAMS = List.of(getTopicParams());
+    public static final List<TopicParameters> SINK_TOPIC_PARAMS = List.of(getSinkTopicParams());
+    public static final List<TopicParameters> SOURCE_TOPIC_PARAMS = List.of(getSinkTopicParams(), getSyncTopicParams());
 
     /**
      * Get ParticipantSimParameters.
@@ -77,6 +79,7 @@ public class CommonTestData {
         map.put("participantId", getParticipantId());
         map.put("clampAutomationCompositionTopics", getTopicParametersMap());
         map.put("participantSupportedElementTypes", new ArrayList<>());
+        map.put("topics", new Topics("policy-acruntime-participant", "acm-ppnt-sync"));
 
         return map;
     }
@@ -88,8 +91,8 @@ public class CommonTestData {
      */
     private static Map<String, Object> getTopicParametersMap() {
         final Map<String, Object> map = new TreeMap<>();
-        map.put("topicSources", TOPIC_PARAMS);
-        map.put("topicSinks", TOPIC_PARAMS);
+        map.put("topicSources", SOURCE_TOPIC_PARAMS);
+        map.put("topicSinks", SINK_TOPIC_PARAMS);
         return map;
     }
 
@@ -98,7 +101,7 @@ public class CommonTestData {
      *
      * @return topic parameters
      */
-    private static TopicParameters getTopicParams() {
+    private static TopicParameters getSinkTopicParams() {
         final TopicParameters topicParams = new TopicParameters();
         topicParams.setTopic("policy-acruntime-participant");
         topicParams.setTopicCommInfrastructure("NOOP");
@@ -106,6 +109,19 @@ public class CommonTestData {
         return topicParams;
     }
 
+    /**
+     * Returns sync topic parameters for test cases.
+     *
+     * @return topic parameters
+     */
+    private static TopicParameters getSyncTopicParams() {
+        final TopicParameters topicParams = new TopicParameters();
+        topicParams.setTopic("acm-ppnt-sync");
+        topicParams.setTopicCommInfrastructure("NOOP");
+        topicParams.setServers(List.of("localhost"));
+        return topicParams;
+    }
+
     /**
      * Returns participantId for test cases.
      *
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantSyncListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantSyncListener.java
new file mode 100644 (file)
index 0000000..0b359f9
--- /dev/null
@@ -0,0 +1,49 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.participant.intermediary.comm;
+
+import org.onap.policy.clamp.acm.participant.intermediary.handler.ParticipantHandler;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
+import org.springframework.stereotype.Component;
+
+@Component
+public class ParticipantSyncListener extends ParticipantListener<ParticipantSync> {
+
+    /**
+     * Constructs the object.
+     *
+     * @param participantHandler the handler for managing the state of the participant
+     */
+    public ParticipantSyncListener(ParticipantHandler participantHandler) {
+        super(ParticipantSync.class, participantHandler, participantHandler::handleParticipantSync);
+    }
+
+    @Override
+    public String getType() {
+        return ParticipantMessageType.PARTICIPANT_SYNC_MSG.name();
+    }
+
+    @Override
+    public boolean isDefaultTopic() {
+        return false;
+    }
+}
index a77d524..2c54a22 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2021 Nordix Foundation.
+ *  Copyright (C) 2021,2024 Nordix Foundation.
  *  Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -28,6 +28,7 @@ import java.util.Timer;
 import java.util.TimerTask;
 import lombok.Getter;
 import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters;
+import org.onap.policy.clamp.acm.participant.intermediary.parameters.Topics;
 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
 import org.onap.policy.common.endpoints.event.comm.TopicSink;
 import org.onap.policy.common.endpoints.event.comm.TopicSource;
@@ -55,6 +56,9 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
     @Getter
     private final MessageTypeDispatcher msgDispatcher;
 
+    @Getter
+    private final MessageTypeDispatcher syncMsgDispatcher;
+
     /**
      * Instantiate the activator for participant.
      *
@@ -75,22 +79,32 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
 
         msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
 
+        syncMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
+
         // @formatter:off
         addAction("Topic endpoint management",
             () -> TopicEndpointManager.getManager().start(),
             () -> TopicEndpointManager.getManager().shutdown());
 
-        listeners.forEach(listener ->
-                addAction("Listener " + listener.getClass().getSimpleName(),
+        listeners.stream().filter(Listener::isDefaultTopic)
+                .forEach(listener -> addAction("Listener " + listener.getClass().getSimpleName(),
                         () -> msgDispatcher.register(listener.getType(), listener.getScoListener()),
                         () -> msgDispatcher.unregister(listener.getType())));
 
+        listeners.stream().filter(l -> ! l.isDefaultTopic())
+                .forEach(listener -> addAction("Listener " + listener.getClass().getSimpleName(),
+                        () -> syncMsgDispatcher.register(listener.getType(), listener.getScoListener()),
+                        () -> syncMsgDispatcher.unregister(listener.getType())));
+
         publishers.forEach(publisher ->
             addAction("Publisher " + publisher.getClass().getSimpleName(),
                 () -> publisher.active(topicSinks),
                 publisher::stop));
 
-        addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher);
+        var topics = parameters.getIntermediaryParameters().getTopics();
+
+        addAction("Topic Message Dispatcher", () -> this.registerMsgDispatcher(topics),
+                () -> this.unregisterMsgDispatcher(topics));
         // @formatter:on
     }
 
@@ -133,18 +147,26 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
     /**
      * Registers the dispatcher with the topic source(s).
      */
-    private void registerMsgDispatcher() {
-        for (final TopicSource source : topicSources) {
-            source.register(msgDispatcher);
+    private void registerMsgDispatcher(Topics topics) {
+        for (final var source : topicSources) {
+            if (source.getTopic().equals(topics.getOperationTopic())) {
+                source.register(msgDispatcher);
+            } else if (source.getTopic().equals(topics.getSyncTopic())) {
+                source.register(syncMsgDispatcher);
+            }
         }
     }
 
     /**
      * Unregisters the dispatcher from the topic source(s).
      */
-    private void unregisterMsgDispatcher() {
-        for (final TopicSource source : topicSources) {
-            source.unregister(msgDispatcher);
+    private void unregisterMsgDispatcher(Topics topics) {
+        for (final var source : topicSources) {
+            if (source.getTopic().equals(topics.getOperationTopic())) {
+                source.unregister(msgDispatcher);
+            } else if (source.getTopic().equals(topics.getSyncTopic())) {
+                source.unregister(syncMsgDispatcher);
+            }
         }
     }
 
index 56ed554..27585cf 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2021 Nordix Foundation.
+ *  Copyright (C) 2021,2024 Nordix Foundation.
  *  Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -38,4 +38,10 @@ public interface Listener<T> {
      * @return listener to register
      */
     ScoListener<T> getScoListener();
+
+    /**
+     * Check if default topic.
+     * @return true if default topic
+     */
+    boolean isDefaultTopic();
 }
index 54a0591..7ac58ae 100644 (file)
@@ -39,6 +39,7 @@ import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRe
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRestart;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatus;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatusReq;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.PropertiesUpdate;
 import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder;
 import org.slf4j.Logger;
@@ -208,6 +209,18 @@ public class ParticipantHandler {
         acDefinitionHandler.handleParticipantRestart(participantRestartMsg);
     }
 
+    /**
+     * Handle a ParticipantSync message.
+     *
+     * @param participantSyncMsg the participantSync message
+     */
+    @Timed(value = "listener.participant_sync_msg", description = "PARTICIPANT_SYNC messages received")
+    public void handleParticipantSync(ParticipantSync participantSyncMsg) {
+        LOGGER.debug("ParticipantSync message received for participantId {}",
+                participantSyncMsg.getParticipantId());
+        acDefinitionHandler.handleParticipantRestart(participantSyncMsg);
+    }
+
     /**
      * Dispatch a heartbeat for this participant.
      */
index 5477404..1c36ad1 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2021-2023 Nordix Foundation.
+ *  Copyright (C) 2021-2024 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -60,4 +60,8 @@ public class ParticipantIntermediaryParameters {
     @Valid
     private List<ParticipantSupportedElementType> participantSupportedElementTypes;
 
+    @NotNull
+    @Valid
+    private Topics topics;
+
 }
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/Topics.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/Topics.java
new file mode 100644 (file)
index 0000000..ddf7205
--- /dev/null
@@ -0,0 +1,44 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.participant.intermediary.parameters;
+
+import jakarta.validation.Valid;
+import jakarta.validation.constraints.NotNull;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Class to hold topic names for operation and synchronization.
+ */
+@Getter
+@Setter
+@AllArgsConstructor
+public class Topics {
+
+    @NotNull
+    @Valid
+    private String operationTopic;
+
+    @NotNull
+    @Valid
+    private String syncTopic;
+}
index 10f9d45..d7c97bd 100644 (file)
@@ -82,6 +82,10 @@ class ParticipantCommTest {
         assertEquals(ParticipantMessageType.PARTICIPANT_RESTART.name(),
                 participantRestartListener.getType());
 
+        var participantSyncListener = new ParticipantSyncListener(participantHandler);
+        assertEquals(ParticipantMessageType.PARTICIPANT_SYNC_MSG.name(),
+                participantSyncListener.getType());
+
         var acMigrationListener = new AutomationCompositionMigrationListener(participantHandler);
         assertEquals(ParticipantMessageType.AUTOMATION_COMPOSITION_MIGRATION.name(), acMigrationListener.getType());
     }
index ac9cbfa..8868c73 100644 (file)
@@ -34,8 +34,10 @@ import static org.mockito.Mockito.when;
 import java.util.List;
 import org.junit.jupiter.api.Test;
 import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantStatusReqListener;
+import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantSyncListener;
 import org.onap.policy.clamp.acm.participant.intermediary.main.parameters.CommonTestData;
 import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatusReq;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
 import org.onap.policy.common.utils.coder.Coder;
 import org.onap.policy.common.utils.coder.StandardCoder;
 import org.onap.policy.common.utils.coder.StandardCoderObject;
@@ -59,10 +61,12 @@ class IntermediaryActivatorTest {
         var listenerFirst = mock(ParticipantStatusReqListener.class);
         when(listenerFirst.getType()).thenReturn(TOPIC_FIRST);
         when(listenerFirst.getScoListener()).thenReturn(listenerFirst);
+        when(listenerFirst.isDefaultTopic()).thenReturn(true);
 
         var listenerSecond = mock(ParticipantStatusReqListener.class);
         when(listenerSecond.getType()).thenReturn(TOPIC_SECOND);
         when(listenerSecond.getScoListener()).thenReturn(listenerSecond);
+        when(listenerSecond.isDefaultTopic()).thenReturn(false);
 
         List<Listener<ParticipantStatusReq>> listeners = List.of(listenerFirst, listenerSecond);
 
@@ -84,7 +88,7 @@ class IntermediaryActivatorTest {
             verify(listenerFirst, times(1)).onTopicEvent(any(), any(), any());
 
             sco = CODER.decode("{messageType:" + TOPIC_SECOND + "}", StandardCoderObject.class);
-            activator.getMsgDispatcher().onTopicEvent(null, "msg", sco);
+            activator.getSyncMsgDispatcher().onTopicEvent(null, "msg", sco);
             verify(listenerSecond, times(1)).onTopicEvent(any(), any(), any());
 
             activator.close();
index 1536a0b..e8cafa9 100644 (file)
@@ -30,6 +30,7 @@ import java.util.TreeMap;
 import java.util.UUID;
 import org.onap.policy.clamp.acm.participant.intermediary.handler.DummyParticipantParameters;
 import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantIntermediaryParameters;
+import org.onap.policy.clamp.acm.participant.intermediary.parameters.Topics;
 import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
 import org.onap.policy.clamp.models.acm.concepts.AcElementRestart;
 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
@@ -58,6 +59,7 @@ public class CommonTestData {
     public static final String DESCRIPTION = "Participant description";
     public static final long TIME_INTERVAL = 2000;
     public static final List<TopicParameters> TOPIC_PARAMS = List.of(getTopicParams());
+    public static final List<TopicParameters> TOPIC_SOURCE_PARAMS = List.of(getTopicParams(), getSyncTopicParams());
     public static final Coder CODER = new StandardCoder();
     public static final UUID AC_ID_0 = UUID.randomUUID();
     public static final UUID AC_ID_1 = UUID.randomUUID();
@@ -116,6 +118,7 @@ public class CommonTestData {
         map.put("description", DESCRIPTION);
         map.put("reportingTimeIntervalMs", TIME_INTERVAL);
         map.put("clampAutomationCompositionTopics", getTopicParametersMap(false));
+        map.put("topics", getTopics());
         var supportedElementType = new ParticipantSupportedElementType();
         supportedElementType.setTypeName("org.onap.policy.clamp.acm.HttpAutomationCompositionElement");
         supportedElementType.setTypeVersion("1.0.0");
@@ -133,7 +136,7 @@ public class CommonTestData {
     public static Map<String, Object> getTopicParametersMap(final boolean isEmpty) {
         final Map<String, Object> map = new TreeMap<>();
         if (!isEmpty) {
-            map.put("topicSources", TOPIC_PARAMS);
+            map.put("topicSources", TOPIC_SOURCE_PARAMS);
             map.put("topicSinks", TOPIC_PARAMS);
         }
         return map;
@@ -152,6 +155,22 @@ public class CommonTestData {
         return topicParams;
     }
 
+    /**
+     * Returns topic parameters for sync topic.
+     * @return topicparamaters
+     */
+    public static TopicParameters getSyncTopicParams() {
+        final var topicParams = new TopicParameters();
+        topicParams.setTopic("acm-ppnt-sync");
+        topicParams.setTopicCommInfrastructure("NOOP");
+        topicParams.setServers(List.of("localhost"));
+        return topicParams;
+    }
+
+    private static Topics getTopics() {
+        return new Topics("policy-acruntime-participant", "acm-ppnt-sync");
+    }
+
     /**
      * Returns participantId for test cases.
      *