7df5d129023fa32272d92a2fcd53d70a18b55f35
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.fail;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.when;
31
32 import com.att.nsa.cambria.client.CambriaConsumer;
33 import java.io.IOException;
34 import java.util.Arrays;
35 import java.util.Collections;
36 import java.util.List;
37 import java.util.Properties;
38 import java.util.concurrent.CountDownLatch;
39 import org.apache.commons.collections4.IteratorUtils;
40 import org.apache.kafka.clients.consumer.ConsumerConfig;
41 import org.apache.kafka.clients.consumer.KafkaConsumer;
42 import org.junit.Before;
43 import org.junit.Test;
44 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
45 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
46 import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
47 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.CambriaConsumerWrapper;
48 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapAafConsumerWrapper;
49 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapConsumerWrapper;
50 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapDmeConsumerWrapper;
51 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer;
52 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.KafkaConsumerWrapper;
53 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
54 import org.powermock.reflect.Whitebox;
55
56 public class BusConsumerTest extends TopicTestBase {
57
58     private static final int SHORT_TIMEOUT_MILLIS = 10;
59     private static final int LONG_TIMEOUT_MILLIS = 3000;
60
61     @Before
62     @Override
63     public void setUp() {
64         super.setUp();
65     }
66
67     @Test
68     public void testFetchingBusConsumer() throws InterruptedException {
69         // should not be negative
70         var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(-1).build());
71         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
72
73         // should not be zero
74         cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(0).build());
75         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
76
77         // should not be too large
78         cons = new FetchingBusConsumerImpl(
79                         makeBuilder().fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH + 100).build());
80         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
81
82         // should not be what was specified
83         cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(100).build());
84         assertThat(cons.getSleepTime()).isEqualTo(100);
85     }
86
87     @Test
88     public void testFetchingBusConsumerSleepAfterFetchFailure() throws InterruptedException {
89
90         var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) {
91
92             private CountDownLatch started = new CountDownLatch(1);
93
94             @Override
95             protected void sleepAfterFetchFailure() {
96                 started.countDown();
97                 super.sleepAfterFetchFailure();
98             }
99         };
100
101         // full sleep
102         long tstart = System.currentTimeMillis();
103         cons.sleepAfterFetchFailure();
104         assertThat(System.currentTimeMillis() - tstart).isGreaterThanOrEqualTo(SHORT_TIMEOUT_MILLIS);
105
106         // close while sleeping - sleep should halt prematurely
107         cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
108         cons.started = new CountDownLatch(1);
109         Thread thread = new Thread(cons::sleepAfterFetchFailure);
110         tstart = System.currentTimeMillis();
111         thread.start();
112         cons.started.await();
113         cons.close();
114         thread.join();
115         assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
116
117         // interrupt while sleeping - sleep should halt prematurely
118         cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
119         cons.started = new CountDownLatch(1);
120         thread = new Thread(cons::sleepAfterFetchFailure);
121         tstart = System.currentTimeMillis();
122         thread.start();
123         cons.started.await();
124         thread.interrupt();
125         thread.join();
126         assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
127     }
128
129     @Test
130     public void testCambriaConsumerWrapper() {
131         // verify that different wrappers can be built
132         new CambriaConsumerWrapper(makeBuilder().build());
133         new CambriaConsumerWrapper(makeBuilder().useHttps(false).build());
134         new CambriaConsumerWrapper(makeBuilder().useHttps(true).build());
135         new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(false).build());
136         new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(true).build());
137         new CambriaConsumerWrapper(makeBuilder().apiKey(null).build());
138         new CambriaConsumerWrapper(makeBuilder().apiSecret(null).build());
139         new CambriaConsumerWrapper(makeBuilder().apiKey(null).apiSecret(null).build());
140         new CambriaConsumerWrapper(makeBuilder().userName(null).build());
141         new CambriaConsumerWrapper(makeBuilder().password(null).build());
142
143         assertThatCode(() -> new CambriaConsumerWrapper(makeBuilder().userName(null).password(null).build()))
144                         .doesNotThrowAnyException();
145     }
146
147     @Test
148     public void testCambriaConsumerWrapperFetch() throws Exception {
149         CambriaConsumer inner = mock(CambriaConsumer.class);
150         List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
151         when(inner.fetch()).thenReturn(lst);
152
153         CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
154         Whitebox.setInternalState(cons, "consumer", inner);
155
156         assertEquals(lst, IteratorUtils.toList(cons.fetch().iterator()));
157
158         // arrange to throw exception next time fetch is called
159         IOException ex = new IOException(EXPECTED);
160         when(inner.fetch()).thenThrow(ex);
161
162         cons.fetchTimeout = 10;
163
164         try {
165             cons.fetch();
166             fail("missing exception");
167
168         } catch (IOException e) {
169             assertEquals(ex, e);
170         }
171     }
172
173     @Test
174     public void testCambriaConsumerWrapperClose() {
175         CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
176         assertThatCode(() -> cons.close()).doesNotThrowAnyException();
177     }
178
179     @Test
180     public void testCambriaConsumerWrapperToString() {
181         assertNotNull(new CambriaConsumerWrapper(makeBuilder().build()).toString());
182     }
183
184     @Test
185     public void testDmaapConsumerWrapper() throws Exception {
186         // verify that different wrappers can be built
187         assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().build())).doesNotThrowAnyException();
188     }
189
190     @Test(expected = IllegalArgumentException.class)
191     public void testDmaapConsumerWrapper_InvalidTopic() throws Exception {
192         new DmaapAafConsumerWrapper(makeBuilder().topic(null).build());
193     }
194
195     @Test
196     public void testDmaapConsumerWrapperFetch() throws Exception {
197         DmaapAafConsumerWrapper dmaap = new DmaapAafConsumerWrapper(makeBuilder().build());
198         MRConsumerImpl cons = mock(MRConsumerImpl.class);
199
200         dmaap.fetchTimeout = 5;
201         dmaap.consumer = cons;
202
203         // null return
204         when(cons.fetchWithReturnConsumerResponse()).thenReturn(null);
205         assertFalse(dmaap.fetch().iterator().hasNext());
206
207         // with messages, 200
208         List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
209         MRConsumerResponse resp = new MRConsumerResponse();
210         resp.setResponseCode("200");
211         resp.setActualMessages(lst);
212         when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
213
214         assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
215
216         // null messages
217         resp.setActualMessages(null);
218         when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
219
220         assertFalse(dmaap.fetch().iterator().hasNext());
221
222         // with messages, NOT 200
223         resp.setResponseCode("400");
224         resp.setActualMessages(lst);
225         when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
226
227         assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
228     }
229
230     @Test
231     public void testDmaapConsumerWrapperClose() throws Exception {
232         assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().build()).close()).doesNotThrowAnyException();
233     }
234
235     @Test
236     public void testDmaapConsumerWrapperToString() throws Exception {
237         assertNotNull(new DmaapConsumerWrapper(makeBuilder().build()) {}.toString());
238     }
239
240     @Test
241     public void testDmaapAafConsumerWrapper() throws Exception {
242         // verify that different wrappers can be built
243         new DmaapAafConsumerWrapper(makeBuilder().useHttps(true).build());
244         assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().useHttps(false).build()))
245                         .doesNotThrowAnyException();
246     }
247
248     @Test(expected = IllegalArgumentException.class)
249     public void testDmaapAafConsumerWrapper_InvalidServers() throws Exception {
250         /*
251          * Unfortunately, the MR code intercepts this and throws an exception before the
252          * wrapper gets a chance to check it, thus this test does not improve the coverage
253          * for the constructor.
254          */
255         new DmaapAafConsumerWrapper(makeBuilder().servers(Collections.emptyList()).build());
256     }
257
258     @Test
259     public void testDmaapAafConsumerWrapperToString() throws Exception {
260         assertNotNull(new DmaapAafConsumerWrapper(makeBuilder().build()).toString());
261     }
262
263     @Test
264     public void testDmaapDmeConsumerWrapper() throws Exception {
265         // verify that different wrappers can be built
266         new DmaapDmeConsumerWrapper(makeBuilder().build());
267         new DmaapDmeConsumerWrapper(makeBuilder().useHttps(true).build());
268         new DmaapDmeConsumerWrapper(makeBuilder().useHttps(false).build());
269         new DmaapDmeConsumerWrapper(makeBuilder().additionalProps(null).build());
270
271         addProps.put(ROUTE_PROP, MY_ROUTE);
272         new DmaapDmeConsumerWrapper(makeBuilder().build());
273         assertThatCode(() -> new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build()))
274                         .doesNotThrowAnyException();
275     }
276
277     @Test(expected = IllegalArgumentException.class)
278     public void testDmaapDmeConsumerWrapper_InvalidEnvironment() throws Exception {
279         new DmaapDmeConsumerWrapper(makeBuilder().environment(null).build());
280     }
281
282     @Test(expected = IllegalArgumentException.class)
283     public void testDmaapDmeConsumerWrapper_InvalidAft() throws Exception {
284         new DmaapDmeConsumerWrapper(makeBuilder().aftEnvironment(null).build());
285     }
286
287     @Test(expected = IllegalArgumentException.class)
288     public void testDmaapDmeConsumerWrapper_InvalidLat() throws Exception {
289         new DmaapDmeConsumerWrapper(makeBuilder().latitude(null).build());
290     }
291
292     @Test(expected = IllegalArgumentException.class)
293     public void testDmaapDmeConsumerWrapper_InvalidLong() throws Exception {
294         new DmaapDmeConsumerWrapper(makeBuilder().longitude(null).build());
295     }
296
297     @Test(expected = IllegalArgumentException.class)
298     public void testDmaapDmeConsumerWrapper_InvalidPartner() throws Exception {
299         new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build());
300     }
301
302     @Test
303     public void testKafkaConsumerWrapper() throws Exception {
304         // verify that different wrappers can be built
305         assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build())).doesNotThrowAnyException();
306     }
307
308     @Test(expected = IllegalArgumentException.class)
309     public void testKafkaConsumerWrapper_InvalidTopic() throws Exception {
310         new KafkaConsumerWrapper(makeBuilder().topic(null).build());
311     }
312
313     @Test(expected = java.lang.IllegalStateException.class)
314     public void testKafkaConsumerWrapperFetch() throws Exception {
315
316         //Setup Properties for consumer
317         Properties kafkaProps = new Properties();
318         kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
319         kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
320         kafkaProps.setProperty("enable.auto.commit", "true");
321         kafkaProps.setProperty("auto.commit.interval.ms", "1000");
322         kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
323             "org.apache.kafka.common.serialization.StringDeserializer");
324         kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
325             "org.apache.kafka.common.serialization.StringDeserializer");
326         kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
327         kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
328
329         KafkaConsumerWrapper kafka = new KafkaConsumerWrapper(makeKafkaBuilder().build());
330         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
331         kafka.consumer = consumer;
332
333         assertFalse(kafka.fetch().iterator().hasNext());
334         consumer.close();
335     }
336
337     @Test
338     public void testKafkaConsumerWrapperClose() throws Exception {
339         assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build()).close()).doesNotThrowAnyException();
340     }
341
342     @Test
343     public void testKafkaConsumerWrapperToString() throws Exception {
344         assertNotNull(new KafkaConsumerWrapper(makeKafkaBuilder().build()) {}.toString());
345     }
346
347     private static class FetchingBusConsumerImpl extends FetchingBusConsumer {
348
349         protected FetchingBusConsumerImpl(BusTopicParams busTopicParams) {
350             super(busTopicParams);
351         }
352
353         @Override
354         public Iterable<String> fetch() throws IOException {
355             return null;
356         }
357     }
358 }