Implement Kafka stream definition parsers 21/82221/6
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 14 Mar 2019 07:38:15 +0000 (08:38 +0100)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 14 Mar 2019 12:58:20 +0000 (13:58 +0100)
Change-Id: I43215c1c2494b6036deb004891fb76bfd2b74474
Issue-ID: DCAEGEN2-1341
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
25 files changed:
rest-services/cbs-client/pom.xml
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParserError.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java [new file with mode: 0644]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java [new file with mode: 0644]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java [new file with mode: 0644]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java [new file with mode: 0644]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java [new file with mode: 0644]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java [new file with mode: 0644]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java [new file with mode: 0644]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java [new file with mode: 0644]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java [new file with mode: 0644]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SinkStream.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SourceStream.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java [new file with mode: 0644]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java [new file with mode: 0644]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java [new file with mode: 0644]
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParserTest.java [new file with mode: 0644]
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java [new file with mode: 0644]
rest-services/cbs-client/src/test/resources/streams/kafka_sink.json [new file with mode: 0644]
rest-services/cbs-client/src/test/resources/streams/kafka_sink_invalid_type.json [new file with mode: 0644]
rest-services/cbs-client/src/test/resources/streams/kafka_sink_minimal.json [new file with mode: 0644]
rest-services/cbs-client/src/test/resources/streams/kafka_sink_missing_child.json [new file with mode: 0644]
rest-services/cbs-client/src/test/resources/streams/kafka_source_minimal.json [new file with mode: 0644]

index 70c11a1..9544a7f 100644 (file)
@@ -1,58 +1,57 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <project xmlns="http://maven.apache.org/POM/4.0.0"
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
 
-  <parent>
-    <groupId>org.onap.dcaegen2.services.sdk</groupId>
-    <artifactId>dcaegen2-services-sdk-rest-services</artifactId>
-    <version>1.1.4-SNAPSHOT</version>
-  </parent>
+    <parent>
+        <groupId>org.onap.dcaegen2.services.sdk</groupId>
+        <artifactId>dcaegen2-services-sdk-rest-services</artifactId>
+        <version>1.1.4-SNAPSHOT</version>
+    </parent>
 
-  <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
-  <artifactId>cbs-client</artifactId>
+    <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+    <artifactId>cbs-client</artifactId>
 
-  <name>dcaegen2-services-sdk-rest-services-cbs-client</name>
-  <description>Config Binding Service Rest Services Module</description>
-  <packaging>jar</packaging>
+    <name>dcaegen2-services-sdk-rest-services-cbs-client</name>
+    <description>Config Binding Service Rest Services Module</description>
+    <packaging>jar</packaging>
 
-  <dependencies>
-    <dependency>
-      <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
-      <artifactId>common-dependency</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>io.vavr</groupId>
-      <artifactId>vavr
-      </artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.jetbrains</groupId>
-      <artifactId>annotations</artifactId>
-    </dependency>
+    <dependencies>
+        <dependency>
+            <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+            <artifactId>common-dependency</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.vavr</groupId>
+            <artifactId>vavr</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.jetbrains</groupId>
+            <artifactId>annotations</artifactId>
+        </dependency>
 
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-core</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.junit.jupiter</groupId>
-      <artifactId>junit-jupiter-engine</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.assertj</groupId>
-      <artifactId>assertj-core</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>io.projectreactor</groupId>
-      <artifactId>reactor-test</artifactId>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
 
 </project>
index 214f0e4..cbdea00 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions;
 
+import io.vavr.control.Either;
 import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 
 /**
@@ -30,6 +31,14 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 public class StreamParserError {
     private final String message;
 
+    public static <R> Either<StreamParserError, R> left(Throwable ex) {
+        return Either.left(fromThrowable(ex));
+    }
+
+    public static StreamParserError fromThrowable(Throwable ex) {
+        return new StreamParserError(ex.getMessage());
+    }
+
     public StreamParserError(String message) {
         this.message = message;
     }
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java
new file mode 100644 (file)
index 0000000..f18f217
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
+
+import com.google.gson.JsonObject;
+import io.vavr.control.Either;
+import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.4
+ */
+@ExperimentalApi
+public interface StreamFromGsonParser<S extends DataStream> extends StreamParser<JsonObject, S> {
+
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java
new file mode 100644 (file)
index 0000000..4b0223f
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
+
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.KafkaSinkParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.KafkaSourceParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+public final class StreamFromGsonParsers {
+
+    private StreamFromGsonParsers() {
+    }
+
+    public static StreamFromGsonParser<KafkaSink> kafkaSinkParser() {
+        return KafkaSinkParser.create();
+    }
+
+    public static StreamFromGsonParser<KafkaSource> kafkaSourceParser() {
+        return KafkaSourceParser.create();
+    }
+}
index 9ba7047..3467c80 100644 (file)
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
 
 import io.vavr.control.Either;
+import io.vavr.control.Try;
 import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener.MerkleTree;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream;
 
 /**
+ * A generic data stream parser which parses {@code T} to data stream {@code S}.
+ *
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since 1.1.2
+ * @param <T> input data type, eg. Gson Object
+ * @param <S> output data type
+ * @since 1.1.3
  */
 @ExperimentalApi
-public interface StreamParser<S extends DataStream> {
+public interface StreamParser<T, S extends DataStream> {
 
-    Either<StreamParserError, S> parse(MerkleTree<String> subtree);
+    /**
+     * Parse the input data {@code T} producing the {@link DataStream}.
+     *
+     * @param input - the input data
+     * @return Right(parsing result) or Left(parsing error)
+     */
+    default Either<StreamParserError, S> parse(T input) {
+        return Try.of(() -> unsafeParse(input))
+                .toEither()
+                .mapLeft(StreamParserError::fromThrowable);
+    }
 
-    default S unsafeParse(MerkleTree<String> subtree) {
-        return parse(subtree).getOrElseThrow(StreamParsingException::new);
+    /**
+     * Parse the input data {@code T} producing the {@link DataStream}. Will throw StreamParsingException when input
+     * was invalid.
+     *
+     * @param input - the input data
+     * @return parsing result
+     * @throws StreamParsingException when parsing was unsuccessful
+     */
+    default S unsafeParse(T input) {
+        return parse(input).getOrElseThrow(StreamParsingException::new);
     }
 }
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java
new file mode 100644 (file)
index 0000000..ecafd30
--- /dev/null
@@ -0,0 +1,92 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import java.util.Objects;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.Kafka;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+abstract class GsonKafka implements Kafka {
+
+    protected final KafkaInfo kafkaInfo;
+    private final AafCredentials aafCredentials;
+
+    GsonKafka(@NotNull KafkaInfo kafkaInfo,
+            @Nullable AafCredentials aafCredentials) {
+        this.kafkaInfo = Objects.requireNonNull(kafkaInfo, "kafkaInfo");
+        this.aafCredentials = aafCredentials;
+    }
+
+    @Override
+    public String bootstrapServers() {
+        return kafkaInfo.bootstrapServers();
+    }
+
+    @Override
+    public String topicName() {
+        return kafkaInfo.topicName();
+    }
+
+    @Override
+    public @Nullable AafCredentials aafCredentials() {
+        return aafCredentials;
+    }
+
+    @Override
+    public @Nullable String clientRole() {
+        return kafkaInfo.clientRole();
+    }
+
+    @Override
+    public @Nullable String clientId() {
+        return kafkaInfo.clientId();
+    }
+
+    @Override
+    public int maxPayloadSizeBytes() {
+        return kafkaInfo.maxPayloadSizeBytes();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        GsonKafka gsonKafka = (GsonKafka) o;
+        return kafkaInfo.equals(gsonKafka.kafkaInfo) &&
+                Objects.equals(aafCredentials, gsonKafka.aafCredentials);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(kafkaInfo, aafCredentials);
+    }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java
new file mode 100644 (file)
index 0000000..c45f847
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+class GsonKafkaSink extends GsonKafka implements KafkaSink {
+
+    GsonKafkaSink(
+            @NotNull KafkaInfo kafkaInfo,
+            @Nullable AafCredentials aafCredentials) {
+        super(kafkaInfo, aafCredentials);
+    }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java
new file mode 100644 (file)
index 0000000..1509d9d
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+class GsonKafkaSource extends GsonKafka implements KafkaSource {
+
+    GsonKafkaSource(
+            @NotNull KafkaInfo kafkaInfo,
+            @Nullable AafCredentials aafCredentials) {
+        super(kafkaInfo, aafCredentials);
+    }
+
+    @Override
+    public @Nullable String consumerGroupId() {
+        return kafkaInfo.consumerGroupId();
+    }
+
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java
new file mode 100644 (file)
index 0000000..a813607
--- /dev/null
@@ -0,0 +1,89 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import io.vavr.Lazy;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.GsonAdaptersAafCredentials;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+final class GsonUtils {
+    private static final Lazy<Gson> GSON = Lazy.of(() -> {
+        GsonBuilder gsonBuilder = new GsonBuilder();
+        gsonBuilder.registerTypeAdapterFactory(new GsonAdaptersKafkaInfo());
+        gsonBuilder.registerTypeAdapterFactory(new GsonAdaptersAafCredentials());
+        return gsonBuilder.create();
+    });
+
+    private GsonUtils() {
+    }
+
+    static Gson gsonInstance() {
+        return GSON.get();
+    }
+
+    static void assertStreamType(JsonObject json, String expectedType) {
+        final String actualType = requiredString(json, "type");
+        if (!actualType.equals(expectedType)) {
+            throw new IllegalArgumentException("Invalid stream type. Expected '" + expectedType + "', but was '" + actualType + "'");
+        }
+    }
+
+    static String requiredString(JsonObject parent, String childName) {
+        return requiredChild(parent, childName).getAsString();
+    }
+
+    static JsonElement requiredChild(JsonObject parent, String childName) {
+        if (parent.has(childName)) {
+            return parent.get(childName);
+        } else {
+            throw new IllegalArgumentException(
+                    "Could not find sub-node '" + childName + "'. Actual sub-nodes: " + stringifyChildrenNames(parent));
+        }
+    }
+
+    static JsonObject readObjectFromResource(String resource) throws IOException {
+        return readFromResource(resource).getAsJsonObject();
+    }
+
+    static JsonElement readFromResource(String resource) throws IOException {
+        try (Reader reader = new InputStreamReader(GsonUtils.class.getResourceAsStream(resource))) {
+            return new JsonParser().parse(reader);
+        }
+    }
+
+    private static String stringifyChildrenNames(JsonObject parent) {
+        return parent.entrySet().stream().map(Entry::getKey).collect(Collectors.joining(", "));
+    }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java
new file mode 100644 (file)
index 0000000..8b17a8d
--- /dev/null
@@ -0,0 +1,56 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import com.google.gson.annotations.SerializedName;
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+@Value.Immutable
+@Gson.TypeAdapters
+public interface KafkaInfo {
+
+    @SerializedName("bootstrap_servers")
+    String bootstrapServers();
+
+    @SerializedName("topic_name")
+    String topicName();
+
+    @SerializedName("consumer_group_id")
+    @Nullable String consumerGroupId();
+
+    @SerializedName("client_role")
+    @Nullable String clientRole();
+
+    @SerializedName("client_id")
+    @Nullable String clientId();
+
+    @Value.Default
+    @SerializedName("max_payload_size_bytes")
+    default int maxPayloadSizeBytes() {
+        return 1024 * 1024;
+    }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java
new file mode 100644 (file)
index 0000000..393fe40
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.assertStreamType;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.4
+ */
+public class KafkaSinkParser implements StreamFromGsonParser<KafkaSink> {
+
+    private final Gson gson;
+
+    public static KafkaSinkParser create() {
+        return new KafkaSinkParser(gsonInstance());
+    }
+
+    KafkaSinkParser(Gson gson) {
+        this.gson = gson;
+    }
+
+    @Override
+    public KafkaSink unsafeParse(JsonObject input) {
+        assertStreamType(input, "kafka");
+
+        final JsonElement kafkaInfoJson = requiredChild(input, "kafka_info");
+        final KafkaInfo kafkaInfo = gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class);
+
+        return new GsonKafkaSink(kafkaInfo, null);
+    }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java
new file mode 100644 (file)
index 0000000..8b48a2a
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.assertStreamType;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.4
+ */
+public class KafkaSourceParser implements StreamFromGsonParser<KafkaSource> {
+    private final Gson gson;
+
+    public static KafkaSourceParser create() {
+        return new KafkaSourceParser(gsonInstance());
+    }
+
+    KafkaSourceParser(Gson gson) {
+        this.gson = gson;
+    }
+
+    @Override
+    public KafkaSource unsafeParse(JsonObject input) {
+        assertStreamType(input, "kafka");
+
+        final JsonElement kafkaInfoJson = requiredChild(input, "kafka_info");
+        final KafkaInfo kafkaInfo = gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class);
+
+        return new GsonKafkaSource(kafkaInfo, null);
+    }
+}
index 1148156..ecb0b55 100644 (file)
@@ -21,6 +21,8 @@
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
 
 
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
 import org.jetbrains.annotations.Nullable;
 import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 
@@ -29,6 +31,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
  * @version 1.2.1
  */
 @ExperimentalApi
+@Value.Immutable
+@Gson.TypeAdapters
 public interface AafCredentials {
 
     @Nullable String username();
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java
new file mode 100644 (file)
index 0000000..97f07a2
--- /dev/null
@@ -0,0 +1,49 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
+
+import org.immutables.value.Value;
+import org.jetbrains.annotations.Nullable;
+import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.4
+ */
+@ExperimentalApi
+public interface Kafka {
+
+    String bootstrapServers();
+
+    String topicName();
+
+    @Nullable AafCredentials aafCredentials();
+
+    @Nullable String clientRole();
+
+    @Nullable String clientId();
+
+    @Value.Default
+    default int maxPayloadSizeBytes() {
+        return 1024 * 1024;
+    }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java
new file mode 100644 (file)
index 0000000..514881f
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
+
+import org.immutables.value.Value;
+import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SinkStream;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.4
+ */
+@ExperimentalApi
+@Value.Immutable
+public abstract interface KafkaSink extends Kafka, SinkStream {
+
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java
new file mode 100644 (file)
index 0000000..65280a9
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
+
+import org.immutables.value.Value;
+import org.jetbrains.annotations.Nullable;
+import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SourceStream;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.4
+ */
+@ExperimentalApi
+@Value.Immutable
+public interface KafkaSource extends Kafka, SourceStream {
+
+    @Nullable String consumerGroupId();
+}
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParserTest.java
new file mode 100644 (file)
index 0000000..b548120
--- /dev/null
@@ -0,0 +1,115 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.in;
+import static org.junit.jupiter.api.Assertions.*;
+
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import io.vavr.Function1;
+import io.vavr.control.Either;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+class KafkaSinkParserTest {
+
+    private final StreamFromGsonParser<KafkaSink> cut = StreamFromGsonParsers.kafkaSinkParser();
+
+    @Test
+    void precondition_assureInstanceOf() {
+        assertThat(cut).isInstanceOf(KafkaSinkParser.class);
+    }
+
+    @Test
+    void shouldParseMinimalKafkaSinkDefinition() throws IOException {
+        // given
+        JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_minimal.json");
+
+        // when
+        final KafkaSink result = cut.unsafeParse(input);
+
+        // then
+        assertThat(result.aafCredentials()).isNull();
+        assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060");
+        assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP");
+        assertThat(result.clientId()).isNull();
+        assertThat(result.clientRole()).isNull();
+    }
+
+    @Test
+    void shouldParseBasicKafkaSinkDefinition() throws IOException {
+        // given
+        JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink.json");
+
+        // when
+        final KafkaSink result = cut.unsafeParse(input);
+
+        // then
+        assertThat(result.aafCredentials()).isNull();
+        assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060");
+        assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP");
+        assertThat(result.clientId()).isEqualTo("1500462518108");
+        assertThat(result.clientRole()).isEqualTo("com.dcae.member");
+    }
+
+    @Test
+    void shouldReturnErrorWhenStructureIsWrong() throws IOException {
+        // given
+        JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_missing_child.json");
+
+        // when
+        final Either<StreamParserError, KafkaSink> result = cut.parse(input);
+
+        // then
+        assertThat(result.isRight()).describedAs("should not be right").isFalse();
+        result.peekLeft(error -> {
+            assertThat(error.message()).contains("kafka_info");
+        });
+    }
+
+    @Test
+    void shouldReturnErrorWhenTypeIsWrong() throws IOException {
+        // given
+        JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_invalid_type.json");
+
+        // when
+        final Either<StreamParserError, KafkaSink> result = cut.parse(input);
+
+        // then
+        assertThat(result.isRight()).describedAs("should not be right").isFalse();
+        result.peekLeft(error -> {
+            assertThat(error.message()).containsIgnoringCase("invalid stream type");
+            assertThat(error.message()).containsIgnoringCase("kafka");
+            assertThat(error.message()).containsIgnoringCase("message_router");
+        });
+    }
+}
\ No newline at end of file
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java
new file mode 100644 (file)
index 0000000..8713128
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.google.gson.JsonObject;
+import java.io.IOException;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+class KafkaSourceParserTest {
+
+    private final StreamFromGsonParser<KafkaSource> cut = StreamFromGsonParsers.kafkaSourceParser();
+
+    @Test
+    void precondition_assureInstanceOf() {
+        assertThat(cut).isInstanceOf(KafkaSourceParser.class);
+    }
+
+    @Test
+    void shouldParseMinimalKafkaSourceDefinition() throws IOException {
+        // given
+        JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_source_minimal.json");
+
+        // when
+        final KafkaSource result = cut.unsafeParse(input);
+
+        // then
+        assertThat(result.aafCredentials()).isNull();
+        assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060");
+        assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP");
+        assertThat(result.clientId()).isNull();
+        assertThat(result.clientRole()).isNull();
+    }
+}
\ No newline at end of file
diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_sink.json b/rest-services/cbs-client/src/test/resources/streams/kafka_sink.json
new file mode 100644 (file)
index 0000000..b60388d
--- /dev/null
@@ -0,0 +1,9 @@
+{
+    "type": "kafka",
+    "kafka_info": {
+        "client_role": "com.dcae.member",
+        "client_id": "1500462518108",
+        "bootstrap_servers": "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060",
+        "topic_name": "HVVES_PERF3GPP"
+    }
+}
diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_sink_invalid_type.json b/rest-services/cbs-client/src/test/resources/streams/kafka_sink_invalid_type.json
new file mode 100644 (file)
index 0000000..0ee88ad
--- /dev/null
@@ -0,0 +1,7 @@
+{
+    "type": "message_router",
+    "kafka_info": {
+        "bootstrap_servers": "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060",
+        "topic_name": "HVVES_PERF3GPP"
+    }
+}
diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_sink_minimal.json b/rest-services/cbs-client/src/test/resources/streams/kafka_sink_minimal.json
new file mode 100644 (file)
index 0000000..da8dd4f
--- /dev/null
@@ -0,0 +1,7 @@
+{
+    "type": "kafka",
+    "kafka_info": {
+        "bootstrap_servers": "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060",
+        "topic_name": "HVVES_PERF3GPP"
+    }
+}
diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_sink_missing_child.json b/rest-services/cbs-client/src/test/resources/streams/kafka_sink_missing_child.json
new file mode 100644 (file)
index 0000000..d2591b3
--- /dev/null
@@ -0,0 +1,3 @@
+{
+    "type": "kafka"
+}
diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_source_minimal.json b/rest-services/cbs-client/src/test/resources/streams/kafka_source_minimal.json
new file mode 100644 (file)
index 0000000..da8dd4f
--- /dev/null
@@ -0,0 +1,7 @@
+{
+    "type": "kafka",
+    "kafka_info": {
+        "bootstrap_servers": "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060",
+        "topic_name": "HVVES_PERF3GPP"
+    }
+}