Use separate subscription for heartbeats 62/122462/2
authorJim Hahn <jrh3@att.com>
Tue, 6 Jul 2021 14:19:52 +0000 (10:19 -0400)
committerJim Hahn <jrh3@att.com>
Tue, 6 Jul 2021 14:55:13 +0000 (10:55 -0400)
Separated the heartbeat processing onto its own POLICY-HEARTBEAT topic,
still on the "real" (i.e., "effective") POLICY-PDP-PAP topic, like we
had originally been doing with the statistics.  With this change,
statistics processing continues to be part of the heartbeat
class/processing, thus a separate class is not required to listen for,
and handle, statistics.

This new subscription uses a shared consumer group so that only one PAP
will process any given heartbeat message, which should reduce the
likelihood of DB contention and duplicate keys.

This also means that the "saveStatisticsInDb" flag will still be used,
which is a more obvious mechanism for controlling the storing of
statistics than the presence/absence of a topic in the config file.

Issue-ID: POLICY-3460
Change-Id: Ia07132b1c7aef006af86fddbe677fb1243a4e2c3
Signed-off-by: Jim Hahn <jrh3@att.com>
main/src/main/java/org/onap/policy/pap/main/PapConstants.java
main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java
main/src/test/resources/e2e/PapConfigParameters.json
main/src/test/resources/parameters/PapConfigParameters.json
main/src/test/resources/parameters/PapConfigParametersStd.json
packages/policy-pap-tarball/src/main/resources/etc/defaultConfig.json
testsuites/stability/src/main/resources/papsetup/config/pap/etc/defaultConfig.json

index 7d4cb7b..f38a82e 100644 (file)
@@ -25,7 +25,7 @@ import org.onap.policy.common.utils.network.NetworkUtil;
 /**
  * Names of various items contained in the Registry.
  */
-public class PapConstants {
+public final class PapConstants {
 
     // Registry keys
     public static final String REG_PAP_ACTIVATOR = "object:activator/pap";
@@ -38,6 +38,7 @@ public class PapConstants {
     // topic names
     public static final String TOPIC_POLICY_PDP_PAP = "POLICY-PDP-PAP";
     public static final String TOPIC_POLICY_NOTIFICATION = "POLICY-NOTIFICATION";
+    public static final String TOPIC_POLICY_HEARTBEAT = "POLICY-HEARTBEAT";
 
     // policy components names
     public static final String POLICY_API = "api";
index 617275b..3b08940 100644 (file)
 
 package org.onap.policy.pap.main.startstop;
 
-import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.TopicListener;
 import org.onap.policy.common.endpoints.event.comm.TopicSource;
 import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
 import org.onap.policy.common.endpoints.http.server.RestServer;
@@ -84,15 +85,17 @@ public class PapActivator extends ServiceManagerContainer {
 
     /**
      * Listens for messages on the topic, decodes them into a {@link PdpStatus} message, and then dispatches them to
-     * {@link #reqIdDispatcher}.
+     * {@link #responseReqIdDispatcher}.
      */
-    private final MessageTypeDispatcher msgDispatcher;
+    private final MessageTypeDispatcher responseMsgDispatcher;
+    private final MessageTypeDispatcher heartbeatMsgDispatcher;
 
     /**
      * Listens for {@link PdpStatus} messages and then routes them to the listener associated with the ID of the
      * originating request.
      */
-    private final RequestIdDispatcher<PdpStatus> reqIdDispatcher;
+    private final RequestIdDispatcher<PdpStatus> responseReqIdDispatcher;
+    private final RequestIdDispatcher<PdpStatus> heartbeatReqIdDispatcher;
 
     /**
      * Listener for anonymous {@link PdpStatus} messages either for registration or heartbeat.
@@ -111,8 +114,10 @@ public class PapActivator extends ServiceManagerContainer {
 
         try {
             this.papParameterGroup = papParameterGroup;
-            this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
-            this.reqIdDispatcher = new RequestIdDispatcher<>(PdpStatus.class, REQ_ID_NAMES);
+            this.responseMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
+            this.heartbeatMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
+            this.responseReqIdDispatcher = new RequestIdDispatcher<>(PdpStatus.class, REQ_ID_NAMES);
+            this.heartbeatReqIdDispatcher = new RequestIdDispatcher<>(PdpStatus.class, REQ_ID_NAMES);
             this.pdpHeartbeatListener = new PdpHeartbeatListener(papParameterGroup.getPdpParameters(),
                             papParameterGroup.isSavePdpStatisticsInDb());
 
@@ -149,16 +154,24 @@ public class PapActivator extends ServiceManagerContainer {
             () -> Registry.unregister(PapConstants.REG_PAP_DAO_FACTORY));
 
         addAction("Pdp Heartbeat Listener",
-            () -> reqIdDispatcher.register(pdpHeartbeatListener),
-            () -> reqIdDispatcher.unregister(pdpHeartbeatListener));
+            () -> heartbeatReqIdDispatcher.register(pdpHeartbeatListener),
+            () -> heartbeatReqIdDispatcher.unregister(pdpHeartbeatListener));
 
-        addAction("Request ID Dispatcher",
-            () -> msgDispatcher.register(PdpMessageType.PDP_STATUS.name(), this.reqIdDispatcher),
-            () -> msgDispatcher.unregister(PdpMessageType.PDP_STATUS.name()));
+        addAction("Response Request ID Dispatcher",
+            () -> responseMsgDispatcher.register(PdpMessageType.PDP_STATUS.name(), this.responseReqIdDispatcher),
+            () -> responseMsgDispatcher.unregister(PdpMessageType.PDP_STATUS.name()));
 
-        addAction("Message Dispatcher",
-            this::registerMsgDispatcher,
-            this::unregisterMsgDispatcher);
+        addAction("Heartbeat Request ID Dispatcher",
+            () -> heartbeatMsgDispatcher.register(PdpMessageType.PDP_STATUS.name(), this.heartbeatReqIdDispatcher),
+            () -> heartbeatMsgDispatcher.unregister(PdpMessageType.PDP_STATUS.name()));
+
+        addAction("Response Message Dispatcher",
+            () -> registerMsgDispatcher(responseMsgDispatcher, PapConstants.TOPIC_POLICY_PDP_PAP),
+            () -> unregisterMsgDispatcher(responseMsgDispatcher, PapConstants.TOPIC_POLICY_PDP_PAP));
+
+        addAction("Heartbeat Message Dispatcher",
+            () -> registerMsgDispatcher(heartbeatMsgDispatcher, PapConstants.TOPIC_POLICY_HEARTBEAT),
+            () -> unregisterMsgDispatcher(heartbeatMsgDispatcher, PapConstants.TOPIC_POLICY_HEARTBEAT));
 
         addAction("topics",
             TopicEndpointManager.getManager()::start,
@@ -215,7 +228,7 @@ public class PapActivator extends ServiceManagerContainer {
                                     .params(pdpParams)
                                     .policyNotifier(notifier.get())
                                     .pdpPublisher(pdpPub.get())
-                                    .responseDispatcher(reqIdDispatcher)
+                                    .responseDispatcher(responseReqIdDispatcher)
                                     .stateChangeTimers(pdpStChgTimers.get())
                                     .updateTimers(pdpUpdTimers.get())
                                     .savePdpStatistics(papParameterGroup.isSavePdpStatisticsInDb())
@@ -288,21 +301,23 @@ public class PapActivator extends ServiceManagerContainer {
 
     /**
      * Registers the dispatcher with the topic source(s).
+     * @param dispatcher dispatcher to register
+     * @param topic topic of interest
      */
-    private void registerMsgDispatcher() {
-        for (final TopicSource source : TopicEndpointManager.getManager()
-                .getTopicSources(Arrays.asList(PapConstants.TOPIC_POLICY_PDP_PAP))) {
-            source.register(msgDispatcher);
+    private void registerMsgDispatcher(TopicListener dispatcher, String topic) {
+        for (final TopicSource source : TopicEndpointManager.getManager().getTopicSources(List.of(topic))) {
+            source.register(dispatcher);
         }
     }
 
     /**
      * Unregisters the dispatcher from the topic source(s).
+     * @param dispatcher dispatcher to unregister
+     * @param topic topic of interest
      */
-    private void unregisterMsgDispatcher() {
-        for (final TopicSource source : TopicEndpointManager.getManager()
-                .getTopicSources(Arrays.asList(PapConstants.TOPIC_POLICY_PDP_PAP))) {
-            source.unregister(msgDispatcher);
+    private void unregisterMsgDispatcher(TopicListener dispatcher, String topic) {
+        for (final TopicSource source : TopicEndpointManager.getManager().getTopicSources(List.of(topic))) {
+            source.unregister(dispatcher);
         }
     }
 }
index 610ded8..b10f580 100644 (file)
             "topic" : "POLICY-PDP-PAP",
             "servers" : [ "message-router" ],
             "topicCommInfrastructure" : "noop"
+        },
+        {
+            "topic" : "POLICY-HEARTBEAT",
+            "effectiveTopic": "POLICY-PDP-PAP",
+            "consumerGroup": "policy-pap",
+            "servers" : [ "message-router" ],
+            "topicCommInfrastructure" : "noop"
         }],
         "topicSinks" : [{
             "topic" : "POLICY-PDP-PAP",
index 452bc9a..09adfce 100644 (file)
             "topic" : "POLICY-PDP-PAP",
             "servers" : [ "message-router" ],
             "topicCommInfrastructure" : "noop"
+        },
+        {
+            "topic" : "POLICY-HEARTBEAT",
+            "effectiveTopic": "POLICY-PDP-PAP",
+            "consumerGroup": "policy-pap",
+            "servers" : [ "message-router" ],
+            "topicCommInfrastructure" : "noop"
         }],
         "topicSinks" : [{
             "topic" : "POLICY-PDP-PAP",
index 0868aa4..0f88143 100644 (file)
             "topic" : "POLICY-PDP-PAP",
             "servers" : [ "message-router" ],
             "topicCommInfrastructure" : "noop"
+        },
+        {
+            "topic" : "POLICY-HEARTBEAT",
+            "effectiveTopic": "POLICY-PDP-PAP",
+            "consumerGroup": "policy-pap",
+            "servers" : [ "message-router" ],
+            "topicCommInfrastructure" : "noop"
         }],
         "topicSinks" : [{
             "topic" : "POLICY-PDP-PAP",
index 2f37cb6..94315bb 100644 (file)
             "servers" : [ "message-router" ],
             "topicCommInfrastructure" : "dmaap",
             "fetchTimeout": 15000
+        },
+        {
+            "topic" : "POLICY-HEARTBEAT",
+            "effectiveTopic": "POLICY-PDP-PAP",
+            "consumerGroup": "policy-pap",
+            "servers" : [ "message-router" ],
+            "topicCommInfrastructure" : "dmaap",
+            "fetchTimeout": 15000
         }],
         "topicSinks" : [{
             "topic" : "POLICY-PDP-PAP",
index 4af2f5d..b98cdef 100644 (file)
             "servers" : [ "10.2.0.41:3904" ],
             "topicCommInfrastructure" : "dmaap",
             "fetchTimeout": 15000
+        },
+        {
+            "topic" : "PDP-HEARTBEAT",
+            "effectiveTopic": "POLICY-PDP-PAP",
+            "consumerGroup": "policy-pap",
+            "servers" : [ "10.2.0.41:3904" ],
+            "topicCommInfrastructure" : "dmaap",
+            "fetchTimeout": 15000
         }],
         "topicSinks" : [{
             "topic" : "POLICY-PDP-PAP",
@@ -46,4 +54,4 @@
             "topicCommInfrastructure" : "dmaap"
         }]
     }
-}
\ No newline at end of file
+}