a1e03155c0edc5f8e89bbf7a6e43df2843019a26
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.client;
22
23 import java.util.Arrays;
24 import java.util.List;
25 import lombok.Getter;
26 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
27 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
28 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
29 import org.onap.policy.common.endpoints.event.comm.TopicListener;
30 import org.onap.policy.common.endpoints.event.comm.TopicSink;
31 import org.onap.policy.common.endpoints.event.comm.TopicSource;
32
33 /**
34  * A "bidirectional" topic, which is a pair of topics, one of which is used to publish
35  * requests and the other to receive responses.
36  */
37 @Getter
38 public class BidirectionalTopicClient {
39     private final String sinkTopic;
40     private final String sourceTopic;
41     private final TopicSinkClient sinkClient;
42     private final TopicSource source;
43     private final CommInfrastructure sinkTopicCommInfrastructure;
44     private final CommInfrastructure sourceTopicCommInfrastructure;
45
46     /**
47      * Constructs the object.
48      *
49      * @param sinkTopic sink topic name
50      * @param sourceTopic source topic name
51      * @throws BidirectionalTopicClientException if either topic does not exist
52      */
53     public BidirectionalTopicClient(String sinkTopic, String sourceTopic) throws BidirectionalTopicClientException {
54         this.sinkTopic = sinkTopic;
55         this.sourceTopic = sourceTopic;
56
57         // init sinkClient
58         try {
59             // if the manager is overridden here, then override it in the sink client, too
60             this.sinkClient = new TopicSinkClient(sinkTopic) {
61                 @Override
62                 protected List<TopicSink> getTopicSinks(String topic) {
63                     return BidirectionalTopicClient.this.getTopicEndpointManager().getTopicSinks(topic);
64                 }
65             };
66         } catch (TopicSinkClientException e) {
67             throw new BidirectionalTopicClientException(e);
68         }
69
70         // init source
71         List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Arrays.asList(sourceTopic));
72         if (sources.isEmpty()) {
73             throw new BidirectionalTopicClientException("no sources for topic: " + sourceTopic);
74         } else if (sources.size() > 1) {
75             throw new BidirectionalTopicClientException("too many sources for topic: " + sourceTopic);
76         }
77
78         this.source = sources.get(0);
79
80         this.sinkTopicCommInfrastructure = sinkClient.getSink().getTopicCommInfrastructure();
81         this.sourceTopicCommInfrastructure = source.getTopicCommInfrastructure();
82     }
83
84     public TopicSink getSink() {
85         return sinkClient.getSink();
86     }
87
88     public boolean send(Object message) {
89         return sinkClient.send(message);
90     }
91
92     public void register(TopicListener topicListener) {
93         source.register(topicListener);
94     }
95
96     public boolean offer(String event) {
97         return source.offer(event);
98     }
99
100     public void unregister(TopicListener topicListener) {
101         source.unregister(topicListener);
102     }
103
104     // these may be overridden by junit tests
105
106     protected TopicEndpoint getTopicEndpointManager() {
107         return TopicEndpointManager.getManager();
108     }
109 }