2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2023 Nordix Foundation. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.dmaap.kafka;
23 import com.salesforce.kafka.test.KafkaTestCluster;
24 import com.salesforce.kafka.test.KafkaTestUtils;
25 import com.salesforce.kafka.test.listeners.BrokerListener;
26 import com.salesforce.kafka.test.listeners.SaslPlainListener;
27 import io.github.netmikey.logunit.api.LogCapturer;
28 import java.util.Collections;
29 import java.util.List;
30 import java.util.Properties;
31 import org.apache.kafka.clients.producer.RecordMetadata;
32 import org.junit.jupiter.api.AfterAll;
33 import org.junit.jupiter.api.Assertions;
34 import org.junit.jupiter.api.BeforeAll;
35 import org.junit.jupiter.api.Test;
36 import org.junit.jupiter.api.extension.RegisterExtension;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
40 class OnapKafkaClientTest {
43 LogCapturer producerLogs = LogCapturer.create().captureForType(OnapKafkaProducer.class);
46 LogCapturer clientLogs = LogCapturer.create().captureForType(OnapKafkaClient.class);
48 private static final Logger logger = LoggerFactory.getLogger(OnapKafkaClientTest.class);
50 private static TestConfiguration configuration = new TestConfiguration("application.properties");
51 private static final List<String> consumerTopics = configuration.getConsumerTopics();
52 private static KafkaTestCluster kafkaTestCluster = null;
55 static void before() throws Exception {
57 KafkaTestUtils utils = new KafkaTestUtils(kafkaTestCluster);
58 for (String topic: consumerTopics) {
59 utils.createTopic(topic, 1, (short) 1);
61 configuration.setBootstrapServers(Collections.singletonList(kafkaTestCluster.getKafkaConnectString()));
65 static void after() throws Exception {
66 kafkaTestCluster.close();
67 kafkaTestCluster.stop();
71 void whenProducingCorrectRecordsArePresent() {
72 OnapKafkaClient handler = new OnapKafkaClient(configuration);
73 Assertions.assertEquals(handler.fetchFromTopic(consumerTopics.get(0)).size(), 0);
74 handler.publishToTopic(consumerTopics.get(0), "blahblahblahblah");
75 handler.publishToTopic(consumerTopics.get(1), "iaerugfoiaeurgfoaiuerf");
76 List<String> eventsFrom1 = handler.fetchFromTopic(consumerTopics.get(0));
77 Assertions.assertEquals(1, eventsFrom1.size());
78 handler.fetchFromTopic(consumerTopics.get(0));
79 List<String> events2 = handler.fetchFromTopic(consumerTopics.get(1));
80 Assertions.assertEquals( 0, events2.size());
84 void whenConsumingFromInvalidTopicEmptyListIsReturned() {
85 OnapKafkaClient handler = new OnapKafkaClient(configuration);
86 List<String> events = handler.fetchFromTopic("invalidTopic");
87 Assertions.assertEquals(0, events.size());
91 void whenPublishingToInvalidTopicExceptionIsLogged() {
92 OnapKafkaClient handler = new OnapKafkaClient(configuration);
93 RecordMetadata metadata = handler.publishToTopic("invalid.topic", "blahblahblahblah");
94 producerLogs.assertContains("Failed the send data");
95 Assertions.assertNull(metadata);
99 void whenSubscribingToInvalidTopicExceptionIsLogged() {
100 configuration = new TestConfiguration("invalid-application.properties");
101 OnapKafkaClient handler = new OnapKafkaClient(configuration);
102 handler.fetchFromTopic("bvserbatb");
103 clientLogs.assertContains("Consumer has not been initialised");
104 configuration.setConsumerTopics(consumerTopics);
108 private static void startKafkaService() throws Exception {
109 final BrokerListener listener = new SaslPlainListener()
110 .withUsername("kafkaclient")
111 .withPassword("client-secret");
112 final Properties brokerProperties = new Properties();
113 brokerProperties.setProperty("auto.create.topics.enable", "false");
114 kafkaTestCluster = new KafkaTestCluster(
117 Collections.singletonList(listener)
119 kafkaTestCluster.start();
120 logger.debug("Cluster started at: {}", kafkaTestCluster.getKafkaConnectString());
124 System.setProperty("java.security.auth.login.config", "src/test/resources/jaas.conf");