2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.common.endpoints.event.comm.client;
23 import java.util.Arrays;
24 import java.util.List;
26 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
27 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
28 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
29 import org.onap.policy.common.endpoints.event.comm.TopicListener;
30 import org.onap.policy.common.endpoints.event.comm.TopicSink;
31 import org.onap.policy.common.endpoints.event.comm.TopicSource;
34 * A "bidirectional" topic, which is a pair of topics, one of which is used to publish
35 * requests and the other to receive responses.
38 public class BidirectionalTopicClient {
39 private final String sinkTopic;
40 private final String sourceTopic;
41 private final TopicSinkClient sinkClient;
42 private final TopicSource source;
43 private final CommInfrastructure sinkTopicCommInfrastructure;
44 private final CommInfrastructure sourceTopicCommInfrastructure;
47 * Constructs the object.
49 * @param sinkTopic sink topic name
50 * @param sourceTopic source topic name
51 * @throws BidirectionalTopicClientException if either topic does not exist
53 public BidirectionalTopicClient(String sinkTopic, String sourceTopic) throws BidirectionalTopicClientException {
54 this.sinkTopic = sinkTopic;
55 this.sourceTopic = sourceTopic;
59 // if the manager is overridden here, then override it in the sink client, too
60 this.sinkClient = new TopicSinkClient(sinkTopic) {
62 protected List<TopicSink> getTopicSinks(String topic) {
63 return BidirectionalTopicClient.this.getTopicEndpointManager().getTopicSinks(topic);
66 } catch (TopicSinkClientException e) {
67 throw new BidirectionalTopicClientException(e);
71 List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Arrays.asList(sourceTopic));
72 if (sources.isEmpty()) {
73 throw new BidirectionalTopicClientException("no sources for topic: " + sourceTopic);
74 } else if (sources.size() > 1) {
75 throw new BidirectionalTopicClientException("too many sources for topic: " + sourceTopic);
78 this.source = sources.get(0);
80 this.sinkTopicCommInfrastructure = sinkClient.getSink().getTopicCommInfrastructure();
81 this.sourceTopicCommInfrastructure = source.getTopicCommInfrastructure();
84 public TopicSink getSink() {
85 return sinkClient.getSink();
88 public boolean send(Object message) {
89 return sinkClient.send(message);
92 public void register(TopicListener topicListener) {
93 source.register(topicListener);
96 public boolean offer(String event) {
97 return source.offer(event);
100 public void unregister(TopicListener topicListener) {
101 source.unregister(topicListener);
104 // these may be overridden by junit tests
106 protected TopicEndpoint getTopicEndpointManager() {
107 return TopicEndpointManager.getManager();