2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.pooling;
23 import static org.assertj.core.api.Assertions.assertThatCode;
24 import static org.assertj.core.api.Assertions.assertThatThrownBy;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertTrue;
28 import static org.mockito.ArgumentMatchers.any;
29 import static org.mockito.Mockito.doThrow;
30 import static org.mockito.Mockito.mock;
31 import static org.mockito.Mockito.never;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.when;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.List;
38 import java.util.concurrent.CountDownLatch;
39 import org.junit.Before;
40 import org.junit.Test;
41 import org.onap.policy.common.endpoints.event.comm.TopicListener;
42 import org.onap.policy.common.endpoints.event.comm.TopicSink;
43 import org.onap.policy.common.endpoints.event.comm.TopicSource;
45 public class DmaapManagerTest {
47 private static final String EXPECTED = "expected";
48 private static final String MY_TOPIC = "my.topic";
49 private static final String MSG = "a message";
51 private TopicListener listener;
52 private TopicSource source;
53 private boolean gotSources;
54 private TopicSink sink;
55 private boolean gotSinks;
56 private DmaapManager mgr;
61 * @throws Exception throws an exception
64 public void setUp() throws Exception {
65 listener = mock(TopicListener.class);
66 source = mock(TopicSource.class);
68 sink = mock(TopicSink.class);
71 when(source.getTopic()).thenReturn(MY_TOPIC);
73 when(sink.getTopic()).thenReturn(MY_TOPIC);
74 when(sink.send(any())).thenReturn(true);
76 mgr = new DmaapManagerImpl(MY_TOPIC);
80 public void testDmaapManager() {
81 // verify that the init methods were called
82 assertTrue(gotSources);
86 @Test(expected = PoolingFeatureException.class)
87 public void testDmaapManager_PoolingEx() throws PoolingFeatureException {
88 // force error by having no topics match
89 when(source.getTopic()).thenReturn("");
91 new DmaapManagerImpl(MY_TOPIC);
94 @Test(expected = PoolingFeatureException.class)
95 public void testDmaapManager_IllegalArgEx() throws PoolingFeatureException {
97 new DmaapManagerImpl(MY_TOPIC) {
99 protected List<TopicSource> getTopicSources() {
100 throw new IllegalArgumentException(EXPECTED);
106 public void testGetTopic() {
107 assertEquals(MY_TOPIC, mgr.getTopic());
110 @Test(expected = PoolingFeatureException.class)
111 public void testFindTopicSource_NotFound() throws PoolingFeatureException {
112 // one item in list, and its topic doesn't match
113 new DmaapManagerImpl(MY_TOPIC) {
115 protected List<TopicSource> getTopicSources() {
116 return Arrays.asList(mock(TopicSource.class));
121 @Test(expected = PoolingFeatureException.class)
122 public void testFindTopicSource_EmptyList() throws PoolingFeatureException {
124 new DmaapManagerImpl(MY_TOPIC) {
126 protected List<TopicSource> getTopicSources() {
127 return Collections.emptyList();
132 @Test(expected = PoolingFeatureException.class)
133 public void testFindTopicSink_NotFound() throws PoolingFeatureException {
134 // one item in list, and its topic doesn't match
135 new DmaapManagerImpl(MY_TOPIC) {
137 protected List<TopicSink> getTopicSinks() {
138 return Arrays.asList(mock(TopicSink.class));
143 @Test(expected = PoolingFeatureException.class)
144 public void testFindTopicSink_EmptyList() throws PoolingFeatureException {
146 new DmaapManagerImpl(MY_TOPIC) {
148 protected List<TopicSink> getTopicSinks() {
149 return Collections.emptyList();
155 public void testStartPublisher() throws PoolingFeatureException {
157 mgr.startPublisher();
159 // restart should have no effect
160 mgr.startPublisher();
162 // should be able to publish now
164 verify(sink).send(MSG);
168 public void testStopPublisher() throws PoolingFeatureException {
169 // not publishing yet, so stopping should have no effect
170 mgr.stopPublisher(0);
173 mgr.startPublisher();
175 // this time, stop should do something
176 mgr.stopPublisher(0);
178 // re-stopping should have no effect
179 assertThatCode(() -> mgr.stopPublisher(0)).doesNotThrowAnyException();
183 public void testStopPublisher_WithDelay() throws PoolingFeatureException {
185 mgr.startPublisher();
187 long tbeg = System.currentTimeMillis();
189 mgr.stopPublisher(100L);
191 assertTrue(System.currentTimeMillis() >= tbeg + 100L);
195 public void testStopPublisher_WithDelayInterrupted() throws Exception {
197 mgr.startPublisher();
201 // tell the publisher to stop in minms + additional time
202 CountDownLatch latch = new CountDownLatch(1);
203 Thread thread = new Thread(() -> {
205 mgr.stopPublisher(minms + 3000L);
209 // wait for the thread to start
212 // interrupt it - it should immediately finish its work
215 // wait for it to stop, but only wait the minimum time
218 assertFalse(thread.isAlive());
222 public void testStartConsumer() {
224 verify(source, never()).register(any());
226 mgr.startConsumer(listener);
227 verify(source).register(listener);
229 // restart should have no effect
230 mgr.startConsumer(listener);
231 verify(source).register(listener);
235 public void testStopConsumer() {
236 // not consuming yet, so stopping should have no effect
237 mgr.stopConsumer(listener);
238 verify(source, never()).unregister(any());
241 mgr.startConsumer(listener);
243 // this time, stop should do something
244 mgr.stopConsumer(listener);
245 verify(source).unregister(listener);
247 // re-stopping should have no effect
248 mgr.stopConsumer(listener);
249 verify(source).unregister(listener);
253 public void testPublish() throws PoolingFeatureException {
254 // cannot publish before starting
255 assertThatThrownBy(() -> mgr.publish(MSG)).as("publish,pre").isInstanceOf(PoolingFeatureException.class);
257 mgr.startPublisher();
259 // publish several messages
261 verify(sink).send(MSG);
263 mgr.publish(MSG + "a");
264 verify(sink).send(MSG + "a");
266 mgr.publish(MSG + "b");
267 verify(sink).send(MSG + "b");
269 // stop and verify we can no longer publish
270 mgr.stopPublisher(0);
271 assertThatThrownBy(() -> mgr.publish(MSG)).as("publish,stopped").isInstanceOf(PoolingFeatureException.class);
274 @Test(expected = PoolingFeatureException.class)
275 public void testPublish_SendFailed() throws PoolingFeatureException {
276 mgr.startPublisher();
278 // arrange for send() to fail
279 when(sink.send(MSG)).thenReturn(false);
284 @Test(expected = PoolingFeatureException.class)
285 public void testPublish_SendEx() throws PoolingFeatureException {
286 mgr.startPublisher();
288 // arrange for send() to throw an exception
289 doThrow(new IllegalStateException(EXPECTED)).when(sink).send(MSG);
295 * Manager with overrides.
297 private class DmaapManagerImpl extends DmaapManager {
299 public DmaapManagerImpl(String topic) throws PoolingFeatureException {
304 protected List<TopicSource> getTopicSources() {
307 // three sources, with the desired one in the middle
308 return Arrays.asList(mock(TopicSource.class), source, mock(TopicSource.class));
312 protected List<TopicSink> getTopicSinks() {
315 // three sinks, with the desired one in the middle
316 return Arrays.asList(mock(TopicSink.class), sink, mock(TopicSink.class));