[SDC] Add kafka native messaging
[sdc.git] / catalog-be / src / test / java / org / openecomp / sdc / be / components / kafka / SdcKafkaProducerTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2022 Nordix Foundation. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.openecomp.sdc.be.components.kafka;
21
22 import static org.junit.jupiter.api.Assertions.assertEquals;
23 import static org.junit.jupiter.api.Assertions.assertThrows;
24 import org.junit.jupiter.api.Test;
25
26 import static org.mockito.ArgumentMatchers.any;
27 import static org.mockito.Mockito.doThrow;
28 import static org.mockito.Mockito.verify;
29 import static org.mockito.Mockito.when;
30 import org.mockito.ArgumentCaptor;
31 import org.mockito.Mockito;
32
33 import org.apache.kafka.clients.producer.ProducerRecord;
34 import org.apache.kafka.clients.producer.KafkaProducer;
35 import org.apache.kafka.common.KafkaException;
36
37 import org.openecomp.sdc.be.catalog.api.IStatus;
38 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
39
40 public class SdcKafkaProducerTest {
41
42     @Test
43     public void TestSendSuccess(){
44         KafkaProducer<byte[], byte[]> mockKafkaProducer = Mockito.mock(KafkaProducer.class);
45         SdcKafkaProducer sdcKafkaProducer = new SdcKafkaProducer(mockKafkaProducer);
46         ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
47         sdcKafkaProducer.send("testMessage", "testTopic");
48
49
50         verify(mockKafkaProducer).send(captor.capture());
51     }
52
53     @Test
54     public void testFlushSuccess(){
55         KafkaProducer<byte[], byte[]> mockKafkaProducer = Mockito.mock(KafkaProducer.class);
56         SdcKafkaProducer sdcKafkaProducer = new SdcKafkaProducer(mockKafkaProducer);
57         sdcKafkaProducer.flush();
58
59         verify(mockKafkaProducer).flush();
60     }
61
62     @Test
63     public void testSendFail(){
64         KafkaProducer<byte[], byte[]> mockKafkaProducer = Mockito.mock(KafkaProducer.class);
65         SdcKafkaProducer sdcKafkaProducer = new SdcKafkaProducer(mockKafkaProducer);
66
67         when(mockKafkaProducer.send(any())).thenThrow(new KafkaException());
68
69         assertThrows(
70             KafkaException.class,
71             () ->   sdcKafkaProducer.send("testMessage", "testTopic"),
72             "Expected a KafkaException thrown on KafkaProducer Send");
73     }
74
75     @Test
76     public void testSaslJaasConfigNotFound(){
77         assertThrows(
78             KafkaException.class,
79             () ->  new SdcKafkaProducer(setTestDistributionEngineConfigs()),
80             "Sasl Jaas Config should not be found, so expected a KafkaException"
81         );
82     }
83
84     private DistributionEngineConfiguration setTestDistributionEngineConfigs(){
85         DistributionEngineConfiguration.DistributionStatusTopicConfig dStatusTopicConfig = new DistributionEngineConfiguration.DistributionStatusTopicConfig();
86         DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
87         deConfiguration.setKafkaBootStrapServers("TestBootstrapServer");
88         dStatusTopicConfig.setConsumerId("consumerId");
89
90         deConfiguration.setDistributionStatusTopic(dStatusTopicConfig);
91         deConfiguration.getDistributionStatusTopic().getConsumerId();
92         return deConfiguration;
93     }
94 }