2c33a257205774929952372df9c231ac7287d008
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2023-2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.assertj.core.api.Assertions.assertThatCode;
26 import static org.junit.Assert.assertNotNull;
27 import static org.junit.Assert.assertThrows;
28 import static org.mockito.ArgumentMatchers.any;
29 import static org.mockito.Mockito.times;
30 import static org.mockito.Mockito.verify;
31 import static org.mockito.Mockito.when;
32
33 import java.io.IOException;
34 import java.nio.charset.StandardCharsets;
35 import java.util.Collections;
36 import java.util.HashMap;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.Properties;
40 import java.util.concurrent.CountDownLatch;
41 import org.apache.kafka.clients.consumer.ConsumerConfig;
42 import org.apache.kafka.clients.consumer.ConsumerRecord;
43 import org.apache.kafka.clients.consumer.ConsumerRecords;
44 import org.apache.kafka.clients.consumer.KafkaConsumer;
45 import org.apache.kafka.common.TopicPartition;
46 import org.junit.After;
47 import org.junit.Before;
48 import org.junit.Test;
49 import org.mockito.Mock;
50 import org.mockito.MockitoAnnotations;
51 import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
52 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer;
53 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.KafkaConsumerWrapper;
54 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
55
56 public class BusConsumerTest extends TopicTestBase {
57
58     private static final int SHORT_TIMEOUT_MILLIS = 10;
59     private static final int LONG_TIMEOUT_MILLIS = 3000;
60
61     @Mock
62     KafkaConsumer<String, String> mockedKafkaConsumer;
63
64     AutoCloseable closeable;
65
66     @Before
67     @Override
68     public void setUp() {
69         super.setUp();
70         closeable = MockitoAnnotations.openMocks(this);
71     }
72
73     @After
74     public void tearDown() throws Exception {
75         closeable.close();
76     }
77
78
79     @Test
80     public void testFetchingBusConsumer() {
81         // should not be negative
82         var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(-1).build());
83         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
84
85         // should not be zero
86         cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(0).build());
87         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
88
89         // should not be too large
90         cons = new FetchingBusConsumerImpl(
91                         makeBuilder().fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH + 100).build());
92         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
93
94         // should not be what was specified
95         cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(100).build());
96         assertThat(cons.getSleepTime()).isEqualTo(100);
97     }
98
99     @Test
100     public void testFetchingBusConsumerSleepAfterFetchFailure() throws InterruptedException {
101
102         var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) {
103
104             private CountDownLatch started = new CountDownLatch(1);
105
106             @Override
107             protected void sleepAfterFetchFailure() {
108                 started.countDown();
109                 super.sleepAfterFetchFailure();
110             }
111         };
112
113         // full sleep
114         long tstart = System.currentTimeMillis();
115         cons.sleepAfterFetchFailure();
116         assertThat(System.currentTimeMillis() - tstart).isGreaterThanOrEqualTo(SHORT_TIMEOUT_MILLIS);
117
118         // close while sleeping - sleep should halt prematurely
119         cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
120         cons.started = new CountDownLatch(1);
121         Thread thread = new Thread(cons::sleepAfterFetchFailure);
122         tstart = System.currentTimeMillis();
123         thread.start();
124         cons.started.await();
125         cons.close();
126         thread.join();
127         assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
128
129         // interrupt while sleeping - sleep should halt prematurely
130         cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
131         cons.started = new CountDownLatch(1);
132         thread = new Thread(cons::sleepAfterFetchFailure);
133         tstart = System.currentTimeMillis();
134         thread.start();
135         cons.started.await();
136         thread.interrupt();
137         thread.join();
138         assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
139     }
140
141     @Test
142     public void testKafkaConsumerWrapper() {
143         // verify that different wrappers can be built
144         assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build())).doesNotThrowAnyException();
145     }
146
147     @Test(expected = IllegalArgumentException.class)
148     public void testKafkaConsumerWrapper_InvalidTopic() {
149         new KafkaConsumerWrapper(makeBuilder().topic(null).build());
150     }
151
152     @Test
153     public void testKafkaConsumerWrapperFetch() {
154
155         //Setup Properties for consumer
156         Properties kafkaProps = new Properties();
157         kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
158         kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
159         kafkaProps.setProperty("enable.auto.commit", "true");
160         kafkaProps.setProperty("auto.commit.interval.ms", "1000");
161         kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
162             "org.apache.kafka.common.serialization.StringDeserializer");
163         kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
164             "org.apache.kafka.common.serialization.StringDeserializer");
165         kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
166         kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
167
168         KafkaConsumerWrapper kafka = new KafkaConsumerWrapper(makeKafkaBuilder().build());
169         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
170         kafka.consumer = consumer;
171
172         assertThrows(java.lang.IllegalStateException.class, () -> kafka.fetch().iterator().hasNext());
173         consumer.close();
174     }
175
176     @Test
177     public void testFetchNoMessages() throws IOException {
178         KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
179         kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
180
181         when(mockedKafkaConsumer.poll(any())).thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
182
183         Iterable<String> result = kafkaConsumerWrapper.fetch();
184
185         verify(mockedKafkaConsumer, times(1)).poll(any());
186
187         assertThat(result != null);
188
189         assertThat(!result.iterator().hasNext());
190
191         mockedKafkaConsumer.close();
192     }
193
194     @Test
195     public void testFetchWithMessages() {
196         // Setup
197         KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
198         kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
199
200         ConsumerRecord<String, String> record = new ConsumerRecord<>("my-effective-topic", 0, 0, "key", "value");
201         Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
202         recordsMap.put(new TopicPartition("my-effective-topic", 0), Collections.singletonList(record));
203         ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
204
205         when(mockedKafkaConsumer.poll(any())).thenReturn(consumerRecords);
206
207         Iterable<String> result = kafkaConsumerWrapper.fetch();
208
209         verify(mockedKafkaConsumer, times(1)).poll(any());
210
211         verify(mockedKafkaConsumer, times(1)).commitSync(any(Map.class));
212
213         assertThat(result != null);
214
215         assertThat(result.iterator().hasNext());
216
217         assertThat(result.iterator().next().equals("value"));
218
219         mockedKafkaConsumer.close();
220     }
221
222     @Test
223     public void testFetchWithMessagesAndTraceparent() {
224         // Setup
225         KafkaConsumerWrapper kafkaConsumerWrapper = new KafkaConsumerWrapper(makeKafkaBuilder().build());
226         kafkaConsumerWrapper.consumer = mockedKafkaConsumer;
227
228         ConsumerRecord<String, String> record = new ConsumerRecord<>("my-effective-topic", 0, 0, "key", "value");
229         record.headers().add(
230                 "traceparent",
231                 "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01".getBytes(StandardCharsets.UTF_8)
232         );
233
234         Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsMap = new HashMap<>();
235         recordsMap.put(new TopicPartition("my-effective-topic", 0), Collections.singletonList(record));
236         ConsumerRecords<String, String> consumerRecords = new ConsumerRecords<>(recordsMap);
237
238         when(mockedKafkaConsumer.poll(any())).thenReturn(consumerRecords);
239
240         Iterable<String> result = kafkaConsumerWrapper.fetch();
241
242         verify(mockedKafkaConsumer, times(1)).poll(any());
243
244         verify(mockedKafkaConsumer, times(1)).commitSync(any(Map.class));
245
246         assertThat(result != null);
247
248         assertThat(result.iterator().hasNext());
249
250         assertThat(result.iterator().next().equals("value"));
251
252         mockedKafkaConsumer.close();
253     }
254
255
256     @Test
257     public void testKafkaConsumerWrapperClose() {
258         assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build()).close()).doesNotThrowAnyException();
259     }
260
261     @Test
262     public void testKafkaConsumerWrapperToString() {
263         assertNotNull(new KafkaConsumerWrapper(makeKafkaBuilder().build()) {}.toString());
264     }
265
266     private static class FetchingBusConsumerImpl extends FetchingBusConsumer {
267
268         protected FetchingBusConsumerImpl(BusTopicParams busTopicParams) {
269             super(busTopicParams);
270         }
271
272         @Override
273         public Iterable<String> fetch() {
274             return null;
275         }
276     }
277 }