/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2022 Nordix Foundation. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
20 package org.openecomp.sdc.be.components.kafka;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
51 public class SdcKafkaConsumerTest {
54 public void TestSubscribeSuccess(){
55 KafkaConsumer<byte[], byte[]> mockKafkaConsumer = Mockito.mock(KafkaConsumer.class);
56 SdcKafkaConsumer sdcKafkaConsumer = new SdcKafkaConsumer(mockKafkaConsumer, null);
57 ArgumentCaptor<Collections> captor = ArgumentCaptor.forClass(Collections.class);
59 String testTopics = "testTopic";
60 sdcKafkaConsumer.subscribe(testTopics);
61 verify(mockKafkaConsumer).subscribe((Collection<String>) captor.capture());
65 public void TestSubscribeAlreadySubscribed(){
66 KafkaConsumer<byte[], byte[]> mockKafkaConsumer = Mockito.mock(KafkaConsumer.class);
67 SdcKafkaConsumer sdcKafkaConsumer = new SdcKafkaConsumer(mockKafkaConsumer, null);
68 ArgumentCaptor<Collections> captor = ArgumentCaptor.forClass(Collections.class);
71 String testTopics = "testTopic";
72 Set<String> currentSubs = new HashSet<String>();
73 currentSubs.add(testTopics);
74 when(mockKafkaConsumer.subscription()).thenReturn(currentSubs);
75 sdcKafkaConsumer.subscribe(testTopics);
76 verify(mockKafkaConsumer, never()).subscribe((Collection<String>) captor.capture());
80 public void TestPollForMessagesForSpecificTopicSuccess(){
81 KafkaConsumer<byte[], byte[]> mockKafkaConsumer = Mockito.mock(KafkaConsumer.class);
84 String testTopic = "testTopic";
86 ConsumerRecords mockedPollResult = getTestConsumerRecords(testTopic);
88 when(mockKafkaConsumer.poll(any())).thenReturn(mockedPollResult);
90 DistributionEngineConfiguration config = getMockDistributionEngineConfiguration();
92 SdcKafkaConsumer sdcKafkaConsumer = new SdcKafkaConsumer(mockKafkaConsumer, config);
94 List<String> returned = sdcKafkaConsumer.poll(testTopic);
95 assertTrue(returned.size()==1);
96 assertTrue(returned.contains("testTopicValue"));
100 public void testSaslJaasConfigNotFound(){
102 KafkaException.class,
103 () -> new SdcKafkaConsumer(setTestDistributionEngineConfigs()),
104 "Sasl Jaas Config should not be found, so expected a KafkaException"
109 private DistributionEngineConfiguration getMockDistributionEngineConfiguration() {
110 DistributionEngineConfiguration config = new DistributionEngineConfiguration();
111 DistributionEngineConfiguration.DistributionStatusTopicConfig mockStatusTopic = new DistributionEngineConfiguration.DistributionStatusTopicConfig();
112 mockStatusTopic.setPollingIntervalSec(1);
113 config.setDistributionStatusTopic(mockStatusTopic);
118 private ConsumerRecords getTestConsumerRecords(String testTopics) {
119 Map map = new HashMap<Integer, ConsumerRecord>();
121 ConsumerRecord consumerRecord = new ConsumerRecord(testTopics, 0, 0, "", "testTopicValue");
123 List<ConsumerRecord> consumerRecordList = new ArrayList<>();
124 consumerRecordList.add(consumerRecord);
125 TopicPartition topicPartition = new TopicPartition(testTopics, 0);
126 map.put(topicPartition, consumerRecordList);
128 ConsumerRecords mockedPollResult = new ConsumerRecords(map);
129 return mockedPollResult;
132 private DistributionEngineConfiguration setTestDistributionEngineConfigs(){
133 DistributionEngineConfiguration.DistributionStatusTopicConfig dsTopic = new DistributionEngineConfiguration.DistributionStatusTopicConfig();
134 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
135 String testBootstrapServers = "TestBootstrapServer";
136 dsTopic.setConsumerGroup("consumerGroup");
137 dsTopic.setConsumerId("consumerId");
139 deConfiguration.setKafkaBootStrapServers(testBootstrapServers);
140 deConfiguration.setDistributionStatusTopic(dsTopic);
141 return deConfiguration;