2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.fail;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.when;
32 import com.att.nsa.cambria.client.CambriaConsumer;
33 import java.io.IOException;
34 import java.util.Arrays;
35 import java.util.Collections;
36 import java.util.List;
37 import java.util.concurrent.CountDownLatch;
38 import org.apache.commons.collections4.IteratorUtils;
39 import org.junit.Before;
40 import org.junit.Test;
41 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
42 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
43 import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
44 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.CambriaConsumerWrapper;
45 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapAafConsumerWrapper;
46 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapConsumerWrapper;
47 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapDmeConsumerWrapper;
48 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer;
49 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.KafkaConsumerWrapper;
50 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
51 import org.powermock.reflect.Whitebox;
53 public class BusConsumerTest extends TopicTestBase {
// Timeout, in milliseconds, small enough for a test to wait out a complete sleep.
55 private static final int SHORT_TIMEOUT_MILLIS = 10;
// Timeout, in milliseconds, used when a sleep is expected to end before it fully elapses.
56 private static final int LONG_TIMEOUT_MILLIS = 3000;
/**
 * Checks the sleep time that a fetching consumer derives from the configured fetch
 * timeout: -1, 0 and a value above DEFAULT_TIMEOUT_MS_FETCH are all expected to yield
 * DEFAULT_TIMEOUT_MS_FETCH, while 100 is expected to be kept unchanged.
 */
65 public void testFetchingBusConsumer() throws InterruptedException {
66 // should not be negative
67 var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(-1).build());
68 assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
// should not be zero
71 cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(0).build());
72 assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
74 // should not be too large
75 cons = new FetchingBusConsumerImpl(
76 makeBuilder().fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH + 100).build());
77 assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
79 // should be what was specified
80 cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(100).build());
81 assertThat(cons.getSleepTime()).isEqualTo(100);
/**
 * Exercises sleepAfterFetchFailure() on a consumer subclass: first a complete sleep
 * using the short timeout, then two runs on a separate thread with the long timeout
 * in which the sleep is expected to finish early (once on close, once on interrupt).
 */
85 public void testFetchingBusConsumerSleepAfterFetchFailure() throws InterruptedException {
// anonymous subclass that adds a latch and overrides sleepAfterFetchFailure()
87 var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) {
// presumably counted down when the sleep begins, so the test can wait for it - TODO confirm
89 private CountDownLatch started = new CountDownLatch(1);
92 protected void sleepAfterFetchFailure() {
// delegate to the real implementation
94 super.sleepAfterFetchFailure();
// full sleep - elapsed time must be at least the short timeout
99 long tstart = System.currentTimeMillis();
100 cons.sleepAfterFetchFailure();
101 assertThat(System.currentTimeMillis() - tstart).isGreaterThanOrEqualTo(SHORT_TIMEOUT_MILLIS);
103 // close while sleeping - sleep should halt prematurely
104 cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
// fresh latch for this run
105 cons.started = new CountDownLatch(1);
106 Thread thread = new Thread(cons::sleepAfterFetchFailure);
107 tstart = System.currentTimeMillis();
// block until the latch is released
109 cons.started.await();
// the whole run must take less than the long timeout
112 assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
114 // interrupt while sleeping - sleep should halt prematurely
115 cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
// fresh latch for this run
116 cons.started = new CountDownLatch(1);
117 thread = new Thread(cons::sleepAfterFetchFailure);
118 tstart = System.currentTimeMillis();
// block until the latch is released
120 cons.started.await();
// the whole run must take less than the long timeout
123 assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
/**
 * Constructs CambriaConsumerWrapper instances from several parameter combinations:
 * HTTPS on and off, self-signed certificates allowed or not, and null API key, API
 * secret, user name and password. Only the last construction is wrapped in an explicit
 * no-exception assertion; the others are invoked directly, so any exception they raise
 * propagates out of the method.
 */
127 public void testCambriaConsumerWrapper() {
128 // verify that different wrappers can be built
129 new CambriaConsumerWrapper(makeBuilder().build());
130 new CambriaConsumerWrapper(makeBuilder().useHttps(false).build());
131 new CambriaConsumerWrapper(makeBuilder().useHttps(true).build());
132 new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(false).build());
133 new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(true).build());
134 new CambriaConsumerWrapper(makeBuilder().apiKey(null).build());
135 new CambriaConsumerWrapper(makeBuilder().apiSecret(null).build());
136 new CambriaConsumerWrapper(makeBuilder().apiKey(null).apiSecret(null).build());
137 new CambriaConsumerWrapper(makeBuilder().userName(null).build());
138 new CambriaConsumerWrapper(makeBuilder().password(null).build());
// both user name and password null
140 assertThatCode(() -> new CambriaConsumerWrapper(makeBuilder().userName(null).password(null).build()))
141 .doesNotThrowAnyException();
/**
 * Verifies CambriaConsumerWrapper.fetch() against a mocked CambriaConsumer: the
 * messages returned by the mock are expected to be passed through, and an IOException
 * is expected once the mock has been set up to throw one.
 */
145 public void testCambriaConsumerWrapperFetch() throws Exception {
// mock the underlying consumer so that it returns two messages
146 CambriaConsumer inner = mock(CambriaConsumer.class);
147 List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
148 when(inner.fetch()).thenReturn(lst);
// replace the wrapper's "consumer" field with the mock
150 CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
151 Whitebox.setInternalState(cons, "consumer", inner);
// the wrapper must hand back exactly what the mock returned
153 assertEquals(lst, IteratorUtils.toList(cons.fetch().iterator()));
155 // arrange to throw exception next time fetch is called
156 IOException ex = new IOException(EXPECTED);
157 when(inner.fetch()).thenThrow(ex);
// presumably keeps any sleep after the failed fetch short - TODO confirm
159 cons.fetchTimeout = 10;
// reaching this statement means no exception was raised
163 fail("missing exception");
165 } catch (IOException e) {
/**
 * Verifies that closing a newly built CambriaConsumerWrapper does not throw.
 */
171 public void testCambriaConsumerWrapperClose() {
172 CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
173 assertThatCode(() -> cons.close()).doesNotThrowAnyException();
/**
 * Verifies that toString() of a CambriaConsumerWrapper returns a non-null value.
 */
177 public void testCambriaConsumerWrapperToString() {
178 assertNotNull(new CambriaConsumerWrapper(makeBuilder().build()).toString());
/**
 * Verifies that a DMaaP consumer wrapper can be constructed from the default builder
 * parameters without throwing. The concrete type instantiated here is
 * DmaapAafConsumerWrapper.
 */
182 public void testDmaapConsumerWrapper() throws Exception {
183 // verify that different wrappers can be built
184 assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().build())).doesNotThrowAnyException();
/**
 * Expects an IllegalArgumentException when the wrapper is built with a null topic.
 */
187 @Test(expected = IllegalArgumentException.class)
188 public void testDmaapConsumerWrapper_InvalidTopic() throws Exception {
189 new DmaapAafConsumerWrapper(makeBuilder().topic(null).build());
/**
 * Verifies DmaapAafConsumerWrapper.fetch() against a mocked MRConsumerImpl for four
 * cases: a null response, a "200" response carrying messages, a "200" response whose
 * message list is null, and a "400" response carrying messages.
 */
193 public void testDmaapConsumerWrapperFetch() throws Exception {
194 DmaapAafConsumerWrapper dmaap = new DmaapAafConsumerWrapper(makeBuilder().build());
195 MRConsumerImpl cons = mock(MRConsumerImpl.class);
// swap in the mock as the wrapper's underlying consumer
197 dmaap.fetchTimeout = 5;
198 dmaap.consumer = cons;
// null response - nothing to iterate over
201 when(cons.fetchWithReturnConsumerResponse()).thenReturn(null);
202 assertFalse(dmaap.fetch().iterator().hasNext());
204 // with messages, 200
205 List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
206 MRConsumerResponse resp = new MRConsumerResponse();
207 resp.setResponseCode("200");
208 resp.setActualMessages(lst);
209 when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
211 assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
// null message list, response code still 200 - nothing to iterate over
214 resp.setActualMessages(null);
215 when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
217 assertFalse(dmaap.fetch().iterator().hasNext());
219 // with messages, NOT 200
220 resp.setResponseCode("400");
221 resp.setActualMessages(lst);
222 when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
// the messages are still expected to be returned despite the non-200 code
224 assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
/**
 * Verifies that building a DmaapAafConsumerWrapper and closing it does not throw.
 */
228 public void testDmaapConsumerWrapperClose() throws Exception {
229 assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().build()).close()).doesNotThrowAnyException();
/**
 * Verifies that toString() of a DmaapConsumerWrapper returns a non-null value. The
 * wrapper is instantiated through an empty anonymous subclass.
 */
233 public void testDmaapConsumerWrapperToString() throws Exception {
234 assertNotNull(new DmaapConsumerWrapper(makeBuilder().build()) {}.toString());
/**
 * Constructs DmaapAafConsumerWrapper instances with HTTPS enabled and disabled. Only
 * the HTTPS-disabled construction is wrapped in an explicit no-exception assertion.
 */
238 public void testDmaapAafConsumerWrapper() throws Exception {
239 // verify that different wrappers can be built
240 new DmaapAafConsumerWrapper(makeBuilder().useHttps(true).build());
241 assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().useHttps(false).build()))
242 .doesNotThrowAnyException();
/**
 * Expects an IllegalArgumentException when the wrapper is built with an empty server
 * list.
 */
245 @Test(expected = IllegalArgumentException.class)
246 public void testDmaapAafConsumerWrapper_InvalidServers() throws Exception {
248 * Unfortunately, the MR code intercepts this and throws an exception before the
249 * wrapper gets a chance to check it, thus this test does not improve the coverage
250 * for the constructor.
252 new DmaapAafConsumerWrapper(makeBuilder().servers(Collections.emptyList()).build());
/**
 * Verifies that toString() of a DmaapAafConsumerWrapper returns a non-null value.
 */
256 public void testDmaapAafConsumerWrapperToString() throws Exception {
257 assertNotNull(new DmaapAafConsumerWrapper(makeBuilder().build()).toString());
/**
 * Constructs DmaapDmeConsumerWrapper instances from several parameter combinations:
 * defaults, HTTPS on and off, null additional properties, and - after a route property
 * has been added - defaults again and a null partner. Only the last construction is
 * wrapped in an explicit no-exception assertion.
 */
261 public void testDmaapDmeConsumerWrapper() throws Exception {
262 // verify that different wrappers can be built
263 new DmaapDmeConsumerWrapper(makeBuilder().build());
264 new DmaapDmeConsumerWrapper(makeBuilder().useHttps(true).build());
265 new DmaapDmeConsumerWrapper(makeBuilder().useHttps(false).build());
266 new DmaapDmeConsumerWrapper(makeBuilder().additionalProps(null).build());
// add a route property; addProps is presumably what makeBuilder() supplies as the
// additional properties - TODO confirm against TopicTestBase
268 addProps.put(ROUTE_PROP, MY_ROUTE);
269 new DmaapDmeConsumerWrapper(makeBuilder().build());
// with the route property present, a null partner is expected to be accepted
270 assertThatCode(() -> new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build()))
271 .doesNotThrowAnyException();
/**
 * Expects an IllegalArgumentException when the wrapper is built with a null environment.
 */
274 @Test(expected = IllegalArgumentException.class)
275 public void testDmaapDmeConsumerWrapper_InvalidEnvironment() throws Exception {
276 new DmaapDmeConsumerWrapper(makeBuilder().environment(null).build());
/**
 * Expects an IllegalArgumentException when the wrapper is built with a null AFT
 * environment.
 */
279 @Test(expected = IllegalArgumentException.class)
280 public void testDmaapDmeConsumerWrapper_InvalidAft() throws Exception {
281 new DmaapDmeConsumerWrapper(makeBuilder().aftEnvironment(null).build());
/**
 * Expects an IllegalArgumentException when the wrapper is built with a null latitude.
 */
284 @Test(expected = IllegalArgumentException.class)
285 public void testDmaapDmeConsumerWrapper_InvalidLat() throws Exception {
286 new DmaapDmeConsumerWrapper(makeBuilder().latitude(null).build());
/**
 * Expects an IllegalArgumentException when the wrapper is built with a null longitude.
 */
289 @Test(expected = IllegalArgumentException.class)
290 public void testDmaapDmeConsumerWrapper_InvalidLong() throws Exception {
291 new DmaapDmeConsumerWrapper(makeBuilder().longitude(null).build());
/**
 * Expects an IllegalArgumentException when the wrapper is built with a null partner.
 * Unlike testDmaapDmeConsumerWrapper, no route property is added before the wrapper
 * is constructed.
 */
294 @Test(expected = IllegalArgumentException.class)
295 public void testDmaapDmeConsumerWrapper_InvalidPartner() throws Exception {
296 new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build());
/**
 * Verifies that a KafkaConsumerWrapper can be constructed from the default builder
 * parameters without throwing.
 */
300 public void testKafkaConsumerWrapper() throws Exception {
301 // verify that different wrappers can be built
302 assertThatCode(() -> new KafkaConsumerWrapper(makeBuilder().build())).doesNotThrowAnyException();
/**
 * Verifies that toString() of a KafkaConsumerWrapper returns a non-null value. The
 * wrapper is instantiated through an empty anonymous subclass.
 */
306 public void testKafkaConsumerWrapperToString() throws Exception {
307 assertNotNull(new KafkaConsumerWrapper(makeBuilder().build()) {}.toString());
310 private static class FetchingBusConsumerImpl extends FetchingBusConsumer {
312 protected FetchingBusConsumerImpl(BusTopicParams busTopicParams) {
313 super(busTopicParams);
317 public Iterable<String> fetch() throws IOException {