1 package org.onap.so.client.kafka;
3 import org.apache.kafka.clients.consumer.ConsumerRecord;
4 import org.apache.kafka.clients.consumer.MockConsumer;
5 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
6 import org.apache.kafka.common.TopicPartition;
7 import org.junit.Before;
9 import org.junit.jupiter.api.extension.ExtendWith;
10 import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
11 import uk.org.webcompere.systemstubs.jupiter.SystemStub;
12 import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
13 import java.util.Arrays;
14 import java.util.HashMap;
15 import java.util.List;
16 import static org.assertj.core.api.Assertions.assertThat;
// Test fixture for KafkaConsumerImpl that feeds it records through Kafka's in-memory
// MockConsumer instead of a real broker connection.
// NOTE(review): this file imports JUnit 4's org.junit.Before alongside the JUnit 5
// @ExtendWith annotation used below. Confirm which test engine actually runs this class;
// presumably a JUnit 4 runner would not apply SystemStubsExtension.
18 @ExtendWith(SystemStubsExtension.class)
19 public class KafkaConsumerImplTest {
// Consumer under test; it is instantiated inside the test method itself.
20 private KafkaConsumerImpl consumer;
// Mock Kafka consumer injected into the consumer under test. Declared static, yet it is
// reassigned to a new instance wherever the fixture statements run.
21 private static MockConsumer<String, String> mockConsumer;
// Environment-variable stub used to provide JAAS_CONFIG to the code under test.
// NOTE(review): presumably annotated with @SystemStub (imported above); the annotation is
// not visible in this view, so confirm it is present.
23 EnvironmentVariables environmentVariables = new EnvironmentVariables();
// Fixture preparation.
// NOTE(review): the enclosing method header is not visible in this view; presumably these
// statements form the body of a per-test setup method. Confirm against the full file.
// Stub the JAAS_CONFIG environment variable with a placeholder value.
27 environmentVariables.set("JAAS_CONFIG", "jaas.config");
// Replace the shared mock with a fresh instance that uses the EARLIEST offset reset strategy.
28 mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
// Assign the topic partition, set its beginning offset and preload the test records.
29 configureMockConsumer();
// Verifies that KafkaConsumerImpl.get(...) returns the values of the records that were
// preloaded into the mock consumer.
33 public void consumerShouldConsumeMessages() throws Exception {
// Build the consumer under test with a bootstrap address of localhost:9092.
34 consumer = new KafkaConsumerImpl("localhost:9092");
// Swap in the preloaded mock so that records come from the in-memory MockConsumer.
35 consumer.setConsumer(mockConsumer);
// Fetch from "TOPIC". NOTE(review): "CG1" and "C1" are presumably the consumer group and
// consumer id; confirm against the signature of KafkaConsumerImpl.get.
36 List<String> response = consumer.get("TOPIC", "CG1", "C1");
// contains(...) checks that all three values are present; it does not check their order
// and tolerates additional elements.
37 assertThat(response).contains("I", "like", "pizza");
// Prepares the shared mock consumer: assigns partition 0 of "TOPIC", declares that
// partition's beginning offset as 0, and queues three records for the test to read.
40 private void configureMockConsumer() {
// Manually assign the single partition that the records are added to below.
41 mockConsumer.assign(Arrays.asList(new TopicPartition("TOPIC", 0)));
// Tell the mock where partition 0 begins.
43 HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
44 beginningOffsets.put(new TopicPartition("TOPIC", 0), 0L);
45 mockConsumer.updateBeginningOffsets(beginningOffsets);
// Queue three records at offsets 0, 1 and 2; all use the key "key" and carry the values
// that the test asserts on.
46 mockConsumer.addRecord(new ConsumerRecord<String, String>("TOPIC", 0, 0L, "key", "I"));
47 mockConsumer.addRecord(new ConsumerRecord<String, String>("TOPIC", 0, 1L, "key", "like"));
48 mockConsumer.addRecord(new ConsumerRecord<String, String>("TOPIC", 0, 2L, "key", "pizza"));