94e7c0c77877ef85507e07d871b9b98982cb20d6
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2023 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.assertj.core.api.Assertions.assertThatCode;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertFalse;
28 import static org.junit.Assert.assertNotNull;
29 import static org.junit.Assert.fail;
30 import static org.mockito.Mockito.mock;
31 import static org.mockito.Mockito.when;
32
33 import com.att.nsa.cambria.client.CambriaConsumer;
34 import java.io.IOException;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.List;
38 import java.util.Properties;
39 import java.util.concurrent.CountDownLatch;
40 import org.apache.commons.collections4.IteratorUtils;
41 import org.apache.kafka.clients.consumer.ConsumerConfig;
42 import org.apache.kafka.clients.consumer.KafkaConsumer;
43 import org.junit.Before;
44 import org.junit.Test;
45 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
46 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
47 import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
48 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.CambriaConsumerWrapper;
49 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapAafConsumerWrapper;
50 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapConsumerWrapper;
51 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapDmeConsumerWrapper;
52 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer;
53 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.KafkaConsumerWrapper;
54 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
55 import org.springframework.test.util.ReflectionTestUtils;
56
57 public class BusConsumerTest extends TopicTestBase {
58
59     private static final int SHORT_TIMEOUT_MILLIS = 10;
60     private static final int LONG_TIMEOUT_MILLIS = 3000;
61
62     @Before
63     @Override
64     public void setUp() {
65         super.setUp();
66     }
67
68     @Test
69     public void testFetchingBusConsumer() throws InterruptedException {
70         // should not be negative
71         var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(-1).build());
72         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
73
74         // should not be zero
75         cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(0).build());
76         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
77
78         // should not be too large
79         cons = new FetchingBusConsumerImpl(
80                         makeBuilder().fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH + 100).build());
81         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
82
83         // should not be what was specified
84         cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(100).build());
85         assertThat(cons.getSleepTime()).isEqualTo(100);
86     }
87
88     @Test
89     public void testFetchingBusConsumerSleepAfterFetchFailure() throws InterruptedException {
90
91         var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) {
92
93             private CountDownLatch started = new CountDownLatch(1);
94
95             @Override
96             protected void sleepAfterFetchFailure() {
97                 started.countDown();
98                 super.sleepAfterFetchFailure();
99             }
100         };
101
102         // full sleep
103         long tstart = System.currentTimeMillis();
104         cons.sleepAfterFetchFailure();
105         assertThat(System.currentTimeMillis() - tstart).isGreaterThanOrEqualTo(SHORT_TIMEOUT_MILLIS);
106
107         // close while sleeping - sleep should halt prematurely
108         cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
109         cons.started = new CountDownLatch(1);
110         Thread thread = new Thread(cons::sleepAfterFetchFailure);
111         tstart = System.currentTimeMillis();
112         thread.start();
113         cons.started.await();
114         cons.close();
115         thread.join();
116         assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
117
118         // interrupt while sleeping - sleep should halt prematurely
119         cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
120         cons.started = new CountDownLatch(1);
121         thread = new Thread(cons::sleepAfterFetchFailure);
122         tstart = System.currentTimeMillis();
123         thread.start();
124         cons.started.await();
125         thread.interrupt();
126         thread.join();
127         assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
128     }
129
130     @Test
131     public void testCambriaConsumerWrapper() {
132         // verify that different wrappers can be built
133         new CambriaConsumerWrapper(makeBuilder().build());
134         new CambriaConsumerWrapper(makeBuilder().useHttps(false).build());
135         new CambriaConsumerWrapper(makeBuilder().useHttps(true).build());
136         new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(false).build());
137         new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(true).build());
138         new CambriaConsumerWrapper(makeBuilder().apiKey(null).build());
139         new CambriaConsumerWrapper(makeBuilder().apiSecret(null).build());
140         new CambriaConsumerWrapper(makeBuilder().apiKey(null).apiSecret(null).build());
141         new CambriaConsumerWrapper(makeBuilder().userName(null).build());
142         new CambriaConsumerWrapper(makeBuilder().password(null).build());
143
144         assertThatCode(() -> new CambriaConsumerWrapper(makeBuilder().userName(null).password(null).build()))
145                         .doesNotThrowAnyException();
146     }
147
148     @Test
149     public void testCambriaConsumerWrapperFetch() throws Exception {
150         CambriaConsumer inner = mock(CambriaConsumer.class);
151         List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
152         when(inner.fetch()).thenReturn(lst);
153
154         CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
155         ReflectionTestUtils.setField(cons, "consumer", inner);
156
157         assertEquals(lst, IteratorUtils.toList(cons.fetch().iterator()));
158
159         // arrange to throw exception next time fetch is called
160         IOException ex = new IOException(EXPECTED);
161         when(inner.fetch()).thenThrow(ex);
162
163         cons.fetchTimeout = 10;
164
165         try {
166             cons.fetch();
167             fail("missing exception");
168
169         } catch (IOException e) {
170             assertEquals(ex, e);
171         }
172     }
173
174     @Test
175     public void testCambriaConsumerWrapperClose() {
176         CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
177         assertThatCode(() -> cons.close()).doesNotThrowAnyException();
178     }
179
180     @Test
181     public void testCambriaConsumerWrapperToString() {
182         assertNotNull(new CambriaConsumerWrapper(makeBuilder().build()).toString());
183     }
184
185     @Test
186     public void testDmaapConsumerWrapper() throws Exception {
187         // verify that different wrappers can be built
188         assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().build())).doesNotThrowAnyException();
189     }
190
191     @Test(expected = IllegalArgumentException.class)
192     public void testDmaapConsumerWrapper_InvalidTopic() throws Exception {
193         new DmaapAafConsumerWrapper(makeBuilder().topic(null).build());
194     }
195
196     @Test
197     public void testDmaapConsumerWrapperFetch() throws Exception {
198         DmaapAafConsumerWrapper dmaap = new DmaapAafConsumerWrapper(makeBuilder().build());
199         MRConsumerImpl cons = mock(MRConsumerImpl.class);
200
201         dmaap.fetchTimeout = 5;
202         dmaap.consumer = cons;
203
204         // null return
205         when(cons.fetchWithReturnConsumerResponse()).thenReturn(null);
206         assertFalse(dmaap.fetch().iterator().hasNext());
207
208         // with messages, 200
209         List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
210         MRConsumerResponse resp = new MRConsumerResponse();
211         resp.setResponseCode("200");
212         resp.setActualMessages(lst);
213         when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
214
215         assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
216
217         // null messages
218         resp.setActualMessages(null);
219         when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
220
221         assertFalse(dmaap.fetch().iterator().hasNext());
222
223         // with messages, NOT 200
224         resp.setResponseCode("400");
225         resp.setActualMessages(lst);
226         when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
227
228         assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
229     }
230
231     @Test
232     public void testDmaapConsumerWrapperClose() throws Exception {
233         assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().build()).close()).doesNotThrowAnyException();
234     }
235
236     @Test
237     public void testDmaapConsumerWrapperToString() throws Exception {
238         assertNotNull(new DmaapConsumerWrapper(makeBuilder().build()) {}.toString());
239     }
240
241     @Test
242     public void testDmaapAafConsumerWrapper() throws Exception {
243         // verify that different wrappers can be built
244         new DmaapAafConsumerWrapper(makeBuilder().useHttps(true).build());
245         assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().useHttps(false).build()))
246                         .doesNotThrowAnyException();
247     }
248
249     @Test(expected = IllegalArgumentException.class)
250     public void testDmaapAafConsumerWrapper_InvalidServers() throws Exception {
251         /*
252          * Unfortunately, the MR code intercepts this and throws an exception before the
253          * wrapper gets a chance to check it, thus this test does not improve the coverage
254          * for the constructor.
255          */
256         new DmaapAafConsumerWrapper(makeBuilder().servers(Collections.emptyList()).build());
257     }
258
259     @Test
260     public void testDmaapAafConsumerWrapperToString() throws Exception {
261         assertNotNull(new DmaapAafConsumerWrapper(makeBuilder().build()).toString());
262     }
263
264     @Test
265     public void testDmaapDmeConsumerWrapper() throws Exception {
266         // verify that different wrappers can be built
267         new DmaapDmeConsumerWrapper(makeBuilder().build());
268         new DmaapDmeConsumerWrapper(makeBuilder().useHttps(true).build());
269         new DmaapDmeConsumerWrapper(makeBuilder().useHttps(false).build());
270         new DmaapDmeConsumerWrapper(makeBuilder().additionalProps(null).build());
271
272         addProps.put(ROUTE_PROP, MY_ROUTE);
273         new DmaapDmeConsumerWrapper(makeBuilder().build());
274         assertThatCode(() -> new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build()))
275                         .doesNotThrowAnyException();
276     }
277
278     @Test(expected = IllegalArgumentException.class)
279     public void testDmaapDmeConsumerWrapper_InvalidEnvironment() throws Exception {
280         new DmaapDmeConsumerWrapper(makeBuilder().environment(null).build());
281     }
282
283     @Test(expected = IllegalArgumentException.class)
284     public void testDmaapDmeConsumerWrapper_InvalidAft() throws Exception {
285         new DmaapDmeConsumerWrapper(makeBuilder().aftEnvironment(null).build());
286     }
287
288     @Test(expected = IllegalArgumentException.class)
289     public void testDmaapDmeConsumerWrapper_InvalidLat() throws Exception {
290         new DmaapDmeConsumerWrapper(makeBuilder().latitude(null).build());
291     }
292
293     @Test(expected = IllegalArgumentException.class)
294     public void testDmaapDmeConsumerWrapper_InvalidLong() throws Exception {
295         new DmaapDmeConsumerWrapper(makeBuilder().longitude(null).build());
296     }
297
298     @Test(expected = IllegalArgumentException.class)
299     public void testDmaapDmeConsumerWrapper_InvalidPartner() throws Exception {
300         new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build());
301     }
302
303     @Test
304     public void testKafkaConsumerWrapper() throws Exception {
305         // verify that different wrappers can be built
306         assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build())).doesNotThrowAnyException();
307     }
308
309     @Test(expected = IllegalArgumentException.class)
310     public void testKafkaConsumerWrapper_InvalidTopic() throws Exception {
311         new KafkaConsumerWrapper(makeBuilder().topic(null).build());
312     }
313
314     @Test(expected = java.lang.IllegalStateException.class)
315     public void testKafkaConsumerWrapperFetch() throws Exception {
316
317         //Setup Properties for consumer
318         Properties kafkaProps = new Properties();
319         kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
320         kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
321         kafkaProps.setProperty("enable.auto.commit", "true");
322         kafkaProps.setProperty("auto.commit.interval.ms", "1000");
323         kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
324             "org.apache.kafka.common.serialization.StringDeserializer");
325         kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
326             "org.apache.kafka.common.serialization.StringDeserializer");
327         kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
328         kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
329
330         KafkaConsumerWrapper kafka = new KafkaConsumerWrapper(makeKafkaBuilder().build());
331         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
332         kafka.consumer = consumer;
333
334         assertFalse(kafka.fetch().iterator().hasNext());
335         consumer.close();
336     }
337
338     @Test
339     public void testKafkaConsumerWrapperClose() throws Exception {
340         assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build()).close()).doesNotThrowAnyException();
341     }
342
343     @Test
344     public void testKafkaConsumerWrapperToString() throws Exception {
345         assertNotNull(new KafkaConsumerWrapper(makeKafkaBuilder().build()) {}.toString());
346     }
347
348     private static class FetchingBusConsumerImpl extends FetchingBusConsumer {
349
350         protected FetchingBusConsumerImpl(BusTopicParams busTopicParams) {
351             super(busTopicParams);
352         }
353
354         @Override
355         public Iterable<String> fetch() throws IOException {
356             return null;
357         }
358     }
359 }