Add Native Kafka streams support in bp-generator 10/116910/13
authorTomasz Wrobel <tomasz.wrobel@nokia.com>
Fri, 15 Jan 2021 13:34:18 +0000 (14:34 +0100)
committerTomasz Wrobel <tomasz.wrobel@nokia.com>
Thu, 21 Jan 2021 10:19:39 +0000 (11:19 +0100)
Issue-ID: DCAEGEN2-1179
Signed-off-by: Tomasz Wrobel <tomasz.wrobel@nokia.com>
Change-Id: I541dca959707a41c56205e20c9f5a56ccec5ca41

15 files changed:
mod/bpgenerator/common/src/main/java/org/onap/blueprintgenerator/constants/Constants.java
mod/bpgenerator/common/src/main/java/org/onap/blueprintgenerator/model/common/Appconfig.java
mod/bpgenerator/common/src/main/java/org/onap/blueprintgenerator/model/common/BaseStream.java [new file with mode: 0644]
mod/bpgenerator/common/src/main/java/org/onap/blueprintgenerator/model/common/Dmaap.java
mod/bpgenerator/common/src/main/java/org/onap/blueprintgenerator/service/base/BlueprintHelperService.java
mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/AppConfigService.java
mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/StreamService.java
mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/AafCredential.java [new file with mode: 0644]
mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaCommonConstants.java [new file with mode: 0644]
mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaInfo.java [new file with mode: 0644]
mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStream.java [new file with mode: 0644]
mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStreamService.java [new file with mode: 0644]
mod/bpgenerator/onap/src/test/java/org/onap/blueprintgenerator/service/common/StreamServiceTest.java
mod/bpgenerator/onap/src/test/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStreamServiceTest.java [new file with mode: 0644]
mod/bpgenerator/onap/src/test/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStreamTest.java [new file with mode: 0644]

index ac654a2..fbd0527 100644 (file)
@@ -5,6 +5,8 @@
  *  *  ================================================================================
  *  *  Copyright (c) 2020  AT&T Intellectual Property. All rights reserved.
  *  *  ================================================================================
+ *  *  Modifications Copyright (c) 2021 Nokia
+ *  *  ================================================================================
  *  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  *  you may not use this file except in compliance with the License.
  *  *  You may obtain a copy of the License at
@@ -39,6 +41,7 @@ public class Constants {
     public static final String DATA_ROUTER = "data_router";
     public static final String MESSAGEROUTER_VALUE = "message router";
     public static final String MESSAGE_ROUTER = "message_router";
+    public static final String KAFKA_TYPE = "kafka";
     public static final String TOSCA_DEF_VERSION = "cloudify_dsl_1_3";
     public static final String SERVICE_COMPONENT_NAME_OVERRIDE = "service_component_name_override";
     public static final String EMPTY = "''";
index 5b3cdc6..d0f5784 100644 (file)
@@ -5,6 +5,8 @@
  *  *  ================================================================================
  *  *  Copyright (c) 2020  AT&T Intellectual Property. All rights reserved.
  *  *  ================================================================================
+ *  *  Modifications Copyright (c) 2021 Nokia
+ *  *  ================================================================================
  *  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  *  you may not use this file except in compliance with the License.
  *  *  You may obtain a copy of the License at
@@ -43,9 +45,9 @@ public class Appconfig {
 
     private Calls[] services_calls;
 
-    private Map<String, Dmaap> streams_publishes;
+    private Map<String, BaseStream> streams_publishes;
 
-    private Map<String, Dmaap> streams_subscribes;
+    private Map<String, BaseStream> streams_subscribes;
 
     private Map<String, Object> params;
 
diff --git a/mod/bpgenerator/common/src/main/java/org/onap/blueprintgenerator/model/common/BaseStream.java b/mod/bpgenerator/common/src/main/java/org/onap/blueprintgenerator/model/common/BaseStream.java
new file mode 100644 (file)
index 0000000..310f9a0
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ *
+ *  * ============LICENSE_START=======================================================
+ *  *  org.onap.dcae
+ *  *  ================================================================================
+ *  *  Copyright (c) 2021  Nokia Intellectual Property. All rights reserved.
+ *  *  ================================================================================
+ *  *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  *  you may not use this file except in compliance with the License.
+ *  *  You may obtain a copy of the License at
+ *  *
+ *  *       http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  *  Unless required by applicable law or agreed to in writing, software
+ *  *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  *  See the License for the specific language governing permissions and
+ *  *  limitations under the License.
+ *  *  ============LICENSE_END=========================================================
+ *
+ *
+ */
+
+
+package org.onap.blueprintgenerator.model.common;
+
+/**
+ * @author : Tomasz Wrobel
+ * @date 01/18/2021 Application: DCAE/ONAP - Blueprint Generator
+ * Applications Common Model: An interface with common stream field
+ */
+
+public interface BaseStream {
+
+    String getType();
+
+}
index 7ed79b3..9c37773 100644 (file)
@@ -5,6 +5,8 @@
  *  *  ================================================================================
  *  *  Copyright (c) 2020  AT&T Intellectual Property. All rights reserved.
  *  *  ================================================================================
+ *  *  Modifications Copyright (c) 2021 Nokia
+ *  *  ================================================================================
  *  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  *  you may not use this file except in compliance with the License.
  *  *  You may obtain a copy of the License at
@@ -29,13 +31,14 @@ import lombok.Data;
 
 /**
  * @author : Ravi Mantena
- * @date 10/16/2020 Application: DCAE/ONAP - Blueprint Generator Common Module: Used by both ONAP
- * and DCAE Blueprint Applications Common Model: A model class which represents Dmaap
+ * @date 10/16/2020 Application: DCAE/ONAP - Blueprint Generator Common Module: Used by both ONAP and DCAE Blueprint
+ * Applications Common Model: A model class which represents Dmaap
  */
+
 @Data
 @JsonInclude(value = JsonInclude.Include.NON_NULL)
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class Dmaap {
+public class Dmaap implements BaseStream {
 
     private Object dmaap_info;
 
@@ -45,4 +48,5 @@ public class Dmaap {
     private GetInput pass;
 
     private GetInput user;
+
 }
index db15360..c9759d0 100644 (file)
@@ -5,6 +5,8 @@
  *  *  ================================================================================
  *  *  Copyright (c) 2020  AT&T Intellectual Property. All rights reserved.
  *  *  ================================================================================
+ *  *  Modifications Copyright (c) 2021 Nokia
+ *  *  ================================================================================
  *  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  *  you may not use this file except in compliance with the License.
  *  *  You may obtain a copy of the License at
@@ -201,6 +203,16 @@ public class BlueprintHelperService {
         return type.equals(Constants.MESSAGE_ROUTER) || type.equals(Constants.MESSAGEROUTER_VALUE);
     }
 
+    /**
+     * Returns if the type is Kafka or not
+     *
+     * @param type Input Type
+     * @return
+     */
+    public boolean isKafkaStreamType(String type) {
+        return type.equals(Constants.KAFKA_TYPE);
+    }
+
     /**
      * Returns name with underscore for empty input
      *
index f0d0e50..3db5ea0 100644 (file)
@@ -27,13 +27,11 @@ package org.onap.blueprintgenerator.service.common;
 
 import org.onap.blueprintgenerator.constants.Constants;
 import org.onap.blueprintgenerator.model.common.Appconfig;
-import org.onap.blueprintgenerator.model.common.Dmaap;
+import org.onap.blueprintgenerator.model.common.BaseStream;
 import org.onap.blueprintgenerator.model.common.GetInput;
 import org.onap.blueprintgenerator.model.componentspec.OnapComponentSpec;
 import org.onap.blueprintgenerator.model.componentspec.common.Calls;
 import org.onap.blueprintgenerator.model.componentspec.common.Parameters;
-import org.onap.blueprintgenerator.model.componentspec.common.Publishes;
-import org.onap.blueprintgenerator.model.componentspec.common.Subscribes;
 import org.onap.blueprintgenerator.service.base.BlueprintHelperService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -60,6 +58,7 @@ public class AppConfigService {
     @Autowired
     private StreamService streamService;
 
+
     /**
      * Creates Inputs section under App Config with Publishes, Subscribes, Parameters sections by
      * checking Datarouter/MessageRouter/override/Dmaap values
@@ -82,9 +81,9 @@ public class AppConfigService {
         Calls[] call = new Calls[0];
         appconfig.setService_calls(call);
 
-        Map<String, Dmaap> streamPublishes = streamService.createStreamPublishes(
+        Map<String, BaseStream> streamPublishes = streamService.createStreamPublishes(
             onapComponentSpec, blueprintHelperService, dmaapService, inputs, isDmaap);
-        Map<String, Dmaap> streamSubscribes = streamService.createStreamSubscribes(
+        Map<String, BaseStream> streamSubscribes = streamService.createStreamSubscribes(
             onapComponentSpec, blueprintHelperService, dmaapService, inputs, isDmaap);
 
         appconfig.setStreams_publishes(streamPublishes);
@@ -132,4 +131,5 @@ public class AppConfigService {
         response.put("inputs", inputs);
         return response;
     }
+
 }
index bd4cf87..f27ea48 100644 (file)
@@ -29,39 +29,45 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.TreeMap;
 import org.onap.blueprintgenerator.constants.Constants;
+import org.onap.blueprintgenerator.model.common.BaseStream;
 import org.onap.blueprintgenerator.model.common.Dmaap;
 import org.onap.blueprintgenerator.model.componentspec.OnapComponentSpec;
 import org.onap.blueprintgenerator.model.componentspec.common.Publishes;
 import org.onap.blueprintgenerator.model.componentspec.common.Subscribes;
 import org.onap.blueprintgenerator.service.base.BlueprintHelperService;
+import org.onap.blueprintgenerator.service.common.kafka.KafkaStreamService;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 /**
  * @author : Joanna Jeremicz
- * @date 01/15/2021 Application: ONAP - Blueprint Generator Common ONAP Service
- * to create publishes and subscribes streams
+ * @date 01/15/2021 Application: ONAP - Blueprint Generator Common ONAP Service to create publishes and subscribes
+ * streams
  */
 @Service("streamService")
 public class StreamService {
 
+    @Autowired
+    private KafkaStreamService kafkaStreamsService;
+
     /**
      * Creates publishes stream for given Inputs and ComponentSpec
      *
-     * @param onapComponentSpec Onap Component Specification
+     * @param onapComponentSpec      Onap Component Specification
      * @param blueprintHelperService Blueprint Helper Service
-     * @param dmaapService Dmaap Service
-     * @param inputs Inputs
-     * @param isDmaap Dmaap Argument
+     * @param dmaapService           Dmaap Service
+     * @param inputs                 Inputs
+     * @param isDmaap                Dmaap Argument
      * @return
      */
-    public Map<String, Dmaap> createStreamPublishes(
+    public Map<String, BaseStream> createStreamPublishes(
         OnapComponentSpec onapComponentSpec,
         BlueprintHelperService blueprintHelperService,
         DmaapService dmaapService,
         Map<String, LinkedHashMap<String, Object>> inputs,
         boolean isDmaap) {
 
-        Map<String, Dmaap> streamPublishes = new TreeMap<>();
+        Map<String, BaseStream> streamPublishes = new TreeMap<>();
         if (onapComponentSpec.getStreams() == null || onapComponentSpec.getStreams().getPublishes() == null) {
             return streamPublishes;
         }
@@ -90,6 +96,9 @@ public class StreamService {
                 Dmaap dmaap = (Dmaap) dmaapDataRouterResponse.get("dmaap");
                 dmaap.setType(publishes.getType());
                 streamPublishes.put(config, dmaap);
+            } else if (blueprintHelperService.isKafkaStreamType(publishes.getType())) {
+                inputs.putAll(kafkaStreamsService.createStreamPublishInputs(publishes.getConfig_key()));
+                streamPublishes.putAll(kafkaStreamsService.createAppPropertiesPublish(publishes.getConfig_key()));
             }
         }
         return streamPublishes;
@@ -98,21 +107,21 @@ public class StreamService {
     /**
      * Creates subscribes stream for given Inputs and ComponentSpec
      *
-     * @param onapComponentSpec Onap Component Specification
+     * @param onapComponentSpec      Onap Component Specification
      * @param blueprintHelperService Blueprint Helper Service
-     * @param dmaapService Dmaap Service
-     * @param inputs Inputs
-     * @param isDmaap Dmaap Argument
+     * @param dmaapService           Dmaap Service
+     * @param inputs                 Inputs
+     * @param isDmaap                Dmaap Argument
      * @return
      */
-    public Map<String, Dmaap> createStreamSubscribes(
+    public Map<String, BaseStream> createStreamSubscribes(
         OnapComponentSpec onapComponentSpec,
         BlueprintHelperService blueprintHelperService,
         DmaapService dmaapService,
         Map<String, LinkedHashMap<String, Object>> inputs,
         boolean isDmaap) {
 
-        Map<String, Dmaap> streamSubscribes = new TreeMap<>();
+        Map<String, BaseStream> streamSubscribes = new TreeMap<>();
         if (onapComponentSpec.getStreams() == null || onapComponentSpec.getStreams().getSubscribes() == null) {
             return streamSubscribes;
         }
@@ -141,7 +150,12 @@ public class StreamService {
                 Dmaap dmaap = (Dmaap) dmaapDataRouterResponse.get("dmaap");
                 dmaap.setType(subscribes.getType());
                 streamSubscribes.put(config, dmaap);
+            } else if (blueprintHelperService.isKafkaStreamType(subscribes.getType())) {
+                inputs.putAll(kafkaStreamsService.createStreamSubscribeInputs(subscribes.getConfig_key()));
+                streamSubscribes.putAll(kafkaStreamsService.createAppPropertiesSubscribe(subscribes.getConfig_key()));
             }
+
+
         }
         return streamSubscribes;
     }
diff --git a/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/AafCredential.java b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/AafCredential.java
new file mode 100644 (file)
index 0000000..072a54c
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ *
+ *  * ============LICENSE_START=======================================================
+ *  *  org.onap.dcae
+ *  *  ================================================================================
+ *  *  Copyright (c) 2021 Nokia Intellectual Property. All rights reserved.
+ *  *  ================================================================================
+ *  *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  *  you may not use this file except in compliance with the License.
+ *  *  You may obtain a copy of the License at
+ *  *
+ *  *       http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  *  Unless required by applicable law or agreed to in writing, software
+ *  *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  *  See the License for the specific language governing permissions and
+ *  *  limitations under the License.
+ *  *  ============LICENSE_END=========================================================
+ *
+ *
+ */
+
+package org.onap.blueprintgenerator.service.common.kafka;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import lombok.Data;
+import org.onap.blueprintgenerator.model.common.GetInput;
+
+/**
+ * @author : Tomasz Wrobel
+ * @date 01/18/2021 Application: DCAE/ONAP - Blueprint Generator
+ * Applications Common Model: A model class which represents AafCredential
+ */
+
+@Data
+@JsonInclude(value = Include.NON_NULL)
+public class AafCredential {
+
+    private GetInput username;
+
+    private GetInput password;
+
+    public AafCredential(String usernameInput, String passwordInput) {
+
+        this.username = new GetInput(usernameInput);
+
+        this.password = new GetInput(passwordInput);
+
+    }
+}
diff --git a/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaCommonConstants.java b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaCommonConstants.java
new file mode 100644 (file)
index 0000000..0a040be
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ *
+ *  * ============LICENSE_START=======================================================
+ *  *  org.onap.dcae
+ *  *  ================================================================================
+ *  *  Copyright (c) 2021 Nokia Intellectual Property. All rights reserved.
+ *  *  ================================================================================
+ *  *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  *  you may not use this file except in compliance with the License.
+ *  *  You may obtain a copy of the License at
+ *  *
+ *  *       http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  *  Unless required by applicable law or agreed to in writing, software
+ *  *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  *  See the License for the specific language governing permissions and
+ *  *  limitations under the License.
+ *  *  ============LICENSE_END=========================================================
+ *
+ *
+ */
+
+package org.onap.blueprintgenerator.service.common.kafka;
+
+/**
+ * @author : Tomasz Wrobel
+ * @date 01/18/2021 Application: DCAE/ONAP - Blueprint Generator
+ * Class which contains Kafka Constants
+ */
+
+public class KafkaCommonConstants {
+
+    public static final String KAFKA_INFO_BOOTSTRAP_SERVERS_INPUT_NAME = "kafka_bootstrap_servers";
+
+    public static final String AFF_KAFKA_USER_INPUT_NAME = "kafka_username";
+    public static final String AAF_KAFKA_PASSWORD_INPUT_NAME = "kafka_password";
+
+}
diff --git a/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaInfo.java b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaInfo.java
new file mode 100644 (file)
index 0000000..c03eaf0
--- /dev/null
@@ -0,0 +1,56 @@
+/*
+ *
+ *  * ============LICENSE_START=======================================================
+ *  *  org.onap.dcae
+ *  *  ================================================================================
+ *  *  Copyright (c) 2021 Nokia Intellectual Property. All rights reserved.
+ *  *  ================================================================================
+ *  *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  *  you may not use this file except in compliance with the License.
+ *  *  You may obtain a copy of the License at
+ *  *
+ *  *       http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  *  Unless required by applicable law or agreed to in writing, software
+ *  *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  *  See the License for the specific language governing permissions and
+ *  *  limitations under the License.
+ *  *  ============LICENSE_END=========================================================
+ *
+ *
+ */
+
+package org.onap.blueprintgenerator.service.common.kafka;
+
+import static org.onap.blueprintgenerator.service.common.kafka.KafkaCommonConstants.KAFKA_INFO_BOOTSTRAP_SERVERS_INPUT_NAME;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import lombok.Data;
+import org.onap.blueprintgenerator.model.common.GetInput;
+
+
+/**
+ * @author : Tomasz Wrobel
+ * @date 01/18/2021 Application: DCAE/ONAP - Blueprint Generator
+ * Applications Common Model: A model class which represents Kafka Info
+ */
+
+@Data
+@JsonInclude(value = Include.NON_NULL)
+public class KafkaInfo {
+
+    private GetInput bootstrap_servers;
+
+    private GetInput topic_name;
+
+    public KafkaInfo(String topicName) {
+
+        this.bootstrap_servers = new GetInput(KAFKA_INFO_BOOTSTRAP_SERVERS_INPUT_NAME);
+
+        this.topic_name = new GetInput(topicName);
+
+    }
+
+}
diff --git a/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStream.java b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStream.java
new file mode 100644 (file)
index 0000000..ca92f69
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ *
+ *  * ============LICENSE_START=======================================================
+ *  *  org.onap.dcae
+ *  *  ================================================================================
+ *  *  Copyright (c) 2021 Nokia Intellectual Property. All rights reserved.
+ *  *  ================================================================================
+ *  *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  *  you may not use this file except in compliance with the License.
+ *  *  You may obtain a copy of the License at
+ *  *
+ *  *       http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  *  Unless required by applicable law or agreed to in writing, software
+ *  *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  *  See the License for the specific language governing permissions and
+ *  *  limitations under the License.
+ *  *  ============LICENSE_END=========================================================
+ *
+ *
+ */
+
+
+package org.onap.blueprintgenerator.service.common.kafka;
+
+import static org.onap.blueprintgenerator.service.common.kafka.KafkaCommonConstants.AAF_KAFKA_PASSWORD_INPUT_NAME;
+import static org.onap.blueprintgenerator.service.common.kafka.KafkaCommonConstants.AFF_KAFKA_USER_INPUT_NAME;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.Data;
+import org.onap.blueprintgenerator.model.common.BaseStream;
+
+
+/**
+ * @author : Tomasz Wrobel
+ * @date 01/18/2021 Application: DCAE/ONAP - Blueprint Generator
+ * Applications Common Model: A model class which represents Kafka Stream
+ */
+
+@Data
+@JsonInclude(value = JsonInclude.Include.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class KafkaStream implements BaseStream {
+
+    private final String type = "kafka";
+
+    @JsonProperty("aaf_credentials")
+    private AafCredential aafCredential;
+
+    @JsonProperty("kafka_info")
+    private KafkaInfo kafkaInfo;
+
+    public KafkaStream(String topicName) {
+        this.aafCredential = new AafCredential(AFF_KAFKA_USER_INPUT_NAME, AAF_KAFKA_PASSWORD_INPUT_NAME);
+        this.kafkaInfo = new KafkaInfo(topicName);
+    }
+}
diff --git a/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStreamService.java b/mod/bpgenerator/onap/src/main/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStreamService.java
new file mode 100644 (file)
index 0000000..2090ef0
--- /dev/null
@@ -0,0 +1,137 @@
+/*
+ *
+ *  * ============LICENSE_START=======================================================
+ *  *  org.onap.dcae
+ *  *  ================================================================================
+ *  *  Copyright (c) 2021 Nokia Intellectual Property. All rights reserved.
+ *  *  ================================================================================
+ *  *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  *  you may not use this file except in compliance with the License.
+ *  *  You may obtain a copy of the License at
+ *  *
+ *  *       http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  *  Unless required by applicable law or agreed to in writing, software
+ *  *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  *  See the License for the specific language governing permissions and
+ *  *  limitations under the License.
+ *  *  ============LICENSE_END=========================================================
+ *
+ *
+ */
+
+package org.onap.blueprintgenerator.service.common.kafka;
+
+
+import static org.onap.blueprintgenerator.service.common.kafka.KafkaCommonConstants.AAF_KAFKA_PASSWORD_INPUT_NAME;
+import static org.onap.blueprintgenerator.service.common.kafka.KafkaCommonConstants.AFF_KAFKA_USER_INPUT_NAME;
+import static org.onap.blueprintgenerator.service.common.kafka.KafkaCommonConstants.KAFKA_INFO_BOOTSTRAP_SERVERS_INPUT_NAME;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.onap.blueprintgenerator.service.base.BlueprintHelperService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * @author : Tomasz Wrobel
+ * @date 18/01/2021 Application: ONAP - Blueprint Generator Common ONAP Service used to create Kafka Stream application
+ * config object and Kafka Stream inputs
+ */
+@Service
+public class KafkaStreamService {
+
+    private static final String PUBLISH_URL_SUFFIX = "_publish_url";
+    private static final String SUBSCRIBE_URL_SUFFIX = "_subscribe_url";
+    private static final String DEFAULT_STREAM_URL = "sample_stream_url";
+    private static final String DEFAULT_BOOTSTRAP_SERVER = "message-router-kafka:9092";
+    private static final String DEFAULT_AAF_USER = "admin";
+    private static final String DEFAULT_AAF_PASSWORD = "admin_secret";
+
+    @Autowired
+    private BlueprintHelperService blueprintHelperService;
+
+
+    /**
+     * Creates publish stream inputs for given streamName
+     *
+     * @param streamName Stream name
+     * @return
+     */
+    public Map<String, LinkedHashMap<String, Object>> createStreamPublishInputs(String streamName) {
+        return createStreamInputs(streamName + PUBLISH_URL_SUFFIX);
+    }
+
+    /**
+     * Creates subscribe stream inputs for given streamName
+     *
+     * @param streamName Stream name
+     * @return
+     */
+    public Map<String, LinkedHashMap<String, Object>> createStreamSubscribeInputs(String streamName) {
+        return createStreamInputs(streamName + SUBSCRIBE_URL_SUFFIX);
+    }
+
+    /**
+     * Creates Application properties publish stream object for given streamName
+     *
+     * @param streamName Stream name
+     * @return
+     */
+    public Map<String, KafkaStream> createAppPropertiesPublish(String streamName) {
+
+        LinkedHashMap<String, KafkaStream> kafkaStreamMap = new LinkedHashMap<>();
+        KafkaStream kafkaStream = createAppProperties(streamName, PUBLISH_URL_SUFFIX);
+
+        kafkaStreamMap.put(streamName, kafkaStream);
+
+        return kafkaStreamMap;
+    }
+
+    /**
+     * Creates Application properties subscribe stream object for given streamName
+     *
+     * @param streamName Stream name
+     * @return
+     */
+    public Map<String, KafkaStream> createAppPropertiesSubscribe(String streamName) {
+
+        LinkedHashMap<String, KafkaStream> kafkaStreamMap = new LinkedHashMap<>();
+        KafkaStream kafkaStream = createAppProperties(streamName, SUBSCRIBE_URL_SUFFIX);
+
+        kafkaStreamMap.put(streamName, kafkaStream);
+
+        return kafkaStreamMap;
+    }
+
+    private KafkaStream createAppProperties(String streamName, String urlSuffix) {
+        String topicName = streamName + urlSuffix;
+
+        return new KafkaStream(topicName);
+    }
+
+    private Map<String, LinkedHashMap<String, Object>> createStreamInputs(String streamName) {
+        LinkedHashMap<String, LinkedHashMap<String, Object>> streamInputs = createBaseInputs();
+        LinkedHashMap<String, Object> stream =
+            blueprintHelperService.createStringInput(DEFAULT_STREAM_URL);
+        streamInputs.put(streamName, stream);
+        return streamInputs;
+    }
+
+    private LinkedHashMap<String, LinkedHashMap<String, Object>> createBaseInputs() {
+        LinkedHashMap<String, LinkedHashMap<String, Object>> baseInputs = new LinkedHashMap<>();
+
+        LinkedHashMap<String, Object> kafka_message_router = blueprintHelperService
+            .createStringInput(DEFAULT_BOOTSTRAP_SERVER);
+        baseInputs.put(KAFKA_INFO_BOOTSTRAP_SERVERS_INPUT_NAME, kafka_message_router);
+
+        LinkedHashMap<String, Object> kafka_username = blueprintHelperService.createStringInput(DEFAULT_AAF_USER);
+        baseInputs.put(AFF_KAFKA_USER_INPUT_NAME, kafka_username);
+
+        LinkedHashMap<String, Object> kafka_password = blueprintHelperService.createStringInput(DEFAULT_AAF_PASSWORD);
+        baseInputs.put(AAF_KAFKA_PASSWORD_INPUT_NAME, kafka_password);
+
+        return baseInputs;
+    }
+}
index 09bb176..fe93f5f 100644 (file)
@@ -35,6 +35,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.onap.blueprintgenerator.model.common.BaseStream;
 import org.onap.blueprintgenerator.model.common.Dmaap;
 import org.onap.blueprintgenerator.model.componentspec.OnapComponentSpec;
 import org.onap.blueprintgenerator.model.componentspec.common.Publishes;
@@ -66,7 +67,7 @@ class StreamServiceTest {
     void whenStreamsIsNullCreateStreamPublishesShouldReturnEmptyMap() {
         when(onapComponentSpecMock.getStreams()).thenReturn(null);
 
-        Map<String, Dmaap> streamPublishes = streamService.createStreamPublishes(
+        Map<String, BaseStream> streamPublishes = streamService.createStreamPublishes(
             onapComponentSpecMock,
             blueprintHelperServiceMock,
             dmaapServiceMock,
@@ -81,7 +82,7 @@ class StreamServiceTest {
         when(streamsMock.getPublishes()).thenReturn(null);
         when(onapComponentSpecMock.getStreams()).thenReturn(streamsMock);
 
-        Map<String, Dmaap> streamPublishes = streamService.createStreamPublishes(
+        Map<String, BaseStream> streamPublishes = streamService.createStreamPublishes(
             onapComponentSpecMock,
             blueprintHelperServiceMock,
             dmaapServiceMock,
@@ -99,21 +100,22 @@ class StreamServiceTest {
 
         DmaapService dmaapService = new DmaapService();
 
-        Map<String, Dmaap> streamPublishes = streamService.createStreamPublishes(
+        Map<String, BaseStream> streamPublishes = streamService.createStreamPublishes(
             onapComponentSpecMock,
             blueprintHelperServiceMock,
             dmaapService,
             createInputs(),
             true);
 
-        Map<String, Dmaap> expectedMap = createExpectedMap("_feed");
+        Map<String, BaseStream> expectedMap = createExpectedMap("_feed");
 
         assertNotNull(streamPublishes);
         assertEquals(expectedMap.size(), streamPublishes.size());
-        for(Map.Entry<String, Dmaap> entry : expectedMap.entrySet()) {
+        for (Map.Entry<String, BaseStream> entry : expectedMap.entrySet()) {
             assertTrue(streamPublishes.containsKey(entry.getKey()));
-            assertTrue(streamPublishes.get(entry.getKey()).getType().equals(entry.getValue().getType()));
-            assertTrue(streamPublishes.get(entry.getKey()).getDmaap_info().equals(entry.getValue().getDmaap_info()));
+            assertEquals(streamPublishes.get(entry.getKey()).getType(), entry.getValue().getType());
+            assertEquals(((Dmaap) streamPublishes.get(entry.getKey())).getDmaap_info(),
+                ((Dmaap) entry.getValue()).getDmaap_info());
         }
     }
 
@@ -125,21 +127,22 @@ class StreamServiceTest {
 
         DmaapService dmaapService = new DmaapService();
 
-        Map<String, Dmaap> streamPublishes = streamService.createStreamPublishes(
+        Map<String, BaseStream> streamPublishes = streamService.createStreamPublishes(
             onapComponentSpecMock,
             blueprintHelperServiceMock,
             dmaapService,
             createInputs(),
             true);
 
-        Map<String, Dmaap> expectedMap = createExpectedMap("_topic");
+        Map<String, BaseStream> expectedMap = createExpectedMap("_topic");
 
         assertNotNull(streamPublishes);
         assertEquals(expectedMap.size(), streamPublishes.size());
-        for(Map.Entry<String, Dmaap> entry : expectedMap.entrySet()) {
+        for (Map.Entry<String, BaseStream> entry : expectedMap.entrySet()) {
             assertTrue(streamPublishes.containsKey(entry.getKey()));
-            assertTrue(streamPublishes.get(entry.getKey()).getType().equals(entry.getValue().getType()));
-            assertTrue(streamPublishes.get(entry.getKey()).getDmaap_info().equals(entry.getValue().getDmaap_info()));
+            assertEquals(streamPublishes.get(entry.getKey()).getType(), entry.getValue().getType());
+            assertEquals(((Dmaap) streamPublishes.get(entry.getKey())).getDmaap_info(),
+                ((Dmaap) entry.getValue()).getDmaap_info());
         }
     }
 
@@ -147,7 +150,7 @@ class StreamServiceTest {
     void whenStreamsIsNullCreateStreamSubscribesShouldReturnEmptyMap() {
         when(onapComponentSpecMock.getStreams()).thenReturn(null);
 
-        Map<String, Dmaap> streamSubscribes = streamService.createStreamSubscribes(
+        Map<String, BaseStream> streamSubscribes = streamService.createStreamSubscribes(
             onapComponentSpecMock,
             blueprintHelperServiceMock,
             dmaapServiceMock,
@@ -162,7 +165,7 @@ class StreamServiceTest {
         when(streamsMock.getPublishes()).thenReturn(null);
         when(onapComponentSpecMock.getStreams()).thenReturn(streamsMock);
 
-        Map<String, Dmaap> streamSubscribes = streamService.createStreamSubscribes(
+        Map<String, BaseStream> streamSubscribes = streamService.createStreamSubscribes(
             onapComponentSpecMock,
             blueprintHelperServiceMock,
             dmaapServiceMock,
@@ -180,21 +183,22 @@ class StreamServiceTest {
 
         DmaapService dmaapService = new DmaapService();
 
-        Map<String, Dmaap> streamSubscribes = streamService.createStreamSubscribes(
+        Map<String, BaseStream> streamSubscribes = streamService.createStreamSubscribes(
             onapComponentSpecMock,
             blueprintHelperServiceMock,
             dmaapService,
             createInputs(),
             true);
 
-        Map<String, Dmaap> expectedMap = createExpectedMap("_feed");
+        Map<String, BaseStream> expectedMap = createExpectedMap("_feed");
 
         assertNotNull(streamSubscribes);
         assertEquals(expectedMap.size(), streamSubscribes.size());
-        for(Map.Entry<String, Dmaap> entry : expectedMap.entrySet()) {
+        for (Map.Entry<String, BaseStream> entry : expectedMap.entrySet()) {
             assertTrue(streamSubscribes.containsKey(entry.getKey()));
-            assertTrue(streamSubscribes.get(entry.getKey()).getType().equals(entry.getValue().getType()));
-            assertTrue(streamSubscribes.get(entry.getKey()).getDmaap_info().equals(entry.getValue().getDmaap_info()));
+            assertEquals(streamSubscribes.get(entry.getKey()).getType(), entry.getValue().getType());
+            assertEquals(((Dmaap) streamSubscribes.get(entry.getKey())).getDmaap_info(),
+                ((Dmaap) entry.getValue()).getDmaap_info());
         }
     }
 
@@ -206,26 +210,27 @@ class StreamServiceTest {
 
         DmaapService dmaapService = new DmaapService();
 
-        Map<String, Dmaap> streamSubscribes = streamService.createStreamSubscribes(
+        Map<String, BaseStream> streamSubscribes = streamService.createStreamSubscribes(
             onapComponentSpecMock,
             blueprintHelperServiceMock,
             dmaapService,
             createInputs(),
             true);
 
-        Map<String, Dmaap> expectedMap = createExpectedMap("_topic");
+        Map<String, BaseStream> expectedMap = createExpectedMap("_topic");
 
         assertNotNull(streamSubscribes);
         assertEquals(expectedMap.size(), streamSubscribes.size());
-        for(Map.Entry<String, Dmaap> entry : expectedMap.entrySet()) {
+        for (Map.Entry<String, BaseStream> entry : expectedMap.entrySet()) {
             assertTrue(streamSubscribes.containsKey(entry.getKey()));
-            assertTrue(streamSubscribes.get(entry.getKey()).getType().equals(entry.getValue().getType()));
-            assertTrue(streamSubscribes.get(entry.getKey()).getDmaap_info().equals(entry.getValue().getDmaap_info()));
+            assertEquals(streamSubscribes.get(entry.getKey()).getType(), entry.getValue().getType());
+            assertEquals(((Dmaap) streamSubscribes.get(entry.getKey())).getDmaap_info(),
+                ((Dmaap) entry.getValue()).getDmaap_info());
         }
     }
 
-    private Map<String, Dmaap> createExpectedMap(String suffix) {
-        Map<String, Dmaap> expectedMap = new HashMap<>();
+    private Map<String, BaseStream> createExpectedMap(String suffix) {
+        Map<String, BaseStream> expectedMap = new HashMap<>();
         Dmaap dmaap1 = new Dmaap();
         dmaap1.setType("t1");
         dmaap1.setDmaap_info("<<k1" + suffix + ">>");
@@ -260,21 +265,21 @@ class StreamServiceTest {
         return new Subscribes[]{sub1, sub2, sub3};
     }
 
-    private Publishes createPublishes(String key, String type){
+    private Publishes createPublishes(String key, String type) {
         Publishes publishes = new Publishes();
         publishes.setConfig_key(key);
         publishes.setType(type);
         return publishes;
     }
 
-    private Subscribes createSubscribes(String key, String type){
+    private Subscribes createSubscribes(String key, String type) {
         Subscribes subscribes = new Subscribes();
         subscribes.setConfig_key(key);
         subscribes.setType(type);
         return subscribes;
     }
 
-    private Map<String, LinkedHashMap<String, Object>> createInputs(){
+    private Map<String, LinkedHashMap<String, Object>> createInputs() {
         LinkedHashMap<String, Object> map = new LinkedHashMap<>();
         map.put("key-1", "obj-1");
 
diff --git a/mod/bpgenerator/onap/src/test/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStreamServiceTest.java b/mod/bpgenerator/onap/src/test/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStreamServiceTest.java
new file mode 100644 (file)
index 0000000..cad3b71
--- /dev/null
@@ -0,0 +1,125 @@
+/*
+ *
+ *  * ============LICENSE_START=======================================================
+ *  *  org.onap.dcae
+ *  *  ================================================================================
+ *  *  Copyright (c) 2021 Nokia Intellectual Property. All rights reserved.
+ *  *  ================================================================================
+ *  *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  *  you may not use this file except in compliance with the License.
+ *  *  You may obtain a copy of the License at
+ *  *
+ *  *       http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  *  Unless required by applicable law or agreed to in writing, software
+ *  *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  *  See the License for the specific language governing permissions and
+ *  *  limitations under the License.
+ *  *  ============LICENSE_END=========================================================
+ *
+ *
+ */
+
+package org.onap.blueprintgenerator.service.common.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.junit.Test;
+
+import org.junit.runner.RunWith;
+import org.onap.blueprintgenerator.service.base.BlueprintHelperService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.ConfigFileApplicationContextInitializer;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes = {KafkaStreamService.class, BlueprintHelperService.class},
+    initializers = ConfigFileApplicationContextInitializer.class)
+public class KafkaStreamServiceTest {
+
+    private static final String TEST_STREAM_NAME = "test_stream_name";
+    private static final String PUBLISH_URL_SUFFIX = "_publish_url";
+    private static final String SUBSCRIBE_URL_SUFFIX = "_subscribe_url";
+    private static final String DEFAULT_KEY = "default";
+    private static final String KAFKA_TYPE = "kafka";
+
+    @Autowired
+    private KafkaStreamService kafkaStreamService;
+
+    @Test
+    public void createCorrectStreamCommonInputs() {
+
+        Map<String, LinkedHashMap<String, Object>> publishInputs = kafkaStreamService
+            .createStreamPublishInputs("test_stream_name");
+
+        LinkedHashMap<String, Object> kafka_bootstrap_servers = publishInputs.get("kafka_bootstrap_servers");
+        LinkedHashMap<String, Object> kafka_username = publishInputs.get("kafka_username");
+        LinkedHashMap<String, Object> kafka_password = publishInputs.get("kafka_password");
+
+        assertNotNull(kafka_bootstrap_servers);
+        assertNotNull(kafka_username);
+        assertNotNull(kafka_password);
+
+        assertNotNull(kafka_bootstrap_servers.get(DEFAULT_KEY));
+        assertNotNull(kafka_username.get(DEFAULT_KEY));
+        assertNotNull(kafka_password.get(DEFAULT_KEY));
+    }
+
+    @Test
+    public void createCorrectStreamPublishInput() {
+        Map<String, LinkedHashMap<String, Object>> publishInputs = kafkaStreamService
+            .createStreamPublishInputs(TEST_STREAM_NAME);
+
+        LinkedHashMap<String, Object> kafka_stream_name = publishInputs.get(TEST_STREAM_NAME + PUBLISH_URL_SUFFIX);
+
+        assertNotNull(kafka_stream_name);
+
+        assertNotNull(kafka_stream_name.get(DEFAULT_KEY));
+    }
+
+    @Test
+    public void createCorrectStreamSubscribeInput() {
+        Map<String, LinkedHashMap<String, Object>> publishInputs = kafkaStreamService
+            .createStreamSubscribeInputs(TEST_STREAM_NAME);
+
+        LinkedHashMap<String, Object> kafka_stream_name = publishInputs.get(TEST_STREAM_NAME + SUBSCRIBE_URL_SUFFIX);
+
+        assertNotNull(kafka_stream_name);
+
+        assertNotNull(kafka_stream_name.get(DEFAULT_KEY));
+    }
+
+    @Test
+    public void createCorrectPublishAppConfig() {
+        Map<String, KafkaStream> appPropertiesPublish = kafkaStreamService
+            .createAppPropertiesPublish(TEST_STREAM_NAME);
+
+        KafkaStream kafkaStream = appPropertiesPublish.get(TEST_STREAM_NAME);
+
+        assertEquals(KAFKA_TYPE, kafkaStream.getType());
+        assertNotNull(kafkaStream.getAafCredential());
+        assertNotNull(kafkaStream.getKafkaInfo());
+        assertTrue(kafkaStream.getKafkaInfo().toString().contains(TEST_STREAM_NAME + PUBLISH_URL_SUFFIX));
+
+    }
+
+    @Test
+    public void createCorrectSubscribeAppConfig() {
+        Map<String, KafkaStream> appPropertiesSubscribe = kafkaStreamService
+            .createAppPropertiesSubscribe(TEST_STREAM_NAME);
+
+        KafkaStream kafkaStream = appPropertiesSubscribe.get(TEST_STREAM_NAME);
+
+        assertEquals(KAFKA_TYPE, kafkaStream.getType());
+        assertNotNull(kafkaStream.getAafCredential());
+        assertNotNull(kafkaStream.getKafkaInfo());
+        assertTrue(kafkaStream.getKafkaInfo().toString().contains(TEST_STREAM_NAME + SUBSCRIBE_URL_SUFFIX));
+
+    }
+}
diff --git a/mod/bpgenerator/onap/src/test/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStreamTest.java b/mod/bpgenerator/onap/src/test/java/org/onap/blueprintgenerator/service/common/kafka/KafkaStreamTest.java
new file mode 100644 (file)
index 0000000..2ba2274
--- /dev/null
@@ -0,0 +1,98 @@
+/*
+ *
+ *  * ============LICENSE_START=======================================================
+ *  *  org.onap.dcae
+ *  *  ================================================================================
+ *  *  Copyright (c) 2021 Nokia Intellectual Property. All rights reserved.
+ *  *  ================================================================================
+ *  *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  *  you may not use this file except in compliance with the License.
+ *  *  You may obtain a copy of the License at
+ *  *
+ *  *       http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  *  Unless required by applicable law or agreed to in writing, software
+ *  *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  *  See the License for the specific language governing permissions and
+ *  *  limitations under the License.
+ *  *  ============LICENSE_END=========================================================
+ *
+ *
+ */
+
+package org.onap.blueprintgenerator.service.common.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import org.junit.Before;
+import org.junit.Test;
+
+public class KafkaStreamTest {
+
+    private final static String TEST_TOPIC_NAME = "test_topic";
+    private static final String GET_INPUT_KAFKA_USERNAME = "{\"get_input\":\"kafka_username\"}";
+    private static final String GET_INPUT_KAFKA_PASSWORD = "{\"get_input\":\"kafka_password\"}";
+    private static final String AAF_USERNAME = "username";
+    private static final String AAF_PASSWORD = "password";
+    private static final String AAF_CREDENTIAL_NODE = "aaf_credentials";
+    private static final String KAFKA_TYPE_NODE = "type";
+    private static final String EXPECTED_KAFKA_TYPE = "\"kafka\"";
+    private static final String KAFKA_BOOTSTRAP_SERVERS = "bootstrap_servers";
+    private static final String KAFKA_TOPIC_NAME = "topic_name";
+    private static final String EXPECTED_GET_INPUT_TOPIC = "{\"get_input\":\"" + TEST_TOPIC_NAME + "\"}";
+    private static final String EXPECTED_GET_INPUT_BOOTSTRAP_SERVERS = "{\"get_input\":\"kafka_bootstrap_servers\"}";
+    private static final String KAFKA_INFO_NODE = "kafka_info";
+
+    private KafkaStream kafkaStream;
+    private ObjectMapper mapper = new ObjectMapper();
+
+    @Before
+    public void setUp() {
+        kafkaStream = new KafkaStream(TEST_TOPIC_NAME);
+    }
+
+    @Test
+    public void kafkaStreamHasCorrectAafCredential() throws IOException {
+
+        String kafkaStreamJson = mapper.writeValueAsString(kafkaStream);
+
+        JsonNode kafkaStreamNode = mapper.readTree(kafkaStreamJson);
+        JsonNode aafCredential = kafkaStreamNode.get(AAF_CREDENTIAL_NODE);
+
+        assertNotNull(aafCredential);
+        assertEquals(GET_INPUT_KAFKA_USERNAME, aafCredential.get(AAF_USERNAME).toString());
+        assertEquals(GET_INPUT_KAFKA_PASSWORD, aafCredential.get(AAF_PASSWORD).toString());
+    }
+
+    @Test
+    public void kafkaStreamHasCorrectKafkaInfo() throws IOException {
+
+        String kafkaStreamJson = mapper.writeValueAsString(kafkaStream);
+
+        JsonNode kafkaStreamNode = mapper.readTree(kafkaStreamJson);
+        JsonNode kafkaInfo = kafkaStreamNode.get(KAFKA_INFO_NODE);
+
+        assertNotNull(kafkaInfo);
+        assertEquals(EXPECTED_GET_INPUT_BOOTSTRAP_SERVERS, kafkaInfo.get(KAFKA_BOOTSTRAP_SERVERS).toString());
+        assertEquals(EXPECTED_GET_INPUT_TOPIC, kafkaInfo.get(KAFKA_TOPIC_NAME).toString());
+
+    }
+
+    @Test
+    public void kafkaStreamHasCorrectType() throws IOException {
+
+        String kafkaStreamJson = mapper.writeValueAsString(kafkaStream);
+
+        JsonNode kafkaStreamNode = mapper.readTree(kafkaStreamJson);
+        JsonNode kafkaType = kafkaStreamNode.get(KAFKA_TYPE_NODE);
+
+        assertNotNull(kafkaType);
+        assertEquals(EXPECTED_KAFKA_TYPE, kafkaType.toString());
+    }
+
+}