2  * ============LICENSE_START====================================
 
   3  * DCAEGEN2-SERVICES-SDK
 
   4  * =========================================================
 
   5  * Copyright (C) 2019-2021 Nokia. All rights reserved.
 
   6  * =========================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *       http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=====================================
 
  21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
 
  23 import com.google.gson.JsonElement;
 
  24 import com.google.gson.JsonObject;
 
  25 import io.vavr.collection.List;
 
  26 import org.junit.jupiter.api.BeforeAll;
 
  27 import org.junit.jupiter.api.BeforeEach;
 
  28 import org.junit.jupiter.api.Test;
 
  29 import org.mockserver.client.MockServerClient;
 
  30 import org.mockserver.matchers.TimeToLive;
 
  31 import org.mockserver.matchers.Times;
 
  32 import org.mockserver.verify.VerificationTimes;
 
  33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
 
  34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
 
  35 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
 
  36 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
 
  37 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
 
  38 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig;
 
  39 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig;
 
  40 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
 
  41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
 
  42 import org.testcontainers.containers.DockerComposeContainer;
 
  43 import org.testcontainers.junit.jupiter.Container;
 
  44 import org.testcontainers.junit.jupiter.Testcontainers;
 
  45 import reactor.core.publisher.Flux;
 
  46 import reactor.core.publisher.Mono;
 
  47 import reactor.test.StepVerifier;
 
  49 import java.time.Duration;
 
  50 import java.util.concurrent.TimeUnit;
 
  52 import static org.mockserver.model.HttpRequest.request;
 
  53 import static org.mockserver.model.HttpResponse.response;
 
  54 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
 
  55 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
 
  56 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorPublishResponse;
 
  57 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
 
  58 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
 
  59 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.plainBatch;
 
  60 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic;
 
  61 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successPublishResponse;
 
  62 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
 
  63 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT;
 
  64 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST;
 
  65 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT;
 
  66 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance;
 
  69 class MessageRouterPublisherIT {
 
   71     private static final DockerComposeContainer CONTAINER = createContainerInstance();
          // CONTAINER: compose environment returned by DMaapContainer.createContainerInstance().
          // NOTE(review): declared with the raw DockerComposeContainer type; any lifecycle
          // annotation on this field is not visible in this extract.
 
          // Client for the mock server on LOCALHOST:PROXY_MOCK_SERVICE_EXPOSED_PORT; this file
          // calls reset() and verify(...) on it.
   72     private static final MockServerClient MOCK_SERVER_CLIENT = new MockServerClient(
 
   73             LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
 
          // Base "/events" URLs. Both are assigned in setUp(), which is why they are not final.
   74     private static String EVENTS_PATH;
 
   75     private static String PROXY_MOCK_EVENTS_PATH;
 
          // Ten-second duration. NOTE(review): no use is visible in this extract; presumably the
          // StepVerifier verification timeout - confirm.
   77     private static final Duration TIMEOUT = Duration.ofSeconds(10);
 
          // Expected body of the 400 response. The "%s" placeholder receives the topic name
          // through errorPublishResponse(format, topic) in the bad-request test.
          // NOTE(review): some concatenation lines of this constant are missing from this extract.
   78     private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n"
 
   80             + "\"mrstatus\":5007,"
 
   81             + "\"helpURL\":\"http://onap.readthedocs.io\","
 
   82             + "\"message\":\"Error while publishing data to topic.:%s."
 
   83             + "Successfully published number of messages :0."
 
   84             + "Expected { to start an object.\",\"status\":400"
 
          // Expected body of the client-timeout (408) response. It is passed to
          // errorPublishResponse without format arguments, so "%1" is presumably literal text
          // rather than a format specifier - TODO confirm.
          // NOTE(review): some concatenation lines of this constant are missing from this extract.
   86     private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n"
 
   90             + "\"serviceException\":"
 
   92             + "\"messageId\":\"SVC0001\","
 
   93             + "\"text\":\"Client timeout exception occurred, Error code is %1\","
 
   94             + "\"variables\":[\"408\"]"
 
          // Publisher and subscriber built from the default configurations; tests that need
          // retries create their own publisher instead of using this field.
   99     private final MessageRouterPublisher publisher = DmaapClientFactory
 
  100             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
 
  101     private final MessageRouterSubscriber subscriber = DmaapClientFactory
 
  102             .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
 
  105     static void setUp() {
              // Builds the two base "/events" URLs on LOCALHOST: EVENTS_PATH uses
              // DMAAP_SERVICE_EXPOSED_PORT, PROXY_MOCK_EVENTS_PATH uses PROXY_MOCK_SERVICE_EXPOSED_PORT.
              // NOTE(review): presumably a one-time (BeforeAll) hook; the annotation line is not
              // visible in this extract.
 
  106         EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
 
  107         PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
 
 112         MOCK_SERVER_CLIENT.reset();
 
  116     void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() {
              // Publishes a batch of two JSON objects to "TOPIC" under EVENTS_PATH and expects the
              // first emitted response to equal the success response built from both messages.
 
  118         final String topic = "TOPIC";
 
  119         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
 
  120                 "{\"differentMessage\":\"message2\"}");
 
  121         final Flux<JsonObject> messageBatch = jsonBatch(twoJsonMessages);
 
  122         final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
 
              // The expected response carries the same two messages, converted to JsonElements.
  123         final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
 
  126         final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
 
  129         StepVerifier.create(result)
 
  130                 .expectNext(expectedResponse)
              // NOTE(review): the remainder of this verifier chain is not visible in this extract.
 
  136     void publisher_shouldHandleBadRequestError() {
              // Sends three plain-text words to "TOPIC2" using the single-argument
              // createPublishRequest overload (no ContentType.TEXT_PLAIN, unlike the
              // *WithPlainContentType tests) and expects the 400 error response.
              // NOTE(review): presumably the default content type is JSON, which would explain the
              // "Expected { to start an object" message - confirm in MessageRouterTestsUtils.
 
  138         final String topic = "TOPIC2";
 
  139         final List<String> threePlainTextMessages = List.of("I", "like", "pizza");
 
  140         final Flux<JsonElement> messageBatch = plainBatch(threePlainTextMessages);
 
  141         final MessageRouterPublishRequest mrRequest = createPublishRequest(String.format("%s/%s", EVENTS_PATH, topic));
 
              // The topic name is substituted into the "%s" placeholder of the 400 format string.
  142         final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
 
  143                 DMAAP_400_ERROR_RESPONSE_FORMAT, topic);
 
  146         final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
 
  149         StepVerifier.create(result)
 
  150                 .expectNext(expectedResponse)
              // NOTE(review): the remainder of this verifier chain is not visible in this extract.
 
  156     void publisher_shouldSuccessfullyPublishSingleMessage() {
              // Round trip on "TOPIC3": publishes one JSON message, then reads the topic back with
              // the subscriber and expects exactly that message in the subscribe response.
 
  158         final String topic = "TOPIC3";
 
  159         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
 
  160         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
 
  161         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
 
  162         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
 
  163         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
 
  164         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
 
  165         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
 
              // NOTE(review): registerTopic is defined in MessageRouterTestsUtils; presumably it
              // makes the topic and consumer known to DMaaP before the real publish - confirm.
  168         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
 
              // then(...) discards the publish responses and, once put(...) completes, emits what
              // the subscribe Mono produces.
  169         Mono<MessageRouterSubscribeResponse> response = publisher
 
  170                 .put(publishRequest, jsonMessageBatch)
 
  171                 .then(subscriber.get(subscribeRequest));
 
  174         StepVerifier.create(response)
 
  175                 .expectNext(expectedResponse)
              // NOTE(review): the remainder of this verifier chain is not visible in this extract.
 
  181     void publisher_shouldSuccessfullyPublishMultipleMessages() {
              // Round trip on "TOPIC5": publishes two JSON messages, then reads the topic back and
              // expects both messages in the subscribe response.
 
  182         final String topic = "TOPIC5";
 
  183         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
 
              // NOTE(review): despite its name, singleJsonMessage holds two messages here.
  184         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}",
 
  185                 "{\"differentMessage\":\"message2\"}");
 
  186         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
 
  187         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
 
  188         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
 
  189         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
 
  190         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
 
              // NOTE(review): registerTopic is defined in MessageRouterTestsUtils; presumably it
              // makes the topic and consumer known to DMaaP before the real publish - confirm.
  193         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
 
              // then(...) discards the publish responses and, once put(...) completes, emits what
              // the subscribe Mono produces.
  194         Mono<MessageRouterSubscribeResponse> response = publisher
 
  195                 .put(publishRequest, jsonMessageBatch)
 
  196                 .then(subscriber.get(subscribeRequest));
 
  199         StepVerifier.create(response)
 
  200                 .expectNext(expectedResponse)
              // NOTE(review): the remainder of this verifier chain is not visible in this extract.
 
  206     void publisher_shouldSuccessfullyPublishSingleJsonMessageWithPlainContentType() {
              // Round trip on "TOPIC6": one JSON-formatted string is published through plainBatch
              // with ContentType.TEXT_PLAIN, then read back and compared with its JsonElement form.
 
  208         final String topic = "TOPIC6";
 
  209         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
 
  211         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
 
  212         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
 
  213         final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
 
              // The content type is set explicitly here, unlike the JSON tests above.
  215         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
 
  216         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
 
  217         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
 
              // NOTE(review): registerTopic is defined in MessageRouterTestsUtils; presumably it
              // makes the topic and consumer known to DMaaP before the real publish - confirm.
  220         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
 
              // then(...) discards the publish responses and, once put(...) completes, emits what
              // the subscribe Mono produces.
  221         Mono<MessageRouterSubscribeResponse> response = publisher
 
  222                 .put(publishRequest, plainBatch)
 
  223                 .then(subscriber.get(subscribeRequest));
 
  226         StepVerifier.create(response)
 
  227                 .expectNext(expectedResponse)
              // NOTE(review): the remainder of this verifier chain is not visible in this extract.
 
  233     void publisher_shouldSuccessfullyPublishMultipleJsonMessagesWithPlainContentType() {
              // Round trip on "TOPIC7": two JSON-formatted strings are published through plainBatch
              // with ContentType.TEXT_PLAIN, then read back and compared with their JsonElement form.
 
  235         final String topic = "TOPIC7";
 
  236         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
 
  238         final List<String> twoJsonMessage = List.of("{\"message\":\"message1\"}", "{\"message2\":\"message2\"}");
 
  239         final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessage);
 
  240         final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessage);
 
  242         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
 
  243         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
 
  244         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
 
              // NOTE(review): registerTopic is defined in MessageRouterTestsUtils; presumably it
              // makes the topic and consumer known to DMaaP before the real publish - confirm.
  247         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
 
              // then(...) discards the publish responses and, once put(...) completes, emits what
              // the subscribe Mono produces.
  248         Mono<MessageRouterSubscribeResponse> response = publisher
 
  249                 .put(publishRequest, plainBatch)
 
  250                 .then(subscriber.get(subscribeRequest));
 
  253         StepVerifier.create(response)
 
  254                 .expectNext(expectedResponse)
              // NOTE(review): the remainder of this verifier chain is not visible in this extract.
 
  260     void publisher_shouldSuccessfullyPublishSinglePlainMessageWithPlainContentType() {
              // Round trip on "TOPIC8": the single non-JSON word "kebab" is published with
              // ContentType.TEXT_PLAIN, then read back through the subscriber.
 
  262         final String topic = "TOPIC8";
 
  263         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
 
  265         final List<String> singlePlainMessage = List.of("kebab");
 
              // The expectation is built from the same word through getAsJsonElements.
  266         final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
 
  267         final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
 
  269         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
 
  270         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
 
  271         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
 
              // NOTE(review): registerTopic is defined in MessageRouterTestsUtils; presumably it
              // makes the topic and consumer known to DMaaP before the real publish - confirm.
  274         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
 
              // then(...) discards the publish responses and, once put(...) completes, emits what
              // the subscribe Mono produces.
  275         Mono<MessageRouterSubscribeResponse> response = publisher
 
  276                 .put(publishRequest, plainBatch)
 
  277                 .then(subscriber.get(subscribeRequest));
 
  280         StepVerifier.create(response)
 
  281                 .expectNext(expectedResponse)
              // NOTE(review): the remainder of this verifier chain is not visible in this extract.
 
  287     void publisher_shouldSuccessfullyPublishMultiplePlainMessagesWithPlainContentType() {
              // Round trip on "TOPIC9": three non-JSON words are published with
              // ContentType.TEXT_PLAIN, then read back through the subscriber.
 
  289         final String topic = "TOPIC9";
 
  290         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
 
              // NOTE(review): despite its name, singlePlainMessage holds three messages here.
  292         final List<String> singlePlainMessage = List.of("I", "like", "pizza");
 
  293         final List<JsonElement> expectedItems = getAsJsonElements(singlePlainMessage);
 
  294         final Flux<JsonElement> plainBatch = plainBatch(singlePlainMessage);
 
  296         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, ContentType.TEXT_PLAIN);
 
  297         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
 
  298         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
 
              // NOTE(review): registerTopic is defined in MessageRouterTestsUtils; presumably it
              // makes the topic and consumer known to DMaaP before the real publish - confirm.
  301         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
 
              // then(...) discards the publish responses and, once put(...) completes, emits what
              // the subscribe Mono produces.
  302         Mono<MessageRouterSubscribeResponse> response = publisher
 
  303                 .put(publishRequest, plainBatch)
 
  304                 .then(subscriber.get(subscribeRequest));
 
  307         StepVerifier.create(response)
 
  308                 .expectNext(expectedResponse)
              // NOTE(review): the remainder of this verifier chain is not visible in this extract.
 
  314     void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() {
              // Publishes one JSON message to "TOPIC10" through the mock-server URL with a
              // one-second Duration on the request, while the mock delays its response by two
              // seconds; expects the timeout error response and exactly one request on the path.
 
  316         final String topic = "TOPIC10";
 
  317         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
 
  318         final Flux<JsonObject> messageBatch = jsonBatch(singleJsonMessage);
 
              // NOTE(review): the Duration argument is presumably the per-request timeout - confirm
              // against createPublishRequest in MessageRouterTestsUtils.
  319         final MessageRouterPublishRequest mrRequest = createPublishRequest(
 
  320                 String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic), Duration.ofSeconds(1));
 
  321         final MessageRouterPublishResponse expectedResponse = errorPublishResponse(TIMEOUT_ERROR_MESSAGE);
 
  322         final String path = String.format("/events/%s", topic);
 
              // One-shot expectation (Times.once()): the first request to the path is answered
              // after a 2 s delay, which is longer than the 1 s Duration above.
              // NOTE(review): the receiver line of this call chain is not visible in this extract;
              // presumably MOCK_SERVER_CLIENT.
  324                 .when(request().withPath(path), Times.once())
 
  325                 .respond(response().withDelay(TimeUnit.SECONDS, 2));
 
              // Uses the default-config publisher field (no retryConfig()).
  328         final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
 
  331         StepVerifier.create(result)
 
  332                 .expectNext(expectedResponse)
              // NOTE(review): the remainder of this verifier chain is not visible in this extract.
 
              // Exactly one request must have reached the mock, i.e. no retry was attempted.
  336         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1));
 
  340     void publisher_shouldRetryWhenRetryableHttpCodeAndSuccessfullyPublish() {
              // Publishes one message to "TOPIC11" through the mock-server URL with a publisher
              // built from retryConfig(); the mock answers the first request with 404, and the test
              // expects a success response after exactly two requests on the path.
 
  341         final String topic = "TOPIC11";
 
  342         final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
 
  344         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
 
  345         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
 
  346         final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
 
  348         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
 
  349         final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
 
  351         final String path = String.format("/events/%s", topic);
 
              // One-shot expectation (Times.once()): only the first request to the path gets 404.
              // NOTE(review): the receiver line of this call chain is not visible in this extract
              // (presumably MOCK_SERVER_CLIENT), and what serves the second request is not shown
              // here - presumably the mock forwards to DMaaP; confirm in the container setup.
  353                 .when(request().withPath(path), Times.once())
 
  354                 .respond(response().withStatusCode(404));
 
              // Local publisher with the retry configuration; it shadows the publisher field.
  355         final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig());
 
  358         final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
 
  361         StepVerifier.create(result)
 
  362                 .expectNext(expectedResponse)
              // NOTE(review): the remainder of this verifier chain is not visible in this extract.
 
              // Two requests on the path: the failed first attempt plus one retry.
  366         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
 
  370     void publisher_shouldRetryWhenClientTimeoutAndSuccessfullyPublish() {
              // Publishes one message to "TOPIC12" through the mock-server URL with a one-second
              // Duration on the request and a publisher built from retryConfig(); the mock delays
              // its first response by ten seconds, and the test expects a success response after
              // exactly two requests on the path.
 
  371         final String topic = "TOPIC12";
 
  372         final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
 
  374         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
 
  375         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
 
  376         final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
 
              // NOTE(review): the Duration argument is presumably the per-request timeout - confirm
              // against createPublishRequest in MessageRouterTestsUtils.
  378         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
 
  379         final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
 
  381         final String path = String.format("/events/%s", topic);
 
              // One-shot expectation (Times.once()): only the first request to the path is delayed,
              // by 10 s, which is longer than the 1 s Duration above.
              // NOTE(review): the receiver line of this call chain is not visible in this extract;
              // presumably MOCK_SERVER_CLIENT.
  383                 .when(request().withPath(path), Times.once())
 
  384                 .respond(response().withDelay(TimeUnit.SECONDS, 10));
 
              // Local publisher with the retry configuration; it shadows the publisher field.
  385         final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig());
 
  388         final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
 
  391         StepVerifier.create(result)
 
  392                 .expectNext(expectedResponse)
              // NOTE(review): the remainder of this verifier chain is not visible in this extract.
 
              // Two requests on the path: the timed-out first attempt plus one retry.
  396         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
 
 399     private MessageRouterPublisherConfig retryConfig() {
 
 400         return ImmutableMessageRouterPublisherConfig.builder()
 
 401                 .retryConfig(ImmutableDmaapRetryConfig.builder()
 
 402                         .retryIntervalInSeconds(1)