2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.fail;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.when;
32 import com.att.nsa.cambria.client.CambriaConsumer;
33 import java.io.IOException;
34 import java.util.Arrays;
35 import java.util.Collections;
36 import java.util.List;
37 import java.util.concurrent.CountDownLatch;
38 import org.apache.commons.collections4.IteratorUtils;
39 import org.junit.Before;
40 import org.junit.Test;
41 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
42 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
43 import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
44 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.CambriaConsumerWrapper;
45 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapAafConsumerWrapper;
46 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapConsumerWrapper;
47 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapDmeConsumerWrapper;
48 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer;
49 import org.powermock.reflect.Whitebox;
51 public class BusConsumerTest extends TopicTestBase {
53 private static final int SHORT_TIMEOUT_MILLIS = 10;
54 private static final int LONG_TIMEOUT_MILLIS = 3000;
63 public void testFetchingBusConsumerSleepAfterFetchFailure() throws InterruptedException {
65 var cons = new FetchingBusConsumer(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) {
67 private CountDownLatch started = new CountDownLatch(1);
70 protected void sleepAfterFetchFailure() {
72 super.sleepAfterFetchFailure();
76 public Iterable<String> fetch() throws IOException {
82 long tstart = System.currentTimeMillis();
83 cons.sleepAfterFetchFailure();
84 assertThat(System.currentTimeMillis() - tstart).isGreaterThanOrEqualTo(SHORT_TIMEOUT_MILLIS);
86 // close while sleeping - sleep should halt prematurely
87 cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
88 cons.started = new CountDownLatch(1);
89 Thread thread = new Thread(cons::sleepAfterFetchFailure);
90 tstart = System.currentTimeMillis();
95 assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
97 // interrupt while sleeping - sleep should halt prematurely
98 cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
99 cons.started = new CountDownLatch(1);
100 thread = new Thread(cons::sleepAfterFetchFailure);
101 tstart = System.currentTimeMillis();
103 cons.started.await();
106 assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
110 public void testCambriaConsumerWrapper() {
111 // verify that different wrappers can be built
112 new CambriaConsumerWrapper(makeBuilder().build());
113 new CambriaConsumerWrapper(makeBuilder().useHttps(false).build());
114 new CambriaConsumerWrapper(makeBuilder().useHttps(true).build());
115 new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(false).build());
116 new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(true).build());
117 new CambriaConsumerWrapper(makeBuilder().apiKey(null).build());
118 new CambriaConsumerWrapper(makeBuilder().apiSecret(null).build());
119 new CambriaConsumerWrapper(makeBuilder().apiKey(null).apiSecret(null).build());
120 new CambriaConsumerWrapper(makeBuilder().userName(null).build());
121 new CambriaConsumerWrapper(makeBuilder().password(null).build());
123 assertThatCode(() -> new CambriaConsumerWrapper(makeBuilder().userName(null).password(null).build()))
124 .doesNotThrowAnyException();
128 public void testCambriaConsumerWrapperFetch() throws Exception {
129 CambriaConsumer inner = mock(CambriaConsumer.class);
130 List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
131 when(inner.fetch()).thenReturn(lst);
133 CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
134 Whitebox.setInternalState(cons, "consumer", inner);
136 assertEquals(lst, IteratorUtils.toList(cons.fetch().iterator()));
138 // arrange to throw exception next time fetch is called
139 IOException ex = new IOException(EXPECTED);
140 when(inner.fetch()).thenThrow(ex);
142 cons.fetchTimeout = 10;
146 fail("missing exception");
148 } catch (IOException e) {
154 public void testCambriaConsumerWrapperClose() {
155 CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
156 assertThatCode(() -> cons.close()).doesNotThrowAnyException();
160 public void testCambriaConsumerWrapperToString() {
161 assertNotNull(new CambriaConsumerWrapper(makeBuilder().build()).toString());
165 public void testDmaapConsumerWrapper() throws Exception {
166 // verify that different wrappers can be built
167 assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().build())).doesNotThrowAnyException();
170 @Test(expected = IllegalArgumentException.class)
171 public void testDmaapConsumerWrapper_InvalidTopic() throws Exception {
172 new DmaapAafConsumerWrapper(makeBuilder().topic(null).build());
176 public void testDmaapConsumerWrapperFetch() throws Exception {
177 DmaapAafConsumerWrapper dmaap = new DmaapAafConsumerWrapper(makeBuilder().build());
178 MRConsumerImpl cons = mock(MRConsumerImpl.class);
180 dmaap.fetchTimeout = 5;
181 dmaap.consumer = cons;
184 when(cons.fetchWithReturnConsumerResponse()).thenReturn(null);
185 assertFalse(dmaap.fetch().iterator().hasNext());
187 // with messages, 200
188 List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
189 MRConsumerResponse resp = new MRConsumerResponse();
190 resp.setResponseCode("200");
191 resp.setActualMessages(lst);
192 when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
194 assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
197 resp.setActualMessages(null);
198 when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
200 assertFalse(dmaap.fetch().iterator().hasNext());
202 // with messages, NOT 200
203 resp.setResponseCode("400");
204 resp.setActualMessages(lst);
205 when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
207 assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
211 public void testDmaapConsumerWrapperClose() throws Exception {
212 assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().build()).close()).doesNotThrowAnyException();
216 public void testDmaapConsumerWrapperToString() throws Exception {
217 assertNotNull(new DmaapConsumerWrapper(makeBuilder().build()) {}.toString());
221 public void testDmaapAafConsumerWrapper() throws Exception {
222 // verify that different wrappers can be built
223 new DmaapAafConsumerWrapper(makeBuilder().useHttps(true).build());
224 assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().useHttps(false).build()))
225 .doesNotThrowAnyException();
228 @Test(expected = IllegalArgumentException.class)
229 public void testDmaapAafConsumerWrapper_InvalidServers() throws Exception {
231 * Unfortunately, the MR code intercepts this and throws an exception before the
232 * wrapper gets a chance to check it, thus this test does not improve the coverage
233 * for the constructor.
235 new DmaapAafConsumerWrapper(makeBuilder().servers(Collections.emptyList()).build());
239 public void testDmaapAafConsumerWrapperToString() throws Exception {
240 assertNotNull(new DmaapAafConsumerWrapper(makeBuilder().build()).toString());
244 public void testDmaapDmeConsumerWrapper() throws Exception {
245 // verify that different wrappers can be built
246 new DmaapDmeConsumerWrapper(makeBuilder().build());
247 new DmaapDmeConsumerWrapper(makeBuilder().useHttps(true).build());
248 new DmaapDmeConsumerWrapper(makeBuilder().useHttps(false).build());
249 new DmaapDmeConsumerWrapper(makeBuilder().additionalProps(null).build());
251 addProps.put(ROUTE_PROP, MY_ROUTE);
252 new DmaapDmeConsumerWrapper(makeBuilder().build());
253 assertThatCode(() -> new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build()))
254 .doesNotThrowAnyException();
257 @Test(expected = IllegalArgumentException.class)
258 public void testDmaapDmeConsumerWrapper_InvalidEnvironment() throws Exception {
259 new DmaapDmeConsumerWrapper(makeBuilder().environment(null).build());
262 @Test(expected = IllegalArgumentException.class)
263 public void testDmaapDmeConsumerWrapper_InvalidAft() throws Exception {
264 new DmaapDmeConsumerWrapper(makeBuilder().aftEnvironment(null).build());
267 @Test(expected = IllegalArgumentException.class)
268 public void testDmaapDmeConsumerWrapper_InvalidLat() throws Exception {
269 new DmaapDmeConsumerWrapper(makeBuilder().latitude(null).build());
272 @Test(expected = IllegalArgumentException.class)
273 public void testDmaapDmeConsumerWrapper_InvalidLong() throws Exception {
274 new DmaapDmeConsumerWrapper(makeBuilder().longitude(null).build());
277 @Test(expected = IllegalArgumentException.class)
278 public void testDmaapDmeConsumerWrapper_InvalidPartner() throws Exception {
279 new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build());