TopicSinkClient support for unmanaged topics 16/82616/1
authorJorge Hernandez <jorge.hernandez-herrero@att.com>
Mon, 18 Mar 2019 20:59:04 +0000 (15:59 -0500)
committerJorge Hernandez <jorge.hernandez-herrero@att.com>
Mon, 18 Mar 2019 20:59:04 +0000 (15:59 -0500)
Change-Id: I6b92dcc0f225aa712b34adb9a1f9ab47df412c81
Issue-ID: POLICY-1608
Signed-off-by: Jorge Hernandez <jorge.hernandez-herrero@att.com>
policy-endpoints/lombok.config [new file with mode: 0644]
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientTest.java

diff --git a/policy-endpoints/lombok.config b/policy-endpoints/lombok.config
new file mode 100644 (file)
index 0000000..2384843
--- /dev/null
@@ -0,0 +1,3 @@
+config.stopBubbling = true
+lombok.addLombokGeneratedAnnotation = true
+lombok.nonNull.exceptionType = IllegalArgumentException
index e44d1f7..f08a138 100644 (file)
@@ -23,6 +23,7 @@ package org.onap.policy.common.endpoints.event.comm.client;
 
 import java.util.List;
 import lombok.Getter;
+import lombok.NonNull;
 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
 import org.onap.policy.common.endpoints.event.comm.TopicSink;
 import org.onap.policy.common.utils.coder.Coder;
@@ -42,15 +43,10 @@ public class TopicSinkClient {
      */
     private static final Coder CODER = new StandardCoder();
 
-    /**
-     * Topic to which messages are published.
-     */
-    @Getter
-    private final String topic;
-
     /**
      * Where messages are published.
      */
+    @Getter
     private final TopicSink sink;
 
     /**
@@ -60,8 +56,6 @@ public class TopicSinkClient {
      * @throws TopicSinkClientException if the topic does not exist
      */
     public TopicSinkClient(final String topic) throws TopicSinkClientException {
-        this.topic = topic;
-
         final List<TopicSink> lst = getTopicSinks(topic);
         if (lst.isEmpty()) {
             throw new TopicSinkClientException("no sinks for topic: " + topic);
@@ -70,6 +64,25 @@ public class TopicSinkClient {
         this.sink = lst.get(0);
     }
 
+    /**
+     * Constructs the client from a sink object.
+     *
+     * @param sink topic sink publisher
+     */
+    public TopicSinkClient(@NonNull TopicSink sink) {
+        this.sink = sink;
+    }
+
+
+    /**
+     * Gets the canonical topic name.
+     *
+     * @return topic name
+     */
+    public String getTopic() {
+        return this.sink.getTopic();
+    }
+
     /**
      * Sends a message to the topic, after encoding the message as json.
      *
@@ -82,7 +95,7 @@ public class TopicSinkClient {
             return sink.send(json);
 
         } catch (RuntimeException | CoderException e) {
-            logger.warn("send to {} failed because of {}", topic, e.getMessage(), e);
+            logger.warn("send to {} failed because of {}", sink.getTopic(), e.getMessage(), e);
             return false;
         }
     }
index 725c041..07630cd 100644 (file)
@@ -25,7 +25,6 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertSame;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -36,18 +35,13 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.internal.util.reflection.Whitebox;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
-import org.onap.policy.common.endpoints.event.comm.TopicListener;
 import org.onap.policy.common.endpoints.event.comm.TopicSink;
 
 public class TopicSinkClientTest {
-    private static final String SINK_FIELD_NAME = "sink";
     private static final String TOPIC = "my-topic";
 
     private TopicSinkClient client;
@@ -67,10 +61,17 @@ public class TopicSinkClientTest {
         sinks = Arrays.asList(sink, null);
 
         client = new TopicSinkClient2(TOPIC);
+
+        Properties props = new Properties();
+        props.setProperty("noop.sink.topics", TOPIC);
+
+        // clear all topics and then configure one topic
+        TopicEndpoint.manager.shutdown();
+        TopicEndpoint.manager.addTopicSinks(props);
     }
 
     @AfterClass
-    public static void tearDown() throws Exception {
+    public static void tearDown() {
         // clear all topics after the tests
         TopicEndpoint.manager.shutdown();
     }
@@ -80,25 +81,13 @@ public class TopicSinkClientTest {
      */
     @Test
     public void testGetTopicSinks() throws Exception {
-        // clear all topics and then configure one topic
-        TopicEndpoint.manager.shutdown();
-
-        final Properties props = new Properties();
-        props.setProperty("noop.sink.topics", TOPIC);
-        TopicEndpoint.manager.addTopicSinks(props);
 
         sink = TopicEndpoint.manager.getNoopTopicSink(TOPIC);
         assertNotNull(sink);
 
         final AtomicReference<String> evref = new AtomicReference<>(null);
 
-        sink.register(new TopicListener() {
-            @Override
-            public void onTopicEvent(final CommInfrastructure infra, final String topic, final String event) {
-                evref.set(event);
-            }
-        });
-
+        sink.register((infra, topic, event) -> evref.set(event));
         sink.start();
 
         client = new TopicSinkClient(TOPIC);
@@ -108,10 +97,7 @@ public class TopicSinkClientTest {
     }
 
     @Test
-    public void testTopicSinkClient_testGetTopic() {
-        assertEquals(TOPIC, client.getTopic());
-        assertSame(sink, Whitebox.getInternalState(client, SINK_FIELD_NAME));
-
+    public void testTopicSinkClient() {
         // unknown topic -> should throw exception
         sinks = new LinkedList<>();
         assertThatThrownBy(() -> new TopicSinkClient2(TOPIC)).isInstanceOf(TopicSinkClientException.class)
@@ -119,7 +105,17 @@ public class TopicSinkClientTest {
     }
 
     @Test
-    public void testSend() throws Exception {
+    public void testTopicSinkClient_GetTopic() throws TopicSinkClientException {
+        assertEquals(TOPIC, new TopicSinkClient(TopicEndpoint.manager.getNoopTopicSink(TOPIC)).getTopic());
+        assertEquals(TOPIC, new TopicSinkClient(TOPIC).getTopic());
+
+        assertThatThrownBy(() -> new TopicSinkClient((TopicSink) null)).isInstanceOf(IllegalArgumentException.class);
+        assertThatThrownBy(() -> new TopicSinkClient("blah")).isInstanceOf(TopicSinkClientException.class)
+                            .hasMessage("no sinks for topic: blah");
+    }
+
+    @Test
+    public void testSend() {
         client.send(Arrays.asList("abc", "def"));
         verify(sink).send("['abc','def']".replace('\'', '"'));