Add JavaDoc to CBS streams API 89/82789/3
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Wed, 20 Mar 2019 10:40:27 +0000 (11:40 +0100)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Fri, 22 Mar 2019 11:19:23 +0000 (12:19 +0100)
Change-Id: Ia702750e0ec6bf54418eafb00523681429d86b22
Issue-ID: DCAEGEN2-1341
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
22 files changed:
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParsingException.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SinkStream.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SourceStream.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouter.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSink.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSource.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouter.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSink.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSource.java
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java

index aa8c2ed..ca531e8 100644 (file)
@@ -36,6 +36,10 @@ public class StreamParsingException extends CbsClientException {
         this.cause = cause;
     }
 
+    public StreamParsingException(String message) {
+        this(new StreamParserError(message));
+    }
+
     public StreamParserError cause() {
         return cause;
     }
index 4fdb31b..648b7a6 100644 (file)
@@ -29,6 +29,38 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Dat
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
 
 /**
+ * Extract streams from the application configuration represented as GSON JsonObject.
+ *
+ * Example input:
+ * <pre>
+ * {
+ *     "application_config_1": "value_1,
+ *     ...
+ *     "streams_publishes": {
+ *         "stream1": {
+ *             "type": "message_router",
+ *             "dmaap_info": {
+ *                 ...
+ *             }
+ *         },
+ *         "stream2": {
+ *             "type": "data_router",
+ *             "dmaap_info": {
+ *                 ...
+ *             }
+ *         }
+ *     },
+ *     "streams_subscribes": {
+ *         "stream3": {
+ *             "type": "message_router",
+ *             "dmaap_info": {
+ *                 ...
+ *             }
+ *         },
+ *     }
+ * }
+ * </pre>
+ *
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.4
  */
@@ -38,10 +70,78 @@ public final class DataStreams {
     private DataStreams() {
     }
 
+    /**
+     * <p>
+     * Extracts sources from application configuration. Parses <em>streams_subscribes</em> subtree.
+     * </p>
+     *
+     * <p>
+     * For sample input it will yield:
+     * </p>
+     *
+     * <pre>
+     * [
+     *     RawDataStream{
+     *         name="stream3"
+     *         type="message_router"
+     *         direction=SOURCE
+     *         descriptor=JsonObject{
+     *             type: "message_router",
+     *             dmaap_info: {
+     *                 ...
+     *             }
+     *         }
+     *     }
+     * ]
+     * </pre>
+     *
+     * @param rootJson - the full application configuration
+     * @return io.vavr.collection.Stream of data streams
+     */
     public static Stream<RawDataStream<JsonObject>> namedSources(JsonObject rootJson) {
         return createCollectionOfStreams(rootJson, DataStreamDirection.SOURCE);
     }
 
+
+    /**
+     * <p>
+     * Extracts sinks from application configuration. Parses <em>streams_publishes</em> subtree.
+     * </p>
+     *
+     * <p>
+     * For sample input it will yield:
+     * </p>
+     *
+     * <pre>
+     * [
+     *     RawDataStream{
+     *         name="stream1"
+     *         type="message_router"
+     *         direction=SINK
+     *         descriptor=JsonObject{
+     *             type: "message_router",
+     *             dmaap_info: {
+     *                 ...
+     *             }
+     *         }
+     *     },
+     *     RawDataStream{
+     *         name="stream2"
+     *         type="data_router"
+     *         direction=SINK
+     *         descriptor=JsonObject{
+     *             type: "data_router"
+     *             dmaap_info: {
+     *                 ...
+     *             }
+     *         }
+     *     }
+     * ]
+     * </pre>
+     *
+     * @param rootJson - the full application configuration
+     * @return io.vavr.collection.Stream of data streams
+     */
     public static Stream<RawDataStream<JsonObject>> namedSinks(JsonObject rootJson) {
         return createCollectionOfStreams(rootJson, DataStreamDirection.SINK);
     }
index 460d710..a8ce364 100644 (file)
@@ -25,6 +25,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream;
 
 /**
+ * Represents parser taking GSON JsonObject as an input
+ *
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.4
  */
index 7ae92ba..7476e97 100644 (file)
@@ -29,6 +29,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.*;
 
 /**
+ * Factory methods for GSON-based {@code StreamParser}s
+ *
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.4
  */
@@ -37,26 +39,56 @@ public final class StreamFromGsonParsers {
     private StreamFromGsonParsers() {
     }
 
+    /**
+     * Creates a stream parser capable of parsing Kafka sinks.
+     *
+     * @return a stream parser
+     */
     public static StreamFromGsonParser<KafkaSink> kafkaSinkParser() {
         return KafkaSinkParser.create();
     }
 
+    /**
+     * Creates a stream parser capable of parsing Kafka sources.
+     *
+     * @return a stream parser
+     */
     public static StreamFromGsonParser<KafkaSource> kafkaSourceParser() {
         return KafkaSourceParser.create();
     }
 
+    /**
+     * Creates a stream parser capable of parsing DMaaP Message Router sinks.
+     *
+     * @return a stream parser
+     */
     public static StreamFromGsonParser<MessageRouterSink> messageRouterSinkParser() {
         return MessageRouterSinkParser.create();
     }
 
+    /**
+     * Creates a stream parser capable of parsing DMaaP Message Router sources.
+     *
+     * @return a stream parser
+     */
     public static StreamFromGsonParser<MessageRouterSource> messageRouterSourceParser() {
         return MessageRouterSourceParser.create();
     }
 
+    /**
+     * Creates a stream parser capable of parsing DMaaP Data Router sinks.
+     *
+     * @return a stream parser
+     */
     public static StreamFromGsonParser<DataRouterSink> dataRouterSinkParser() {
         return DataRouterSinkParser.create();
     }
 
+    /**
+     * Creates a stream parser capable of parsing DMaaP Data Router sources.
+     *
+     * @return a stream parser
+     */
     public static StreamFromGsonParser<DataRouterSource> dataRouterSourceParser() {
         return DataRouterSourceParser.create();
     }
index d34b144..1148574 100644 (file)
@@ -24,6 +24,7 @@ import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import io.vavr.collection.Stream;
 import java.io.IOException;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
@@ -48,11 +49,11 @@ public final class DataStreamUtils {
             String expectedType,
             DataStreamDirection expectedDirection) {
         if (!json.type().equals(expectedType)) {
-            throw new IllegalArgumentException(
+            throw new StreamParsingException(
                     "Invalid stream type. Expected '" + expectedType + "', but was '" + json.type() + "'");
         }
         if (json.direction() != expectedDirection) {
-            throw new IllegalArgumentException(
+            throw new StreamParsingException(
                     "Invalid stream direction. Expected '" + expectedDirection + "', but was '" + json.direction()
                             + "'");
         }
index 0b66228..0fdec5d 100644 (file)
@@ -32,6 +32,7 @@ import java.io.InputStreamReader;
 import java.io.Reader;
 import java.util.Map.Entry;
 import java.util.stream.Collectors;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.GsonAdaptersMessageRouterDmaapInfo;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.GsonAdaptersKafkaInfo;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.GsonAdaptersAafCredentials;
@@ -71,7 +72,7 @@ public final class GsonUtils {
 
     public static JsonElement requiredChild(JsonObject parent, String childName) {
         return optionalChild(parent, childName)
-                .getOrElseThrow(() -> new IllegalArgumentException(
+                .getOrElseThrow(() -> new StreamParsingException(
                         "Could not find sub-node '" + childName + "'. Actual sub-nodes: "
                                 + stringifyChildrenNames(parent)));
 
index c3c70b7..9fa83bc 100644 (file)
@@ -28,8 +28,10 @@ import org.jetbrains.annotations.Nullable;
 import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 
 /**
+ * Represents the AAF Credentials. Currently it contains only user name and password.
+ *
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @version 1.2.1
+ * @since 1.1.4
  */
 @ExperimentalApi
 @Value.Immutable
index 37bf7e5..1950a30 100644 (file)
@@ -24,8 +24,10 @@ import org.immutables.value.Value;
 import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 
 /**
+ * Represents a named data stream.
+ *
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @version  1.2.1
+ * @since 1.1.4
  */
 @ExperimentalApi
 public interface DataStream {
index f3cac54..3d05c9a 100644 (file)
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
 
 /**
+ * The direction of the stream, ie. whether it's input ({@code SOURCE}) or output ({@code SINK}) stream.
+ *
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since March 2019
+ * @since 1.1.4
  */
 public enum DataStreamDirection {
 
@@ -35,6 +37,11 @@ public enum DataStreamDirection {
         this.configurationKey = configurationKey;
     }
 
+    /**
+     * The configuration key under which the single stream definitions should reside.
+     *
+     * @return the configuration key
+     */
     public String configurationKey() {
         return configurationKey;
     }
index 7a39ede..d6bc800 100644 (file)
@@ -23,8 +23,11 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
 import org.immutables.value.Value;
 
 /**
+ * Represents a raw/uninterpreted data stream.
+ *
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since March 2019
+ * @since 1.1.4
+ * @param <T> type of raw data, eg. JsonObject
  */
 @Value.Immutable
 public interface RawDataStream<T> {
index e338920..7002fd6 100644 (file)
@@ -23,10 +23,11 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
 import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 
 /**
- * AKA PublishStream
+ * Represents an output stream, ie. one of objects in <em>streams_publishes</em> array from application configuration.
+ * Application can put data to this stream.
  *
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @version  1.2.1
+ * @since 1.1.4
  */
 @ExperimentalApi
 public interface SinkStream extends DataStream {
index 2bea143..c5ab8a3 100644 (file)
@@ -23,10 +23,11 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
 import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 
 /**
- * AKA SubscribeStream
+ * Represents an input stream, ie. one of objects in <em>streams_subscribes</em> array from application configuration.
+ * Application can read data from this stream.
  *
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @version  1.2.1
+ * @since 1.1.4
  */
 @ExperimentalApi
 public interface SourceStream extends DataStream {
index b4b5549..072d4b0 100644 (file)
@@ -17,7 +17,6 @@
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
-
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
 
 
@@ -27,17 +26,34 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @version 1.2.1
+ * @since 1.1.4
  */
 @ExperimentalApi
 public interface DataRouter {
 
+    /**
+     * DCAE location for the publisher, used to set up routing.
+     */
     @SerializedName("location")
     @Nullable String location();
 
+    /**
+     * Username
+     * <ul>
+     * <li>Data Router uses to authenticate to the subscriber when delivering files OR</li>
+     * <li>the publisher uses to authenticate to Data Router.</li>
+     * </ul>
+     */
     @SerializedName("username")
     @Nullable String username();
 
+    /**
+     * Password
+     * <ul>
+     * <li>Data Router uses to authenticate to the subscriber when delivering files OR</li>
+     * <li>the publisher uses to authenticate to Data Router.</li>
+     * </ul>
+     */
     @SerializedName("password")
     @Nullable String password();
 }
index 60d9100..baddb91 100644 (file)
@@ -17,7 +17,6 @@
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
-
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
 
 
@@ -30,19 +29,28 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Sin
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @version 1.2.1
+ * @since 1.1.4
  */
 @Gson.TypeAdapters
 @ExperimentalApi
 @Value.Immutable
 public interface DataRouterSink extends DataRouter, SinkStream {
 
+    /**
+     * URL to which the publisher makes Data Router publish requests.
+     */
     @SerializedName("publish_url")
     String publishUrl();
 
+    /**
+     * Publisher id in Data Router
+     */
     @SerializedName("publisher_id")
     @Nullable String publisherId();
 
+    /**
+     * URL from which log data for the feed can be obtained.
+     */
     @SerializedName("log_url")
     @Nullable String logUrl();
 
index b6dedb7..d089a40 100644 (file)
@@ -17,7 +17,6 @@
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
-
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
 
 
@@ -30,17 +29,23 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Sou
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @version 1.2.1
+ * @since 1.1.4
  */
 @Gson.TypeAdapters
 @ExperimentalApi
 @Value.Immutable
 public interface DataRouterSource extends DataRouter, SourceStream {
 
-    // in future, since crucial need to be verified if could be nullable
+    /**
+     * URL to which the Data Router should deliver files.
+     */
+    // TODO: since crucial, we need to verify if it should be non-null
     @SerializedName("delivery_url")
     @Nullable String deliveryUrl();
 
+    /**
+     * Subscriber id in Data Router.
+     */
     @SerializedName("subscriber_id")
     @Nullable String subscriberId();
 }
index 1810fc6..42558cb 100644 (file)
@@ -17,7 +17,6 @@
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
-
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
 
 import static io.vavr.Predicates.not;
@@ -35,21 +34,44 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Aaf
 @ExperimentalApi
 public interface Kafka {
 
+    /**
+     * Kafka bootstrap servers as defined in Kafka client documentation under <em>bootstrap.servers</em> configuration
+     * key.
+     */
     String bootstrapServers();
 
+    /**
+     * The name of the topic where application should publish or subscribe for the messages.
+     */
     String topicName();
 
+    /**
+     * The credentials to use when authenticating to Kafka cluster or null when connection should be unauthenticated.
+     */
     @Nullable AafCredentials aafCredentials();
 
+    /**
+     * AAF client role that’s requesting publish or subscribe access to the topic.
+     */
     @Nullable String clientRole();
 
+    /**
+     * Client id for given AAF client.
+     */
     @Nullable String clientId();
 
+    /**
+     * The limit on the size of message published to/subscribed from the topic. Can be used to set Kafka client
+     * <em>max.request.size</em> configuration property.
+     */
     @Value.Default
     default int maxPayloadSizeBytes() {
         return 1024 * 1024;
     }
 
+    /**
+     * The {@code bootstrapServers} converted to the list of servers' addresses.
+     */
     @Value.Derived
     default List<String> bootstrapServerList() {
         return List.of(bootstrapServers().split(",")).filter(not(String::isEmpty));
index 65280a9..78f5c3a 100644 (file)
@@ -17,7 +17,6 @@
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
-
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
 
 import org.immutables.value.Value;
@@ -33,5 +32,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Sou
 @Value.Immutable
 public interface KafkaSource extends Kafka, SourceStream {
 
+    /**
+     * A unique string that identifies the consumer group this consumer belongs to as defined in Kafka consumer
+     * configuration key <em>group.id</em>.
+     */
     @Nullable String consumerGroupId();
 }
index 7ed720d..3cca513 100644 (file)
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
-
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
 
-
 import com.google.gson.annotations.SerializedName;
 import org.jetbrains.annotations.Nullable;
 import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
@@ -28,23 +26,38 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Aaf
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @version 1.2.1
+ * @since 1.1.4
  */
 @ExperimentalApi
 public interface MessageRouter {
 
+    /**
+     * URL for accessing the topic to publish or receive events.
+     */
     @SerializedName("topic_url")
     String topicUrl();
 
+    /**
+     * AAF client role that’s requesting publish or subscribe access to the topic.
+     */
     @SerializedName("client_role")
     @Nullable String clientRole();
 
+    /**
+     * Client id for given AAF client.
+     */
     @SerializedName("client_id")
     @Nullable String clientId();
 
+    /**
+     * DCAE location for the publisher or subscriber, used to set up routing.
+     */
     @SerializedName("location")
     @Nullable String location();
 
+    /**
+     * The AAF credentials.
+     */
     @SerializedName("aaf_credentials")
     @Nullable AafCredentials aafCredentials();
 }
index 8f4a533..3af7963 100644 (file)
@@ -17,7 +17,6 @@
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
-
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
 
 import org.immutables.value.Value;
@@ -26,7 +25,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Sin
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @version 1.2.1
+ * @since 1.1.4
  */
 @ExperimentalApi
 @Value.Immutable
index ab80203..c7159f2 100644 (file)
@@ -17,7 +17,6 @@
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
-
 package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
 
 import org.immutables.value.Value;
@@ -26,7 +25,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Sou
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @version 1.2.1
+ * @since 1.1.4
  */
 @ExperimentalApi
 @Value.Immutable
index 33b0920..a51b87a 100644 (file)
@@ -35,6 +35,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttp
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
@@ -222,7 +223,7 @@ class CbsClientImplIT {
         // then
         StepVerifier.create(result)
                 .expectErrorSatisfies(ex -> {
-                    assertThat(ex).isInstanceOf(IllegalArgumentException.class);
+                    assertThat(ex).isInstanceOf(StreamParsingException.class);
                     assertThat(ex).hasMessageContaining("Invalid stream type");
                     assertThat(ex).hasMessageContaining("message_router");
                     assertThat(ex).hasMessageContaining("kafka");