Publish and Subscribe to Kafka topic 30/130530/9
authorSirisha_Manchikanti <sirisha.manchikanti@est.tech>
Thu, 25 Aug 2022 14:00:58 +0000 (15:00 +0100)
committerSirisha_Manchikanti <sirisha.manchikanti@est.tech>
Wed, 21 Sep 2022 09:54:56 +0000 (10:54 +0100)
Issue-ID: POLICY-4134
Signed-off-by: Sirisha_Manchikanti <sirisha.manchikanti@est.tech>
Change-Id: Idefa5b6f3cb702a4b478b76570717e73214d235a

16 files changed:
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java [new file with mode: 0644]
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicTestBase.java
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.java [new file with mode: 0644]
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java
policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.json
policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.json

index 47279d4..45a8be3 100644 (file)
@@ -65,6 +65,7 @@ class IndexedKafkaTopicSourceFactory implements KafkaTopicSourceFactory {
             }
 
             var kafkaTopicSource = makeSource(busTopicParams);
+
             kafkaTopicSources.put(busTopicParams.getTopic(), kafkaTopicSource);
 
             return kafkaTopicSource;
index 8d88b0d..ee41150 100644 (file)
@@ -32,6 +32,8 @@ import java.security.GeneralSecurityException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
@@ -42,6 +44,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.onap.dmaap.mr.client.MRClientFactory;
 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
 import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder;
@@ -235,10 +239,13 @@ public interface BusConsumer {
          */
         private static Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);
 
+        private static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+
         /**
          * Kafka consumer.
          */
-        private KafkaConsumer<String, String> consumer;
+        protected KafkaConsumer<String, String> consumer;
+        protected Properties kafkaProps;
 
         /**
          * Kafka Consumer Wrapper.
@@ -250,20 +257,67 @@ public interface BusConsumer {
          * @throws GeneralSecurityException - Security exception
          * @throws MalformedURLException - Malformed URL exception
          */
-        public KafkaConsumerWrapper(BusTopicParams busTopicParams) {
+        public KafkaConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
             super(busTopicParams);
+
+            if (busTopicParams.isTopicInvalid()) {
+                throw new IllegalArgumentException("No topic for Kafka");
+            }
+
+            //Setup Properties for consumer
+            kafkaProps = new Properties();
+            kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                    busTopicParams.getServers().get(0));
+
+            if (busTopicParams.isAdditionalPropsValid()) {
+                for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
+                    kafkaProps.put(entry.getKey(), entry.getValue());
+                }
+            }
+
+            if (kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) == null) {
+                kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
+            }
+            if (kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) == null) {
+                kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
+            }
+            if (kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG) == null) {
+                kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, busTopicParams.getConsumerGroup());
+            }
+            consumer = new KafkaConsumer<>(kafkaProps);
+            //Subscribe to the topic
+            consumer.subscribe(Arrays.asList(busTopicParams.getTopic()));
         }
 
         @Override
         public Iterable<String> fetch() throws IOException {
-            // TODO: Not implemented yet
-            return new ArrayList<>();
+            ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(fetchTimeout));
+            if (records == null || records.count() <= 0) {
+                return Collections.emptyList();
+            }
+            List<String> messages = new ArrayList<>(records.count());
+            try {
+                for (TopicPartition partition : records.partitions()) {
+                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
+                    for (ConsumerRecord<String, String> record : partitionRecords) {
+                        messages.add(record.value());
+                    }
+                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
+                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
+                }
+            } catch (Exception e) {
+                logger.error("{}: cannot fetch because of {}", this, e.getMessage());
+                sleepAfterFetchFailure();
+                throw e;
+            }
+            return messages;
         }
 
         @Override
         public void close() {
             super.close();
             this.consumer.close();
+            logger.info("Kafka Consumer exited {}", this);
         }
 
         @Override
index e0df709..fe9bab2 100644 (file)
@@ -32,13 +32,14 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.record.CompressionType;
 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
@@ -155,22 +156,45 @@ public interface BusPublisher {
     public static class KafkaPublisherWrapper implements BusPublisher {
 
         private static Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class);
+        private static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+
+        private String topic;
 
         /**
-         * The actual Kafka publisher.
+         * Kafka publisher.
          */
-        private final KafkaProducer producer;
+        private Producer<String, String> producer;
+        protected Properties kafkaProps;
 
         /**
-         * Constructor.
+         * Kafka Publisher Wrapper.
          *
          * @param busTopicParams topic parameters
          */
-        public KafkaPublisherWrapper(BusTopicParams busTopicParams) {
-            // TODO Setting of topic parameters is not implemented yet.
-            //Setup Properties for Kafka Producer
-            Properties kafkaProps = new Properties();
-            this.producer = new KafkaProducer(kafkaProps);
+        protected KafkaPublisherWrapper(BusTopicParams busTopicParams) {
+
+            if (busTopicParams.isTopicInvalid()) {
+                throw new IllegalArgumentException("No topic for Kafka");
+            }
+
+            this.topic = busTopicParams.getTopic();
+
+            //Setup Properties for consumer
+            kafkaProps = new Properties();
+            kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, busTopicParams.getServers().get(0));
+            if (busTopicParams.isAdditionalPropsValid()) {
+                for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
+                    kafkaProps.put(entry.getKey(), entry.getValue());
+                }
+            }
+            if (kafkaProps.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) == null) {
+                kafkaProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
+            }
+            if (kafkaProps.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) == null) {
+                kafkaProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
+            }
+
+            producer = new KafkaProducer<>(kafkaProps);
         }
 
         @Override
@@ -178,7 +202,18 @@ public interface BusPublisher {
             if (message == null) {
                 throw new IllegalArgumentException("No message provided");
             }
-            // TODO Sending messages is not implemented yet
+
+            try {
+                //Create the record
+                ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic,
+                    UUID.randomUUID().toString(), message);
+
+                this.producer.send(record);
+                producer.flush();
+            } catch (Exception e) {
+                logger.warn("{}: SEND of {} cannot be performed because of {}", this, message, e.getMessage(), e);
+                return false;
+            }
             return true;
         }
 
@@ -186,7 +221,7 @@ public interface BusPublisher {
         public void close() {
             logger.info("{}: CLOSE", this);
 
-            try (this.producer) {
+            try {
                 this.producer.close();
             } catch (Exception e) {
                 logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
index b564229..6574d40 100644 (file)
@@ -18,6 +18,7 @@
 
 package org.onap.policy.common.endpoints.event.comm.bus.internal;
 
+import java.util.Map;
 import org.onap.policy.common.endpoints.event.comm.Topic;
 import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink;
 import org.slf4j.Logger;
@@ -34,6 +35,8 @@ public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTop
      */
     private static Logger logger = LoggerFactory.getLogger(InlineKafkaTopicSink.class);
 
+    protected Map<String, String> additionalProps = null;
+
     /**
      * Argument-based KAFKA Topic Writer instantiation. BusTopicParams contains below mentioned
      * attributes.
@@ -47,6 +50,7 @@ public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTop
      */
     public InlineKafkaTopicSink(BusTopicParams busTopicParams) {
         super(busTopicParams);
+        this.additionalProps = busTopicParams.getAdditionalProps();
     }
 
     /**
@@ -59,6 +63,7 @@ public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTop
                 .servers(this.servers)
                 .topic(this.effectiveTopic)
                 .useHttps(this.useHttps)
+                .additionalProps(this.additionalProps)
                 .build());
         logger.info("{}: KAFKA SINK created", this);
     }
index b8362b8..2a651ee 100644 (file)
@@ -18,6 +18,8 @@
 
 package org.onap.policy.common.endpoints.event.comm.bus.internal;
 
+import java.net.MalformedURLException;
+import java.util.Map;
 import org.onap.policy.common.endpoints.event.comm.Topic;
 import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource;
 
@@ -27,6 +29,8 @@ import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource;
  */
 public class SingleThreadedKafkaTopicSource extends SingleThreadedBusTopicSource implements KafkaTopicSource {
 
+    protected Map<String, String> additionalProps = null;
+
     /**
      * Constructor.
      *
@@ -35,19 +39,29 @@ public class SingleThreadedKafkaTopicSource extends SingleThreadedBusTopicSource
      */
     public SingleThreadedKafkaTopicSource(BusTopicParams busTopicParams) {
         super(busTopicParams);
-        this.init();
+        this.additionalProps = busTopicParams.getAdditionalProps();
+        try {
+            this.init();
+        } catch (Exception e) {
+            throw new IllegalArgumentException("ERROR during init in kafka-source: cannot create topic " + topic, e);
+        }
     }
 
     /**
      * Initialize the Cambria client.
      */
     @Override
-    public void init() {
-        this.consumer = new BusConsumer.KafkaConsumerWrapper(BusTopicParams.builder()
+    public void init() throws MalformedURLException {
+        BusTopicParams.TopicParamsBuilder builder = BusTopicParams.builder()
                 .servers(this.servers)
                 .topic(this.effectiveTopic)
-                .useHttps(this.useHttps)
-                .build());
+                .fetchTimeout(this.fetchTimeout)
+                .consumerGroup(this.consumerGroup)
+                .useHttps(this.useHttps);
+
+        this.consumer = new BusConsumer.KafkaConsumerWrapper(builder
+                        .additionalProps(this.additionalProps)
+                        .build());
     }
 
     @Override
index 49dff28..46a6c39 100644 (file)
@@ -129,7 +129,6 @@ public final class PolicyEndPointProperties {
     public static final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
     public static final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
 
-
     /* Topic Sink Values */
 
     /**
index 3e62f98..113a4bd 100644 (file)
@@ -24,9 +24,13 @@ package org.onap.policy.common.endpoints.utils;
 import com.google.re2j.Pattern;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams.TopicParamsBuilder;
 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
@@ -47,7 +51,7 @@ public class KafkaPropertyUtils {
     public static TopicParamsBuilder makeBuilder(PropertyUtils props, String topic, String servers) {
 
         final List<String> serverList = new ArrayList<>(Arrays.asList(COMMA_SPACE_PAT.split(servers)));
-        //TODO More Kafka properties to be added
+
         return BusTopicParams.builder()
                     .servers(serverList)
                     .topic(topic)
index 1a815e1..a00879c 100644 (file)
@@ -32,7 +32,7 @@ import org.onap.policy.common.endpoints.parameters.TopicParameters;
 
 public class KafkaTopicPropertyBuilder extends TopicPropertyBuilder {
 
-    public static final String SERVER = "my-server";
+    public static final String SERVER = "localhost:9092";
     public static final String TOPIC2 = "my-topic-2";
 
     @Getter
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactoryTest.java
new file mode 100644 (file)
index 0000000..c109e70
--- /dev/null
@@ -0,0 +1,192 @@
+/*
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX;
+
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
+
+public class KafkaTopicSinkFactoryTest extends KafkaTopicFactoryTestBase<KafkaTopicSink> {
+
+    private SinkFactory factory;
+    public static final String KAFKA_SERVER = "localhost:9092";
+
+    /**
+     * Creates the object to be tested.
+     */
+    @Before
+    @Override
+    public void setUp() {
+        super.setUp();
+
+        factory = new SinkFactory();
+    }
+
+    @After
+    public void tearDown() {
+        factory.destroy();
+    }
+
+    @Test
+    @Override
+    public void testBuildBusTopicParams() {
+        super.testBuildBusTopicParams();
+        super.testBuildBusTopicParams_Ex();
+    }
+
+    @Test
+    @Override
+    public void testBuildListOfStringString() {
+        super.testBuildListOfStringString();
+
+        // check parameters that were used
+        BusTopicParams params = getLastParams();
+        assertEquals(false, params.isAllowSelfSignedCerts());
+    }
+
+    @Test
+    @Override
+    public void testBuildProperties() {
+        List<KafkaTopicSink> topics = buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build());
+        assertEquals(1, topics.size());
+        assertEquals(MY_TOPIC, topics.get(0).getTopic());
+        assertEquals(MY_EFFECTIVE_TOPIC, topics.get(0).getEffectiveTopic());
+
+        BusTopicParams params = getLastParams();
+        assertEquals(true, params.isManaged());
+        assertEquals(false, params.isUseHttps());
+        assertEquals(Arrays.asList(KAFKA_SERVER), params.getServers());
+        assertEquals(MY_TOPIC, params.getTopic());
+        assertEquals(MY_EFFECTIVE_TOPIC, params.getEffectiveTopic());
+        assertEquals(MY_PARTITION, params.getPartitionId());
+
+        List<KafkaTopicSink> topics2 = buildTopics(makePropBuilder().makeTopic(TOPIC3)
+            .removeTopicProperty(PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX).build());
+        assertEquals(1, topics2.size());
+        assertEquals(TOPIC3, topics2.get(0).getTopic());
+        assertEquals(topics2.get(0).getTopic(), topics2.get(0).getEffectiveTopic());
+
+        initFactory();
+
+        assertEquals(1, buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build()).size());
+    }
+
+    @Test
+    @Override
+    public void testDestroyString_testGet_testInventory() {
+        super.testDestroyString_testGet_testInventory();
+        super.testDestroyString_Ex();
+    }
+
+    @Test
+    @Override
+    public void testDestroy() {
+        super.testDestroy();
+    }
+
+    @Test
+    public void testGet() {
+        super.testGet_Ex();
+    }
+
+    @Test
+    public void testToString() {
+        assertTrue(factory.toString().startsWith("IndexedKafkaTopicSinkFactory ["));
+    }
+
+    @Override
+    protected void initFactory() {
+        if (factory != null) {
+            factory.destroy();
+        }
+
+        factory = new SinkFactory();
+    }
+
+    @Override
+    protected List<KafkaTopicSink> buildTopics(Properties properties) {
+        return factory.build(properties);
+    }
+
+    @Override
+    protected KafkaTopicSink buildTopic(BusTopicParams params) {
+        return factory.build(params);
+    }
+
+    @Override
+    protected KafkaTopicSink buildTopic(List<String> servers, String topic) {
+        return factory.build(servers, topic);
+    }
+
+    @Override
+    protected void destroyFactory() {
+        factory.destroy();
+    }
+
+    @Override
+    protected void destroyTopic(String topic) {
+        factory.destroy(topic);
+    }
+
+    @Override
+    protected List<KafkaTopicSink> getInventory() {
+        return factory.inventory();
+    }
+
+    @Override
+    protected KafkaTopicSink getTopic(String topic) {
+        return factory.get(topic);
+    }
+
+    @Override
+    protected BusTopicParams getLastParams() {
+        return factory.params.getLast();
+    }
+
+    @Override
+    protected TopicPropertyBuilder makePropBuilder() {
+        return new KafkaTopicPropertyBuilder(PROPERTY_KAFKA_SINK_TOPICS);
+    }
+
+    /**
+     * Factory that records the parameters of all of the sinks it creates.
+     */
+    private static class SinkFactory extends IndexedKafkaTopicSinkFactory {
+        private Deque<BusTopicParams> params = new LinkedList<>();
+
+        @Override
+        protected KafkaTopicSink makeSink(BusTopicParams busTopicParams) {
+            params.add(busTopicParams);
+            return super.makeSink(busTopicParams);
+        }
+    }
+}
index 6fa80a4..3a62b4a 100644 (file)
@@ -24,7 +24,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX;
 
+import java.util.Arrays;
 import java.util.Deque;
 import java.util.LinkedList;
 import java.util.List;
@@ -40,6 +42,8 @@ public class KafkaTopicSourceFactoryTest extends KafkaTopicFactoryTestBase<Kafka
 
     private SourceFactory factory;
 
+    public static final String KAFKA_SERVER = "localhost:9092";
+
     /**
      * Creates the object to be tested.
      */
@@ -56,12 +60,6 @@ public class KafkaTopicSourceFactoryTest extends KafkaTopicFactoryTestBase<Kafka
         factory.destroy();
     }
 
-    @Test
-    @Override
-    public void testBuildBusTopicParams() {
-        super.testBuildBusTopicParams_Ex();
-    }
-
     @Test
     @Override
     public void testBuildProperties() {
@@ -71,14 +69,29 @@ public class KafkaTopicSourceFactoryTest extends KafkaTopicFactoryTestBase<Kafka
         List<KafkaTopicSource> topics = buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build());
         assertEquals(1, topics.size());
         assertEquals(MY_TOPIC, topics.get(0).getTopic());
+        assertEquals(MY_EFFECTIVE_TOPIC, topics.get(0).getEffectiveTopic());
+
+        BusTopicParams params = getLastParams();
+        assertEquals(true, params.isManaged());
+        assertEquals(false, params.isUseHttps());
+        assertEquals(Arrays.asList(KAFKA_SERVER), params.getServers());
+        assertEquals(MY_TOPIC, params.getTopic());
+        assertEquals(MY_EFFECTIVE_TOPIC, params.getEffectiveTopic());
     }
 
     @Test
     @Override
     public void testDestroyString_testGet_testInventory() {
+        super.testDestroyString_testGet_testInventory();
         super.testDestroyString_Ex();
     }
 
+    @Test
+    @Override
+    public void testDestroy() {
+        super.testDestroy();
+    }
+
     @Test
     public void testGet() {
         super.testGet_Ex();
index 8b75fa3..bd88eec 100644 (file)
@@ -59,6 +59,8 @@ public class TopicTestBase {
 
     public static final String ROUTE_PROP = "routeOffer";
     public static final String MY_ROUTE = "my-route";
+    public static final String MY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+    public static final int KAFKA_PORT = 9092;
 
     /**
      * Message used within exceptions that are expected.
@@ -75,6 +77,11 @@ public class TopicTestBase {
      */
     protected List<String> servers;
 
+    /**
+     * Servers to be added to the parameter builder.
+     */
+    protected List<String> kafkaServers;
+
     /**
      * Parameter builder used to build topic parameters.
      */
@@ -89,13 +96,14 @@ public class TopicTestBase {
         addProps.put("my-key-B", "my-value-B");
 
         servers = Arrays.asList("svra", "svrb");
+        kafkaServers = Arrays.asList("localhost:9092", "10.1.2.3:9092");
 
         builder = makeBuilder();
     }
 
     /**
      * Makes a fully populated parameter builder.
-     * 
+     *
      * @return a new parameter builder
      */
     public TopicParamsBuilder makeBuilder() {
@@ -117,6 +125,39 @@ public class TopicTestBase {
                         .fetchLimit(MY_FETCH_LIMIT).fetchTimeout(MY_FETCH_TIMEOUT).hostname(MY_HOST).latitude(MY_LAT)
                         .longitude(MY_LONG).managed(true).partitionId(MY_PARTITION).partner(MY_PARTNER)
                         .password(MY_PASS).port(MY_PORT).servers(servers).topic(MY_TOPIC)
-                        .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(true).userName(MY_USERNAME);
+                        .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(true).userName(MY_USERNAME)
+                        .serializationProvider(MY_SERIALIZER);
+    }
+
+    /**
+     * Makes a fully populated parameter builder.
+     *
+     * @return a new parameter builder
+     */
+    public TopicParamsBuilder makeKafkaBuilder() {
+        addProps.clear();
+        String jaas = "org.apache.kafka.common.security.plain.PlainLoginModule "
+            + "required username=abc password=abc serviceName=kafka;";
+        addProps.put("sasl.jaas.config", jaas);
+        addProps.put("sasl.mechanism", "SCRAM-SHA-512");
+        addProps.put("security.protocol", "SASL_PLAINTEXT");
+
+        return makeKafkaBuilder(addProps, kafkaServers);
+    }
+
+    /**
+     * Makes a fully populated parameter builder.
+     *
+     * @param addProps additional properties to be added to the builder
+     * @param servers servers to be added to the builder
+     * @return a new parameter builder
+     */
+    public TopicParamsBuilder makeKafkaBuilder(Map<String, String> addProps, List<String> servers) {
+
+        return BusTopicParams.builder().additionalProps(addProps).basePath(MY_BASE_PATH).clientName(MY_CLIENT_NAME)
+                        .consumerGroup(MY_CONS_GROUP).consumerInstance(MY_CONS_INST).environment(MY_ENV)
+                        .hostname(MY_HOST).partitionId(MY_PARTITION).partner(MY_PARTNER)
+                        .port(KAFKA_PORT).servers(servers).topic(MY_TOPIC)
+                        .effectiveTopic(MY_EFFECTIVE_TOPIC).useHttps(false);
     }
 }
index da9f792..7df5d12 100644 (file)
@@ -34,8 +34,11 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import org.apache.commons.collections4.IteratorUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.junit.Before;
 import org.junit.Test;
 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
@@ -299,12 +302,46 @@ public class BusConsumerTest extends TopicTestBase {
     @Test
     public void testKafkaConsumerWrapper() throws Exception {
         // verify that different wrappers can be built
-        assertThatCode(() -> new KafkaConsumerWrapper(makeBuilder().build())).doesNotThrowAnyException();
+        assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build())).doesNotThrowAnyException();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testKafkaConsumerWrapper_InvalidTopic() throws Exception {
+        new KafkaConsumerWrapper(makeBuilder().topic(null).build());
+    }
+
+    @Test(expected = java.lang.IllegalStateException.class)
+    public void testKafkaConsumerWrapperFetch() throws Exception {
+
+        //Setup Properties for consumer
+        Properties kafkaProps = new Properties();
+        kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
+        kafkaProps.setProperty("enable.auto.commit", "true");
+        kafkaProps.setProperty("auto.commit.interval.ms", "1000");
+        kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+            "org.apache.kafka.common.serialization.StringDeserializer");
+        kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+            "org.apache.kafka.common.serialization.StringDeserializer");
+        kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+        KafkaConsumerWrapper kafka = new KafkaConsumerWrapper(makeKafkaBuilder().build());
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
+        kafka.consumer = consumer;
+
+        assertFalse(kafka.fetch().iterator().hasNext());
+        consumer.close();
+    }
+
+    @Test
+    public void testKafkaConsumerWrapperClose() throws Exception {
+        assertThatCode(() -> new KafkaConsumerWrapper(makeKafkaBuilder().build()).close()).doesNotThrowAnyException();
     }
 
     @Test
     public void testKafkaConsumerWrapperToString() throws Exception {
-        assertNotNull(new KafkaConsumerWrapper(makeBuilder().build()) {}.toString());
+        assertNotNull(new KafkaConsumerWrapper(makeKafkaBuilder().build()) {}.toString());
     }
 
     private static class FetchingBusConsumerImpl extends FetchingBusConsumer {
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.java
new file mode 100644 (file)
index 0000000..b40b954
--- /dev/null
@@ -0,0 +1,71 @@
+/*
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus.internal;
+
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
+import org.onap.policy.common.utils.gson.GsonTestUtils;
+
+public class InlineKafkaTopicSinkTest extends TopicTestBase {
+    private InlineKafkaTopicSink sink;
+
+    /**
+     * Creates the object to be tested.
+     */
+    @Before
+    @Override
+    public void setUp() {
+        super.setUp();
+
+        sink = new InlineKafkaTopicSink(makeKafkaBuilder().build());
+    }
+
+    @After
+    public void tearDown() {
+        sink.shutdown();
+    }
+
+    @Test
+    public void testToString() {
+        assertTrue(sink.toString().startsWith("InlineKafkaTopicSink ["));
+    }
+
+    @Test
+    public void testInit() {
+        // nothing null
+        sink = new InlineKafkaTopicSink(makeKafkaBuilder().build());
+        sink.init();
+        assertThatCode(() -> sink.shutdown()).doesNotThrowAnyException();
+    }
+
+    @Test
+    public void testGetTopicCommInfrastructure() {
+        assertEquals(CommInfrastructure.KAFKA, sink.getTopicCommInfrastructure());
+    }
+
+}
index cc09658..6b63c9f 100644 (file)
@@ -42,7 +42,7 @@ public class SingleThreadedKafkaTopicSourceTest extends TopicTestBase {
     public void setUp() {
         super.setUp();
 
-        source = new SingleThreadedKafkaTopicSource(makeBuilder().build());
+        source = new SingleThreadedKafkaTopicSource(makeKafkaBuilder().build());
     }
 
     @After
@@ -50,9 +50,15 @@ public class SingleThreadedKafkaTopicSourceTest extends TopicTestBase {
         source.shutdown();
     }
 
+    public void testSerialize() {
+        assertThatCode(() -> new GsonTestUtils().compareGson(source, SingleThreadedKafkaTopicSourceTest.class))
+                        .doesNotThrowAnyException();
+    }
+
     @Test
     public void testToString() {
         assertTrue(source.toString().startsWith("SingleThreadedKafkaTopicSource ["));
+        source.shutdown();
     }
 
     @Test
index 7c512a8..42b7a03 100644 (file)
@@ -1,11 +1,19 @@
 {
-  "servers" : [ "svra", "svrb" ],
-  "topic" : "my-topic",
-  "effectiveTopic" : "my-effective-topic",
-  "recentEvents" : [ ],
-  "alive" : false,
-  "locked" : false,
-  "useHttps" : true,
-  "topicCommInfrastructure" : "KAFKA",
-  "partitionKey" : "my-partition"
+  "servers": [
+    "svra",
+    "svrb"
+  ],
+  "topic": "my-topic",
+  "effectiveTopic": "my-effective-topic",
+  "recentEvents": [],
+  "alive": false,
+  "locked": false,
+  "useHttps": false,
+  "topicCommInfrastructure": "KAFKA",
+  "partitionKey": "my-partition",
+  "additionalProps": {
+    "security.protocol": "SASL_PLAINTEXT",
+    "sasl.mechanism": "SCRAM-SHA-512",
+    "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=abc password=abc serviceName=kafka;"
+  }
 }
index 626d87e..38cc2f8 100644 (file)
@@ -1,10 +1,18 @@
 {
-  "servers" : [ "svra", "svrb" ],
-  "topic" : "my-topic",
-  "effectiveTopic" : "my-effective-topic",
-  "recentEvents" : [ ],
-  "alive" : false,
-  "locked" : false,
-  "useHttps" : true,
-  "topicCommInfrastructure" : "KAFKA"
+  "servers": [
+    "localhost:9092",
+    "10.1.2.3:9092"
+  ],
+  "topic": "my-topic",
+  "effectiveTopic": "my-effective-topic",
+  "recentEvents": [],
+  "alive": false,
+  "locked": false,
+  "useHttps": false,
+  "topicCommInfrastructure": "KAFKA",
+  "additionalProps": {
+    "security.protocol": "SASL_PLAINTEXT",
+    "sasl.mechanism": "SCRAM-SHA-512",
+    "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=abc password=abc serviceName=kafka;"
+  }
 }