2a9f1446d945d66839134533513ec16aa911c7fd
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.client;
22
23 import java.util.Arrays;
24 import java.util.List;
25 import java.util.concurrent.BlockingDeque;
26 import java.util.concurrent.LinkedBlockingDeque;
27 import java.util.concurrent.TimeUnit;
28 import lombok.Getter;
29 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
30 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
31 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
32 import org.onap.policy.common.endpoints.event.comm.TopicListener;
33 import org.onap.policy.common.endpoints.event.comm.TopicSink;
34 import org.onap.policy.common.endpoints.event.comm.TopicSource;
35 import org.onap.policy.common.utils.coder.Coder;
36 import org.onap.policy.common.utils.coder.CoderException;
37 import org.onap.policy.common.utils.coder.StandardCoder;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 /**
42  * A "bidirectional" topic, which is a pair of topics, one of which is used to publish
43  * requests and the other to receive responses.
44  */
45 @Getter
46 public class BidirectionalTopicClient {
47     private static final Logger logger = LoggerFactory.getLogger(BidirectionalTopicClient.class);
48     private static final Coder coder = new StandardCoder();
49
50     private final String sinkTopic;
51     private final String sourceTopic;
52     private final TopicSink sink;
53     private final TopicSource source;
54     private final CommInfrastructure sinkTopicCommInfrastructure;
55     private final CommInfrastructure sourceTopicCommInfrastructure;
56
57     /**
58      * Used when checking whether or not a message sent on the sink topic can be received
59      * on the source topic. When a matching message is received on the incoming topic,
60      * {@code true} is placed on the queue. If {@link #stop()} is called or the waiting
61      * thread is interrupted, then {@code false} is placed on the queue. Whenever a value
62      * is pulled from the queue, it is immediately placed back on the queue.
63      */
64     private final BlockingDeque<Boolean> checkerQueue = new LinkedBlockingDeque<>();
65
66
67     /**
68      * Constructs the object.
69      *
70      * @param sinkTopic sink topic name
71      * @param sourceTopic source topic name
72      * @throws BidirectionalTopicClientException if either topic does not exist
73      */
74     public BidirectionalTopicClient(String sinkTopic, String sourceTopic) throws BidirectionalTopicClientException {
75         this.sinkTopic = sinkTopic;
76         this.sourceTopic = sourceTopic;
77
78         // init sinkClient
79         List<TopicSink> sinks = getTopicEndpointManager().getTopicSinks(sinkTopic);
80         if (sinks.isEmpty()) {
81             throw new BidirectionalTopicClientException("no sinks for topic: " + sinkTopic);
82         } else if (sinks.size() > 1) {
83             throw new BidirectionalTopicClientException("too many sinks for topic: " + sinkTopic);
84         }
85
86         this.sink = sinks.get(0);
87
88         // init source
89         List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Arrays.asList(sourceTopic));
90         if (sources.isEmpty()) {
91             throw new BidirectionalTopicClientException("no sources for topic: " + sourceTopic);
92         } else if (sources.size() > 1) {
93             throw new BidirectionalTopicClientException("too many sources for topic: " + sourceTopic);
94         }
95
96         this.source = sources.get(0);
97
98         this.sinkTopicCommInfrastructure = sink.getTopicCommInfrastructure();
99         this.sourceTopicCommInfrastructure = source.getTopicCommInfrastructure();
100     }
101
102     public boolean send(String message) {
103         return sink.send(message);
104     }
105
106     public void register(TopicListener topicListener) {
107         source.register(topicListener);
108     }
109
110     public boolean offer(String event) {
111         return source.offer(event);
112     }
113
114     public void unregister(TopicListener topicListener) {
115         source.unregister(topicListener);
116     }
117
118     /**
119      * Determines whether or not the topic is ready (i.e., {@link #awaitReady(Object)} has
120      * previously returned {@code true}).
121      *
122      * @return {@code true}, if the topic is ready to send and receive
123      */
124     public boolean isReady() {
125         return Boolean.TRUE.equals(checkerQueue.peek());
126     }
127
128     /**
129      * Waits for the bidirectional topic to become "ready" by publishing a message on the
130      * sink topic and awaiting receipt of the message on the source topic. If the message
131      * is not received within a few seconds, then it tries again. This process is
132      * continued until the message is received, {@link #stop()} is called, or this thread
133      * is interrupted. Once this returns, subsequent calls will return immediately, always
134      * with the same value.
135      *
136      * @param message message to be sent to the sink topic. Note: the equals() method must
137      *        return {@code true} if and only if two messages are the same
138      * @param waitMs time to wait, in milliseconds, before re-sending the message
139      * @return {@code true} if the message was received from the source topic,
140      *         {@code false} if this method was stopped or interrupted before receipt of
141      *         the message
142      * @throws CoderException if the message cannot be encoded
143      */
144     public synchronized <T> boolean awaitReady(T message, long waitMs) throws CoderException {
145         // see if we already know the answer
146         if (!checkerQueue.isEmpty()) {
147             return checkerQueue.peek();
148         }
149
150         final String messageText = coder.encode(message);
151
152         // class of message to be decoded
153         @SuppressWarnings("unchecked")
154         final Class<? extends T> clazz = (Class<? extends T>) message.getClass();
155
156         // create a listener to detect when a matching message is received
157         final TopicListener listener = (infra, topic, msg) -> {
158             try {
159                 T incoming = decode(msg, clazz);
160
161                 if (message.equals(incoming)) {
162                     logger.info("topic {} is ready; found matching message {}", topic, incoming);
163                     checkerQueue.add(Boolean.TRUE);
164                 }
165
166             } catch (CoderException e) {
167                 logger.warn("cannot decode message from topic {}", topic, e);
168                 decodeFailed();
169             }
170         };
171
172         source.register(listener);
173
174         // loop until the message is received
175         try {
176             Boolean result;
177             do {
178                 send(messageText);
179             } while ((result = checkerQueue.poll(waitMs, TimeUnit.MILLISECONDS)) == null);
180
181             // put it back on the queue
182             checkerQueue.add(result);
183
184         } catch (InterruptedException e) {
185             logger.error("interrupted waiting for topic sink {} source {}", sink.getTopic(), source.getTopic(), e);
186             Thread.currentThread().interrupt();
187             checkerQueue.add(Boolean.FALSE);
188
189         } finally {
190             source.unregister(listener);
191         }
192
193         return checkerQueue.peek();
194     }
195
196     /**
197      * Stops any listeners that are currently stuck in {@link #awaitReady(Object)} by
198      * adding {@code false} to the queue.
199      */
200     public void stopWaiting() {
201         checkerQueue.add(Boolean.FALSE);
202     }
203
204     // these may be overridden by junit tests
205
206     protected TopicEndpoint getTopicEndpointManager() {
207         return TopicEndpointManager.getManager();
208     }
209
210     protected <T> T decode(String msg, Class<? extends T> clazz) throws CoderException {
211         return coder.decode(msg, clazz);
212     }
213
214     protected void decodeFailed() {
215         // already logged - nothing else to do
216     }
217 }