/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2022-2023 Nordix Foundation. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
20 package org.openecomp.sdc.be.components.kafka;
22 import static org.junit.jupiter.api.Assertions.assertThrows;
23 import static org.junit.jupiter.api.Assertions.assertTrue;
24 import static org.mockito.ArgumentMatchers.any;
25 import static org.mockito.Mockito.never;
26 import static org.mockito.Mockito.verify;
27 import static org.mockito.Mockito.when;
29 import java.util.ArrayList;
30 import java.util.Collection;
31 import java.util.Collections;
32 import java.util.HashMap;
33 import java.util.HashSet;
34 import java.util.List;
37 import org.apache.kafka.clients.consumer.ConsumerRecord;
38 import org.apache.kafka.clients.consumer.ConsumerRecords;
39 import org.apache.kafka.clients.consumer.KafkaConsumer;
40 import org.apache.kafka.common.KafkaException;
41 import org.apache.kafka.common.TopicPartition;
42 import org.jetbrains.annotations.NotNull;
43 import org.junit.jupiter.api.Test;
44 import org.junitpioneer.jupiter.SetEnvironmentVariable;
45 import org.mockito.ArgumentCaptor;
46 import org.mockito.Mockito;
47 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
49 @SetEnvironmentVariable(key = "SASL_JAAS_CONFIG", value = "org.apache.kafka.common.security.scram.ScramLoginModule required username=admin password=admin-secret;")
50 public class SdcKafkaConsumerTest {
53 public void TestSubscribeSuccess(){
54 KafkaConsumer<byte[], byte[]> mockKafkaConsumer = Mockito.mock(KafkaConsumer.class);
55 SdcKafkaConsumer sdcKafkaConsumer = new SdcKafkaConsumer(mockKafkaConsumer, null);
56 ArgumentCaptor<Collections> captor = ArgumentCaptor.forClass(Collections.class);
58 String testTopics = "testTopic";
59 sdcKafkaConsumer.subscribe(testTopics);
60 verify(mockKafkaConsumer).subscribe((Collection<String>) captor.capture());
64 public void TestSubscribeAlreadySubscribed(){
65 KafkaConsumer<byte[], byte[]> mockKafkaConsumer = Mockito.mock(KafkaConsumer.class);
66 SdcKafkaConsumer sdcKafkaConsumer = new SdcKafkaConsumer(mockKafkaConsumer, null);
67 ArgumentCaptor<Collections> captor = ArgumentCaptor.forClass(Collections.class);
70 String testTopics = "testTopic";
71 Set<String> currentSubs = new HashSet<String>();
72 currentSubs.add(testTopics);
73 when(mockKafkaConsumer.subscription()).thenReturn(currentSubs);
74 sdcKafkaConsumer.subscribe(testTopics);
75 verify(mockKafkaConsumer, never()).subscribe((Collection<String>) captor.capture());
79 public void TestPollForMessagesForSpecificTopicSuccess(){
80 KafkaConsumer<byte[], byte[]> mockKafkaConsumer = Mockito.mock(KafkaConsumer.class);
83 String testTopic = "testTopic";
85 ConsumerRecords mockedPollResult = getTestConsumerRecords(testTopic);
87 when(mockKafkaConsumer.poll(any())).thenReturn(mockedPollResult);
89 DistributionEngineConfiguration config = getMockDistributionEngineConfiguration();
91 SdcKafkaConsumer sdcKafkaConsumer = new SdcKafkaConsumer(mockKafkaConsumer, config);
93 List<String> returned = sdcKafkaConsumer.poll(testTopic);
94 assertTrue(returned.size()==1);
95 assertTrue(returned.contains("testTopicValue"));
99 public void testSaslJaasConfigNotFound(){
101 KafkaException.class,
102 () -> new SdcKafkaConsumer(setTestDistributionEngineConfigs()),
103 "Sasl Jaas Config should not be found, so expected a KafkaException"
108 private DistributionEngineConfiguration getMockDistributionEngineConfiguration() {
109 DistributionEngineConfiguration config = new DistributionEngineConfiguration();
110 DistributionEngineConfiguration.DistributionStatusTopicConfig mockStatusTopic = new DistributionEngineConfiguration.DistributionStatusTopicConfig();
111 mockStatusTopic.setPollingIntervalSec(1);
112 config.setDistributionStatusTopic(mockStatusTopic);
117 private ConsumerRecords getTestConsumerRecords(String testTopics) {
118 Map map = new HashMap<Integer, ConsumerRecord>();
120 ConsumerRecord consumerRecord = new ConsumerRecord(testTopics, 0, 0, "", "testTopicValue");
122 List<ConsumerRecord> consumerRecordList = new ArrayList<>();
123 consumerRecordList.add(consumerRecord);
124 TopicPartition topicPartition = new TopicPartition(testTopics, 0);
125 map.put(topicPartition, consumerRecordList);
127 ConsumerRecords mockedPollResult = new ConsumerRecords(map);
128 return mockedPollResult;
131 private DistributionEngineConfiguration setTestDistributionEngineConfigs(){
132 DistributionEngineConfiguration.DistributionStatusTopicConfig dsTopic = new DistributionEngineConfiguration.DistributionStatusTopicConfig();
133 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
134 String testBootstrapServers = "TestBootstrapServer";
135 dsTopic.setConsumerGroup("consumerGroup");
136 dsTopic.setConsumerId("consumerId");
138 deConfiguration.setKafkaBootStrapServers(testBootstrapServers);
139 deConfiguration.setDistributionStatusTopic(dsTopic);
140 return deConfiguration;