2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2023 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.assertj.core.api.Assertions.assertThatCode;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertFalse;
28 import static org.junit.Assert.assertNotNull;
29 import static org.junit.Assert.fail;
30 import static org.mockito.Mockito.mock;
31 import static org.mockito.Mockito.when;
33 import com.att.nsa.cambria.client.CambriaConsumer;
34 import java.io.IOException;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.List;
38 import java.util.Properties;
39 import java.util.concurrent.CountDownLatch;
40 import org.apache.commons.collections4.IteratorUtils;
41 import org.apache.kafka.clients.consumer.ConsumerConfig;
42 import org.apache.kafka.clients.consumer.KafkaConsumer;
43 import org.junit.Before;
44 import org.junit.Test;
45 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
46 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
47 import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
48 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.CambriaConsumerWrapper;
49 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapAafConsumerWrapper;
50 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapConsumerWrapper;
51 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapDmeConsumerWrapper;
52 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer;
53 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.KafkaConsumerWrapper;
54 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
55 import org.springframework.test.util.ReflectionTestUtils;
57 public class BusConsumerTest extends TopicTestBase {
59 private static final int SHORT_TIMEOUT_MILLIS = 10;
60 private static final int LONG_TIMEOUT_MILLIS = 3000;
69 public void testFetchingBusConsumer() throws InterruptedException {
70 // should not be negative
71 var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(-1).build());
72 assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
75 cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(0).build());
76 assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
78 // should not be too large
79 cons = new FetchingBusConsumerImpl(
80 makeBuilder().fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH + 100).build());
81 assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
83 // should not be what was specified
84 cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(100).build());
85 assertThat(cons.getSleepTime()).isEqualTo(100);
89 public void testFetchingBusConsumerSleepAfterFetchFailure() throws InterruptedException {
91 var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) {
93 private CountDownLatch started = new CountDownLatch(1);
96 protected void sleepAfterFetchFailure() {
98 super.sleepAfterFetchFailure();
103 long tstart = System.currentTimeMillis();
104 cons.sleepAfterFetchFailure();
105 assertThat(System.currentTimeMillis() - tstart).isGreaterThanOrEqualTo(SHORT_TIMEOUT_MILLIS);
107 // close while sleeping - sleep should halt prematurely
108 cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
109 cons.started = new CountDownLatch(1);
110 Thread thread = new Thread(cons::sleepAfterFetchFailure);
111 tstart = System.currentTimeMillis();
113 cons.started.await();
116 assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
118 // interrupt while sleeping - sleep should halt prematurely
119 cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
120 cons.started = new CountDownLatch(1);
121 thread = new Thread(cons::sleepAfterFetchFailure);
122 tstart = System.currentTimeMillis();
124 cons.started.await();
127 assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
131 public void testCambriaConsumerWrapper() {
132 // verify that different wrappers can be built
133 new CambriaConsumerWrapper(makeBuilder().build());
134 new CambriaConsumerWrapper(makeBuilder().useHttps(false).build());
135 new CambriaConsumerWrapper(makeBuilder().useHttps(true).build());
136 new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(false).build());
137 new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(true).build());
138 new CambriaConsumerWrapper(makeBuilder().apiKey(null).build());
139 new CambriaConsumerWrapper(makeBuilder().apiSecret(null).build());
140 new CambriaConsumerWrapper(makeBuilder().apiKey(null).apiSecret(null).build());
141 new CambriaConsumerWrapper(makeBuilder().userName(null).build());
142 new CambriaConsumerWrapper(makeBuilder().password(null).build());
144 assertThatCode(() -> new CambriaConsumerWrapper(makeBuilder().userName(null).password(null).build()))
145 .doesNotThrowAnyException();
149 public void testCambriaConsumerWrapperFetch() throws Exception {
150 CambriaConsumer inner = mock(CambriaConsumer.class);
151 List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
152 when(inner.fetch()).thenReturn(lst);
154 CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
155 ReflectionTestUtils.setField(cons, "consumer", inner);
157 assertEquals(lst, IteratorUtils.toList(cons.fetch().iterator()));
159 // arrange to throw exception next time fetch is called
160 IOException ex = new IOException(EXPECTED);
161 when(inner.fetch()).thenThrow(ex);
163 cons.fetchTimeout = 10;
167 fail("missing exception");
169 } catch (IOException e) {
175 public void testCambriaConsumerWrapperClose() {
176 CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
177 assertThatCode(() -> cons.close()).doesNotThrowAnyException();
181 public void testCambriaConsumerWrapperToString() {
182 assertNotNull(new CambriaConsumerWrapper(makeBuilder().build()).toString());
186 public void testDmaapConsumerWrapper() throws Exception {
187 // verify that different wrappers can be built
188 assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().build())).doesNotThrowAnyException();
191 @Test(expected = IllegalArgumentException.class)
192 public void testDmaapConsumerWrapper_InvalidTopic() throws Exception {
193 new DmaapAafConsumerWrapper(makeBuilder().topic(null).build());
197 public void testDmaapConsumerWrapperFetch() throws Exception {
198 DmaapAafConsumerWrapper dmaap = new DmaapAafConsumerWrapper(makeBuilder().build());
199 MRConsumerImpl cons = mock(MRConsumerImpl.class);
201 dmaap.fetchTimeout = 5;
202 dmaap.consumer = cons;
205 when(cons.fetchWithReturnConsumerResponse()).thenReturn(null);
206 assertFalse(dmaap.fetch().iterator().hasNext());
208 // with messages, 200
209 List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
210 MRConsumerResponse resp = new MRConsumerResponse();
211 resp.setResponseCode("200");
212 resp.setActualMessages(lst);
213 when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
215 assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
218 resp.setActualMessages(null);
219 when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
221 assertFalse(dmaap.fetch().iterator().hasNext());
223 // with messages, NOT 200
224 resp.setResponseCode("400");
225 resp.setActualMessages(lst);
226 when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
228 assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
232 public void testDmaapConsumerWrapperClose() throws Exception {
233 assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().build()).close()).doesNotThrowAnyException();
237 public void testDmaapConsumerWrapperToString() throws Exception {
238 assertNotNull(new DmaapConsumerWrapper(makeBuilder().build()) {}.toString());
242 public void testDmaapAafConsumerWrapper() throws Exception {
243 // verify that different wrappers can be built
244 new DmaapAafConsumerWrapper(makeBuilder().useHttps(true).build());
245 assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().useHttps(false).build()))
246 .doesNotThrowAnyException();
249 @Test(expected = IllegalArgumentException.class)
250 public void testDmaapAafConsumerWrapper_InvalidServers() throws Exception {
252 * Unfortunately, the MR code intercepts this and throws an exception before the
253 * wrapper gets a chance to check it, thus this test does not improve the coverage
254 * for the constructor.
256 new DmaapAafConsumerWrapper(makeBuilder().servers(Collections.emptyList()).build());
260 public void testDmaapAafConsumerWrapperToString() throws Exception {
261 assertNotNull(new DmaapAafConsumerWrapper(makeBuilder().build()).toString());
265 public void testDmaapDmeConsumerWrapper() throws Exception {
266 // verify that different wrappers can be built
267 new DmaapDmeConsumerWrapper(makeBuilder().build());
268 new DmaapDmeConsumerWrapper(makeBuilder().useHttps(true).build());
269 new DmaapDmeConsumerWrapper(makeBuilder().useHttps(false).build());
270 new DmaapDmeConsumerWrapper(makeBuilder().additionalProps(null).build());
272 addProps.put(ROUTE_PROP, MY_ROUTE);
273 new DmaapDmeConsumerWrapper(makeBuilder().build());
274 assertThatCode(() -> new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build()))
275 .doesNotThrowAnyException();
278 @Test(expected = IllegalArgumentException.class)
279 public void testDmaapDmeConsumerWrapper_InvalidEnvironment() throws Exception {
280 new DmaapDmeConsumerWrapper(makeBuilder().environment(null).build());
283 @Test(expected = IllegalArgumentException.class)
284 public void testDmaapDmeConsumerWrapper_InvalidAft() throws Exception {
285 new DmaapDmeConsumerWrapper(makeBuilder().aftEnvironment(null).build());
288 @Test(expected = IllegalArgumentException.class)
289 public void testDmaapDmeConsumerWrapper_InvalidLat() throws Exception {
290 new DmaapDmeConsumerWrapper(makeBuilder().latitude(null).build());
293 @Test(expected = IllegalArgumentException.class)
294 public void testDmaapDmeConsumerWrapper_InvalidLong() throws Exception {
295 new DmaapDmeConsumerWrapper(makeBuilder().longitude(null).build());
298 @Test(expected = IllegalArgumentException.class)
299 public void testDmaapDmeConsumerWrapper_InvalidPartner() throws Exception {
300 new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build());
304 public void testKafkaConsumerWrapper() throws Exception {
305 // verify that different wrappers can be built
306 assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build())).doesNotThrowAnyException();
309 @Test(expected = IllegalArgumentException.class)
310 public void testKafkaConsumerWrapper_InvalidTopic() throws Exception {
311 new KafkaConsumerWrapper(makeBuilder().topic(null).build());
314 @Test(expected = java.lang.IllegalStateException.class)
315 public void testKafkaConsumerWrapperFetch() throws Exception {
317 //Setup Properties for consumer
318 Properties kafkaProps = new Properties();
319 kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
320 kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
321 kafkaProps.setProperty("enable.auto.commit", "true");
322 kafkaProps.setProperty("auto.commit.interval.ms", "1000");
323 kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
324 "org.apache.kafka.common.serialization.StringDeserializer");
325 kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
326 "org.apache.kafka.common.serialization.StringDeserializer");
327 kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
328 kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
330 KafkaConsumerWrapper kafka = new KafkaConsumerWrapper(makeKafkaBuilder().build());
331 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
332 kafka.consumer = consumer;
334 assertFalse(kafka.fetch().iterator().hasNext());
339 public void testKafkaConsumerWrapperClose() throws Exception {
340 assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build()).close()).doesNotThrowAnyException();
344 public void testKafkaConsumerWrapperToString() throws Exception {
345 assertNotNull(new KafkaConsumerWrapper(makeKafkaBuilder().build()) {}.toString());
348 private static class FetchingBusConsumerImpl extends FetchingBusConsumer {
350 protected FetchingBusConsumerImpl(BusTopicParams busTopicParams) {
351 super(busTopicParams);
355 public Iterable<String> fetch() throws IOException {