57a331c13d5027b9d8464f68f6bbcfeada0f7990
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.client;
22
23 import java.util.Arrays;
24 import java.util.List;
25 import lombok.Getter;
26 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
27 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
28 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
29 import org.onap.policy.common.endpoints.event.comm.TopicListener;
30 import org.onap.policy.common.endpoints.event.comm.TopicSink;
31 import org.onap.policy.common.endpoints.event.comm.TopicSource;
32
33 /**
34  * A "bidirectional" topic, which is a pair of topics, one of which is used to publish
35  * requests and the other to receive responses.
36  */
37 @Getter
38 public class BidirectionalTopicClient {
39     private final String sinkTopic;
40     private final String sourceTopic;
41     private final TopicSink sink;
42     private final TopicSource source;
43     private final CommInfrastructure sinkTopicCommInfrastructure;
44     private final CommInfrastructure sourceTopicCommInfrastructure;
45
46     /**
47      * Constructs the object.
48      *
49      * @param sinkTopic sink topic name
50      * @param sourceTopic source topic name
51      * @throws BidirectionalTopicClientException if either topic does not exist
52      */
53     public BidirectionalTopicClient(String sinkTopic, String sourceTopic) throws BidirectionalTopicClientException {
54         this.sinkTopic = sinkTopic;
55         this.sourceTopic = sourceTopic;
56
57         // init sinkClient
58         List<TopicSink> sinks = getTopicEndpointManager().getTopicSinks(sinkTopic);
59         if (sinks.isEmpty()) {
60             throw new BidirectionalTopicClientException("no sinks for topic: " + sinkTopic);
61         } else if (sinks.size() > 1) {
62             throw new BidirectionalTopicClientException("too many sinks for topic: " + sinkTopic);
63         }
64
65         this.sink = sinks.get(0);
66
67         // init source
68         List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Arrays.asList(sourceTopic));
69         if (sources.isEmpty()) {
70             throw new BidirectionalTopicClientException("no sources for topic: " + sourceTopic);
71         } else if (sources.size() > 1) {
72             throw new BidirectionalTopicClientException("too many sources for topic: " + sourceTopic);
73         }
74
75         this.source = sources.get(0);
76
77         this.sinkTopicCommInfrastructure = sink.getTopicCommInfrastructure();
78         this.sourceTopicCommInfrastructure = source.getTopicCommInfrastructure();
79     }
80
81     public boolean send(String message) {
82         return sink.send(message);
83     }
84
85     public void register(TopicListener topicListener) {
86         source.register(topicListener);
87     }
88
89     public boolean offer(String event) {
90         return source.offer(event);
91     }
92
93     public void unregister(TopicListener topicListener) {
94         source.unregister(topicListener);
95     }
96
97     // these may be overridden by junit tests
98
99     protected TopicEndpoint getTopicEndpointManager() {
100         return TopicEndpointManager.getManager();
101     }
102 }