a353c98fb93254bb5fec912ee54d943e91ceab6e
[policy/drools-applications.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.common.rules.test;
22
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27 import java.util.function.Function;
28 import java.util.function.Predicate;
29 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
30 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
31 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
32 import org.onap.policy.common.endpoints.event.comm.TopicListener;
33 import org.onap.policy.common.endpoints.event.comm.TopicSink;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * Listener for messages published on a topic SINK.
39  *
40  * @param <T> message type
41  */
42 public class Listener<T> implements TopicListener {
43     private static final Logger logger = LoggerFactory.getLogger(Listener.class);
44     private static final long DEFAULT_WAIT_SEC = 5L;
45
46     private final TopicSink sink;
47     private final Function<String, T> decoder;
48     private final BlockingQueue<T> messages = new LinkedBlockingQueue<>();
49
50     /**
51      * Constructs the object.
52      *
53      * @param topicName name of the NOOP topic SINK on which to listen
54      * @param decoder function that takes a topic name and a message and decodes it into
55      *        the desired type
56      */
57     public Listener(String topicName, Function<String, T> decoder) {
58         this.sink = getTopicManager().getNoopTopicSink(topicName);
59         this.decoder = decoder;
60         this.sink.register(this);
61     }
62
63     /**
64      * Determines if there are any messages waiting.
65      *
66      * @return {@code true} if there are no messages waiting, {@code false} otherwise
67      */
68     public boolean isEmpty() {
69         return messages.isEmpty();
70     }
71
72     /**
73      * Waits, for the default amount of time, for a message to be published to the topic.
74      *
75      * @return the message that was published
76      * @throws TopicException if interrupted or no message is received within the
77      *         specified time
78      */
79     public T await() {
80         return await(DEFAULT_WAIT_SEC, TimeUnit.SECONDS);
81     }
82
83     /**
84      * Waits, for the specified period of time, for a message to be published to the
85      * topic.
86      *
87      * @param twait maximum time to wait
88      * @param unit time unit
89      * @return the message that was published
90      * @throws TopicException if interrupted or no message is received within the
91      *         specified time
92      */
93     public T await(long twait, TimeUnit unit) {
94         return await(twait, unit, msg -> true);
95     }
96
97     /**
98      * Waits, for the default amount of time, for a message to be published to the topic.
99      *
100      * @param filter filter used to select the message of interest; preceding messages
101      *        that do not pass the filter are discarded
102      * @return the message that was published
103      * @throws TopicException if interrupted or no message is received within the
104      *         specified time
105      */
106     public T await(Predicate<T> filter) {
107         return await(DEFAULT_WAIT_SEC, TimeUnit.SECONDS, filter);
108     }
109
110     /**
111      * Waits, for the specified period of time, for a message to be published to the
112      * topic.
113      *
114      * @param twait maximum time to wait
115      * @param unit time unit
116      * @param filter filter used to select the message of interest; preceding messages
117      *        that do not pass the filter are discarded
118      * @return the message that was published
119      * @throws TopicException if interrupted or no message is received within the
120      *         specified time
121      */
122     public T await(long twait, TimeUnit unit, Predicate<T> filter) {
123         long endMs = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(twait, unit);
124
125         for (;;) {
126             try {
127                 long remainingMs = endMs - System.currentTimeMillis();
128                 if (remainingMs < 0) {
129                     throw new TimeoutException();
130                 }
131
132                 T msg = pollMessage(remainingMs);
133                 if (msg == null) {
134                     throw new TimeoutException();
135                 }
136
137                 if (filter.test(msg)) {
138                     return msg;
139                 }
140
141                 logger.info("message discarded by the filter on topic {}", sink.getTopic());
142
143             } catch (InterruptedException e) {
144                 logger.warn("'await' interrupted on topic {}", sink.getTopic());
145                 Thread.currentThread().interrupt();
146                 throw new TopicException(e);
147
148             } catch (TimeoutException e) {
149                 logger.warn("'await' timed out on topic {}", sink.getTopic());
150                 throw new TopicException(e);
151             }
152         }
153     }
154
155     /**
156      * Unregisters the listener.
157      */
158     public void unregister() {
159         sink.unregister(this);
160     }
161
162     @Override
163     public void onTopicEvent(CommInfrastructure commType, String topic, String event) {
164         try {
165             messages.add(decoder.apply(event));
166         } catch (RuntimeException e) {
167             logger.warn("cannot decode message on topic {} for event {}", topic, event, e);
168         }
169     }
170
171     // these methods may be overridden by junit tests
172
173     protected TopicEndpoint getTopicManager() {
174         return TopicEndpointManager.getManager();
175     }
176
177     protected T pollMessage(long remainingMs) throws InterruptedException {
178         return messages.poll(remainingMs, TimeUnit.MILLISECONDS);
179     }
180 }