da9f792be09bf9376585618744a7648bdf0eee54
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.fail;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.when;
31
32 import com.att.nsa.cambria.client.CambriaConsumer;
33 import java.io.IOException;
34 import java.util.Arrays;
35 import java.util.Collections;
36 import java.util.List;
37 import java.util.concurrent.CountDownLatch;
38 import org.apache.commons.collections4.IteratorUtils;
39 import org.junit.Before;
40 import org.junit.Test;
41 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
42 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
43 import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
44 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.CambriaConsumerWrapper;
45 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapAafConsumerWrapper;
46 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapConsumerWrapper;
47 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapDmeConsumerWrapper;
48 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer;
49 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.KafkaConsumerWrapper;
50 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
51 import org.powermock.reflect.Whitebox;
52
53 public class BusConsumerTest extends TopicTestBase {
54
55     private static final int SHORT_TIMEOUT_MILLIS = 10;
56     private static final int LONG_TIMEOUT_MILLIS = 3000;
57
58     @Before
59     @Override
60     public void setUp() {
61         super.setUp();
62     }
63
64     @Test
65     public void testFetchingBusConsumer() throws InterruptedException {
66         // should not be negative
67         var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(-1).build());
68         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
69
70         // should not be zero
71         cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(0).build());
72         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
73
74         // should not be too large
75         cons = new FetchingBusConsumerImpl(
76                         makeBuilder().fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH + 100).build());
77         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
78
79         // should not be what was specified
80         cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(100).build());
81         assertThat(cons.getSleepTime()).isEqualTo(100);
82     }
83
84     @Test
85     public void testFetchingBusConsumerSleepAfterFetchFailure() throws InterruptedException {
86
87         var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) {
88
89             private CountDownLatch started = new CountDownLatch(1);
90
91             @Override
92             protected void sleepAfterFetchFailure() {
93                 started.countDown();
94                 super.sleepAfterFetchFailure();
95             }
96         };
97
98         // full sleep
99         long tstart = System.currentTimeMillis();
100         cons.sleepAfterFetchFailure();
101         assertThat(System.currentTimeMillis() - tstart).isGreaterThanOrEqualTo(SHORT_TIMEOUT_MILLIS);
102
103         // close while sleeping - sleep should halt prematurely
104         cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
105         cons.started = new CountDownLatch(1);
106         Thread thread = new Thread(cons::sleepAfterFetchFailure);
107         tstart = System.currentTimeMillis();
108         thread.start();
109         cons.started.await();
110         cons.close();
111         thread.join();
112         assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
113
114         // interrupt while sleeping - sleep should halt prematurely
115         cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
116         cons.started = new CountDownLatch(1);
117         thread = new Thread(cons::sleepAfterFetchFailure);
118         tstart = System.currentTimeMillis();
119         thread.start();
120         cons.started.await();
121         thread.interrupt();
122         thread.join();
123         assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
124     }
125
126     @Test
127     public void testCambriaConsumerWrapper() {
128         // verify that different wrappers can be built
129         new CambriaConsumerWrapper(makeBuilder().build());
130         new CambriaConsumerWrapper(makeBuilder().useHttps(false).build());
131         new CambriaConsumerWrapper(makeBuilder().useHttps(true).build());
132         new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(false).build());
133         new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(true).build());
134         new CambriaConsumerWrapper(makeBuilder().apiKey(null).build());
135         new CambriaConsumerWrapper(makeBuilder().apiSecret(null).build());
136         new CambriaConsumerWrapper(makeBuilder().apiKey(null).apiSecret(null).build());
137         new CambriaConsumerWrapper(makeBuilder().userName(null).build());
138         new CambriaConsumerWrapper(makeBuilder().password(null).build());
139
140         assertThatCode(() -> new CambriaConsumerWrapper(makeBuilder().userName(null).password(null).build()))
141                         .doesNotThrowAnyException();
142     }
143
144     @Test
145     public void testCambriaConsumerWrapperFetch() throws Exception {
146         CambriaConsumer inner = mock(CambriaConsumer.class);
147         List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
148         when(inner.fetch()).thenReturn(lst);
149
150         CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
151         Whitebox.setInternalState(cons, "consumer", inner);
152
153         assertEquals(lst, IteratorUtils.toList(cons.fetch().iterator()));
154
155         // arrange to throw exception next time fetch is called
156         IOException ex = new IOException(EXPECTED);
157         when(inner.fetch()).thenThrow(ex);
158
159         cons.fetchTimeout = 10;
160
161         try {
162             cons.fetch();
163             fail("missing exception");
164
165         } catch (IOException e) {
166             assertEquals(ex, e);
167         }
168     }
169
170     @Test
171     public void testCambriaConsumerWrapperClose() {
172         CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
173         assertThatCode(() -> cons.close()).doesNotThrowAnyException();
174     }
175
176     @Test
177     public void testCambriaConsumerWrapperToString() {
178         assertNotNull(new CambriaConsumerWrapper(makeBuilder().build()).toString());
179     }
180
181     @Test
182     public void testDmaapConsumerWrapper() throws Exception {
183         // verify that different wrappers can be built
184         assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().build())).doesNotThrowAnyException();
185     }
186
187     @Test(expected = IllegalArgumentException.class)
188     public void testDmaapConsumerWrapper_InvalidTopic() throws Exception {
189         new DmaapAafConsumerWrapper(makeBuilder().topic(null).build());
190     }
191
192     @Test
193     public void testDmaapConsumerWrapperFetch() throws Exception {
194         DmaapAafConsumerWrapper dmaap = new DmaapAafConsumerWrapper(makeBuilder().build());
195         MRConsumerImpl cons = mock(MRConsumerImpl.class);
196
197         dmaap.fetchTimeout = 5;
198         dmaap.consumer = cons;
199
200         // null return
201         when(cons.fetchWithReturnConsumerResponse()).thenReturn(null);
202         assertFalse(dmaap.fetch().iterator().hasNext());
203
204         // with messages, 200
205         List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
206         MRConsumerResponse resp = new MRConsumerResponse();
207         resp.setResponseCode("200");
208         resp.setActualMessages(lst);
209         when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
210
211         assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
212
213         // null messages
214         resp.setActualMessages(null);
215         when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
216
217         assertFalse(dmaap.fetch().iterator().hasNext());
218
219         // with messages, NOT 200
220         resp.setResponseCode("400");
221         resp.setActualMessages(lst);
222         when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
223
224         assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
225     }
226
227     @Test
228     public void testDmaapConsumerWrapperClose() throws Exception {
229         assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().build()).close()).doesNotThrowAnyException();
230     }
231
232     @Test
233     public void testDmaapConsumerWrapperToString() throws Exception {
234         assertNotNull(new DmaapConsumerWrapper(makeBuilder().build()) {}.toString());
235     }
236
237     @Test
238     public void testDmaapAafConsumerWrapper() throws Exception {
239         // verify that different wrappers can be built
240         new DmaapAafConsumerWrapper(makeBuilder().useHttps(true).build());
241         assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().useHttps(false).build()))
242                         .doesNotThrowAnyException();
243     }
244
245     @Test(expected = IllegalArgumentException.class)
246     public void testDmaapAafConsumerWrapper_InvalidServers() throws Exception {
247         /*
248          * Unfortunately, the MR code intercepts this and throws an exception before the
249          * wrapper gets a chance to check it, thus this test does not improve the coverage
250          * for the constructor.
251          */
252         new DmaapAafConsumerWrapper(makeBuilder().servers(Collections.emptyList()).build());
253     }
254
255     @Test
256     public void testDmaapAafConsumerWrapperToString() throws Exception {
257         assertNotNull(new DmaapAafConsumerWrapper(makeBuilder().build()).toString());
258     }
259
260     @Test
261     public void testDmaapDmeConsumerWrapper() throws Exception {
262         // verify that different wrappers can be built
263         new DmaapDmeConsumerWrapper(makeBuilder().build());
264         new DmaapDmeConsumerWrapper(makeBuilder().useHttps(true).build());
265         new DmaapDmeConsumerWrapper(makeBuilder().useHttps(false).build());
266         new DmaapDmeConsumerWrapper(makeBuilder().additionalProps(null).build());
267
268         addProps.put(ROUTE_PROP, MY_ROUTE);
269         new DmaapDmeConsumerWrapper(makeBuilder().build());
270         assertThatCode(() -> new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build()))
271                         .doesNotThrowAnyException();
272     }
273
274     @Test(expected = IllegalArgumentException.class)
275     public void testDmaapDmeConsumerWrapper_InvalidEnvironment() throws Exception {
276         new DmaapDmeConsumerWrapper(makeBuilder().environment(null).build());
277     }
278
279     @Test(expected = IllegalArgumentException.class)
280     public void testDmaapDmeConsumerWrapper_InvalidAft() throws Exception {
281         new DmaapDmeConsumerWrapper(makeBuilder().aftEnvironment(null).build());
282     }
283
284     @Test(expected = IllegalArgumentException.class)
285     public void testDmaapDmeConsumerWrapper_InvalidLat() throws Exception {
286         new DmaapDmeConsumerWrapper(makeBuilder().latitude(null).build());
287     }
288
289     @Test(expected = IllegalArgumentException.class)
290     public void testDmaapDmeConsumerWrapper_InvalidLong() throws Exception {
291         new DmaapDmeConsumerWrapper(makeBuilder().longitude(null).build());
292     }
293
294     @Test(expected = IllegalArgumentException.class)
295     public void testDmaapDmeConsumerWrapper_InvalidPartner() throws Exception {
296         new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build());
297     }
298
299     @Test
300     public void testKafkaConsumerWrapper() throws Exception {
301         // verify that different wrappers can be built
302         assertThatCode(() -> new KafkaConsumerWrapper(makeBuilder().build())).doesNotThrowAnyException();
303     }
304
305     @Test
306     public void testKafkaConsumerWrapperToString() throws Exception {
307         assertNotNull(new KafkaConsumerWrapper(makeBuilder().build()) {}.toString());
308     }
309
310     private static class FetchingBusConsumerImpl extends FetchingBusConsumer {
311
312         protected FetchingBusConsumerImpl(BusTopicParams busTopicParams) {
313             super(busTopicParams);
314         }
315
316         @Override
317         public Iterable<String> fetch() throws IOException {
318             return null;
319         }
320     }
321 }