2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.fail;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.when;
32 import com.att.nsa.cambria.client.CambriaConsumer;
33 import java.io.IOException;
34 import java.util.Arrays;
35 import java.util.Collections;
36 import java.util.List;
37 import java.util.Properties;
38 import java.util.concurrent.CountDownLatch;
39 import org.apache.commons.collections4.IteratorUtils;
40 import org.apache.kafka.clients.consumer.ConsumerConfig;
41 import org.apache.kafka.clients.consumer.KafkaConsumer;
42 import org.junit.Before;
43 import org.junit.Test;
44 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
45 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
46 import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
47 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.CambriaConsumerWrapper;
48 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapAafConsumerWrapper;
49 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapConsumerWrapper;
50 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapDmeConsumerWrapper;
51 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer;
52 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.KafkaConsumerWrapper;
53 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
54 import org.powermock.reflect.Whitebox;
56 public class BusConsumerTest extends TopicTestBase {
// Fetch timeouts (milliseconds) used by testFetchingBusConsumerSleepAfterFetchFailure: the short
// one for a sleep that runs to completion, the long one as the upper bound for sleeps that are
// expected to end early.
58 private static final int SHORT_TIMEOUT_MILLIS = 10;
59 private static final int LONG_TIMEOUT_MILLIS = 3000;
// Verifies how the fetch timeout given to the builder maps to getSleepTime(): negative, zero and
// above-default values yield DEFAULT_TIMEOUT_MS_FETCH, while an in-range value is used unchanged.
68 public void testFetchingBusConsumer() throws InterruptedException {
69 // should not be negative
70 var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(-1).build());
71 assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
// should not be zero
74 cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(0).build());
75 assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
77 // should not be too large
78 cons = new FetchingBusConsumerImpl(
79 makeBuilder().fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH + 100).build());
80 assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
82 // should be what was specified
83 cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(100).build());
84 assertThat(cons.getSleepTime()).isEqualTo(100);
// Exercises sleepAfterFetchFailure() on an anonymous FetchingBusConsumerImpl subclass: an
// undisturbed sleep must last at least the fetch timeout, and the two later scenarios expect the
// sleep to return before LONG_TIMEOUT_MILLIS has elapsed.
88 public void testFetchingBusConsumerSleepAfterFetchFailure() throws InterruptedException {
90 var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) {
// latch the test thread awaits before continuing; replaced with a fresh one for each scenario
// NOTE(review): presumably counted down by the override below once the sleep begins - confirm
92 private CountDownLatch started = new CountDownLatch(1);
95 protected void sleepAfterFetchFailure() {
97 super.sleepAfterFetchFailure();
// undisturbed sleep - should last at least the configured fetch timeout
102 long tstart = System.currentTimeMillis();
103 cons.sleepAfterFetchFailure();
104 assertThat(System.currentTimeMillis() - tstart).isGreaterThanOrEqualTo(SHORT_TIMEOUT_MILLIS);
106 // close while sleeping - sleep should halt prematurely
107 cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
108 cons.started = new CountDownLatch(1);
109 Thread thread = new Thread(cons::sleepAfterFetchFailure);
110 tstart = System.currentTimeMillis();
// wait on the latch before going on with the scenario
112 cons.started.await();
115 assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
117 // interrupt while sleeping - sleep should halt prematurely
118 cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
119 cons.started = new CountDownLatch(1);
120 thread = new Thread(cons::sleepAfterFetchFailure);
121 tstart = System.currentTimeMillis();
// wait on the latch before going on with the scenario
123 cons.started.await();
126 assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
// Builds CambriaConsumerWrapper instances from a range of builder settings (HTTP/HTTPS,
// self-signed certificates, null API key/secret, null user name/password). Only the final
// construction is checked through assertThatCode; an exception from any earlier constructor
// call would propagate out of the test.
130 public void testCambriaConsumerWrapper() {
131 // verify that different wrappers can be built
132 new CambriaConsumerWrapper(makeBuilder().build());
133 new CambriaConsumerWrapper(makeBuilder().useHttps(false).build());
134 new CambriaConsumerWrapper(makeBuilder().useHttps(true).build());
135 new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(false).build());
136 new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(true).build());
137 new CambriaConsumerWrapper(makeBuilder().apiKey(null).build());
138 new CambriaConsumerWrapper(makeBuilder().apiSecret(null).build());
139 new CambriaConsumerWrapper(makeBuilder().apiKey(null).apiSecret(null).build());
140 new CambriaConsumerWrapper(makeBuilder().userName(null).build());
141 new CambriaConsumerWrapper(makeBuilder().password(null).build());
143 assertThatCode(() -> new CambriaConsumerWrapper(makeBuilder().userName(null).password(null).build()))
144 .doesNotThrowAnyException();
// Verifies fetch() on a CambriaConsumerWrapper whose internal "consumer" field is replaced by a
// mock: messages returned by the mock are passed through, and the failure path expects an
// IOException.
148 public void testCambriaConsumerWrapperFetch() throws Exception {
149 CambriaConsumer inner = mock(CambriaConsumer.class);
150 List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
151 when(inner.fetch()).thenReturn(lst);
153 CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
// swap the mock in for the consumer held by the wrapper
154 Whitebox.setInternalState(cons, "consumer", inner);
156 assertEquals(lst, IteratorUtils.toList(cons.fetch().iterator()));
158 // arrange to throw exception next time fetch is called
159 IOException ex = new IOException(EXPECTED);
160 when(inner.fetch()).thenThrow(ex);
// NOTE(review): presumably shortens any delay the wrapper applies after a failed fetch - confirm
162 cons.fetchTimeout = 10;
166 fail("missing exception");
168 } catch (IOException e) {
// Verifies that close() on a freshly built CambriaConsumerWrapper does not throw.
174 public void testCambriaConsumerWrapperClose() {
175 CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
176 assertThatCode(() -> cons.close()).doesNotThrowAnyException();
// Verifies that toString() of a CambriaConsumerWrapper returns a non-null value.
180 public void testCambriaConsumerWrapperToString() {
181 assertNotNull(new CambriaConsumerWrapper(makeBuilder().build()).toString());
// Verifies that a wrapper can be built from the unmodified builder without throwing.
// NOTE(review): exercised through DmaapAafConsumerWrapper, presumably a concrete subclass of
// DmaapConsumerWrapper - confirm.
185 public void testDmaapConsumerWrapper() throws Exception {
186 // verify that different wrappers can be built
187 assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().build())).doesNotThrowAnyException();
// Expects IllegalArgumentException when the wrapper is built with a null topic.
190 @Test(expected = IllegalArgumentException.class)
191 public void testDmaapConsumerWrapper_InvalidTopic() throws Exception {
192 new DmaapAafConsumerWrapper(makeBuilder().topic(null).build());
// Verifies fetch() on a DmaapAafConsumerWrapper backed by a mocked MRConsumerImpl, covering a
// null response, a response with messages, a response without messages, and a non-200 response
// that still carries messages.
196 public void testDmaapConsumerWrapperFetch() throws Exception {
197 DmaapAafConsumerWrapper dmaap = new DmaapAafConsumerWrapper(makeBuilder().build());
198 MRConsumerImpl cons = mock(MRConsumerImpl.class);
200 dmaap.fetchTimeout = 5;
// route the wrapper's calls to the mock
201 dmaap.consumer = cons;
// null response - nothing to iterate over
204 when(cons.fetchWithReturnConsumerResponse()).thenReturn(null);
205 assertFalse(dmaap.fetch().iterator().hasNext());
207 // with messages, 200
208 List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
209 MRConsumerResponse resp = new MRConsumerResponse();
210 resp.setResponseCode("200");
211 resp.setActualMessages(lst);
212 when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
214 assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
// null messages, still 200 - nothing to iterate over
217 resp.setActualMessages(null);
218 when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
220 assertFalse(dmaap.fetch().iterator().hasNext());
222 // with messages, NOT 200
223 resp.setResponseCode("400");
224 resp.setActualMessages(lst);
225 when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
// the messages are still expected back despite the 400 response code
227 assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
// Verifies that close() on a freshly built DmaapAafConsumerWrapper does not throw.
231 public void testDmaapConsumerWrapperClose() throws Exception {
232 assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().build()).close()).doesNotThrowAnyException();
// Verifies that toString() of a DmaapConsumerWrapper returns a non-null value.
// NOTE(review): instantiated through an empty anonymous subclass, presumably because
// DmaapConsumerWrapper cannot be instantiated directly - confirm.
236 public void testDmaapConsumerWrapperToString() throws Exception {
237 assertNotNull(new DmaapConsumerWrapper(makeBuilder().build()) {}.toString());
// Builds DmaapAafConsumerWrapper instances with HTTPS enabled and disabled; only the HTTP case
// is checked through assertThatCode.
241 public void testDmaapAafConsumerWrapper() throws Exception {
242 // verify that different wrappers can be built
243 new DmaapAafConsumerWrapper(makeBuilder().useHttps(true).build());
244 assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().useHttps(false).build()))
245 .doesNotThrowAnyException();
// Expects IllegalArgumentException when the wrapper is built with an empty server list.
248 @Test(expected = IllegalArgumentException.class)
249 public void testDmaapAafConsumerWrapper_InvalidServers() throws Exception {
251 * Unfortunately, the MR code intercepts this and throws an exception before the
252 * wrapper gets a chance to check it, thus this test does not improve the coverage
253 * for the constructor.
255 new DmaapAafConsumerWrapper(makeBuilder().servers(Collections.emptyList()).build());
// Verifies that toString() of a DmaapAafConsumerWrapper returns a non-null value.
259 public void testDmaapAafConsumerWrapperToString() throws Exception {
260 assertNotNull(new DmaapAafConsumerWrapper(makeBuilder().build()).toString());
// Builds DmaapDmeConsumerWrapper instances from several builder settings (HTTPS on/off, null
// additional properties, a route property, a null partner). Only the final construction is
// checked through assertThatCode.
264 public void testDmaapDmeConsumerWrapper() throws Exception {
265 // verify that different wrappers can be built
266 new DmaapDmeConsumerWrapper(makeBuilder().build());
267 new DmaapDmeConsumerWrapper(makeBuilder().useHttps(true).build());
268 new DmaapDmeConsumerWrapper(makeBuilder().useHttps(false).build());
269 new DmaapDmeConsumerWrapper(makeBuilder().additionalProps(null).build());
// NOTE(review): addProps is presumably the property map that makeBuilder() passes along - confirm
271 addProps.put(ROUTE_PROP, MY_ROUTE);
272 new DmaapDmeConsumerWrapper(makeBuilder().build());
// NOTE(review): partner(null) is accepted here but rejected in
// testDmaapDmeConsumerWrapper_InvalidPartner; presumably the route property added above makes
// the partner optional - confirm
273 assertThatCode(() -> new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build()))
274 .doesNotThrowAnyException();
// Expects IllegalArgumentException when the wrapper is built with a null environment.
277 @Test(expected = IllegalArgumentException.class)
278 public void testDmaapDmeConsumerWrapper_InvalidEnvironment() throws Exception {
279 new DmaapDmeConsumerWrapper(makeBuilder().environment(null).build());
// Expects IllegalArgumentException when the wrapper is built with a null AFT environment.
282 @Test(expected = IllegalArgumentException.class)
283 public void testDmaapDmeConsumerWrapper_InvalidAft() throws Exception {
284 new DmaapDmeConsumerWrapper(makeBuilder().aftEnvironment(null).build());
// Expects IllegalArgumentException when the wrapper is built with a null latitude.
287 @Test(expected = IllegalArgumentException.class)
288 public void testDmaapDmeConsumerWrapper_InvalidLat() throws Exception {
289 new DmaapDmeConsumerWrapper(makeBuilder().latitude(null).build());
// Expects IllegalArgumentException when the wrapper is built with a null longitude.
292 @Test(expected = IllegalArgumentException.class)
293 public void testDmaapDmeConsumerWrapper_InvalidLong() throws Exception {
294 new DmaapDmeConsumerWrapper(makeBuilder().longitude(null).build());
// Expects IllegalArgumentException when the wrapper is built with a null partner.
// NOTE(review): testDmaapDmeConsumerWrapper accepts a null partner after adding ROUTE_PROP to
// addProps; presumably the partner is only mandatory when no route is configured - confirm.
297 @Test(expected = IllegalArgumentException.class)
298 public void testDmaapDmeConsumerWrapper_InvalidPartner() throws Exception {
299 new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build());
// Verifies that a KafkaConsumerWrapper can be built from makeKafkaBuilder() without throwing.
303 public void testKafkaConsumerWrapper() throws Exception {
304 // verify that different wrappers can be built
305 assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build())).doesNotThrowAnyException();
// Expects IllegalArgumentException when a KafkaConsumerWrapper is built with a null topic.
// Note that this test starts from makeBuilder(), not makeKafkaBuilder() as the other Kafka tests do.
308 @Test(expected = IllegalArgumentException.class)
309 public void testKafkaConsumerWrapper_InvalidTopic() throws Exception {
310 new KafkaConsumerWrapper(makeBuilder().topic(null).build());
// Replaces the wrapper's consumer with a real KafkaConsumer built from locally defined
// properties and calls fetch(); the test passes only if an IllegalStateException is thrown.
// NOTE(review): presumably thrown because the substituted consumer has no topic subscription,
// in which case the assertFalse below is never evaluated - confirm.
313 @Test(expected = java.lang.IllegalStateException.class)
314 public void testKafkaConsumerWrapperFetch() throws Exception {
316 //Setup Properties for consumer
317 Properties kafkaProps = new Properties();
318 kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
319 kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
320 kafkaProps.setProperty("enable.auto.commit", "true");
321 kafkaProps.setProperty("auto.commit.interval.ms", "1000");
322 kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
323 "org.apache.kafka.common.serialization.StringDeserializer");
324 kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
325 "org.apache.kafka.common.serialization.StringDeserializer");
326 kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// NOTE(review): presumably the same key as "enable.auto.commit" set above, so this later value
// would replace the earlier "true" - confirm
327 kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
329 KafkaConsumerWrapper kafka = new KafkaConsumerWrapper(makeKafkaBuilder().build());
// NOTE(review): this KafkaConsumer is never closed in the visible code - confirm whether
// cleanup is needed
330 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
331 kafka.consumer = consumer;
333 assertFalse(kafka.fetch().iterator().hasNext());
// Verifies that close() on a freshly built KafkaConsumerWrapper does not throw.
338 public void testKafkaConsumerWrapperClose() throws Exception {
339 assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build()).close()).doesNotThrowAnyException();
// Verifies that toString() of a KafkaConsumerWrapper (instantiated through an empty anonymous
// subclass) returns a non-null value.
343 public void testKafkaConsumerWrapperToString() throws Exception {
344 assertNotNull(new KafkaConsumerWrapper(makeKafkaBuilder().build()) {}.toString());
347 private static class FetchingBusConsumerImpl extends FetchingBusConsumer {
// Passes the topic parameters straight to the FetchingBusConsumer constructor.
349 protected FetchingBusConsumerImpl(BusTopicParams busTopicParams) {
350 super(busTopicParams);
354 public Iterable<String> fetch() throws IOException {