2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.common.rules.test;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27 import java.util.function.Function;
28 import java.util.function.Predicate;
29 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
30 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
31 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
32 import org.onap.policy.common.endpoints.event.comm.TopicListener;
33 import org.onap.policy.common.endpoints.event.comm.TopicSink;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
38 * Listener for messages published on a topic SINK.
40 * @param <T> message type
42 public class Listener<T> implements TopicListener {
43 private static final Logger logger = LoggerFactory.getLogger(Listener.class);
44 private static final long DEFAULT_WAIT_SEC = 10L;
46 private final TopicSink sink;
47 private final Function<String, T> decoder;
48 private final BlockingQueue<T> messages = new LinkedBlockingQueue<>();
51 * Constructs the object.
53 * @param topicName name of the NOOP topic SINK on which to listen
54 * @param decoder function that takes a topic name and a message and decodes it into
57 public Listener(String topicName, Function<String, T> decoder) {
58 this.sink = getTopicManager().getNoopTopicSink(topicName);
59 this.decoder = decoder;
60 this.sink.register(this);
64 * Determines if there are any messages waiting.
66 * @return {@code true} if there are no messages waiting, {@code false} otherwise
68 public boolean isEmpty() {
69 return messages.isEmpty();
73 * Waits, for the default amount of time, for a message to be published to the topic.
75 * @return the message that was published
76 * @throws TopicException if interrupted or no message is received within the
80 return await(DEFAULT_WAIT_SEC, TimeUnit.SECONDS);
84 * Waits, for the specified period of time, for a message to be published to the
87 * @param twait maximum time to wait
88 * @param unit time unit
89 * @return the message that was published
90 * @throws TopicException if interrupted or no message is received within the
93 public T await(long twait, TimeUnit unit) {
94 return await(twait, unit, msg -> true);
98 * Waits, for the default amount of time, for a message to be published to the topic.
100 * @param filter filter used to select the message of interest; preceding messages
101 * that do not pass the filter are discarded
102 * @return the message that was published
103 * @throws TopicException if interrupted or no message is received within the
106 public T await(Predicate<T> filter) {
107 return await(DEFAULT_WAIT_SEC, TimeUnit.SECONDS, filter);
111 * Waits, for the specified period of time, for a message to be published to the
114 * @param twait maximum time to wait
115 * @param unit time unit
116 * @param filter filter used to select the message of interest; preceding messages
117 * that do not pass the filter are discarded
118 * @return the message that was published
119 * @throws TopicException if interrupted or no message is received within the
122 public T await(long twait, TimeUnit unit, Predicate<T> filter) {
123 long endMs = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(twait, unit);
127 long remainingMs = endMs - System.currentTimeMillis();
128 if (remainingMs < 0) {
129 throw new TimeoutException();
132 T msg = pollMessage(remainingMs);
134 throw new TimeoutException();
137 if (filter.test(msg)) {
141 logger.info("message discarded by the filter on topic {}", sink.getTopic());
143 } catch (InterruptedException e) {
144 logger.warn("'await' interrupted on topic {}", sink.getTopic());
145 Thread.currentThread().interrupt();
146 throw new TopicException(e);
148 } catch (TimeoutException e) {
149 logger.warn("'await' timed out on topic {}", sink.getTopic());
150 throw new TopicException(e);
156 * Unregisters the listener.
158 public void unregister() {
159 sink.unregister(this);
163 public void onTopicEvent(CommInfrastructure commType, String topic, String event) {
165 messages.add(decoder.apply(event));
166 } catch (RuntimeException e) {
167 logger.warn("cannot decode message on topic {} for event {}", topic, event, e);
171 // these methods may be overridden by junit tests
173 protected TopicEndpoint getTopicManager() {
174 return TopicEndpointManager.getManager();
177 protected T pollMessage(long remainingMs) throws InterruptedException {
178 return messages.poll(remainingMs, TimeUnit.MILLISECONDS);