[SO] Remove DMaap Dependency in SO-bpmn-infra
[so.git] / common / src / test / java / org / onap / so / client / kafka / KafkaConsumerImplTest.java
1 package org.onap.so.client.kafka;
2
3 import org.apache.kafka.clients.consumer.ConsumerRecord;
4 import org.apache.kafka.clients.consumer.MockConsumer;
5 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
6 import org.apache.kafka.common.TopicPartition;
7 import org.junit.Before;
8 import org.junit.Test;
9 import org.junit.jupiter.api.extension.ExtendWith;
10 import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
11 import uk.org.webcompere.systemstubs.jupiter.SystemStub;
12 import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
13 import java.util.Arrays;
14 import java.util.HashMap;
15 import java.util.List;
16 import static org.assertj.core.api.Assertions.assertThat;
17
18 @ExtendWith(SystemStubsExtension.class)
19 public class KafkaConsumerImplTest {
20     private KafkaConsumerImpl consumer;
21     private static MockConsumer<String, String> mockConsumer;
22     @SystemStub
23     EnvironmentVariables environmentVariables = new EnvironmentVariables();
24
25     @Before
26     public void setup() {
27         environmentVariables.set("JAAS_CONFIG", "jaas.config");
28         mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
29         configureMockConsumer();
30     }
31
32     @Test
33     public void consumerShouldConsumeMessages() throws Exception {
34         consumer = new KafkaConsumerImpl("localhost:9092");
35         consumer.setConsumer(mockConsumer);
36         List<String> response = consumer.get("TOPIC", "CG1", "C1");
37         assertThat(response).contains("I", "like", "pizza");
38     }
39
40     private void configureMockConsumer() {
41         mockConsumer.assign(Arrays.asList(new TopicPartition("TOPIC", 0)));
42
43         HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
44         beginningOffsets.put(new TopicPartition("TOPIC", 0), 0L);
45         mockConsumer.updateBeginningOffsets(beginningOffsets);
46         mockConsumer.addRecord(new ConsumerRecord<String, String>("TOPIC", 0, 0L, "key", "I"));
47         mockConsumer.addRecord(new ConsumerRecord<String, String>("TOPIC", 0, 1L, "key", "like"));
48         mockConsumer.addRecord(new ConsumerRecord<String, String>("TOPIC", 0, 2L, "key", "pizza"));
49
50     }
51 }