2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.common.endpoints.event.comm.client;
23 import java.util.Arrays;
24 import java.util.List;
26 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
27 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
28 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
29 import org.onap.policy.common.endpoints.event.comm.TopicListener;
30 import org.onap.policy.common.endpoints.event.comm.TopicSink;
31 import org.onap.policy.common.endpoints.event.comm.TopicSource;
34 * A "bidirectional" topic, which is a pair of topics, one of which is used to publish
35 * requests and the other to receive responses.
38 public class BidirectionalTopicClient {
39 private final String sinkTopic;
40 private final String sourceTopic;
41 private final TopicSink sink;
42 private final TopicSource source;
43 private final CommInfrastructure sinkTopicCommInfrastructure;
44 private final CommInfrastructure sourceTopicCommInfrastructure;
47 * Constructs the object.
49 * @param sinkTopic sink topic name
50 * @param sourceTopic source topic name
51 * @throws BidirectionalTopicClientException if either topic does not exist
53 public BidirectionalTopicClient(String sinkTopic, String sourceTopic) throws BidirectionalTopicClientException {
54 this.sinkTopic = sinkTopic;
55 this.sourceTopic = sourceTopic;
58 List<TopicSink> sinks = getTopicEndpointManager().getTopicSinks(sinkTopic);
59 if (sinks.isEmpty()) {
60 throw new BidirectionalTopicClientException("no sinks for topic: " + sinkTopic);
61 } else if (sinks.size() > 1) {
62 throw new BidirectionalTopicClientException("too many sinks for topic: " + sinkTopic);
65 this.sink = sinks.get(0);
68 List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Arrays.asList(sourceTopic));
69 if (sources.isEmpty()) {
70 throw new BidirectionalTopicClientException("no sources for topic: " + sourceTopic);
71 } else if (sources.size() > 1) {
72 throw new BidirectionalTopicClientException("too many sources for topic: " + sourceTopic);
75 this.source = sources.get(0);
77 this.sinkTopicCommInfrastructure = sink.getTopicCommInfrastructure();
78 this.sourceTopicCommInfrastructure = source.getTopicCommInfrastructure();
81 public boolean send(String message) {
82 return sink.send(message);
85 public void register(TopicListener topicListener) {
86 source.register(topicListener);
89 public boolean offer(String event) {
90 return source.offer(event);
93 public void unregister(TopicListener topicListener) {
94 source.unregister(topicListener);
97 // these may be overridden by junit tests
99 protected TopicEndpoint getTopicEndpointManager() {
100 return TopicEndpointManager.getManager();