Revert "[SDC-BE] Add kafka ssl config"
[sdc.git] / catalog-be / src / test / java / org / openecomp / sdc / be / components / kafka / KafkaHandlerTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2022 Nordix Foundation. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.openecomp.sdc.be.components.kafka;
21
22 import static org.junit.jupiter.api.Assertions.assertEquals;
23 import static org.junit.jupiter.api.Assertions.assertFalse;
24 import static org.junit.jupiter.api.Assertions.assertNotNull;
25 import static org.junit.jupiter.api.Assertions.assertTrue;
26 import static org.mockito.ArgumentMatchers.any;
27 import static org.mockito.Mockito.doThrow;
28 import static org.mockito.Mockito.when;
29
30 import com.google.gson.JsonSyntaxException;
31 import org.apache.kafka.common.KafkaException;
32 import org.junit.jupiter.api.extension.ExtendWith;
33 import org.mockito.junit.jupiter.MockitoExtension;
34 import org.junit.jupiter.api.Test;
35 import org.mockito.Mock;
36
37 import java.util.ArrayList;
38 import fj.data.Either;
39 import java.util.List;
40
41 import org.openecomp.sdc.be.components.distribution.engine.CambriaErrorResponse;
42 import org.openecomp.sdc.be.components.distribution.engine.NotificationDataImpl;
43 import org.openecomp.sdc.be.components.distribution.engine.INotificationData;
44 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
45
46
47 @ExtendWith(MockitoExtension.class)
48 public class KafkaHandlerTest {
49
50     @Mock
51     private SdcKafkaConsumer mockSdcKafkaConsumer;
52
53     @Mock
54     private SdcKafkaProducer mockSdcKafkaProducer;
55
56     private KafkaHandler kafkaHandler;
57
58     @Test
59     public void testIsKafkaActiveTrue(){
60         KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
61         assertTrue(kafkaHandler.isKafkaActive());
62     }
63
64     @Test
65     public void testIsKafkaActiveFalse(){
66         KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
67         kafkaHandler.setKafkaActive(false);
68         assertFalse(kafkaHandler.isKafkaActive());
69     }
70
71     @Test
72     public void testFetchFromTopicSuccess(){
73         String testTopic = "testTopic";
74         List<String> mockedReturnedMessages = new ArrayList<>();
75         mockedReturnedMessages.add("message1");
76         mockedReturnedMessages.add("message2");
77         KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
78         when(mockSdcKafkaConsumer.poll(any())).thenReturn(mockedReturnedMessages);
79         Either<Iterable<String>, CambriaErrorResponse> response = kafkaHandler.fetchFromTopic(testTopic);
80         Iterable<String> actualReturnedMessages = response.left().value();
81         assertTrue(response.isLeft());
82         assertEquals(actualReturnedMessages, mockedReturnedMessages);
83     }
84
85     @Test
86     public void testFetchFromTopicFail(){
87         String testTopic = "testTopic";
88         KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
89         when(mockSdcKafkaConsumer.poll(any())).thenThrow(new KafkaException());
90         Either<Iterable<String>, CambriaErrorResponse> response = kafkaHandler.fetchFromTopic(testTopic);
91         CambriaErrorResponse responseValue = response.right().value();
92         assertTrue(response.isRight());
93         assertEquals(responseValue.getOperationStatus(), CambriaOperationStatus.INTERNAL_SERVER_ERROR);
94     }
95
96     @Test
97     public void testSendNotificationSuccess(){
98         String testTopic = "testTopic";
99         KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
100         INotificationData testData = new NotificationDataImpl();
101         CambriaErrorResponse response = kafkaHandler.sendNotification(testTopic, testData);
102         assertEquals(response.getOperationStatus(), CambriaOperationStatus.OK);
103         assertEquals(response.getHttpCode(), 200);
104     }
105
106     @Test
107     public void testSendNotificationKafkaException(){
108         String testTopic = "testTopic";
109         KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
110         INotificationData testData = new NotificationDataImpl();
111         doThrow(KafkaException.class).when(mockSdcKafkaProducer).send(any(), any());
112         CambriaErrorResponse response = kafkaHandler.sendNotification(testTopic, testData);
113         assertEquals(response.getOperationStatus(), CambriaOperationStatus.INTERNAL_SERVER_ERROR);
114         assertEquals(response.getHttpCode(), 500);
115     }
116
117     @Test
118     public void testSendNotificationJsonSyntaxException(){
119         String testTopic = "testTopic";
120         KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
121         INotificationData testData = new NotificationDataImpl();
122         doThrow(JsonSyntaxException.class).when(mockSdcKafkaProducer).send(any(), any());
123         CambriaErrorResponse response = kafkaHandler.sendNotification(testTopic, testData);
124         assertEquals(response.getOperationStatus(), CambriaOperationStatus.INTERNAL_SERVER_ERROR);
125         assertEquals(response.getHttpCode(), 500);
126     }
127
128     @Test
129     public void testSendNotificationFlushException(){
130         String testTopic = "testTopic";
131         KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
132         INotificationData testData = new NotificationDataImpl();
133         doThrow(KafkaException.class).when(mockSdcKafkaProducer).flush();
134         CambriaErrorResponse response = kafkaHandler.sendNotification(testTopic, testData);
135         assertEquals(response.getOperationStatus(), CambriaOperationStatus.INTERNAL_SERVER_ERROR);
136         assertEquals(response.getHttpCode(), 500);
137     }
138 }