Wait for pdp-pap topic in xacml-pdp 01/123401/1
authorJim Hahn <jrh3@att.com>
Thu, 19 Aug 2021 13:43:49 +0000 (09:43 -0400)
committerJim Hahn <jrh3@att.com>
Thu, 19 Aug 2021 21:21:06 +0000 (17:21 -0400)
Added code to use the new topic-checker functionality to verify
communication on the PDP-PAP topic before sending the first heartbeat.
This functionality can be disabled by setting probeHeartbeatTopicMs to
0 in the xacml config file.

Issue-ID: POLICY-3531
Change-Id: I113792c67aa26fbb188767d3e973b21dbe04f570
Signed-off-by: Jim Hahn <jrh3@att.com>
main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java
main/src/main/java/org/onap/policy/pdpx/main/parameters/XacmlPdpParameterGroup.java
main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java
main/src/test/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisherTest.java
main/src/test/java/org/onap/policy/pdpx/main/startstop/TestXacmlPdpActivator.java

index 3177c09..8c1f792 100644 (file)
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -25,18 +25,24 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import lombok.Getter;
-import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
+import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClient;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
 import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.models.pdp.concepts.PdpTopicCheck;
 import org.onap.policy.pdpx.main.XacmlState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class XacmlPdpHearbeatPublisher implements Runnable {
-    public static final int DEFAULT_INTERVAL_MS = 60000;
+    public static final int DEFAULT_HB_INTERVAL_MS = 60000;
 
     private static final Logger LOGGER = LoggerFactory.getLogger(XacmlPdpHearbeatPublisher.class);
+    private static final Coder CODER = new StandardCoder();
 
-    private final TopicSinkClient topicSinkClient;
+    private final BidirectionalTopicClient topicChecker;
+    private final long probeHeartbeatTopicMs;
 
     /**
      * Tracks the state of this PDP.
@@ -47,7 +53,7 @@ public class XacmlPdpHearbeatPublisher implements Runnable {
      * Current timer interval, in milliseconds.
      */
     @Getter
-    private long intervalMs = DEFAULT_INTERVAL_MS;
+    private long intervalMs = DEFAULT_HB_INTERVAL_MS;
 
     private ScheduledExecutorService timerThread;
 
@@ -57,26 +63,53 @@ public class XacmlPdpHearbeatPublisher implements Runnable {
     /**
      * Constructor for instantiating XacmlPdpPublisher.
      *
-     * @param topicSinkClient used to send heart beat message
+     * @param topicChecker used to check the topic before sending heart beat message
+     * @param probeHeartbeatTopicMs frequency, in milliseconds, with which to probe the
+     *        heartbeat topic before sending the first heartbeat. Zero disables probing
      * @param state tracks the state of this PDP
      */
-    public XacmlPdpHearbeatPublisher(TopicSinkClient topicSinkClient, XacmlState state) {
-        this.topicSinkClient = topicSinkClient;
+    public XacmlPdpHearbeatPublisher(BidirectionalTopicClient topicChecker, long probeHeartbeatTopicMs,
+                    XacmlState state) {
+        LOGGER.info("heartbeat topic probe {}ms", probeHeartbeatTopicMs);
+        this.topicChecker = topicChecker;
+        this.probeHeartbeatTopicMs = probeHeartbeatTopicMs;
         this.currentState = state;
     }
 
     @Override
     public void run() {
-        PdpStatus message = currentState.genHeartbeat();
-        LOGGER.info("Sending Xacml PDP heartbeat to the PAP - {}", message);
+        try {
+            if (!isTopicReady()) {
+                return;
+            }
+
+            PdpStatus message = currentState.genHeartbeat();
+            LOGGER.info("Sending Xacml PDP heartbeat to the PAP - {}", message);
+
+            String json = CODER.encode(message);
+            topicChecker.send(json);
 
-        topicSinkClient.send(message);
+        } catch (RuntimeException | CoderException e) {
+            LOGGER.warn("send to {} failed because of {}", topicChecker.getSink().getTopic(), e.getMessage(), e);
+        }
+    }
+
+    private boolean isTopicReady() throws CoderException {
+        if (probeHeartbeatTopicMs <= 0 || topicChecker.isReady()) {
+            return true;
+        }
+
+        var check = new PdpTopicCheck();
+        check.setName(XacmlState.PDP_NAME);
+        return topicChecker.awaitReady(check, probeHeartbeatTopicMs);
     }
 
     /**
      * Method to terminate the heart beat.
      */
     public synchronized void terminate() {
+        topicChecker.stopWaiting();
+
         if (timerThread != null) {
             timerThread.shutdownNow();
             timerThread = null;
index b994fe9..a9332e9 100644 (file)
@@ -22,6 +22,7 @@
 package org.onap.policy.pdpx.main.parameters;
 
 import lombok.Getter;
+import lombok.NoArgsConstructor;
 import org.apache.commons.lang3.StringUtils;
 import org.onap.policy.common.endpoints.parameters.RestClientParameters;
 import org.onap.policy.common.endpoints.parameters.RestServerParameters;
@@ -29,6 +30,7 @@ import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
 import org.onap.policy.common.parameters.BeanValidationResult;
 import org.onap.policy.common.parameters.ParameterGroupImpl;
 import org.onap.policy.common.parameters.ValidationStatus;
+import org.onap.policy.common.parameters.annotations.Min;
 import org.onap.policy.common.parameters.annotations.NotBlank;
 import org.onap.policy.common.parameters.annotations.NotNull;
 import org.onap.policy.common.parameters.annotations.Valid;
@@ -41,6 +43,7 @@ import org.onap.policy.models.base.Validated;
 @Getter
 @NotNull
 @NotBlank
+@NoArgsConstructor
 public class XacmlPdpParameterGroup extends ParameterGroupImpl {
     public static final String PARAM_POLICY_API = "policyApiParameters";
 
@@ -54,6 +57,13 @@ public class XacmlPdpParameterGroup extends ParameterGroupImpl {
     private TopicParameterGroup topicParameterGroup;
     @Valid
     private XacmlApplicationParameters applicationParameters;
+    /**
+     * Frequency, in seconds, with which to probe the heartbeat topic before sending the
+     * first heartbeat. Set to zero to disable probing.
+     */
+    @Min(0)
+    private long probeHeartbeatTopicSec = 4;
+
 
     /**
      * Create the xacml pdp parameter group.
index 4dd8a9b..050d8b2 100644 (file)
 
 package org.onap.policy.pdpx.main.startstop;
 
-import java.util.Arrays;
 import lombok.Getter;
 import lombok.Setter;
 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClient;
+import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClientException;
 import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
-import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClientException;
 import org.onap.policy.common.endpoints.http.client.HttpClient;
 import org.onap.policy.common.endpoints.http.client.HttpClientConfigException;
 import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
@@ -69,6 +68,11 @@ public class XacmlPdpActivator extends ServiceManagerContainer {
     // The parameters of this policy xacml pdp activator
     private final XacmlPdpParameterGroup xacmlPdpParameterGroup;
 
+    /**
+     * POLICY-PDP-PAP client.
+     */
+    private BidirectionalTopicClient topicClient;
+
     /**
      * Listens for messages on the topic, decodes them into a {@link PdpStatus} message, and then
      * dispatches them to appropriate listener.
@@ -108,8 +112,11 @@ public class XacmlPdpActivator extends ServiceManagerContainer {
             this.xacmlPdpParameterGroup = xacmlPdpParameterGroup;
             this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
 
-            sinkClient = new TopicSinkClient(TOPIC);
-            heartbeat = new XacmlPdpHearbeatPublisher(sinkClient, state);
+            topicClient = new BidirectionalTopicClient(TOPIC, TOPIC);
+            sinkClient = new TopicSinkClient(topicClient.getSink());
+
+            heartbeat = new XacmlPdpHearbeatPublisher(topicClient,
+                            xacmlPdpParameterGroup.getProbeHeartbeatTopicSec() * 1000, state);
 
             /*
              * since the dispatcher isn't registered with the topic yet, we can go ahead
@@ -123,7 +130,7 @@ public class XacmlPdpActivator extends ServiceManagerContainer {
             restServer = new XacmlPdpRestServer(xacmlPdpParameterGroup.getRestServerParameters(),
                     XacmlPdpAafFilter.class, XacmlPdpRestController.class);
 
-        } catch (RuntimeException | TopicSinkClientException | HttpClientConfigException e) {
+        } catch (RuntimeException | HttpClientConfigException | BidirectionalTopicClientException e) {
             throw new PolicyXacmlPdpRuntimeException(e.getMessage(), e);
         }
 
@@ -197,18 +204,14 @@ public class XacmlPdpActivator extends ServiceManagerContainer {
      * Registers the dispatcher with the topic source(s).
      */
     private void registerMsgDispatcher() {
-        for (TopicSource source : TopicEndpointManager.getManager().getTopicSources(Arrays.asList(TOPIC))) {
-            source.register(msgDispatcher);
-        }
+        topicClient.getSource().register(msgDispatcher);
     }
 
     /**
      * Unregisters the dispatcher from the topic source(s).
      */
     private void unregisterMsgDispatcher() {
-        for (TopicSource source : TopicEndpointManager.getManager().getTopicSources(Arrays.asList(TOPIC))) {
-            source.unregister(msgDispatcher);
-        }
+        topicClient.getSource().unregister(msgDispatcher);
     }
 
     /**
index 7f90211..5168958 100644 (file)
@@ -42,7 +42,9 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
-import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClient;
+import org.onap.policy.common.utils.coder.CoderException;
 import org.onap.policy.models.pdp.concepts.PdpStatus;
 import org.onap.policy.pdpx.main.XacmlState;
 
@@ -54,7 +56,10 @@ public class XacmlPdpHearbeatPublisherTest {
     private static final long INTERVAL_INVALID = 0;
 
     @Mock
-    private TopicSinkClient client;
+    private TopicSink sink;
+
+    @Mock
+    private BidirectionalTopicClient checker;
 
     @Mock
     private XacmlState state;
@@ -68,7 +73,6 @@ public class XacmlPdpHearbeatPublisherTest {
     @Mock
     private ScheduledFuture<?> timer2;
 
-    @Mock
     private PdpStatus status;
 
     private Queue<ScheduledFuture<?>> timers;
@@ -81,13 +85,17 @@ public class XacmlPdpHearbeatPublisherTest {
      */
     @Before
     public void setUp() {
+        when(sink.getTopic()).thenReturn("my-topic");
+        when(checker.getSink()).thenReturn(sink);
+        when(checker.isReady()).thenReturn(true);
         when(state.genHeartbeat()).thenReturn(status);
 
+        status = new PdpStatus();
         timers = new LinkedList<>(Arrays.asList(timer1, timer2));
 
         when(executor.scheduleWithFixedDelay(any(), anyLong(), anyLong(), any())).thenAnswer(args -> timers.remove());
 
-        publisher = new MyPublisher(client, state);
+        publisher = new MyPublisher(checker, 10, state);
     }
 
     @Test
@@ -95,7 +103,75 @@ public class XacmlPdpHearbeatPublisherTest {
         publisher.run();
 
         verify(state).genHeartbeat();
-        verify(client).send(status);
+        verify(checker).send(any());
+    }
+
+    /**
+     * Tests the run() method when the probe is disabled.
+     */
+    @Test
+    public void testRunNoProbe() throws CoderException {
+        publisher = new MyPublisher(checker, 0, state);
+
+        publisher.run();
+
+        verify(checker, never()).isReady();
+        verify(checker, never()).awaitReady(any(), anyLong());
+
+        verify(state).genHeartbeat();
+        verify(checker).send(any());
+    }
+
+    /**
+     * Tests the run() method when the topic is not ready, and then becomes ready.
+     */
+    @Test
+    public void testRunNotReady() throws CoderException {
+        // not ready yet
+        when(checker.isReady()).thenReturn(false);
+        when(checker.awaitReady(any(), anyLong())).thenReturn(false);
+
+        publisher.run();
+        verify(state, never()).genHeartbeat();
+        verify(checker, never()).send(any());
+
+        // isReady is still false, but awaitReady is now true - should generate heartbeat
+        when(checker.awaitReady(any(), anyLong())).thenReturn(true);
+
+        publisher.run();
+        verify(state).genHeartbeat();
+        verify(checker).send(any());
+
+        // now isReady is true, too - should not rerun awaitReady
+        when(checker.isReady()).thenReturn(true);
+
+        publisher.run();
+        verify(state, times(2)).genHeartbeat();
+        verify(checker, times(2)).send(any());
+        verify(checker, times(2)).awaitReady(any(), anyLong());
+    }
+
+    /**
+     * Tests the run() method when the checker throws an exception.
+     */
+    @Test
+    public void testRunCheckerEx() throws CoderException {
+        // force it to call awaitReady
+        when(checker.isReady()).thenReturn(false);
+
+        when(checker.awaitReady(any(), anyLong()))
+            .thenThrow(new CoderException("expected exception"))
+            .thenReturn(true);
+
+        // exception thrown - should not generate heartbeat
+        publisher.run();
+        verify(state, never()).genHeartbeat();
+        verify(checker, never()).send(any());
+
+        // no exception this time - SHOULD generate heartbeat
+        publisher.run();
+        verify(state).genHeartbeat();
+        verify(checker).send(any());
     }
 
     @Test
@@ -103,6 +179,8 @@ public class XacmlPdpHearbeatPublisherTest {
         // not yet started
         publisher.terminate();
 
+        verify(checker).stopWaiting();
+
 
         // now start it and then try again
         publisher.start();
@@ -156,7 +234,7 @@ public class XacmlPdpHearbeatPublisherTest {
     public void testStart() {
         publisher.start();
 
-        verify(executor).scheduleWithFixedDelay(publisher, 0, XacmlPdpHearbeatPublisher.DEFAULT_INTERVAL_MS,
+        verify(executor).scheduleWithFixedDelay(publisher, 0, XacmlPdpHearbeatPublisher.DEFAULT_HB_INTERVAL_MS,
                         TimeUnit.MILLISECONDS);
 
         // repeat - nothing more should happen
@@ -168,7 +246,7 @@ public class XacmlPdpHearbeatPublisherTest {
     @Test
     public void testMakeTimerThread() {
         // create a plain listener to test the "real" makeTimer() method
-        publisher = new XacmlPdpHearbeatPublisher(client, state);
+        publisher = new XacmlPdpHearbeatPublisher(checker, 1, state);
 
         assertThatCode(() -> {
             publisher.start();
@@ -179,8 +257,8 @@ public class XacmlPdpHearbeatPublisherTest {
 
     private class MyPublisher extends XacmlPdpHearbeatPublisher {
 
-        public MyPublisher(TopicSinkClient topicSinkClient, XacmlState state) {
-            super(topicSinkClient, state);
+        public MyPublisher(BidirectionalTopicClient topicChecker, long probeHeartbeatTopicMs, XacmlState state) {
+            super(topicChecker, probeHeartbeatTopicMs, state);
         }
 
         @Override
index 4286ccf..9025722 100644 (file)
@@ -35,6 +35,7 @@ import org.onap.policy.pdpx.main.PolicyXacmlPdpException;
 import org.onap.policy.pdpx.main.parameters.CommonTestData;
 import org.onap.policy.pdpx.main.parameters.XacmlPdpParameterGroup;
 import org.onap.policy.pdpx.main.parameters.XacmlPdpParameterHandler;
+import org.powermock.reflect.Whitebox;
 
 
 /**
@@ -42,6 +43,8 @@ import org.onap.policy.pdpx.main.parameters.XacmlPdpParameterHandler;
  *
  */
 public class TestXacmlPdpActivator extends CommonRest {
+    private static final String PROBE_FIELD_NAME = "probeHeartbeatTopicSec";
+
     private static XacmlPdpParameterGroup parGroup;
 
     private XacmlPdpActivator activator = null;
@@ -67,6 +70,7 @@ public class TestXacmlPdpActivator extends CommonRest {
     @Override
     @Before
     public void setUp() {
+        Whitebox.setInternalState(parGroup, PROBE_FIELD_NAME, 4);
         activator = new XacmlPdpActivator(parGroup);
     }
 
@@ -99,6 +103,14 @@ public class TestXacmlPdpActivator extends CommonRest {
         assertTrue(activator.isXacmlRestControllerAlive());
     }
 
+    @Test
+    public void testXacmlPdpActivator_NoProbe() throws Exception {
+        Whitebox.setInternalState(parGroup, PROBE_FIELD_NAME, 0);
+        activator = new XacmlPdpActivator(parGroup);
+        activator.start();
+        assertTrue(activator.isAlive());
+    }
+
     @Test
     public void testGetCurrent_testSetCurrent() {
         XacmlPdpActivator.setCurrent(activator);