BidirectionalTopic should use plain TopicSink 07/102807/1
authorJim Hahn <jrh3@att.com>
Tue, 3 Mar 2020 04:43:06 +0000 (23:43 -0500)
committerJim Hahn <jrh3@att.com>
Tue, 3 Mar 2020 04:43:06 +0000 (23:43 -0500)
BidirectionalTopicClient use plain TopicSink instead of
TopicSinkClient, because the latter encodes its message, while
BidirectionalTopicClient should not, because encoding should
be left up to the user of the class.

Issue-ID: POLICY-1625
Signed-off-by: Jim Hahn <jrh3@att.com>
Change-Id: I6c67e1ee0c56e96a0efcc90eaf1c0a940902e8b3

policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java

index a1e0315..57a331c 100644 (file)
@@ -38,7 +38,7 @@ import org.onap.policy.common.endpoints.event.comm.TopicSource;
 public class BidirectionalTopicClient {
     private final String sinkTopic;
     private final String sourceTopic;
-    private final TopicSinkClient sinkClient;
+    private final TopicSink sink;
     private final TopicSource source;
     private final CommInfrastructure sinkTopicCommInfrastructure;
     private final CommInfrastructure sourceTopicCommInfrastructure;
@@ -55,18 +55,15 @@ public class BidirectionalTopicClient {
         this.sourceTopic = sourceTopic;
 
         // init sinkClient
-        try {
-            // if the manager is overridden here, then override it in the sink client, too
-            this.sinkClient = new TopicSinkClient(sinkTopic) {
-                @Override
-                protected List<TopicSink> getTopicSinks(String topic) {
-                    return BidirectionalTopicClient.this.getTopicEndpointManager().getTopicSinks(topic);
-                }
-            };
-        } catch (TopicSinkClientException e) {
-            throw new BidirectionalTopicClientException(e);
+        List<TopicSink> sinks = getTopicEndpointManager().getTopicSinks(sinkTopic);
+        if (sinks.isEmpty()) {
+            throw new BidirectionalTopicClientException("no sinks for topic: " + sinkTopic);
+        } else if (sinks.size() > 1) {
+            throw new BidirectionalTopicClientException("too many sinks for topic: " + sinkTopic);
         }
 
+        this.sink = sinks.get(0);
+
         // init source
         List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Arrays.asList(sourceTopic));
         if (sources.isEmpty()) {
@@ -77,16 +74,12 @@ public class BidirectionalTopicClient {
 
         this.source = sources.get(0);
 
-        this.sinkTopicCommInfrastructure = sinkClient.getSink().getTopicCommInfrastructure();
+        this.sinkTopicCommInfrastructure = sink.getTopicCommInfrastructure();
         this.sourceTopicCommInfrastructure = source.getTopicCommInfrastructure();
     }
 
-    public TopicSink getSink() {
-        return sinkClient.getSink();
-    }
-
-    public boolean send(Object message) {
-        return sinkClient.send(message);
+    public boolean send(String message) {
+        return sink.send(message);
     }
 
     public void register(TopicListener topicListener) {
index 9b1018d..3f0a002 100644 (file)
@@ -33,7 +33,6 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.Arrays;
-import java.util.Map;
 import java.util.Properties;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -111,7 +110,6 @@ public class BidirectionalTopicClientTest {
 
     @Test
     public void testBidirectionalTopicClient_testGetters() {
-        assertNotNull(client.getSinkClient());
         assertSame(sink, client.getSink());
         assertSame(source, client.getSource());
         assertEquals(SINK_TOPIC, client.getSinkTopic());
@@ -127,7 +125,7 @@ public class BidirectionalTopicClientTest {
     public void testBidirectionalTopicClientExceptions() {
         assertThatThrownBy(() -> new BidirectionalTopicClient2("unknown-sink", SOURCE_TOPIC))
                         .isInstanceOf(BidirectionalTopicClientException.class)
-                        .hasCauseInstanceOf(TopicSinkClientException.class);
+                        .hasMessage("no sinks for topic: unknown-sink");
 
         assertThatThrownBy(() -> new BidirectionalTopicClient2(SINK_TOPIC, "unknown-source"))
                         .isInstanceOf(BidirectionalTopicClientException.class)
@@ -146,8 +144,8 @@ public class BidirectionalTopicClientTest {
      */
     @Test
     public void testDelegates() {
-        assertTrue(client.send(Map.of("outgoing", "outgoing-text")));
-        verify(sink).send("{\"outgoing\":\"outgoing-text\"}");
+        assertTrue(client.send("hello"));
+        verify(sink).send("hello");
 
         assertTrue(client.offer("incoming"));
         verify(source).offer("incoming");