/*- * ============LICENSE_START======================================================= * SDC * ================================================================================ * Copyright (C) 2022-2023 Nordix Foundation. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= */ package org.openecomp.sdc.be.components.kafka; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.KafkaException; import org.junit.jupiter.api.Test; import org.junitpioneer.jupiter.SetEnvironmentVariable; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.openecomp.sdc.be.config.DistributionEngineConfiguration; @SetEnvironmentVariable(key = "SASL_JAAS_CONFIG", value = "org.apache.kafka.common.security.scram.ScramLoginModule required username=admin password=admin-secret;") public class SdcKafkaProducerTest { @Test public void TestSendSuccess(){ KafkaProducer mockKafkaProducer = Mockito.mock(KafkaProducer.class); SdcKafkaProducer sdcKafkaProducer = new SdcKafkaProducer(mockKafkaProducer); ArgumentCaptor captor = ArgumentCaptor.forClass(ProducerRecord.class); sdcKafkaProducer.send("testMessage", "testTopic"); verify(mockKafkaProducer).send(captor.capture()); } @Test public void testFlushSuccess(){ KafkaProducer mockKafkaProducer = Mockito.mock(KafkaProducer.class); SdcKafkaProducer sdcKafkaProducer = new SdcKafkaProducer(mockKafkaProducer); sdcKafkaProducer.flush(); verify(mockKafkaProducer).flush(); } @Test public void testSendFail(){ KafkaProducer mockKafkaProducer = Mockito.mock(KafkaProducer.class); SdcKafkaProducer sdcKafkaProducer = new SdcKafkaProducer(mockKafkaProducer); when(mockKafkaProducer.send(any())).thenThrow(new KafkaException()); assertThrows( KafkaException.class, () -> sdcKafkaProducer.send("testMessage", "testTopic"), "Expected a KafkaException thrown on KafkaProducer Send"); } @Test public void testSaslJaasConfigNotFound(){ assertThrows( KafkaException.class, () -> new SdcKafkaProducer(setTestDistributionEngineConfigs()), "Sasl Jaas Config should not be found, so expected a KafkaException" ); } private DistributionEngineConfiguration setTestDistributionEngineConfigs(){ DistributionEngineConfiguration.DistributionStatusTopicConfig dStatusTopicConfig = new DistributionEngineConfiguration.DistributionStatusTopicConfig(); DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration(); deConfiguration.setKafkaBootStrapServers("TestBootstrapServer"); dStatusTopicConfig.setConsumerId("consumerId"); deConfiguration.setDistributionStatusTopic(dStatusTopicConfig); deConfiguration.getDistributionStatusTopic().getConsumerId(); return deConfiguration; } }