2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertNotNull;
25 import static org.junit.Assert.assertTrue;
26 import static org.mockito.Mockito.doThrow;
27 import static org.mockito.Mockito.mock;
28 import static org.mockito.Mockito.never;
29 import static org.mockito.Mockito.times;
30 import static org.mockito.Mockito.verify;
31 import static org.mockito.Mockito.when;
33 import java.io.IOException;
34 import java.net.MalformedURLException;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import org.junit.After;
38 import org.junit.Before;
39 import org.junit.Test;
40 import org.mockito.invocation.InvocationOnMock;
41 import org.mockito.stubbing.Answer;
42 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
43 import org.onap.policy.common.endpoints.event.comm.TopicListener;
44 import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
45 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
46 import org.onap.policy.common.utils.gson.GsonTestUtils;
// Test class for SingleThreadedBusTopicSource; it extends TopicTestBase.
48 public class SingleThreadedBusTopicSourceTest extends TopicTestBase {
// Mock Thread created by the set-up code; testStart verifies start() calls on it.
49 private Thread thread;
// Mock bus consumer; tests verify close() on it and stub its fetch().
50 private BusConsumer cons;
// Mock listener that the tests register with the source.
51 private TopicListener listener;
// Object under test: the test-local subclass that exposes initCount and initEx.
52 private SingleThreadedBusTopicSourceImpl source;
55 * Creates the object to be tested, as well as various mocks.
// Builds the fixtures used by the tests: mocks for a Thread, the bus consumer
// and the listener, plus a fresh source built from makeBuilder().
// NOTE(review): the enclosing method header is not visible in this listing
// (embedded line numbers jump from 55 to 61); presumably this is the body of
// the @Before set-up method - confirm against the full file.
61 thread = mock(Thread.class);
62 cons = mock(BusConsumer.class);
63 listener = mock(TopicListener.class);
64 source = new SingleThreadedBusTopicSourceImpl(makeBuilder().build());
// Tear-down hook.
// NOTE(review): the body is not visible in this listing (embedded line numbers
// jump from 68 to 73); presumably it carries @After, which the file imports,
// and releases the source - confirm against the full file.
68 public void tearDown() {
// Hands the source to GsonTestUtils.compareGson() together with this test class.
// NOTE(review): presumably the class argument is used to locate the expected
// JSON for the comparison - confirm against GsonTestUtils.
73 public void testSerialize() {
74 new GsonTestUtils().compareGson(source, SingleThreadedBusTopicSourceTest.class);
// Exercises register(): the first registration initialises the source once,
// later registrations do not re-initialise it, and every registered listener
// receives the messages offered afterwards.
78 public void testRegister() {
79 source.register(listener);
// the first registration is expected to trigger exactly one init
80 assertEquals(1, source.initCount);
81 source.offer(MY_MESSAGE);
82 verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE);
84 // register another - should not re-init
85 TopicListener listener2 = mock(TopicListener.class);
86 source.register(listener2);
87 assertEquals(1, source.initCount);
// a distinct message is offered so each verify below matches one delivery only
88 source.offer(MY_MESSAGE + "z");
89 verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE + "z");
90 verify(listener2).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE + "z");
92 // re-register - should not re-init
93 source.register(listener);
94 assertEquals(1, source.initCount);
95 source.offer(MY_MESSAGE2);
// verify() defaults to exactly one call, so this also checks that registering
// the same listener twice does not deliver the message twice
96 verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE2);
98 // lock & register - should not init
99 source = new SingleThreadedBusTopicSourceImpl(makeBuilder().build());
// NOTE(review): embedded line numbers jump from 99 to 101; presumably the
// omitted line locks the new source before the registration - confirm.
101 source.register(listener);
102 assertEquals(0, source.initCount);
104 // exception during init
105 source = new SingleThreadedBusTopicSourceImpl(makeBuilder().build());
106 source.initEx = true;
// NOTE(review): no assertion follows in this listing; presumably the point is
// that register() does not propagate the init failure - confirm.
107 source.register(listener);
// Exercises unregister(): the consumer is closed only when the last listener is
// removed, and unregistering a listener that is already gone changes nothing.
111 public void testUnregister() {
112 TopicListener listener2 = mock(TopicListener.class);
113 source.register(listener);
114 source.register(listener2);
116 // unregister first listener - should NOT invoke close
117 source.unregister(listener);
118 verify(cons, never()).close();
119 assertEquals(Arrays.asList(listener2), source.snapshotTopicListeners());
121 // unregister same listener - should not invoke close
122 source.unregister(listener);
123 verify(cons, never()).close();
124 assertEquals(Arrays.asList(listener2), source.snapshotTopicListeners());
126 // unregister second listener - SHOULD invoke close
127 source.unregister(listener2);
128 verify(cons).close();
129 assertTrue(source.snapshotTopicListeners().isEmpty());
131 // unregister same listener - should not invoke close again
132 source.unregister(listener2);
// still exactly one close() in total (verify() defaults to a single call)
133 verify(cons).close();
134 assertTrue(source.snapshotTopicListeners().isEmpty());
// Checks that toString() begins with the "SingleThreadedBusTopicSource [" tag.
138 public void testToString() {
139 assertTrue(source.toString().startsWith("SingleThreadedBusTopicSource ["));
// Builds an anonymous subclass of SingleThreadedBusTopicSource directly (not the
// test Impl class) and checks that its makePollerThread() returns a non-null value.
// NOTE(review): the test Impl class declares its own makePollerThread(), whose
// body is not visible in this listing; presumably that is why a separate
// subclass is used here - confirm.
143 public void testMakePollerThread() {
144 SingleThreadedBusTopicSource source2 = new SingleThreadedBusTopicSource(makeBuilder().build()) {
146 public CommInfrastructure getTopicCommInfrastructure() {
147 return CommInfrastructure.NOOP;
// NOTE(review): the body of this init() is not visible in the listing
// (embedded line numbers jump from 151 to 156).
151 public void init() throws MalformedURLException {
156 assertNotNull(source2.makePollerThread());
// Constructs sources with a null consumer group, a null consumer instance, a
// negative fetch timeout and a negative fetch limit. No assertions are made;
// the test only requires that none of the constructors throws.
160 public void testSingleThreadedBusTopicSource() {
161 // verify that different wrappers can be built
162 new SingleThreadedBusTopicSourceImpl(makeBuilder().consumerGroup(null).build());
163 new SingleThreadedBusTopicSourceImpl(makeBuilder().consumerInstance(null).build());
164 new SingleThreadedBusTopicSourceImpl(makeBuilder().fetchTimeout(-1).build());
165 new SingleThreadedBusTopicSourceImpl(makeBuilder().fetchLimit(-1).build());
// Checks liveness, the init count and the number of Thread.start() calls in
// three stages: one init/one start, the same counts again, then two of each.
// NOTE(review): the calls that drive these stages are not visible in this
// listing (embedded line numbers skip 170, 176 and 180-183); presumably they
// are start(), a repeated start(), and a stop() followed by start() - confirm.
169 public void testStart() {
171 assertTrue(source.isAlive());
172 assertEquals(1, source.initCount);
173 verify(thread).start();
175 // attempt to start again - nothing should be invoked again
177 assertTrue(source.isAlive());
178 assertEquals(1, source.initCount);
179 verify(thread).start();
// final stage: init and Thread.start() have each happened a second time
184 assertTrue(source.isAlive());
185 assertEquals(2, source.initCount);
186 verify(thread, times(2)).start();
// Expects an IllegalStateException.
// NOTE(review): the body is not visible in this listing (embedded line numbers
// jump from 190 to 195); the name suggests it starts a locked source - confirm.
189 @Test(expected = IllegalStateException.class)
190 public void testStart_Locked() {
// Expects an IllegalStateException after the initEx flag has been set.
// NOTE(review): the call that triggers the exception is not visible in this
// listing; presumably it is source.start() - confirm.
195 @Test(expected = IllegalStateException.class)
196 public void testStart_InitEx() {
197 source.initEx = true;
// Checks that the consumer is closed once, is not closed again by a second
// stop, and then stubs close() to throw for a final start/stop cycle.
// NOTE(review): the start()/stop() calls themselves are not visible in this
// listing (embedded line numbers skip 203-204, 208 and 213 onward) - confirm.
202 public void testStop() {
205 verify(cons).close();
207 // stop it again - not re-closed
209 verify(cons).close();
211 // start & stop again, but with an exception
// from here on close() throws RuntimeException(EXPECTED)
212 doThrow(new RuntimeException(EXPECTED)).when(cons).close();
// Drives the source with stubbed cons.fetch() answers and checks which messages
// reach the listener and which end up in getRecentEvents().
// NOTE(review): the statements that run the source and the branching inside
// each Answer are not visible in this listing (several embedded line numbers
// are skipped) - confirm against the full file.
218 public void testRun() throws Exception {
219 source.register(listener);
222 * Die in the middle of fetching messages. Also, throw an exception during the
223 * first fetch attempt.
225 when(cons.fetch()).thenAnswer(new Answer<Iterable<String>>() {
229 public Iterable<String> answer(InvocationOnMock invocation) throws Throwable {
231 source.alive = false;
232 return Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
235 throw new IOException(EXPECTED);
// both messages were returned by the answer that cleared alive, yet only the
// first is expected to be recorded and delivered
241 assertEquals(Arrays.asList(MY_MESSAGE), Arrays.asList(source.getRecentEvents()));
242 verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE);
243 verify(listener, never()).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE2);
246 * Die AFTER fetching messages.
248 final String msga = "message-A";
249 final String msgb = "message-B";
250 when(cons.fetch()).thenAnswer(new Answer<Iterable<String>>() {
254 public Iterable<String> answer(InvocationOnMock invocation) throws Throwable {
256 source.alive = false;
257 return Collections.emptyList();
260 return Arrays.asList(msga, msgb);
// here alive is cleared by the answer that returns an empty list, so both
// messages from the other answer are expected to be delivered
266 verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, msga);
267 verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, msgb);
// recent events accumulate across both scenarios
269 assertEquals(Arrays.asList(MY_MESSAGE, msga, msgb), Arrays.asList(source.getRecentEvents()));
// Registers a listener, offers one message, and checks that the message is
// delivered to the listener and recorded in getRecentEvents().
273 public void testOffer() {
274 source.register(listener);
275 source.offer(MY_MESSAGE);
276 verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE);
277 assertEquals(Arrays.asList(MY_MESSAGE), Arrays.asList(source.getRecentEvents()));
// Offers a message without registering a listener first in this test and
// expects an IllegalStateException.
280 @Test(expected = IllegalStateException.class)
281 public void testOffer_NotStarted() {
282 source.offer(MY_MESSAGE);
// Checks that setFilter() on the source is passed on to a FilterableBusConsumer.
// NOTE(review): the lines that make filt the consumer used by the source are
// not visible in this listing (embedded line numbers skip 288-290) - confirm.
286 public void testSetFilter() {
287 FilterableBusConsumer filt = mock(FilterableBusConsumer.class);
291 source.setFilter("my-filter");
292 verify(filt).setFilter("my-filter");
// Expects an UnsupportedOperationException from setFilter().
// NOTE(review): embedded line 297 is not visible in this listing; presumably it
// starts the source with the plain BusConsumer mock, which is not a
// FilterableBusConsumer - confirm.
295 @Test(expected = UnsupportedOperationException.class)
296 public void testSetFilter_Unsupported() {
298 source.setFilter("unsupported-filter");
// Checks that getConsumerGroup() returns MY_CONS_GROUP.
// NOTE(review): presumably that is the value makeBuilder() configures - confirm
// against TopicTestBase.
302 public void testGetConsumerGroup() {
303 assertEquals(MY_CONS_GROUP, source.getConsumerGroup());
// Checks that getConsumerInstance() returns MY_CONS_INST.
// NOTE(review): presumably that is the value makeBuilder() configures - confirm
// against TopicTestBase.
307 public void testGetConsumerInstance() {
308 assertEquals(MY_CONS_INST, source.getConsumerInstance());
// Registers a listener and then checks that the consumer has been closed and
// that no listeners remain.
// NOTE(review): the shutdown call itself is not visible in this listing
// (embedded line numbers skip 314-315) - confirm against the full file.
312 public void testShutdown() {
313 source.register(listener);
316 verify(cons).close();
317 assertTrue(source.snapshotTopicListeners().isEmpty());
// Checks that getFetchTimeout() returns MY_FETCH_TIMEOUT.
// NOTE(review): presumably that is the value makeBuilder() configures - confirm
// against TopicTestBase.
321 public void testGetFetchTimeout() {
322 assertEquals(MY_FETCH_TIMEOUT, source.getFetchTimeout());
// Checks that getFetchLimit() returns MY_FETCH_LIMIT.
// NOTE(review): presumably that is the value makeBuilder() configures - confirm
// against TopicTestBase.
326 public void testGetFetchLimit() {
327 assertEquals(MY_FETCH_LIMIT, source.getFetchLimit());
331 * Implementation of SingleThreadedBusTopicSource that counts the number of times
// Test subclass of SingleThreadedBusTopicSource; the tests use it as the object
// under test and read or set the two fields below directly.
334 private class SingleThreadedBusTopicSourceImpl extends SingleThreadedBusTopicSource {
// Read by the tests to check how many initialisations have happened.
// NOTE(review): the increment is not visible in this listing; presumably it is
// done in init() - confirm.
336 private int initCount = 0;
// Set to true by the tests that want initialisation to fail.
// NOTE(review): presumably init() checks this flag before throwing - confirm.
337 private boolean initEx = false;
// Passes the supplied parameters straight to the superclass constructor.
339 public SingleThreadedBusTopicSourceImpl(BusTopicParams busTopicParams) {
340 super(busTopicParams);
// Reports NOOP as the communication infrastructure; the onTopicEvent
// verifications in the tests expect this value.
344 public CommInfrastructure getTopicCommInfrastructure() {
345 return CommInfrastructure.NOOP;
// Test implementation of init(); the visible code throws
// MalformedURLException(EXPECTED).
// NOTE(review): embedded line numbers skip 350-352; presumably the omitted
// lines bump initCount and guard the throw with the initEx flag - confirm.
349 public void init() throws MalformedURLException {
353 throw new MalformedURLException(EXPECTED);
360 protected Thread makePollerThread() {