b5d53909e4f080356b37d0dff7724836104f332a
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2023 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.client;
23
24 import java.util.Arrays;
25 import java.util.Collections;
26 import java.util.List;
27 import java.util.concurrent.BlockingDeque;
28 import java.util.concurrent.LinkedBlockingDeque;
29 import java.util.concurrent.TimeUnit;
30 import lombok.Getter;
31 import org.jetbrains.annotations.NotNull;
32 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
33 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
34 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
35 import org.onap.policy.common.endpoints.event.comm.TopicListener;
36 import org.onap.policy.common.endpoints.event.comm.TopicSink;
37 import org.onap.policy.common.endpoints.event.comm.TopicSource;
38 import org.onap.policy.common.utils.coder.Coder;
39 import org.onap.policy.common.utils.coder.CoderException;
40 import org.onap.policy.common.utils.coder.StandardCoder;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 /**
45  * A "bidirectional" topic, which is a pair of topics, one of which is used to publish
46  * requests and the other to receive responses.
47  */
48 @Getter
49 public class BidirectionalTopicClient {
50     private static final Logger logger = LoggerFactory.getLogger(BidirectionalTopicClient.class);
51     private static final Coder coder = new StandardCoder();
52
53     private final String sinkTopic;
54     private final String sourceTopic;
55     private final TopicSink sink;
56     private final TopicSource source;
57     private final CommInfrastructure sinkTopicCommInfrastructure;
58     private final CommInfrastructure sourceTopicCommInfrastructure;
59
60     /**
61      * Used when checking whether a message sent on the sink topic can be received
62      * on the source topic. When a matching message is received on the incoming topic,
63      * {@code true} is placed on the queue. If {@link #stopWaiting()} is called or the waiting
64      * thread is interrupted, then {@code false} is placed on the queue. Whenever a value
65      * is pulled from the queue, it is immediately placed back on the queue.
66      */
67     private final BlockingDeque<Boolean> checkerQueue = new LinkedBlockingDeque<>();
68
69
70     /**
71      * Constructs the object.
72      *
73      * @param sinkTopic sink topic name
74      * @param sourceTopic source topic name
75      * @throws BidirectionalTopicClientException if either topic does not exist
76      */
77     public BidirectionalTopicClient(String sinkTopic, String sourceTopic) throws BidirectionalTopicClientException {
78         this.sinkTopic = sinkTopic.toLowerCase();
79         this.sourceTopic = sourceTopic.toLowerCase();
80
81         // init sinkClient
82         List<TopicSink> sinks = getTopicEndpointManager().getTopicSinks(sinkTopic);
83         if (sinks.isEmpty()) {
84             throw new BidirectionalTopicClientException("no sinks for topic: " + sinkTopic);
85         } else if (sinks.size() > 1) {
86             throw new BidirectionalTopicClientException("too many sinks for topic: " + sinkTopic);
87         }
88
89         this.sink = sinks.get(0);
90
91         // init source
92         List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Collections.singletonList(sourceTopic));
93         if (sources.isEmpty()) {
94             throw new BidirectionalTopicClientException("no sources for topic: " + sourceTopic);
95         } else if (sources.size() > 1) {
96             throw new BidirectionalTopicClientException("too many sources for topic: " + sourceTopic);
97         }
98
99         this.source = sources.get(0);
100
101         this.sinkTopicCommInfrastructure = sink.getTopicCommInfrastructure();
102         this.sourceTopicCommInfrastructure = source.getTopicCommInfrastructure();
103     }
104
105     public boolean send(String message) {
106         return sink.send(message);
107     }
108
109     public void register(TopicListener topicListener) {
110         source.register(topicListener);
111     }
112
113     public boolean offer(String event) {
114         return source.offer(event);
115     }
116
117     public void unregister(TopicListener topicListener) {
118         source.unregister(topicListener);
119     }
120
121     /**
122      * Determines whether the topic is ready (i.e., {@link #awaitReady(Object, long)} has
123      * previously returned {@code true}).
124      *
125      * @return {@code true}, if the topic is ready to send and receive
126      */
127     public boolean isReady() {
128         return Boolean.TRUE.equals(checkerQueue.peek());
129     }
130
131     /**
132      * Waits for the bidirectional topic to become "ready" by publishing a message on the
133      * sink topic and awaiting receipt of the message on the source topic. If the message
134      * is not received within a few seconds, then it tries again. This process is
135      * continued until the message is received, {@link #stopWaiting()} is called, or this thread
136      * is interrupted. Once this returns, subsequent calls will return immediately, always
137      * with the same value.
138      *
139      * @param message message to be sent to the sink topic. Note: the equals() method must
140      *        return {@code true} if and only if two messages are the same
141      * @param waitMs time to wait, in milliseconds, before re-sending the message
142      * @return {@code true} if the message was received from the source topic,
143      *         {@code false} if this method was stopped or interrupted before receipt of
144      *         the message
145      * @throws CoderException if the message cannot be encoded
146      */
147     public synchronized <T> boolean awaitReady(T message, long waitMs) throws CoderException {
148         // see if we already know the answer
149         if (!checkerQueue.isEmpty()) {
150             return checkerQueue.peek();
151         }
152
153         final String messageText = coder.encode(message);
154
155         // class of message to be decoded
156         final TopicListener listener = getTopicListener(message);
157
158         source.register(listener);
159
160         // loop until the message is received
161         try {
162             Boolean result;
163             do {
164                 send(messageText);
165             } while ((result = checkerQueue.poll(waitMs, TimeUnit.MILLISECONDS)) == null);
166
167             // put it back on the queue
168             checkerQueue.add(result);
169
170         } catch (InterruptedException e) {
171             logger.error("interrupted waiting for topic sink {} source {}", sink.getTopic(), source.getTopic(), e);
172             Thread.currentThread().interrupt();
173             checkerQueue.add(Boolean.FALSE);
174
175         } finally {
176             source.unregister(listener);
177         }
178
179         return checkerQueue.peek();
180     }
181
182     @NotNull
183     private <T> TopicListener getTopicListener(T message) {
184         @SuppressWarnings("unchecked")
185         final Class<? extends T> clazz = (Class<? extends T>) message.getClass();
186
187         // create a listener to detect when a matching message is received
188         return (infra, topic, msg) -> {
189             try {
190                 T incoming = decode(msg, clazz);
191
192                 if (message.equals(incoming)) {
193                     logger.info("topic {} is ready; found matching message {}", topic, incoming);
194                     checkerQueue.add(Boolean.TRUE);
195                 }
196
197             } catch (CoderException e) {
198                 logger.warn("cannot decode message from topic {}", topic, e);
199                 decodeFailed();
200             }
201         };
202     }
203
204     /**
205      * Stops any listeners that are currently stuck in {@link #awaitReady(Object)} by
206      * adding {@code false} to the queue.
207      */
208     public void stopWaiting() {
209         checkerQueue.add(Boolean.FALSE);
210     }
211
212     // these may be overridden by junit tests
213
214     protected TopicEndpoint getTopicEndpointManager() {
215         return TopicEndpointManager.getManager();
216     }
217
218     protected <T> T decode(String msg, Class<? extends T> clazz) throws CoderException {
219         return coder.decode(msg, clazz);
220     }
221
222     protected void decodeFailed() {
223         // already logged - nothing else to do
224     }
225 }