2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2022 Nordix Foundation. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.openecomp.sdc.be.components.kafka;
22 import com.google.gson.JsonSyntaxException;
23 import fj.data.Either;
24 import org.apache.kafka.common.KafkaException;
25 import org.junit.jupiter.api.BeforeEach;
26 import org.junit.jupiter.api.Test;
27 import org.junit.jupiter.api.extension.ExtendWith;
28 import org.mockito.Mock;
29 import org.mockito.junit.jupiter.MockitoExtension;
30 import org.openecomp.sdc.be.components.distribution.engine.CambriaErrorResponse;
31 import org.openecomp.sdc.be.components.distribution.engine.INotificationData;
32 import org.openecomp.sdc.be.components.distribution.engine.NotificationDataImpl;
33 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
35 import java.util.ArrayList;
36 import java.util.List;
38 import static org.junit.jupiter.api.Assertions.assertEquals;
39 import static org.junit.jupiter.api.Assertions.assertFalse;
40 import static org.junit.jupiter.api.Assertions.assertTrue;
41 import static org.mockito.ArgumentMatchers.any;
42 import static org.mockito.Mockito.doThrow;
43 import static org.mockito.Mockito.when;
45 @ExtendWith(MockitoExtension.class)
46 class KafkaHandlerTest {
49 private SdcKafkaConsumer mockSdcKafkaConsumer;
52 private SdcKafkaProducer mockSdcKafkaProducer;
54 private KafkaHandler kafkaHandler;
58 kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
62 void testIsKafkaActiveTrue() {
63 assertTrue(kafkaHandler.isKafkaActive());
67 void testIsKafkaActiveFalse() {
68 kafkaHandler.setKafkaActive(false);
69 assertFalse(kafkaHandler.isKafkaActive());
73 void testFetchFromTopicSuccess() {
74 String testTopic = "testTopic";
75 List<String> mockedReturnedMessages = new ArrayList<>();
76 mockedReturnedMessages.add("message1");
77 mockedReturnedMessages.add("message2");
78 when(mockSdcKafkaConsumer.poll(any())).thenReturn(mockedReturnedMessages);
79 Either<Iterable<String>, CambriaErrorResponse> response = kafkaHandler.fetchFromTopic(testTopic);
80 Iterable<String> actualReturnedMessages = response.left().value();
81 assertTrue(response.isLeft());
82 assertEquals(actualReturnedMessages, mockedReturnedMessages);
86 void testFetchFromTopicFail() {
87 String testTopic = "testTopic";
88 when(mockSdcKafkaConsumer.poll(any())).thenThrow(new KafkaException());
89 Either<Iterable<String>, CambriaErrorResponse> response = kafkaHandler.fetchFromTopic(testTopic);
90 CambriaErrorResponse responseValue = response.right().value();
91 assertTrue(response.isRight());
92 assertEquals(responseValue.getOperationStatus(), CambriaOperationStatus.INTERNAL_SERVER_ERROR);
96 void testSendNotificationSuccess() {
97 String testTopic = "testTopic";
98 INotificationData testData = new NotificationDataImpl();
99 CambriaErrorResponse response = kafkaHandler.sendNotification(testTopic, testData);
100 assertEquals(response.getOperationStatus(), CambriaOperationStatus.OK);
101 assertEquals(response.getHttpCode(), 200);
105 void testSendNotificationKafkaException() {
106 String testTopic = "testTopic";
107 INotificationData testData = new NotificationDataImpl();
108 doThrow(KafkaException.class).when(mockSdcKafkaProducer).send(any(), any());
109 CambriaErrorResponse response = kafkaHandler.sendNotification(testTopic, testData);
110 assertEquals(response.getOperationStatus(), CambriaOperationStatus.INTERNAL_SERVER_ERROR);
111 assertEquals(response.getHttpCode(), 500);
115 void testSendNotificationJsonSyntaxException() {
116 String testTopic = "testTopic";
117 INotificationData testData = new NotificationDataImpl();
118 doThrow(JsonSyntaxException.class).when(mockSdcKafkaProducer).send(any(), any());
119 CambriaErrorResponse response = kafkaHandler.sendNotification(testTopic, testData);
120 assertEquals(response.getOperationStatus(), CambriaOperationStatus.INTERNAL_SERVER_ERROR);
121 assertEquals(response.getHttpCode(), 500);
125 void testSendNotificationFlushException() {
126 String testTopic = "testTopic";
127 INotificationData testData = new NotificationDataImpl();
128 doThrow(KafkaException.class).when(mockSdcKafkaProducer).flush();
129 CambriaErrorResponse response = kafkaHandler.sendNotification(testTopic, testData);
130 assertEquals(response.getOperationStatus(), CambriaOperationStatus.INTERNAL_SERVER_ERROR);
131 assertEquals(response.getHttpCode(), 500);