[SDC-BE] Add kafka ssl config
[sdc.git] / catalog-be / src / test / java / org / openecomp / sdc / be / components / kafka / SdcKafkaConsumerTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2022-2023 Nordix Foundation. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.openecomp.sdc.be.components.kafka;
21
22 import static org.junit.jupiter.api.Assertions.assertThrows;
23 import static org.junit.jupiter.api.Assertions.assertTrue;
24 import static org.mockito.ArgumentMatchers.any;
25 import static org.mockito.Mockito.never;
26 import static org.mockito.Mockito.verify;
27 import static org.mockito.Mockito.when;
28
29 import java.util.ArrayList;
30 import java.util.Collection;
31 import java.util.Collections;
32 import java.util.HashMap;
33 import java.util.HashSet;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.Set;
37 import org.apache.kafka.clients.consumer.ConsumerRecord;
38 import org.apache.kafka.clients.consumer.ConsumerRecords;
39 import org.apache.kafka.clients.consumer.KafkaConsumer;
40 import org.apache.kafka.common.KafkaException;
41 import org.apache.kafka.common.TopicPartition;
42 import org.jetbrains.annotations.NotNull;
43 import org.junit.jupiter.api.Test;
44 import org.junitpioneer.jupiter.SetEnvironmentVariable;
45 import org.mockito.ArgumentCaptor;
46 import org.mockito.Mockito;
47 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
48
49 @SetEnvironmentVariable(key = "SASL_JAAS_CONFIG", value = "org.apache.kafka.common.security.scram.ScramLoginModule required username=admin password=admin-secret;")
50 public class SdcKafkaConsumerTest {
51
52     @Test
53     public void TestSubscribeSuccess(){
54         KafkaConsumer<byte[], byte[]> mockKafkaConsumer = Mockito.mock(KafkaConsumer.class);
55         SdcKafkaConsumer sdcKafkaConsumer = new SdcKafkaConsumer(mockKafkaConsumer, null);
56         ArgumentCaptor<Collections> captor = ArgumentCaptor.forClass(Collections.class);
57
58         String testTopics = "testTopic";
59         sdcKafkaConsumer.subscribe(testTopics);
60         verify(mockKafkaConsumer).subscribe((Collection<String>) captor.capture());
61     }
62
63     @Test
64     public void TestSubscribeAlreadySubscribed(){
65         KafkaConsumer<byte[], byte[]> mockKafkaConsumer = Mockito.mock(KafkaConsumer.class);
66         SdcKafkaConsumer sdcKafkaConsumer = new SdcKafkaConsumer(mockKafkaConsumer, null);
67         ArgumentCaptor<Collections> captor = ArgumentCaptor.forClass(Collections.class);
68
69
70         String testTopics = "testTopic";
71         Set<String> currentSubs = new HashSet<String>();
72         currentSubs.add(testTopics);
73         when(mockKafkaConsumer.subscription()).thenReturn(currentSubs);
74         sdcKafkaConsumer.subscribe(testTopics);
75         verify(mockKafkaConsumer, never()).subscribe((Collection<String>) captor.capture());
76     }
77
78     @Test
79     public void TestPollForMessagesForSpecificTopicSuccess(){
80         KafkaConsumer<byte[], byte[]> mockKafkaConsumer = Mockito.mock(KafkaConsumer.class);
81
82
83         String testTopic = "testTopic";
84
85         ConsumerRecords mockedPollResult = getTestConsumerRecords(testTopic);
86
87         when(mockKafkaConsumer.poll(any())).thenReturn(mockedPollResult);
88
89         DistributionEngineConfiguration config = getMockDistributionEngineConfiguration();
90
91         SdcKafkaConsumer sdcKafkaConsumer = new SdcKafkaConsumer(mockKafkaConsumer, config);
92
93         List<String> returned = sdcKafkaConsumer.poll(testTopic);
94         assertTrue(returned.size()==1);
95         assertTrue(returned.contains("testTopicValue"));
96     }
97
98     @Test
99     public void testSaslJaasConfigNotFound(){
100         assertThrows(
101             KafkaException.class,
102             () ->  new SdcKafkaConsumer(setTestDistributionEngineConfigs()),
103             "Sasl Jaas Config should not be found, so expected a KafkaException"
104         );
105     }
106
107     @NotNull
108     private DistributionEngineConfiguration getMockDistributionEngineConfiguration() {
109         DistributionEngineConfiguration config = new DistributionEngineConfiguration();
110         DistributionEngineConfiguration.DistributionStatusTopicConfig mockStatusTopic = new DistributionEngineConfiguration.DistributionStatusTopicConfig();
111         mockStatusTopic.setPollingIntervalSec(1);
112         config.setDistributionStatusTopic(mockStatusTopic);
113         return config;
114     }
115
116     @NotNull
117     private ConsumerRecords getTestConsumerRecords(String testTopics) {
118         Map map = new HashMap<Integer, ConsumerRecord>();
119
120         ConsumerRecord consumerRecord = new ConsumerRecord(testTopics, 0, 0, "", "testTopicValue");
121
122         List<ConsumerRecord> consumerRecordList = new ArrayList<>();
123         consumerRecordList.add(consumerRecord);
124         TopicPartition topicPartition = new TopicPartition(testTopics, 0);
125         map.put(topicPartition, consumerRecordList);
126
127         ConsumerRecords mockedPollResult = new ConsumerRecords(map);
128         return mockedPollResult;
129     }
130
131     private DistributionEngineConfiguration setTestDistributionEngineConfigs(){
132         DistributionEngineConfiguration.DistributionStatusTopicConfig dsTopic = new DistributionEngineConfiguration.DistributionStatusTopicConfig();
133         DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
134         String testBootstrapServers = "TestBootstrapServer";
135         dsTopic.setConsumerGroup("consumerGroup");
136         dsTopic.setConsumerId("consumerId");
137
138         deConfiguration.setKafkaBootStrapServers(testBootstrapServers);
139         deConfiguration.setDistributionStatusTopic(dsTopic);
140         return deConfiguration;
141     }
142 }