Add topic checker 62/123362/4
authorJim Hahn <jrh3@att.com>
Wed, 18 Aug 2021 14:46:28 +0000 (10:46 -0400)
committerJim Hahn <jrh3@att.com>
Wed, 18 Aug 2021 20:20:40 +0000 (16:20 -0400)
Added a method to check the readiness of a bidirectional topic.

Issue-ID: POLICY-3531
Change-Id: I2fefae7ba1ea5ed9ed33140717d05828e6dec94d
Signed-off-by: Jim Hahn <jrh3@att.com>
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java

index 57a331c..2a9f144 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * ONAP
  * ================================================================================
- * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -22,6 +22,9 @@ package org.onap.policy.common.endpoints.event.comm.client;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
 import lombok.Getter;
 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
@@ -29,6 +32,11 @@ import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
 import org.onap.policy.common.endpoints.event.comm.TopicListener;
 import org.onap.policy.common.endpoints.event.comm.TopicSink;
 import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A "bidirectional" topic, which is a pair of topics, one of which is used to publish
@@ -36,6 +44,9 @@ import org.onap.policy.common.endpoints.event.comm.TopicSource;
  */
 @Getter
 public class BidirectionalTopicClient {
+    private static final Logger logger = LoggerFactory.getLogger(BidirectionalTopicClient.class);
+    private static final Coder coder = new StandardCoder();
+
     private final String sinkTopic;
     private final String sourceTopic;
     private final TopicSink sink;
@@ -43,6 +54,16 @@ public class BidirectionalTopicClient {
     private final CommInfrastructure sinkTopicCommInfrastructure;
     private final CommInfrastructure sourceTopicCommInfrastructure;
 
+    /**
+     * Used when checking whether or not a message sent on the sink topic can be received
+     * on the source topic. When a matching message is received on the incoming topic,
+     * {@code true} is placed on the queue. If {@link #stop()} is called or the waiting
+     * thread is interrupted, then {@code false} is placed on the queue. Whenever a value
+     * is pulled from the queue, it is immediately placed back on the queue.
+     */
+    private final BlockingDeque<Boolean> checkerQueue = new LinkedBlockingDeque<>();
+
+
     /**
      * Constructs the object.
      *
@@ -94,9 +115,103 @@ public class BidirectionalTopicClient {
         source.unregister(topicListener);
     }
 
+    /**
+     * Determines whether or not the topic is ready (i.e., {@link #awaitReady(Object)} has
+     * previously returned {@code true}).
+     *
+     * @return {@code true}, if the topic is ready to send and receive
+     */
+    public boolean isReady() {
+        return Boolean.TRUE.equals(checkerQueue.peek());
+    }
+
+    /**
+     * Waits for the bidirectional topic to become "ready" by publishing a message on the
+     * sink topic and awaiting receipt of the message on the source topic. If the message
+     * is not received within a few seconds, then it tries again. This process is
+     * continued until the message is received, {@link #stop()} is called, or this thread
+     * is interrupted. Once this returns, subsequent calls will return immediately, always
+     * with the same value.
+     *
+     * @param message message to be sent to the sink topic. Note: the equals() method must
+     *        return {@code true} if and only if two messages are the same
+     * @param waitMs time to wait, in milliseconds, before re-sending the message
+     * @return {@code true} if the message was received from the source topic,
+     *         {@code false} if this method was stopped or interrupted before receipt of
+     *         the message
+     * @throws CoderException if the message cannot be encoded
+     */
+    public synchronized <T> boolean awaitReady(T message, long waitMs) throws CoderException {
+        // see if we already know the answer
+        if (!checkerQueue.isEmpty()) {
+            return checkerQueue.peek();
+        }
+
+        final String messageText = coder.encode(message);
+
+        // class of message to be decoded
+        @SuppressWarnings("unchecked")
+        final Class<? extends T> clazz = (Class<? extends T>) message.getClass();
+
+        // create a listener to detect when a matching message is received
+        final TopicListener listener = (infra, topic, msg) -> {
+            try {
+                T incoming = decode(msg, clazz);
+
+                if (message.equals(incoming)) {
+                    logger.info("topic {} is ready; found matching message {}", topic, incoming);
+                    checkerQueue.add(Boolean.TRUE);
+                }
+
+            } catch (CoderException e) {
+                logger.warn("cannot decode message from topic {}", topic, e);
+                decodeFailed();
+            }
+        };
+
+        source.register(listener);
+
+        // loop until the message is received
+        try {
+            Boolean result;
+            do {
+                send(messageText);
+            } while ((result = checkerQueue.poll(waitMs, TimeUnit.MILLISECONDS)) == null);
+
+            // put it back on the queue
+            checkerQueue.add(result);
+
+        } catch (InterruptedException e) {
+            logger.error("interrupted waiting for topic sink {} source {}", sink.getTopic(), source.getTopic(), e);
+            Thread.currentThread().interrupt();
+            checkerQueue.add(Boolean.FALSE);
+
+        } finally {
+            source.unregister(listener);
+        }
+
+        return checkerQueue.peek();
+    }
+
+    /**
+     * Stops any listeners that are currently stuck in {@link #awaitReady(Object)} by
+     * adding {@code false} to the queue.
+     */
+    public void stopWaiting() {
+        checkerQueue.add(Boolean.FALSE);
+    }
+
     // these may be overridden by junit tests
 
     protected TopicEndpoint getTopicEndpointManager() {
         return TopicEndpointManager.getManager();
     }
+
+    protected <T> T decode(String msg, Class<? extends T> clazz) throws CoderException {
+        return coder.decode(msg, clazz);
+    }
+
+    protected void decodeFailed() {
+        // already logged - nothing else to do
+    }
 }
index cfb65e1..2605c14 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.onap.policy.common.endpoints.event.comm.client;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -28,16 +29,24 @@ import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.Arrays;
 import java.util.Properties;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
@@ -46,11 +55,18 @@ import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
 import org.onap.policy.common.endpoints.event.comm.TopicListener;
 import org.onap.policy.common.endpoints.event.comm.TopicSink;
 import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
 
 @RunWith(MockitoJUnitRunner.class)
 public class BidirectionalTopicClientTest {
+    private static final Coder coder = new StandardCoder();
+    private static final long MAX_WAIT_MS = 5000;
+    private static final long SHORT_WAIT_MS = 1;
     private static final String SINK_TOPIC = "my-sink-topic";
     private static final String SOURCE_TOPIC = "my-source-topic";
+    private static final String MY_TEXT = "my-text";
 
     private static final CommInfrastructure SINK_INFRA = CommInfrastructure.UEB;
     private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.NOOP;
@@ -64,7 +80,10 @@ public class BidirectionalTopicClientTest {
     @Mock
     private TopicListener listener;
 
+    private MyMessage theMessage;
+
     private BidirectionalTopicClient client;
+    private Context context;
 
     /**
      * Configures the endpoints.
@@ -104,7 +123,16 @@ public class BidirectionalTopicClientTest {
         when(endpoint.getTopicSources(any())).thenReturn(Arrays.asList());
         when(endpoint.getTopicSources(Arrays.asList(SOURCE_TOPIC))).thenReturn(Arrays.asList(source));
 
+        theMessage = new MyMessage(MY_TEXT);
+
         client = new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC);
+
+        context = new Context();
+    }
+
+    @After
+    public void tearDown() {
+        context.stop();
     }
 
     @Test
@@ -169,6 +197,151 @@ public class BidirectionalTopicClientTest {
         assertNotSame(source, client.getSource());
     }
 
+    @Test
+    public void testAwaitReceipt() throws Exception {
+        context.start(theMessage);
+        assertThat(context.awaitSend(1)).isTrue();
+
+        verify(source).register(any());
+        verify(sink, atLeast(1)).send(any());
+        assertThat(context.checker.isReady()).isFalse();
+
+        inject(theMessage);
+
+        verifyReceipt();
+    }
+
+    @Test
+    public void testAwaitReceipt_AlreadyDone() throws Exception {
+        context.start(theMessage);
+        assertThat(context.awaitSend(1)).isTrue();
+
+        inject(theMessage);
+
+        verifyReceipt();
+
+        // calling again should result in "true" again, without injecting message
+        context.start(theMessage);
+        verifyReceipt();
+    }
+
+    @Test
+    public void testAwaitReceipt_MessageDoesNotMatch() throws Exception {
+        context.start(theMessage);
+        assertThat(context.awaitSend(1)).isTrue();
+
+        // non-matching message
+        inject("{}");
+
+        // wait for a few more calls to "send" and then inject a matching message
+        assertThat(context.awaitSend(3)).isTrue();
+        inject(theMessage);
+
+        verifyReceipt();
+    }
+
+    @Test
+    public void testAwaitReceipt_DecodeFails() throws Exception {
+        context.start(theMessage);
+        assertThat(context.awaitSend(1)).isTrue();
+
+        // force a failure and inject the message
+        context.forceDecodeFailure = true;
+        inject(theMessage);
+
+        assertThat(context.awaitDecodeFailure()).isTrue();
+
+        // no more failures
+        context.forceDecodeFailure = false;
+        inject(theMessage);
+
+        verifyReceipt();
+    }
+
+    @Test
+    public void testAwaitReceipt_Interrupted() throws InterruptedException {
+        context.start(theMessage);
+        assertThat(context.awaitSend(1)).isTrue();
+
+        context.interrupt();
+
+        verifyNoReceipt();
+    }
+
+    @Test
+    public void testAwaitReceipt_MultipleLoops() throws Exception {
+        context.start(theMessage);
+
+        // wait for multiple "send" calls
+        assertThat(context.awaitSend(3)).isTrue();
+
+        inject(theMessage);
+
+        verifyReceipt();
+    }
+
+    @Test
+    public void testStop() throws InterruptedException {
+        context.start(theMessage);
+        assertThat(context.awaitSend(1)).isTrue();
+
+        context.stop();
+
+        verifyNoReceipt();
+    }
+
+    /**
+     * Verifies that awaitReceipt() returns {@code true}.
+     *
+     * @throws InterruptedException if interrupted while waiting for the thread to
+     *         terminate
+     */
+    private void verifyReceipt() throws InterruptedException {
+        assertThat(context.join()).isTrue();
+        assertThat(context.result).isTrue();
+        assertThat(context.exception).isNull();
+        assertThat(context.checker.isReady()).isTrue();
+
+        verify(source).unregister(any());
+    }
+
+    /**
+     * Verifies that awaitReceipt() returns {@code false}.
+     *
+     * @throws InterruptedException if interrupted while waiting for the thread to
+     *         terminate
+     */
+    private void verifyNoReceipt() throws InterruptedException {
+        assertThat(context.join()).isTrue();
+        assertThat(context.result).isFalse();
+        assertThat(context.exception).isNull();
+        assertThat(context.checker.isReady()).isFalse();
+
+        verify(source).unregister(any());
+    }
+
+    /**
+     * Injects a message into the source topic.
+     *
+     * @param message message to be injected
+     * @throws CoderException if the message cannot be encoded
+     */
+    private void inject(MyMessage message) throws CoderException {
+        inject(coder.encode(message));
+    }
+
+    /**
+     * Injects a message into the source topic.
+     *
+     * @param message message to be injected
+     */
+    private void inject(String message) {
+        ArgumentCaptor<TopicListener> cap = ArgumentCaptor.forClass(TopicListener.class);
+        verify(source).register(cap.capture());
+
+        cap.getValue().onTopicEvent(SOURCE_INFRA, SOURCE_TOPIC, message);
+    }
+
 
     /**
      * BidirectionalTopicClient with some overrides.
@@ -185,4 +358,95 @@ public class BidirectionalTopicClientTest {
             return endpoint;
         }
     }
+
+    private class Context {
+        private Thread thread;
+        private boolean result;
+        private Exception exception;
+        private boolean forceDecodeFailure;
+
+        // released every time the checker publishes a message
+        private final Semaphore sendSem = new Semaphore(0);
+
+        // released every time a message-decode fails
+        private final Semaphore decodeFailedSem = new Semaphore(0);
+
+        private final BidirectionalTopicClient2 checker;
+
+        public Context() throws BidirectionalTopicClientException {
+
+            checker = new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC) {
+
+                @Override
+                public boolean send(String messageText) {
+                    boolean result = super.send(messageText);
+                    sendSem.release();
+                    return result;
+                }
+
+                @Override
+                protected <T> T decode(String msg, Class<? extends T> clazz) throws CoderException {
+                    if (forceDecodeFailure) {
+                        throw new CoderException("expected exception");
+                    }
+
+                    return super.decode(msg, clazz);
+                }
+
+                @Override
+                protected void decodeFailed() {
+                    super.decodeFailed();
+                    decodeFailedSem.release();
+                }
+            };
+        }
+
+        /**
+         * Starts the thread.
+         *
+         * @param message message to be sent to the sink topic
+         */
+        public void start(MyMessage message) {
+            thread = new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        result = checker.awaitReady(message, SHORT_WAIT_MS);
+                    } catch (Exception e) {
+                        exception = e;
+                    }
+                }
+            };
+            thread.setDaemon(true);
+            thread.start();
+        }
+
+        public void stop() {
+            checker.stopWaiting();
+        }
+
+        public boolean join() throws InterruptedException {
+            thread.join(MAX_WAIT_MS);
+            return !thread.isAlive();
+        }
+
+        public void interrupt() {
+            thread.interrupt();
+        }
+
+        public boolean awaitSend(int npermits) throws InterruptedException {
+            return sendSem.tryAcquire(npermits, MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+        }
+
+        public boolean awaitDecodeFailure() throws InterruptedException {
+            return decodeFailedSem.tryAcquire(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @Data
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class MyMessage {
+        private String text;
+    }
 }