Add streams parsing integration tests 12/83212/6
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Mon, 25 Mar 2019 14:32:42 +0000 (15:32 +0100)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Tue, 26 Mar 2019 11:55:04 +0000 (12:55 +0100)
Change-Id: I22410b3fb110e47bde123556951bb12af5f34a1c
Issue-ID: DCAEGEN2-1315
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
55 files changed:
rest-services/cbs-client/pom.xml
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParserError.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParsingException.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamPredicates.java [new file with mode: 0644]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/StreamsConstants.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtils.java [new file with mode: 0644]
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParser.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParser.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouter.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSink.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSource.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParser.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParser.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafka.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSink.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSource.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParser.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParser.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MessageRouterSinksIT.java [new file with mode: 0644]
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MixedDmaapStreamsIT.java [new file with mode: 0644]
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtilsTest.java [new file with mode: 0644]
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java
rest-services/cbs-client/src/test/resources/streams/integration_message_router.json [new file with mode: 0644]
rest-services/cbs-client/src/test/resources/streams/integration_mixed_dmaap.json [new file with mode: 0644]
rest-services/model/pom.xml [new file with mode: 0644]
rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/AafCredentials.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java with 89% similarity]
rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/DataStream.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java with 86% similarity]
rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/DataStreamDirection.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java with 95% similarity]
rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/RawDataStream.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java with 92% similarity]
rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/SinkStream.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SinkStream.java with 87% similarity]
rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/SourceStream.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SourceStream.java with 87% similarity]
rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/StreamType.java [new file with mode: 0644]
rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/DataRouter.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouter.java with 90% similarity]
rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/DataRouterSink.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSink.java with 85% similarity]
rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/DataRouterSource.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSource.java with 85% similarity]
rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/Kafka.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java with 89% similarity]
rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/KafkaSink.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java with 80% similarity]
rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/KafkaSource.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java with 83% similarity]
rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/MessageRouter.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouter.java with 86% similarity]
rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/MessageRouterSink.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSink.java with 80% similarity]
rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/MessageRouterSource.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSource.java with 80% similarity]
rest-services/pom.xml

index 9544a7f..3480403 100644 (file)
             <version>${project.version}</version>
         </dependency>
         <dependency>
-            <groupId>io.vavr</groupId>
-            <artifactId>vavr</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.jetbrains</groupId>
-            <artifactId>annotations</artifactId>
+            <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+            <artifactId>model</artifactId>
+            <version>${project.version}</version>
         </dependency>
 
         <dependency>
index cbdea00..3e295a0 100644 (file)
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions;
 
 import io.vavr.control.Either;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.2
  */
-@ExperimentalApi
 public class StreamParserError {
     private final String message;
 
index ca531e8..4fca3d9 100644 (file)
 
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions;
 
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.2
  */
-@ExperimentalApi
 public class StreamParsingException extends CbsClientException {
 
     private final StreamParserError cause;
index 648b7a6..e9263f4 100644 (file)
@@ -23,10 +23,9 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import io.vavr.collection.Stream;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
 
 /**
  * Extract streams from the application configuration represented as GSON JsonObject.
@@ -64,7 +63,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Raw
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.4
  */
-@ExperimentalApi
 public final class DataStreams {
 
     private DataStreams() {
index a8ce364..2fd1a49 100644 (file)
@@ -21,8 +21,7 @@
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
 
 import com.google.gson.JsonObject;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStream;
 
 /**
  * Represents parser taking GSON JsonObject as an input
@@ -30,7 +29,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Dat
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.4
  */
-@ExperimentalApi
 public interface StreamFromGsonParser<S extends DataStream> extends StreamParser<JsonObject, S> {
 
 }
index 7476e97..e117a3c 100644 (file)
@@ -26,7 +26,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.MessageRouterSourceParser;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSinkParser;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSourceParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.*;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.*;
 
 /**
  * Factory methods for GSON-based {@code StreamParser}s
index 69016ed..61afbe4 100644 (file)
@@ -22,11 +22,10 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
 
 import io.vavr.control.Either;
 import io.vavr.control.Try;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
 
 /**
  * A generic data stream parser which parses {@code T} to data stream {@code S}.
@@ -36,7 +35,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Raw
  * @param <S> output data type
  * @since 1.1.4
  */
-@ExperimentalApi
 public interface StreamParser<T, S extends DataStream> {
 
     /**
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamPredicates.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamPredicates.java
new file mode 100644 (file)
index 0000000..dfc6344
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
+
+import java.util.Objects;
+import java.util.function.Predicate;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+
+/**
+ * A small collection of predicates usable when filtering {@link RawDataStream}s.
+ *
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.4
+ */
+public final class StreamPredicates {
+
+    public StreamPredicates() {
+    }
+
+    /**
+     * Predicate for matching {@link RawDataStream} by name.
+     *
+     * @param name data stream name
+     * @param <T> type of data stream
+     * @return a predicate which returns true only when a stream name is equal to the given name
+     */
+    public static <T> Predicate<RawDataStream<T>> streamWithName(String name) {
+        return stream -> Objects.equals(stream.name(), name);
+    }
+
+    /**
+     * Predicate for matching {@link RawDataStream} by type.
+     *
+     * @param type data stream type
+     * @param <T> type of data stream
+     * @return a predicate which returns true only when a stream type is equal to the given type
+     */
+    public static <T> Predicate<RawDataStream<T>> streamOfType(StreamType type) {
+        return stream -> stream.type() == type;
+    }
+}
index 1148574..7f3ccf3 100644 (file)
@@ -25,9 +25,10 @@ import com.google.gson.JsonObject;
 import io.vavr.collection.Stream;
 import java.io.IOException;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -46,9 +47,9 @@ public final class DataStreamUtils {
 
     public static void assertStreamType(
             RawDataStream<JsonObject> json,
-            String expectedType,
+            StreamType expectedType,
             DataStreamDirection expectedDirection) {
-        if (!json.type().equals(expectedType)) {
+        if (json.type() != expectedType) {
             throw new StreamParsingException(
                     "Invalid stream type. Expected '" + expectedType + "', but was '" + json.type() + "'");
         }
@@ -71,7 +72,7 @@ public final class DataStreamUtils {
         return ImmutableRawDataStream.<JsonObject>builder()
                 .name(name)
                 .direction(direction)
-                .type(GsonUtils.requiredString(json, "type"))
+                .type(StreamType.parse(GsonUtils.requiredString(json, "type")))
                 .descriptor(json)
                 .build();
     }
index 0fdec5d..7776a1e 100644 (file)
@@ -35,9 +35,9 @@ import java.util.stream.Collectors;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.GsonAdaptersMessageRouterDmaapInfo;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.GsonAdaptersKafkaInfo;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.GsonAdaptersAafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.GsonAdaptersDataRouterSink;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.GsonAdaptersDataRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.GsonAdaptersAafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.GsonAdaptersDataRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.GsonAdaptersDataRouterSource;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
index a9dd0c4..68304ca 100644 (file)
@@ -25,12 +25,6 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gso
 
 public final class StreamsConstants {
 
-    public static final String DATA_ROUTER_TYPE = "data_router";
-
-    public static final String MESSAGE_ROUTER_TYPE = "message_router";
-
-    public static final String KAFKA_TYPE = "kafka";
-
     public static final String DMAAP_INFO_CHILD_NAME = "dmaap_info";
 
     public static final String KAFKA_INFO_CHILD_NAME = "kafka_info";
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtils.java
new file mode 100644 (file)
index 0000000..858fd73
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import org.jetbrains.annotations.Nullable;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+public class DmaapUtils {
+
+    public static final ImmutableAafCredentials EMPTY_CREDENTIALS = ImmutableAafCredentials.builder().build();
+
+    private DmaapUtils() {
+    }
+
+    public static @Nullable AafCredentials extractAafCredentials(Gson gson, JsonObject input) {
+        final AafCredentials aafCredentials = gson.fromJson(input, ImmutableAafCredentials.class);
+        return EMPTY_CREDENTIALS.equals(aafCredentials) ? null : aafCredentials;
+    }
+}
index 83ca4cb..4cf7cbe 100644 (file)
@@ -22,17 +22,17 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gso
 import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
 import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
 import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
 import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
 
 import com.google.gson.Gson;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.DataRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableDataRouterSink;
 
 /**
  * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
@@ -51,7 +51,7 @@ public final class DataRouterSinkParser implements StreamFromGsonParser<DataRout
 
     @Override
     public DataRouterSink unsafeParse(RawDataStream<JsonObject> input) {
-        assertStreamType(input, DATA_ROUTER_TYPE, DataStreamDirection.SINK);
+        assertStreamType(input, StreamType.DATA_ROUTER, DataStreamDirection.SINK);
 
         final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME);
         return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSink.class).withName(input.name());
index d8b7109..a880071 100644 (file)
@@ -22,17 +22,17 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gso
 import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
 import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
 import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
 import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
 
 import com.google.gson.Gson;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSource;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.DataRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableDataRouterSource;
 
 /**
  * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
@@ -51,7 +51,7 @@ public final class DataRouterSourceParser implements StreamFromGsonParser<DataRo
 
     @Override
     public DataRouterSource unsafeParse(RawDataStream<JsonObject> input) {
-        assertStreamType(input, DATA_ROUTER_TYPE, DataStreamDirection.SOURCE);
+        assertStreamType(input, StreamType.DATA_ROUTER, DataStreamDirection.SOURCE);
 
         final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME);
         return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSource.class).withName(input.name());
index c5d254f..40b8f38 100644 (file)
@@ -21,8 +21,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gso
 
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouter;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouter;
 
 import java.util.Objects;
 
index 7122d7c..650161f 100644 (file)
@@ -21,8 +21,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gso
 
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
 
 /**
  * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
index 49871b1..286c449 100644 (file)
@@ -21,8 +21,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gso
 
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
 
 /**
  * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
index b1f037a..dc2c2e2 100644 (file)
@@ -23,17 +23,17 @@ import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.strea
 import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
 import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
 import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
 
 import com.google.gson.Gson;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.DmaapUtils;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
 
 public final class MessageRouterSinkParser implements StreamFromGsonParser<MessageRouterSink> {
 
@@ -49,9 +49,9 @@ public final class MessageRouterSinkParser implements StreamFromGsonParser<Messa
 
     @Override
     public MessageRouterSink unsafeParse(RawDataStream<JsonObject> input) {
-        assertStreamType(input, MESSAGE_ROUTER_TYPE, DataStreamDirection.SINK);
+        assertStreamType(input, StreamType.MESSAGE_ROUTER, DataStreamDirection.SINK);
 
-        final AafCredentials aafCredentials = gson.fromJson(input.descriptor(), ImmutableAafCredentials.class);
+        final AafCredentials aafCredentials = DmaapUtils.extractAafCredentials(gson, input.descriptor());
 
         final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME);
         final MessageRouterDmaapInfo dmaapInfo = gson.fromJson(dmaapInfoJson, ImmutableMessageRouterDmaapInfo.class);
index e6b964d..148584a 100644 (file)
@@ -23,17 +23,17 @@ import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.strea
 import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
 import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
 import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
 
 import com.google.gson.Gson;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.DmaapUtils;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
 
 /**
  * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
@@ -52,9 +52,9 @@ public final class MessageRouterSourceParser implements StreamFromGsonParser<Mes
 
     @Override
     public MessageRouterSource unsafeParse(RawDataStream<JsonObject> input) {
-        assertStreamType(input, MESSAGE_ROUTER_TYPE, DataStreamDirection.SOURCE);
+        assertStreamType(input, StreamType.MESSAGE_ROUTER, DataStreamDirection.SOURCE);
 
-        final AafCredentials aafCredentials = gson.fromJson(input.descriptor(), ImmutableAafCredentials.class);
+        final AafCredentials aafCredentials = DmaapUtils.extractAafCredentials(gson, input.descriptor());
 
         final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME);
         final MessageRouterDmaapInfo dmaapInfo = gson.fromJson(dmaapInfoJson, ImmutableMessageRouterDmaapInfo.class);
index ad9b021..a746fac 100644 (file)
@@ -23,8 +23,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gso
 import java.util.Objects;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.Kafka;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.Kafka;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
index 4990f80..4cc28b3 100644 (file)
@@ -22,8 +22,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gso
 
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
index 137964c..1910828 100644 (file)
@@ -22,8 +22,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gso
 
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
index 00e9b50..1cd3b48 100644 (file)
@@ -28,12 +28,11 @@ import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.strea
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
-
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_TYPE;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -52,7 +51,7 @@ public final class KafkaSinkParser implements StreamFromGsonParser<KafkaSink> {
 
     @Override
     public KafkaSink unsafeParse(RawDataStream<JsonObject> input) {
-        assertStreamType(input, KAFKA_TYPE, DataStreamDirection.SINK);
+        assertStreamType(input, StreamType.KAFKA, DataStreamDirection.SINK);
         final JsonObject json = input.descriptor();
 
         final KafkaInfo kafkaInfo = extractKafkaInfo(gson, json);
index 9465564..7bdc12c 100644 (file)
@@ -28,12 +28,11 @@ import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.strea
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
-
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_TYPE;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -52,7 +51,7 @@ public final class KafkaSourceParser implements StreamFromGsonParser<KafkaSource
 
     @Override
     public KafkaSource unsafeParse(RawDataStream<JsonObject> input) {
-        assertStreamType(input, KAFKA_TYPE, DataStreamDirection.SOURCE);
+        assertStreamType(input, StreamType.KAFKA, DataStreamDirection.SOURCE);
         final JsonObject json = input.descriptor();
 
         final KafkaInfo kafkaInfo = extractKafkaInfo(gson, json);
index 4cfa33a..50a004c 100644 (file)
@@ -27,8 +27,8 @@ import com.google.gson.Gson;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import io.vavr.control.Option;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MessageRouterSinksIT.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MessageRouterSinksIT.java
new file mode 100644 (file)
index 0000000..c57ce02
--- /dev/null
@@ -0,0 +1,151 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamWithName;
+
+import com.google.gson.JsonObject;
+import java.io.IOException;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+class MessageRouterSinksIT {
+
+    final JsonObject json = GsonUtils.readObjectFromResource("/streams/integration_message_router.json");
+
+    MessageRouterSinksIT() throws IOException {
+    }
+
+    @Test
+    void thereShouldBeNoDataSources() {
+        assertThat(DataStreams.namedSources(json)).isEmpty();
+    }
+
+    @Test
+    void thereShouldBeSomeSinksDefined() {
+        assertThat(DataStreams.namedSinks(json)).isNotEmpty();
+        assertThat(DataStreams.namedSinks(json)).hasSize(4);
+    }
+
+    @Test
+    void allSinksShouldBeOfMessageRouterType() {
+        assertThat(DataStreams.namedSinks(json).map(RawDataStream::type).distinct())
+                .containsExactly(StreamType.MESSAGE_ROUTER);
+    }
+
+    @Test
+    void sinksShouldHaveProperDirection() {
+        assertThat(DataStreams.namedSinks(json).map(RawDataStream::direction).distinct())
+                .containsExactly(DataStreamDirection.SINK);
+    }
+
+    @Test
+    void verifySecMeasurementSink() {
+        // given
+        final String streamName = "sec_measurement";
+        final RawDataStream<JsonObject> sink = DataStreams.namedSinks(json).find(streamWithName(streamName))
+                .get();
+
+        // when
+        final MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink);
+
+        // then
+        assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName);
+        assertThat(parsedSink.aafCredentials()).describedAs("aaf credentials").isNotNull();
+        assertThat(parsedSink.aafCredentials().username()).describedAs("aaf user name").isEqualTo("aaf_username");
+        assertThat(parsedSink.aafCredentials().password()).describedAs("aaf password").isEqualTo("aaf_password");
+        assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtl5");
+        assertThat(parsedSink.clientId()).describedAs("client id").isEqualTo("111111");
+        assertThat(parsedSink.clientRole()).describedAs("client role").isEqualTo("com.att.dcae.member");
+        assertThat(parsedSink.topicUrl()).describedAs("topic url")
+                .isEqualTo("https://mrlocal:3905/events/com.att.dcae.dmaap.FTL2.SEC-MEASUREMENT-OUTPUT");
+    }
+
+    @Test
+    void verifySecFaultUnsecureSink() {
+        // given
+        final String streamName = "sec_fault_unsecure";
+        final RawDataStream<JsonObject> sink = DataStreams.namedSinks(json).find(streamWithName(streamName))
+                .get();
+
+        // when
+        final MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink);
+
+        // then
+        assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName);
+        assertThat(parsedSink.aafCredentials()).describedAs("aaf credentials").isNull();
+        assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtl5");
+        assertThat(parsedSink.clientId()).describedAs("client id").isNull();
+        assertThat(parsedSink.clientRole()).describedAs("client role").isNull();
+        assertThat(parsedSink.topicUrl()).describedAs("topic url")
+                .isEqualTo("http://ueb.global:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV");
+    }
+
+    @Test
+    void verifySecMeasurementUnsecureSink() {
+        // given
+        final String streamName = "sec_measurement_unsecure";
+        final RawDataStream<JsonObject> sink = DataStreams.namedSinks(json).find(streamWithName(streamName))
+                .get();
+
+        // when
+        final MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink);
+
+        // then
+        assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName);
+        assertThat(parsedSink.aafCredentials()).describedAs("aaf credentials").isNull();
+        assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtl5");
+        assertThat(parsedSink.clientId()).describedAs("client id").isNull();
+        assertThat(parsedSink.clientRole()).describedAs("client role").isNull();
+        assertThat(parsedSink.topicUrl()).describedAs("topic url")
+                .isEqualTo("http://ueb.global:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV");
+    }
+
+    @Test
+    void verifySecFaultSink() {
+        // given
+        final String streamName = "sec_fault";
+        final RawDataStream<JsonObject> sink = DataStreams.namedSinks(json).find(streamWithName(streamName))
+                .get();
+
+        // when
+        final MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink);
+
+        // then
+        assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName);
+        assertThat(parsedSink.aafCredentials()).describedAs("aaf credentials").isNotNull();
+        assertThat(parsedSink.aafCredentials().username()).describedAs("aaf user name").isEqualTo("aaf_username");
+        assertThat(parsedSink.aafCredentials().password()).describedAs("aaf password").isEqualTo("aaf_password");
+        assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtl5");
+        assertThat(parsedSink.clientId()).describedAs("client id").isEqualTo("222222");
+        assertThat(parsedSink.clientRole()).describedAs("client role").isEqualTo("com.att.dcae.member");
+        assertThat(parsedSink.topicUrl()).describedAs("topic url")
+                .isEqualTo("https://mrlocal:3905/events/com.att.dcae.dmaap.FTL2.SEC-FAULT-OUTPUT");
+    }
+}
\ No newline at end of file
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MixedDmaapStreamsIT.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MixedDmaapStreamsIT.java
new file mode 100644 (file)
index 0000000..4508939
--- /dev/null
@@ -0,0 +1,204 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamWithName;
+
+import com.google.gson.JsonObject;
+import io.vavr.collection.List;
+import java.io.IOException;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.DataRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.DataRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+class MixedDmaapStreamsIT {
+
+    final JsonObject json = GsonUtils.readObjectFromResource("/streams/integration_mixed_dmaap.json");
+    final List<RawDataStream<JsonObject>> sources = DataStreams.namedSources(json).toList();
+    final List<RawDataStream<JsonObject>> sinks = DataStreams.namedSinks(json).toList();
+
+    MixedDmaapStreamsIT() throws IOException {
+    }
+
+    @Test
+    void thereShouldBeSomeSinksDefined() {
+        assertThat(sinks).isNotEmpty();
+        assertThat(sinks).hasSize(3);
+    }
+
+    @Test
+    void thereShouldBeSomeSourcesDefined() {
+        assertThat(sources).isNotEmpty();
+        assertThat(sources).hasSize(3);
+    }
+
+    @Test
+    void allStreamsShouldBeOfProperType() {
+        assertThat(sources.map(RawDataStream::type).distinct()).containsExactly(StreamType.DATA_ROUTER, StreamType.MESSAGE_ROUTER);
+        assertThat(sinks.map(RawDataStream::type).distinct()).containsExactly(StreamType.DATA_ROUTER);
+    }
+
+    @Test
+    void sinksShouldHaveProperDirection() {
+        assertThat(sinks.map(RawDataStream::direction).distinct())
+                .containsExactly(DataStreamDirection.SINK);
+    }
+
+    @Test
+    void sourcesShouldHaveProperDirection() {
+        assertThat(sources.map(RawDataStream::direction).distinct())
+                .containsExactly(DataStreamDirection.SOURCE);
+    }
+
+    @Test
+    void verifyDcaeGuestOsSource() {
+        // given
+        final String streamName = "DCAE_GUEST_OS";
+        final RawDataStream<JsonObject> source = sources.find(streamWithName(streamName)).get();
+
+        // when
+        final DataRouterSource parsedSource = StreamFromGsonParsers.dataRouterSourceParser().unsafeParse(source);
+
+        // then
+        assertThat(parsedSource.name()).describedAs("name").isEqualTo(streamName);
+        assertThat(parsedSource.location()).describedAs("location").isEqualTo("mtn23");
+        assertThat(parsedSource.username()).describedAs("user name").isEqualTo("xyz");
+        assertThat(parsedSource.password()).describedAs("password").isEqualTo("abc");
+        assertThat(parsedSource.deliveryUrl()).describedAs("delivery url")
+                .isEqualTo("https://dr.global:8666/DCAE_SAM_GUEST_OS");
+        assertThat(parsedSource.subscriberId()).describedAs("subscriber id").isEqualTo("811");
+    }
+
+    @Test
+    void verifyDcaeRawDataSource() {
+        // given
+        final String streamName = "DCAE_RAW_DATA";
+        final RawDataStream<JsonObject> source = sources.find(streamWithName(streamName)).get();
+
+        // when
+        final DataRouterSource parsedSource = StreamFromGsonParsers.dataRouterSourceParser().unsafeParse(source);
+
+        // then
+        assertThat(parsedSource.name()).describedAs("name").isEqualTo(streamName);
+        assertThat(parsedSource.location()).describedAs("location").isEqualTo("mtn23");
+        assertThat(parsedSource.username()).describedAs("user name").isEqualTo("abc");
+        assertThat(parsedSource.password()).describedAs("password").isEqualTo("xyz");
+        assertThat(parsedSource.deliveryUrl()).describedAs("delivery url")
+                .isEqualTo("https://dr.global:8666/DCAE_CEILOMETER_RAW_DATA");
+        assertThat(parsedSource.subscriberId()).describedAs("subscriber id").isEqualTo("812");
+    }
+
+    @Test
+    void verifySecMeasurementOutputSource() {
+        // given
+        final String streamName = "sec-measurement-output";
+        final RawDataStream<JsonObject> source = sources.find(streamWithName(streamName))
+                .get();
+
+        // when
+        final MessageRouterSource parsedSource = StreamFromGsonParsers.messageRouterSourceParser().unsafeParse(source);
+
+        // then
+        assertThat(parsedSource.name()).describedAs("name").isEqualTo(streamName);
+        assertThat(parsedSource.aafCredentials()).describedAs("aaf credentials").isNotNull();
+        assertThat(parsedSource.aafCredentials().username()).describedAs("aaf user name").isEqualTo("aaf_username");
+        assertThat(parsedSource.aafCredentials().password()).describedAs("aaf password").isEqualTo("aaf_password");
+        assertThat(parsedSource.location()).describedAs("location").isEqualTo("mtn23");
+        assertThat(parsedSource.clientId()).describedAs("client id").isEqualTo("1111");
+        assertThat(parsedSource.clientRole()).describedAs("client role").isEqualTo("com.att.dcae.member");
+        assertThat(parsedSource.topicUrl()).describedAs("topic url")
+                .isEqualTo("https://mr.hostname:3905/events/com.att.dcae.dmaap.SEC-MEASUREMENT-OUTPUT-v1");
+    }
+
+    @Test
+    void verifyDcaeVoipPmDataSink() {
+        // given
+        final String streamName = "DCAE_VOIP_PM_DATA";
+        final RawDataStream<JsonObject> sink = sinks.find(streamWithName(streamName)).get();
+
+        // when
+        final DataRouterSink parsedSink = StreamFromGsonParsers.dataRouterSinkParser().unsafeParse(sink);
+
+        // then
+        assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName);
+        assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtn23");
+        assertThat(parsedSink.username()).describedAs("user name").isEqualTo("abc");
+        assertThat(parsedSink.password()).describedAs("password").isEqualTo("xyz");
+        assertThat(parsedSink.logUrl()).describedAs("log url")
+                .isEqualTo("https://dcae-drps/feedlog/206");
+        assertThat(parsedSink.publishUrl()).describedAs("publish url")
+                .isEqualTo("https://dcae-drps/publish/206");
+        assertThat(parsedSink.publisherId()).describedAs("publisher id").isEqualTo("206.518hu");
+    }
+
+    @Test
+    void verifyDcaeGuestOsOSink() {
+        // given
+        final String streamName = "DCAE_GUEST_OS_O";
+        final RawDataStream<JsonObject> sink = sinks.find(streamWithName(streamName)).get();
+
+        // when
+        final DataRouterSink parsedSink = StreamFromGsonParsers.dataRouterSinkParser().unsafeParse(sink);
+
+        // then
+        assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName);
+        assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtn23");
+        assertThat(parsedSink.username()).describedAs("user name").isEqualTo("axyz");
+        assertThat(parsedSink.password()).describedAs("password").isEqualTo("abc");
+        assertThat(parsedSink.logUrl()).describedAs("log url")
+                .isEqualTo("https://dcae-drps/feedlog/203");
+        assertThat(parsedSink.publishUrl()).describedAs("publish url")
+                .isEqualTo("https://dcae-drps/publish/203");
+        assertThat(parsedSink.publisherId()).describedAs("publisher id").isEqualTo("203.2od8s");
+    }
+
+
+    @Test
+    void verifyDcaePmDataSink() {
+        // given
+        final String streamName = "DCAE_PM_DATA";
+        final RawDataStream<JsonObject> sink = sinks.find(streamWithName(streamName)).get();
+
+        // when
+        final DataRouterSink parsedSink = StreamFromGsonParsers.dataRouterSinkParser().unsafeParse(sink);
+
+        // then
+        assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName);
+        assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtn23bdce2");
+        assertThat(parsedSink.username()).describedAs("user name").isEqualTo("xyz");
+        assertThat(parsedSink.password()).describedAs("password").isEqualTo("abc");
+        assertThat(parsedSink.logUrl()).describedAs("log url")
+                .isEqualTo("https://dcae-drps/feedlog/493");
+        assertThat(parsedSink.publishUrl()).describedAs("publish url")
+                .isEqualTo("https://dcae-drps/publish/493");
+        assertThat(parsedSink.publisherId()).describedAs("publisher id").isEqualTo("493.eacqs");
+    }
+
+}
\ No newline at end of file
index a51b87a..a296c92 100644 (file)
@@ -23,9 +23,11 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType;
+import static org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA;
+import static org.onap.dcaegen2.services.sdk.model.streams.StreamType.MESSAGE_ROUTER;
 
 import com.google.gson.JsonObject;
-import io.vavr.collection.Map;
 import io.vavr.collection.Stream;
 import java.time.Duration;
 import org.junit.jupiter.api.AfterAll;
@@ -42,10 +44,10 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.Strea
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -182,12 +184,11 @@ class CbsClientImplIT {
         // when
         final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(request))
                 .map(json -> {
-                    final Map<String, Stream<RawDataStream<JsonObject>>> sinks = DataStreams.namedSinks(json)
-                            .groupBy(RawDataStream::type);
+                    final Stream<RawDataStream<JsonObject>> sinks = DataStreams.namedSinks(json);
 
-                    final Stream<KafkaSink> allKafkaSinks = sinks.getOrElse("kafka", Stream.empty())
+                    final Stream<KafkaSink> allKafkaSinks = sinks.filter(streamOfType(KAFKA))
                             .map(kafkaSinkParser::unsafeParse);
-                    final Stream<MessageRouterSink> allMrSinks = sinks.getOrElse("message_router", Stream.empty())
+                    final Stream<MessageRouterSink> allMrSinks = sinks.filter(streamOfType(MESSAGE_ROUTER))
                             .map(mrSinkParser::unsafeParse);
 
                     assertThat(allKafkaSinks.size())
@@ -225,8 +226,8 @@ class CbsClientImplIT {
                 .expectErrorSatisfies(ex -> {
                     assertThat(ex).isInstanceOf(StreamParsingException.class);
                     assertThat(ex).hasMessageContaining("Invalid stream type");
-                    assertThat(ex).hasMessageContaining("message_router");
-                    assertThat(ex).hasMessageContaining("kafka");
+                    assertThat(ex).hasMessageContaining(MESSAGE_ROUTER.toString());
+                    assertThat(ex).hasMessageContaining(KAFKA.toString());
                 })
                 .verify(Duration.ofSeconds(5));
     }
diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtilsTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtilsTest.java
new file mode 100644 (file)
index 0000000..a26af44
--- /dev/null
@@ -0,0 +1,103 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.GsonAdaptersAafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+class DmaapUtilsTest {
+
+    @Test
+    void extractAafCredentials_shouldReturnNull_whenAllFieldsAreNull() {
+        // given
+        Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonAdaptersAafCredentials()).create();
+        JsonObject json = gson.fromJson("{\"aaf_username\":null,\"aaf_password\":null}", JsonObject.class);
+
+        // when
+        final AafCredentials result = DmaapUtils.extractAafCredentials(gson, json);
+
+        // then
+        assertThat(result).isNull();
+    }
+
+    @Test
+    void extractAafCredentials_shouldReturnNull_whenAllFieldsAreAbsent() {
+        // given
+        Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonAdaptersAafCredentials()).create();
+        JsonObject json = gson.fromJson("{\"whatever\":\"else\"}", JsonObject.class);
+
+        // when
+        final AafCredentials result = DmaapUtils.extractAafCredentials(gson, json);
+
+        // then
+        assertThat(result).isNull();
+    }
+
+    @Test
+    void extractAafCredentials_shouldReturnValue_whenBothFieldsAreSet() {
+        // given
+        Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonAdaptersAafCredentials()).create();
+        JsonObject json = gson.fromJson("{\"aaf_username\":\"uname\",\"aaf_password\":\"passwd\"}", JsonObject.class);
+
+        // when
+        final AafCredentials result = DmaapUtils.extractAafCredentials(gson, json);
+
+        // then
+        assertThat(result).isEqualTo(ImmutableAafCredentials.builder().username("uname").password("passwd").build());
+    }
+
+    @Test
+    void extractAafCredentials_shouldReturnValueWithUser_whenOnlyUserIsSet() {
+        // given
+        Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonAdaptersAafCredentials()).create();
+        JsonObject json = gson.fromJson("{\"aaf_username\":\"uname\"}", JsonObject.class);
+
+        // when
+        final AafCredentials result = DmaapUtils.extractAafCredentials(gson, json);
+
+        // then
+        assertThat(result).isEqualTo(ImmutableAafCredentials.builder().username("uname").build());
+    }
+
+    @Test
+    void extractAafCredentials_shouldReturnValueWithUser_whenPasswordIsNull() {
+        // given
+        Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonAdaptersAafCredentials()).create();
+        JsonObject json = gson.fromJson("{\"aaf_username\":\"uname\",\"aaf_password\":null}", JsonObject.class);
+
+        // when
+        final AafCredentials result = DmaapUtils.extractAafCredentials(gson, json);
+
+        // then
+        assertThat(result).isEqualTo(ImmutableAafCredentials.builder().username("uname").build());
+    }
+}
\ No newline at end of file
index 7092de5..90c6994 100644 (file)
  */
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr;
 
-import com.google.gson.Gson;
+import static org.assertj.core.api.Assertions.assertThat;
+
 import com.google.gson.JsonObject;
 import io.vavr.control.Either;
+import java.io.IOException;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSink;
-
-import java.io.IOException;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.DataRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableDataRouterSink;
 
 
 class DataRouterSinkParserTest {
@@ -101,8 +98,8 @@ class DataRouterSinkParserTest {
         assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
         result.peekLeft(error -> {
                     assertThat(error.message()).contains("Invalid stream type");
-                    assertThat(error.message()).contains("Expected '" + DATA_ROUTER_TYPE + "', but was '"
-                            + MESSAGE_ROUTER_TYPE + "'");
+                    assertThat(error.message()).contains("Expected '" + StreamType.DATA_ROUTER + "', but was '"
+                            + StreamType.MESSAGE_ROUTER + "'");
                 }
         );
     }
@@ -113,7 +110,7 @@ class DataRouterSinkParserTest {
         JsonObject json = new JsonObject();
         final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder()
                 .name("empty")
-                .type("data_router")
+                .type(StreamType.DATA_ROUTER)
                 .descriptor(json)
                 .direction(DataStreamDirection.SINK)
                 .build();
index b2d0130..f704e52 100644 (file)
  */
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr;
 
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
+import static org.assertj.core.api.Assertions.assertThat;
+
 import com.google.gson.JsonObject;
 import io.vavr.control.Either;
+import java.io.IOException;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamParser;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.*;
-
-import java.io.IOException;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.DataRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableDataRouterSource;
 
 public class DataRouterSourceParserTest {
 
@@ -100,8 +95,8 @@ public class DataRouterSourceParserTest {
         assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
         result.peekLeft(error -> {
                     assertThat(error.message()).contains("Invalid stream type");
-                    assertThat(error.message()).contains("Expected '" + DATA_ROUTER_TYPE + "', but was '"
-                            + MESSAGE_ROUTER_TYPE + "'");
+                    assertThat(error.message()).contains("Expected '" + StreamType.DATA_ROUTER + "', but was '"
+                            + StreamType.MESSAGE_ROUTER + "'");
                 }
         );
     }
@@ -112,7 +107,7 @@ public class DataRouterSourceParserTest {
         JsonObject json = new JsonObject();
         final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder()
                 .name("empty")
-                .type("data_router")
+                .type(StreamType.DATA_ROUTER)
                 .descriptor(json)
                 .direction(DataStreamDirection.SOURCE)
                 .build();
index 4d3b88b..e3182c5 100644 (file)
  */
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
 
-import com.google.gson.Gson;
+import static org.assertj.core.api.Assertions.assertThat;
+
 import com.google.gson.JsonObject;
 import io.vavr.control.Either;
+import java.io.IOException;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
-
-import java.io.IOException;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
 
 /**
  * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
@@ -83,8 +79,7 @@ public class MessageRouterSinkParserTest {
         // then
         assertThat(result).isInstanceOf(MessageRouterSink.class);
         assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL);
-        assertThat(result.aafCredentials().username()).isNull();
-        assertThat(result.aafCredentials().password()).isNull();
+        assertThat(result.aafCredentials()).isNull();
         assertThat(result.clientId()).isNull();
     }
 
@@ -100,8 +95,8 @@ public class MessageRouterSinkParserTest {
         assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
         result.peekLeft(error -> {
                     assertThat(error.message()).contains("Invalid stream type");
-                    assertThat(error.message()).contains("Expected '" + MESSAGE_ROUTER_TYPE + "', but was '"
-                            + DATA_ROUTER_TYPE + "'");
+                    assertThat(error.message()).contains("Expected '" + StreamType.MESSAGE_ROUTER + "', but was '"
+                            + StreamType.DATA_ROUTER + "'");
                 }
         );
     }
@@ -112,7 +107,7 @@ public class MessageRouterSinkParserTest {
         JsonObject json = new JsonObject();
         final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder()
                 .name("empty")
-                .type("data_router")
+                .type(StreamType.MESSAGE_ROUTER)
                 .descriptor(json)
                 .direction(DataStreamDirection.SINK)
                 .build();
index d497817..51e5676 100644 (file)
  */
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
 
-import com.google.gson.Gson;
+import static org.assertj.core.api.Assertions.assertThat;
+
 import com.google.gson.JsonObject;
 import io.vavr.control.Either;
+import java.io.IOException;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource;
-
-import java.io.IOException;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
+import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
 
 /**
  * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
@@ -81,8 +77,7 @@ public class MessageRouterSourceParserTest {
 
         // then
         assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL);
-        assertThat(result.aafCredentials().username()).isNull();
-        assertThat(result.aafCredentials().password()).isNull();
+        assertThat(result.aafCredentials()).isNull();
         assertThat(result.clientId()).isNull();
     }
 
@@ -98,8 +93,8 @@ public class MessageRouterSourceParserTest {
         assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
         result.peekLeft(error -> {
                     assertThat(error.message()).contains("Invalid stream type");
-                    assertThat(error.message()).contains("Expected '" + MESSAGE_ROUTER_TYPE + "', but was '"
-                            + DATA_ROUTER_TYPE + "'");
+                    assertThat(error.message()).contains("Expected '" + StreamType.MESSAGE_ROUTER + "', but was '"
+                            + StreamType.DATA_ROUTER + "'");
                 }
         );
     }
@@ -110,7 +105,7 @@ public class MessageRouterSourceParserTest {
         JsonObject json = new JsonObject();
         final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder()
                 .name("empty")
-                .type("data_router")
+                .type(StreamType.MESSAGE_ROUTER)
                 .descriptor(json)
                 .direction(DataStreamDirection.SOURCE)
                 .build();
index 5974639..2e4f71b 100644 (file)
@@ -30,10 +30,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.St
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSinkParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
index d255d99..1e8e3f5 100644 (file)
@@ -31,10 +31,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.St
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSourceParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
diff --git a/rest-services/cbs-client/src/test/resources/streams/integration_message_router.json b/rest-services/cbs-client/src/test/resources/streams/integration_message_router.json
new file mode 100644 (file)
index 0000000..d38b0cc
--- /dev/null
@@ -0,0 +1,62 @@
+{
+    "collector.schema.file": "./etc/CommonEventFormat_27.2.json",
+    "collector.service.port": 8080,
+    "collector.dmaap.streamid": "fault=sec_fault,roadm-sec-to-hp|syslog=sec_syslog|heartbeat=sec_heartbeat|measurementsForVfScaling=sec_measurement|mobileFlow=sec_mobileflow|other=sec_other|stateChange=sec_statechange|thresholdCrossingAlert=sec_thresholdCrossingAlert",
+    "collector.schema.checkflag": 1,
+    "tomcat.maxthreads": "200",
+    "collector.keystore.passwordfile": "/opt/app/dcae-certificate/.password",
+    "streams_subscribes": {},
+    "services_calls": {},
+    "collector.inputQueue.maxPending": 8096,
+    "header.authflag": 0,
+    "collector.keystore.file.location": "/opt/app/dcae-certificate/keystore.jks",
+    "collector.service.secure.port": -1,
+    "header.authlist": "userid1,base64encodepwd1|userid2,base64encodepwd2",
+    "collector.keystore.alias": "dynamically generated",
+    "streams_publishes": {
+        "sec_measurement": {
+            "type": "message_router",
+            "aaf_password": "aaf_password",
+            "dmaap_info": {
+                "location": "mtl5",
+                "client_id": "111111",
+                "client_role": "com.att.dcae.member",
+                "topic_url": "https://mrlocal:3905/events/com.att.dcae.dmaap.FTL2.SEC-MEASUREMENT-OUTPUT"
+            },
+            "aaf_username": "aaf_username"
+        },
+        "sec_fault_unsecure": {
+            "type": "message_router",
+            "aaf_password": null,
+            "dmaap_info": {
+                "location": "mtl5",
+                "client_id": null,
+                "client_role": null,
+                "topic_url": "http://ueb.global:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
+            },
+            "aaf_username": null
+        },
+        "sec_measurement_unsecure": {
+            "type": "message_router",
+            "aaf_password": null,
+            "dmaap_info": {
+                "location": "mtl5",
+                "client_id": null,
+                "client_role": null,
+                "topic_url": "http://ueb.global:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
+            },
+            "aaf_username": null
+        },
+        "sec_fault": {
+            "type": "message_router",
+            "aaf_password": "aaf_password",
+            "dmaap_info": {
+                "location": "mtl5",
+                "client_id": "222222",
+                "client_role": "com.att.dcae.member",
+                "topic_url": "https://mrlocal:3905/events/com.att.dcae.dmaap.FTL2.SEC-FAULT-OUTPUT"
+            },
+            "aaf_username": "aaf_username"
+        }
+    }
+}
diff --git a/rest-services/cbs-client/src/test/resources/streams/integration_mixed_dmaap.json b/rest-services/cbs-client/src/test/resources/streams/integration_mixed_dmaap.json
new file mode 100644 (file)
index 0000000..acc7b98
--- /dev/null
@@ -0,0 +1,80 @@
+{
+
+    "streams_subscribes": {
+        "DCAE_GUEST_OS": {
+            "type": "data_router",
+            "dmaap_info": {
+                "username": "xyz",
+                "password": "abc",
+                "location": "mtn23",
+                "delivery_url": "https://dr.global:8666/DCAE_SAM_GUEST_OS",
+                "subscriber_id": "811"
+            }
+        },
+        "DCAE_RAW_DATA": {
+            "type": "data_router",
+            "dmaap_info": {
+                "username": "abc",
+                "password": "xyz",
+                "location": "mtn23",
+                "delivery_url": "https://dr.global:8666/DCAE_CEILOMETER_RAW_DATA",
+                "subscriber_id": "812"
+            }
+        },
+        "sec-measurement-output": {
+            "type": "message_router",
+            "aaf_password": "aaf_password",
+            "dmaap_info": {
+                "topic_url": "https://mr.hostname:3905/events/com.att.dcae.dmaap.SEC-MEASUREMENT-OUTPUT-v1",
+                "client_role": "com.att.dcae.member",
+                "location": "mtn23",
+                "client_id": "1111"
+            },
+            "aaf_username": "aaf_username"
+
+        }
+
+    },
+
+    "streams_publishes": {
+
+        "DCAE_VOIP_PM_DATA": {
+            "type": "data_router",
+            "dmaap_info": {
+                "username": "abc",
+                "log_url": "https://dcae-drps/feedlog/206",
+                "publish_url": "https://dcae-drps/publish/206",
+                "location": "mtn23",
+                "password": "xyz",
+                "publisher_id": "206.518hu"
+
+            }
+        },
+
+        "DCAE_GUEST_OS_O": {
+            "type": "data_router",
+            "dmaap_info": {
+                "username": "axyz",
+                "log_url": "https://dcae-drps/feedlog/203",
+                "publish_url": "https://dcae-drps/publish/203",
+                "location": "mtn23",
+                "password": "abc",
+
+                "publisher_id": "203.2od8s"
+            }
+        },
+
+        "DCAE_PM_DATA": {
+            "type": "data_router",
+            "dmaap_info": {
+                "username": "xyz",
+                "log_url": "https://dcae-drps/feedlog/493",
+                "publish_url": "https://dcae-drps/publish/493",
+                "location": "mtn23bdce2",
+                "password": "abc",
+                "publisher_id": "493.eacqs"
+            }
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/rest-services/model/pom.xml b/rest-services/model/pom.xml
new file mode 100644 (file)
index 0000000..51f8ffc
--- /dev/null
@@ -0,0 +1,80 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ ============LICENSE_START====================================
+  ~ DCAEGEN2-SERVICES-SDK
+  ~ =========================================================
+  ~ Copyright (C) 2019 Nokia. All rights reserved.
+  ~ =========================================================
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~       http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~ ============LICENSE_END=====================================
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.onap.dcaegen2.services.sdk</groupId>
+        <artifactId>dcaegen2-services-sdk-rest-services</artifactId>
+        <version>1.1.4-SNAPSHOT</version>
+    </parent>
+
+    <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+    <artifactId>model</artifactId>
+
+    <name>dcaegen2-services-sdk-rest-services-model</name>
+    <description>Rest Services Model</description>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.immutables</groupId>
+            <artifactId>gson</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.immutables</groupId>
+            <artifactId>value</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.vavr</groupId>
+            <artifactId>vavr</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.jetbrains</groupId>
+            <artifactId>annotations</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
  * ============LICENSE_END=====================================
  */
 
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
+package org.onap.dcaegen2.services.sdk.model.streams;
 
 
 import com.google.gson.annotations.SerializedName;
 import org.immutables.gson.Gson;
 import org.immutables.value.Value;
 import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 
 /**
  * Represents the AAF Credentials. Currently it contains only user name and password.
@@ -33,7 +32,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.4
  */
-@ExperimentalApi
 @Value.Immutable
 @Gson.TypeAdapters
 public interface AafCredentials {
  * ============LICENSE_END=====================================
  */
 
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
+package org.onap.dcaegen2.services.sdk.model.streams;
 
 import org.immutables.value.Value;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 
 /**
  * Represents a named data stream.
@@ -29,7 +28,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.4
  */
-@ExperimentalApi
 public interface DataStream {
     @Value.Default
     default String name() {
@@ -18,7 +18,7 @@
  * ============LICENSE_END=====================================
  */
 
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
+package org.onap.dcaegen2.services.sdk.model.streams;
 
 import org.immutables.value.Value;
 
@@ -32,7 +32,7 @@ import org.immutables.value.Value;
 @Value.Immutable
 public interface RawDataStream<T> {
     String name();
-    String type();
+    StreamType type();
     DataStreamDirection direction();
     T descriptor();
 }
@@ -18,9 +18,7 @@
  * ============LICENSE_END=====================================
  */
 
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
-
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
+package org.onap.dcaegen2.services.sdk.model.streams;
 
 /**
  * Represents an output stream, ie. one of objects in <em>streams_publishes</em> array from application configuration.
@@ -29,7 +27,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.4
  */
-@ExperimentalApi
 public interface SinkStream extends DataStream {
 
 }
@@ -18,9 +18,7 @@
  * ============LICENSE_END=====================================
  */
 
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
-
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
+package org.onap.dcaegen2.services.sdk.model.streams;
 
 /**
  * Represents an input stream, ie. one of objects in <em>streams_subscribes</em> array from application configuration.
@@ -29,7 +27,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.4
  */
-@ExperimentalApi
 public interface SourceStream extends DataStream {
 
 }
diff --git a/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/StreamType.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/StreamType.java
new file mode 100644 (file)
index 0000000..2e08c82
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.model.streams;
+
+import io.vavr.collection.Stream;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.4
+ */
+public enum StreamType {
+    MESSAGE_ROUTER("message_router"),
+    DATA_ROUTER("data_router"),
+    KAFKA("kafka"),
+    UNKNOWN("unknown");
+
+    private final String rawType;
+
+    StreamType(String rawType) {
+        this.rawType = rawType;
+    }
+
+    public static StreamType parse(@NotNull String rawType) {
+        return Stream.of(StreamType.values())
+                .find(type -> type.rawType.equals(rawType))
+                .getOrElse(UNKNOWN);
+    }
+
+    @Override
+    public String toString() {
+        return rawType;
+    }
+}
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
+package org.onap.dcaegen2.services.sdk.model.streams.dmaap;
 
 
 import com.google.gson.annotations.SerializedName;
 import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.4
  */
-@ExperimentalApi
 public interface DataRouter {
 
     /**
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
+package org.onap.dcaegen2.services.sdk.model.streams.dmaap;
 
 
 import com.google.gson.annotations.SerializedName;
 import org.immutables.gson.Gson;
 import org.immutables.value.Value;
 import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SinkStream;
+import org.onap.dcaegen2.services.sdk.model.streams.SinkStream;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.4
  */
 @Gson.TypeAdapters
-@ExperimentalApi
 @Value.Immutable
 public interface DataRouterSink extends DataRouter, SinkStream {
 
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
+package org.onap.dcaegen2.services.sdk.model.streams.dmaap;
 
 
 import com.google.gson.annotations.SerializedName;
 import org.immutables.gson.Gson;
 import org.immutables.value.Value;
 import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SourceStream;
+import org.onap.dcaegen2.services.sdk.model.streams.SourceStream;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.4
  */
 @Gson.TypeAdapters
-@ExperimentalApi
 @Value.Immutable
 public interface DataRouterSource extends DataRouter, SourceStream {
 
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
+package org.onap.dcaegen2.services.sdk.model.streams.dmaap;
 
 import static io.vavr.Predicates.not;
 
 import io.vavr.collection.List;
 import org.immutables.value.Value;
 import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.4
  */
-@ExperimentalApi
 public interface Kafka {
 
     /**
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
+package org.onap.dcaegen2.services.sdk.model.streams.dmaap;
 
 import org.immutables.value.Value;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SinkStream;
+import org.onap.dcaegen2.services.sdk.model.streams.SinkStream;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.4
  */
-@ExperimentalApi
 @Value.Immutable
 public interface KafkaSink extends Kafka, SinkStream {
 
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
+package org.onap.dcaegen2.services.sdk.model.streams.dmaap;
 
 import org.immutables.value.Value;
 import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SourceStream;
+import org.onap.dcaegen2.services.sdk.model.streams.SourceStream;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.4
  */
-@ExperimentalApi
 @Value.Immutable
 public interface KafkaSource extends Kafka, SourceStream {
 
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
+package org.onap.dcaegen2.services.sdk.model.streams.dmaap;
 
 import com.google.gson.annotations.SerializedName;
 import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.4
  */
-@ExperimentalApi
 public interface MessageRouter {
 
     /**
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
+package org.onap.dcaegen2.services.sdk.model.streams.dmaap;
 
 import org.immutables.value.Value;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SinkStream;
+import org.onap.dcaegen2.services.sdk.model.streams.SinkStream;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.4
  */
-@ExperimentalApi
 @Value.Immutable
 public interface MessageRouterSink extends MessageRouter, SinkStream {
 
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
+package org.onap.dcaegen2.services.sdk.model.streams.dmaap;
 
 import org.immutables.value.Value;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SourceStream;
+import org.onap.dcaegen2.services.sdk.model.streams.SourceStream;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.4
  */
-@ExperimentalApi
 @Value.Immutable
 public interface MessageRouterSource extends MessageRouter, SourceStream {
 
index f54ea77..aa8caf2 100644 (file)
@@ -18,6 +18,7 @@
   <packaging>pom</packaging>
 
   <modules>
+    <module>model</module>
     <module>common-dependency</module>
     <module>aai-client</module>
     <module>cbs-client</module>