Introduce Custom Kafka End point 38/129838/3
authorSirisha_Manchikanti <sirisha.manchikanti@est.tech>
Fri, 1 Jul 2022 06:15:00 +0000 (07:15 +0100)
committerSirisha_Manchikanti <sirisha.manchikanti@est.tech>
Fri, 22 Jul 2022 19:17:46 +0000 (20:17 +0100)
Issue-ID: POLICY-4133
Signed-off-by: Sirisha_Manchikanti <sirisha.manchikanti@est.tech>
Change-Id: I2745f3af97e9bb83d94c5cb6d29dfd452d315506

26 files changed:
policy-endpoints/pom.xml
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/Topic.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSinkFactory.java [new file with mode: 0644]
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java [new file with mode: 0644]
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactories.java [new file with mode: 0644]
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSink.java [new file with mode: 0644]
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactory.java [new file with mode: 0644]
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSource.java [new file with mode: 0644]
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java [new file with mode: 0644]
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java [new file with mode: 0644]
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java [new file with mode: 0644]
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/properties/PolicyEndPointProperties.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java [new file with mode: 0644]
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactoryTestBase.java [new file with mode: 0644]
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java [new file with mode: 0644]
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkTest.java [new file with mode: 0644]
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java [new file with mode: 0644]
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceTest.java [new file with mode: 0644]
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java
policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java [new file with mode: 0644]
policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.json [new file with mode: 0644]
policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.json [new file with mode: 0644]

index 550f2a0..816f750 100644 (file)
@@ -1,9 +1,9 @@
 <!--
   ============LICENSE_START=======================================================
-   Copyright (C) 2018 Ericsson. All rights reserved.
-   Modifications Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
-   Modifications Copyright (C) 2019-2020 Nordix Foundation.
-   Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
+   Copyright (C) 2022 Ericsson. All rights reserved.
+   Modifications Copyright (C) 2018-2022 AT&T Intellectual Property. All rights reserved.
+   Modifications Copyright (C) 2019-2022 Nordix Foundation.
+   Modifications Copyright (C) 2022 Bell Canada. All rights reserved.
   ================================================================================
   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
@@ -41,6 +41,7 @@
         <cambria.version>1.2.1-oss</cambria.version>
         <http.client.version>4.5.5</http.client.version>
         <http.core.version>4.4.4</http.core.version>
+        <version.kafka>3.0.0</version.kafka>
     </properties>
 
     <dependencies>
             <scope>provided</scope>
         </dependency>
 
+         <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${version.kafka}</version>
+            <exclusions>
+                <!-- The default Zookeeper version in Kafka has vulnerabilities -->
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 </project>
index 6532a19..69bee71 100644 (file)
@@ -4,6 +4,7 @@
  * ================================================================================
  * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
  * Modifications Copyright (C) 2019 Samsung Electronics Co., Ltd.
+ * Copyright (C) 2022 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -43,6 +44,10 @@ public interface Topic extends TopicRegisterable, Startable, Lockable {
          * DMAAP Communication Infrastructure.
          */
         DMAAP,
+        /**
+         * KAFKA Communication Infrastructure.
+         */
+        KAFKA,
         /**
          * NOOP for internal use only.
          */
index bb70752..9bf3731 100644 (file)
@@ -3,6 +3,7 @@
  * ONAP
  * ================================================================================
  * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2022 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -26,6 +27,8 @@ import org.onap.policy.common.capabilities.Lockable;
 import org.onap.policy.common.capabilities.Startable;
 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink;
 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource;
+import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink;
+import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource;
 import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink;
 import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource;
 import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink;
@@ -159,6 +162,14 @@ public interface TopicEndpoint extends Startable, Lockable {
      */
     NoopTopicSource getNoopTopicSource(String topicName);
 
+    /**
+     * Get the Kafka Source for the given topic name.
+     *
+     * @param topicName the topic name.
+     * @return the Kafka Source.
+     */
+    KafkaTopicSource getKafkaTopicSource(String topicName);
+
     /**
      * Get the Topic Sinks for the given topic name.
      *
@@ -236,6 +247,18 @@ public interface TopicEndpoint extends Startable, Lockable {
      */
     DmaapTopicSink getDmaapTopicSink(String topicName);
 
+    /**
+     * Get the KAFKA Topic Source for the given topic name.
+     *
+     * @param topicName the topic name
+     *
+     * @return the Topic Source
+     * @throws IllegalStateException if the entity is in an invalid state, for example multiple
+     *         TopicReaders for a topic name and communication infrastructure
+     * @throws IllegalArgumentException if invalid parameters are present
+     */
+    KafkaTopicSink getKafkaTopicSink(String topicName);
+
     /**
      * Gets only the UEB Topic Sources.
      *
@@ -250,6 +273,13 @@ public interface TopicEndpoint extends Startable, Lockable {
      */
     List<DmaapTopicSource> getDmaapTopicSources();
 
+    /**
+     * Gets only the KAFKA Topic Sources.
+     *
+     * @return the KAFKA Topic Source List
+     */
+    List<KafkaTopicSource> getKafkaTopicSources();
+
     /**
      * Gets only the NOOP Topic Sources.
      *
@@ -271,10 +301,18 @@ public interface TopicEndpoint extends Startable, Lockable {
      */
     List<DmaapTopicSink> getDmaapTopicSinks();
 
+    /**
+     * Gets only the KAFKA Topic Sinks.
+     *
+     * @return the KAFKA Topic Sinks List
+     */
+    List<KafkaTopicSink> getKafkaTopicSinks();
+
     /**
      * Gets only the NOOP Topic Sinks.
      *
      * @return the NOOP Topic Sinks List
      */
     List<NoopTopicSink> getNoopTopicSinks();
+
 }
index 293bf60..d37410e 100644 (file)
@@ -3,6 +3,7 @@
  * ONAP
  * ================================================================================
  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2022 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -29,6 +30,9 @@ import org.onap.policy.common.capabilities.Startable;
 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicFactories;
 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink;
 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource;
+import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicFactories;
+import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink;
+import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource;
 import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicFactories;
 import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink;
 import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource;
@@ -95,6 +99,9 @@ class TopicEndpointProxy implements TopicEndpoint {
                 case DMAAP:
                     sources.add(DmaapTopicFactories.getSourceFactory().build(param));
                     break;
+                case KAFKA:
+                    sources.add(KafkaTopicFactories.getSourceFactory().build(param));
+                    break;
                 case NOOP:
                     sources.add(NoopTopicFactories.getSourceFactory().build(param));
                     break;
@@ -115,12 +122,14 @@ class TopicEndpointProxy implements TopicEndpoint {
 
         // 1. Create UEB Sources
         // 2. Create DMAAP Sources
-        // 3. Create NOOP Sources
+        // 3. Create KAFKA Sources
+        // 4. Create NOOP Sources
 
         List<TopicSource> sources = new ArrayList<>();
 
         sources.addAll(UebTopicFactories.getSourceFactory().build(properties));
         sources.addAll(DmaapTopicFactories.getSourceFactory().build(properties));
+        sources.addAll(KafkaTopicFactories.getSourceFactory().build(properties));
         sources.addAll(NoopTopicFactories.getSourceFactory().build(properties));
 
         lockSources(sources);
@@ -146,6 +155,9 @@ class TopicEndpointProxy implements TopicEndpoint {
                 case DMAAP:
                     sinks.add(DmaapTopicFactories.getSinkFactory().build(param));
                     break;
+                case KAFKA:
+                    sinks.add(KafkaTopicFactories.getSinkFactory().build(param));
+                    break;
                 case NOOP:
                     sinks.add(NoopTopicFactories.getSinkFactory().build(param));
                     break;
@@ -165,12 +177,14 @@ class TopicEndpointProxy implements TopicEndpoint {
     public List<TopicSink> addTopicSinks(Properties properties) {
         // 1. Create UEB Sinks
         // 2. Create DMAAP Sinks
-        // 3. Create NOOP Sinks
+        // 3. Create KAFKA Sinks
+        // 4. Create NOOP Sinks
 
         final List<TopicSink> sinks = new ArrayList<>();
 
         sinks.addAll(UebTopicFactories.getSinkFactory().build(properties));
         sinks.addAll(DmaapTopicFactories.getSinkFactory().build(properties));
+        sinks.addAll(KafkaTopicFactories.getSinkFactory().build(properties));
         sinks.addAll(NoopTopicFactories.getSinkFactory().build(properties));
 
         lockSinks(sinks);
@@ -191,6 +205,7 @@ class TopicEndpointProxy implements TopicEndpoint {
 
         sources.addAll(UebTopicFactories.getSourceFactory().inventory());
         sources.addAll(DmaapTopicFactories.getSourceFactory().inventory());
+        sources.addAll(KafkaTopicFactories.getSourceFactory().inventory());
         sources.addAll(NoopTopicFactories.getSourceFactory().inventory());
 
         return sources;
@@ -223,6 +238,15 @@ class TopicEndpointProxy implements TopicEndpoint {
                 logger.debug("No DMAAP source for topic: {}", topic, e);
             }
 
+            try {
+                final TopicSource kafkaSource = this.getKafkaTopicSource(topic);
+                if (kafkaSource != null) {
+                    sources.add(kafkaSource);
+                }
+            } catch (final Exception e) {
+                logger.debug("No KAFKA source for topic: {}", topic, e);
+            }
+
             try {
                 final TopicSource noopSource = this.getNoopTopicSource(topic);
                 if (noopSource != null) {
@@ -242,6 +266,7 @@ class TopicEndpointProxy implements TopicEndpoint {
 
         sinks.addAll(UebTopicFactories.getSinkFactory().inventory());
         sinks.addAll(DmaapTopicFactories.getSinkFactory().inventory());
+        sinks.addAll(KafkaTopicFactories.getSinkFactory().inventory());
         sinks.addAll(NoopTopicFactories.getSinkFactory().inventory());
 
         return sinks;
@@ -274,6 +299,15 @@ class TopicEndpointProxy implements TopicEndpoint {
                 logger.debug("No DMAAP sink for topic: {}", topic, e);
             }
 
+            try {
+                final TopicSink kafkaSink = this.getKafkaTopicSink(topic);
+                if (kafkaSink != null) {
+                    sinks.add(kafkaSink);
+                }
+            } catch (final Exception e) {
+                logger.debug("No KAFKA sink for topic: {}", topic, e);
+            }
+
             try {
                 final TopicSink noopSink = this.getNoopTopicSink(topic);
                 if (noopSink != null) {
@@ -306,6 +340,12 @@ class TopicEndpointProxy implements TopicEndpoint {
             logNoSink(topicName, e);
         }
 
+        try {
+            sinks.add(this.getKafkaTopicSink(topicName));
+        } catch (final Exception e) {
+            logNoSink(topicName, e);
+        }
+
         try {
             sinks.add(this.getNoopTopicSink(topicName));
         } catch (final Exception e) {
@@ -327,6 +367,12 @@ class TopicEndpointProxy implements TopicEndpoint {
         return DmaapTopicFactories.getSourceFactory().inventory();
     }
 
+    @GsonJsonIgnore
+    @Override
+    public List<KafkaTopicSource> getKafkaTopicSources() {
+        return KafkaTopicFactories.getSourceFactory().inventory();
+    }
+
     @GsonJsonIgnore
     @Override
     public List<NoopTopicSource> getNoopTopicSources() {
@@ -345,6 +391,12 @@ class TopicEndpointProxy implements TopicEndpoint {
         return DmaapTopicFactories.getSinkFactory().inventory();
     }
 
+    @Override
+    @GsonJsonIgnore
+    public List<KafkaTopicSink> getKafkaTopicSinks() {
+        return KafkaTopicFactories.getSinkFactory().inventory();
+    }
+
     @GsonJsonIgnore
     @Override
     public List<NoopTopicSink> getNoopTopicSinks() {
@@ -432,6 +484,9 @@ class TopicEndpointProxy implements TopicEndpoint {
         DmaapTopicFactories.getSourceFactory().destroy();
         DmaapTopicFactories.getSinkFactory().destroy();
 
+        KafkaTopicFactories.getSourceFactory().destroy();
+        KafkaTopicFactories.getSinkFactory().destroy();
+
         NoopTopicFactories.getSinkFactory().destroy();
         NoopTopicFactories.getSourceFactory().destroy();
 
@@ -497,6 +552,8 @@ class TopicEndpointProxy implements TopicEndpoint {
                 return this.getUebTopicSource(topicName);
             case DMAAP:
                 return this.getDmaapTopicSource(topicName);
+            case KAFKA:
+                return this.getKafkaTopicSource(topicName);
             case NOOP:
                 return this.getNoopTopicSource(topicName);
             default:
@@ -519,6 +576,8 @@ class TopicEndpointProxy implements TopicEndpoint {
                 return this.getUebTopicSink(topicName);
             case DMAAP:
                 return this.getDmaapTopicSink(topicName);
+            case KAFKA:
+                return this.getKafkaTopicSink(topicName);
             case NOOP:
                 return this.getNoopTopicSink(topicName);
             default:
@@ -541,6 +600,11 @@ class TopicEndpointProxy implements TopicEndpoint {
         return DmaapTopicFactories.getSourceFactory().get(topicName);
     }
 
+    @Override
+    public KafkaTopicSource getKafkaTopicSource(String topicName) {
+        return KafkaTopicFactories.getSourceFactory().get(topicName);
+    }
+
     @Override
     public NoopTopicSource getNoopTopicSource(String topicName) {
         return NoopTopicFactories.getSourceFactory().get(topicName);
@@ -551,6 +615,11 @@ class TopicEndpointProxy implements TopicEndpoint {
         return DmaapTopicFactories.getSinkFactory().get(topicName);
     }
 
+    @Override
+    public KafkaTopicSink getKafkaTopicSink(String topicName) {
+        return KafkaTopicFactories.getSinkFactory().get(topicName);
+    }
+
     @Override
     public NoopTopicSink getNoopTopicSink(String topicName) {
         return NoopTopicFactories.getSinkFactory().get(topicName);
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSinkFactory.java
new file mode 100644 (file)
index 0000000..23aaabd
--- /dev/null
@@ -0,0 +1,198 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus;
+
+import com.google.re2j.Pattern;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineKafkaTopicSink;
+import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.onap.policy.common.endpoints.utils.KafkaPropertyUtils;
+import org.onap.policy.common.endpoints.utils.PropertyUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory of KAFKA Reader Topics indexed by topic name.
+ */
+class IndexedKafkaTopicSinkFactory implements KafkaTopicSinkFactory {
+    private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*");
+    private static final String MISSING_TOPIC = "A topic must be provided";
+
+    /**
+     * Logger.
+     */
+    private static Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSinkFactory.class);
+
+    /**
+     * KAFKA Topic Name Index.
+     */
+    protected HashMap<String, KafkaTopicSink> kafkaTopicSinks = new HashMap<>();
+
+    @Override
+    public KafkaTopicSink build(BusTopicParams busTopicParams) {
+
+        if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) {
+            throw new IllegalArgumentException("KAFKA Server(s) must be provided");
+        }
+
+        if (StringUtils.isBlank(busTopicParams.getTopic())) {
+            throw new IllegalArgumentException(MISSING_TOPIC);
+        }
+
+        synchronized (this) {
+            if (kafkaTopicSinks.containsKey(busTopicParams.getTopic())) {
+                return kafkaTopicSinks.get(busTopicParams.getTopic());
+            }
+
+            KafkaTopicSink kafkaTopicWriter = makeSink(busTopicParams);
+            if (busTopicParams.isManaged()) {
+                kafkaTopicSinks.put(busTopicParams.getTopic(), kafkaTopicWriter);
+            }
+
+            return kafkaTopicWriter;
+        }
+    }
+
+
+    @Override
+    public KafkaTopicSink build(List<String> servers, String topic) {
+        return this.build(BusTopicParams.builder()
+                .servers(servers)
+                .topic(topic)
+                .managed(true)
+                .useHttps(false)
+                .build());
+    }
+
+
+    @Override
+    public List<KafkaTopicSink> build(Properties properties) {
+
+        String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS);
+        if (StringUtils.isBlank(writeTopics)) {
+            logger.info("{}: no topic for KAFKA Sink", this);
+            return new ArrayList<>();
+        }
+
+        List<KafkaTopicSink> newKafkaTopicSinks = new ArrayList<>();
+        synchronized (this) {
+            for (String topic : COMMA_SPACE_PAT.split(writeTopics)) {
+                addTopic(newKafkaTopicSinks, topic, properties);
+            }
+            return newKafkaTopicSinks;
+        }
+    }
+
+    private void addTopic(List<KafkaTopicSink> newKafkaTopicSinks, String topic, Properties properties) {
+        if (this.kafkaTopicSinks.containsKey(topic)) {
+            newKafkaTopicSinks.add(this.kafkaTopicSinks.get(topic));
+            return;
+        }
+
+        String topicPrefix = PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic;
+
+        var props = new PropertyUtils(properties, topicPrefix,
+            (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
+
+        String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
+        if (StringUtils.isBlank(servers)) {
+            logger.error("{}: no KAFKA servers configured for sink {}", this, topic);
+            return;
+        }
+
+        KafkaTopicSink kafkaTopicWriter = this.build(KafkaPropertyUtils.makeBuilder(props, topic, servers)
+                .partitionId(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, null))
+                .build());
+        newKafkaTopicSinks.add(kafkaTopicWriter);
+    }
+
+    @Override
+    public void destroy(String topic) {
+
+        if (topic == null || topic.isEmpty()) {
+            throw new IllegalArgumentException(MISSING_TOPIC);
+        }
+
+        KafkaTopicSink kafkaTopicWriter;
+        synchronized (this) {
+            if (!kafkaTopicSinks.containsKey(topic)) {
+                return;
+            }
+
+            kafkaTopicWriter = kafkaTopicSinks.remove(topic);
+        }
+
+        kafkaTopicWriter.shutdown();
+    }
+
+    @Override
+    public void destroy() {
+        List<KafkaTopicSink> writers = this.inventory();
+        for (KafkaTopicSink writer : writers) {
+            writer.shutdown();
+        }
+
+        synchronized (this) {
+            this.kafkaTopicSinks.clear();
+        }
+    }
+
+    @Override
+    public KafkaTopicSink get(String topic) {
+
+        if (topic == null || topic.isEmpty()) {
+            throw new IllegalArgumentException(MISSING_TOPIC);
+        }
+
+        synchronized (this) {
+            if (kafkaTopicSinks.containsKey(topic)) {
+                return kafkaTopicSinks.get(topic);
+            } else {
+                throw new IllegalStateException("KafkaTopicSink for " + topic + " not found");
+            }
+        }
+    }
+
+    @Override
+    public synchronized List<KafkaTopicSink> inventory() {
+        return new ArrayList<>(this.kafkaTopicSinks.values());
+    }
+
+    /**
+     * Makes a new sink.
+     *
+     * @param busTopicParams parameters to use to configure the sink
+     * @return a new sink
+     */
+    protected KafkaTopicSink makeSink(BusTopicParams busTopicParams) {
+        return new InlineKafkaTopicSink(busTopicParams);
+    }
+
+
+    @Override
+    public String toString() {
+        return "IndexedKafkaTopicSinkFactory " + kafkaTopicSinks.keySet();
+    }
+
+}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java
new file mode 100644 (file)
index 0000000..47279d4
--- /dev/null
@@ -0,0 +1,191 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus;
+
+import com.google.re2j.Pattern;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedKafkaTopicSource;
+import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.onap.policy.common.endpoints.utils.KafkaPropertyUtils;
+import org.onap.policy.common.endpoints.utils.PropertyUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory of KAFKA Source Topics indexed by topic name.
+ */
+class IndexedKafkaTopicSourceFactory implements KafkaTopicSourceFactory {
+    private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*");
+    private static final String MISSING_TOPIC = "A topic must be provided";
+
+    /**
+     * Logger.
+     */
+    private static Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSourceFactory.class);
+
+    /**
+     * KAFKA Topic Name Index.
+     */
+    protected HashMap<String, KafkaTopicSource> kafkaTopicSources = new HashMap<>();
+
+    @Override
+    public KafkaTopicSource build(BusTopicParams busTopicParams) {
+        if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) {
+            throw new IllegalArgumentException("KAFKA Server(s) must be provided");
+        }
+
+        if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
+            throw new IllegalArgumentException(MISSING_TOPIC);
+        }
+
+        synchronized (this) {
+            if (kafkaTopicSources.containsKey(busTopicParams.getTopic())) {
+                return kafkaTopicSources.get(busTopicParams.getTopic());
+            }
+
+            var kafkaTopicSource = makeSource(busTopicParams);
+            kafkaTopicSources.put(busTopicParams.getTopic(), kafkaTopicSource);
+
+            return kafkaTopicSource;
+        }
+    }
+
+    @Override
+    public List<KafkaTopicSource> build(Properties properties) {
+
+        String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS);
+        if (StringUtils.isBlank(readTopics)) {
+            logger.info("{}: no topic for KAFKA Source", this);
+            return new ArrayList<>();
+        }
+
+        List<KafkaTopicSource> newKafkaTopicSources = new ArrayList<>();
+        synchronized (this) {
+            for (String topic : COMMA_SPACE_PAT.split(readTopics)) {
+                addTopic(newKafkaTopicSources, topic, properties);
+            }
+        }
+        return newKafkaTopicSources;
+    }
+
+    @Override
+    public KafkaTopicSource build(List<String> servers, String topic) {
+        return this.build(BusTopicParams.builder()
+                .servers(servers)
+                .topic(topic)
+                .managed(true)
+                .useHttps(false).build());
+    }
+
+    private void addTopic(List<KafkaTopicSource> newKafkaTopicSources, String topic, Properties properties) {
+        if (this.kafkaTopicSources.containsKey(topic)) {
+            newKafkaTopicSources.add(this.kafkaTopicSources.get(topic));
+            return;
+        }
+
+        String topicPrefix = PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic;
+
+        var props = new PropertyUtils(properties, topicPrefix,
+            (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
+
+        String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
+        if (StringUtils.isBlank(servers)) {
+            logger.error("{}: no KAFKA servers configured for sink {}", this, topic);
+            return;
+        }
+
+        var kafkaTopicSource = this.build(KafkaPropertyUtils.makeBuilder(props, topic, servers)
+                .build());
+
+        newKafkaTopicSources.add(kafkaTopicSource);
+    }
+
+    /**
+     * Makes a new source.
+     *
+     * @param busTopicParams parameters to use to configure the source
+     * @return a new source
+     */
+    protected KafkaTopicSource makeSource(BusTopicParams busTopicParams) {
+        return new SingleThreadedKafkaTopicSource(busTopicParams);
+    }
+
+    @Override
+    public void destroy(String topic) {
+
+        if (topic == null || topic.isEmpty()) {
+            throw new IllegalArgumentException(MISSING_TOPIC);
+        }
+
+        KafkaTopicSource kafkaTopicSource;
+
+        synchronized (this) {
+            if (!kafkaTopicSources.containsKey(topic)) {
+                return;
+            }
+
+            kafkaTopicSource = kafkaTopicSources.remove(topic);
+        }
+
+        kafkaTopicSource.shutdown();
+    }
+
+    @Override
+    public void destroy() {
+        List<KafkaTopicSource> readers = this.inventory();
+        for (KafkaTopicSource reader : readers) {
+            reader.shutdown();
+        }
+
+        synchronized (this) {
+            this.kafkaTopicSources.clear();
+        }
+    }
+
+    @Override
+    public KafkaTopicSource get(String topic) {
+
+        if (topic == null || topic.isEmpty()) {
+            throw new IllegalArgumentException(MISSING_TOPIC);
+        }
+
+        synchronized (this) {
+            if (kafkaTopicSources.containsKey(topic)) {
+                return kafkaTopicSources.get(topic);
+            } else {
+                throw new IllegalStateException("KafkaTopiceSource for " + topic + " not found");
+            }
+        }
+    }
+
+    @Override
+    public synchronized List<KafkaTopicSource> inventory() {
+        return new ArrayList<>(this.kafkaTopicSources.values());
+    }
+
+    @Override
+    public String toString() {
+        return "IndexedKafkaTopicSourceFactory " + kafkaTopicSources.keySet();
+    }
+}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactories.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactories.java
new file mode 100644 (file)
index 0000000..60db385
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class KafkaTopicFactories {
+
+    /**
+     * Factory for instantiation and management of sinks.
+     */
+    @Getter
+    private static final KafkaTopicSinkFactory sinkFactory = new IndexedKafkaTopicSinkFactory();
+
+    /**
+     * Factory for instantiation and management of sources.
+     */
+    @Getter
+    private static final KafkaTopicSourceFactory sourceFactory = new IndexedKafkaTopicSourceFactory();
+}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSink.java
new file mode 100644 (file)
index 0000000..960a02c
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus;
+
+/**
+ * Topic Writer over KAFKA Infrastructure.
+ */
+public interface KafkaTopicSink extends BusTopicSink {
+
+}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkFactory.java
new file mode 100644 (file)
index 0000000..fa5e56f
--- /dev/null
@@ -0,0 +1,89 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus;
+
+import java.util.List;
+import java.util.Properties;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
+
+/**
+ * KAFKA Topic Sink Factory.
+ */
+public interface KafkaTopicSinkFactory {
+
+    /**
+     * Instantiates a new KAFKA Topic Writer.
+     *
+     * @param busTopicParams parameters object
+     * @return an KAFKA Topic Sink
+     */
+    KafkaTopicSink build(BusTopicParams busTopicParams);
+
+    /**
+     * Creates an KAFKA Topic Writer based on properties files.
+     *
+     * @param properties Properties containing initialization values
+     *
+     * @return an KAFKA Topic Writer
+     * @throws IllegalArgumentException if invalid parameters are present
+     */
+    List<KafkaTopicSink> build(Properties properties);
+
+    /**
+     * Instantiates a new KAFKA Topic Writer.
+     *
+     * @param servers list of servers
+     * @param topic topic name
+     *
+     * @return an KAFKA Topic Writer
+     * @throws IllegalArgumentException if invalid parameters are present
+     */
+    KafkaTopicSink build(List<String> servers, String topic);
+
+    /**
+     * Destroys an KAFKA Topic Writer based on a topic.
+     *
+     * @param topic topic name
+     * @throws IllegalArgumentException if invalid parameters are present
+     */
+    void destroy(String topic);
+
+    /**
+     * Destroys all KAFKA Topic Writers.
+     */
+    void destroy();
+
+    /**
+     * gets an KAFKA Topic Writer based on topic name.
+     *
+     * @param topic the topic name
+     *
+     * @return an KAFKA Topic Writer with topic name
+     * @throws IllegalArgumentException if an invalid topic is provided
+     * @throws IllegalStateException if the KAFKA Topic Reader is an incorrect state
+     */
+    KafkaTopicSink get(String topic);
+
+    /**
+     * Provides a snapshot of the KAFKA Topic Writers.
+     *
+     * @return a list of the KAFKA Topic Writers
+     */
+    List<KafkaTopicSink> inventory();
+}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSource.java
new file mode 100644 (file)
index 0000000..03efd08
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus;
+
+/**
+ * Kafka Topic Source.
+ */
+public interface KafkaTopicSource extends BusTopicSource {
+
+}
\ No newline at end of file
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java
new file mode 100644 (file)
index 0000000..8cb51df
--- /dev/null
@@ -0,0 +1,88 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus;
+
+import java.util.List;
+import java.util.Properties;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
+
+/**
+ * Kafka Topic Source Factory.
+ */
+public interface KafkaTopicSourceFactory {
+
+    /**
+     * Creates an Kafka Topic Source based on properties files.
+     *
+     * @param properties Properties containing initialization values
+     *
+     * @return an Kafka Topic Source
+     * @throws IllegalArgumentException if invalid parameters are present
+     */
+    List<KafkaTopicSource> build(Properties properties);
+
+    /**
+     * Instantiates a new Kafka Topic Source.
+     *
+     * @param busTopicParams parameters object
+     * @return an Kafka Topic Source
+     */
+    KafkaTopicSource build(BusTopicParams busTopicParams);
+
+    /**
+     * Instantiates a new Kafka Topic Source.
+     *
+     * @param servers list of servers
+     * @param topic topic name
+     *
+     * @return an Kafka Topic Source
+     * @throws IllegalArgumentException if invalid parameters are present
+     */
+    KafkaTopicSource build(List<String> servers, String topic);
+
+    /**
+     * Destroys an Kafka Topic Source based on a topic.
+     *
+     * @param topic topic name
+     * @throws IllegalArgumentException if invalid parameters are present
+     */
+    void destroy(String topic);
+
+    /**
+     * Destroys all Kafka Topic Sources.
+     */
+    void destroy();
+
+    /**
+     * Gets an Kafka Topic Source based on topic name.
+     *
+     * @param topic the topic name
+     * @return an Kafka Topic Source with topic name
+     * @throws IllegalArgumentException if an invalid topic is provided
+     * @throws IllegalStateException if the Kafka Topic Source is an incorrect state
+     */
+    KafkaTopicSource get(String topic);
+
+    /**
+     * Provides a snapshot of the Kafka Topic Sources.
+     *
+     * @return a list of the Kafka Topic Sources
+     */
+    List<KafkaTopicSource> inventory();
+}
index 20f4c91..8d88b0d 100644 (file)
@@ -5,6 +5,7 @@
  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
  * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
+ * Copyright (C) 2022 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -28,13 +29,19 @@ import com.att.nsa.cambria.client.CambriaConsumer;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.security.GeneralSecurityException;
+import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import lombok.Getter;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.onap.dmaap.mr.client.MRClientFactory;
 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
 import org.onap.dmaap.mr.client.impl.MRConsumerImpl.MRConsumerImplBuilder;
@@ -218,6 +225,53 @@ public interface BusConsumer {
         }
     }
 
+    /**
+     * Kafka based consumer.
+     */
+    public static class KafkaConsumerWrapper extends FetchingBusConsumer {
+
+        /**
+         * logger.
+         */
+        private static Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);
+
+        /**
+         * Kafka consumer.
+         */
+        private KafkaConsumer<String, String> consumer;
+
+        /**
+         * Kafka Consumer Wrapper.
+         * BusTopicParam object contains the following parameters
+         * servers messaging bus hosts.
+         * topic topic
+         *
+         * @param busTopicParams - The parameters for the bus topic
+         * @throws GeneralSecurityException - Security exception
+         * @throws MalformedURLException - Malformed URL exception
+         */
+        public KafkaConsumerWrapper(BusTopicParams busTopicParams) {
+            super(busTopicParams);
+        }
+
+        @Override
+        public Iterable<String> fetch() throws IOException {
+            // TODO: Not implemented yet
+            return new ArrayList<>();
+        }
+
+        @Override
+        public void close() {
+            super.close();
+            this.consumer.close();
+        }
+
+        @Override
+        public String toString() {
+            return "KafkaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
+        }
+    }
+
     /**
      * MR based consumer.
      */
index 8bf805b..e0df709 100644 (file)
@@ -5,6 +5,7 @@
  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
  * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
+ * Copyright (C) 2022 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -31,8 +32,13 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.record.CompressionType;
 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
@@ -143,6 +149,58 @@ public interface BusPublisher {
 
     }
 
+    /**
+     * Kafka based library publisher.
+     */
+    public static class KafkaPublisherWrapper implements BusPublisher {
+
+        private static Logger logger = LoggerFactory.getLogger(KafkaPublisherWrapper.class);
+
+        /**
+         * The actual Kafka publisher.
+         */
+        private final KafkaProducer producer;
+
+        /**
+         * Constructor.
+         *
+         * @param busTopicParams topic parameters
+         */
+        public KafkaPublisherWrapper(BusTopicParams busTopicParams) {
+            // TODO Setting of topic parameters is not implemented yet.
+            //Setup Properties for Kafka Producer
+            Properties kafkaProps = new Properties();
+            this.producer = new KafkaProducer(kafkaProps);
+        }
+
+        @Override
+        public boolean send(String partitionId, String message) {
+            if (message == null) {
+                throw new IllegalArgumentException("No message provided");
+            }
+            // TODO Sending messages is not implemented yet
+            return true;
+        }
+
+        @Override
+        public void close() {
+            logger.info("{}: CLOSE", this);
+
+            try (this.producer) {
+                this.producer.close();
+            } catch (Exception e) {
+                logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
+            }
+        }
+
+
+        @Override
+        public String toString() {
+            return "KafkaPublisherWrapper []";
+        }
+
+    }
+
     /**
      * DmaapClient library wrapper.
      */
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java
new file mode 100644 (file)
index 0000000..b564229
--- /dev/null
@@ -0,0 +1,76 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus.internal;
+
+import org.onap.policy.common.endpoints.event.comm.Topic;
+import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This implementation publishes events for the associated KAFKA topic, inline with the calling
+ * thread.
+ */
+public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTopicSink {
+
+    /**
+     * Logger.
+     */
+    private static Logger logger = LoggerFactory.getLogger(InlineKafkaTopicSink.class);
+
+    /**
+     * Argument-based KAFKA Topic Writer instantiation. BusTopicParams contains below mentioned
+     * attributes.
+     *
+     * <p>servers              list of KAFKA servers available for publishing
+     * topic                the topic to publish to
+     * partitionId          the partition key (optional, autogenerated if not provided)
+     * useHttps             does connection use HTTPS?
+     * @param busTopicParams contains attributes needed
+     * @throws IllegalArgumentException if invalid arguments are detected
+     */
+    public InlineKafkaTopicSink(BusTopicParams busTopicParams) {
+        super(busTopicParams);
+    }
+
+    /**
+     * Instantiation of internal resources.
+     */
+    @Override
+    public void init() {
+
+        this.publisher = new BusPublisher.KafkaPublisherWrapper(BusTopicParams.builder()
+                .servers(this.servers)
+                .topic(this.effectiveTopic)
+                .useHttps(this.useHttps)
+                .build());
+        logger.info("{}: KAFKA SINK created", this);
+    }
+
+    @Override
+    public String toString() {
+        return "InlineKafkaTopicSink [getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + ", toString()="
+                        + super.toString() + "]";
+    }
+
+    @Override
+    public CommInfrastructure getTopicCommInfrastructure() {
+        return Topic.CommInfrastructure.KAFKA;
+    }
+}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java
new file mode 100644 (file)
index 0000000..b8362b8
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus.internal;
+
+import org.onap.policy.common.endpoints.event.comm.Topic;
+import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource;
+
+/**
+ * This topic source implementation specializes in reading messages over an Kafka Bus topic source and
+ * notifying its listeners.
+ */
+public class SingleThreadedKafkaTopicSource extends SingleThreadedBusTopicSource implements KafkaTopicSource {
+
+    /**
+     * Constructor.
+     *
+     * @param busTopicParams Parameters object containing all the required inputs
+     * @throws IllegalArgumentException An invalid parameter passed in
+     */
+    public SingleThreadedKafkaTopicSource(BusTopicParams busTopicParams) {
+        super(busTopicParams);
+        this.init();
+    }
+
+    /**
+     * Initialize the Cambria client.
+     */
+    @Override
+    public void init() {
+        this.consumer = new BusConsumer.KafkaConsumerWrapper(BusTopicParams.builder()
+                .servers(this.servers)
+                .topic(this.effectiveTopic)
+                .useHttps(this.useHttps)
+                .build());
+    }
+
+    @Override
+    public CommInfrastructure getTopicCommInfrastructure() {
+        return Topic.CommInfrastructure.KAFKA;
+    }
+
+    @Override
+    public String toString() {
+        return "SingleThreadedKafkaTopicSource [getTopicCommInfrastructure()=" + getTopicCommInfrastructure()
+            + ", toString()=" + super.toString() + "]";
+    }
+
+}
index 08ed262..49dff28 100644 (file)
@@ -3,6 +3,7 @@
  * ONAP
  * ================================================================================
  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2022 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -79,6 +80,11 @@ public final class PolicyEndPointProperties {
     public static final String PROPERTY_NOOP_SOURCE_TOPICS = "noop.source.topics";
     public static final String PROPERTY_NOOP_SINK_TOPICS = "noop.sink.topics";
 
+    /* KAFKA Properties */
+
+    public static final String PROPERTY_KAFKA_SOURCE_TOPICS = "kafka.source.topics";
+    public static final String PROPERTY_KAFKA_SINK_TOPICS = "kafka.sink.topics";
+
     /* HTTP Server Properties */
 
     public static final String PROPERTY_HTTP_SERVER_SERVICES = "http.server.services";
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/utils/KafkaPropertyUtils.java
new file mode 100644 (file)
index 0000000..3e62f98
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.utils;
+
+import com.google.re2j.Pattern;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams.TopicParamsBuilder;
+import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class KafkaPropertyUtils {
+    private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*");
+
+    /**
+     * Makes a topic builder, configuring it with properties that are common to both
+     * sources and sinks.
+     *
+     * @param props properties to be used to configure the builder
+     * @param topic topic being configured
+     * @param servers target servers
+     * @return a topic builder
+     */
+    public static TopicParamsBuilder makeBuilder(PropertyUtils props, String topic, String servers) {
+
+        final List<String> serverList = new ArrayList<>(Arrays.asList(COMMA_SPACE_PAT.split(servers)));
+        //TODO More Kafka properties to be added
+        return BusTopicParams.builder()
+                    .servers(serverList)
+                    .topic(topic)
+                    .effectiveTopic(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX,
+                                    topic))
+                    .managed(props.getBoolean(PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, true));
+    }
+}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactoryTestBase.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicFactoryTestBase.java
new file mode 100644 (file)
index 0000000..3986549
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+
+import java.util.Collections;
+import org.onap.policy.common.endpoints.event.comm.Topic;
+
+/**
+ * Base class for KafkaTopicXxxFactory tests.
+ *
+ * @param <T> type of topic managed by the factory
+ */
+public abstract class KafkaTopicFactoryTestBase<T extends Topic> extends BusTopicFactoryTestBase<T> {
+
+    @Override
+    public void testBuildBusTopicParams_Ex() {
+
+        super.testBuildBusTopicParams_Ex();
+
+        // null servers
+        assertThatIllegalArgumentException().as("null servers")
+                        .isThrownBy(() -> buildTopic(makeBuilder().servers(null).build()));
+
+        // empty servers
+        assertThatIllegalArgumentException().as("empty servers")
+                        .isThrownBy(() -> buildTopic(makeBuilder().servers(Collections.emptyList()).build()));
+    }
+}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicPropertyBuilder.java
new file mode 100644 (file)
index 0000000..1a815e1
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus;
+
+import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_EFFECTIVE_TOPIC;
+import static org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase.MY_PARTITION;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX;
+
+import java.util.Arrays;
+import lombok.Getter;
+import org.onap.policy.common.endpoints.parameters.TopicParameters;
+
+public class KafkaTopicPropertyBuilder extends TopicPropertyBuilder {
+
+    public static final String SERVER = "my-server";
+    public static final String TOPIC2 = "my-topic-2";
+
+    @Getter
+    private TopicParameters params = new TopicParameters();
+
+    /**
+     * Constructs the object.
+     *
+     * @param prefix the prefix for the properties to be built
+     */
+    public KafkaTopicPropertyBuilder(String prefix) {
+        super(prefix);
+    }
+
+    /**
+     * Adds a topic and configures it's properties with default values.
+     *
+     * @param topic the topic to be added
+     * @return this builder
+     */
+    public KafkaTopicPropertyBuilder makeTopic(String topic) {
+        addTopic(topic);
+
+        setTopicProperty(PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, MY_EFFECTIVE_TOPIC);
+        setTopicProperty(PROPERTY_MANAGED_SUFFIX, "true");
+        setTopicProperty(PROPERTY_HTTP_HTTPS_SUFFIX, "true");
+        setTopicProperty(PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, MY_PARTITION);
+        setTopicProperty(PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER);
+
+        params.setTopicCommInfrastructure("kafka");
+        params.setTopic(topic);
+        params.setEffectiveTopic(MY_EFFECTIVE_TOPIC);
+        params.setManaged(true);
+        params.setUseHttps(true);
+        params.setPartitionId(MY_PARTITION);
+        params.setServers(Arrays.asList(SERVER));
+
+        return this;
+    }
+}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSinkTest.java
new file mode 100644 (file)
index 0000000..503e513
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Engine - Common Modules
+ * ================================================================================
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus;
+
+import static org.junit.Assert.assertNotNull;
+
+import org.junit.Test;
+
+public class KafkaTopicSinkTest {
+
+    @Test
+    public void test() {
+        assertNotNull(KafkaTopicFactories.getSinkFactory());
+    }
+
+}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactoryTest.java
new file mode 100644 (file)
index 0000000..6fa80a4
--- /dev/null
@@ -0,0 +1,158 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Engine - Common Modules
+ * ================================================================================
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS;
+
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.Topic;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
+import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+
+public class KafkaTopicSourceFactoryTest extends KafkaTopicFactoryTestBase<KafkaTopicSource> {
+
+    private SourceFactory factory;
+
+    /**
+     * Creates the object to be tested.
+     */
+    @Before
+    @Override
+    public void setUp() {
+        super.setUp();
+
+        factory = new SourceFactory();
+    }
+
+    @After
+    public void tearDown() {
+        factory.destroy();
+    }
+
+    @Test
+    @Override
+    public void testBuildBusTopicParams() {
+        super.testBuildBusTopicParams_Ex();
+    }
+
+    @Test
+    @Override
+    public void testBuildProperties() {
+
+        initFactory();
+
+        List<KafkaTopicSource> topics = buildTopics(makePropBuilder().makeTopic(MY_TOPIC).build());
+        assertEquals(1, topics.size());
+        assertEquals(MY_TOPIC, topics.get(0).getTopic());
+    }
+
+    @Test
+    @Override
+    public void testDestroyString_testGet_testInventory() {
+        super.testDestroyString_Ex();
+    }
+
+    @Test
+    public void testGet() {
+        super.testGet_Ex();
+    }
+
+    @Test
+    public void testToString() {
+        assertTrue(factory.toString().startsWith("IndexedKafkaTopicSourceFactory ["));
+    }
+
+    @Override
+    protected void initFactory() {
+        if (factory != null) {
+            factory.destroy();
+        }
+
+        factory = new SourceFactory();
+    }
+
+    @Override
+    protected List<KafkaTopicSource> buildTopics(Properties properties) {
+        return factory.build(properties);
+    }
+
+    @Override
+    protected KafkaTopicSource buildTopic(BusTopicParams params) {
+        return factory.build(params);
+    }
+
+    @Override
+    protected KafkaTopicSource buildTopic(List<String> servers, String topic) {
+        return factory.build(servers, topic);
+    }
+
+    @Override
+    protected void destroyFactory() {
+        factory.destroy();
+    }
+
+    @Override
+    protected void destroyTopic(String topic) {
+        factory.destroy(topic);
+    }
+
+    @Override
+    protected List<KafkaTopicSource> getInventory() {
+        return factory.inventory();
+    }
+
+    @Override
+    protected KafkaTopicSource getTopic(String topic) {
+        return factory.get(topic);
+    }
+
+    @Override
+    protected BusTopicParams getLastParams() {
+        return factory.params.getLast();
+    }
+
+    @Override
+    protected TopicPropertyBuilder makePropBuilder() {
+        return new KafkaTopicPropertyBuilder(PROPERTY_KAFKA_SOURCE_TOPICS);
+    }
+
+    /**
+     * Factory that records the parameters of all of the sources it creates.
+     */
+    private static class SourceFactory extends IndexedKafkaTopicSourceFactory {
+        private Deque<BusTopicParams> params = new LinkedList<>();
+
+        @Override
+        protected KafkaTopicSource makeSource(BusTopicParams busTopicParams) {
+            params.add(busTopicParams);
+            return super.makeSource(busTopicParams);
+        }
+    }
+}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceTest.java
new file mode 100644 (file)
index 0000000..ee2d1d7
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Engine - Common Modules
+ * ================================================================================
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus;
+
+import static org.junit.Assert.assertNotNull;
+
+import org.junit.Test;
+
+public class KafkaTopicSourceTest {
+
+    @Test
+    public void verifyKafkaTopicFactoriesNotNull() {
+        assertNotNull(KafkaTopicFactories.getSourceFactory());
+    }
+
+}
index 21050f9..da9f792 100644 (file)
@@ -46,6 +46,7 @@ import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.Dmaa
 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapConsumerWrapper;
 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapDmeConsumerWrapper;
 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.KafkaConsumerWrapper;
 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
 import org.powermock.reflect.Whitebox;
 
@@ -295,6 +296,17 @@ public class BusConsumerTest extends TopicTestBase {
         new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build());
     }
 
+    @Test
+    public void testKafkaConsumerWrapper() throws Exception {
+        // verify that different wrappers can be built
+        assertThatCode(() -> new KafkaConsumerWrapper(makeBuilder().build())).doesNotThrowAnyException();
+    }
+
+    @Test
+    public void testKafkaConsumerWrapperToString() throws Exception {
+        assertNotNull(new KafkaConsumerWrapper(makeBuilder().build()) {}.toString());
+    }
+
     private static class FetchingBusConsumerImpl extends FetchingBusConsumer {
 
         protected FetchingBusConsumerImpl(BusTopicParams busTopicParams) {
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.java
new file mode 100644 (file)
index 0000000..cc09658
--- /dev/null
@@ -0,0 +1,63 @@
+/*
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus.internal;
+
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
+import org.onap.policy.common.utils.gson.GsonTestUtils;
+
+public class SingleThreadedKafkaTopicSourceTest extends TopicTestBase {
+    private SingleThreadedKafkaTopicSource source;
+
+    /**
+     * Creates the object to be tested.
+     */
+    @Before
+    @Override
+    public void setUp() {
+        super.setUp();
+
+        source = new SingleThreadedKafkaTopicSource(makeBuilder().build());
+    }
+
+    @After
+    public void tearDown() {
+        source.shutdown();
+    }
+
+    @Test
+    public void testToString() {
+        assertTrue(source.toString().startsWith("SingleThreadedKafkaTopicSource ["));
+    }
+
+    @Test
+    public void testGetTopicCommInfrastructure() {
+        assertEquals(CommInfrastructure.KAFKA, source.getTopicCommInfrastructure());
+    }
+
+}
diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSinkTest.json
new file mode 100644 (file)
index 0000000..7c512a8
--- /dev/null
@@ -0,0 +1,11 @@
+{
+  "servers" : [ "svra", "svrb" ],
+  "topic" : "my-topic",
+  "effectiveTopic" : "my-effective-topic",
+  "recentEvents" : [ ],
+  "alive" : false,
+  "locked" : false,
+  "useHttps" : true,
+  "topicCommInfrastructure" : "KAFKA",
+  "partitionKey" : "my-partition"
+}
diff --git a/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.json b/policy-endpoints/src/test/resources/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSourceTest.json
new file mode 100644 (file)
index 0000000..626d87e
--- /dev/null
@@ -0,0 +1,10 @@
+{
+  "servers" : [ "svra", "svrb" ],
+  "topic" : "my-topic",
+  "effectiveTopic" : "my-effective-topic",
+  "recentEvents" : [ ],
+  "alive" : false,
+  "locked" : false,
+  "useHttps" : true,
+  "topicCommInfrastructure" : "KAFKA"
+}