2  * ============LICENSE_START====================================
 
   3  * DCAEGEN2-SERVICES-SDK
 
   4  * =========================================================
 
   5  * Copyright (C) 2019-2021 Nokia. All rights reserved.
 
   6  * =========================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *       http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=====================================
 
  21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
 
  23 import com.google.gson.JsonElement;
 
  24 import com.google.gson.JsonPrimitive;
 
  25 import io.vavr.collection.List;
 
  26 import org.junit.jupiter.api.Test;
 
  27 import org.junit.jupiter.params.ParameterizedTest;
 
  28 import org.junit.jupiter.params.provider.CsvSource;
 
  29 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
 
  30 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
 
  31 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
 
  32 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
 
  33 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse;
 
  34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
 
  35 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
 
  36 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
 
  37 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
 
  38 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig;
 
  39 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
 
  40 import reactor.core.publisher.Flux;
 
  41 import reactor.core.publisher.Mono;
 
  42 import reactor.test.StepVerifier;
 
  44 import java.time.Duration;
 
  46 import static org.assertj.core.api.Assertions.assertThat;
 
  47 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
 
  48 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
 
  49 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendWithDelay;
 
  52  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
 
  55 class MessageRouterPublisherTest {
         // Exercises MessageRouterPublisher.put(...) against DummyHttpServer instances.
         // The tests below cover a successful publish, HTTP error statuses, a client-side
         // timeout and a request sent to a server obtained via closeAndGet().

         // Text passed to sendError(...) for every failing route in initialize(); it is also
         // appended to the expected failReason in createErrorResponse(...).
  57     private static final String ERROR_MESSAGE = "Something went wrong";

         // Prefixes that response.failReason() must start with in assertTimeoutError(...)
         // and assertConnectionError(...) respectively.
  58     private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout";

  59     private static final String CONNECTION_ERROR_MESSAGE = "503 Service unavailable";

         // POST route paths registered on the dummy server in initialize().
  60     private static final String SUCCESS_RESP_TOPIC_PATH = "/events/TOPIC";

  61     private static final String DELAY_RESP_TOPIC_PATH = "/events/DELAY";

  62     private static final String FAILING_WITH_400_RESP_PATH = "/events/TOPIC400";

  63     private static final String FAILING_WITH_401_RESP_PATH = "/events/TOPIC401";

  64     private static final String FAILING_WITH_403_RESP_PATH = "/events/TOPIC403";

  65     private static final String FAILING_WITH_404_RESP_PATH = "/events/TOPIC404";

  66     private static final String FAILING_WITH_500_RESP_PATH = "/events/TOPIC500";

  67     private static final String FAILING_WITH_429_RESP_PATH = "/events/TOPIC429";

         // Passed to sendWithDelay(...) for the DELAY route.
         // NOTE(review): presumably the response delay - confirm against DummyHttpServer.
  68     private static final Duration TIMEOUT = Duration.ofSeconds(10);

         // The batch handed to sut.put(...) in every test.
  69     private static final Flux<JsonPrimitive> messageBatch = Flux.just("ala", "ma", "kota")

  70             .map(JsonPrimitive::new);

         // Same three strings as messageBatch, kept as a vavr List so the success test can
         // map them to the expected response items.
  71     private static final List<String> messageBatchItems = List.of("ala", "ma", "kota");

         // Result of closeAndGet() on a freshly started server; target of
         // publisher_shouldHandleConnectionError().
         // NOTE(review): presumably already shut down at this point - confirm closeAndGet() semantics.
  72     private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet();

         // Server started by initialize(); target of all other tests.
  73     private static final DummyHttpServer SERVER = initialize();

         // Publisher under test, created with the default MessageRouterPublisherConfig.
  74     private MessageRouterPublisher sut = DmaapClientFactory

  75             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
 
  77     private static DummyHttpServer initialize() {
             // Starts a DummyHttpServer and registers one POST route per topic path constant:
             //  - the success route replies with the string "OK" via sendString(...);
             //  - the delay route replies via sendWithDelay(resp, 200, TIMEOUT);
             //  - each failing route replies via sendError(...) with the status code named in
             //    its constant and ERROR_MESSAGE.
             // The end of the start(...) call is not visible in this listing.

  78         return DummyHttpServer.start(routes -> routes

  79                 .post(SUCCESS_RESP_TOPIC_PATH, (req, resp) -> sendString(resp, Mono.just("OK")))

  80                 .post(DELAY_RESP_TOPIC_PATH, (req, resp) -> sendWithDelay(resp, 200, TIMEOUT))

  81                 .post(FAILING_WITH_400_RESP_PATH, (req, resp) -> sendError(resp, 400, ERROR_MESSAGE))

  82                 .post(FAILING_WITH_401_RESP_PATH, (req, resp) -> sendError(resp, 401, ERROR_MESSAGE))

  83                 .post(FAILING_WITH_403_RESP_PATH, (req, resp) -> sendError(resp, 403, ERROR_MESSAGE))

  84                 .post(FAILING_WITH_404_RESP_PATH, (req, resp) -> sendError(resp, 404, ERROR_MESSAGE))

  85                 .post(FAILING_WITH_500_RESP_PATH, (req, resp) -> sendError(resp, 500, ERROR_MESSAGE))

  86                 .post(FAILING_WITH_429_RESP_PATH, (req, resp) -> sendError(resp, 429, ERROR_MESSAGE))
 
  91     void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() {
             // Publishes messageBatch to the success route on SERVER and expects the first
             // emitted response to equal a response whose items are the batch strings wrapped
             // as JsonPrimitive. The rest of the StepVerifier chain is not visible in this listing.

  93         final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH, SERVER);

  94         final List<JsonElement> expectedItems = messageBatchItems.map(JsonPrimitive::new);

  97         final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);

 100         StepVerifier.create(result)

 101                 .expectNext(ImmutableMessageRouterPublishResponse.builder().items(expectedItems).build())
 
 108             FAILING_WITH_400_RESP_PATH + "," + "400 Bad Request",

 109             FAILING_WITH_401_RESP_PATH + "," + "401 Unauthorized",

 110             FAILING_WITH_403_RESP_PATH + "," + "403 Forbidden",

 111             FAILING_WITH_404_RESP_PATH + "," + "404 Not Found",

 112             FAILING_WITH_500_RESP_PATH + "," + "500 Internal Server Error",

 113             FAILING_WITH_429_RESP_PATH + "," + "429 Too Many Requests"

 115     void publisher_shouldHandleError(String failingPath, String failReason) {
             // Each row above has the form "<failing route path>,<status line>".
             // NOTE(review): presumably these are the @CsvSource rows feeding failingPath and
             // failReason; the annotation lines are not visible in this listing.
             // The test publishes messageBatch to the failing route on SERVER and expects the
             // first emitted response to equal createErrorResponse(failReason).

 117         final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(failingPath, SERVER);

 118         final MessageRouterPublishResponse expectedResponse = createErrorResponse(failReason);

 121         final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);

 124         StepVerifier.create(result)

 125                 .expectNext(expectedResponse)
 
 131     void publisher_shouldHandleClientTimeoutError() {
             // Sends the batch to the delay route with a 1 ms request timeout configured on the
             // request, then checks the first emitted response with assertTimeoutError(...).

 133         final Duration requestTimeout = Duration.ofMillis(1);

 134         final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(DELAY_RESP_TOPIC_PATH, requestTimeout);

 137         final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);

 140         StepVerifier.create(result)

 141                 .consumeNextWith(this::assertTimeoutError)
 
 147     void publisher_shouldHandleConnectionError() {
             // Sends the batch to DISPOSED_HTTP_SERVER (the server obtained via closeAndGet())
             // and checks the first emitted response with assertConnectionError(...).

 149         final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(

 150                 SUCCESS_RESP_TOPIC_PATH, DISPOSED_HTTP_SERVER);

 153         final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);

 156         StepVerifier.create(result)

 157                 .consumeNextWith(this::assertConnectionError)
 
 162     private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath, DummyHttpServer dummyHttpServer) {
             // Builds a publish request with TEXT_PLAIN content type whose sink points at
             // topicPath on the given dummy server (see createMRSink).
             // The end of the builder chain is not visible in this listing.

 163         final MessageRouterSink sinkDefinition = createMRSink(topicPath, dummyHttpServer);

 164         return ImmutableMessageRouterPublishRequest.builder()

 165                 .sinkDefinition(sinkDefinition)

 166                 .contentType(ContentType.TEXT_PLAIN)
 
 170     private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath, Duration timeout) {
             // Overload that always targets SERVER and additionally sets a timeoutConfig built
             // from the given timeout. The end of the builder chain is not visible in this listing.

 171         final MessageRouterSink sinkDefinition = createMRSink(topicPath, SERVER);

 172         return ImmutableMessageRouterPublishRequest.builder()

 173                 .sinkDefinition(sinkDefinition)

 174                 .contentType(ContentType.TEXT_PLAIN)

 175                 .timeoutConfig(ImmutableDmaapTimeoutConfig.builder().timeout(timeout).build())
 
 179     private static MessageRouterSink createMRSink(String topicPath, DummyHttpServer dummyHttpServer) {
             // Builds a sink whose topicUrl is formatted as "http://<host>:<port><path>" from
             // the dummy server's host() and port().
             // NOTE(review): the third format argument is not visible in this listing;
             // presumably it is topicPath - confirm against the full file.

 180         return ImmutableMessageRouterSink.builder()

 182                 .topicUrl(String.format("http://%s:%d%s",

 183                         dummyHttpServer.host(),

 184                         dummyHttpServer.port(),
 
 190     private static MessageRouterPublishResponse createErrorResponse(String failReason) {
             // Builds the expected error response: its failReason is the given status line,
             // a newline, then ERROR_MESSAGE.
             // NOTE(review): failReason becomes part of the format string, so a '%' in it would
             // be read as a format specifier; the status lines used in this file contain none.

 191         String failReasonFormat = failReason + "\n%s";

 192         return ImmutableMessageRouterPublishResponse

 194                 .failReason(String.format(failReasonFormat, ERROR_MESSAGE))
 
 198     private void assertTimeoutError(DmaapResponse response) {
             // The response must be marked as failed and its failReason must start with
             // TIMEOUT_ERROR_MESSAGE ("408 Request Timeout").

 199         assertThat(response.failed()).isTrue();

 200         assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE);
 
 203     private void assertConnectionError(DmaapResponse response) {
             // The response must be marked as failed and its failReason must start with
             // CONNECTION_ERROR_MESSAGE ("503 Service unavailable").

 204         assertThat(response.failed()).isTrue();

 205         assertThat(response.failReason()).startsWith(CONNECTION_ERROR_MESSAGE);