package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api;
import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.CbsClientImpl;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.CbsLookup;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
import reactor.core.publisher.Mono;
*/
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener;
+import static java.lang.String.valueOf;
+
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import io.vavr.collection.List;
-import org.jetbrains.annotations.NotNull;
-
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
-
-import static java.lang.String.valueOf;
+import org.jetbrains.annotations.NotNull;
/**
--- /dev/null
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import io.vavr.collection.Stream;
+import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.1.4
+ */
+@ExperimentalApi
+public final class DataStreams {
+
+ private DataStreams() {
+ }
+
+ public static Stream<RawDataStream<JsonObject>> namedSources(JsonObject rootJson) {
+ return createCollectionOfStreams(rootJson, DataStreamDirection.SOURCE);
+ }
+
+ public static Stream<RawDataStream<JsonObject>> namedSinks(JsonObject rootJson) {
+ return createCollectionOfStreams(rootJson, DataStreamDirection.SINK);
+ }
+
+ private static Stream<RawDataStream<JsonObject>> createCollectionOfStreams(JsonObject rootJson, DataStreamDirection direction) {
+ final JsonElement streamsJson = rootJson.get(direction.configurationKey());
+ return streamsJson == null
+ ? Stream.empty()
+ : DataStreamUtils.mapJsonToStreams(streamsJson, direction);
+ }
+
+
+}
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
import com.google.gson.JsonObject;
-import io.vavr.control.Either;
import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream;
/**
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.*;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr.DataRouterSinkParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr.DataRouterSourceParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.MessageRouterSinkParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.MessageRouterSourceParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSinkParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSourceParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.*;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since March 2019
+ * @since 1.1.4
*/
public final class StreamFromGsonParsers {
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
/**
* A generic data stream parser which parses {@code T} to data stream {@code S}.
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @param <T> input data type, eg. Gson Object
* @param <S> output data type
- * @since 1.1.3
+ * @since 1.1.4
*/
@ExperimentalApi
public interface StreamParser<T, S extends DataStream> {
* @param input - the input data
* @return Right(parsing result) or Left(parsing error)
*/
- default Either<StreamParserError, S> parse(T input) {
+ default Either<StreamParserError, S> parse(RawDataStream<T> input) {
return Try.of(() -> unsafeParse(input))
.toEither()
.mapLeft(StreamParserError::fromThrowable);
* @return parsing result
* @throws StreamParsingException when parsing was unsuccessful
*/
- default S unsafeParse(T input) {
+ default S unsafeParse(RawDataStream<T> input) {
return parse(input).getOrElseThrow(StreamParsingException::new);
}
}
import java.net.MalformedURLException;
import java.net.URL;
import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
import reactor.core.publisher.Mono;
public class CbsClientImpl implements CbsClient {
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
-
import java.net.InetSocketAddress;
-
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.ServiceLookupException;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
import reactor.core.publisher.Mono;
/**
--- /dev/null
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import io.vavr.collection.Stream;
+import java.io.IOException;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+public final class DataStreamUtils {
+
+ public static Stream<RawDataStream<JsonObject>> mapJsonToStreams(JsonElement streamsJson,
+ DataStreamDirection direction) {
+ return Stream.ofAll(streamsJson.getAsJsonObject().entrySet())
+ .map(namedSinkJson -> {
+ final JsonObject jsonObject = namedSinkJson.getValue().getAsJsonObject();
+ return rawDataStream(namedSinkJson.getKey(), direction, jsonObject);
+ });
+ }
+
+ public static void assertStreamType(
+ RawDataStream<JsonObject> json,
+ String expectedType,
+ DataStreamDirection expectedDirection) {
+ if (!json.type().equals(expectedType)) {
+ throw new IllegalArgumentException(
+ "Invalid stream type. Expected '" + expectedType + "', but was '" + json.type() + "'");
+ }
+ if (json.direction() != expectedDirection) {
+ throw new IllegalArgumentException(
+ "Invalid stream direction. Expected '" + expectedDirection + "', but was '" + json.direction()
+ + "'");
+ }
+ }
+
+ public static RawDataStream<JsonObject> readSourceFromResource(String resource) throws IOException {
+ return rawDataStream(resource, DataStreamDirection.SOURCE, GsonUtils.readObjectFromResource(resource));
+ }
+
+ public static RawDataStream<JsonObject> readSinkFromResource(String resource) throws IOException {
+ return rawDataStream(resource, DataStreamDirection.SINK, GsonUtils.readObjectFromResource(resource));
+ }
+
+ private static RawDataStream<JsonObject> rawDataStream(String name, DataStreamDirection direction, JsonObject json) {
+ return ImmutableRawDataStream.<JsonObject>builder()
+ .name(name)
+ .direction(direction)
+ .type(GsonUtils.requiredString(json, "type"))
+ .descriptor(json)
+ .build();
+ }
+}
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import io.vavr.Lazy;
-
+import io.vavr.control.Option;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.Map.Entry;
import java.util.stream.Collectors;
-
-import io.vavr.control.Option;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.GsonAdaptersMessageRouterDmaapInfo;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.GsonAdaptersKafkaInfo;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.GsonAdaptersAafCredentials;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.GsonAdaptersDataRouterSink;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.GsonAdaptersDataRouterSource;
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since March 2019
*/
-final class GsonUtils {
+public final class GsonUtils {
+
private static final Lazy<Gson> GSON = Lazy.of(() -> {
GsonBuilder gsonBuilder = new GsonBuilder();
gsonBuilder.registerTypeAdapterFactory(new GsonAdaptersKafkaInfo());
private GsonUtils() {
}
- static Gson gsonInstance() {
+ public static Gson gsonInstance() {
return GSON.get();
}
- static void assertStreamType(JsonObject json, String expectedType) {
- final String actualType = requiredString(json, "type");
- if (!actualType.equals(expectedType)) {
- throw new IllegalArgumentException("Invalid stream type. Expected '" + expectedType + "', but was '" + actualType + "'");
- }
- }
-
- static String requiredString(JsonObject parent, String childName) {
+ public static String requiredString(JsonObject parent, String childName) {
return requiredChild(parent, childName).getAsString();
}
- static Option<String> optionalString(JsonObject parent, String childName) {
+ public static Option<String> optionalString(JsonObject parent, String childName) {
return Option.of(parent.get(childName).getAsString());
}
- static JsonElement requiredChild(JsonObject parent, String childName) {
- if (parent.has(childName)) {
- return parent.get(childName);
- } else {
- throw new IllegalArgumentException(
- "Could not find sub-node '" + childName + "'. Actual sub-nodes: " + stringifyChildrenNames(parent));
- }
+ public static JsonElement requiredChild(JsonObject parent, String childName) {
+ return optionalChild(parent, childName)
+ .getOrElseThrow(() -> new IllegalArgumentException(
+ "Could not find sub-node '" + childName + "'. Actual sub-nodes: "
+ + stringifyChildrenNames(parent)));
+
+ }
+
+ public static Option<JsonElement> optionalChild(JsonObject parent, String childName) {
+ return Option.of(parent.get(childName));
}
- static JsonObject readObjectFromResource(String resource) throws IOException {
+ public static JsonObject readObjectFromResource(String resource) throws IOException {
return readFromResource(resource).getAsJsonObject();
}
- static JsonElement readFromResource(String resource) throws IOException {
+ public static JsonElement readFromResource(String resource) throws IOException {
try (Reader reader = new InputStreamReader(GsonUtils.class.getResourceAsStream(resource))) {
return new JsonParser().parse(reader);
}
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSink;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
-
/**
* @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
*/
this.gson = gson;
}
- public DataRouterSink unsafeParse(JsonObject input) {
- assertStreamType(input, DATA_ROUTER_TYPE);
-
- final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME);
+ @Override
+ public DataRouterSink unsafeParse(RawDataStream<JsonObject> input) {
+ assertStreamType(input, DATA_ROUTER_TYPE, DataStreamDirection.SINK);
- return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSink.class);
+ final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME);
+ return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSink.class).withName(input.name());
}
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSource;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSource;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DATA_ROUTER_TYPE;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
-
/**
* @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
*/
this.gson = gson;
}
- public DataRouterSource unsafeParse(JsonObject input) {
- assertStreamType(input, DATA_ROUTER_TYPE);
-
- final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME);
+ @Override
+ public DataRouterSource unsafeParse(RawDataStream<JsonObject> input) {
+ assertStreamType(input, DATA_ROUTER_TYPE, DataStreamDirection.SOURCE);
- return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSource.class);
+ final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME);
+ return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSource.class).withName(input.name());
}
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
*/
abstract class GsonMessageRouter implements MessageRouter {
+ private final String name;
private final MessageRouterDmaapInfo dmaapInfo;
private final AafCredentials aafCredentials;
- GsonMessageRouter(@NotNull MessageRouterDmaapInfo dmaapInfo,
- @Nullable AafCredentials aafCredentials) {
+ GsonMessageRouter(String name, @NotNull MessageRouterDmaapInfo dmaapInfo,
+ @Nullable AafCredentials aafCredentials) {
+ this.name = name;
this.dmaapInfo = Objects.requireNonNull(dmaapInfo, "dmaapInfo");
this.aafCredentials = aafCredentials;
}
+ public String name() {
+ return name;
+ }
+
@Override
public String topicUrl() {
return dmaapInfo.topicUrl();
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
public class GsonMessageRouterSink extends GsonMessageRouter implements MessageRouterSink {
GsonMessageRouterSink(
- @NotNull MessageRouterDmaapInfo dmaapInfo,
+ String name, @NotNull MessageRouterDmaapInfo dmaapInfo,
@Nullable AafCredentials aafCredentials) {
- super(dmaapInfo, aafCredentials);
+ super(name, dmaapInfo, aafCredentials);
}
}
\ No newline at end of file
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
public class GsonMessageRouterSource extends GsonMessageRouter implements MessageRouterSource {
GsonMessageRouterSource(
- @NotNull MessageRouterDmaapInfo dmaapInfo,
+ String name, @NotNull MessageRouterDmaapInfo dmaapInfo,
@Nullable AafCredentials aafCredentials) {
- super(dmaapInfo, aafCredentials);
+ super(name, dmaapInfo, aafCredentials);
}
}
\ No newline at end of file
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
import com.google.gson.annotations.SerializedName;
import org.immutables.gson.Gson;
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
-
public final class MessageRouterSinkParser implements StreamFromGsonParser<MessageRouterSink> {
private final Gson gson;
this.gson = gson;
}
- public MessageRouterSink unsafeParse(JsonObject input) {
- assertStreamType(input, MESSAGE_ROUTER_TYPE);
+ @Override
+ public MessageRouterSink unsafeParse(RawDataStream<JsonObject> input) {
+ assertStreamType(input, MESSAGE_ROUTER_TYPE, DataStreamDirection.SINK);
- final AafCredentials aafCredentials = gson.fromJson(input, ImmutableAafCredentials.class);
+ final AafCredentials aafCredentials = gson.fromJson(input.descriptor(), ImmutableAafCredentials.class);
- final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME);
+ final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME);
final MessageRouterDmaapInfo dmaapInfo = gson.fromJson(dmaapInfoJson, ImmutableMessageRouterDmaapInfo.class);
- return new GsonMessageRouterSink(dmaapInfo, aafCredentials);
+ return new GsonMessageRouterSink(input.name(), dmaapInfo, aafCredentials);
}
}
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
-
/**
* @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a>
*/
this.gson = gson;
}
- public MessageRouterSource unsafeParse(JsonObject input) {
- assertStreamType(input, MESSAGE_ROUTER_TYPE);
+ @Override
+ public MessageRouterSource unsafeParse(RawDataStream<JsonObject> input) {
+ assertStreamType(input, MESSAGE_ROUTER_TYPE, DataStreamDirection.SOURCE);
- final AafCredentials aafCredentials = gson.fromJson(input, ImmutableAafCredentials.class);
+ final AafCredentials aafCredentials = gson.fromJson(input.descriptor(), ImmutableAafCredentials.class);
- final JsonElement dmaapInfoJson = requiredChild(input, DMAAP_INFO_CHILD_NAME);
+ final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME);
final MessageRouterDmaapInfo dmaapInfo = gson.fromJson(dmaapInfoJson, ImmutableMessageRouterDmaapInfo.class);
- return new GsonMessageRouterSource(dmaapInfo, aafCredentials);
+ return new GsonMessageRouterSource(input.name(), dmaapInfo, aafCredentials);
}
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
import java.util.Objects;
import org.jetbrains.annotations.NotNull;
*/
abstract class GsonKafka implements Kafka {
- protected final KafkaInfo kafkaInfo;
+ private final String name;
+ final KafkaInfo kafkaInfo;
private final AafCredentials aafCredentials;
- GsonKafka(@NotNull KafkaInfo kafkaInfo,
+ GsonKafka(
+ @NotNull String name,
+ @NotNull KafkaInfo kafkaInfo,
@Nullable AafCredentials aafCredentials) {
+ this.name = Objects.requireNonNull(name, "name");
this.kafkaInfo = Objects.requireNonNull(kafkaInfo, "kafkaInfo");
this.aafCredentials = aafCredentials;
}
+ public String name() {
+ return name;
+ }
+
@Override
public String bootstrapServers() {
return kafkaInfo.bootstrapServers();
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
class GsonKafkaSink extends GsonKafka implements KafkaSink {
GsonKafkaSink(
+ @NotNull String name,
@NotNull KafkaInfo kafkaInfo,
@Nullable AafCredentials aafCredentials) {
- super(kafkaInfo, aafCredentials);
+ super(name, kafkaInfo, aafCredentials);
}
+
}
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
class GsonKafkaSource extends GsonKafka implements KafkaSource {
GsonKafkaSource(
+ @NotNull String name,
@NotNull KafkaInfo kafkaInfo,
@Nullable AafCredentials aafCredentials) {
- super(kafkaInfo, aafCredentials);
+ super(name, kafkaInfo, aafCredentials);
}
@Override
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
import com.google.gson.annotations.SerializedName;
import org.immutables.gson.Gson;
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractAafCredentials;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractKafkaInfo;
import com.google.gson.Gson;
-import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_INFO_CHILD_NAME;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_TYPE;
/**
}
@Override
- public KafkaSink unsafeParse(JsonObject input) {
- assertStreamType(input, KAFKA_TYPE);
+ public KafkaSink unsafeParse(RawDataStream<JsonObject> input) {
+ assertStreamType(input, KAFKA_TYPE, DataStreamDirection.SINK);
+ final JsonObject json = input.descriptor();
- final JsonElement kafkaInfoJson = requiredChild(input, KAFKA_INFO_CHILD_NAME);
- final KafkaInfo kafkaInfo = gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class);
+ final KafkaInfo kafkaInfo = extractKafkaInfo(gson, json);
+ final AafCredentials aafCreds = extractAafCredentials(gson, json).getOrNull();
- return new GsonKafkaSink(kafkaInfo, null);
+ return new GsonKafkaSink(input.name(), kafkaInfo, aafCreds);
}
}
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractAafCredentials;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractKafkaInfo;
import com.google.gson.Gson;
-import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.*;
-import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_INFO_CHILD_NAME;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.KAFKA_TYPE;
/**
}
@Override
- public KafkaSource unsafeParse(JsonObject input) {
- assertStreamType(input, KAFKA_TYPE);
+ public KafkaSource unsafeParse(RawDataStream<JsonObject> input) {
+ assertStreamType(input, KAFKA_TYPE, DataStreamDirection.SOURCE);
+ final JsonObject json = input.descriptor();
- final JsonElement kafkaInfoJson = requiredChild(input, KAFKA_INFO_CHILD_NAME);
- final KafkaInfo kafkaInfo = gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class);
+ final KafkaInfo kafkaInfo = extractKafkaInfo(gson, json);
+ final AafCredentials aafCreds = extractAafCredentials(gson, json).getOrNull();
- return new GsonKafkaSource(kafkaInfo, null);
+ return new GsonKafkaSource(input.name(), kafkaInfo, aafCreds);
}
}
--- /dev/null
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.optionalChild;
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import io.vavr.control.Option;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+final class KafkaUtils {
+
+ private KafkaUtils() {
+ }
+
+ static KafkaInfo extractKafkaInfo(Gson gson, JsonObject input) {
+ final JsonElement kafkaInfoJson = requiredChild(input, "kafka_info");
+ return gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class);
+ }
+
+ static Option<AafCredentials> extractAafCredentials(Gson gson, JsonObject input) {
+ return optionalChild(input, "aaf_credentials")
+ .map(aafCredsJson -> gson.fromJson(aafCredsJson, ImmutableAafCredentials.class));
+ }
+}
@Gson.TypeAdapters
public interface AafCredentials {
- @SerializedName("aaf_username")
+ @SerializedName(value = "username", alternate = "aaf_username")
@Nullable String username();
- @SerializedName("aaf_password")
+ @SerializedName(value = "password", alternate = "aaf_password")
@Nullable String password();
}
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
+import org.immutables.value.Value;
import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
/**
*/
@ExperimentalApi
public interface DataStream {
-
+ @Value.Default
+ default String name() {
+ return "";
+ }
}
--- /dev/null
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+public enum DataStreamDirection {
+
+ SINK("streams_publishes"),
+ SOURCE("streams_subscribes");
+
+ private final String configurationKey;
+
+ DataStreamDirection(String configurationKey) {
+ this.configurationKey = configurationKey;
+ }
+
+ public String configurationKey() {
+ return configurationKey;
+ }
+}
--- /dev/null
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams;
+
+import org.immutables.value.Value;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+@Value.Immutable
+public interface RawDataStream<T> {
+ String name();
+ String type();
+ DataStreamDirection direction();
+ T descriptor();
+}
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap;
+import static io.vavr.Predicates.not;
+
+import io.vavr.collection.List;
import org.immutables.value.Value;
import org.jetbrains.annotations.Nullable;
import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
default int maxPayloadSizeBytes() {
return 1024 * 1024;
}
+
+ @Value.Derived
+ default List<String> bootstrapServerList() {
+ return List.of(bootstrapServers().split(",")).filter(not(String::isEmpty));
+ }
}
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener;
+import static org.assertj.core.api.Assertions.assertThat;
+
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import io.vavr.collection.List;
+import java.math.BigInteger;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
-import java.math.BigInteger;
-
-
-import static org.assertj.core.api.Assertions.assertThat;
-
class MerkleTreeParserTest {
private final MerkleTreeParser cut = new MerkleTreeParser();
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.DummyHttpServer.sendResource;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.DummyHttpServer.sendString;
import com.google.gson.JsonObject;
+import io.vavr.collection.Map;
import io.vavr.collection.Stream;
import java.time.Duration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
.verify(Duration.ofSeconds(5));
}
+ @Test
+ void testCbsClientWithStreamsParsing() {
+ // given
+ final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
+ final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
+ final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+
+ // when
+ final Mono<KafkaSink> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext))
+ .map(json ->
+ DataStreams.namedSinks(json).map(kafkaSinkParser::unsafeParse).head()
+ );
+
+ // then
+ StepVerifier.create(result)
+ .consumeNextWith(kafkaSink -> {
+ assertThat(kafkaSink.name()).isEqualTo("perf3gpp");
+ assertThat(kafkaSink.bootstrapServers()).isEqualTo("dmaap-mr-kafka:6060");
+ assertThat(kafkaSink.topicName()).isEqualTo("HVVES_PERF3GPP");
+ })
+ .expectComplete()
+ .verify(Duration.ofSeconds(5));
+ }
+
+ @Test
+ void testCbsClientWithStreamsParsingUsingSwitch() {
+ // given
+ final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
+ final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+ // TODO: Use these parsers below
+ final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
+ final StreamFromGsonParser<MessageRouterSink> mrSinkParser = StreamFromGsonParsers.messageRouterSinkParser();
+
+ // when
+ final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext))
+ .map(json -> {
+ final Map<String, Stream<RawDataStream<JsonObject>>> sinks = DataStreams.namedSinks(json)
+ .groupBy(RawDataStream::type);
+
+ final Stream<KafkaSink> allKafkaSinks = sinks.getOrElse("kafka", Stream.empty())
+ .map(kafkaSinkParser::unsafeParse);
+ final Stream<MessageRouterSink> allMrSinks = sinks.getOrElse("message_router", Stream.empty())
+ .map(mrSinkParser::unsafeParse);
+
+ assertThat(allKafkaSinks.size())
+ .describedAs("Number of kafka sinks")
+ .isEqualTo(2);
+ assertThat(allMrSinks.size())
+ .describedAs("Number of DMAAP-MR sinks")
+ .isEqualTo(1);
+
+ return true;
+ })
+ .then();
+
+ // then
+ StepVerifier.create(result)
+ .expectComplete()
+ .verify(Duration.ofSeconds(5));
+ }
+
+ @Test
+ void testCbsClientWithStreamsParsingWhenUsingInvalidParser() {
+ // given
+ final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
+ final StreamFromGsonParser<KafkaSource> kafkaSourceParser = StreamFromGsonParsers.kafkaSourceParser();
+ final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+
+ // when
+ final Mono<KafkaSource> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext))
+ .map(json ->
+ DataStreams.namedSources(json).map(kafkaSourceParser::unsafeParse).head()
+ );
+
+ // then
+ StepVerifier.create(result)
+ .expectErrorSatisfies(ex -> {
+ assertThat(ex).isInstanceOf(IllegalArgumentException.class);
+ assertThat(ex).hasMessageContaining("Invalid stream type");
+ assertThat(ex).hasMessageContaining("message_router");
+ assertThat(ex).hasMessageContaining("kafka");
+ })
+ .verify(Duration.ofSeconds(5));
+ }
+
private String sampleConfigValue(JsonObject obj) {
return obj.get(SAMPLE_CONFIG_KEY).getAsString();
}
import com.google.gson.JsonObject;
import java.net.InetSocketAddress;
import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import reactor.core.publisher.Mono;
/**
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.ServiceLookupException;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
import io.vavr.CheckedFunction0;
-import io.vavr.Function0;
-import java.io.IOException;
-import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
+++ /dev/null
-/*
- * ============LICENSE_START====================================
- * DCAEGEN2-SERVICES-SDK
- * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
- * =========================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=====================================
- */
-
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-import com.google.gson.JsonObject;
-import java.io.IOException;
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
-
-/**
- * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
- * @since March 2019
- */
-class KafkaSourceParserTest {
-
- private final StreamFromGsonParser<KafkaSource> cut = StreamFromGsonParsers.kafkaSourceParser();
-
- @Test
- void precondition_assureInstanceOf() {
- assertThat(cut).isInstanceOf(KafkaSourceParser.class);
- }
-
- @Test
- void shouldParseMinimalKafkaSourceDefinition() throws IOException {
- // given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_source_minimal.json");
-
- // when
- final KafkaSource result = cut.unsafeParse(input);
-
- // then
- assertThat(result.aafCredentials()).isNull();
- assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060");
- assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP");
- assertThat(result.clientId()).isNull();
- assertThat(result.clientRole()).isNull();
- }
-}
\ No newline at end of file
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableDataRouterSink;
class DataRouterSinkParserTest {
+
private static final String SAMPLE_LOCATION = "mtc00";
private static final String SAMPLE_PUBLISH_URL = "https://we-are-data-router.us/feed/xyz";
private static final String SAMPLE_LOG_URL = "https://we-are-data-router.us/feed/xyz/logs";
private static final String SAMPLE_PASSWORD = "some-password";
private static final String SAMPLE_PUBLISHER_ID = "123456";
- private static final Gson gson = new Gson();
-
private final StreamFromGsonParser<DataRouterSink> streamParser = StreamFromGsonParsers.dataRouterSinkParser();
- private static final DataRouterSink fullConfigurationStream = ImmutableDataRouterSink.builder()
- .location(SAMPLE_LOCATION)
- .publishUrl(SAMPLE_PUBLISH_URL)
- .logUrl(SAMPLE_LOG_URL)
- .username(SAMPLE_USER)
- .password(SAMPLE_PASSWORD)
- .publisherId(SAMPLE_PUBLISHER_ID)
- .build();
-
- private static final DataRouterSink minimalConfigurationStream = ImmutableDataRouterSink.builder()
- .publishUrl(SAMPLE_PUBLISH_URL)
- .build();
-
@Test
void fullConfiguration_shouldGenerateDataRouterSinkObject() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_sink_full.json");
+ RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/data_router_sink_full.json");
+
// when
DataRouterSink result = streamParser.unsafeParse(input);
+
// then
+ final DataRouterSink fullConfigurationStream = ImmutableDataRouterSink.builder()
+ .name(input.name())
+ .location(SAMPLE_LOCATION)
+ .publishUrl(SAMPLE_PUBLISH_URL)
+ .logUrl(SAMPLE_LOG_URL)
+ .username(SAMPLE_USER)
+ .password(SAMPLE_PASSWORD)
+ .publisherId(SAMPLE_PUBLISHER_ID)
+ .build();
assertThat(result).isEqualTo(fullConfigurationStream);
}
@Test
void minimalConfiguration_shouldGenerateDataRouterSinkObject() throws IOException {
//given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_sink_minimal.json");
+ RawDataStream<JsonObject> input = DataStreamUtils
+ .readSinkFromResource("/streams/data_router_sink_minimal.json");
+
// when
DataRouterSink result = streamParser.unsafeParse(input);
+
// then
+ final DataRouterSink minimalConfigurationStream = ImmutableDataRouterSink.builder()
+ .name(input.name())
+ .publishUrl(SAMPLE_PUBLISH_URL)
+ .build();
assertThat(result).isEqualTo(minimalConfigurationStream);
}
@Test
void incorrectConfiguration_shouldParseToStreamParserError() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_full.json");
+ RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/message_router_full.json");
+
// when
Either<StreamParserError, DataRouterSink> result = streamParser.parse(input);
+
// then
assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
result.peekLeft(error -> {
@Test
void emptyConfiguration_shouldParseToStreamParserError() {
// given
- JsonObject input = gson.fromJson("{}", JsonObject.class);
+ JsonObject json = new JsonObject();
+ final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder()
+ .name("empty")
+ .type("data_router")
+ .descriptor(json)
+ .direction(DataStreamDirection.SINK)
+ .build();
+
// when
Either<StreamParserError, DataRouterSink> result = streamParser.parse(input);
+
// then
assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
}
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.*;
import java.io.IOException;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.MESSAGE_ROUTER_TYPE;
public class DataRouterSourceParserTest {
+
private static final String SAMPLE_LOCATION = "mtc00";
private static final String SAMPLE_DELIVERY_URL = "https://my-subscriber-app.dcae:8080/target-path";
private static final String SAMPLE_USER = "some-user";
private static final String SAMPLE_PASSWORD = "some-password";
private static final String SAMPLE_SUBSCRIBER_ID = "789012";
- private static final Gson gson = new Gson();
-
- private static final DataRouterSource fullConfigurationStream = ImmutableDataRouterSource.builder()
- .location(SAMPLE_LOCATION)
- .deliveryUrl(SAMPLE_DELIVERY_URL)
- .username(SAMPLE_USER)
- .password(SAMPLE_PASSWORD)
- .subscriberId(SAMPLE_SUBSCRIBER_ID)
- .build();
-
- private static final DataRouterSource minimalConfigurationStream = ImmutableDataRouterSource.builder()
- .build();
-
-
private final StreamFromGsonParser<DataRouterSource> streamParser = StreamFromGsonParsers.dataRouterSourceParser();
@Test
void fullConfiguration_shouldGenerateDataRouterSinkObject() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_source_full.json");
+ RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/data_router_source_full.json");
+
// when
DataRouterSource result = streamParser.unsafeParse(input);
+
// then
+
+ final DataRouterSource fullConfigurationStream = ImmutableDataRouterSource.builder()
+ .name(input.name())
+ .location(SAMPLE_LOCATION)
+ .deliveryUrl(SAMPLE_DELIVERY_URL)
+ .username(SAMPLE_USER)
+ .password(SAMPLE_PASSWORD)
+ .subscriberId(SAMPLE_SUBSCRIBER_ID)
+ .build();
assertThat(result).isEqualTo(fullConfigurationStream);
}
@Test
void minimalConfiguration_shouldGenerateDataRouterSinkObject() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_source_minimal.json");
+ RawDataStream<JsonObject> input = DataStreamUtils
+ .readSourceFromResource("/streams/data_router_source_minimal.json");
+
// when
DataRouterSource result = streamParser.unsafeParse(input);
+
// then
+ final DataRouterSource minimalConfigurationStream = ImmutableDataRouterSource.builder()
+ .name(input.name())
+ .build();
assertThat(result).isEqualTo(minimalConfigurationStream);
}
@Test
void incorrectConfiguration_shouldParseToStreamParserError() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_full.json");
+ RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/message_router_full.json");
+
// when
Either<StreamParserError, DataRouterSource> result = streamParser.parse(input);
+
// then
assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
result.peekLeft(error -> {
@Test
void emptyConfiguration_shouldBeParsedToStreamParserError() {
// given
- JsonObject input = gson.fromJson("{}", JsonObject.class);
+ JsonObject json = new JsonObject();
+ final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder()
+ .name("empty")
+ .type("data_router")
+ .descriptor(json)
+ .direction(DataStreamDirection.SOURCE)
+ .build();
+
// when
Either<StreamParserError, DataRouterSource> result = streamParser.parse(input);
+
// then
assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
}
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.DataRouterSink;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
private static final String SAMPLE_CLIENT_ID = "1500462518108";
private static final String SAMPLE_TOPIC_URL = "https://we-are-message-router.us:3905/events/some-topic";
- private static final Gson gson = new Gson();
-
private final StreamFromGsonParser<MessageRouterSink> streamParser = StreamFromGsonParsers.messageRouterSinkParser();
@Test
void fullConfiguration_shouldGenerateDataRouterSinkObject() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_full.json");
+ RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/message_router_full.json");
+
// when
MessageRouterSink result = streamParser.unsafeParse(input);
+
// then
assertThat(result).isInstanceOf(MessageRouterSink.class);
assertThat(result.aafCredentials().username()).isEqualTo(SAMPLE_AAF_USERNAME);
@Test
void minimalConfiguration_shouldGenerateDataRouterSinkObject() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_minimal.json");
+ RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/message_router_minimal.json");
// when
MessageRouterSink result = streamParser.unsafeParse(input);
+
// then
assertThat(result).isInstanceOf(MessageRouterSink.class);
assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL);
@Test
void incorrectConfiguration_shouldParseToStreamParserError() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_sink_full.json");
+ RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/data_router_sink_full.json");
+
// when
Either<StreamParserError, MessageRouterSink> result = streamParser.parse(input);
+
// then
assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
result.peekLeft(error -> {
@Test
void emptyConfiguration_shouldParseToStreamParserError() {
// given
- JsonObject input = gson.fromJson("{}", JsonObject.class);
+ JsonObject json = new JsonObject();
+ final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder()
+ .name("empty")
+ .type("data_router")
+ .descriptor(json)
+ .direction(DataStreamDirection.SINK)
+ .build();
+
// when
Either<StreamParserError, MessageRouterSink> result = streamParser.parse(input);
+
// then
assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
}
* limitations under the License.
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSink;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.MessageRouterSource;
private static final String SAMPLE_CLIENT_ID = "1500462518108";
private static final String SAMPLE_TOPIC_URL = "https://we-are-message-router.us:3905/events/some-topic";
- private static final Gson gson = new Gson();
-
private final StreamFromGsonParser<MessageRouterSource> streamParser = StreamFromGsonParsers.messageRouterSourceParser();
@Test
void fullConfiguration_shouldGenerateDataRouterSourceObject() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_full.json");
+ RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/message_router_full.json");
+
// when
MessageRouterSource result = streamParser.unsafeParse(input);
+
// then
assertThat(result.aafCredentials().username()).isEqualTo(SAMPLE_AAF_USERNAME);
assertThat(result.aafCredentials().password()).isEqualTo(SAMPLE_AAF_PASSWORD);
@Test
void minimalConfiguration_shouldGenerateDataRouterSourceObject() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/message_router_minimal.json");
+ RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/message_router_minimal.json");
// when
MessageRouterSource result = streamParser.unsafeParse(input);
+
// then
assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL);
assertThat(result.aafCredentials().username()).isNull();
@Test
void incorrectConfiguration_shouldParseToStreamParserError() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/data_router_sink_full.json");
+ RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/data_router_sink_full.json");
+
// when
Either<StreamParserError, MessageRouterSource> result = streamParser.parse(input);
+
// then
assertThat(result.getLeft()).isInstanceOf(StreamParserError.class);
result.peekLeft(error -> {
@Test
void emptyConfiguration_shouldParseToStreamParserError() {
// given
- JsonObject input = gson.fromJson("{}", JsonObject.class);
+ JsonObject json = new JsonObject();
+ final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder()
+ .name("empty")
+ .type("data_router")
+ .descriptor(json)
+ .direction(DataStreamDirection.SOURCE)
+ .build();
// when
Either<StreamParserError, MessageRouterSource> result = streamParser.parse(input);
// then
* ============LICENSE_END=====================================
*/
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson;
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.in;
-import static org.junit.jupiter.api.Assertions.*;
import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import io.vavr.Function1;
import io.vavr.control.Either;
import java.io.IOException;
-import java.io.InputStreamReader;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSinkParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink;
/**
@Test
void shouldParseMinimalKafkaSinkDefinition() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_minimal.json");
+ RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_sink_minimal.json");
// when
final KafkaSink result = cut.unsafeParse(input);
}
@Test
- void shouldParseBasicKafkaSinkDefinition() throws IOException {
+ void shouldParseFullKafkaSinkDefinition() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink.json");
+ RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_sink.json");
// when
final KafkaSink result = cut.unsafeParse(input);
// then
- assertThat(result.aafCredentials()).isNull();
+ final ImmutableAafCredentials expectedCredentials = ImmutableAafCredentials.builder()
+ .username("the user")
+ .password("the passwd")
+ .build();
+ assertThat(result.aafCredentials()).isEqualTo(expectedCredentials);
assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060");
assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP");
assertThat(result.clientId()).isEqualTo("1500462518108");
@Test
void shouldReturnErrorWhenStructureIsWrong() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_missing_child.json");
+ RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_sink_missing_child.json");
// when
final Either<StreamParserError, KafkaSink> result = cut.parse(input);
@Test
void shouldReturnErrorWhenTypeIsWrong() throws IOException {
// given
- JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_invalid_type.json");
+ RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_invalid_type.json");
// when
final Either<StreamParserError, KafkaSink> result = cut.parse(input);
assertThat(error.message()).containsIgnoringCase("message_router");
});
}
+
+ @Test
+ void shouldReturnErrorWhenDirectionIsWrong() throws IOException {
+ // given
+ RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/kafka_sink.json");
+
+ // when
+ final Either<StreamParserError, KafkaSink> result = cut.parse(input);
+
+ // then
+ assertThat(result.isRight()).describedAs("should not be right").isFalse();
+ result.peekLeft(error -> {
+ assertThat(error.message()).containsIgnoringCase("invalid stream direction");
+ });
+ }
}
\ No newline at end of file
--- /dev/null
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.google.gson.JsonObject;
+import io.vavr.collection.List;
+import io.vavr.control.Either;
+import java.io.IOException;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSourceParser;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since March 2019
+ */
+class KafkaSourceParserTest {
+
+ private final StreamFromGsonParser<KafkaSource> cut = StreamFromGsonParsers.kafkaSourceParser();
+
+ @Test
+ void precondition_assureInstanceOf() {
+ assertThat(cut).isInstanceOf(KafkaSourceParser.class);
+ }
+
+ @Test
+ void shouldParseMinimalKafkaSourceDefinition() throws IOException {
+ // given
+ RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/kafka_source_minimal.json");
+
+ // when
+ final KafkaSource result = cut.unsafeParse(input);
+
+ // then
+ assertThat(result.aafCredentials()).isNull();
+ assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060");
+ assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP");
+ assertThat(result.clientId()).isNull();
+ assertThat(result.clientRole()).isNull();
+ }
+
+ @Test
+ void shouldParseFullKafkaSourceDefinition() throws IOException {
+ // given
+ RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/kafka_source.json");
+
+ // when
+ final KafkaSource result = cut.unsafeParse(input);
+
+ // then
+ final ImmutableAafCredentials expectedCredentials = ImmutableAafCredentials.builder()
+ .username("the user")
+ .password("the passwd")
+ .build();
+ assertThat(result.aafCredentials()).isEqualTo(expectedCredentials);
+ assertThat(result.bootstrapServerList()).isEqualTo(List.of("dmaap-mr-kafka-0:6060", "dmaap-mr-kafka-1:6060"));
+ assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP");
+ assertThat(result.consumerGroupId()).isEqualTo("nokia-perf3gpp-processor");
+ assertThat(result.clientId()).isEqualTo("1500462518108");
+ assertThat(result.clientRole()).isEqualTo("com.dcae.member");
+ }
+
+ @Test
+ void shouldReturnErrorWhenTypeIsWrong() throws IOException {
+ // given
+ RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/kafka_invalid_type.json");
+
+ // when
+ final Either<StreamParserError, KafkaSource> result = cut.parse(input);
+
+ // then
+ assertThat(result.isRight()).describedAs("should not be right").isFalse();
+ result.peekLeft(error -> {
+ assertThat(error.message()).containsIgnoringCase("invalid stream type");
+ assertThat(error.message()).containsIgnoringCase("kafka");
+ assertThat(error.message()).containsIgnoringCase("message_router");
+ });
+ }
+
+ @Test
+ void shouldReturnErrorWhenDirectionIsWrong() throws IOException {
+ // given
+ RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_source.json");
+
+ // when
+ final Either<StreamParserError, KafkaSource> result = cut.parse(input);
+
+ // then
+ assertThat(result.isRight()).describedAs("should not be right").isFalse();
+ result.peekLeft(error -> {
+ assertThat(error.message()).containsIgnoringCase("invalid stream direction");
+ });
+ }
+}
\ No newline at end of file
{
- "keystore.path": "/var/run/security/keystore.p12"
+ "keystore.path": "/var/run/security/keystore.p12",
+ "streams_publishes": {
+ "perf3gpp": {
+ "type": "kafka",
+ "kafka_info": {
+ "bootstrap_servers": "dmaap-mr-kafka:6060",
+ "topic_name": "HVVES_PERF3GPP"
+ }
+ },
+ "pnf_ready": {
+ "type": "message_router",
+ "dmaap_info": {
+ "topic_url": "http://message-router:3904/events/VES_PNF_READY"
+ }
+ },
+ "call_trace": {
+ "type": "kafka",
+ "kafka_info": {
+ "bootstrap_servers": "dmaap-mr-kafka:6060",
+ "topic_name": "HVVES_TRACE"
+ }
+ }
+ },
+ "streams_subscribes": {
+ "measurements": {
+ "type": "message_router",
+ "dmaap_info": {
+ "topic_url": "http://message-router:3904/events/VES_MEASUREMENT"
+ }
+ }
+ }
}
{
"type": "kafka",
+ "aaf_credentials": {
+ "username": "the user",
+ "password": "the passwd"
+ },
"kafka_info": {
"client_role": "com.dcae.member",
"client_id": "1500462518108",
--- /dev/null
+{
+ "type": "kafka",
+ "aaf_credentials": {
+ "username": "the user",
+ "password": "the passwd"
+ },
+ "kafka_info": {
+ "client_role": "com.dcae.member",
+ "client_id": "1500462518108",
+ "bootstrap_servers": "dmaap-mr-kafka-0:6060,,dmaap-mr-kafka-1:6060,",
+ "topic_name": "HVVES_PERF3GPP",
+ "consumer_group_id": "nokia-perf3gpp-processor"
+ }
+}