2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2022-2023 Nordix Foundation. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.openecomp.sdc.be.components.kafka;
22 import static org.junit.jupiter.api.Assertions.assertThrows;
23 import static org.mockito.ArgumentMatchers.any;
24 import static org.mockito.Mockito.verify;
25 import static org.mockito.Mockito.when;
27 import org.apache.kafka.clients.producer.KafkaProducer;
28 import org.apache.kafka.clients.producer.ProducerRecord;
29 import org.apache.kafka.common.KafkaException;
30 import org.junit.jupiter.api.Test;
31 import org.junitpioneer.jupiter.SetEnvironmentVariable;
32 import org.mockito.ArgumentCaptor;
33 import org.mockito.Mockito;
34 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
36 @SetEnvironmentVariable(key = "SASL_JAAS_CONFIG", value = "org.apache.kafka.common.security.scram.ScramLoginModule required username=admin password=admin-secret;")
37 public class SdcKafkaProducerTest {
40 public void TestSendSuccess(){
41 KafkaProducer<byte[], byte[]> mockKafkaProducer = Mockito.mock(KafkaProducer.class);
42 SdcKafkaProducer sdcKafkaProducer = new SdcKafkaProducer(mockKafkaProducer);
43 ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
44 sdcKafkaProducer.send("testMessage", "testTopic");
47 verify(mockKafkaProducer).send(captor.capture());
51 public void testFlushSuccess(){
52 KafkaProducer<byte[], byte[]> mockKafkaProducer = Mockito.mock(KafkaProducer.class);
53 SdcKafkaProducer sdcKafkaProducer = new SdcKafkaProducer(mockKafkaProducer);
54 sdcKafkaProducer.flush();
56 verify(mockKafkaProducer).flush();
60 public void testSendFail(){
61 KafkaProducer<byte[], byte[]> mockKafkaProducer = Mockito.mock(KafkaProducer.class);
62 SdcKafkaProducer sdcKafkaProducer = new SdcKafkaProducer(mockKafkaProducer);
64 when(mockKafkaProducer.send(any())).thenThrow(new KafkaException());
68 () -> sdcKafkaProducer.send("testMessage", "testTopic"),
69 "Expected a KafkaException thrown on KafkaProducer Send");
73 public void testSaslJaasConfigNotFound(){
76 () -> new SdcKafkaProducer(setTestDistributionEngineConfigs()),
77 "Sasl Jaas Config should not be found, so expected a KafkaException"
81 private DistributionEngineConfiguration setTestDistributionEngineConfigs(){
82 DistributionEngineConfiguration.DistributionStatusTopicConfig dStatusTopicConfig = new DistributionEngineConfiguration.DistributionStatusTopicConfig();
83 DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
84 deConfiguration.setKafkaBootStrapServers("TestBootstrapServer");
85 dStatusTopicConfig.setConsumerId("consumerId");
87 deConfiguration.setDistributionStatusTopic(dStatusTopicConfig);
88 deConfiguration.getDistributionStatusTopic().getConsumerId();
89 return deConfiguration;