2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23 import static org.assertj.core.api.Assertions.assertThatCode;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertNotNull;
26 import static org.junit.Assert.assertTrue;
27 import static org.mockito.Mockito.doThrow;
28 import static org.mockito.Mockito.mock;
29 import static org.mockito.Mockito.never;
30 import static org.mockito.Mockito.times;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.when;
34 import java.io.IOException;
35 import java.net.MalformedURLException;
36 import java.util.Arrays;
37 import java.util.Collections;
38 import org.junit.After;
39 import org.junit.Before;
40 import org.junit.Test;
41 import org.mockito.invocation.InvocationOnMock;
42 import org.mockito.stubbing.Answer;
43 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
44 import org.onap.policy.common.endpoints.event.comm.TopicListener;
45 import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
46 import org.onap.policy.common.utils.gson.GsonTestUtils;
48 public class SingleThreadedBusTopicSourceTest extends TopicTestBase {
// Mockito mock of Thread; tests verify start() invocations against it.
49 private Thread thread;
// Mockito mock of BusConsumer; tests stub fetch() and verify close() on it.
50 private BusConsumer cons;
// Mockito mock of TopicListener that tests register with the source.
51 private TopicListener listener;
// Object under test, built from makeBuilder().build().
52 private SingleThreadedBusTopicSourceImpl source;
55 * Creates the object to be tested, as well as various mocks.
// Replace the collaborators with fresh Mockito mocks.
62 thread = mock(Thread.class);
63 cons = mock(BusConsumer.class);
64 listener = mock(TopicListener.class);
// Build a new source under test from the parameters produced by makeBuilder().
65 source = new SingleThreadedBusTopicSourceImpl(makeBuilder().build());
69 public void tearDown() {
// Checks that GsonTestUtils.compareGson() on the source completes without throwing.
74 public void testSerialize() {
75 assertThatCode(() -> new GsonTestUtils().compareGson(source, SingleThreadedBusTopicSourceTest.class))
76 .doesNotThrowAnyException();
// Exercises register(): the init count after a first, an additional and a repeated
// registration, delivery of offered messages to the registered listeners, and
// registration on freshly built sources (lock case and init-failure case).
80 public void testRegister() {
81 source.register(listener);
// first registration is expected to initialize the source exactly once
82 assertEquals(1, source.initCount);
83 source.offer(MY_MESSAGE);
84 verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE);
86 // register another - should not re-init
87 TopicListener listener2 = mock(TopicListener.class);
88 source.register(listener2);
89 assertEquals(1, source.initCount);
90 source.offer(MY_MESSAGE + "z");
// both listeners must receive the same offered message
91 verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE + "z");
92 verify(listener2).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE + "z");
94 // re-register - should not re-init
95 source.register(listener);
96 assertEquals(1, source.initCount);
97 source.offer(MY_MESSAGE2);
98 verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE2);
100 // lock & register - should not init
101 source = new SingleThreadedBusTopicSourceImpl(makeBuilder().build());
// NOTE(review): no lock call is visible at this point in this view - confirm the source is locked before register()
103 source.register(listener);
104 assertEquals(0, source.initCount);
106 // exception during init
107 source = new SingleThreadedBusTopicSourceImpl(makeBuilder().build());
// initEx is the Impl flag used to force an init failure; no assertion follows the register() call
108 source.initEx = true;
109 source.register(listener);
// Exercises unregister(): the consumer must be closed only when the last listener is
// removed, and repeating an unregister must neither close it again nor change the
// listener snapshot.
113 public void testUnregister() {
114 TopicListener listener2 = mock(TopicListener.class);
115 source.register(listener);
116 source.register(listener2);
118 // unregister first listener - should NOT invoke close
119 source.unregister(listener);
120 verify(cons, never()).close();
121 assertEquals(Arrays.asList(listener2), source.snapshotTopicListeners());
123 // unregister same listener - should not invoke close
124 source.unregister(listener);
125 verify(cons, never()).close();
126 assertEquals(Arrays.asList(listener2), source.snapshotTopicListeners());
128 // unregister second listener - SHOULD invoke close
129 source.unregister(listener2);
// verify(cons) with no mode means exactly one close() invocation so far
130 verify(cons).close();
131 assertTrue(source.snapshotTopicListeners().isEmpty());
133 // unregister same listener - should not invoke close again
134 source.unregister(listener2);
// still exactly one close() in total
135 verify(cons).close();
136 assertTrue(source.snapshotTopicListeners().isEmpty());
// Checks that toString() begins with the "SingleThreadedBusTopicSource [" prefix.
140 public void testToString() {
141 assertTrue(source.toString().startsWith("SingleThreadedBusTopicSource ["));
// Checks that makePollerThread() on an anonymous SingleThreadedBusTopicSource subclass
// returns a non-null Thread. The Impl class in this file declares its own
// makePollerThread(), which is presumably why a separate subclass is used here.
145 public void testMakePollerThread() {
146 SingleThreadedBusTopicSource source2 = new SingleThreadedBusTopicSource(makeBuilder().build()) {
148 public CommInfrastructure getTopicCommInfrastructure() {
149 return CommInfrastructure.NOOP;
153 public void init() throws MalformedURLException {
158 assertNotNull(source2.makePollerThread());
// Constructs sources from parameter variants: null consumer group, null consumer
// instance, fetch timeout of -1 and fetch limit of -1. Only the last construction is
// wrapped in an explicit does-not-throw assertion.
162 public void testSingleThreadedBusTopicSource() {
163 // verify that different wrappers can be built
164 new SingleThreadedBusTopicSourceImpl(makeBuilder().consumerGroup(null).build());
165 new SingleThreadedBusTopicSourceImpl(makeBuilder().consumerInstance(null).build());
166 new SingleThreadedBusTopicSourceImpl(makeBuilder().fetchTimeout(-1).build());
167 assertThatCode(() -> new SingleThreadedBusTopicSourceImpl(makeBuilder().fetchLimit(-1).build()))
168 .doesNotThrowAnyException();
// Checks the start-up state in three stages: alive with one init and one thread start,
// unchanged counts after a repeated start attempt, then two inits and two thread starts.
// NOTE(review): the calls that drive each stage are not visible in this view - confirm
// against the full file.
172 public void testStart() {
174 assertTrue(source.isAlive());
175 assertEquals(1, source.initCount);
176 verify(thread).start();
178 // attempt to start again - nothing should be invoked again
180 assertTrue(source.isAlive());
181 assertEquals(1, source.initCount);
182 verify(thread).start();
// third stage: init and thread start are each expected to have happened twice in total
187 assertTrue(source.isAlive());
188 assertEquals(2, source.initCount);
189 verify(thread, times(2)).start();
// Expects an IllegalStateException from the test body; per the method name this is
// presumably starting a locked source (the body is not visible in this view).
192 @Test(expected = IllegalStateException.class)
193 public void testStart_Locked() {
// Expects an IllegalStateException once the Impl's initEx flag has been set to force
// an init failure.
198 @Test(expected = IllegalStateException.class)
199 public void testStart_InitEx() {
200 source.initEx = true;
// Checks that the consumer is closed exactly once, stays at one close after a repeated
// stop, and then stubs close() to throw so the exception path can be exercised.
205 public void testStop() {
208 verify(cons).close();
210 // stop it again - not re-closed
212 verify(cons).close();
214 // start & stop again, but with an exception
// from here on every cons.close() call throws RuntimeException(EXPECTED)
215 doThrow(new RuntimeException(EXPECTED)).when(cons).close();
// Exercises message polling against a stubbed cons.fetch() in two scenarios and checks
// which messages reach the listener and the recent-events list.
// NOTE(review): the calls that actually run the poll loop, and the conditions inside
// the Answer bodies, are not visible in this view.
221 public void testRun() throws Exception {
222 source.register(listener);
225 * Die in the middle of fetching messages. Also, throw an exception during the
226 * first fetch attempt.
228 when(cons.fetch()).thenAnswer(new Answer<Iterable<String>>() {
232 public Iterable<String> answer(InvocationOnMock invocation) throws Throwable {
// alive is cleared while two messages are returned; the assertions below expect only the first to be delivered
234 source.alive = false;
235 return Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
// other branch of the answer: simulate a fetch failure
238 throw new IOException(EXPECTED);
244 assertEquals(Arrays.asList(MY_MESSAGE), Arrays.asList(source.getRecentEvents()));
245 verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE);
246 verify(listener, never()).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE2);
249 * Die AFTER fetching messages.
251 final String msga = "message-A";
252 final String msgb = "message-B";
253 when(cons.fetch()).thenAnswer(new Answer<Iterable<String>>() {
257 public Iterable<String> answer(InvocationOnMock invocation) throws Throwable {
// one branch clears alive and returns an empty batch; the other returns both messages
259 source.alive = false;
260 return Collections.emptyList();
263 return Arrays.asList(msga, msgb);
269 verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, msga);
270 verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, msgb);
// recent events accumulate across both scenarios: MY_MESSAGE from the first, then msga and msgb
272 assertEquals(Arrays.asList(MY_MESSAGE, msga, msgb), Arrays.asList(source.getRecentEvents()));
// Checks that, after a listener is registered, offer() delivers the message to that
// listener and the message appears in getRecentEvents().
276 public void testOffer() {
277 source.register(listener);
278 source.offer(MY_MESSAGE);
279 verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE);
280 assertEquals(Arrays.asList(MY_MESSAGE), Arrays.asList(source.getRecentEvents()));
// Expects offer() to throw IllegalStateException when nothing has been registered or
// started on the source beforehand.
283 @Test(expected = IllegalStateException.class)
284 public void testOffer_NotStarted() {
285 source.offer(MY_MESSAGE);
// Checks that getConsumerGroup() returns the MY_CONS_GROUP test constant.
289 public void testGetConsumerGroup() {
290 assertEquals(MY_CONS_GROUP, source.getConsumerGroup());
// Checks that getConsumerInstance() returns the MY_CONS_INST test constant.
294 public void testGetConsumerInstance() {
295 assertEquals(MY_CONS_INST, source.getConsumerInstance());
// Registers a listener, then checks that the consumer has been closed once and that no
// listeners remain registered.
// NOTE(review): the shutdown call itself is not visible in this view.
299 public void testShutdown() {
300 source.register(listener);
303 verify(cons).close();
304 assertTrue(source.snapshotTopicListeners().isEmpty());
// Checks that getFetchTimeout() returns the MY_FETCH_TIMEOUT test constant.
308 public void testGetFetchTimeout() {
309 assertEquals(MY_FETCH_TIMEOUT, source.getFetchTimeout());
// Checks that getFetchLimit() returns the MY_FETCH_LIMIT test constant.
313 public void testGetFetchLimit() {
314 assertEquals(MY_FETCH_LIMIT, source.getFetchLimit());
318 * Implementation of SingleThreadedBusTopicSource that counts the number of times init() is invoked.
321 private class SingleThreadedBusTopicSourceImpl extends SingleThreadedBusTopicSource {
323 private int initCount = 0;
324 private boolean initEx = false;
326 public SingleThreadedBusTopicSourceImpl(BusTopicParams busTopicParams) {
327 super(busTopicParams);
331 public CommInfrastructure getTopicCommInfrastructure() {
332 return CommInfrastructure.NOOP;
336 public void init() throws MalformedURLException {
340 throw new MalformedURLException(EXPECTED);
347 protected Thread makePollerThread() {