Improve retry mechanism in dmaap-client. 86/117286/2
authortkogut <tomasz.kogut@nokia.com>
Mon, 1 Feb 2021 09:26:29 +0000 (10:26 +0100)
committerTomasz Kogut <tomasz.kogut@nokia.com>
Mon, 1 Feb 2021 11:43:49 +0000 (11:43 +0000)
Return last exception instead of timeout exception when retry exhausted.
Handle no connection exception when sending requests to dmaap-mr.

Issue-ID: DCAEGEN2-1483
Signed-off-by: tkogut <tomasz.kogut@nokia.com>
Change-Id: Ibe318fa349b79999a5c8054e04e72e444a42ea78

15 files changed:
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfig.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java
rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java
rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RetryConfig.java
rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/exceptions/RetryableException.java [new file with mode: 0644]
rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java

index 9d25555..9585007 100644 (file)
@@ -36,7 +36,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.Me
 
 import java.time.Duration;
 
-import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig.ON_RETRY_EXHAUSTED_EXCEPTION;
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig.RETRYABLE_EXCEPTIONS;
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig.RETRYABLE_HTTP_CODES;
 
@@ -83,7 +82,6 @@ public final class DmaapClientFactory {
                         .retryCount(rc.retryCount())
                         .retryableHttpResponseCodes(RETRYABLE_HTTP_CODES)
                         .customRetryableExceptions(RETRYABLE_EXCEPTIONS)
-                        .onRetryExhaustedException(ON_RETRY_EXHAUSTED_EXCEPTION)
                         .build())
                 .getOrNull();
     }
index 5a51e5f..431843a 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2020 Nokia. All rights reserved.
+ * Copyright (C) 2021 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -32,4 +32,10 @@ public class ClientErrorReasons {
             .messageId("SVC0001")
             .variables(Collections.singletonList("408")).build();
 
+    public static final ClientErrorReason SERVICE_UNAVAILABLE = ImmutableClientErrorReason.builder()
+            .header("503 Service unavailable")
+            .text("DMaaP MR is unavailable")
+            .messageId("SVC2001")
+            .build();
+
 }
index 7d1b0a9..5f72808 100644 (file)
@@ -34,6 +34,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReason;
@@ -48,6 +49,7 @@ import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.net.ConnectException;
 import java.time.Duration;
 import java.util.stream.Collectors;
 
@@ -87,8 +89,12 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
         LOGGER.trace("The items to be sent: {}", batch);
         return httpClient.call(buildHttpRequest(request, createBody(batch, request.contentType())))
                 .map(httpResponse -> buildResponse(httpResponse, batch))
-                .doOnError(ReadTimeoutException.class, e -> LOGGER.error("Timeout exception occurred when sending items to DMaaP MR", e))
-                .onErrorResume(ReadTimeoutException.class, e -> createErrorResponse(ClientErrorReasons.TIMEOUT));
+                .doOnError(ReadTimeoutException.class,
+                        e -> LOGGER.error("Timeout exception occurred when sending items to DMaaP MR", e))
+                .onErrorResume(ReadTimeoutException.class, e -> buildErrorResponse(ClientErrorReasons.TIMEOUT))
+                .doOnError(ConnectException.class, e -> LOGGER.error("DMaaP MR is unavailable, {}", e.getMessage()))
+                .onErrorResume(ConnectException.class, e -> buildErrorResponse(ClientErrorReasons.SERVICE_UNAVAILABLE))
+                .onErrorResume(RetryableException.class, e -> Mono.just(buildResponse(e.getResponse(), batch)));
     }
 
     private @NotNull RequestBody createBody(List<? extends JsonElement> subItems, ContentType contentType) {
@@ -126,7 +132,7 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
                 : builder.failReason(extractFailReason(httpResponse)).build();
     }
 
-    private Mono<MessageRouterPublishResponse> createErrorResponse(ClientErrorReason clientErrorReason) {
+    private Mono<MessageRouterPublishResponse> buildErrorResponse(ClientErrorReason clientErrorReason) {
         String failReason = clientErrorReasonPresenter.present(clientErrorReason);
         return Mono.just(ImmutableMessageRouterPublishResponse.builder()
                 .failReason(failReason)
index 292a715..acb297a 100644 (file)
@@ -33,6 +33,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReason;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter;
@@ -44,6 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Mono;
 
+import java.net.ConnectException;
 import java.nio.charset.StandardCharsets;
 
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason;
@@ -72,10 +74,12 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
                 .map(this::buildGetResponse)
                 .doOnError(ReadTimeoutException.class,
                         e -> LOGGER.error("Timeout exception occurred when subscribe items from DMaaP MR", e))
-                .onErrorResume(ReadTimeoutException.class, e -> createErrorResponse(ClientErrorReasons.TIMEOUT));
+                .onErrorResume(ReadTimeoutException.class, e -> buildErrorResponse(ClientErrorReasons.TIMEOUT))
+                .doOnError(ConnectException.class, e -> LOGGER.error("DMaaP MR is unavailable, {}", e.getMessage()))
+                .onErrorResume(ConnectException.class, e -> buildErrorResponse(ClientErrorReasons.SERVICE_UNAVAILABLE))
+                .onErrorResume(RetryableException.class, e -> Mono.just(buildGetResponse(e.getResponse())));
     }
 
-
     private @NotNull HttpRequest buildGetHttpRequest(MessageRouterSubscribeRequest request) {
         ImmutableHttpRequest.Builder requestBuilder = ImmutableHttpRequest.builder()
                 .method(HttpMethod.GET)
@@ -107,7 +111,7 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
                 request.consumerId());
     }
 
-    private Mono<MessageRouterSubscribeResponse> createErrorResponse(ClientErrorReason clientErrorReason) {
+    private Mono<MessageRouterSubscribeResponse> buildErrorResponse(ClientErrorReason clientErrorReason) {
         String failReason = clientErrorReasonPresenter.present(clientErrorReason);
         return Mono.just(ImmutableMessageRouterSubscribeResponse.builder()
                 .failReason(failReason)
index f82edfc..bb66220 100644 (file)
@@ -31,7 +31,6 @@ import java.net.ConnectException;
 public interface DmaapRetryConfig {
 
     Set<Class<? extends Throwable>> RETRYABLE_EXCEPTIONS = HashSet.of(ReadTimeoutException.class, ConnectException.class);
-    RuntimeException ON_RETRY_EXHAUSTED_EXCEPTION = ReadTimeoutException.INSTANCE;
     Set<Integer> RETRYABLE_HTTP_CODES = HashSet.of(404, 408, 413, 429, 500, 502, 503, 504);
 
     @Value.Default
index 159fc59..825ea39 100644 (file)
@@ -438,6 +438,41 @@ class MessageRouterPublisherIT {
         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
     }
 
+    @Test
+    void publisher_shouldHandleLastRetryError500() {
+        final String topic = "TOPIC14";
+        final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
+
+        final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
+                "{\"differentMessage\":\"message2\"}");
+        final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessages);
+
+        final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
+        final String responseMessage = "Response Message";
+        final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
+                "500 Internal Server Error\n%s", responseMessage);
+
+        final String path = String.format("/events/%s", topic);
+        MOCK_SERVER_CLIENT
+                .when(request().withPath(path), Times.once())
+                .respond(response().withStatusCode(404));
+        MOCK_SERVER_CLIENT
+                .when(request().withPath(path), Times.once())
+                .respond(response().withStatusCode(500).withBody(responseMessage));
+        final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig(1, 1));
+
+        //when
+        final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
+
+        //then
+        StepVerifier.create(result)
+                .expectNext(expectedResponse)
+                .expectComplete()
+                .verify();
+
+        MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
+    }
+
     private MessageRouterPublisherConfig retryConfig(int retryInterval, int retryCount) {
         return ImmutableMessageRouterPublisherConfig.builder()
                 .retryConfig(ImmutableDmaapRetryConfig.builder()
index 82b6661..82a2b00 100644 (file)
@@ -23,7 +23,6 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonPrimitive;
 import io.vavr.collection.List;
-import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
@@ -57,6 +56,7 @@ class MessageRouterPublisherTest {
 
     private static final String ERROR_MESSAGE = "Something went wrong";
     private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout";
+    private static final String CONNECTION_ERROR_MESSAGE = "503 Service unavailable";
     private static final String SUCCESS_RESP_TOPIC_PATH = "/events/TOPIC";
     private static final String DELAY_RESP_TOPIC_PATH = "/events/DELAY";
     private static final String FAILING_WITH_400_RESP_PATH = "/events/TOPIC400";
@@ -68,15 +68,13 @@ class MessageRouterPublisherTest {
     private static final Flux<JsonPrimitive> messageBatch = Flux.just("ala", "ma", "kota")
             .map(JsonPrimitive::new);
     private static final List<String> messageBatchItems = List.of("ala", "ma", "kota");
-
-    private static DummyHttpServer server;
+    private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet();
+    private static final DummyHttpServer SERVER = initialize();
     private MessageRouterPublisher sut = DmaapClientFactory
             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
 
-
-    @BeforeAll
-    static void setUp() {
-        server = DummyHttpServer.start(routes -> routes
+    private static DummyHttpServer initialize() {
+        return DummyHttpServer.start(routes -> routes
                 .post(SUCCESS_RESP_TOPIC_PATH, (req, resp) -> sendString(resp, Mono.just("OK")))
                 .post(DELAY_RESP_TOPIC_PATH, (req, resp) -> sendWithDelay(resp, 200, TIMEOUT))
                 .post(FAILING_WITH_400_RESP_PATH, (req, resp) -> sendError(resp, 400, ERROR_MESSAGE))
@@ -90,7 +88,7 @@ class MessageRouterPublisherTest {
     @Test
     void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() {
         //given
-        final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH);
+        final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH, SERVER);
         final List<JsonElement> expectedItems = messageBatchItems.map(JsonPrimitive::new);
 
         //when
@@ -113,7 +111,7 @@ class MessageRouterPublisherTest {
     })
     void publisher_shouldHandleError(String failingPath, String failReason) {
         //given
-        final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(failingPath);
+        final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(failingPath, SERVER);
         final MessageRouterPublishResponse expectedResponse = createErrorResponse(failReason);
 
         //when
@@ -142,8 +140,24 @@ class MessageRouterPublisherTest {
                 .verify(TIMEOUT);
     }
 
-    private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath) {
-        final MessageRouterSink sinkDefinition = createMRSink(topicPath);
+    @Test
+    void publisher_shouldHandleConnectionError() {
+        //given
+        final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(
+                SUCCESS_RESP_TOPIC_PATH, DISPOSED_HTTP_SERVER);
+
+        //when
+        final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
+
+        //then
+        StepVerifier.create(result)
+                .consumeNextWith(this::assertConnectionError)
+                .expectComplete()
+                .verify(TIMEOUT);
+    }
+
+    private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath, DummyHttpServer dummyHttpServer) {
+        final MessageRouterSink sinkDefinition = createMRSink(topicPath, dummyHttpServer);
         return ImmutableMessageRouterPublishRequest.builder()
                 .sinkDefinition(sinkDefinition)
                 .contentType(ContentType.TEXT_PLAIN)
@@ -151,7 +165,7 @@ class MessageRouterPublisherTest {
     }
 
     private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath, Duration timeout) {
-        final MessageRouterSink sinkDefinition = createMRSink(topicPath);
+        final MessageRouterSink sinkDefinition = createMRSink(topicPath, SERVER);
         return ImmutableMessageRouterPublishRequest.builder()
                 .sinkDefinition(sinkDefinition)
                 .contentType(ContentType.TEXT_PLAIN)
@@ -159,12 +173,12 @@ class MessageRouterPublisherTest {
                 .build();
     }
 
-    private static MessageRouterSink createMRSink(String topicPath) {
+    private static MessageRouterSink createMRSink(String topicPath, DummyHttpServer dummyHttpServer) {
         return ImmutableMessageRouterSink.builder()
                 .name("the topic")
                 .topicUrl(String.format("http://%s:%d%s",
-                        server.host(),
-                        server.port(),
+                        dummyHttpServer.host(),
+                        dummyHttpServer.port(),
                         topicPath)
                 )
                 .build();
@@ -182,5 +196,10 @@ class MessageRouterPublisherTest {
         assertThat(response.failed()).isTrue();
         assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE);
     }
+
+    private void assertConnectionError(DmaapResponse response) {
+        assertThat(response.failed()).isTrue();
+        assertThat(response.failReason()).startsWith(CONNECTION_ERROR_MESSAGE);
+    }
 }
 
index 15c3bd8..2cc6d33 100644 (file)
@@ -23,6 +23,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import io.vavr.collection.List;
+import io.vavr.control.Try;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -382,6 +383,39 @@ class MessageRouterSubscriberIT {
         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
     }
 
+    @Test
+    void subscriber_shouldHandleLastRetryError500() {
+        //given
+        final String topic = "TOPIC9";
+        final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
+        final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(
+                proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID);
+        final String responseMessage = "Response Message";
+        final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(
+                "500 Internal Server Error\n%s", responseMessage);
+
+        final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
+        MOCK_SERVER_CLIENT
+                .when(request().withPath(path), Times.once())
+                .respond(response().withStatusCode(404));
+        MOCK_SERVER_CLIENT
+                .when(request().withPath(path), Times.once())
+                .respond(response().withStatusCode(500).withBody(responseMessage));
+        final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
+                retryConfig(1, 1));
+
+        //when
+        Mono<MessageRouterSubscribeResponse> response = subscriber.get(subscribeRequest);
+
+        //then
+        StepVerifier.create(response)
+                .expectNext(expectedResponse)
+                .expectComplete()
+                .verify();
+
+        MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
+    }
+
     private MessageRouterSubscriberConfig retryConfig(int retryInterval, int retryCount) {
         return ImmutableMessageRouterSubscriberConfig.builder()
                 .retryConfig(ImmutableDmaapRetryConfig.builder()
index 0687539..e928f03 100644 (file)
@@ -23,7 +23,6 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonPrimitive;
 import io.vavr.collection.List;
-import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
@@ -55,6 +54,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.Du
 class MessageRouterSubscriberTest {
     private static final Duration TIMEOUT = Duration.ofSeconds(10);
     private static final String ERROR_MESSAGE = "Something went wrong";
+    private static final String CONNECTION_ERROR_MESSAGE = "503 Service unavailable";
     private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout";
     private static final String CONSUMER_GROUP = "group1";
     private static final String SUCCESS_CONSUMER_ID = "consumer200";
@@ -82,16 +82,18 @@ class MessageRouterSubscriberTest {
     private static final String FAILING_WITH_500_RESP_PATH = String
             .format("%s/%s", CONSUMER_PATH, FAILING_WITH_500_CONSUMER_ID);
 
-    private static MessageRouterSubscribeRequest mrSuccessRequest;
-    private static MessageRouterSubscribeRequest mrFailingRequest;
+    private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet();
+    private static final DummyHttpServer SERVER = initialize();
+
     private MessageRouterSubscriber sut = DmaapClientFactory
             .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
-    private static MessageRouterSource sourceDefinition;
-
+    private static MessageRouterSource sourceDefinition = createMessageRouterSource(SERVER);
+    private static MessageRouterSource failingSourceDefinition = createMessageRouterSource(DISPOSED_HTTP_SERVER);
+    private static MessageRouterSubscribeRequest mrSuccessRequest = createSuccessRequest(sourceDefinition);
+    private static MessageRouterSubscribeRequest mrFailingRequest = createFailingRequest(FAILING_WITH_500_CONSUMER_ID);
 
-    @BeforeAll
-    static void setUp() {
-        DummyHttpServer server = DummyHttpServer.start(routes -> routes
+    private static DummyHttpServer initialize() {
+        return DummyHttpServer.start(routes -> routes
                 .get(SUCCESS_RESP_PATH, (req, resp) ->
                         sendResource(resp, "/sample-mr-subscribe-response.json"))
                 .get(DELAY_RESP_PATH, (req, resp) -> sendWithDelay(resp, 200, TIMEOUT))
@@ -100,12 +102,6 @@ class MessageRouterSubscriberTest {
                 .get(FAILING_WITH_409_RESP_PATH, (req, resp) -> sendError(resp, 409, ERROR_MESSAGE))
                 .get(FAILING_WITH_429_RESP_PATH, (req, resp) -> sendError(resp, 429, ERROR_MESSAGE))
                 .get(FAILING_WITH_500_RESP_PATH, (req, resp) -> sendError(resp, 500, ERROR_MESSAGE)));
-
-        sourceDefinition = createMessageRouterSource(server);
-
-        mrSuccessRequest = createSuccessRequest();
-
-        mrFailingRequest = createFailingRequest(FAILING_WITH_500_CONSUMER_ID);
     }
 
     @Test
@@ -204,6 +200,17 @@ class MessageRouterSubscriberTest {
                 .verify(TIMEOUT);
     }
 
+    @Test
+    void subscriber_shouldHandleConnectionError() {
+        MessageRouterSubscribeRequest request = createSuccessRequest(failingSourceDefinition);
+        Mono<MessageRouterSubscribeResponse> response = sut.get(request);
+
+        StepVerifier.create(response)
+                .consumeNextWith(this::assertConnectionError)
+                .expectComplete()
+                .verify(TIMEOUT);
+    }
+
     private static MessageRouterSource createMessageRouterSource(DummyHttpServer server) {
         return ImmutableMessageRouterSource.builder()
                 .name("the topic")
@@ -211,9 +218,9 @@ class MessageRouterSubscriberTest {
                 .build();
     }
 
-    private static MessageRouterSubscribeRequest createSuccessRequest() {
+    private static MessageRouterSubscribeRequest createSuccessRequest(MessageRouterSource source) {
         return ImmutableMessageRouterSubscribeRequest.builder()
-                .sourceDefinition(sourceDefinition)
+                .sourceDefinition(source)
                 .consumerGroup(CONSUMER_GROUP)
                 .consumerId(SUCCESS_CONSUMER_ID)
                 .build();
@@ -249,5 +256,10 @@ class MessageRouterSubscriberTest {
         assertThat(response.failed()).isTrue();
         assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE);
     }
+
+    private void assertConnectionError(DmaapResponse response) {
+        assertThat(response.failed()).isTrue();
+        assertThat(response.failReason()).startsWith(CONNECTION_ERROR_MESSAGE);
+    }
 }
 
index 2825a87..2fde441 100644 (file)
@@ -46,6 +46,7 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
+import java.net.ConnectException;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 
@@ -79,6 +80,7 @@ class MessageRouterPublisherImplTest {
     private final MessageRouterPublishRequest plainPublishRequest = createPublishRequest(TOPIC_URL, ContentType.TEXT_PLAIN);
     private final MessageRouterPublishRequest jsonPublishRequest = createPublishRequest(TOPIC_URL);
     private final HttpResponse successHttpResponse = createHttpResponse("OK", 200);
+    private final HttpResponse retryableHttpResponse = createHttpResponse("ERROR", 500);
 
     @Test
     void puttingElementsShouldYieldNonChunkedHttpRequest() {
@@ -431,7 +433,49 @@ class MessageRouterPublisherImplTest {
 
         // then
         StepVerifier.create(responses)
-                .consumeNextWith(this::assertTimeoutError)
+                .consumeNextWith(this::assertFailedResponse)
+                .expectComplete()
+                .verify(TIMEOUT);
+    }
+
+    @Test
+    void onPut_whenConnectionExceptionOccurs_shouldReturnOneConnectionException() {
+        // given
+        final List<String> plainMessage = List.of("I", "like", "cookies");
+
+        final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage);
+        given(clientErrorReasonPresenter.present(any()))
+                .willReturn(ERROR_MESSAGE);
+        given(httpClient.call(any(HttpRequest.class)))
+                .willReturn(Mono.error(new ConnectException()));
+
+        // when
+        final Flux<MessageRouterPublishResponse> responses = cut
+                .put(plainPublishRequest, plainMessagesMaxBatch);
+
+        // then
+        StepVerifier.create(responses)
+                .consumeNextWith(this::assertFailedResponse)
+                .expectComplete()
+                .verify(TIMEOUT);
+    }
+
+    @Test
+    void onPut_whenRetryableExceptionOccurs_shouldReturnCertainFailedResponse() {
+        // given
+        final List<String> plainMessage = List.of("I", "like", "cookies");
+
+        final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage);
+        given(httpClient.call(any(HttpRequest.class)))
+                .willReturn(Mono.just(retryableHttpResponse));
+
+        // when
+        final Flux<MessageRouterPublishResponse> responses = cut
+                .put(plainPublishRequest, plainMessagesMaxBatch);
+
+        // then
+        StepVerifier.create(responses)
+                .consumeNextWith(this::assertRetryableFailedResponse)
                 .expectComplete()
                 .verify(TIMEOUT);
     }
@@ -458,7 +502,7 @@ class MessageRouterPublisherImplTest {
         // then
         StepVerifier.create(responses)
                 .consumeNextWith(response -> verifySuccessfulResponses(parsedThreeMessages, response))
-                .consumeNextWith(this::assertTimeoutError)
+                .consumeNextWith(this::assertFailedResponse)
                 .expectComplete()
                 .verify(TIMEOUT);
     }
@@ -484,7 +528,7 @@ class MessageRouterPublisherImplTest {
 
         // then
         StepVerifier.create(responses)
-                .consumeNextWith(this::assertTimeoutError)
+                .consumeNextWith(this::assertFailedResponse)
                 .consumeNextWith(response -> verifySuccessfulResponses(parsedTwoMessages, response))
                 .expectComplete()
                 .verify(TIMEOUT);
@@ -549,12 +593,18 @@ class MessageRouterPublisherImplTest {
         }
     }
 
-    private void assertTimeoutError(MessageRouterPublishResponse response) {
+    private void assertFailedResponse(MessageRouterPublishResponse response) {
         assertThat(response.failed()).isTrue();
         assertThat(response.items()).isEmpty();
         assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE);
     }
 
+    private void assertRetryableFailedResponse(MessageRouterPublishResponse response) {
+        assertThat(response.failed()).isTrue();
+        assertThat(response.items()).isEmpty();
+        assertThat(response.failReason()).startsWith("500 ERROR");
+    }
+
     private void verifySingleResponse(List<? extends JsonElement> threeMessages,
                                       Flux<MessageRouterPublishResponse> responses) {
         StepVerifier.create(responses)
index 0396eff..74b21ad 100644 (file)
@@ -25,6 +25,7 @@ import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import com.google.gson.JsonSyntaxException;
@@ -42,6 +43,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRo
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
 import reactor.core.publisher.Mono;
 
+import java.net.ConnectException;
+
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since May 2019
@@ -70,6 +73,12 @@ class MessageRouterSubscriberImplTest {
             .url(sourceDefinition.topicUrl())
             .rawBody("[]".getBytes())
             .build();
+    private final HttpResponse retryableHttpResponse = ImmutableHttpResponse.builder()
+            .statusCode(500)
+            .statusReason("Something braked")
+            .url(sourceDefinition.topicUrl())
+            .rawBody("[]".getBytes())
+            .build();
     private final HttpResponse httpResponseWithWrongStatusCode = ImmutableHttpResponse.builder()
             .statusCode(301)
             .statusReason("Something braked")
@@ -154,6 +163,53 @@ class MessageRouterSubscriberImplTest {
         assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE);
         assertThat(response.hasElements()).isFalse();
 
+        verify(clientErrorReasonPresenter, times(1)).present(any());
+        verify(httpClient).call(httpRequestArgumentCaptor.capture());
+        final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
+        assertThat(httpRequest.method()).isEqualTo(HttpMethod.GET);
+        assertThat(httpRequest.url()).isEqualTo(String.format("%s/%s/%s", sourceDefinition.topicUrl(),
+                mrRequest.consumerGroup(), mrRequest.consumerId()));
+        assertThat(httpRequest.body()).isNull();
+    }
+
+    @Test
+    void getWithProperRequest_shouldReturnConnectionException() {
+        given(clientErrorReasonPresenter.present(any()))
+                .willReturn(ERROR_MESSAGE);
+        given(httpClient.call(any(HttpRequest.class)))
+                .willReturn(Mono.error(new ConnectException()));
+
+        // when
+        final Mono<MessageRouterSubscribeResponse> responses = cut
+                .get(mrRequest);
+        final MessageRouterSubscribeResponse response = responses.block();
+
+        assertThat(response.failed()).isTrue();
+        assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE);
+        assertThat(response.hasElements()).isFalse();
+
+        verify(clientErrorReasonPresenter, times(1)).present(any());
+        verify(httpClient).call(httpRequestArgumentCaptor.capture());
+        final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
+        assertThat(httpRequest.method()).isEqualTo(HttpMethod.GET);
+        assertThat(httpRequest.url()).isEqualTo(String.format("%s/%s/%s", sourceDefinition.topicUrl(),
+                mrRequest.consumerGroup(), mrRequest.consumerId()));
+        assertThat(httpRequest.body()).isNull();
+    }
+
+    @Test
+    void getWithProperRequest_shouldReturnCertainFailedResponse() {
+        given(httpClient.call(any(HttpRequest.class)))
+                .willReturn(Mono.just(retryableHttpResponse));
+
+        // when
+        final Mono<MessageRouterSubscribeResponse> responses = cut
+                .get(mrRequest);
+        final MessageRouterSubscribeResponse response = responses.block();
+
+        assertThat(response.failed()).isTrue();
+        assertThat(response.failReason()).startsWith("500 Something braked");
+        assertThat(response.hasElements()).isFalse();
 
         verify(httpClient).call(httpRequestArgumentCaptor.capture());
         final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
index 76bde27..d25d746 100644 (file)
@@ -25,6 +25,7 @@ import io.vavr.collection.HashSet;
 import io.vavr.collection.Stream;
 import io.vavr.control.Option;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RetryConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -83,12 +84,17 @@ public class RxHttpClient {
     }
 
     private Mono<HttpResponse> mapResponse(String url, HttpResponseStatus status, reactor.netty.ByteBufMono content) {
-        if (shouldRetry(status.code())) {
-            return Mono.error(new RetryConfig.RetryableException());
-        }
         return content.asByteArray()
                 .defaultIfEmpty(new byte[0])
-                .map(bytes -> new NettyHttpResponse(url, status, bytes));
+                .map(bytes -> new NettyHttpResponse(url, status, bytes))
+                .map(this::validatedResponse);
+    }
+
+    private HttpResponse validatedResponse(HttpResponse response) {
+        if (shouldRetry(response.statusCode())) {
+            throw new RetryableException(response);
+        }
+        return response;
     }
 
     private boolean shouldRetry(int code) {
@@ -149,15 +155,12 @@ public class RxHttpClient {
     }
 
     private RetryBackoffSpec retryConfig(RetryConfig retryConfig, RequestDiagnosticContext context) {
-        RetryBackoffSpec retry = Retry
+        return Retry
                 .fixedDelay(retryConfig.retryCount(), retryConfig.retryInterval())
                 .doBeforeRetry(retrySignal -> context.withSlf4jMdc(
                         LOGGER.isTraceEnabled(), () -> LOGGER.trace("Retry: {}", retrySignal)))
-                .filter(ex -> isRetryable(retryConfig, ex));
-
-        return Option.of(retryConfig.onRetryExhaustedException())
-                .map(ex -> retry.onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> ex))
-                .getOrElse(retry);
+                .filter(ex -> isRetryable(retryConfig, ex))
+                .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> retrySignal.failure());
     }
 
     private boolean isRetryable(RetryConfig retryConfig, Throwable ex) {
index a0ae199..e458490 100644 (file)
@@ -23,7 +23,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config;
 import io.vavr.collection.HashSet;
 import io.vavr.collection.Set;
 import org.immutables.value.Value;
-import org.jetbrains.annotations.Nullable;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException;
 
 import java.time.Duration;
 
@@ -52,8 +52,4 @@ public interface RetryConfig {
         }
         return result;
     }
-
-    @Nullable RuntimeException onRetryExhaustedException();
-
-    class RetryableException extends RuntimeException {}
 }
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/exceptions/RetryableException.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/exceptions/RetryableException.java
new file mode 100644 (file)
index 0000000..aa48497
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions;
+
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
+
+public class RetryableException extends RuntimeException {
+
+    private final HttpResponse response;
+
+    public RetryableException(HttpResponse response) {
+        this.response = response;
+    }
+
+    public HttpResponse getResponse() {
+        return response;
+    }
+}
index daf04c6..8d076e0 100644 (file)
@@ -29,6 +29,7 @@ import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRetryConfig;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRxHttpClientConfig;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.HttpException;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
@@ -214,7 +215,7 @@ class RxHttpClientIT {
     }
 
     @Test
-    void getWithRetryExhaustedExceptionWhenClosedServer() throws Exception {
+    void getWithConnectExceptionWhenClosedServer() throws Exception {
         // given
         REQUEST_COUNTER = new AtomicInteger();
         final HttpRequest httpRequest = requestForClosedServer("/sample-get")
@@ -231,37 +232,13 @@ class RxHttpClientIT {
 
         // then
         StepVerifier.create(response)
-                .expectError(IllegalStateException.class)
+                .expectError(ConnectException.class)
                 .verify(TIMEOUT);
         assertNoServerResponse();
     }
 
     @Test
-    void getWithCustomRetryExhaustedExceptionWhenClosedServer() throws Exception {
-        // given
-        REQUEST_COUNTER = new AtomicInteger();
-        final HttpRequest httpRequest = requestForClosedServer("/sample-get")
-                .method(HttpMethod.GET)
-                .build();
-        final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
-                .retryConfig(defaultRetryConfig()
-                        .customRetryableExceptions(HashSet.of(ConnectException.class))
-                        .onRetryExhaustedException(ReadTimeoutException.INSTANCE)
-                        .build())
-                .build());
-
-        // when
-        final Mono<HttpResponse> response = cut.call(httpRequest);
-
-        // then
-        StepVerifier.create(response)
-                .expectError(ReadTimeoutException.class)
-                .verify(TIMEOUT);
-        assertNoServerResponse();
-    }
-
-    @Test
-    void getWithRetryExhaustedExceptionWhen500() throws Exception {
+    void getWithRetryableExceptionWhen500() throws Exception {
         // given
         REQUEST_COUNTER = new AtomicInteger();
         final HttpRequest httpRequest = requestFor("/retry-get-500")
@@ -278,31 +255,7 @@ class RxHttpClientIT {
 
         // then
         StepVerifier.create(response)
-                .expectError(IllegalStateException.class)
-                .verify(TIMEOUT);
-        assertRetry();
-    }
-
-    @Test
-    void getWithCustomRetryExhaustedExceptionWhen500() throws Exception {
-        // given
-        REQUEST_COUNTER = new AtomicInteger();
-        final HttpRequest httpRequest = requestFor("/retry-get-500")
-                .method(HttpMethod.GET)
-                .build();
-        final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
-                .retryConfig(defaultRetryConfig()
-                        .onRetryExhaustedException(ReadTimeoutException.INSTANCE)
-                        .retryableHttpResponseCodes(HashSet.of(500))
-                        .build())
-                .build());
-
-        // when
-        final Mono<HttpResponse> response = cut.call(httpRequest);
-
-        // then
-        StepVerifier.create(response)
-                .expectError(ReadTimeoutException.class)
+                .expectError(RetryableException.class)
                 .verify(TIMEOUT);
         assertRetry();
     }