Revert "[SDC-BE] Add kafka ssl config"
[sdc.git] / catalog-be / src / test / java / org / openecomp / sdc / be / components / kafka / SdcKafkaConsumerTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2022 Nordix Foundation. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.openecomp.sdc.be.components.kafka;
21
22 import static org.junit.jupiter.api.Assertions.assertThrows;
23 import static org.junit.jupiter.api.Assertions.assertTrue;
24
25 import org.apache.kafka.common.KafkaException;
26 import org.junit.jupiter.api.Test;
27
28 import static org.mockito.ArgumentMatchers.any;
29 import static org.mockito.Mockito.verify;
30 import static org.mockito.Mockito.never;
31 import static org.mockito.Mockito.when;
32 import org.mockito.ArgumentCaptor;
33 import org.mockito.Mockito;
34
35 import java.util.Collections;
36 import java.util.Collection;
37 import java.util.ArrayList;
38 import java.util.HashMap;
39 import java.util.HashSet;
40 import java.util.List;
41 import java.util.Map;
42 import java.util.Set;
43 import org.apache.kafka.clients.consumer.ConsumerRecord;
44 import org.apache.kafka.clients.consumer.ConsumerRecords;
45 import org.apache.kafka.clients.consumer.KafkaConsumer;
46 import org.apache.kafka.common.TopicPartition;
47 import org.jetbrains.annotations.NotNull;
48
49 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
50
51 public class SdcKafkaConsumerTest {
52
53     @Test
54     public void TestSubscribeSuccess(){
55         KafkaConsumer<byte[], byte[]> mockKafkaConsumer = Mockito.mock(KafkaConsumer.class);
56         SdcKafkaConsumer sdcKafkaConsumer = new SdcKafkaConsumer(mockKafkaConsumer, null);
57         ArgumentCaptor<Collections> captor = ArgumentCaptor.forClass(Collections.class);
58
59         String testTopics = "testTopic";
60         sdcKafkaConsumer.subscribe(testTopics);
61         verify(mockKafkaConsumer).subscribe((Collection<String>) captor.capture());
62     }
63
64     @Test
65     public void TestSubscribeAlreadySubscribed(){
66         KafkaConsumer<byte[], byte[]> mockKafkaConsumer = Mockito.mock(KafkaConsumer.class);
67         SdcKafkaConsumer sdcKafkaConsumer = new SdcKafkaConsumer(mockKafkaConsumer, null);
68         ArgumentCaptor<Collections> captor = ArgumentCaptor.forClass(Collections.class);
69
70
71         String testTopics = "testTopic";
72         Set<String> currentSubs = new HashSet<String>();
73         currentSubs.add(testTopics);
74         when(mockKafkaConsumer.subscription()).thenReturn(currentSubs);
75         sdcKafkaConsumer.subscribe(testTopics);
76         verify(mockKafkaConsumer, never()).subscribe((Collection<String>) captor.capture());
77     }
78
79     @Test
80     public void TestPollForMessagesForSpecificTopicSuccess(){
81         KafkaConsumer<byte[], byte[]> mockKafkaConsumer = Mockito.mock(KafkaConsumer.class);
82
83
84         String testTopic = "testTopic";
85
86         ConsumerRecords mockedPollResult = getTestConsumerRecords(testTopic);
87
88         when(mockKafkaConsumer.poll(any())).thenReturn(mockedPollResult);
89
90         DistributionEngineConfiguration config = getMockDistributionEngineConfiguration();
91
92         SdcKafkaConsumer sdcKafkaConsumer = new SdcKafkaConsumer(mockKafkaConsumer, config);
93
94         List<String> returned = sdcKafkaConsumer.poll(testTopic);
95         assertTrue(returned.size()==1);
96         assertTrue(returned.contains("testTopicValue"));
97     }
98
99     @Test
100     public void testSaslJaasConfigNotFound(){
101         assertThrows(
102             KafkaException.class,
103             () ->  new SdcKafkaConsumer(setTestDistributionEngineConfigs()),
104             "Sasl Jaas Config should not be found, so expected a KafkaException"
105         );
106     }
107
108     @NotNull
109     private DistributionEngineConfiguration getMockDistributionEngineConfiguration() {
110         DistributionEngineConfiguration config = new DistributionEngineConfiguration();
111         DistributionEngineConfiguration.DistributionStatusTopicConfig mockStatusTopic = new DistributionEngineConfiguration.DistributionStatusTopicConfig();
112         mockStatusTopic.setPollingIntervalSec(1);
113         config.setDistributionStatusTopic(mockStatusTopic);
114         return config;
115     }
116
117     @NotNull
118     private ConsumerRecords getTestConsumerRecords(String testTopics) {
119         Map map = new HashMap<Integer, ConsumerRecord>();
120
121         ConsumerRecord consumerRecord = new ConsumerRecord(testTopics, 0, 0, "", "testTopicValue");
122
123         List<ConsumerRecord> consumerRecordList = new ArrayList<>();
124         consumerRecordList.add(consumerRecord);
125         TopicPartition topicPartition = new TopicPartition(testTopics, 0);
126         map.put(topicPartition, consumerRecordList);
127
128         ConsumerRecords mockedPollResult = new ConsumerRecords(map);
129         return mockedPollResult;
130     }
131
132     private DistributionEngineConfiguration setTestDistributionEngineConfigs(){
133         DistributionEngineConfiguration.DistributionStatusTopicConfig dsTopic = new DistributionEngineConfiguration.DistributionStatusTopicConfig();
134         DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
135         String testBootstrapServers = "TestBootstrapServer";
136         dsTopic.setConsumerGroup("consumerGroup");
137         dsTopic.setConsumerId("consumerId");
138
139         deConfiguration.setKafkaBootStrapServers(testBootstrapServers);
140         deConfiguration.setDistributionStatusTopic(dsTopic);
141         return deConfiguration;
142     }
143 }