70fa83c6acc3fd948f4d98ec52253213f0a71e95
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2023-2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.assertj.core.api.Assertions.assertThatCode;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.assertThrows;
29 import static org.junit.Assert.fail;
30 import static org.mockito.ArgumentMatchers.any;
31 import static org.mockito.Mockito.mock;
32 import static org.mockito.Mockito.times;
33 import static org.mockito.Mockito.verify;
34 import static org.mockito.Mockito.when;
35
36 import com.att.nsa.cambria.client.CambriaConsumer;
37 import java.io.IOException;
38 import java.nio.charset.StandardCharsets;
39 import java.util.Arrays;
40 import java.util.Collections;
41 import java.util.HashMap;
42 import java.util.List;
43 import java.util.Map;
44 import java.util.Properties;
45 import java.util.concurrent.CountDownLatch;
46 import org.apache.commons.collections4.IteratorUtils;
47 import org.apache.kafka.clients.consumer.ConsumerConfig;
48 import org.apache.kafka.clients.consumer.ConsumerRecord;
49 import org.apache.kafka.clients.consumer.ConsumerRecords;
50 import org.apache.kafka.clients.consumer.KafkaConsumer;
51 import org.apache.kafka.common.TopicPartition;
52 import org.junit.Before;
53 import org.junit.Test;
54 import org.mockito.Mock;
55 import org.mockito.MockitoAnnotations;
56 import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
57 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.CambriaConsumerWrapper;
58 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer;
59 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.KafkaConsumerWrapper;
60 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
61 import org.springframework.test.util.ReflectionTestUtils;
62
63 public class BusConsumerTest extends TopicTestBase {
64
    /** Fetch timeout, in milliseconds, used when a test lets the sleep run to completion. */
    private static final int SHORT_TIMEOUT_MILLIS = 10;
    /** Fetch timeout, in milliseconds, used when a test expects the sleep to be cut short. */
    private static final int LONG_TIMEOUT_MILLIS = 3000;

    /** Mock that the Kafka tests assign to a wrapper's {@code consumer} field; initialized in {@code setUp()}. */
    @Mock
    KafkaConsumer<String, String> mockedKafkaConsumer;
70
    /**
     * Runs the base-class set-up and then initializes the {@code @Mock}-annotated fields of this test.
     */
    @Before
    @Override
    public void setUp() {
        super.setUp();
        // populates mockedKafkaConsumer
        MockitoAnnotations.initMocks(this);
    }
77
78
79     @Test
80     public void testFetchingBusConsumer() {
81         // should not be negative
82         var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(-1).build());
83         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
84
85         // should not be zero
86         cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(0).build());
87         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
88
89         // should not be too large
90         cons = new FetchingBusConsumerImpl(
91                         makeBuilder().fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH + 100).build());
92         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
93
94         // should not be what was specified
95         cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(100).build());
96         assertThat(cons.getSleepTime()).isEqualTo(100);
97     }
98
99     @Test
100     public void testFetchingBusConsumerSleepAfterFetchFailure() throws InterruptedException {
101
102         var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) {
103
104             private CountDownLatch started = new CountDownLatch(1);
105
106             @Override
107             protected void sleepAfterFetchFailure() {
108                 started.countDown();
109                 super.sleepAfterFetchFailure();
110             }
111         };
112
113         // full sleep
114         long tstart = System.currentTimeMillis();
115         cons.sleepAfterFetchFailure();
116         assertThat(System.currentTimeMillis() - tstart).isGreaterThanOrEqualTo(SHORT_TIMEOUT_MILLIS);
117
118         // close while sleeping - sleep should halt prematurely
119         cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
120         cons.started = new CountDownLatch(1);
121         Thread thread = new Thread(cons::sleepAfterFetchFailure);
122         tstart = System.currentTimeMillis();
123         thread.start();
124         cons.started.await();
125         cons.close();
126         thread.join();
127         assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
128
129         // interrupt while sleeping - sleep should halt prematurely
130         cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
131         cons.started = new CountDownLatch(1);
132         thread = new Thread(cons::sleepAfterFetchFailure);
133         tstart = System.currentTimeMillis();
134         thread.start();
135         cons.started.await();
136         thread.interrupt();
137         thread.join();
138         assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
139     }
140
141     @Test
142     public void testCambriaConsumerWrapper() {
143         // verify that different wrappers can be built
144         new CambriaConsumerWrapper(makeBuilder().build());
145         new CambriaConsumerWrapper(makeBuilder().useHttps(false).build());
146         new CambriaConsumerWrapper(makeBuilder().useHttps(true).build());
147         new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(false).build());
148         new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(true).build());
149         new CambriaConsumerWrapper(makeBuilder().apiKey(null).build());
150         new CambriaConsumerWrapper(makeBuilder().apiSecret(null).build());
151         new CambriaConsumerWrapper(makeBuilder().apiKey(null).apiSecret(null).build());
152         new CambriaConsumerWrapper(makeBuilder().userName(null).build());
153         new CambriaConsumerWrapper(makeBuilder().password(null).build());
154
155         assertThatCode(() -> new CambriaConsumerWrapper(makeBuilder().userName(null).password(null).build()))
156                         .doesNotThrowAnyException();
157     }
158
159     @Test
160     public void testCambriaConsumerWrapperFetch() throws Exception {
161         CambriaConsumer inner = mock(CambriaConsumer.class);
162         List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
163         when(inner.fetch()).thenReturn(lst);
164
165         CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
166         ReflectionTestUtils.setField(cons, "consumer", inner);
167
168         assertEquals(lst, IteratorUtils.toList(cons.fetch().iterator()));
169
170         // arrange to throw exception next time fetch is called
171         IOException ex = new IOException(EXPECTED);
172         when(inner.fetch()).thenThrow(ex);
173
174         cons.fetchTimeout = 10;
175
176         try {
177             cons.fetch();
178             fail("missing exception");
179
180         } catch (IOException e) {
181             assertEquals(ex, e);
182         }
183     }
184
185     @Test
186     public void testCambriaConsumerWrapperClose() {
187         CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
188         assertThatCode(cons::close).doesNotThrowAnyException();
189     }
190
191     @Test
192     public void testCambriaConsumerWrapperToString() {
193         assertNotNull(new CambriaConsumerWrapper(makeBuilder().build()).toString());
194     }
195
196     @Test
197     public void testKafkaConsumerWrapper() {
198         // verify that different wrappers can be built
199         assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build())).doesNotThrowAnyException();
200     }
201
202     @Test(expected = IllegalArgumentException.class)
203     public void testKafkaConsumerWrapper_InvalidTopic() {
204         new KafkaConsumerWrapper(makeBuilder().topic(null).build());
205     }
206
207     @Test
208     public void testKafkaConsumerWrapperFetch() {
209
210         //Setup Properties for consumer
211         Properties kafkaProps = new Properties();
212         kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
213         kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
214         kafkaProps.setProperty("enable.auto.commit", "true");
215         kafkaProps.setProperty("auto.commit.interval.ms", "1000");
216         kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
217             "org.apache.kafka.common.serialization.StringDeserializer");
218         kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
219             "org.apache.kafka.common.serialization.StringDeserializer");
220         kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
221         kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
222
223         KafkaConsumerWrapper kafka = new KafkaConsumerWrapper(makeKafkaBuilder().build());
224         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
225         kafka.consumer = consumer;
226
227         assertThrows(java.lang.IllegalStateException.class, () -> kafka.fetch().iterator().hasNext());
228         consumer.close();
229     }
230
231     @Test
232     public void testFetchNoMessages() throws IOException {
233         KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
234         kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
235
236         when(mockedKafkaConsumer.poll(any())).thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
237
238         Iterable<String> result = kafkaConsumerWrapper.fetch();
239
240         verify(mockedKafkaConsumer, times(1)).poll(any());
241
242         assertThat(result != null);
243
244         assertThat(!result.iterator().hasNext());
245
246         mockedKafkaConsumer.close();
247     }
248
249     @Test
250     public void testFetchWithMessages() {
251         // Setup
252         KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
253         kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
254
255         ConsumerRecord<String, String> record = new ConsumerRecord<>("my-effective-topic", 0, 0, "key", "value");
256         Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
257         recordsMap.put(new TopicPartition("my-effective-topic", 0), Collections.singletonList(record));
258         ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
259
260         when(mockedKafkaConsumer.poll(any())).thenReturn(consumerRecords);
261
262         Iterable<String> result = kafkaConsumerWrapper.fetch();
263
264         verify(mockedKafkaConsumer, times(1)).poll(any());
265
266         verify(mockedKafkaConsumer, times(1)).commitSync(any(Map.class));
267
268         assertThat(result != null);
269
270         assertThat(result.iterator().hasNext());
271
272         assertThat(result.iterator().next().equals("value"));
273
274         mockedKafkaConsumer.close();
275     }
276
277     @Test
278     public void testFetchWithMessagesAndTraceparent() {
279         // Setup
280         KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
281         kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
282
283         ConsumerRecord<String, String> record = new ConsumerRecord<>("my-effective-topic", 0, 0, "key", "value");
284         record.headers().add(
285                 "traceparent",
286                 "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01".getBytes(StandardCharsets.UTF_8)
287         );
288
289         Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
290         recordsMap.put(new TopicPartition("my-effective-topic", 0), Collections.singletonList(record));
291         ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
292
293         when(mockedKafkaConsumer.poll(any())).thenReturn(consumerRecords);
294
295         Iterable<String> result = kafkaConsumerWrapper.fetch();
296
297         verify(mockedKafkaConsumer, times(1)).poll(any());
298
299         verify(mockedKafkaConsumer, times(1)).commitSync(any(Map.class));
300
301         assertThat(result != null);
302
303         assertThat(result.iterator().hasNext());
304
305         assertThat(result.iterator().next().equals("value"));
306
307         mockedKafkaConsumer.close();
308     }
309
310
311     @Test
312     public void testKafkaConsumerWrapperClose() {
313         assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build()).close()).doesNotThrowAnyException();
314     }
315
316     @Test
317     public void testKafkaConsumerWrapperToString() {
318         assertNotNull(new KafkaConsumerWrapper(makeKafkaBuilder().build()) {}.toString());
319     }
320
    /**
     * Minimal concrete {@link FetchingBusConsumer} that lets the tests in this class exercise the
     * behaviour inherited from the base class.
     */
    private static class FetchingBusConsumerImpl extends FetchingBusConsumer {

        /** Passes the parameters straight through to the base class. */
        protected FetchingBusConsumerImpl(BusTopicParams busTopicParams) {
            super(busTopicParams);
        }

        /** Stub: always returns {@code null}; none of the tests in this class call it. */
        @Override
        public Iterable<String> fetch() {
            return null;
        }
    }
332 }