public class BidirectionalTopicClient {
private final String sinkTopic;
private final String sourceTopic;
- private final TopicSinkClient sinkClient;
+ private final TopicSink sink;
private final TopicSource source;
private final CommInfrastructure sinkTopicCommInfrastructure;
private final CommInfrastructure sourceTopicCommInfrastructure;
this.sourceTopic = sourceTopic;
// init sinkClient
- try {
- // if the manager is overridden here, then override it in the sink client, too
- this.sinkClient = new TopicSinkClient(sinkTopic) {
- @Override
- protected List<TopicSink> getTopicSinks(String topic) {
- return BidirectionalTopicClient.this.getTopicEndpointManager().getTopicSinks(topic);
- }
- };
- } catch (TopicSinkClientException e) {
- throw new BidirectionalTopicClientException(e);
+ List<TopicSink> sinks = getTopicEndpointManager().getTopicSinks(sinkTopic);
+ if (sinks.isEmpty()) {
+ throw new BidirectionalTopicClientException("no sinks for topic: " + sinkTopic);
+ } else if (sinks.size() > 1) {
+ throw new BidirectionalTopicClientException("too many sinks for topic: " + sinkTopic);
}
+ this.sink = sinks.get(0);
+
// init source
List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Arrays.asList(sourceTopic));
if (sources.isEmpty()) {
this.source = sources.get(0);
- this.sinkTopicCommInfrastructure = sinkClient.getSink().getTopicCommInfrastructure();
+ this.sinkTopicCommInfrastructure = sink.getTopicCommInfrastructure();
this.sourceTopicCommInfrastructure = source.getTopicCommInfrastructure();
}
- public TopicSink getSink() {
- return sinkClient.getSink();
- }
-
- public boolean send(Object message) {
- return sinkClient.send(message);
+ public boolean send(String message) {
+ return sink.send(message);
}
public void register(TopicListener topicListener) {
import static org.mockito.Mockito.when;
import java.util.Arrays;
-import java.util.Map;
import java.util.Properties;
import org.junit.AfterClass;
import org.junit.Before;
@Test
public void testBidirectionalTopicClient_testGetters() {
- assertNotNull(client.getSinkClient());
assertSame(sink, client.getSink());
assertSame(source, client.getSource());
assertEquals(SINK_TOPIC, client.getSinkTopic());
public void testBidirectionalTopicClientExceptions() {
assertThatThrownBy(() -> new BidirectionalTopicClient2("unknown-sink", SOURCE_TOPIC))
.isInstanceOf(BidirectionalTopicClientException.class)
- .hasCauseInstanceOf(TopicSinkClientException.class);
+ .hasMessage("no sinks for topic: unknown-sink");
assertThatThrownBy(() -> new BidirectionalTopicClient2(SINK_TOPIC, "unknown-source"))
.isInstanceOf(BidirectionalTopicClientException.class)
*/
@Test
public void testDelegates() {
- assertTrue(client.send(Map.of("outgoing", "outgoing-text")));
- verify(sink).send("{\"outgoing\":\"outgoing-text\"}");
+ assertTrue(client.send("hello"));
+ verify(sink).send("hello");
assertTrue(client.offer("incoming"));
verify(source).offer("incoming");