2  * ============LICENSE_START====================================
 
   3  * DCAEGEN2-SERVICES-SDK
 
   4  * =========================================================
 
   5  * Copyright (C) 2019-2021 Nokia. All rights reserved.
 
   6  * =========================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *       http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=====================================
 
  21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
 
  23 import com.google.gson.JsonElement;
 
  24 import com.google.gson.JsonObject;
 
  25 import io.vavr.collection.List;
 
  26 import org.junit.jupiter.api.BeforeAll;
 
  27 import org.junit.jupiter.api.BeforeEach;
 
  28 import org.junit.jupiter.api.Test;
 
  29 import org.mockserver.client.MockServerClient;
 
  30 import org.mockserver.matchers.Times;
 
  31 import org.mockserver.verify.VerificationTimes;
 
  32 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
 
  33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
 
  34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
 
  35 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig;
 
  36 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig;
 
  37 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
 
  38 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
 
  39 import org.testcontainers.containers.DockerComposeContainer;
 
  40 import org.testcontainers.junit.jupiter.Container;
 
  41 import org.testcontainers.junit.jupiter.Testcontainers;
 
  42 import reactor.core.publisher.Flux;
 
  43 import reactor.core.publisher.Mono;
 
  44 import reactor.test.StepVerifier;
 
  46 import java.time.Duration;
 
  47 import java.util.concurrent.TimeUnit;
 
  49 import static org.mockserver.model.HttpRequest.request;
 
  50 import static org.mockserver.model.HttpResponse.response;
 
  51 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
 
  52 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
 
  53 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorSubscribeResponse;
 
  54 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
 
  55 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonObject;
 
  56 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
 
  57 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic;
 
  58 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
 
  59 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT;
 
  60 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST;
 
  61 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT;
 
  62 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance;
 
  65 class MessageRouterSubscriberIT {
 
  67     private static final DockerComposeContainer CONTAINER = createContainerInstance(); // Compose environment from DMaapContainer.createContainerInstance(). NOTE(review): declared with a raw type; any annotation line above it is not visible here.
 
         // MockServer client bound to LOCALHOST:PROXY_MOCK_SERVICE_EXPOSED_PORT; the tests below
         // call reset() and verify(...) on it.
  68     private static final MockServerClient MOCK_SERVER_CLIENT = new MockServerClient(
 
  69             LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
 
         // Base "/events" URL on DMAAP_SERVICE_EXPOSED_PORT; assigned in setUp().
  70     private static String EVENTS_PATH;
 
         // Base "/events" URL on PROXY_MOCK_SERVICE_EXPOSED_PORT; assigned in setUp().
  71     private static String PROXY_MOCK_EVENTS_PATH;
 
         // NOTE(review): not referenced in the lines visible here; presumably the StepVerifier
         // verification timeout - confirm against the complete file.
  73     private static final Duration TIMEOUT = Duration.ofSeconds(10);
 
         // Consumer group and consumer id passed to every createMRSubscribeRequest(...) call; they
         // also form the tail of the "/events/<topic>/<group>/<id>" paths matched on the mock server.
  74     private static final String CONSUMER_GROUP = "group1";
 
  75     private static final String CONSUMER_ID = "consumer200";
 
  76     private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
             // Format of the failure text expected when subscribing to an unknown topic; the %s
             // placeholder receives the topic name (see errorSubscribeResponse(..., topic) usage).
             // NOTE(review): some lines of this literal, including its end, are not visible here.
 
  78             "\"mrstatus\":3001," +
 
  79             "\"helpURL\":\"http://onap.readthedocs.io\"," +
 
  80             "\"message\":\"No such topic exists.-[%s]\"," +
 
  83     private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n"
             // Failure text expected when the client times out; passed to errorSubscribeResponse(...)
             // without format arguments in subscriber_shouldHandleTimeoutException.
             // NOTE(review): "%1" below is literal message text, not a valid java.util.Formatter
             // specifier - confirm the helper does not pass it through String.format.
             // NOTE(review): some lines of this literal, including its end, are not visible here.
 
  87             + "\"serviceException\":"
 
  89             + "\"messageId\":\"SVC0001\","
 
  90             + "\"text\":\"Client timeout exception occurred, Error code is %1\","
 
  91             + "\"variables\":[\"408\"]"
 
  96     private MessageRouterPublisher publisher = DmaapClientFactory
 
  97             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault()); // publisher built from the default publisher config; shared by all tests
 
         // Subscriber built from the default subscriber config. The two retry tests declare a local
         // variable with the same name (created from retryConfig()) that shadows this field.
  98     private MessageRouterSubscriber subscriber = DmaapClientFactory
 
  99             .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
 
 102     static void setUp() {
             // Initializes the two static base URLs used by the tests: EVENTS_PATH points at
             // DMAAP_SERVICE_EXPOSED_PORT, PROXY_MOCK_EVENTS_PATH at PROXY_MOCK_SERVICE_EXPOSED_PORT,
             // both on LOCALHOST and both ending in "/events".
             // NOTE(review): the annotation line and closing brace are not visible here; BeforeAll is
             // imported, so this is presumably the @BeforeAll hook - confirm.
 
 103         EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
 
 104         PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
 
 109         MOCK_SERVER_CLIENT.reset();
 
 113     void subscriber_shouldHandleNoSuchTopicException() {
             // Fetches from topic "newTopic" under EVENTS_PATH without registering or publishing to it
             // in this test, and expects one error response built from
             // DMAAP_404_ERROR_RESPONSE_FORMAT with the topic name substituted.
             // NOTE(review): the end of the StepVerifier chain and the closing brace are not visible here.
 
 115         final String topic = "newTopic";
 
 116         final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(
 
 117                 String.format("%s/%s", EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID);
 
 118         final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(
 
 119                 DMAAP_404_ERROR_RESPONSE_FORMAT, topic);
 
 122         Mono<MessageRouterSubscribeResponse> response = subscriber
 
 123                 .get(mrSubscribeRequest);
 
 126         StepVerifier.create(response)
 
 127                 .expectNext(expectedResponse)
 
 133     void subscriberShouldHandleSingleItemResponse() {
             // Registers topic "TOPIC", publishes one JSON message to it and then fetches with
             // subscriber.get(...); the expected value is a success response holding that one message.
             // NOTE(review): the name lacks the "subscriber_should..." underscore used by the sibling tests.
             // NOTE(review): the end of the StepVerifier chain and the closing brace are not visible here.
 
 135         final String topic = "TOPIC";
 
 136         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
 
 137         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
 
 138         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
 
             // The same list of raw JSON strings feeds both the published batch and the expected items.
 140         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
 
 141         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
 
 142         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
 
 143         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
 
 146         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
 
             // The fetch is chained after the publish via then(...).
 147         Mono<MessageRouterSubscribeResponse> response = publisher
 
 148                 .put(publishRequest, jsonMessageBatch)
 
 149                 .then(subscriber.get(subscribeRequest));
 
 152         StepVerifier.create(response)
 
 153                 .expectNext(expectedResponse)
 
 159     void subscriber_shouldHandleMultipleItemsResponse() {
             // Registers topic "TOPIC2", publishes two different JSON messages and then fetches with
             // subscriber.get(...); the expected value is one success response holding both messages.
             // NOTE(review): the end of the StepVerifier chain and the closing brace are not visible here.
 
 161         final String topic = "TOPIC2";
 
 162         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
 
 163         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
 
 164         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID);
 
 166         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
 
 167                 "{\"differentMessage\":\"message2\"}");
 
 168         final List<JsonElement> expectedElements = getAsJsonElements(twoJsonMessages);
 
 169         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
 
 170         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedElements);
 
 173         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
 
             // The fetch is chained after the publish via then(...).
 174         Mono<MessageRouterSubscribeResponse> response = publisher
 
 175                 .put(publishRequest, jsonMessageBatch)
 
 176                 .then(subscriber.get(subscribeRequest));
 
 179         StepVerifier.create(response)
 
 180                 .expectNext(expectedResponse)
 
 186     void subscriber_shouldExtractItemsFromResponse() {
             // Registers topic "TOPIC3", publishes two JSON messages and reads them back through
             // subscriber.getElements(...), which yields a Flux of JsonElement rather than a response
             // object; the two published objects are expected in publish order.
             // NOTE(review): the end of the StepVerifier chain and the closing brace are not visible here.
 
 188         final String topic = "TOPIC3";
 
 189         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
 
 190         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
 
 191         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
 
 192                 CONSUMER_GROUP, CONSUMER_ID);
 
 194         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
 
 195                 "{\"differentMessage\":\"message2\"}");
 
 196         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
 
 199         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
 
             // The element stream is chained after the publish via thenMany(...).
 200         final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
 
 201                 .thenMany(subscriber.getElements(subscribeRequest));
 
 204         StepVerifier.create(result)
 
 205                 .expectNext(getAsJsonObject(twoJsonMessages.get(0)))
 
 206                 .expectNext(getAsJsonObject(twoJsonMessages.get(1)))
 
 212     void subscriber_shouldSubscribeToTopic() {
             // Registers topic "TOPIC4", publishes two JSON messages and consumes them through
             // subscriber.subscribeForElements(...); the first two emitted elements must equal the
             // published messages, in order.
             // NOTE(review): the end of the StepVerifier chain and the closing brace are not visible here.
 
 214         final String topic = "TOPIC4";
 
 215         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
 
 216         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
 
 217         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl,
 
 218                 CONSUMER_GROUP, CONSUMER_ID);
 
 220         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
 
 221                 "{\"differentMessage\":\"message2\"}");
 
 222         final List<JsonElement> messages = getAsJsonElements(twoJsonMessages);
 
 223         final Flux<JsonObject> jsonMessageBatch = jsonBatch(twoJsonMessages);
 
 226         registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
 
             // NOTE(review): Duration.ofSeconds(1) is presumably the polling period of the
             // subscription - confirm against subscribeForElements' documentation.
 227         final Flux<JsonElement> result = publisher.put(publishRequest, jsonMessageBatch)
 
 228                 .thenMany(subscriber.subscribeForElements(subscribeRequest, Duration.ofSeconds(1)));
 
             // take(2) limits the verified stream to the two published elements.
 231         StepVerifier.create(result.take(2))
 
 232                 .expectNext(messages.get(0))
 
 233                 .expectNext(messages.get(1))
 
 239     void subscriber_shouldHandleTimeoutException() {
             // Sends the fetch for "TOPIC5" to the proxy-mock URL, where a one-time expectation answers
             // the matching path only after a 5-second delay. The expected outcome is an error response
             // carrying TIMEOUT_ERROR_MESSAGE and exactly one request recorded for that path.
             // NOTE(review): the end of the StepVerifier chain and the closing brace are not visible here.
 
 241         final String topic = "TOPIC5";
 
             // NOTE(review): the fourth argument, Duration.ofSeconds(1), is presumably the request
             // timeout - confirm against createMRSubscribeRequest.
 242         final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(
 
 243                 String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
 
 244         final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(TIMEOUT_ERROR_MESSAGE);
 
 245         final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
 
             // NOTE(review): the receiver of this stubbing chain is on a line not visible here
             // (presumably MOCK_SERVER_CLIENT).
 247                 .when(request().withPath(path), Times.once())
 
 248                 .respond(response().withDelay(TimeUnit.SECONDS, 5));
 
 251         Mono<MessageRouterSubscribeResponse> response = subscriber
 
 252                 .get(mrSubscribeRequest);
 
 255         StepVerifier.create(response)
 
 256                 .expectNext(expectedResponse)
 
 260         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1));
 
 264     void subscriber_shouldRetryWhenRetryableHttpCodeAndSuccessfullySubscribe() {
             // Publishes and fetches "TOPIC6" through the proxy-mock URL, where a one-time expectation
             // answers the subscribe path with HTTP 404. A subscriber built from retryConfig() is
             // expected to end with a success response, and the path must be recorded exactly twice.
             // NOTE(review): the end of the StepVerifier chain and the closing brace are not visible here.
 
 266         final String topic = "TOPIC6";
 
 267         final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
 
 268         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
 
 269         final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl);
 
 270         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID);
 
 272         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
 
 273         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
 
 274         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
 
 275         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
 
 277         final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
 
             // NOTE(review): the receiver of this stubbing chain is on a line not visible here
             // (presumably MOCK_SERVER_CLIENT).
 279                 .when(request().withPath(path), Times.once())
 
 280                 .respond(response().withStatusCode(404));
 
             // This local variable shadows the class-level subscriber field for the rest of the method.
 281         final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig());
 
             // Topic registration uses the direct EVENTS_PATH URL rather than the proxy URL.
 284         registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
 
 285                 createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID));
 
 286         Mono<MessageRouterSubscribeResponse> response = publisher
 
 287                 .put(publishRequest, jsonMessageBatch)
 
 288                 .then(subscriber.get(subscribeRequest));
 
 291         StepVerifier.create(response)
 
 292                 .expectNext(expectedResponse)
 
             // Two recorded requests: presumably the 404-answered attempt plus one retry.
 296         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
 
 300     void subscriber_shouldRetryWhenClientTimeoutAndSuccessfullySubscribe() {
             // Publishes and fetches "TOPIC7" through the proxy-mock URL, where a one-time expectation
             // delays the answer for the subscribe path by 10 seconds. A subscriber built from
             // retryConfig() is expected to end with a success response, and the path must be recorded
             // exactly twice.
             // NOTE(review): the end of the StepVerifier chain and the closing brace are not visible here.
 
 302         final String topic = "TOPIC7";
 
 303         final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
 
 304         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
 
 305         final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl);
 
             // NOTE(review): the fourth argument, Duration.ofSeconds(1), is presumably the request
             // timeout - confirm against createMRSubscribeRequest.
 306         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(
 
 307                 proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
 
 309         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
 
 310         final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
 
 311         final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
 
 312         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
 
 314         final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
 
             // NOTE(review): the receiver of this stubbing chain is on a line not visible here
             // (presumably MOCK_SERVER_CLIENT).
 316                 .when(request().withPath(path), Times.once())
 
 317                 .respond(response().withDelay(TimeUnit.SECONDS, 10));
 
             // This local variable shadows the class-level subscriber field for the rest of the method.
 318         final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig());
 
             // Topic registration uses the direct EVENTS_PATH URL rather than the proxy URL.
 321         registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
 
 322                 createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID));
 
 323         Mono<MessageRouterSubscribeResponse> response = publisher
 
 324                 .put(publishRequest, jsonMessageBatch)
 
 325                 .then(subscriber.get(subscribeRequest));
 
 328         StepVerifier.create(response)
 
 329                 .expectNext(expectedResponse)
 
             // Two recorded requests: presumably the delayed attempt plus one retry.
 333         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
 
 336     private MessageRouterSubscriberConfig retryConfig() {
 
 337         return ImmutableMessageRouterSubscriberConfig.builder()
 
 338                 .retryConfig(ImmutableDmaapRetryConfig.builder()
 
 339                         .retryIntervalInSeconds(1)