2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.common.endpoints.event.comm.client;
23 import java.util.Arrays;
24 import java.util.List;
25 import java.util.concurrent.BlockingDeque;
26 import java.util.concurrent.LinkedBlockingDeque;
27 import java.util.concurrent.TimeUnit;
29 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
30 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
31 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
32 import org.onap.policy.common.endpoints.event.comm.TopicListener;
33 import org.onap.policy.common.endpoints.event.comm.TopicSink;
34 import org.onap.policy.common.endpoints.event.comm.TopicSource;
35 import org.onap.policy.common.utils.coder.Coder;
36 import org.onap.policy.common.utils.coder.CoderException;
37 import org.onap.policy.common.utils.coder.StandardCoder;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
42 * A "bidirectional" topic, which is a pair of topics, one of which is used to publish
43 * requests and the other to receive responses.
46 public class BidirectionalTopicClient {
47 private static final Logger logger = LoggerFactory.getLogger(BidirectionalTopicClient.class);
48 private static final Coder coder = new StandardCoder();
50 private final String sinkTopic;
51 private final String sourceTopic;
52 private final TopicSink sink;
53 private final TopicSource source;
54 private final CommInfrastructure sinkTopicCommInfrastructure;
55 private final CommInfrastructure sourceTopicCommInfrastructure;
58 * Used when checking whether or not a message sent on the sink topic can be received
59 * on the source topic. When a matching message is received on the incoming topic,
60 * {@code true} is placed on the queue. If {@link #stop()} is called or the waiting
61 * thread is interrupted, then {@code false} is placed on the queue. Whenever a value
62 * is pulled from the queue, it is immediately placed back on the queue.
64 private final BlockingDeque<Boolean> checkerQueue = new LinkedBlockingDeque<>();
68 * Constructs the object.
70 * @param sinkTopic sink topic name
71 * @param sourceTopic source topic name
72 * @throws BidirectionalTopicClientException if either topic does not exist
74 public BidirectionalTopicClient(String sinkTopic, String sourceTopic) throws BidirectionalTopicClientException {
75 this.sinkTopic = sinkTopic;
76 this.sourceTopic = sourceTopic;
79 List<TopicSink> sinks = getTopicEndpointManager().getTopicSinks(sinkTopic);
80 if (sinks.isEmpty()) {
81 throw new BidirectionalTopicClientException("no sinks for topic: " + sinkTopic);
82 } else if (sinks.size() > 1) {
83 throw new BidirectionalTopicClientException("too many sinks for topic: " + sinkTopic);
86 this.sink = sinks.get(0);
89 List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Arrays.asList(sourceTopic));
90 if (sources.isEmpty()) {
91 throw new BidirectionalTopicClientException("no sources for topic: " + sourceTopic);
92 } else if (sources.size() > 1) {
93 throw new BidirectionalTopicClientException("too many sources for topic: " + sourceTopic);
96 this.source = sources.get(0);
98 this.sinkTopicCommInfrastructure = sink.getTopicCommInfrastructure();
99 this.sourceTopicCommInfrastructure = source.getTopicCommInfrastructure();
102 public boolean send(String message) {
103 return sink.send(message);
106 public void register(TopicListener topicListener) {
107 source.register(topicListener);
110 public boolean offer(String event) {
111 return source.offer(event);
114 public void unregister(TopicListener topicListener) {
115 source.unregister(topicListener);
119 * Determines whether or not the topic is ready (i.e., {@link #awaitReady(Object)} has
120 * previously returned {@code true}).
122 * @return {@code true}, if the topic is ready to send and receive
124 public boolean isReady() {
125 return Boolean.TRUE.equals(checkerQueue.peek());
129 * Waits for the bidirectional topic to become "ready" by publishing a message on the
130 * sink topic and awaiting receipt of the message on the source topic. If the message
131 * is not received within a few seconds, then it tries again. This process is
132 * continued until the message is received, {@link #stop()} is called, or this thread
133 * is interrupted. Once this returns, subsequent calls will return immediately, always
134 * with the same value.
136 * @param message message to be sent to the sink topic. Note: the equals() method must
137 * return {@code true} if and only if two messages are the same
138 * @param waitMs time to wait, in milliseconds, before re-sending the message
139 * @return {@code true} if the message was received from the source topic,
140 * {@code false} if this method was stopped or interrupted before receipt of
142 * @throws CoderException if the message cannot be encoded
144 public synchronized <T> boolean awaitReady(T message, long waitMs) throws CoderException {
145 // see if we already know the answer
146 if (!checkerQueue.isEmpty()) {
147 return checkerQueue.peek();
150 final String messageText = coder.encode(message);
152 // class of message to be decoded
153 @SuppressWarnings("unchecked")
154 final Class<? extends T> clazz = (Class<? extends T>) message.getClass();
156 // create a listener to detect when a matching message is received
157 final TopicListener listener = (infra, topic, msg) -> {
159 T incoming = decode(msg, clazz);
161 if (message.equals(incoming)) {
162 logger.info("topic {} is ready; found matching message {}", topic, incoming);
163 checkerQueue.add(Boolean.TRUE);
166 } catch (CoderException e) {
167 logger.warn("cannot decode message from topic {}", topic, e);
172 source.register(listener);
174 // loop until the message is received
179 } while ((result = checkerQueue.poll(waitMs, TimeUnit.MILLISECONDS)) == null);
181 // put it back on the queue
182 checkerQueue.add(result);
184 } catch (InterruptedException e) {
185 logger.error("interrupted waiting for topic sink {} source {}", sink.getTopic(), source.getTopic(), e);
186 Thread.currentThread().interrupt();
187 checkerQueue.add(Boolean.FALSE);
190 source.unregister(listener);
193 return checkerQueue.peek();
197 * Stops any listeners that are currently stuck in {@link #awaitReady(Object)} by
198 * adding {@code false} to the queue.
200 public void stopWaiting() {
201 checkerQueue.add(Boolean.FALSE);
204 // these may be overridden by junit tests
206 protected TopicEndpoint getTopicEndpointManager() {
207 return TopicEndpointManager.getManager();
210 protected <T> T decode(String msg, Class<? extends T> clazz) throws CoderException {
211 return coder.decode(msg, clazz);
214 protected void decodeFailed() {
215 // already logged - nothing else to do