Implement StreamParser 33/82233/7
authorkjaniak <kornel.janiak@nokia.com>
Thu, 14 Mar 2019 09:40:40 +0000 (10:40 +0100)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Tue, 19 Mar 2019 12:37:13 +0000 (12:37 +0000)
Change-Id: I3ee50ad70bb107fa39f227af7dab67948b7dbe4b
Issue-ID: DCAEGEN2-1342
Signed-off-by: kjaniak <kornel.janiak@nokia.com>
33 files changed:
pom.xml
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParser.java [new file with mode: 0644]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParser.java [new file with mode: 0644]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouter.java [new file with mode: 0644]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSink.java [new file with mode: 0644]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSource.java [new file with mode: 0644]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterDmaapInfo.java [new file with mode: 0644]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParser.java [new file with mode: 0644]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParser.java [new file with mode: 0644]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/StreamsConstants.java [new file with mode: 0644]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouter.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSink.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSource.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouter.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSink.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSource.java
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParserTest.java [new file with mode: 0644]
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParserTest.java [new file with mode: 0644]
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParserTest.java [new file with mode: 0644]
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParserTest.java [new file with mode: 0644]
rest-services/cbs-client/src/test/resources/streams/data_router_sink_full.json [new file with mode: 0644]
rest-services/cbs-client/src/test/resources/streams/data_router_sink_minimal.json [new file with mode: 0644]
rest-services/cbs-client/src/test/resources/streams/data_router_source_full.json [new file with mode: 0644]
rest-services/cbs-client/src/test/resources/streams/data_router_source_minimal.json [new file with mode: 0644]
rest-services/cbs-client/src/test/resources/streams/message_router_full.json [new file with mode: 0644]
rest-services/cbs-client/src/test/resources/streams/message_router_minimal.json [new file with mode: 0644]

diff --git a/pom.xml b/pom.xml
index 51ec777..4152b97 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -29,7 +29,7 @@
     <junit-jupiter.version>5.3.1</junit-jupiter.version>
     <junit-vintage.version>5.3.1</junit-vintage.version>
     <junit-platform.version>1.3.1</junit-platform.version>
-    <immutables.version>2.7.3</immutables.version>
+    <immutables.version>2.7.4</immutables.version>
     <assertj-core.version>3.11.1</assertj-core.version>
     <reactor.bom.version>Californium-SR4</reactor.bom.version>
     <slf4j.version>1.7.25</slf4j.version>
index 4b0223f..9d703bb 100644 (file)
 
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
 
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.KafkaSinkParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.KafkaSourceParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.*;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.*;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -41,4 +39,20 @@ public final class StreamFromGsonParsers {
     public static StreamFromGsonParser<KafkaSource> kafkaSourceParser() {
         return KafkaSourceParser.create();
     }
+
+    public static StreamFromGsonParser<MessageRouterSink> messageRouterSinkParser() {
+        return MessageRouterSinkParser.create();
+    }
+
+    public static StreamFromGsonParser<MessageRouterSource> messageRouterSourceParser() {
+        return MessageRouterSourceParser.create();
+    }
+
+    public static StreamFromGsonParser<DataRouterSink> dataRouterSinkParser() {
+        return DataRouterSinkParser.create();
+    }
+
+    public static StreamFromGsonParser<DataRouterSource> dataRouterSourceParser() {
+        return DataRouterSourceParser.create();
+    }
 }
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParser.java
new file mode 100644 (file)
index 0000000..468b618
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSink;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
+
+/**
+ * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
+ */
+
+public final class DataRouterSinkParser implements StreamFromGsonParser<DataRouterSink> {
+    private final Gson gson;
+
+    public static DataRouterSinkParser create() {
+        return new DataRouterSinkParser(gsonInstance());
+    }
+
+    private DataRouterSinkParser(Gson gson) {
+        this.gson = gson;
+    }
+
+    public DataRouterSink unsafeParse(JsonObject input) {
+        assertStreamType(input, DATA_ROUTER_TYPE);
+
+        final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME);
+
+        return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSink.class);
+
+    }
+
+}
\ No newline at end of file
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParser.java
new file mode 100644 (file)
index 0000000..d78c3dd
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSource;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSource;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
+
+/**
+ * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
+ */
+
+public final class DataRouterSourceParser implements StreamFromGsonParser<DataRouterSource> {
+    private final Gson gson;
+
+    public static DataRouterSourceParser create() {
+        return new DataRouterSourceParser(gsonInstance());
+    }
+
+    private DataRouterSourceParser(Gson gson) {
+        this.gson = gson;
+    }
+
+    public DataRouterSource unsafeParse(JsonObject input) {
+        assertStreamType(input, DATA_ROUTER_TYPE);
+
+        final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME);
+
+        return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSource.class);
+
+    }
+
+}
\ No newline at end of file
index ecafd30..2ad37a8 100644 (file)
@@ -25,7 +25,6 @@ import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.Kafka;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouter.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouter.java
new file mode 100644 (file)
index 0000000..10c00e7
--- /dev/null
@@ -0,0 +1,67 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouter;
+
+import java.util.Objects;
+
+/**
+ * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
+ */
+
+abstract class GsonMessageRouter implements MessageRouter {
+    private final MessageRouterDmaapInfo dmaapInfo;
+    private final AafCredentials aafCredentials;
+
+    GsonMessageRouter(@NotNull MessageRouterDmaapInfo dmaapInfo,
+                      @Nullable AafCredentials aafCredentials) {
+        this.dmaapInfo = Objects.requireNonNull(dmaapInfo, "dmaapInfo");
+        this.aafCredentials = aafCredentials;
+    }
+
+    @Override
+    public String topicUrl() {
+        return dmaapInfo.topicUrl();
+    }
+
+    @Override
+    public @Nullable String clientRole() {
+        return dmaapInfo.clientRole();
+    }
+
+    @Override
+    public @Nullable String clientId() {
+        return dmaapInfo.clientId();
+    }
+
+    @Override
+    public @Nullable String location() {
+        return dmaapInfo.location();
+    }
+
+    @Override
+    public @Nullable AafCredentials aafCredentials() {
+        return aafCredentials;
+    }
+}
\ No newline at end of file
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSink.java
new file mode 100644 (file)
index 0000000..da21842
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
+
+/**
+ * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
+ */
+
+public class GsonMessageRouterSink extends GsonMessageRouter implements MessageRouterSink {
+    GsonMessageRouterSink(
+            @NotNull MessageRouterDmaapInfo dmaapInfo,
+            @Nullable AafCredentials aafCredentials) {
+        super(dmaapInfo, aafCredentials);
+    }
+}
\ No newline at end of file
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonMessageRouterSource.java
new file mode 100644 (file)
index 0000000..b69c53d
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource;
+
+/**
+ * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
+ */
+
+public class GsonMessageRouterSource extends GsonMessageRouter implements MessageRouterSource {
+    GsonMessageRouterSource(
+            @NotNull MessageRouterDmaapInfo dmaapInfo,
+            @Nullable AafCredentials aafCredentials) {
+        super(dmaapInfo, aafCredentials);
+    }
+}
\ No newline at end of file
index a813607..a088016 100644 (file)
@@ -26,13 +26,17 @@ import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import io.vavr.Lazy;
+
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.Reader;
 import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
+
+import io.vavr.control.Option;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.GsonAdaptersAafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.GsonAdaptersDataRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.GsonAdaptersDataRouterSource;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -43,6 +47,9 @@ final class GsonUtils {
         GsonBuilder gsonBuilder = new GsonBuilder();
         gsonBuilder.registerTypeAdapterFactory(new GsonAdaptersKafkaInfo());
         gsonBuilder.registerTypeAdapterFactory(new GsonAdaptersAafCredentials());
+        gsonBuilder.registerTypeAdapterFactory(new GsonAdaptersMessageRouterDmaapInfo());
+        gsonBuilder.registerTypeAdapterFactory(new GsonAdaptersDataRouterSink());
+        gsonBuilder.registerTypeAdapterFactory(new GsonAdaptersDataRouterSource());
         return gsonBuilder.create();
     });
 
@@ -64,6 +71,10 @@ final class GsonUtils {
         return requiredChild(parent, childName).getAsString();
     }
 
+    static Option<String> optionalString(JsonObject parent, String childName) {
+            return Option.of(parent.get(childName).getAsString());
+    }
+
     static JsonElement requiredChild(JsonObject parent, String childName) {
         if (parent.has(childName)) {
             return parent.get(childName);
index 393fe40..f9a546c 100644 (file)
 
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
 
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.assertStreamType;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
-
 import com.google.gson.Gson;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
 
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_INFO_CHILD_NAME;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_TYPE;
+
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.4
  */
-public class KafkaSinkParser implements StreamFromGsonParser<KafkaSink> {
-
+public final class KafkaSinkParser implements StreamFromGsonParser<KafkaSink> {
     private final Gson gson;
 
     public static KafkaSinkParser create() {
         return new KafkaSinkParser(gsonInstance());
     }
 
-    KafkaSinkParser(Gson gson) {
+    private KafkaSinkParser(Gson gson) {
         this.gson = gson;
     }
 
     @Override
     public KafkaSink unsafeParse(JsonObject input) {
-        assertStreamType(input, "kafka");
+        assertStreamType(input, KAFKA_TYPE);
 
-        final JsonElement kafkaInfoJson = requiredChild(input, "kafka_info");
+        final JsonElement kafkaInfoJson = requiredChild(input, KAFKA_INFO_CHILD_NAME);
         final KafkaInfo kafkaInfo = gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class);
 
         return new GsonKafkaSink(kafkaInfo, null);
index 8b48a2a..08c02b4 100644 (file)
 
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
 
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.assertStreamType;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
-
 import com.google.gson.Gson;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
 
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_INFO_CHILD_NAME;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_TYPE;
+
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.4
  */
-public class KafkaSourceParser implements StreamFromGsonParser<KafkaSource> {
+public final class KafkaSourceParser implements StreamFromGsonParser<KafkaSource> {
     private final Gson gson;
 
     public static KafkaSourceParser create() {
         return new KafkaSourceParser(gsonInstance());
     }
 
-    KafkaSourceParser(Gson gson) {
+    private KafkaSourceParser(Gson gson) {
         this.gson = gson;
     }
 
     @Override
     public KafkaSource unsafeParse(JsonObject input) {
-        assertStreamType(input, "kafka");
+        assertStreamType(input, KAFKA_TYPE);
 
-        final JsonElement kafkaInfoJson = requiredChild(input, "kafka_info");
+        final JsonElement kafkaInfoJson = requiredChild(input, KAFKA_INFO_CHILD_NAME);
         final KafkaInfo kafkaInfo = gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class);
 
         return new GsonKafkaSource(kafkaInfo, null);
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterDmaapInfo.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterDmaapInfo.java
new file mode 100644 (file)
index 0000000..ced5ad5
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import com.google.gson.annotations.SerializedName;
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+import org.jetbrains.annotations.Nullable;
+
+@Gson.TypeAdapters
+@Value.Immutable
+public interface MessageRouterDmaapInfo {
+
+    @SerializedName("topic_url")
+    String topicUrl();
+
+    @SerializedName("client_role")
+    @Nullable String clientRole();
+
+    @SerializedName("client_id")
+    @Nullable String clientId();
+
+    @SerializedName("location")
+    @Nullable String location();
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParser.java
new file mode 100644 (file)
index 0000000..56f5393
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
+
+public final class MessageRouterSinkParser implements StreamFromGsonParser<MessageRouterSink> {
+
+    private final Gson gson;
+
+    public static MessageRouterSinkParser create() {
+        return new MessageRouterSinkParser(gsonInstance());
+    }
+
+    private MessageRouterSinkParser(Gson gson) {
+        this.gson = gson;
+    }
+
+    public MessageRouterSink unsafeParse(JsonObject input) {
+        assertStreamType(input, MESSAGE_ROUTER_TYPE);
+
+        final AafCredentials aafCredentials = gson.fromJson(input, ImmutableAafCredentials.class);
+
+        final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME);
+        final MessageRouterDmaapInfo dmaapInfo = gson.fromJson(dmaapInfoJson, ImmutableMessageRouterDmaapInfo.class);
+
+        return new GsonMessageRouterSink(dmaapInfo, aafCredentials);
+
+    }
+}
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParser.java
new file mode 100644 (file)
index 0000000..25cf9e0
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
+
+/**
+ * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
+ */
+
+public final class MessageRouterSourceParser implements StreamFromGsonParser<MessageRouterSource> {
+    private final Gson gson;
+
+    public static MessageRouterSourceParser create() {
+        return new MessageRouterSourceParser(gsonInstance());
+    }
+
+    private MessageRouterSourceParser(Gson gson) {
+        this.gson = gson;
+    }
+
+    public MessageRouterSource unsafeParse(JsonObject input) {
+        assertStreamType(input, MESSAGE_ROUTER_TYPE);
+
+        final AafCredentials aafCredentials = gson.fromJson(input, ImmutableAafCredentials.class);
+
+        final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME);
+        final MessageRouterDmaapInfo dmaapInfo = gson.fromJson(dmaapInfoJson, ImmutableMessageRouterDmaapInfo.class);
+
+        return new GsonMessageRouterSource(dmaapInfo, aafCredentials);
+
+    }
+
+}
\ No newline at end of file
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/StreamsConstants.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/StreamsConstants.java
new file mode 100644 (file)
index 0000000..a9dd0c4
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+/**
+ * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
+ */
+
+public final class StreamsConstants {
+
+    public static final String DATA_ROUTER_TYPE = "data_router";
+
+    public static final String MESSAGE_ROUTER_TYPE = "message_router";
+
+    public static final String KAFKA_TYPE = "kafka";
+
+    public static final String DMAAP_INFO_CHILD_NAME = "dmaap_info";
+
+    public static final String KAFKA_INFO_CHILD_NAME = "kafka_info";
+
+}
\ No newline at end of file
index ecb0b55..e8d6319 100644 (file)
@@ -21,6 +21,7 @@
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
 
 
+import com.google.gson.annotations.SerializedName;
 import org.immutables.gson.Gson;
 import org.immutables.value.Value;
 import org.jetbrains.annotations.Nullable;
@@ -35,7 +36,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 @Gson.TypeAdapters
 public interface AafCredentials {
 
+    @SerializedName("aaf_username")
     @Nullable String username();
 
+    @SerializedName("aaf_password")
     @Nullable String password();
 }
index ba26b10..b4b5549 100644 (file)
@@ -21,6 +21,7 @@
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
 
 
+import com.google.gson.annotations.SerializedName;
 import org.jetbrains.annotations.Nullable;
 import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 
@@ -31,9 +32,12 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 @ExperimentalApi
 public interface DataRouter {
 
-    String location();
+    @SerializedName("location")
+    @Nullable String location();
 
+    @SerializedName("username")
     @Nullable String username();
 
+    @SerializedName("password")
     @Nullable String password();
 }
index a8aa5cb..60d9100 100644 (file)
@@ -21,6 +21,8 @@
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
 
 
+import com.google.gson.annotations.SerializedName;
+import org.immutables.gson.Gson;
 import org.immutables.value.Value;
 import org.jetbrains.annotations.Nullable;
 import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
@@ -30,14 +32,18 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Sin
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @version 1.2.1
  */
+@Gson.TypeAdapters
 @ExperimentalApi
 @Value.Immutable
-public abstract class DataRouterSink implements DataRouter, SinkStream {
+public interface DataRouterSink extends DataRouter, SinkStream {
 
-    abstract String publisherUrl();
+    @SerializedName("publish_url")
+    String publishUrl();
 
-    abstract String publisherId();
+    @SerializedName("publisher_id")
+    @Nullable String publisherId();
 
-    abstract @Nullable String logUrl();
+    @SerializedName("log_url")
+    @Nullable String logUrl();
 
 }
index ac8362d..b6dedb7 100644 (file)
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
 
 
+import com.google.gson.annotations.SerializedName;
+import org.immutables.gson.Gson;
 import org.immutables.value.Value;
+import org.jetbrains.annotations.Nullable;
 import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SourceStream;
 
@@ -29,11 +32,15 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Sou
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @version 1.2.1
  */
+@Gson.TypeAdapters
 @ExperimentalApi
 @Value.Immutable
-public abstract class DataRouterSource implements DataRouter, SourceStream {
+public interface DataRouterSource extends DataRouter, SourceStream {
 
-    abstract String deliveryUrl();
+    // in future, since crucial need to be verified if could be nullable
+    @SerializedName("delivery_url")
+    @Nullable String deliveryUrl();
 
-    abstract String subscriberId();
+    @SerializedName("subscriber_id")
+    @Nullable String subscriberId();
 }
index 76c6f6d..7ed720d 100644 (file)
@@ -21,6 +21,7 @@
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
 
 
+import com.google.gson.annotations.SerializedName;
 import org.jetbrains.annotations.Nullable;
 import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
@@ -32,13 +33,18 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Aaf
 @ExperimentalApi
 public interface MessageRouter {
 
+    @SerializedName("topic_url")
     String topicUrl();
 
+    @SerializedName("client_role")
     @Nullable String clientRole();
 
+    @SerializedName("client_id")
     @Nullable String clientId();
 
+    @SerializedName("location")
     @Nullable String location();
 
+    @SerializedName("aaf_credentials")
     @Nullable AafCredentials aafCredentials();
 }
index 7584bac..74f8810 100644 (file)
@@ -21,6 +21,7 @@
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
 
 
+import org.immutables.gson.Gson;
 import org.immutables.value.Value;
 import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SinkStream;
@@ -31,6 +32,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Sin
  */
 @ExperimentalApi
 @Value.Immutable
-public abstract class MessageRouterSink implements MessageRouter, SinkStream {
+public interface MessageRouterSink extends MessageRouter, SinkStream {
 
 }
index e9599a5..66520f5 100644 (file)
@@ -21,6 +21,7 @@
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
 
 
+import org.immutables.gson.Gson;
 import org.immutables.value.Value;
 import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SourceStream;
@@ -31,6 +32,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Sou
  */
 @ExperimentalApi
 @Value.Immutable
-public abstract class MessageRouterSource implements MessageRouter, SourceStream {
+public interface MessageRouterSource extends MessageRouter, SourceStream {
 
 }
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSinkParserTest.java
new file mode 100644 (file)
index 0000000..398ebcd
--- /dev/null
@@ -0,0 +1,110 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import io.vavr.control.Either;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSink;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
+
+
+class DataRouterSinkParserTest {
+    private static final String SAMPLE_LOCATION = "mtc00";
+    private static final String SAMPLE_PUBLISH_URL = "https://we-are-data-router.us/feed/xyz";
+    private static final String SAMPLE_LOG_URL = "https://we-are-data-router.us/feed/xyz/logs";
+    private static final String SAMPLE_USER = "some-user";
+    private static final String SAMPLE_PASSWORD = "some-password";
+    private static final String SAMPLE_PUBLISHER_ID = "123456";
+
+    private static final Gson gson = new Gson();
+
+    private final StreamFromGsonParser<DataRouterSink> streamParser = StreamFromGsonParsers.dataRouterSinkParser();
+
+    private static final DataRouterSink fullConfigurationStream = ImmutableDataRouterSink.builder()
+            .location(SAMPLE_LOCATION)
+            .publishUrl(SAMPLE_PUBLISH_URL)
+            .logUrl(SAMPLE_LOG_URL)
+            .username(SAMPLE_USER)
+            .password(SAMPLE_PASSWORD)
+            .publisherId(SAMPLE_PUBLISHER_ID)
+            .build();
+
+    private static final DataRouterSink minimalConfigurationStream = ImmutableDataRouterSink.builder()
+            .publishUrl(SAMPLE_PUBLISH_URL)
+            .build();
+
+    @Test
+    void fullConfiguration_shouldGenerateDataRouterSinkObject() throws IOException {
+        // given
+        JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_sink_full.json");
+        // when
+        DataRouterSink result = streamParser.unsafeParse(input);
+        // then
+        assertThat(result).isEqualTo(fullConfigurationStream);
+    }
+
+    @Test
+    void minimalConfiguration_shouldGenerateDataRouterSinkObject() throws IOException {
+        //given
+        JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_sink_minimal.json");
+        // when
+        DataRouterSink result = streamParser.unsafeParse(input);
+        // then
+        assertThat(result).isEqualTo(minimalConfigurationStream);
+    }
+
+    @Test
+    void incorrectConfiguration_shouldParseToStreamParserError() throws IOException {
+        // given
+        JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_full.json");
+        // when
+        Either<StreamParserError, DataRouterSink> result = streamParser.parse(input);
+        // then
+        assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
+        result.peekLeft(error -> {
+                    assertThat(error.message()).contains("Invalid stream type");
+                    assertThat(error.message()).contains("Expected '" + DATA_ROUTER_TYPE + "', but was '"
+                            + MESSAGE_ROUTER_TYPE + "'");
+                }
+        );
+    }
+
+    @Test
+    void emptyConfiguration_shouldParseToStreamParserError() {
+        // given
+        JsonObject input = gson.fromJson("{}", JsonObject.class);
+        // when
+        Either<StreamParserError, DataRouterSink> result = streamParser.parse(input);
+        // then
+        assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
+    }
+
+}
\ No newline at end of file
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataRouterSourceParserTest.java
new file mode 100644 (file)
index 0000000..681fa14
--- /dev/null
@@ -0,0 +1,108 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
+import io.vavr.control.Either;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.*;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
+
+public class DataRouterSourceParserTest {
+    private static final String SAMPLE_LOCATION = "mtc00";
+    private static final String SAMPLE_DELIVERY_URL = "https://my-subscriber-app.dcae:8080/target-path";
+    private static final String SAMPLE_USER = "some-user";
+    private static final String SAMPLE_PASSWORD = "some-password";
+    private static final String SAMPLE_SUBSCRIBER_ID = "789012";
+
+    private static final Gson gson = new Gson();
+
+    private static final DataRouterSource fullConfigurationStream = ImmutableDataRouterSource.builder()
+            .location(SAMPLE_LOCATION)
+            .deliveryUrl(SAMPLE_DELIVERY_URL)
+            .username(SAMPLE_USER)
+            .password(SAMPLE_PASSWORD)
+            .subscriberId(SAMPLE_SUBSCRIBER_ID)
+            .build();
+
+    private static final DataRouterSource minimalConfigurationStream = ImmutableDataRouterSource.builder()
+            .build();
+
+
+    private final StreamFromGsonParser<DataRouterSource> streamParser = StreamFromGsonParsers.dataRouterSourceParser();
+
+    @Test
+    void fullConfiguration_shouldGenerateDataRouterSinkObject() throws IOException {
+        // given
+        JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_source_full.json");
+        // when
+        DataRouterSource result = streamParser.unsafeParse(input);
+        // then
+        assertThat(result).isEqualTo(fullConfigurationStream);
+    }
+
+    @Test
+    void minimalConfiguration_shouldGenerateDataRouterSinkObject() throws IOException {
+        // given
+        JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_source_minimal.json");
+        // when
+        DataRouterSource result = streamParser.unsafeParse(input);
+        // then
+        assertThat(result).isEqualTo(minimalConfigurationStream);
+    }
+
+    @Test
+    void incorrectConfiguration_shouldParseToStreamParserError() throws IOException {
+        // given
+        JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_full.json");
+        // when
+        Either<StreamParserError, DataRouterSource> result = streamParser.parse(input);
+        // then
+        assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
+        result.peekLeft(error -> {
+                    assertThat(error.message()).contains("Invalid stream type");
+                    assertThat(error.message()).contains("Expected '" + DATA_ROUTER_TYPE + "', but was '"
+                            + MESSAGE_ROUTER_TYPE + "'");
+                }
+        );
+    }
+
+    @Test
+    void emptyConfiguration_shouldBeParsedToStreamParserError() {
+        // given
+        JsonObject input = gson.fromJson("{}", JsonObject.class);
+        // when
+        Either<StreamParserError, DataRouterSource> result = streamParser.parse(input);
+        // then
+        assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
+    }
+}
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSinkParserTest.java
new file mode 100644 (file)
index 0000000..63b04d1
--- /dev/null
@@ -0,0 +1,113 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import io.vavr.control.Either;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
+
+/**
+ * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
+ */
+
+public class MessageRouterSinkParserTest {
+
+    private static final String SAMPLE_AAF_USERNAME = "some-user";
+    private static final String SAMPLE_AAF_PASSWORD = "some-password";
+    private static final String SAMPLE_LOCATION = "mtc00";
+    private static final String SAMPLE_CLIENT_ROLE = "com.dcae.member";
+    private static final String SAMPLE_CLIENT_ID = "1500462518108";
+    private static final String SAMPLE_TOPIC_URL = "https://we-are-message-router.us:3905/events/some-topic";
+
+    private static final Gson gson = new Gson();
+
+    private final StreamFromGsonParser<MessageRouterSink> streamParser = StreamFromGsonParsers.messageRouterSinkParser();
+
+    @Test
+    void fullConfiguration_shouldGenerateDataRouterSinkObject() throws IOException {
+        // given
+        JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_full.json");
+        // when
+        MessageRouterSink result = streamParser.unsafeParse(input);
+        // then
+        assertThat(result).isInstanceOf(MessageRouterSink.class);
+        assertThat(result.aafCredentials().username()).isEqualTo(SAMPLE_AAF_USERNAME);
+        assertThat(result.aafCredentials().password()).isEqualTo(SAMPLE_AAF_PASSWORD);
+        assertThat(result.location()).isEqualTo(SAMPLE_LOCATION);
+        assertThat(result.clientRole()).isEqualTo(SAMPLE_CLIENT_ROLE);
+        assertThat(result.clientId()).isEqualTo(SAMPLE_CLIENT_ID);
+        assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL);
+    }
+
+    @Test
+    void minimalConfiguration_shouldGenerateDataRouterSinkObject() throws IOException {
+        // given
+        JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_minimal.json");
+
+        // when
+        MessageRouterSink result = streamParser.unsafeParse(input);
+        // then
+        assertThat(result).isInstanceOf(MessageRouterSink.class);
+        assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL);
+        assertThat(result.aafCredentials().username()).isNull();
+        assertThat(result.aafCredentials().password()).isNull();
+        assertThat(result.clientId()).isNull();
+    }
+
+    @Test
+    void incorrectConfiguration_shouldParseToStreamParserError() throws IOException {
+        // given
+        JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_sink_full.json");
+        // when
+        Either<StreamParserError, MessageRouterSink> result = streamParser.parse(input);
+        // then
+        assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
+        result.peekLeft(error -> {
+                    assertThat(error.message()).contains("Invalid stream type");
+                    assertThat(error.message()).contains("Expected '" + MESSAGE_ROUTER_TYPE + "', but was '"
+                            + DATA_ROUTER_TYPE + "'");
+                }
+        );
+    }
+
+    @Test
+    void emptyConfiguration_shouldParseToStreamParserError() {
+        // given
+        JsonObject input = gson.fromJson("{}", JsonObject.class);
+        // when
+        Either<StreamParserError, MessageRouterSink> result = streamParser.parse(input);
+        // then
+        assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
+    }
+
+
+}
\ No newline at end of file
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/MessageRouterSourceParserTest.java
new file mode 100644 (file)
index 0000000..fea63d6
--- /dev/null
@@ -0,0 +1,111 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import io.vavr.control.Either;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
+
+/**
+ * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
+ */
+
+public class MessageRouterSourceParserTest {
+
+    private static final String SAMPLE_AAF_USERNAME = "some-user";
+    private static final String SAMPLE_AAF_PASSWORD = "some-password";
+    private static final String SAMPLE_LOCATION = "mtc00";
+    private static final String SAMPLE_CLIENT_ROLE = "com.dcae.member";
+    private static final String SAMPLE_CLIENT_ID = "1500462518108";
+    private static final String SAMPLE_TOPIC_URL = "https://we-are-message-router.us:3905/events/some-topic";
+
+    private static final Gson gson = new Gson();
+
+    private final StreamFromGsonParser<MessageRouterSource> streamParser = StreamFromGsonParsers.messageRouterSourceParser();
+
+    @Test
+    void fullConfiguration_shouldGenerateDataRouterSourceObject() throws IOException {
+        // given
+        JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_full.json");
+        // when
+        MessageRouterSource result = streamParser.unsafeParse(input);
+        // then
+        assertThat(result.aafCredentials().username()).isEqualTo(SAMPLE_AAF_USERNAME);
+        assertThat(result.aafCredentials().password()).isEqualTo(SAMPLE_AAF_PASSWORD);
+        assertThat(result.location()).isEqualTo(SAMPLE_LOCATION);
+        assertThat(result.clientRole()).isEqualTo(SAMPLE_CLIENT_ROLE);
+        assertThat(result.clientId()).isEqualTo(SAMPLE_CLIENT_ID);
+        assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL);
+    }
+
+    @Test
+    void minimalConfiguration_shouldGenerateDataRouterSourceObject() throws IOException {
+        // given
+        JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_minimal.json");
+
+        // when
+        MessageRouterSource result = streamParser.unsafeParse(input);
+        // then
+        assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL);
+        assertThat(result.aafCredentials().username()).isNull();
+        assertThat(result.aafCredentials().password()).isNull();
+        assertThat(result.clientId()).isNull();
+    }
+
+    @Test
+    void incorrectConfiguration_shouldParseToStreamParserError() throws IOException {
+        // given
+        JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_sink_full.json");
+        // when
+        Either<StreamParserError, MessageRouterSource> result = streamParser.parse(input);
+        // then
+        assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
+        result.peekLeft(error -> {
+                    assertThat(error.message()).contains("Invalid stream type");
+                    assertThat(error.message()).contains("Expected '" + MESSAGE_ROUTER_TYPE + "', but was '"
+                            + DATA_ROUTER_TYPE + "'");
+                }
+        );
+    }
+
+    @Test
+    void emptyConfiguration_shouldParseToStreamParserError() {
+        // given
+        JsonObject input = gson.fromJson("{}", JsonObject.class);
+        // when
+        Either<StreamParserError, MessageRouterSource> result = streamParser.parse(input);
+        // then
+        assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
+    }
+
+
+}
\ No newline at end of file
diff --git a/rest-services/cbs-client/src/test/resources/streams/data_router_sink_full.json b/rest-services/cbs-client/src/test/resources/streams/data_router_sink_full.json
new file mode 100644 (file)
index 0000000..fc43a82
--- /dev/null
@@ -0,0 +1,11 @@
+{
+  "type": "data_router",
+  "dmaap_info": {
+    "location": "mtc00",
+    "publish_url": "https://we-are-data-router.us/feed/xyz",
+    "log_url": "https://we-are-data-router.us/feed/xyz/logs",
+    "username": "some-user",
+    "password": "some-password",
+    "publisher_id": "123456"
+  }
+}
\ No newline at end of file
diff --git a/rest-services/cbs-client/src/test/resources/streams/data_router_sink_minimal.json b/rest-services/cbs-client/src/test/resources/streams/data_router_sink_minimal.json
new file mode 100644 (file)
index 0000000..8f76eca
--- /dev/null
@@ -0,0 +1,6 @@
+{
+  "type": "data_router",
+  "dmaap_info": {
+    "publish_url": "https://we-are-data-router.us/feed/xyz"
+  }
+}
\ No newline at end of file
diff --git a/rest-services/cbs-client/src/test/resources/streams/data_router_source_full.json b/rest-services/cbs-client/src/test/resources/streams/data_router_source_full.json
new file mode 100644 (file)
index 0000000..56d269c
--- /dev/null
@@ -0,0 +1,10 @@
+{
+  "type": "data_router",
+  "dmaap_info": {
+    "location": "mtc00",
+    "delivery_url": "https://my-subscriber-app.dcae:8080/target-path",
+    "username": "some-user",
+    "password": "some-password",
+    "subscriber_id": "789012"
+  }
+}
\ No newline at end of file
diff --git a/rest-services/cbs-client/src/test/resources/streams/data_router_source_minimal.json b/rest-services/cbs-client/src/test/resources/streams/data_router_source_minimal.json
new file mode 100644 (file)
index 0000000..8e522ba
--- /dev/null
@@ -0,0 +1,5 @@
+{
+  "type": "data_router",
+  "dmaap_info": {
+  }
+}
\ No newline at end of file
diff --git a/rest-services/cbs-client/src/test/resources/streams/message_router_full.json b/rest-services/cbs-client/src/test/resources/streams/message_router_full.json
new file mode 100644 (file)
index 0000000..be1725e
--- /dev/null
@@ -0,0 +1,11 @@
+{
+  "type": "message_router",
+  "aaf_username": "some-user",
+  "aaf_password": "some-password",
+  "dmaap_info": {
+    "client_role": "com.dcae.member",
+    "client_id": "1500462518108",
+    "location": "mtc00",
+    "topic_url": "https://we-are-message-router.us:3905/events/some-topic"
+  }
+}
\ No newline at end of file
diff --git a/rest-services/cbs-client/src/test/resources/streams/message_router_minimal.json b/rest-services/cbs-client/src/test/resources/streams/message_router_minimal.json
new file mode 100644 (file)
index 0000000..bd504b8
--- /dev/null
@@ -0,0 +1,6 @@
+{
+    "type": "message_router",
+    "dmaap_info": {
+        "topic_url": "https://we-are-message-router.us:3905/events/some-topic"
+    }
+}
\ No newline at end of file