[SDC-BE] Add kafka ssl config
[sdc.git] / catalog-be / src / test / java / org / openecomp / sdc / be / components / kafka / SdcKafkaProducerTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2022-2023 Nordix Foundation. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.openecomp.sdc.be.components.kafka;
21
22 import static org.junit.jupiter.api.Assertions.assertThrows;
23 import static org.mockito.ArgumentMatchers.any;
24 import static org.mockito.Mockito.verify;
25 import static org.mockito.Mockito.when;
26
27 import org.apache.kafka.clients.producer.KafkaProducer;
28 import org.apache.kafka.clients.producer.ProducerRecord;
29 import org.apache.kafka.common.KafkaException;
30 import org.junit.jupiter.api.Test;
31 import org.junitpioneer.jupiter.SetEnvironmentVariable;
32 import org.mockito.ArgumentCaptor;
33 import org.mockito.Mockito;
34 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
35
36 @SetEnvironmentVariable(key = "SASL_JAAS_CONFIG", value = "org.apache.kafka.common.security.scram.ScramLoginModule required username=admin password=admin-secret;")
37 public class SdcKafkaProducerTest {
38
39     @Test
40     public void TestSendSuccess(){
41         KafkaProducer<byte[], byte[]> mockKafkaProducer = Mockito.mock(KafkaProducer.class);
42         SdcKafkaProducer sdcKafkaProducer = new SdcKafkaProducer(mockKafkaProducer);
43         ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
44         sdcKafkaProducer.send("testMessage", "testTopic");
45
46
47         verify(mockKafkaProducer).send(captor.capture());
48     }
49
50     @Test
51     public void testFlushSuccess(){
52         KafkaProducer<byte[], byte[]> mockKafkaProducer = Mockito.mock(KafkaProducer.class);
53         SdcKafkaProducer sdcKafkaProducer = new SdcKafkaProducer(mockKafkaProducer);
54         sdcKafkaProducer.flush();
55
56         verify(mockKafkaProducer).flush();
57     }
58
59     @Test
60     public void testSendFail(){
61         KafkaProducer<byte[], byte[]> mockKafkaProducer = Mockito.mock(KafkaProducer.class);
62         SdcKafkaProducer sdcKafkaProducer = new SdcKafkaProducer(mockKafkaProducer);
63
64         when(mockKafkaProducer.send(any())).thenThrow(new KafkaException());
65
66         assertThrows(
67             KafkaException.class,
68             () ->   sdcKafkaProducer.send("testMessage", "testTopic"),
69             "Expected a KafkaException thrown on KafkaProducer Send");
70     }
71
72     @Test
73     public void testSaslJaasConfigNotFound(){
74         assertThrows(
75             KafkaException.class,
76             () ->  new SdcKafkaProducer(setTestDistributionEngineConfigs()),
77             "Sasl Jaas Config should not be found, so expected a KafkaException"
78         );
79     }
80
81     private DistributionEngineConfiguration setTestDistributionEngineConfigs(){
82         DistributionEngineConfiguration.DistributionStatusTopicConfig dStatusTopicConfig = new DistributionEngineConfiguration.DistributionStatusTopicConfig();
83         DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
84         deConfiguration.setKafkaBootStrapServers("TestBootstrapServer");
85         dStatusTopicConfig.setConsumerId("consumerId");
86
87         deConfiguration.setDistributionStatusTopic(dStatusTopicConfig);
88         deConfiguration.getDistributionStatusTopic().getConsumerId();
89         return deConfiguration;
90     }
91 }