Drools support for kafka topics
[policy/drools-pdp.git] / feature-pooling-dmaap / src / test / java / org / onap / policy / drools / pooling / DmaapManagerTest.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.pooling;
22
23 import static org.assertj.core.api.Assertions.assertThatCode;
24 import static org.assertj.core.api.Assertions.assertThatThrownBy;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertTrue;
28 import static org.mockito.ArgumentMatchers.any;
29 import static org.mockito.Mockito.doThrow;
30 import static org.mockito.Mockito.mock;
31 import static org.mockito.Mockito.never;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.when;
34
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.List;
38 import java.util.concurrent.CountDownLatch;
39 import org.junit.Before;
40 import org.junit.Test;
41 import org.onap.policy.common.endpoints.event.comm.TopicListener;
42 import org.onap.policy.common.endpoints.event.comm.TopicSink;
43 import org.onap.policy.common.endpoints.event.comm.TopicSource;
44
45 public class DmaapManagerTest {
46
    /** Message given to the exceptions that the tests throw deliberately. */
    private static final String EXPECTED = "expected";

    /** Topic name reported by the mocked source and sink, and passed to the manager under test. */
    private static final String MY_TOPIC = "my.topic";

    /** Base message text that the tests publish through the manager. */
    private static final String MSG = "a message";

    /** Mock listener handed to startConsumer() and stopConsumer(). */
    private TopicListener listener;

    /** Mock source whose topic is stubbed to MY_TOPIC in setUp(). */
    private TopicSource source;

    /** Set to true when DmaapManagerImpl.getTopicSources() is invoked. */
    private boolean gotSources;

    /** Mock sink whose topic is stubbed to MY_TOPIC and whose send() is stubbed to return true. */
    private TopicSink sink;

    /** Set to true when DmaapManagerImpl.getTopicSinks() is invoked. */
    private boolean gotSinks;

    /** Manager under test; re-created by setUp() before each test. */
    private DmaapManager mgr;
57
58     /**
59      * Setup.
60      *
61      * @throws Exception throws an exception
62      */
63     @Before
64     public void setUp() throws Exception {
65         listener = mock(TopicListener.class);
66         source = mock(TopicSource.class);
67         gotSources = false;
68         sink = mock(TopicSink.class);
69         gotSinks = false;
70
71         when(source.getTopic()).thenReturn(MY_TOPIC);
72
73         when(sink.getTopic()).thenReturn(MY_TOPIC);
74         when(sink.send(any())).thenReturn(true);
75
76         mgr = new DmaapManagerImpl(MY_TOPIC);
77     }
78
79     @Test
80     public void testDmaapManager() {
81         // verify that the init methods were called
82         assertTrue(gotSources);
83         assertTrue(gotSinks);
84     }
85
86     @Test(expected = PoolingFeatureException.class)
87     public void testDmaapManager_PoolingEx() throws PoolingFeatureException {
88         // force error by having no topics match
89         when(source.getTopic()).thenReturn("");
90
91         new DmaapManagerImpl(MY_TOPIC);
92     }
93
94     @Test(expected = PoolingFeatureException.class)
95     public void testDmaapManager_IllegalArgEx() throws PoolingFeatureException {
96         // force error
97         new DmaapManagerImpl(MY_TOPIC) {
98             @Override
99             protected List<TopicSource> getTopicSources() {
100                 throw new IllegalArgumentException(EXPECTED);
101             }
102         };
103     }
104
105     @Test
106     public void testGetTopic() {
107         assertEquals(MY_TOPIC, mgr.getTopic());
108     }
109
110     @Test(expected = PoolingFeatureException.class)
111     public void testFindTopicSource_NotFound() throws PoolingFeatureException {
112         // one item in list, and its topic doesn't match
113         new DmaapManagerImpl(MY_TOPIC) {
114             @Override
115             protected List<TopicSource> getTopicSources() {
116                 return Arrays.asList(mock(TopicSource.class));
117             }
118         };
119     }
120
121     @Test(expected = PoolingFeatureException.class)
122     public void testFindTopicSource_EmptyList() throws PoolingFeatureException {
123         // empty list
124         new DmaapManagerImpl(MY_TOPIC) {
125             @Override
126             protected List<TopicSource> getTopicSources() {
127                 return Collections.emptyList();
128             }
129         };
130     }
131
132     @Test(expected = PoolingFeatureException.class)
133     public void testFindTopicSink_NotFound() throws PoolingFeatureException {
134         // one item in list, and its topic doesn't match
135         new DmaapManagerImpl(MY_TOPIC) {
136             @Override
137             protected List<TopicSink> getTopicSinks() {
138                 return Arrays.asList(mock(TopicSink.class));
139             }
140         };
141     }
142
143     @Test(expected = PoolingFeatureException.class)
144     public void testFindTopicSink_EmptyList() throws PoolingFeatureException {
145         // empty list
146         new DmaapManagerImpl(MY_TOPIC) {
147             @Override
148             protected List<TopicSink> getTopicSinks() {
149                 return Collections.emptyList();
150             }
151         };
152     }
153
154     @Test
155     public void testStartPublisher() throws PoolingFeatureException {
156
157         mgr.startPublisher();
158
159         // restart should have no effect
160         mgr.startPublisher();
161
162         // should be able to publish now
163         mgr.publish(MSG);
164         verify(sink).send(MSG);
165     }
166
167     @Test
168     public void testStopPublisher() throws PoolingFeatureException {
169         // not publishing yet, so stopping should have no effect
170         mgr.stopPublisher(0);
171
172         // now start it
173         mgr.startPublisher();
174
175         // this time, stop should do something
176         mgr.stopPublisher(0);
177
178         // re-stopping should have no effect
179         assertThatCode(() -> mgr.stopPublisher(0)).doesNotThrowAnyException();
180     }
181
182     @Test
183     public void testStopPublisher_WithDelay() throws PoolingFeatureException {
184
185         mgr.startPublisher();
186
187         long tbeg = System.currentTimeMillis();
188
189         mgr.stopPublisher(100L);
190
191         assertTrue(System.currentTimeMillis() >= tbeg + 100L);
192     }
193
194     @Test
195     public void testStopPublisher_WithDelayInterrupted() throws Exception {
196
197         mgr.startPublisher();
198
199         long minms = 2000L;
200
201         // tell the publisher to stop in minms + additional time
202         CountDownLatch latch = new CountDownLatch(1);
203         Thread thread = new Thread(() -> {
204             latch.countDown();
205             mgr.stopPublisher(minms + 3000L);
206         });
207         thread.start();
208
209         // wait for the thread to start
210         latch.await();
211
212         // interrupt it - it should immediately finish its work
213         thread.interrupt();
214
215         // wait for it to stop, but only wait the minimum time
216         thread.join(minms);
217
218         assertFalse(thread.isAlive());
219     }
220
221     @Test
222     public void testStartConsumer() {
223         // not started yet
224         verify(source, never()).register(any());
225
226         mgr.startConsumer(listener);
227         verify(source).register(listener);
228
229         // restart should have no effect
230         mgr.startConsumer(listener);
231         verify(source).register(listener);
232     }
233
234     @Test
235     public void testStopConsumer() {
236         // not consuming yet, so stopping should have no effect
237         mgr.stopConsumer(listener);
238         verify(source, never()).unregister(any());
239
240         // now start it
241         mgr.startConsumer(listener);
242
243         // this time, stop should do something
244         mgr.stopConsumer(listener);
245         verify(source).unregister(listener);
246
247         // re-stopping should have no effect
248         mgr.stopConsumer(listener);
249         verify(source).unregister(listener);
250     }
251
252     @Test
253     public void testPublish() throws PoolingFeatureException {
254         // cannot publish before starting
255         assertThatThrownBy(() -> mgr.publish(MSG)).as("publish,pre").isInstanceOf(PoolingFeatureException.class);
256
257         mgr.startPublisher();
258
259         // publish several messages
260         mgr.publish(MSG);
261         verify(sink).send(MSG);
262
263         mgr.publish(MSG + "a");
264         verify(sink).send(MSG + "a");
265
266         mgr.publish(MSG + "b");
267         verify(sink).send(MSG + "b");
268
269         // stop and verify we can no longer publish
270         mgr.stopPublisher(0);
271         assertThatThrownBy(() -> mgr.publish(MSG)).as("publish,stopped").isInstanceOf(PoolingFeatureException.class);
272     }
273
274     @Test(expected = PoolingFeatureException.class)
275     public void testPublish_SendFailed() throws PoolingFeatureException {
276         mgr.startPublisher();
277
278         // arrange for send() to fail
279         when(sink.send(MSG)).thenReturn(false);
280
281         mgr.publish(MSG);
282     }
283
284     @Test(expected = PoolingFeatureException.class)
285     public void testPublish_SendEx() throws PoolingFeatureException {
286         mgr.startPublisher();
287
288         // arrange for send() to throw an exception
289         doThrow(new IllegalStateException(EXPECTED)).when(sink).send(MSG);
290
291         mgr.publish(MSG);
292     }
293
294     /**
295      * Manager with overrides.
296      */
297     private class DmaapManagerImpl extends DmaapManager {
298
299         public DmaapManagerImpl(String topic) throws PoolingFeatureException {
300             super(topic);
301         }
302
303         @Override
304         protected List<TopicSource> getTopicSources() {
305             gotSources = true;
306
307             // three sources, with the desired one in the middle
308             return Arrays.asList(mock(TopicSource.class), source, mock(TopicSource.class));
309         }
310
311         @Override
312         protected List<TopicSink> getTopicSinks() {
313             gotSinks = true;
314
315             // three sinks, with the desired one in the middle
316             return Arrays.asList(mock(TopicSink.class), sink, mock(TopicSink.class));
317         }
318     }
319 }