36efff90179d7030efa04c8532274c12ac0a2655
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2023-2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.assertj.core.api.Assertions.assertThatCode;
26 import static org.assertj.core.api.Assertions.assertThatThrownBy;
27 import static org.junit.jupiter.api.Assertions.assertEquals;
28 import static org.junit.jupiter.api.Assertions.assertFalse;
29 import static org.junit.jupiter.api.Assertions.assertNotNull;
30 import static org.junit.jupiter.api.Assertions.assertThrows;
31 import static org.junit.jupiter.api.Assertions.assertTrue;
32 import static org.mockito.ArgumentMatchers.any;
33 import static org.mockito.Mockito.times;
34 import static org.mockito.Mockito.verify;
35 import static org.mockito.Mockito.when;
36
37 import java.nio.charset.StandardCharsets;
38 import java.util.Collections;
39 import java.util.HashMap;
40 import java.util.List;
41 import java.util.Map;
42 import java.util.Properties;
43 import java.util.concurrent.CountDownLatch;
44 import org.apache.kafka.clients.consumer.ConsumerConfig;
45 import org.apache.kafka.clients.consumer.ConsumerRecord;
46 import org.apache.kafka.clients.consumer.ConsumerRecords;
47 import org.apache.kafka.clients.consumer.KafkaConsumer;
48 import org.apache.kafka.common.TopicPartition;
49 import org.junit.jupiter.api.AfterEach;
50 import org.junit.jupiter.api.BeforeEach;
51 import org.junit.jupiter.api.Test;
52 import org.mockito.Mock;
53 import org.mockito.MockitoAnnotations;
54 import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
55 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer;
56 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.KafkaConsumerWrapper;
57 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
58
59 class BusConsumerTest extends TopicTestBase {
60
61     private static final int SHORT_TIMEOUT_MILLIS = 10;
62     private static final int LONG_TIMEOUT_MILLIS = 3000;
63
64     @Mock
65     KafkaConsumer<String, String> mockedKafkaConsumer;
66
67     AutoCloseable closeable;
68
69     @BeforeEach
70     @Override
71     public void setUp() {
72         super.setUp();
73         closeable = MockitoAnnotations.openMocks(this);
74     }
75
76     @AfterEach
77     public void tearDown() throws Exception {
78         closeable.close();
79     }
80
81
82     @Test
83     void testFetchingBusConsumer() {
84         // should not be negative
85         var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(-1).build());
86         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
87
88         // should not be zero
89         cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(0).build());
90         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
91
92         // should not be too large
93         cons = new FetchingBusConsumerImpl(
94                         makeBuilder().fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH + 100).build());
95         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
96
97         // should not be what was specified
98         cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(100).build());
99         assertThat(cons.getSleepTime()).isEqualTo(100);
100     }
101
102     @Test
103     void testFetchingBusConsumerSleepAfterFetchFailure() throws InterruptedException {
104
105         var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) {
106
107             private CountDownLatch started = new CountDownLatch(1);
108
109             @Override
110             protected void sleepAfterFetchFailure() {
111                 started.countDown();
112                 super.sleepAfterFetchFailure();
113             }
114         };
115
116         // full sleep
117         long tstart = System.currentTimeMillis();
118         cons.sleepAfterFetchFailure();
119         assertThat(System.currentTimeMillis() - tstart).isGreaterThanOrEqualTo(SHORT_TIMEOUT_MILLIS);
120
121         // close while sleeping - sleep should halt prematurely
122         cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
123         cons.started = new CountDownLatch(1);
124         Thread thread = new Thread(cons::sleepAfterFetchFailure);
125         tstart = System.currentTimeMillis();
126         thread.start();
127         cons.started.await();
128         cons.close();
129         thread.join();
130         assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
131
132         // interrupt while sleeping - sleep should halt prematurely
133         cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
134         cons.started = new CountDownLatch(1);
135         thread = new Thread(cons::sleepAfterFetchFailure);
136         tstart = System.currentTimeMillis();
137         thread.start();
138         cons.started.await();
139         thread.interrupt();
140         thread.join();
141         assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
142     }
143
144     @Test
145     void testKafkaConsumerWrapper() {
146         // verify that different wrappers can be built
147         assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build())).doesNotThrowAnyException();
148     }
149
150     @Test
151     void testKafkaConsumerWrapper_InvalidTopic() {
152         BusTopicParams params = makeBuilder().topic(null).build();
153         assertThatThrownBy(() -> new KafkaConsumerWrapper(params))
154             .isInstanceOf(IllegalArgumentException.class);
155     }
156
157     @Test
158     void testKafkaConsumerWrapperFetch() {
159
160         //Setup Properties for consumer
161         Properties kafkaProps = new Properties();
162         kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
163         kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
164         kafkaProps.setProperty("enable.auto.commit", "true");
165         kafkaProps.setProperty("auto.commit.interval.ms", "1000");
166         kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
167             "org.apache.kafka.common.serialization.StringDeserializer");
168         kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
169             "org.apache.kafka.common.serialization.StringDeserializer");
170         kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
171         kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
172
173         KafkaConsumerWrapper kafka = new KafkaConsumerWrapper(makeKafkaBuilder().build());
174         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
175         kafka.consumer = consumer;
176
177         assertThrows(java.lang.IllegalStateException.class, () -> kafka.fetch().iterator().hasNext());
178         consumer.close();
179     }
180
181     @Test
182     void testFetchNoMessages() {
183         KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
184         kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
185
186         when(mockedKafkaConsumer.poll(any())).thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
187
188         Iterable<String> result = kafkaConsumerWrapper.fetch();
189
190         verify(mockedKafkaConsumer).poll(any());
191
192         assertNotNull(result);
193
194         assertFalse(result.iterator().hasNext());
195
196         mockedKafkaConsumer.close();
197     }
198
199     @Test
200     void testFetchWithMessages() {
201         // Setup
202         KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
203         kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
204
205         ConsumerRecord<String, String> customerRecord =
206             new ConsumerRecord<>("my-effective-topic", 0, 0, "key", "value");
207         Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
208         recordsMap.put(new TopicPartition("my-effective-topic", 0), Collections.singletonList(customerRecord));
209         ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
210
211         when(mockedKafkaConsumer.poll(any())).thenReturn(consumerRecords);
212
213         Iterable<String> result = kafkaConsumerWrapper.fetch();
214
215         verify(mockedKafkaConsumer, times(1)).poll(any());
216
217         verify(mockedKafkaConsumer, times(1)).commitSync(any(Map.class));
218
219         assertNotNull(result);
220
221         assertTrue(result.iterator().hasNext());
222
223         assertEquals("value", result.iterator().next());
224
225         mockedKafkaConsumer.close();
226     }
227
228     @Test
229     void testFetchWithMessagesAndTraceparent() {
230         // Setup
231         KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
232         kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
233
234         ConsumerRecord<String, String> customerRecord =
235             new ConsumerRecord<>("my-effective-topic", 0, 0, "key", "value");
236         customerRecord.headers().add(
237                 "traceparent",
238                 "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01".getBytes(StandardCharsets.UTF_8)
239         );
240
241         Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
242         recordsMap.put(new TopicPartition("my-effective-topic", 0), Collections.singletonList(customerRecord));
243         ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
244
245         when(mockedKafkaConsumer.poll(any())).thenReturn(consumerRecords);
246
247         Iterable<String> result = kafkaConsumerWrapper.fetch();
248
249         verify(mockedKafkaConsumer, times(1)).poll(any());
250
251         verify(mockedKafkaConsumer, times(1)).commitSync(any(Map.class));
252
253         assertNotNull(result);
254
255         assertTrue(result.iterator().hasNext());
256
257         assertEquals("value", result.iterator().next());
258
259         mockedKafkaConsumer.close();
260     }
261
262
263     @Test
264     void testKafkaConsumerWrapperClose() {
265         assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build()).close()).doesNotThrowAnyException();
266     }
267
268     @Test
269     void testKafkaConsumerWrapperToString() {
270         assertNotNull(new KafkaConsumerWrapper(makeKafkaBuilder().build()) {}.toString());
271     }
272
273     private static class FetchingBusConsumerImpl extends FetchingBusConsumer {
274
275         protected FetchingBusConsumerImpl(BusTopicParams busTopicParams) {
276             super(busTopicParams);
277         }
278
279         @Override
280         public Iterable<String> fetch() {
281             return null;
282         }
283     }
284 }