2  * ============LICENSE_START====================================
 
   3  * DCAEGEN2-SERVICES-SDK
 
   4  * =========================================================
 
   5  * Copyright (C) 2019-2021 Nokia. All rights reserved.
 
   6  * =========================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *       http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=====================================
 
  21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client;
 
  23 import com.google.gson.Gson;
 
  24 import com.google.gson.JsonElement;
 
  25 import com.google.gson.JsonObject;
 
  26 import com.google.gson.JsonParser;
 
  27 import com.google.gson.JsonPrimitive;
 
  28 import io.vavr.collection.List;
 
  29 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
 
  30 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
 
  31 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
 
  32 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
 
  33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
 
  34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
 
  35 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
 
  36 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
 
  37 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
 
  38 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
 
  39 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
 
  40 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
 
  41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig;
 
  42 import reactor.core.publisher.Flux;
 
  44 import java.time.Duration;
 
  47 public final class MessageRouterTestsUtils {
 
  48     private MessageRouterTestsUtils() {
 
  51     public static MessageRouterPublishRequest createPublishRequest(String topicUrl) {
 
  52         return createPublishRequest(topicUrl, ContentType.APPLICATION_JSON);
 
  55     public static MessageRouterPublishRequest createPublishRequest(String topicUrl, Duration timeout) {
 
  56         return ImmutableMessageRouterPublishRequest.builder()
 
  57                 .sinkDefinition(createMessageRouterSink(topicUrl))
 
  58                 .contentType(ContentType.APPLICATION_JSON)
 
  59                 .timeoutConfig(ImmutableDmaapTimeoutConfig.builder()
 
  65     public static MessageRouterPublishRequest createPublishRequest(String topicUrl, ContentType contentType) {
 
  66         return ImmutableMessageRouterPublishRequest.builder()
 
  67                 .sinkDefinition(createMessageRouterSink(topicUrl))
 
  68                 .contentType(contentType)
 
  72     public static MessageRouterSubscribeRequest createMRSubscribeRequest(String topicUrl,
 
  73                                                                          String consumerGroup, String consumerId) {
 
  75         return ImmutableMessageRouterSubscribeRequest
 
  77                 .sourceDefinition(getImmutableMessageRouterSource(topicUrl))
 
  78                 .consumerGroup(consumerGroup)
 
  79                 .consumerId(consumerId)
 
  83     public static MessageRouterSubscribeRequest createMRSubscribeRequest(String topicUrl,
 
  84                                                                          String consumerGroup, String consumerId,
 
  87         return ImmutableMessageRouterSubscribeRequest
 
  89                 .timeoutConfig(ImmutableDmaapTimeoutConfig.builder()
 
  92                 .sourceDefinition(getImmutableMessageRouterSource(topicUrl))
 
  93                 .consumerGroup(consumerGroup)
 
  94                 .consumerId(consumerId)
 
  98     private static ImmutableMessageRouterSource getImmutableMessageRouterSource(String topicUrl) {
 
  99         return ImmutableMessageRouterSource.builder()
 
 105     public static List<JsonElement> getAsJsonElements(List<String> messages) {
 
 106         return messages.map(JsonParser::parseString);
 
 109     public static List<JsonObject> getAsJsonObjects(List<String> messages) {
 
 110         return getAsJsonElements(messages).map(JsonElement::getAsJsonObject);
 
 113     public static List<JsonPrimitive> getAsJsonPrimitives(List<String> messages) {
 
 114         return getAsJsonElements(messages).map(JsonElement::getAsJsonPrimitive);
 
 117     public static JsonObject getAsJsonObject(String item) {
 
 118         return new Gson().fromJson(item, JsonObject.class);
 
 121     public static Flux<JsonElement> plainBatch(List<String> messages) {
 
 122         return Flux.fromIterable(getAsJsonElements(messages));
 
 125     public static Flux<JsonObject> jsonBatch(List<String> messages) {
 
 126         return Flux.fromIterable(getAsJsonObjects(messages));
 
 129     public static MessageRouterSubscribeResponse errorSubscribeResponse(String failReasonFormat, Object... formatArgs) {
 
 130         String failReason = formatArgs.length == 0 ? failReasonFormat : String.format(failReasonFormat, formatArgs);
 
 131         return ImmutableMessageRouterSubscribeResponse
 
 133                 .failReason(failReason)
 
 137     public static MessageRouterSubscribeResponse successSubscribeResponse(List<JsonElement> items) {
 
 138         return ImmutableMessageRouterSubscribeResponse
 
 144     public static MessageRouterPublishResponse errorPublishResponse(String failReasonFormat, Object... formatArgs) {
 
 145         String failReason = formatArgs.length == 0 ? failReasonFormat : String.format(failReasonFormat, formatArgs);
 
 146         return ImmutableMessageRouterPublishResponse
 
 148                 .failReason(failReason)
 
 152     public static MessageRouterPublishResponse successPublishResponse(List<JsonElement> items) {
 
 153         return ImmutableMessageRouterPublishResponse
 
 159     public static void registerTopic(MessageRouterPublisher publisher, MessageRouterPublishRequest publishRequest,
 
 160                                      MessageRouterSubscriber subscriber, MessageRouterSubscribeRequest subscribeRequest) {
 
 161         final List<String> sampleJsonMessages = List.of("{\"message\":\"message1\"}",
 
 162                 "{\"differentMessage\":\"message2\"}");
 
 163         final Flux<JsonObject> jsonMessageBatch = MessageRouterTestsUtils.jsonBatch(sampleJsonMessages);
 
 165         publisher.put(publishRequest, jsonMessageBatch).blockLast();
 
 166         subscriber.get(subscribeRequest).block();
 
 169     private static ImmutableMessageRouterSink createMessageRouterSink(String topicUrl) {
 
 170         return ImmutableMessageRouterSink.builder()