Wait for pdp-pap topic in xacml-pdp
[policy/xacml-pdp.git] / main / src / test / java / org / onap / policy / pdpx / main / comm / XacmlPdpHearbeatPublisherTest.java
index 7f90211..5168958 100644 (file)
@@ -42,7 +42,9 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
-import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClient;
+import org.onap.policy.common.utils.coder.CoderException;
 import org.onap.policy.models.pdp.concepts.PdpStatus;
 import org.onap.policy.pdpx.main.XacmlState;
 
@@ -54,7 +56,10 @@ public class XacmlPdpHearbeatPublisherTest {
     private static final long INTERVAL_INVALID = 0;
 
     @Mock
-    private TopicSinkClient client;
+    private TopicSink sink;
+
+    @Mock
+    private BidirectionalTopicClient checker;
 
     @Mock
     private XacmlState state;
@@ -68,7 +73,6 @@ public class XacmlPdpHearbeatPublisherTest {
     @Mock
     private ScheduledFuture<?> timer2;
 
-    @Mock
     private PdpStatus status;
 
     private Queue<ScheduledFuture<?>> timers;
@@ -81,13 +85,17 @@ public class XacmlPdpHearbeatPublisherTest {
      */
     @Before
     public void setUp() {
+        when(sink.getTopic()).thenReturn("my-topic");
+        when(checker.getSink()).thenReturn(sink);
+        when(checker.isReady()).thenReturn(true);
         when(state.genHeartbeat()).thenReturn(status);
 
+        status = new PdpStatus();
         timers = new LinkedList<>(Arrays.asList(timer1, timer2));
 
         when(executor.scheduleWithFixedDelay(any(), anyLong(), anyLong(), any())).thenAnswer(args -> timers.remove());
 
-        publisher = new MyPublisher(client, state);
+        publisher = new MyPublisher(checker, 10, state);
     }
 
     @Test
@@ -95,7 +103,75 @@ public class XacmlPdpHearbeatPublisherTest {
         publisher.run();
 
         verify(state).genHeartbeat();
-        verify(client).send(status);
+        verify(checker).send(any());
+    }
+
+    /**
+     * Tests the run() method when the probe is disabled.
+     */
+    @Test
+    public void testRunNoProbe() throws CoderException {
+        publisher = new MyPublisher(checker, 0, state);
+
+        publisher.run();
+
+        verify(checker, never()).isReady();
+        verify(checker, never()).awaitReady(any(), anyLong());
+
+        verify(state).genHeartbeat();
+        verify(checker).send(any());
+    }
+
+    /**
+     * Tests the run() method when the topic is not ready, and then becomes ready.
+     */
+    @Test
+    public void testRunNotReady() throws CoderException {
+        // not ready yet
+        when(checker.isReady()).thenReturn(false);
+        when(checker.awaitReady(any(), anyLong())).thenReturn(false);
+
+        publisher.run();
+        verify(state, never()).genHeartbeat();
+        verify(checker, never()).send(any());
+
+        // isReady is still false, but awaitReady is now true - should generate heartbeat
+        when(checker.awaitReady(any(), anyLong())).thenReturn(true);
+
+        publisher.run();
+        verify(state).genHeartbeat();
+        verify(checker).send(any());
+
+        // now isReady is true, too - should not rerun awaitReady
+        when(checker.isReady()).thenReturn(true);
+
+        publisher.run();
+        verify(state, times(2)).genHeartbeat();
+        verify(checker, times(2)).send(any());
+        verify(checker, times(2)).awaitReady(any(), anyLong());
+    }
+
+    /**
+     * Tests the run() method when the checker throws an exception.
+     */
+    @Test
+    public void testRunCheckerEx() throws CoderException {
+        // force it to call awaitReady
+        when(checker.isReady()).thenReturn(false);
+
+        when(checker.awaitReady(any(), anyLong()))
+            .thenThrow(new CoderException("expected exception"))
+            .thenReturn(true);
+
+        // exception thrown - should not generate heartbeat
+        publisher.run();
+        verify(state, never()).genHeartbeat();
+        verify(checker, never()).send(any());
+
+        // no exception this time - SHOULD generate heartbeat
+        publisher.run();
+        verify(state).genHeartbeat();
+        verify(checker).send(any());
     }
 
     @Test
@@ -103,6 +179,8 @@ public class XacmlPdpHearbeatPublisherTest {
         // not yet started
         publisher.terminate();
 
+        verify(checker).stopWaiting();
+
 
         // now start it and then try again
         publisher.start();
@@ -156,7 +234,7 @@ public class XacmlPdpHearbeatPublisherTest {
     public void testStart() {
         publisher.start();
 
-        verify(executor).scheduleWithFixedDelay(publisher, 0, XacmlPdpHearbeatPublisher.DEFAULT_INTERVAL_MS,
+        verify(executor).scheduleWithFixedDelay(publisher, 0, XacmlPdpHearbeatPublisher.DEFAULT_HB_INTERVAL_MS,
                         TimeUnit.MILLISECONDS);
 
         // repeat - nothing more should happen
@@ -168,7 +246,7 @@ public class XacmlPdpHearbeatPublisherTest {
     @Test
     public void testMakeTimerThread() {
         // create a plain listener to test the "real" makeTimer() method
-        publisher = new XacmlPdpHearbeatPublisher(client, state);
+        publisher = new XacmlPdpHearbeatPublisher(checker, 1, state);
 
         assertThatCode(() -> {
             publisher.start();
@@ -179,8 +257,8 @@ public class XacmlPdpHearbeatPublisherTest {
 
     private class MyPublisher extends XacmlPdpHearbeatPublisher {
 
-        public MyPublisher(TopicSinkClient topicSinkClient, XacmlState state) {
-            super(topicSinkClient, state);
+        public MyPublisher(BidirectionalTopicClient topicChecker, long probeHeartbeatTopicMs, XacmlState state) {
+            super(topicChecker, probeHeartbeatTopicMs, state);
         }
 
         @Override