Kafka streams parsers - additions 57/82257/12
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 14 Mar 2019 13:34:35 +0000 (14:34 +0100)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Tue, 19 Mar 2019 14:06:15 +0000 (15:06 +0100)
Change-Id: I98ca661682b41d76d3de668d6faeb6ebe02f92a8
Issue-ID: DCAEGEN2-1341
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
46 files changed:
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParser.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java [new file with mode: 0644]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java [new file with mode: 0644]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParser.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParser.java with 72% similarity]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParser.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParser.java with 72% similarity]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouter.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouter.java with 88% similarity]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSink.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSink.java with 91% similarity]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSource.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSource.java with 91% similarity]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterDmaapInfo.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterDmaapInfo.java with 98% similarity]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParser.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParser.java with 71% similarity]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParser.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParser.java with 72% similarity]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafka.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java with 90% similarity]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSink.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java with 93% similarity]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSource.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java with 93% similarity]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaInfo.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java with 98% similarity]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParser.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java with 63% similarity]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParser.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java with 63% similarity]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java [new file with mode: 0644]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java [new file with mode: 0644]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java [new file with mode: 0644]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParserTest.java
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookupTest.java
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/DummyHttpServer.java
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java [deleted file]
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java [moved from rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParserTest.java with 69% similarity]
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java [moved from rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParserTest.java with 70% similarity]
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java [moved from rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParserTest.java with 81% similarity]
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java [moved from rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParserTest.java with 80% similarity]
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java [moved from rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParserTest.java with 68% similarity]
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java [new file with mode: 0644]
rest-services/cbs-client/src/test/resources/sample_config.json
rest-services/cbs-client/src/test/resources/streams/kafka_invalid_type.json [moved from rest-services/cbs-client/src/test/resources/streams/kafka_sink_invalid_type.json with 100% similarity]
rest-services/cbs-client/src/test/resources/streams/kafka_sink.json
rest-services/cbs-client/src/test/resources/streams/kafka_source.json [new file with mode: 0644]

index 36589da..989bd2d 100644 (file)
@@ -20,9 +20,9 @@
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api;
 
 import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.CbsClientImpl;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.CbsLookup;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
 import reactor.core.publisher.Mono;
 
index 15c4eea..dfd0e2f 100644 (file)
  */
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener;
 
+import static java.lang.String.valueOf;
+
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import io.vavr.collection.List;
-import org.jetbrains.annotations.NotNull;
-
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
-
-import static java.lang.String.valueOf;
+import org.jetbrains.annotations.NotNull;
 
 
 /**
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java
new file mode 100644 (file)
index 0000000..4fdb31b
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import io.vavr.collection.Stream;
+import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.4
+ */
+@ExperimentalApi
+public final class DataStreams {
+
+    private DataStreams() {
+    }
+
+    public static Stream<RawDataStream<JsonObject>> namedSources(JsonObject rootJson) {
+        return createCollectionOfStreams(rootJson, DataStreamDirection.SOURCE);
+    }
+
+    public static Stream<RawDataStream<JsonObject>> namedSinks(JsonObject rootJson) {
+        return createCollectionOfStreams(rootJson, DataStreamDirection.SINK);
+    }
+
+    private static Stream<RawDataStream<JsonObject>> createCollectionOfStreams(JsonObject rootJson, DataStreamDirection direction) {
+        final JsonElement streamsJson = rootJson.get(direction.configurationKey());
+        return streamsJson == null
+                ? Stream.empty()
+                : DataStreamUtils.mapJsonToStreams(streamsJson, direction);
+    }
+
+
+}
index f18f217..460d710 100644 (file)
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
 
 import com.google.gson.JsonObject;
-import io.vavr.control.Either;
 import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream;
 
 /**
index 9d703bb..7ae92ba 100644 (file)
 
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
 
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.*;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr.DataRouterSinkParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr.DataRouterSourceParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.MessageRouterSinkParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.MessageRouterSourceParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSinkParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSourceParser;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.*;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since March 2019
+ * @since 1.1.4
  */
 public final class StreamFromGsonParsers {
 
index 3467c80..69016ed 100644 (file)
@@ -26,6 +26,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
 
 /**
  * A generic data stream parser which parses {@code T} to data stream {@code S}.
@@ -33,7 +34,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Dat
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @param <T> input data type, eg. Gson Object
  * @param <S> output data type
- * @since 1.1.3
+ * @since 1.1.4
  */
 @ExperimentalApi
 public interface StreamParser<T, S extends DataStream> {
@@ -44,7 +45,7 @@ public interface StreamParser<T, S extends DataStream> {
      * @param input - the input data
      * @return Right(parsing result) or Left(parsing error)
      */
-    default Either<StreamParserError, S> parse(T input) {
+    default Either<StreamParserError, S> parse(RawDataStream<T> input) {
         return Try.of(() -> unsafeParse(input))
                 .toEither()
                 .mapLeft(StreamParserError::fromThrowable);
@@ -58,7 +59,7 @@ public interface StreamParser<T, S extends DataStream> {
      * @return parsing result
      * @throws StreamParsingException when parsing was unsuccessful
      */
-    default S unsafeParse(T input) {
+    default S unsafeParse(RawDataStream<T> input) {
         return parse(input).getOrElseThrow(StreamParsingException::new);
     }
 }
index 05bfc9b..9be08e3 100644 (file)
@@ -24,9 +24,9 @@ import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URL;
 import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
 import reactor.core.publisher.Mono;
 
 public class CbsClientImpl implements CbsClient {
index 53d0bd3..89daebc 100644 (file)
@@ -22,12 +22,10 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
 
 import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
-
 import java.net.InetSocketAddress;
-
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.ServiceLookupException;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
 import reactor.core.publisher.Mono;
 
 /**
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java
new file mode 100644 (file)
index 0000000..d34b144
--- /dev/null
@@ -0,0 +1,77 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import io.vavr.collection.Stream;
+import java.io.IOException;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+public final class DataStreamUtils {
+
+    public static Stream<RawDataStream<JsonObject>> mapJsonToStreams(JsonElement streamsJson,
+            DataStreamDirection direction) {
+        return Stream.ofAll(streamsJson.getAsJsonObject().entrySet())
+                .map(namedSinkJson -> {
+                    final JsonObject jsonObject = namedSinkJson.getValue().getAsJsonObject();
+                    return rawDataStream(namedSinkJson.getKey(), direction, jsonObject);
+                });
+    }
+
+    public static void assertStreamType(
+            RawDataStream<JsonObject> json,
+            String expectedType,
+            DataStreamDirection expectedDirection) {
+        if (!json.type().equals(expectedType)) {
+            throw new IllegalArgumentException(
+                    "Invalid stream type. Expected '" + expectedType + "', but was '" + json.type() + "'");
+        }
+        if (json.direction() != expectedDirection) {
+            throw new IllegalArgumentException(
+                    "Invalid stream direction. Expected '" + expectedDirection + "', but was '" + json.direction()
+                            + "'");
+        }
+    }
+
+    public static RawDataStream<JsonObject> readSourceFromResource(String resource) throws IOException {
+        return rawDataStream(resource, DataStreamDirection.SOURCE, GsonUtils.readObjectFromResource(resource));
+    }
+
+    public  static RawDataStream<JsonObject> readSinkFromResource(String resource) throws IOException {
+        return rawDataStream(resource, DataStreamDirection.SINK, GsonUtils.readObjectFromResource(resource));
+    }
+
+    private static RawDataStream<JsonObject> rawDataStream(String name, DataStreamDirection direction, JsonObject json) {
+        return ImmutableRawDataStream.<JsonObject>builder()
+                .name(name)
+                .direction(direction)
+                .type(GsonUtils.requiredString(json, "type"))
+                .descriptor(json)
+                .build();
+    }
+}
index a088016..0b66228 100644 (file)
@@ -26,14 +26,14 @@ import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import io.vavr.Lazy;
-
+import io.vavr.control.Option;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.Reader;
 import java.util.Map.Entry;
 import java.util.stream.Collectors;
-
-import io.vavr.control.Option;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.GsonAdaptersMessageRouterDmaapInfo;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.GsonAdaptersKafkaInfo;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.GsonAdaptersAafCredentials;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.GsonAdaptersDataRouterSink;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.GsonAdaptersDataRouterSource;
@@ -42,7 +42,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since March 2019
  */
-final class GsonUtils {
+public final class GsonUtils {
+
     private static final Lazy<Gson> GSON = Lazy.of(() -> {
         GsonBuilder gsonBuilder = new GsonBuilder();
         gsonBuilder.registerTypeAdapterFactory(new GsonAdaptersKafkaInfo());
@@ -56,39 +57,35 @@ final class GsonUtils {
     private GsonUtils() {
     }
 
-    static Gson gsonInstance() {
+    public  static Gson gsonInstance() {
         return GSON.get();
     }
 
-    static void assertStreamType(JsonObject json, String expectedType) {
-        final String actualType = requiredString(json, "type");
-        if (!actualType.equals(expectedType)) {
-            throw new IllegalArgumentException("Invalid stream type. Expected '" + expectedType + "', but was '" + actualType + "'");
-        }
-    }
-
-    static String requiredString(JsonObject parent, String childName) {
+    public  static String requiredString(JsonObject parent, String childName) {
         return requiredChild(parent, childName).getAsString();
     }
 
-    static Option<String> optionalString(JsonObject parent, String childName) {
+    public  static Option<String> optionalString(JsonObject parent, String childName) {
             return Option.of(parent.get(childName).getAsString());
     }
 
-    static JsonElement requiredChild(JsonObject parent, String childName) {
-        if (parent.has(childName)) {
-            return parent.get(childName);
-        } else {
-            throw new IllegalArgumentException(
-                    "Could not find sub-node '" + childName + "'. Actual sub-nodes: " + stringifyChildrenNames(parent));
-        }
+    public static JsonElement requiredChild(JsonObject parent, String childName) {
+        return optionalChild(parent, childName)
+                .getOrElseThrow(() -> new IllegalArgumentException(
+                        "Could not find sub-node '" + childName + "'. Actual sub-nodes: "
+                                + stringifyChildrenNames(parent)));
+
+    }
+
+    public  static Option<JsonElement> optionalChild(JsonObject parent, String childName) {
+        return Option.of(parent.get(childName));
     }
 
-    static JsonObject readObjectFromResource(String resource) throws IOException {
+    public  static JsonObject readObjectFromResource(String resource) throws IOException {
         return readFromResource(resource).getAsJsonObject();
     }
 
-    static JsonElement readFromResource(String resource) throws IOException {
+    public static JsonElement readFromResource(String resource) throws IOException {
         try (Reader reader = new InputStreamReader(GsonUtils.class.getResourceAsStream(resource))) {
             return new JsonParser().parse(reader);
         }
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
 
 import com.google.gson.Gson;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSink;
 
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
-
 /**
  * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
  */
@@ -45,12 +49,12 @@ public final class DataRouterSinkParser implements StreamFromGsonParser<DataRout
         this.gson = gson;
     }
 
-    public DataRouterSink unsafeParse(JsonObject input) {
-        assertStreamType(input, DATA_ROUTER_TYPE);
-
-        final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME);
+    @Override
+    public DataRouterSink unsafeParse(RawDataStream<JsonObject> input) {
+        assertStreamType(input, DATA_ROUTER_TYPE, DataStreamDirection.SINK);
 
-        return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSink.class);
+        final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME);
+        return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSink.class).withName(input.name());
 
     }
 
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
 
 import com.google.gson.Gson;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSource;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSource;
 
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
-
 /**
  * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
  */
@@ -45,12 +49,12 @@ public final class DataRouterSourceParser implements StreamFromGsonParser<DataRo
         this.gson = gson;
     }
 
-    public DataRouterSource unsafeParse(JsonObject input) {
-        assertStreamType(input, DATA_ROUTER_TYPE);
-
-        final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME);
+    @Override
+    public DataRouterSource unsafeParse(RawDataStream<JsonObject> input) {
+        assertStreamType(input, DATA_ROUTER_TYPE, DataStreamDirection.SOURCE);
 
-        return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSource.class);
+        final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME);
+        return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSource.class).withName(input.name());
 
     }
 
@@ -17,7 +17,7 @@
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
 
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -31,15 +31,21 @@ import java.util.Objects;
  */
 
 abstract class GsonMessageRouter implements MessageRouter {
+    private final String name;
     private final MessageRouterDmaapInfo dmaapInfo;
     private final AafCredentials aafCredentials;
 
-    GsonMessageRouter(@NotNull MessageRouterDmaapInfo dmaapInfo,
-                      @Nullable AafCredentials aafCredentials) {
+    GsonMessageRouter(String name, @NotNull MessageRouterDmaapInfo dmaapInfo,
+            @Nullable AafCredentials aafCredentials) {
+        this.name = name;
         this.dmaapInfo = Objects.requireNonNull(dmaapInfo, "dmaapInfo");
         this.aafCredentials = aafCredentials;
     }
 
+    public String name() {
+        return name;
+    }
+
     @Override
     public String topicUrl() {
         return dmaapInfo.topicUrl();
@@ -17,7 +17,7 @@
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
 
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -30,8 +30,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma
 
 public class GsonMessageRouterSink extends GsonMessageRouter implements MessageRouterSink {
     GsonMessageRouterSink(
-            @NotNull MessageRouterDmaapInfo dmaapInfo,
+            String name, @NotNull MessageRouterDmaapInfo dmaapInfo,
             @Nullable AafCredentials aafCredentials) {
-        super(dmaapInfo, aafCredentials);
+        super(name, dmaapInfo, aafCredentials);
     }
 }
\ No newline at end of file
@@ -17,7 +17,7 @@
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
 
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -30,8 +30,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma
 
 public class GsonMessageRouterSource extends GsonMessageRouter implements MessageRouterSource {
     GsonMessageRouterSource(
-            @NotNull MessageRouterDmaapInfo dmaapInfo,
+            String name, @NotNull MessageRouterDmaapInfo dmaapInfo,
             @Nullable AafCredentials aafCredentials) {
-        super(dmaapInfo, aafCredentials);
+        super(name, dmaapInfo, aafCredentials);
     }
 }
\ No newline at end of file
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
 
 import com.google.gson.Gson;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
 
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
-
 public final class MessageRouterSinkParser implements StreamFromGsonParser<MessageRouterSink> {
 
     private final Gson gson;
@@ -43,15 +47,16 @@ public final class MessageRouterSinkParser implements StreamFromGsonParser<Messa
         this.gson = gson;
     }
 
-    public MessageRouterSink unsafeParse(JsonObject input) {
-        assertStreamType(input, MESSAGE_ROUTER_TYPE);
+    @Override
+    public MessageRouterSink unsafeParse(RawDataStream<JsonObject> input) {
+        assertStreamType(input, MESSAGE_ROUTER_TYPE, DataStreamDirection.SINK);
 
-        final AafCredentials aafCredentials = gson.fromJson(input, ImmutableAafCredentials.class);
+        final AafCredentials aafCredentials = gson.fromJson(input.descriptor(), ImmutableAafCredentials.class);
 
-        final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME);
+        final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME);
         final MessageRouterDmaapInfo dmaapInfo = gson.fromJson(dmaapInfoJson, ImmutableMessageRouterDmaapInfo.class);
 
-        return new GsonMessageRouterSink(dmaapInfo, aafCredentials);
+        return new GsonMessageRouterSink(input.name(), dmaapInfo, aafCredentials);
 
     }
 }
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
 
 import com.google.gson.Gson;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource;
 
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
-
 /**
  * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
  */
@@ -46,15 +50,16 @@ public final class MessageRouterSourceParser implements StreamFromGsonParser<Mes
         this.gson = gson;
     }
 
-    public MessageRouterSource unsafeParse(JsonObject input) {
-        assertStreamType(input, MESSAGE_ROUTER_TYPE);
+    @Override
+    public MessageRouterSource unsafeParse(RawDataStream<JsonObject> input) {
+        assertStreamType(input, MESSAGE_ROUTER_TYPE, DataStreamDirection.SOURCE);
 
-        final AafCredentials aafCredentials = gson.fromJson(input, ImmutableAafCredentials.class);
+        final AafCredentials aafCredentials = gson.fromJson(input.descriptor(), ImmutableAafCredentials.class);
 
-        final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME);
+        final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME);
         final MessageRouterDmaapInfo dmaapInfo = gson.fromJson(dmaapInfoJson, ImmutableMessageRouterDmaapInfo.class);
 
-        return new GsonMessageRouterSource(dmaapInfo, aafCredentials);
+        return new GsonMessageRouterSource(input.name(), dmaapInfo, aafCredentials);
 
     }
 
@@ -18,7 +18,7 @@
  * ============LICENSE_END=====================================
  */
 
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
 
 import java.util.Objects;
 import org.jetbrains.annotations.NotNull;
@@ -32,15 +32,23 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma
  */
 abstract class GsonKafka implements Kafka {
 
-    protected final KafkaInfo kafkaInfo;
+    private final String name;
+    final KafkaInfo kafkaInfo;
     private final AafCredentials aafCredentials;
 
-    GsonKafka(@NotNull KafkaInfo kafkaInfo,
+    GsonKafka(
+            @NotNull String name,
+            @NotNull KafkaInfo kafkaInfo,
             @Nullable AafCredentials aafCredentials) {
+        this.name = Objects.requireNonNull(name, "name");
         this.kafkaInfo = Objects.requireNonNull(kafkaInfo, "kafkaInfo");
         this.aafCredentials = aafCredentials;
     }
 
+    public String name() {
+        return name;
+    }
+
     @Override
     public String bootstrapServers() {
         return kafkaInfo.bootstrapServers();
@@ -18,7 +18,7 @@
  * ============LICENSE_END=====================================
  */
 
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
 
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -32,8 +32,10 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma
 class GsonKafkaSink extends GsonKafka implements KafkaSink {
 
     GsonKafkaSink(
+            @NotNull String name,
             @NotNull KafkaInfo kafkaInfo,
             @Nullable AafCredentials aafCredentials) {
-        super(kafkaInfo, aafCredentials);
+        super(name, kafkaInfo, aafCredentials);
     }
+
 }
@@ -18,7 +18,7 @@
  * ============LICENSE_END=====================================
  */
 
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
 
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -32,9 +32,10 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma
 class GsonKafkaSource extends GsonKafka implements KafkaSource {
 
     GsonKafkaSource(
+            @NotNull String name,
             @NotNull KafkaInfo kafkaInfo,
             @Nullable AafCredentials aafCredentials) {
-        super(kafkaInfo, aafCredentials);
+        super(name, kafkaInfo, aafCredentials);
     }
 
     @Override
  * ============LICENSE_END=====================================
  */
 
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractAafCredentials;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractKafkaInfo;
 
 import com.google.gson.Gson;
-import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
 
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_INFO_CHILD_NAME;
 import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_TYPE;
 
 /**
@@ -46,12 +51,13 @@ public final class KafkaSinkParser implements StreamFromGsonParser<KafkaSink> {
     }
 
     @Override
-    public KafkaSink unsafeParse(JsonObject input) {
-        assertStreamType(input, KAFKA_TYPE);
+    public KafkaSink unsafeParse(RawDataStream<JsonObject> input) {
+        assertStreamType(input, KAFKA_TYPE, DataStreamDirection.SINK);
+        final JsonObject json = input.descriptor();
 
-        final JsonElement kafkaInfoJson = requiredChild(input, KAFKA_INFO_CHILD_NAME);
-        final KafkaInfo kafkaInfo = gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class);
+        final KafkaInfo kafkaInfo = extractKafkaInfo(gson, json);
+        final AafCredentials aafCreds = extractAafCredentials(gson, json).getOrNull();
 
-        return new GsonKafkaSink(kafkaInfo, null);
+        return new GsonKafkaSink(input.name(), kafkaInfo, aafCreds);
     }
 }
  * ============LICENSE_END=====================================
  */
 
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractAafCredentials;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractKafkaInfo;
 
 import com.google.gson.Gson;
-import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
 
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_INFO_CHILD_NAME;
 import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_TYPE;
 
 /**
@@ -46,12 +51,13 @@ public final class KafkaSourceParser implements StreamFromGsonParser<KafkaSource
     }
 
     @Override
-    public KafkaSource unsafeParse(JsonObject input) {
-        assertStreamType(input, KAFKA_TYPE);
+    public KafkaSource unsafeParse(RawDataStream<JsonObject> input) {
+        assertStreamType(input, KAFKA_TYPE, DataStreamDirection.SOURCE);
+        final JsonObject json = input.descriptor();
 
-        final JsonElement kafkaInfoJson = requiredChild(input, KAFKA_INFO_CHILD_NAME);
-        final KafkaInfo kafkaInfo = gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class);
+        final KafkaInfo kafkaInfo = extractKafkaInfo(gson, json);
+        final AafCredentials aafCreds = extractAafCredentials(gson, json).getOrNull();
 
-        return new GsonKafkaSource(kafkaInfo, null);
+        return new GsonKafkaSource(input.name(), kafkaInfo, aafCreds);
     }
 }
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java
new file mode 100644 (file)
index 0000000..4cfa33a
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.optionalChild;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import io.vavr.control.Option;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+final class KafkaUtils {
+
+    private KafkaUtils() {
+    }
+
+    static KafkaInfo extractKafkaInfo(Gson gson, JsonObject input) {
+        final JsonElement kafkaInfoJson = requiredChild(input, "kafka_info");
+        return gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class);
+    }
+
+    static Option<AafCredentials> extractAafCredentials(Gson gson, JsonObject input) {
+        return optionalChild(input, "aaf_credentials")
+                .map(aafCredsJson -> gson.fromJson(aafCredsJson, ImmutableAafCredentials.class));
+    }
+}
index e8d6319..c3c70b7 100644 (file)
@@ -36,9 +36,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 @Gson.TypeAdapters
 public interface AafCredentials {
 
-    @SerializedName("aaf_username")
+    @SerializedName(value = "username", alternate = "aaf_username")
     @Nullable String username();
 
-    @SerializedName("aaf_password")
+    @SerializedName(value = "password", alternate = "aaf_password")
     @Nullable String password();
 }
index 43d9d72..37bf7e5 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
 
+import org.immutables.value.Value;
 import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 
 /**
@@ -28,5 +29,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
  */
 @ExperimentalApi
 public interface DataStream {
-
+    @Value.Default
+    default String name() {
+        return "";
+    }
 }
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java
new file mode 100644 (file)
index 0000000..f3cac54
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+public enum DataStreamDirection {
+
+    SINK("streams_publishes"),
+    SOURCE("streams_subscribes");
+
+    private final String configurationKey;
+
+    DataStreamDirection(String configurationKey) {
+        this.configurationKey = configurationKey;
+    }
+
+    public String configurationKey() {
+        return configurationKey;
+    }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java
new file mode 100644 (file)
index 0000000..7a39ede
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
+
+import org.immutables.value.Value;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+@Value.Immutable
+public interface RawDataStream<T> {
+    String name();
+    String type();
+    DataStreamDirection direction();
+    T descriptor();
+}
index 97f07a2..1810fc6 100644 (file)
@@ -20,6 +20,9 @@
 
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
 
+import static io.vavr.Predicates.not;
+
+import io.vavr.collection.List;
 import org.immutables.value.Value;
 import org.jetbrains.annotations.Nullable;
 import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
@@ -46,4 +49,9 @@ public interface Kafka {
     default int maxPayloadSizeBytes() {
         return 1024 * 1024;
     }
+
+    @Value.Derived
+    default List<String> bootstrapServerList() {
+        return List.of(bootstrapServers().split(",")).filter(not(String::isEmpty));
+    }
 }
index c9ceeaf..8a5edcc 100644 (file)
 
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
 import io.vavr.collection.List;
+import java.math.BigInteger;
 import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.Test;
 
-import java.math.BigInteger;
-
-
-import static org.assertj.core.api.Assertions.assertThat;
-
 class MerkleTreeParserTest {
 
     private final MerkleTreeParser cut = new MerkleTreeParser();
index e862d84..e2833fe 100644 (file)
 
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.DummyHttpServer.sendResource;
 import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.DummyHttpServer.sendString;
 
 import com.google.gson.JsonObject;
+import io.vavr.collection.Map;
 import io.vavr.collection.Stream;
 import java.time.Duration;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
@@ -128,6 +137,91 @@ class CbsClientImplIT {
                 .verify(Duration.ofSeconds(5));
     }
 
+    @Test
+    void testCbsClientWithStreamsParsing() {
+        // given
+        final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
+        final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
+        final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+
+        // when
+        final Mono<KafkaSink> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext))
+                .map(json ->
+                        DataStreams.namedSinks(json).map(kafkaSinkParser::unsafeParse).head()
+                );
+
+        // then
+        StepVerifier.create(result)
+                .consumeNextWith(kafkaSink -> {
+                    assertThat(kafkaSink.name()).isEqualTo("perf3gpp");
+                    assertThat(kafkaSink.bootstrapServers()).isEqualTo("dmaap-mr-kafka:6060");
+                    assertThat(kafkaSink.topicName()).isEqualTo("HVVES_PERF3GPP");
+                })
+                .expectComplete()
+                .verify(Duration.ofSeconds(5));
+    }
+
+    @Test
+    void testCbsClientWithStreamsParsingUsingSwitch() {
+        // given
+        final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
+        final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+        // TODO: Use these parsers below
+        final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
+        final StreamFromGsonParser<MessageRouterSink> mrSinkParser = StreamFromGsonParsers.messageRouterSinkParser();
+
+        // when
+        final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext))
+                .map(json -> {
+                    final Map<String, Stream<RawDataStream<JsonObject>>> sinks = DataStreams.namedSinks(json)
+                            .groupBy(RawDataStream::type);
+
+                    final Stream<KafkaSink> allKafkaSinks = sinks.getOrElse("kafka", Stream.empty())
+                            .map(kafkaSinkParser::unsafeParse);
+                    final Stream<MessageRouterSink> allMrSinks = sinks.getOrElse("message_router", Stream.empty())
+                            .map(mrSinkParser::unsafeParse);
+
+                    assertThat(allKafkaSinks.size())
+                            .describedAs("Number of kafka sinks")
+                            .isEqualTo(2);
+                    assertThat(allMrSinks.size())
+                            .describedAs("Number of DMAAP-MR sinks")
+                            .isEqualTo(1);
+
+                    return true;
+                })
+                .then();
+
+        // then
+        StepVerifier.create(result)
+                .expectComplete()
+                .verify(Duration.ofSeconds(5));
+    }
+
+    @Test
+    void testCbsClientWithStreamsParsingWhenUsingInvalidParser() {
+        // given
+        final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
+        final StreamFromGsonParser<KafkaSource> kafkaSourceParser = StreamFromGsonParsers.kafkaSourceParser();
+        final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+
+        // when
+        final Mono<KafkaSource> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext))
+                .map(json ->
+                        DataStreams.namedSources(json).map(kafkaSourceParser::unsafeParse).head()
+                );
+
+        // then
+        StepVerifier.create(result)
+                .expectErrorSatisfies(ex -> {
+                    assertThat(ex).isInstanceOf(IllegalArgumentException.class);
+                    assertThat(ex).hasMessageContaining("Invalid stream type");
+                    assertThat(ex).hasMessageContaining("message_router");
+                    assertThat(ex).hasMessageContaining("kafka");
+                })
+                .verify(Duration.ofSeconds(5));
+    }
+
     private String sampleConfigValue(JsonObject obj) {
         return obj.get(SAMPLE_CONFIG_KEY).getAsString();
     }
index 9fd7cc8..617904f 100644 (file)
@@ -30,8 +30,8 @@ import static org.mockito.Mockito.verify;
 import com.google.gson.JsonObject;
 import java.net.InetSocketAddress;
 import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 import reactor.core.publisher.Mono;
 
 /**
index 6843e0e..94ff53f 100644 (file)
@@ -31,9 +31,9 @@ import com.google.gson.JsonParser;
 import java.io.InputStreamReader;
 import java.net.InetSocketAddress;
 import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.ServiceLookupException;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
index d0485f5..7835a5f 100644 (file)
@@ -21,9 +21,6 @@
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
 
 import io.vavr.CheckedFunction0;
-import io.vavr.Function0;
-import java.io.IOException;
-import java.net.URISyntaxException;
 import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Paths;
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java
deleted file mode 100644 (file)
index 8713128..0000000
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * ============LICENSE_START====================================
- * DCAEGEN2-SERVICES-SDK
- * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
- * =========================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=====================================
- */
-
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-import com.google.gson.JsonObject;
-import java.io.IOException;
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
-
-/**
- * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since March 2019
- */
-class KafkaSourceParserTest {
-
-    private final StreamFromGsonParser<KafkaSource> cut = StreamFromGsonParsers.kafkaSourceParser();
-
-    @Test
-    void precondition_assureInstanceOf() {
-        assertThat(cut).isInstanceOf(KafkaSourceParser.class);
-    }
-
-    @Test
-    void shouldParseMinimalKafkaSourceDefinition() throws IOException {
-        // given
-        JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_source_minimal.json");
-
-        // when
-        final KafkaSource result = cut.unsafeParse(input);
-
-        // then
-        assertThat(result.aafCredentials()).isNull();
-        assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060");
-        assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP");
-        assertThat(result.clientId()).isNull();
-        assertThat(result.clientRole()).isNull();
-    }
-}
\ No newline at end of file
@@ -17,7 +17,7 @@
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr;
 
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
@@ -26,6 +26,10 @@ import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSink;
 
@@ -37,6 +41,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.strea
 
 
 class DataRouterSinkParserTest {
+
     private static final String SAMPLE_LOCATION = "mtc00";
     private static final String SAMPLE_PUBLISH_URL = "https://we-are-data-router.us/feed/xyz";
     private static final String SAMPLE_LOG_URL = "https://we-are-data-router.us/feed/xyz/logs";
@@ -44,49 +49,54 @@ class DataRouterSinkParserTest {
     private static final String SAMPLE_PASSWORD = "some-password";
     private static final String SAMPLE_PUBLISHER_ID = "123456";
 
-    private static final Gson gson = new Gson();
-
     private final StreamFromGsonParser<DataRouterSink> streamParser = StreamFromGsonParsers.dataRouterSinkParser();
 
-    private static final DataRouterSink fullConfigurationStream = ImmutableDataRouterSink.builder()
-            .location(SAMPLE_LOCATION)
-            .publishUrl(SAMPLE_PUBLISH_URL)
-            .logUrl(SAMPLE_LOG_URL)
-            .username(SAMPLE_USER)
-            .password(SAMPLE_PASSWORD)
-            .publisherId(SAMPLE_PUBLISHER_ID)
-            .build();
-
-    private static final DataRouterSink minimalConfigurationStream = ImmutableDataRouterSink.builder()
-            .publishUrl(SAMPLE_PUBLISH_URL)
-            .build();
-
     @Test
     void fullConfiguration_shouldGenerateDataRouterSinkObject() throws IOException {
         // given
-        JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_sink_full.json");
+        RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/data_router_sink_full.json");
+
         // when
         DataRouterSink result = streamParser.unsafeParse(input);
+
         // then
+        final DataRouterSink fullConfigurationStream = ImmutableDataRouterSink.builder()
+                .name(input.name())
+                .location(SAMPLE_LOCATION)
+                .publishUrl(SAMPLE_PUBLISH_URL)
+                .logUrl(SAMPLE_LOG_URL)
+                .username(SAMPLE_USER)
+                .password(SAMPLE_PASSWORD)
+                .publisherId(SAMPLE_PUBLISHER_ID)
+                .build();
         assertThat(result).isEqualTo(fullConfigurationStream);
     }
 
     @Test
     void minimalConfiguration_shouldGenerateDataRouterSinkObject() throws IOException {
         //given
-        JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_sink_minimal.json");
+        RawDataStream<JsonObject> input = DataStreamUtils
+                .readSinkFromResource("/streams/data_router_sink_minimal.json");
+
         // when
         DataRouterSink result = streamParser.unsafeParse(input);
+
         // then
+        final DataRouterSink minimalConfigurationStream = ImmutableDataRouterSink.builder()
+                .name(input.name())
+                .publishUrl(SAMPLE_PUBLISH_URL)
+                .build();
         assertThat(result).isEqualTo(minimalConfigurationStream);
     }
 
     @Test
     void incorrectConfiguration_shouldParseToStreamParserError() throws IOException {
         // given
-        JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_full.json");
+        RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/message_router_full.json");
+
         // when
         Either<StreamParserError, DataRouterSink> result = streamParser.parse(input);
+
         // then
         assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
         result.peekLeft(error -> {
@@ -100,9 +110,17 @@ class DataRouterSinkParserTest {
     @Test
     void emptyConfiguration_shouldParseToStreamParserError() {
         // given
-        JsonObject input = gson.fromJson("{}", JsonObject.class);
+        JsonObject json = new JsonObject();
+        final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder()
+                .name("empty")
+                .type("data_router")
+                .descriptor(json)
+                .direction(DataStreamDirection.SINK)
+                .build();
+
         // when
         Either<StreamParserError, DataRouterSink> result = streamParser.parse(input);
+
         // then
         assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
     }
@@ -17,7 +17,7 @@
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr;
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
@@ -28,7 +28,11 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.St
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.*;
 
 import java.io.IOException;
@@ -38,54 +42,60 @@ import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.strea
 import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
 
 public class DataRouterSourceParserTest {
+
     private static final String SAMPLE_LOCATION = "mtc00";
     private static final String SAMPLE_DELIVERY_URL = "https://my-subscriber-app.dcae:8080/target-path";
     private static final String SAMPLE_USER = "some-user";
     private static final String SAMPLE_PASSWORD = "some-password";
     private static final String SAMPLE_SUBSCRIBER_ID = "789012";
 
-    private static final Gson gson = new Gson();
-
-    private static final DataRouterSource fullConfigurationStream = ImmutableDataRouterSource.builder()
-            .location(SAMPLE_LOCATION)
-            .deliveryUrl(SAMPLE_DELIVERY_URL)
-            .username(SAMPLE_USER)
-            .password(SAMPLE_PASSWORD)
-            .subscriberId(SAMPLE_SUBSCRIBER_ID)
-            .build();
-
-    private static final DataRouterSource minimalConfigurationStream = ImmutableDataRouterSource.builder()
-            .build();
-
-
     private final StreamFromGsonParser<DataRouterSource> streamParser = StreamFromGsonParsers.dataRouterSourceParser();
 
     @Test
     void fullConfiguration_shouldGenerateDataRouterSinkObject() throws IOException {
         // given
-        JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_source_full.json");
+        RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/data_router_source_full.json");
+
         // when
         DataRouterSource result = streamParser.unsafeParse(input);
+
         // then
+
+        final DataRouterSource fullConfigurationStream = ImmutableDataRouterSource.builder()
+                .name(input.name())
+                .location(SAMPLE_LOCATION)
+                .deliveryUrl(SAMPLE_DELIVERY_URL)
+                .username(SAMPLE_USER)
+                .password(SAMPLE_PASSWORD)
+                .subscriberId(SAMPLE_SUBSCRIBER_ID)
+                .build();
         assertThat(result).isEqualTo(fullConfigurationStream);
     }
 
     @Test
     void minimalConfiguration_shouldGenerateDataRouterSinkObject() throws IOException {
         // given
-        JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_source_minimal.json");
+        RawDataStream<JsonObject> input = DataStreamUtils
+                .readSourceFromResource("/streams/data_router_source_minimal.json");
+
         // when
         DataRouterSource result = streamParser.unsafeParse(input);
+
         // then
+        final DataRouterSource minimalConfigurationStream = ImmutableDataRouterSource.builder()
+                .name(input.name())
+                .build();
         assertThat(result).isEqualTo(minimalConfigurationStream);
     }
 
     @Test
     void incorrectConfiguration_shouldParseToStreamParserError() throws IOException {
         // given
-        JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_full.json");
+        RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/message_router_full.json");
+
         // when
         Either<StreamParserError, DataRouterSource> result = streamParser.parse(input);
+
         // then
         assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
         result.peekLeft(error -> {
@@ -99,9 +109,17 @@ public class DataRouterSourceParserTest {
     @Test
     void emptyConfiguration_shouldBeParsedToStreamParserError() {
         // given
-        JsonObject input = gson.fromJson("{}", JsonObject.class);
+        JsonObject json = new JsonObject();
+        final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder()
+                .name("empty")
+                .type("data_router")
+                .descriptor(json)
+                .direction(DataStreamDirection.SOURCE)
+                .build();
+
         // when
         Either<StreamParserError, DataRouterSource> result = streamParser.parse(input);
+
         // then
         assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
     }
@@ -17,7 +17,7 @@
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
 
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
@@ -26,6 +26,10 @@ import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
 
@@ -48,16 +52,16 @@ public class MessageRouterSinkParserTest {
     private static final String SAMPLE_CLIENT_ID = "1500462518108";
     private static final String SAMPLE_TOPIC_URL = "https://we-are-message-router.us:3905/events/some-topic";
 
-    private static final Gson gson = new Gson();
-
     private final StreamFromGsonParser<MessageRouterSink> streamParser = StreamFromGsonParsers.messageRouterSinkParser();
 
     @Test
     void fullConfiguration_shouldGenerateDataRouterSinkObject() throws IOException {
         // given
-        JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_full.json");
+        RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/message_router_full.json");
+
         // when
         MessageRouterSink result = streamParser.unsafeParse(input);
+
         // then
         assertThat(result).isInstanceOf(MessageRouterSink.class);
         assertThat(result.aafCredentials().username()).isEqualTo(SAMPLE_AAF_USERNAME);
@@ -71,10 +75,11 @@ public class MessageRouterSinkParserTest {
     @Test
     void minimalConfiguration_shouldGenerateDataRouterSinkObject() throws IOException {
         // given
-        JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_minimal.json");
+        RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/message_router_minimal.json");
 
         // when
         MessageRouterSink result = streamParser.unsafeParse(input);
+
         // then
         assertThat(result).isInstanceOf(MessageRouterSink.class);
         assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL);
@@ -86,9 +91,11 @@ public class MessageRouterSinkParserTest {
     @Test
     void incorrectConfiguration_shouldParseToStreamParserError() throws IOException {
         // given
-        JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_sink_full.json");
+        RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/data_router_sink_full.json");
+
         // when
         Either<StreamParserError, MessageRouterSink> result = streamParser.parse(input);
+
         // then
         assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
         result.peekLeft(error -> {
@@ -102,9 +109,17 @@ public class MessageRouterSinkParserTest {
     @Test
     void emptyConfiguration_shouldParseToStreamParserError() {
         // given
-        JsonObject input = gson.fromJson("{}", JsonObject.class);
+        JsonObject json = new JsonObject();
+        final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder()
+                .name("empty")
+                .type("data_router")
+                .descriptor(json)
+                .direction(DataStreamDirection.SINK)
+                .build();
+
         // when
         Either<StreamParserError, MessageRouterSink> result = streamParser.parse(input);
+
         // then
         assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
     }
@@ -17,7 +17,7 @@
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
 
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
@@ -26,6 +26,10 @@ import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource;
 
@@ -48,16 +52,16 @@ public class MessageRouterSourceParserTest {
     private static final String SAMPLE_CLIENT_ID = "1500462518108";
     private static final String SAMPLE_TOPIC_URL = "https://we-are-message-router.us:3905/events/some-topic";
 
-    private static final Gson gson = new Gson();
-
     private final StreamFromGsonParser<MessageRouterSource> streamParser = StreamFromGsonParsers.messageRouterSourceParser();
 
     @Test
     void fullConfiguration_shouldGenerateDataRouterSourceObject() throws IOException {
         // given
-        JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_full.json");
+        RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/message_router_full.json");
+
         // when
         MessageRouterSource result = streamParser.unsafeParse(input);
+
         // then
         assertThat(result.aafCredentials().username()).isEqualTo(SAMPLE_AAF_USERNAME);
         assertThat(result.aafCredentials().password()).isEqualTo(SAMPLE_AAF_PASSWORD);
@@ -70,10 +74,11 @@ public class MessageRouterSourceParserTest {
     @Test
     void minimalConfiguration_shouldGenerateDataRouterSourceObject() throws IOException {
         // given
-        JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_minimal.json");
+        RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/message_router_minimal.json");
 
         // when
         MessageRouterSource result = streamParser.unsafeParse(input);
+
         // then
         assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL);
         assertThat(result.aafCredentials().username()).isNull();
@@ -84,9 +89,11 @@ public class MessageRouterSourceParserTest {
     @Test
     void incorrectConfiguration_shouldParseToStreamParserError() throws IOException {
         // given
-        JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_sink_full.json");
+        RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/data_router_sink_full.json");
+
         // when
         Either<StreamParserError, MessageRouterSource> result = streamParser.parse(input);
+
         // then
         assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
         result.peekLeft(error -> {
@@ -100,7 +107,13 @@ public class MessageRouterSourceParserTest {
     @Test
     void emptyConfiguration_shouldParseToStreamParserError() {
         // given
-        JsonObject input = gson.fromJson("{}", JsonObject.class);
+        JsonObject json = new JsonObject();
+        final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder()
+                .name("empty")
+                .type("data_router")
+                .descriptor(json)
+                .direction(DataStreamDirection.SOURCE)
+                .build();
         // when
         Either<StreamParserError, MessageRouterSource> result = streamParser.parse(input);
         // then
  * ============LICENSE_END=====================================
  */
 
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.in;
-import static org.junit.jupiter.api.Assertions.*;
 
 import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import io.vavr.Function1;
 import io.vavr.control.Either;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSinkParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
 
 /**
@@ -52,7 +51,7 @@ class KafkaSinkParserTest {
     @Test
     void shouldParseMinimalKafkaSinkDefinition() throws IOException {
         // given
-        JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_minimal.json");
+        RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_sink_minimal.json");
 
         // when
         final KafkaSink result = cut.unsafeParse(input);
@@ -66,15 +65,19 @@ class KafkaSinkParserTest {
     }
 
     @Test
-    void shouldParseBasicKafkaSinkDefinition() throws IOException {
+    void shouldParseFullKafkaSinkDefinition() throws IOException {
         // given
-        JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink.json");
+        RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_sink.json");
 
         // when
         final KafkaSink result = cut.unsafeParse(input);
 
         // then
-        assertThat(result.aafCredentials()).isNull();
+        final ImmutableAafCredentials expectedCredentials = ImmutableAafCredentials.builder()
+                .username("the user")
+                .password("the passwd")
+                .build();
+        assertThat(result.aafCredentials()).isEqualTo(expectedCredentials);
         assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060");
         assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP");
         assertThat(result.clientId()).isEqualTo("1500462518108");
@@ -84,7 +87,7 @@ class KafkaSinkParserTest {
     @Test
     void shouldReturnErrorWhenStructureIsWrong() throws IOException {
         // given
-        JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_missing_child.json");
+        RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_sink_missing_child.json");
 
         // when
         final Either<StreamParserError, KafkaSink> result = cut.parse(input);
@@ -99,7 +102,7 @@ class KafkaSinkParserTest {
     @Test
     void shouldReturnErrorWhenTypeIsWrong() throws IOException {
         // given
-        JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_invalid_type.json");
+        RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_invalid_type.json");
 
         // when
         final Either<StreamParserError, KafkaSink> result = cut.parse(input);
@@ -112,4 +115,19 @@ class KafkaSinkParserTest {
             assertThat(error.message()).containsIgnoringCase("message_router");
         });
     }
+
+    @Test
+    void shouldReturnErrorWhenDirectionIsWrong() throws IOException {
+        // given
+        RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/kafka_sink.json");
+
+        // when
+        final Either<StreamParserError, KafkaSink> result = cut.parse(input);
+
+        // then
+        assertThat(result.isRight()).describedAs("should not be right").isFalse();
+        result.peekLeft(error -> {
+            assertThat(error.message()).containsIgnoringCase("invalid stream direction");
+        });
+    }
 }
\ No newline at end of file
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java
new file mode 100644 (file)
index 0000000..d255d99
--- /dev/null
@@ -0,0 +1,120 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.google.gson.JsonObject;
+import io.vavr.collection.List;
+import io.vavr.control.Either;
+import java.io.IOException;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSourceParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+class KafkaSourceParserTest {
+
+    private final StreamFromGsonParser<KafkaSource> cut = StreamFromGsonParsers.kafkaSourceParser();
+
+    @Test
+    void precondition_assureInstanceOf() {
+        assertThat(cut).isInstanceOf(KafkaSourceParser.class);
+    }
+
+    @Test
+    void shouldParseMinimalKafkaSourceDefinition() throws IOException {
+        // given
+        RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/kafka_source_minimal.json");
+
+        // when
+        final KafkaSource result = cut.unsafeParse(input);
+
+        // then
+        assertThat(result.aafCredentials()).isNull();
+        assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060");
+        assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP");
+        assertThat(result.clientId()).isNull();
+        assertThat(result.clientRole()).isNull();
+    }
+
+    @Test
+    void shouldParseFullKafkaSourceDefinition() throws IOException {
+        // given
+        RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/kafka_source.json");
+
+        // when
+        final KafkaSource result = cut.unsafeParse(input);
+
+        // then
+        final ImmutableAafCredentials expectedCredentials = ImmutableAafCredentials.builder()
+                .username("the user")
+                .password("the passwd")
+                .build();
+        assertThat(result.aafCredentials()).isEqualTo(expectedCredentials);
+        assertThat(result.bootstrapServerList()).isEqualTo(List.of("dmaap-mr-kafka-0:6060", "dmaap-mr-kafka-1:6060"));
+        assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP");
+        assertThat(result.consumerGroupId()).isEqualTo("nokia-perf3gpp-processor");
+        assertThat(result.clientId()).isEqualTo("1500462518108");
+        assertThat(result.clientRole()).isEqualTo("com.dcae.member");
+    }
+
+    @Test
+    void shouldReturnErrorWhenTypeIsWrong() throws IOException {
+        // given
+        RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/kafka_invalid_type.json");
+
+        // when
+        final Either<StreamParserError, KafkaSource> result = cut.parse(input);
+
+        // then
+        assertThat(result.isRight()).describedAs("should not be right").isFalse();
+        result.peekLeft(error -> {
+            assertThat(error.message()).containsIgnoringCase("invalid stream type");
+            assertThat(error.message()).containsIgnoringCase("kafka");
+            assertThat(error.message()).containsIgnoringCase("message_router");
+        });
+    }
+
+    @Test
+    void shouldReturnErrorWhenDirectionIsWrong() throws IOException {
+        // given
+        RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_source.json");
+
+        // when
+        final Either<StreamParserError, KafkaSource> result = cut.parse(input);
+
+        // then
+        assertThat(result.isRight()).describedAs("should not be right").isFalse();
+        result.peekLeft(error -> {
+            assertThat(error.message()).containsIgnoringCase("invalid stream direction");
+        });
+    }
+}
\ No newline at end of file
index a95b723..266326f 100644 (file)
@@ -1,3 +1,33 @@
 {
-    "keystore.path": "/var/run/security/keystore.p12"
+    "keystore.path": "/var/run/security/keystore.p12",
+    "streams_publishes": {
+        "perf3gpp": {
+            "type": "kafka",
+            "kafka_info": {
+                "bootstrap_servers": "dmaap-mr-kafka:6060",
+                "topic_name": "HVVES_PERF3GPP"
+            }
+        },
+        "pnf_ready": {
+            "type": "message_router",
+            "dmaap_info": {
+                "topic_url": "http://message-router:3904/events/VES_PNF_READY"
+            }
+        },
+        "call_trace": {
+            "type": "kafka",
+            "kafka_info": {
+                "bootstrap_servers": "dmaap-mr-kafka:6060",
+                "topic_name": "HVVES_TRACE"
+            }
+        }
+    },
+    "streams_subscribes": {
+        "measurements": {
+            "type": "message_router",
+            "dmaap_info": {
+                "topic_url": "http://message-router:3904/events/VES_MEASUREMENT"
+            }
+        }
+    }
 }
index b60388d..e7b4550 100644 (file)
@@ -1,5 +1,9 @@
 {
     "type": "kafka",
+    "aaf_credentials": {
+        "username": "the user",
+        "password": "the passwd"
+    },
     "kafka_info": {
         "client_role": "com.dcae.member",
         "client_id": "1500462518108",
diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_source.json b/rest-services/cbs-client/src/test/resources/streams/kafka_source.json
new file mode 100644 (file)
index 0000000..379dbef
--- /dev/null
@@ -0,0 +1,14 @@
+{
+    "type": "kafka",
+    "aaf_credentials": {
+        "username": "the user",
+        "password": "the passwd"
+    },
+    "kafka_info": {
+        "client_role": "com.dcae.member",
+        "client_id": "1500462518108",
+        "bootstrap_servers": "dmaap-mr-kafka-0:6060,,dmaap-mr-kafka-1:6060,",
+        "topic_name": "HVVES_PERF3GPP",
+        "consumer_group_id": "nokia-perf3gpp-processor"
+    }
+}