Support retry in DCAE-SDK DMaaP-Client 63/116963/4
authortkogut <tomasz.kogut@nokia.com>
Tue, 19 Jan 2021 08:00:56 +0000 (09:00 +0100)
committertkogut <tomasz.kogut@nokia.com>
Wed, 20 Jan 2021 11:20:55 +0000 (12:20 +0100)
Issue-ID: DCAEGEN2-1483
Signed-off-by: tkogut <tomasz.kogut@nokia.com>
Change-Id: Id3f98c0a9367f7c7c2c53ed3eba8805a5a6ab87e

30 files changed:
pom.xml
rest-services/dmaap-client/pom.xml
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/ClientError.java [moved from rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientError.java with 93% similarity]
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/RequestError.java [moved from rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/RequestError.java with 93% similarity]
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/model/ServiceException.java [moved from rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ServiceException.java with 94% similarity]
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapRequest.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapClientConfiguration.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfig.java [new file with mode: 0644]
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapTimeoutConfig.java [moved from rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/TimeoutConfig.java with 92% similarity]
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenterTest.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfigTest.java [new file with mode: 0644]
rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml
rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java
rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java
rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RetryConfig.java [new file with mode: 0644]
rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RxHttpClientConfig.java [new file with mode: 0644]
rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java
rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java

diff --git a/pom.xml b/pom.xml
index 80b0c91..2c2ed16 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -60,8 +60,8 @@
 
     <properties>
         <java.version>11</java.version>
-        <junit-jupiter.version>5.3.1</junit-jupiter.version>
-        <junit-vintage.version>5.3.1</junit-vintage.version>
+        <junit-jupiter.version>5.7.0</junit-jupiter.version>
+        <junit-vintage.version>5.7.0</junit-vintage.version>
         <junit-platform.version>1.3.1</junit-platform.version>
         <immutables.version>2.7.5</immutables.version>
         <assertj-core.version>3.12.2</assertj-core.version>
         <commons-text.version>1.6</commons-text.version>
         <jetbrains-annotations.version>16.0.3</jetbrains-annotations.version>
         <protoc-jar-maven-plugin.version>3.6.0.2</protoc-jar-maven-plugin.version>
-        <testcontainers.version>1.15.0</testcontainers.version>
+        <testcontainers.version>1.15.1</testcontainers.version>
         <spring.boot.version>2.4.0</spring.boot.version>
         <system.rules.version>1.17.2</system.rules.version>
         <openapi4j.version>1.0.3</openapi4j.version>
-        <toxiproxy-java.version>2.1.4</toxiproxy-java.version>
+        <mockserver-client.version>5.11.2</mockserver-client.version>
        <sonar.coverage.jacoco.xmlReportPaths>
          ${project.reporting.outputDirectory}/jacoco-ut/jacoco.xml
        </sonar.coverage.jacoco.xmlReportPaths>
index d619590..b862031 100644 (file)
       <groupId>org.junit.jupiter</groupId>
       <artifactId>junit-jupiter-engine</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-params</artifactId>
+    </dependency>
     <dependency>
       <groupId>io.projectreactor</groupId>
       <artifactId>reactor-test</artifactId>
       <groupId>org.testcontainers</groupId>
       <artifactId>junit-jupiter</artifactId>
     </dependency>
-    <dependency>
-      <groupId>eu.rekawek.toxiproxy</groupId>
-      <artifactId>toxiproxy-java</artifactId>
-      <version>${toxiproxy-java.version}</version>
-      <scope>test</scope>
-    </dependency>
+      <dependency>
+          <groupId>org.mock-server</groupId>
+          <artifactId>mockserver-client-java</artifactId>
+          <version>${mockserver-client.version}</version>
+      </dependency>
   </dependencies>
 </project>
index 3c27da1..9d25555 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  */
 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
 
+import io.vavr.control.Option;
 import org.jetbrains.annotations.NotNull;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRetryConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRxHttpClientConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RetryConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RxHttpClientConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.MessageRouterPublisherImpl;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.MessageRouterSubscriberImpl;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapClientConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
 
+import java.time.Duration;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig.ON_RETRY_EXHAUSTED_EXCEPTION;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig.RETRYABLE_EXCEPTIONS;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig.RETRYABLE_HTTP_CODES;
+
 /**
- *
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.4
  */
@@ -44,19 +55,37 @@ public final class DmaapClientFactory {
         return new MessageRouterPublisherImpl(
                 createHttpClient(clientConfiguration),
                 clientConfiguration.maxBatchSize(),
-                clientConfiguration.maxBatchDuration());
+                clientConfiguration.maxBatchDuration(),
+                new ClientErrorReasonPresenter());
     }
 
     public static @NotNull MessageRouterSubscriber createMessageRouterSubscriber(
             @NotNull MessageRouterSubscriberConfig clientConfiguration) {
         return new MessageRouterSubscriberImpl(
                 createHttpClient(clientConfiguration),
-                clientConfiguration.gsonInstance());
+                clientConfiguration.gsonInstance(),
+                new ClientErrorReasonPresenter());
     }
 
     private static @NotNull RxHttpClient createHttpClient(DmaapClientConfiguration config) {
+        RxHttpClientConfig clientConfig = ImmutableRxHttpClientConfig.builder()
+                .retryConfig(createRetry(config))
+                .build();
         return config.securityKeys() == null
-                ? RxHttpClientFactory.create()
-                : RxHttpClientFactory.create(config.securityKeys());
+                ? RxHttpClientFactory.create(clientConfig)
+                : RxHttpClientFactory.create(config.securityKeys(), clientConfig);
+    }
+
+    private static RetryConfig createRetry(DmaapClientConfiguration config) {
+        return Option.of(config.retryConfig())
+                .map(rc -> ImmutableRetryConfig.builder()
+                        .retryInterval(Duration.ofSeconds(rc.retryIntervalInSeconds()))
+                        .retryCount(rc.retryCount())
+                        .retryableHttpResponseCodes(RETRYABLE_HTTP_CODES)
+                        .customRetryableExceptions(RETRYABLE_EXCEPTIONS)
+                        .onRetryExhaustedException(ON_RETRY_EXHAUSTED_EXCEPTION)
+                        .build())
+                .getOrNull();
     }
 }
+
index 6b22b37..1eaae78 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2020 Nokia. All rights reserved.
+ * Copyright (C) 2020-2021 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -23,15 +23,17 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import io.vavr.control.Option;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.model.ClientError;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.model.ImmutableClientError;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.model.ImmutableRequestError;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.model.ImmutableServiceException;
 
 public class ClientErrorReasonPresenter {
 
-    private ClientErrorReasonPresenter() { }
-
     private static final Gson GSON = new GsonBuilder().create();
     private static final String PATTERN = "%s\n%s";
 
-    public static String present(ClientErrorReason clientErrorReason) {
+    public String present(ClientErrorReason clientErrorReason) {
         ImmutableServiceException simpleServiceException = ImmutableServiceException.builder()
                 .messageId(clientErrorReason.messageId())
                 .text(clientErrorReason.text())
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2020 Nokia. All rights reserved.
+ * Copyright (C) 2020-2021 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -18,7 +18,7 @@
  * ============LICENSE_END=====================================
  */
 
-package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error;
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.model;
 
 import org.immutables.gson.Gson;
 import org.immutables.value.Value;
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2020 Nokia. All rights reserved.
+ * Copyright (C) 2020-2021 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -18,7 +18,7 @@
  * ============LICENSE_END=====================================
  */
 
-package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error;
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.model;
 
 import org.immutables.gson.Gson;
 import org.immutables.value.Value;
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2020 Nokia. All rights reserved.
+ * Copyright (C) 2020-2021 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -18,7 +18,7 @@
  * ============LICENSE_END=====================================
  */
 
-package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error;
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.model;
 
 import org.immutables.gson.Gson;
 import org.immutables.value.Value;
index 16068da..7d1b0a9 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2019-2020 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -61,12 +61,15 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
     private final RxHttpClient httpClient;
     private final int maxBatchSize;
     private final Duration maxBatchDuration;
+    private final ClientErrorReasonPresenter clientErrorReasonPresenter;
+
     private static final Logger LOGGER = LoggerFactory.getLogger(MessageRouterPublisherImpl.class);
 
-    public MessageRouterPublisherImpl(RxHttpClient httpClient, int maxBatchSize, Duration maxBatchDuration) {
+    public MessageRouterPublisherImpl(RxHttpClient httpClient, int maxBatchSize, Duration maxBatchDuration, ClientErrorReasonPresenter clientErrorReasonPresenter) {
         this.httpClient = httpClient;
         this.maxBatchSize = maxBatchSize;
         this.maxBatchDuration = maxBatchDuration;
+        this.clientErrorReasonPresenter = clientErrorReasonPresenter;
     }
 
     @Override
@@ -124,7 +127,7 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
     }
 
     private Mono<MessageRouterPublishResponse> createErrorResponse(ClientErrorReason clientErrorReason) {
-        String failReason = ClientErrorReasonPresenter.present(clientErrorReason);
+        String failReason = clientErrorReasonPresenter.present(clientErrorReason);
         return Mono.just(ImmutableMessageRouterPublishResponse.builder()
                 .failReason(failReason)
                 .build());
index f7ccf4f..292a715 100644 (file)
 
 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl;
 
-import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason;
-
 import com.google.gson.Gson;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonParser;
 import io.netty.handler.timeout.ReadTimeoutException;
 import io.vavr.collection.List;
-import java.nio.charset.StandardCharsets;
-
 import io.vavr.control.Option;
 import org.jetbrains.annotations.NotNull;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
@@ -48,6 +44,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Mono;
 
+import java.nio.charset.StandardCharsets;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason;
+
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since March 2019
@@ -55,11 +55,14 @@ import reactor.core.publisher.Mono;
 public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
     private final RxHttpClient httpClient;
     private final Gson gson;
+    private final ClientErrorReasonPresenter clientErrorReasonPresenter;
     private static final Logger LOGGER = LoggerFactory.getLogger(MessageRouterSubscriberImpl.class);
 
-    public MessageRouterSubscriberImpl(RxHttpClient httpClient, Gson gson) {
+    public MessageRouterSubscriberImpl(RxHttpClient httpClient, Gson gson,
+                                       ClientErrorReasonPresenter clientErrorReasonPresenter) {
         this.httpClient = httpClient;
         this.gson = gson;
+        this.clientErrorReasonPresenter = clientErrorReasonPresenter;
     }
 
     @Override
@@ -67,7 +70,8 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
         LOGGER.debug("Requesting new items from DMaaP MR: {}", request);
         return httpClient.call(buildGetHttpRequest(request))
                 .map(this::buildGetResponse)
-                .doOnError(ReadTimeoutException.class, e -> LOGGER.error("Timeout exception occurred when subscribe items from DMaaP MR", e))
+                .doOnError(ReadTimeoutException.class,
+                        e -> LOGGER.error("Timeout exception occurred when subscribe items from DMaaP MR", e))
                 .onErrorResume(ReadTimeoutException.class, e -> createErrorResponse(ClientErrorReasons.TIMEOUT));
     }
 
@@ -91,7 +95,7 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
                 : builder.failReason(extractFailReason(httpResponse)).build();
     }
 
-    private List<JsonElement> getAsJsonElements(HttpResponse httpResponse){
+    private List<JsonElement> getAsJsonElements(HttpResponse httpResponse) {
         JsonArray bodyAsJsonArray = httpResponse
                 .bodyAsJson(StandardCharsets.UTF_8, gson, JsonArray.class);
 
@@ -104,7 +108,7 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
     }
 
     private Mono<MessageRouterSubscribeResponse> createErrorResponse(ClientErrorReason clientErrorReason) {
-        String failReason = ClientErrorReasonPresenter.present(clientErrorReason);
+        String failReason = clientErrorReasonPresenter.present(clientErrorReason);
         return Mono.just(ImmutableMessageRouterSubscribeResponse.builder()
                 .failReason(failReason)
                 .build());
index 95c5e7d..a5a87fb 100644 (file)
@@ -22,7 +22,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model;
 
 import org.immutables.value.Value;
 import org.jetbrains.annotations.Nullable;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.TimeoutConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapTimeoutConfig;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 
 /**
@@ -35,5 +35,5 @@ public interface DmaapRequest {
         return RequestDiagnosticContext.create();
     }
 
-    @Nullable TimeoutConfig timeoutConfig();
+    @Nullable DmaapTimeoutConfig timeoutConfig();
 }
index ac677f0..3e28351 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -32,5 +32,8 @@ public interface DmaapClientConfiguration {
     default @Nullable SecurityKeys securityKeys() {
         return null;
     }
-
+    @Value.Default
+    default @Nullable DmaapRetryConfig retryConfig(){
+        return null;
+    }
 }
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfig.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfig.java
new file mode 100644 (file)
index 0000000..f82edfc
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config;
+
+import io.netty.handler.timeout.ReadTimeoutException;
+import io.vavr.collection.HashSet;
+import io.vavr.collection.Set;
+import org.immutables.value.Value;
+
+import java.net.ConnectException;
+
+@Value.Immutable
+public interface DmaapRetryConfig {
+
+    Set<Class<? extends Throwable>> RETRYABLE_EXCEPTIONS = HashSet.of(ReadTimeoutException.class, ConnectException.class);
+    RuntimeException ON_RETRY_EXHAUSTED_EXCEPTION = ReadTimeoutException.INSTANCE;
+    Set<Integer> RETRYABLE_HTTP_CODES = HashSet.of(404, 408, 413, 429, 500, 502, 503, 504);
+
+    @Value.Default
+    default int retryCount() {
+        return 3;
+    }
+
+    @Value.Default
+    default int retryIntervalInSeconds() {
+        return 1;
+    }
+
+    @Value.Check
+    default void validate() {
+        validateRetryCount();
+        validateRetryInterval();
+    }
+
+    private void validateRetryCount() {
+        int rc = retryCount();
+        if (rc < 0)
+            throw new IllegalArgumentException(String.format("Invalid value: %d, retryCount should be (0-n)", rc));
+    }
+
+    private void validateRetryInterval() {
+        long ri = retryIntervalInSeconds();
+        if (ri < 1)
+            throw new IllegalArgumentException(String.format("Invalid value: %d, retryInterval should be (1-n)", ri));
+    }
+}
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2020 Nokia. All rights reserved.
+ * Copyright (C) 2021 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -25,7 +25,7 @@ import org.immutables.value.Value;
 import java.time.Duration;
 
 @Value.Immutable
-public interface TimeoutConfig {
+public interface DmaapTimeoutConfig {
 
     @Value.Default
     default Duration getTimeout() {
index 2b8027c..1a31580 100644 (file)
@@ -38,7 +38,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRo
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableTimeoutConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig;
 import reactor.core.publisher.Flux;
 
 import java.time.Duration;
@@ -56,7 +56,7 @@ public final class MessageRouterTestsUtils {
         return ImmutableMessageRouterPublishRequest.builder()
                 .sinkDefinition(createMessageRouterSink(topicUrl))
                 .contentType(ContentType.APPLICATION_JSON)
-                .timeoutConfig(ImmutableTimeoutConfig.builder()
+                .timeoutConfig(ImmutableDmaapTimeoutConfig.builder()
                         .timeout(timeout)
                         .build())
                 .build();
@@ -86,7 +86,7 @@ public final class MessageRouterTestsUtils {
 
         return ImmutableMessageRouterSubscribeRequest
                 .builder()
-                .timeoutConfig(ImmutableTimeoutConfig.builder()
+                .timeoutConfig(ImmutableDmaapTimeoutConfig.builder()
                         .timeout(timeout)
                         .build())
                 .sourceDefinition(getImmutableMessageRouterSource(topicUrl))
index 494ca62..5b1984d 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -27,11 +27,10 @@ import java.net.URL;
 
 final class DMaapContainer {
     private static final String MR_COMPOSE_RESOURCE_NAME = "dmaap-msg-router/message-router-compose.yml";
-    private static final String DOCKER_COMPOSE_FILE_PATH = getDockerComposeFilePath(
-            MR_COMPOSE_RESOURCE_NAME);
+    private static final String DOCKER_COMPOSE_FILE_PATH = getDockerComposeFilePath(MR_COMPOSE_RESOURCE_NAME);
     static final int DMAAP_SERVICE_EXPOSED_PORT = 3904;
     static final String DMAAP_SERVICE_NAME = "dmaap";
-    static final int PROXY_SERVICE_EXPOSED_PORT = 8666;
+    static final int PROXY_MOCK_SERVICE_EXPOSED_PORT = 1080;
     static final String LOCALHOST = "localhost";
 
     private DMaapContainer() {}
@@ -43,11 +42,11 @@ final class DMaapContainer {
                 .withLocalCompose(true);
     }
 
-    private static String getDockerComposeFilePath(String resourceName){
+    private static String getDockerComposeFilePath(String resourceName) {
         URL resource = DMaapContainer.class.getClassLoader()
                 .getResource(resourceName);
 
-        if(resource != null) return resource.getFile();
+        if (resource != null) return resource.getFile();
         else throw new DockerComposeNotFoundException(String
                 .format("File %s does not exist", resourceName));
     }
index 24cd2c3..f6ef94b 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2019-2020 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -22,16 +22,21 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
 
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
-import eu.rekawek.toxiproxy.Proxy;
-import eu.rekawek.toxiproxy.ToxiproxyClient;
 import io.vavr.collection.List;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockserver.client.MockServerClient;
+import org.mockserver.matchers.TimeToLive;
+import org.mockserver.matchers.Times;
+import org.mockserver.verify.VerificationTimes;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
 import org.testcontainers.containers.DockerComposeContainer;
@@ -41,11 +46,11 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
-import java.io.IOException;
 import java.time.Duration;
 import java.util.concurrent.TimeUnit;
 
-import static eu.rekawek.toxiproxy.model.ToxicDirection.DOWNSTREAM;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorPublishResponse;
@@ -56,14 +61,19 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageR
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successPublishResponse;
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT;
-import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_NAME;
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST;
-import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_SERVICE_EXPOSED_PORT;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance;
 
 @Testcontainers
 class MessageRouterPublisherIT {
     @Container
-    private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance();
+    private static final DockerComposeContainer CONTAINER = createContainerInstance();
+    private static final MockServerClient MOCK_SERVER_CLIENT = new MockServerClient(
+            LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
+    private static String EVENTS_PATH;
+    private static String PROXY_MOCK_EVENTS_PATH;
+
     private static final Duration TIMEOUT = Duration.ofSeconds(10);
     private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n"
             + "{"
@@ -85,22 +95,21 @@ class MessageRouterPublisherIT {
             + "}"
             + "}"
             + "}";
-    private static Proxy DMAAP_PROXY;
-    private static String EVENTS_PATH;
-    private static String PROXY_EVENTS_PATH;
+
     private final MessageRouterPublisher publisher = DmaapClientFactory
             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
     private final MessageRouterSubscriber subscriber = DmaapClientFactory
             .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
 
     @BeforeAll
-    static void setUp() throws IOException {
+    static void setUp() {
         EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
-        PROXY_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_SERVICE_EXPOSED_PORT);
+        PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
+    }
 
-        DMAAP_PROXY = new ToxiproxyClient().createProxy("dmaapProxy",
-                String.format("[::]:%s", PROXY_SERVICE_EXPOSED_PORT),
-                String.format("%s:%d", DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT));
+    @BeforeEach
+    void set() {
+        MOCK_SERVER_CLIENT.reset();
     }
 
     @Test
@@ -302,17 +311,18 @@ class MessageRouterPublisherIT {
     }
 
     @Test
-    void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() throws IOException {
+    void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() {
         //given
-        final String toxic = "latency-toxic";
-        DMAAP_PROXY.toxics()
-                .latency(toxic, DOWNSTREAM, TimeUnit.SECONDS.toMillis(5));
         final String topic = "TOPIC10";
         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
         final Flux<JsonObject> messageBatch = jsonBatch(singleJsonMessage);
         final MessageRouterPublishRequest mrRequest = createPublishRequest(
-                String.format("%s/%s", PROXY_EVENTS_PATH, topic), Duration.ofSeconds(1));
+                String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic), Duration.ofSeconds(1));
         final MessageRouterPublishResponse expectedResponse = errorPublishResponse(TIMEOUT_ERROR_MESSAGE);
+        final String path = String.format("/events/%s", topic);
+        MOCK_SERVER_CLIENT
+                .when(request().withPath(path), Times.once())
+                .respond(response().withDelay(TimeUnit.SECONDS, 2));
 
         //when
         final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
@@ -323,7 +333,75 @@ class MessageRouterPublisherIT {
                 .expectComplete()
                 .verify(TIMEOUT);
 
-        //cleanup
-        DMAAP_PROXY.toxics().get(toxic).remove();
+        MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1));
+    }
+
+    @Test
+    void publisher_shouldRetryWhenRetryableHttpCodeAndSuccessfullyPublish() {
+        final String topic = "TOPIC11";
+        final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
+
+        final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
+        final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
+        final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
+
+        final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
+        final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
+
+        final String path = String.format("/events/%s", topic);
+        MOCK_SERVER_CLIENT
+                .when(request().withPath(path), Times.once())
+                .respond(response().withStatusCode(404));
+        final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig());
+
+        //when
+        final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
+
+        //then
+        StepVerifier.create(result)
+                .expectNext(expectedResponse)
+                .expectComplete()
+                .verify();
+
+        MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
+    }
+
+    @Test
+    void publisher_shouldRetryWhenClientTimeoutAndSuccessfullyPublish() {
+        final String topic = "TOPIC12";
+        final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
+
+        final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
+        final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
+        final Flux<JsonElement> plainBatch = plainBatch(singleJsonMessage);
+
+        final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
+        final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
+
+        final String path = String.format("/events/%s", topic);
+        MOCK_SERVER_CLIENT
+                .when(request().withPath(path), Times.once())
+                .respond(response().withDelay(TimeUnit.SECONDS, 10));
+        final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig());
+
+        //when
+        final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
+
+        //then
+        StepVerifier.create(result)
+                .expectNext(expectedResponse)
+                .expectComplete()
+                .verify();
+
+        MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
+    }
+
+    private MessageRouterPublisherConfig retryConfig() {
+        return ImmutableMessageRouterPublisherConfig.builder()
+                .retryConfig(ImmutableDmaapRetryConfig.builder()
+                        .retryIntervalInSeconds(1)
+                        .retryCount(1)
+                        .build())
+                .build();
     }
 }
index b0a07ed..82b6661 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2019-2020 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -25,14 +25,18 @@ import com.google.gson.JsonPrimitive;
 import io.vavr.collection.List;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -40,8 +44,10 @@ import reactor.test.StepVerifier;
 
 import java.time.Duration;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
+import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendWithDelay;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -50,12 +56,14 @@ import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.Du
 class MessageRouterPublisherTest {
 
     private static final String ERROR_MESSAGE = "Something went wrong";
+    private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout";
     private static final String SUCCESS_RESP_TOPIC_PATH = "/events/TOPIC";
+    private static final String DELAY_RESP_TOPIC_PATH = "/events/DELAY";
     private static final String FAILING_WITH_400_RESP_PATH = "/events/TOPIC400";
     private static final String FAILING_WITH_401_RESP_PATH = "/events/TOPIC401";
     private static final String FAILING_WITH_403_RESP_PATH = "/events/TOPIC403";
     private static final String FAILING_WITH_404_RESP_PATH = "/events/TOPIC404";
-    private static final String FAILING_WITH_500_TOPIC_PATH = "/events/TOPIC500";
+    private static final String FAILING_WITH_500_RESP_PATH = "/events/TOPIC500";
     private static final Duration TIMEOUT = Duration.ofSeconds(10);
     private static final Flux<JsonPrimitive> messageBatch = Flux.just("ala", "ma", "kota")
             .map(JsonPrimitive::new);
@@ -69,29 +77,22 @@ class MessageRouterPublisherTest {
     @BeforeAll
     static void setUp() {
         server = DummyHttpServer.start(routes -> routes
-                .post(SUCCESS_RESP_TOPIC_PATH, (req, resp) ->
-                        sendString(resp, Mono.just("OK")))
-                .post(FAILING_WITH_400_RESP_PATH, (req, resp) ->
-                        sendError(resp, 400, ERROR_MESSAGE))
-                .post(FAILING_WITH_401_RESP_PATH, (req, resp) ->
-                        sendError(resp, 401, ERROR_MESSAGE))
-                .post(FAILING_WITH_403_RESP_PATH, (req, resp) ->
-                        sendError(resp, 403, ERROR_MESSAGE))
-                .post(FAILING_WITH_404_RESP_PATH, (req, resp) ->
-                        sendError(resp, 404, ERROR_MESSAGE))
-                .post(FAILING_WITH_500_TOPIC_PATH, (req, resp) ->
-                        sendError(resp, 500, ERROR_MESSAGE))
+                .post(SUCCESS_RESP_TOPIC_PATH, (req, resp) -> sendString(resp, Mono.just("OK")))
+                .post(DELAY_RESP_TOPIC_PATH, (req, resp) -> sendWithDelay(resp, 200, TIMEOUT))
+                .post(FAILING_WITH_400_RESP_PATH, (req, resp) -> sendError(resp, 400, ERROR_MESSAGE))
+                .post(FAILING_WITH_401_RESP_PATH, (req, resp) -> sendError(resp, 401, ERROR_MESSAGE))
+                .post(FAILING_WITH_403_RESP_PATH, (req, resp) -> sendError(resp, 403, ERROR_MESSAGE))
+                .post(FAILING_WITH_404_RESP_PATH, (req, resp) -> sendError(resp, 404, ERROR_MESSAGE))
+                .post(FAILING_WITH_500_RESP_PATH, (req, resp) -> sendError(resp, 500, ERROR_MESSAGE))
         );
     }
 
     @Test
     void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() {
         //given
-        final MessageRouterPublishRequest mrRequest = createMRRequest(SUCCESS_RESP_TOPIC_PATH,
-                ContentType.TEXT_PLAIN);
+        final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH);
         final List<JsonElement> expectedItems = messageBatchItems.map(JsonPrimitive::new);
 
-
         //when
         final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
 
@@ -102,13 +103,18 @@ class MessageRouterPublisherTest {
                 .verify(TIMEOUT);
     }
 
-    @Test
-    void publisher_shouldHandleBadRequestError() {
+    @ParameterizedTest
+    @CsvSource({
+            FAILING_WITH_400_RESP_PATH + "," + "400 Bad Request",
+            FAILING_WITH_401_RESP_PATH + "," + "401 Unauthorized",
+            FAILING_WITH_403_RESP_PATH + "," + "403 Forbidden",
+            FAILING_WITH_404_RESP_PATH + "," + "404 Not Found",
+            FAILING_WITH_500_RESP_PATH + "," + "500 Internal Server Error"
+    })
+    void publisher_shouldHandleError(String failingPath, String failReason) {
         //given
-        final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_400_RESP_PATH,
-                ContentType.TEXT_PLAIN);
-        final MessageRouterPublishResponse expectedResponse = createErrorResponse(
-                "400 Bad Request\n%s", ERROR_MESSAGE);
+        final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(failingPath);
+        final MessageRouterPublishResponse expectedResponse = createErrorResponse(failReason);
 
         //when
         final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
@@ -121,83 +127,40 @@ class MessageRouterPublisherTest {
     }
 
     @Test
-    void publisher_shouldHandleUnauthorizedError() {
+    void publisher_shouldHandleClientTimeoutError() {
         //given
-        final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_401_RESP_PATH,
-                ContentType.TEXT_PLAIN);
-        final MessageRouterPublishResponse expectedResponse = createErrorResponse(
-                "401 Unauthorized\n%s", ERROR_MESSAGE);
+        final Duration requestTimeout = Duration.ofMillis(1);
+        final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(DELAY_RESP_TOPIC_PATH, requestTimeout);
 
         //when
         final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
 
         //then
         StepVerifier.create(result)
-                .expectNext(expectedResponse)
-                .expectComplete()
-                .verify(TIMEOUT);
-    }
-
-    @Test
-    void publisher_shouldHandleForbiddenError() {
-        //given
-        final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_403_RESP_PATH,
-                ContentType.TEXT_PLAIN);
-        final MessageRouterPublishResponse expectedResponse = createErrorResponse(
-                "403 Forbidden\n%s", ERROR_MESSAGE);
-
-        //when
-        final Flux<MessageRouterPublishResponse> result = sut
-                .put(mrRequest, messageBatch);
-
-        //then
-        StepVerifier.create(result)
-                .expectNext(expectedResponse)
+                .consumeNextWith(this::assertTimeoutError)
                 .expectComplete()
                 .verify(TIMEOUT);
     }
 
-    @Test
-    void publisher_shouldHandleNotFoundError() {
-        //given
-        final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_404_RESP_PATH,
-                ContentType.TEXT_PLAIN);
-        final MessageRouterPublishResponse expectedResponse = createErrorResponse(
-                "404 Not Found\n%s", ERROR_MESSAGE);
-
-        //when
-        final Flux<MessageRouterPublishResponse> result = sut
-                .put(mrRequest, messageBatch);
-
-        //then
-        StepVerifier.create(result)
-                .expectNext(expectedResponse)
-                .expectComplete()
-                .verify(TIMEOUT);
+    private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath) {
+        final MessageRouterSink sinkDefinition = createMRSink(topicPath);
+        return ImmutableMessageRouterPublishRequest.builder()
+                .sinkDefinition(sinkDefinition)
+                .contentType(ContentType.TEXT_PLAIN)
+                .build();
     }
 
-    @Test
-    void publisher_shouldHandleInternalServerError() {
-        //given
-        final MessageRouterPublishRequest mrRequest = createMRRequest(FAILING_WITH_500_TOPIC_PATH,
-                ContentType.TEXT_PLAIN);
-        final MessageRouterPublishResponse expectedResponse = createErrorResponse(
-                "500 Internal Server Error\n%s", ERROR_MESSAGE);
-
-        //when
-        final Flux<MessageRouterPublishResponse> result = sut
-                .put(mrRequest, messageBatch);
-
-        //then
-        StepVerifier.create(result)
-                .expectNext(expectedResponse)
-                .expectComplete()
-                .verify(TIMEOUT);
+    private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath, Duration timeout) {
+        final MessageRouterSink sinkDefinition = createMRSink(topicPath);
+        return ImmutableMessageRouterPublishRequest.builder()
+                .sinkDefinition(sinkDefinition)
+                .contentType(ContentType.TEXT_PLAIN)
+                .timeoutConfig(ImmutableDmaapTimeoutConfig.builder().timeout(timeout).build())
+                .build();
     }
 
-
-    private MessageRouterPublishRequest createMRRequest(String topicPath, ContentType contentType) {
-        final MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
+    private static MessageRouterSink createMRSink(String topicPath) {
+        return ImmutableMessageRouterSink.builder()
                 .name("the topic")
                 .topicUrl(String.format("http://%s:%d%s",
                         server.host(),
@@ -205,18 +168,19 @@ class MessageRouterPublisherTest {
                         topicPath)
                 )
                 .build();
-
-        return ImmutableMessageRouterPublishRequest.builder()
-                .sinkDefinition(sinkDefinition)
-                .contentType(contentType)
-                .build();
     }
 
-    private MessageRouterPublishResponse createErrorResponse(String failReasonFormat, Object... formatArgs) {
+    private static MessageRouterPublishResponse createErrorResponse(String failReason) {
+        String failReasonFormat = failReason + "\n%s";
         return ImmutableMessageRouterPublishResponse
                 .builder()
-                .failReason(String.format(failReasonFormat, formatArgs))
+                .failReason(String.format(failReasonFormat, ERROR_MESSAGE))
                 .build();
     }
+
+    private void assertTimeoutError(DmaapResponse response) {
+        assertThat(response.failed()).isTrue();
+        assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE);
+    }
 }
 
index bd161aa..1f4e499 100644 (file)
@@ -22,14 +22,18 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
 
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
-import eu.rekawek.toxiproxy.Proxy;
-import eu.rekawek.toxiproxy.ToxiproxyClient;
 import io.vavr.collection.List;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockserver.client.MockServerClient;
+import org.mockserver.matchers.Times;
+import org.mockserver.verify.VerificationTimes;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
 import org.testcontainers.containers.DockerComposeContainer;
@@ -39,11 +43,11 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
-import java.io.IOException;
 import java.time.Duration;
 import java.util.concurrent.TimeUnit;
 
-import static eu.rekawek.toxiproxy.model.ToxicDirection.DOWNSTREAM;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorSubscribeResponse;
@@ -52,19 +56,23 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageR
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic;
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
-
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT;
-import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_NAME;
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST;
-import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_SERVICE_EXPOSED_PORT;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance;
 
 @Testcontainers
 class MessageRouterSubscriberIT {
+    @Container
+    private static final DockerComposeContainer CONTAINER = createContainerInstance();
+    private static final MockServerClient MOCK_SERVER_CLIENT = new MockServerClient(
+            LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
+    private static String EVENTS_PATH;
+    private static String PROXY_MOCK_EVENTS_PATH;
+
     private static final Duration TIMEOUT = Duration.ofSeconds(10);
     private static final String CONSUMER_GROUP = "group1";
     private static final String CONSUMER_ID = "consumer200";
-    private static String PROXY_EVENTS_PATH;
-    private static Proxy DMAAP_PROXY;
     private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
             "{" +
             "\"mrstatus\":3001," +
@@ -85,27 +93,21 @@ class MessageRouterSubscriberIT {
             + "}"
             + "}";
 
-    @Container
-    private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance();
-
-    private static String EVENTS_PATH;
-
     private MessageRouterPublisher publisher = DmaapClientFactory
             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
     private MessageRouterSubscriber subscriber = DmaapClientFactory
             .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
 
-
     @BeforeAll
-    static void setUp() throws IOException {
+    static void setUp() {
         EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
-        PROXY_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_SERVICE_EXPOSED_PORT);
-
-        DMAAP_PROXY = new ToxiproxyClient().createProxy("dmaapProxy",
-                String.format("[::]:%s", PROXY_SERVICE_EXPOSED_PORT),
-                String.format("%s:%d", DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT));
+        PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
     }
 
+    @BeforeEach
+    void set() {
+        MOCK_SERVER_CLIENT.reset();
+    }
 
     @Test
     void subscriber_shouldHandleNoSuchTopicException() {
@@ -128,7 +130,7 @@ class MessageRouterSubscriberIT {
     }
 
     @Test
-    void subscriberShouldHandleSingleItemResponse(){
+    void subscriberShouldHandleSingleItemResponse() {
         //given
         final String topic = "TOPIC";
         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
@@ -207,7 +209,7 @@ class MessageRouterSubscriberIT {
     }
 
     @Test
-    void subscriber_shouldSubscribeToTopic(){
+    void subscriber_shouldSubscribeToTopic() {
         //given
         final String topic = "TOPIC4";
         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
@@ -234,17 +236,16 @@ class MessageRouterSubscriberIT {
     }
 
     @Test
-    void subscriber_shouldHandleTimeoutException() throws IOException {
+    void subscriber_shouldHandleTimeoutException() {
         //given
-        final String topic = "newTopic";
+        final String topic = "TOPIC5";
         final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(
-                String.format("%s/%s", PROXY_EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
-        final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(
-                TIMEOUT_ERROR_MESSAGE);
-
-        final String toxic = "latency-toxic";
-        DMAAP_PROXY.toxics()
-                .latency(toxic, DOWNSTREAM, TimeUnit.SECONDS.toMillis(5));
+                String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
+        final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(TIMEOUT_ERROR_MESSAGE);
+        final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
+        MOCK_SERVER_CLIENT
+                .when(request().withPath(path), Times.once())
+                .respond(response().withDelay(TimeUnit.SECONDS, 5));
 
         //when
         Mono<MessageRouterSubscribeResponse> response = subscriber
@@ -256,7 +257,88 @@ class MessageRouterSubscriberIT {
                 .expectComplete()
                 .verify(TIMEOUT);
 
-        //cleanup
-        DMAAP_PROXY.toxics().get(toxic).remove();
+        MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1));
+    }
+
+    @Test
+    void subscriber_shouldRetryWhenRetryableHttpCodeAndSuccessfullySubscribe() {
+        //given
+        final String topic = "TOPIC6";
+        final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
+        final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+        final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl);
+        final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID);
+
+        final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
+        final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
+        final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
+        final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
+
+        final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
+        MOCK_SERVER_CLIENT
+                .when(request().withPath(path), Times.once())
+                .respond(response().withStatusCode(404));
+        final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig());
+
+        //when
+        registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
+                createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID));
+        Mono<MessageRouterSubscribeResponse> response = publisher
+                .put(publishRequest, jsonMessageBatch)
+                .then(subscriber.get(subscribeRequest));
+
+        //then
+        StepVerifier.create(response)
+                .expectNext(expectedResponse)
+                .expectComplete()
+                .verify();
+
+        MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
+    }
+
+    @Test
+    void subscriber_shouldRetryWhenClientTimeoutAndSuccessfullySubscribe() {
+        //given
+        final String topic = "TOPIC7";
+        final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
+        final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+        final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl);
+        final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(
+                proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
+
+        final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
+        final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
+        final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
+        final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
+
+        final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
+        MOCK_SERVER_CLIENT
+                .when(request().withPath(path), Times.once())
+                .respond(response().withDelay(TimeUnit.SECONDS, 10));
+        final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig());
+
+        //when
+        registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
+                createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID));
+        Mono<MessageRouterSubscribeResponse> response = publisher
+                .put(publishRequest, jsonMessageBatch)
+                .then(subscriber.get(subscribeRequest));
+
+        //then
+        StepVerifier.create(response)
+                .expectNext(expectedResponse)
+                .expectComplete()
+                .verify();
+
+        MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
+    }
+
+    private MessageRouterSubscriberConfig retryConfig() {
+        return ImmutableMessageRouterSubscriberConfig.builder()
+                .retryConfig(ImmutableDmaapRetryConfig.builder()
+                        .retryIntervalInSeconds(1)
+                        .retryCount(1)
+                        .build())
+                .build();
     }
 }
index 1858478..0687539 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
 
-import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
-import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
-
 import com.google.gson.JsonElement;
 import com.google.gson.JsonPrimitive;
 import io.vavr.collection.List;
-
-import java.time.Duration;
-
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.netty.http.server.HttpServerRoutes;
 import reactor.test.StepVerifier;
 
+import java.time.Duration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
+import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
+import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendWithDelay;
+
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since May 2019
@@ -51,8 +55,10 @@ import reactor.test.StepVerifier;
 class MessageRouterSubscriberTest {
     private static final Duration TIMEOUT = Duration.ofSeconds(10);
     private static final String ERROR_MESSAGE = "Something went wrong";
+    private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout";
     private static final String CONSUMER_GROUP = "group1";
     private static final String SUCCESS_CONSUMER_ID = "consumer200";
+    private static final String DELAY_CONSUMER_ID = "delay200";
     private static final String FAILING_WITH_401_CONSUMER_ID = "consumer401";
     private static final String FAILING_WITH_403_CONSUMER_ID = "consumer403";
     private static final String FAILING_WITH_409_CONSUMER_ID = "consumer409";
@@ -63,6 +69,8 @@ class MessageRouterSubscriberTest {
 
     private static final String SUCCESS_RESP_PATH = String
             .format("%s/%s", CONSUMER_PATH, SUCCESS_CONSUMER_ID);
+    private static final String DELAY_RESP_PATH = String
+            .format("%s/%s", CONSUMER_PATH, DELAY_CONSUMER_ID);
     private static final String FAILING_WITH_401_RESP_PATH = String
             .format("%s/%s", CONSUMER_PATH, FAILING_WITH_401_CONSUMER_ID);
     private static final String FAILING_WITH_403_RESP_PATH = String
@@ -83,7 +91,15 @@ class MessageRouterSubscriberTest {
 
     @BeforeAll
     static void setUp() {
-        DummyHttpServer server = DummyHttpServer.start(MessageRouterSubscriberTest::setRoutes);
+        DummyHttpServer server = DummyHttpServer.start(routes -> routes
+                .get(SUCCESS_RESP_PATH, (req, resp) ->
+                        sendResource(resp, "/sample-mr-subscribe-response.json"))
+                .get(DELAY_RESP_PATH, (req, resp) -> sendWithDelay(resp, 200, TIMEOUT))
+                .get(FAILING_WITH_401_RESP_PATH, (req, resp) -> sendError(resp, 401, ERROR_MESSAGE))
+                .get(FAILING_WITH_403_RESP_PATH, (req, resp) -> sendError(resp, 403, ERROR_MESSAGE))
+                .get(FAILING_WITH_409_RESP_PATH, (req, resp) -> sendError(resp, 409, ERROR_MESSAGE))
+                .get(FAILING_WITH_429_RESP_PATH, (req, resp) -> sendError(resp, 429, ERROR_MESSAGE))
+                .get(FAILING_WITH_500_RESP_PATH, (req, resp) -> sendError(resp, 500, ERROR_MESSAGE)));
 
         sourceDefinition = createMessageRouterSource(server);
 
@@ -110,69 +126,19 @@ class MessageRouterSubscriberTest {
                 .verify(TIMEOUT);
     }
 
-    @Test
-    void subscriber_shouldGetUnauthorizedErrorResponse() {
-        MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_401_CONSUMER_ID);
-        Mono<MessageRouterSubscribeResponse> response = sut.get(request);
-
-        MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
-                .format("401 Unauthorized\n%s", ERROR_MESSAGE));
-
-        StepVerifier.create(response)
-                .expectNext(expectedResponse)
-                .expectComplete()
-                .verify(TIMEOUT);
-    }
-
-    @Test
-    void subscriber_shouldGetForbiddenErrorResponse() {
-        MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_403_CONSUMER_ID);
-        Mono<MessageRouterSubscribeResponse> response = sut.get(request);
-
-        MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
-                .format("403 Forbidden\n%s", ERROR_MESSAGE));
-
-        StepVerifier.create(response)
-                .expectNext(expectedResponse)
-                .expectComplete()
-                .verify(TIMEOUT);
-    }
-
-    @Test
-    void subscriber_shouldGetConflictErrorResponse() {
-        MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_409_CONSUMER_ID);
+    @ParameterizedTest
+    @CsvSource({
+            FAILING_WITH_401_CONSUMER_ID + "," + "401 Unauthorized",
+            FAILING_WITH_403_CONSUMER_ID + "," + "403 Forbidden",
+            FAILING_WITH_409_CONSUMER_ID + "," + "409 Conflict",
+            FAILING_WITH_429_CONSUMER_ID + "," + "429 Too Many Requests",
+            FAILING_WITH_500_CONSUMER_ID + "," + "500 Internal Server Error"
+    })
+    void subscriber_shouldHandleError(String consumerId, String failReason) {
+        MessageRouterSubscribeRequest request = createFailingRequest(consumerId);
         Mono<MessageRouterSubscribeResponse> response = sut.get(request);
 
-        MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
-                .format("409 Conflict\n%s", ERROR_MESSAGE));
-
-        StepVerifier.create(response)
-                .expectNext(expectedResponse)
-                .expectComplete()
-                .verify(TIMEOUT);
-    }
-
-    @Test
-    void subscriber_shouldGetTooManyRequestsErrorResponse() {
-        MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_429_CONSUMER_ID);
-        Mono<MessageRouterSubscribeResponse> response = sut.get(request);
-
-        MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
-                .format("429 Too Many Requests\n%s", ERROR_MESSAGE));
-
-        StepVerifier.create(response)
-                .expectNext(expectedResponse)
-                .expectComplete()
-                .verify(TIMEOUT);
-    }
-
-    @Test
-    void subscriber_shouldGetInternalServerErrorResponse() {
-        Mono<MessageRouterSubscribeResponse> response = sut
-                .get(mrFailingRequest);
-
-        MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
-                .format("500 Internal Server Error\n%s", ERROR_MESSAGE));
+        MessageRouterSubscribeResponse expectedResponse = createErrorResponse(failReason);
 
         StepVerifier.create(response)
                 .expectNext(expectedResponse)
@@ -226,20 +192,16 @@ class MessageRouterSubscriberTest {
                 .verify(TIMEOUT);
     }
 
-    private static HttpServerRoutes setRoutes(HttpServerRoutes routes) {
-        return routes
-                .get(SUCCESS_RESP_PATH, (req, resp) ->
-                        sendResource(resp, "/sample-mr-subscribe-response.json"))
-                .get(FAILING_WITH_401_RESP_PATH, (req, resp) ->
-                        sendError(resp, 401, ERROR_MESSAGE))
-                .get(FAILING_WITH_403_RESP_PATH, (req, resp) ->
-                        sendError(resp, 403, ERROR_MESSAGE))
-                .get(FAILING_WITH_409_RESP_PATH, (req, resp) ->
-                        sendError(resp, 409, ERROR_MESSAGE))
-                .get(FAILING_WITH_429_RESP_PATH, (req, resp) ->
-                        sendError(resp, 429, ERROR_MESSAGE))
-                .get(FAILING_WITH_500_RESP_PATH, (req, resp) ->
-                        sendError(resp, 500, ERROR_MESSAGE));
+    @Test
+    void subscriber_shouldHandleClientTimeoutError() {
+        Duration requestTimeout = Duration.ofMillis(1);
+        MessageRouterSubscribeRequest request = createDelayRequest(DELAY_CONSUMER_ID, requestTimeout);
+        Mono<MessageRouterSubscribeResponse> response = sut.get(request);
+
+        StepVerifier.create(response)
+                .consumeNextWith(this::assertTimeoutError)
+                .expectComplete()
+                .verify(TIMEOUT);
     }
 
     private static MessageRouterSource createMessageRouterSource(DummyHttpServer server) {
@@ -257,6 +219,15 @@ class MessageRouterSubscriberTest {
                 .build();
     }
 
+    private static MessageRouterSubscribeRequest createDelayRequest(String consumerId, Duration timeout) {
+        return ImmutableMessageRouterSubscribeRequest.builder()
+                .sourceDefinition(sourceDefinition)
+                .consumerGroup(CONSUMER_GROUP)
+                .consumerId(consumerId)
+                .timeoutConfig(ImmutableDmaapTimeoutConfig.builder().timeout(timeout).build())
+                .build();
+    }
+
     private static MessageRouterSubscribeRequest createFailingRequest(String consumerId) {
         return ImmutableMessageRouterSubscribeRequest
                 .builder()
@@ -266,11 +237,17 @@ class MessageRouterSubscriberTest {
                 .build();
     }
 
-    private static MessageRouterSubscribeResponse createErrorResponse(String failReason) {
+    private MessageRouterSubscribeResponse createErrorResponse(String failReason) {
+        String failReasonFormat = failReason + "\n%s";
         return ImmutableMessageRouterSubscribeResponse
                 .builder()
-                .failReason(failReason)
+                .failReason(String.format(failReasonFormat, ERROR_MESSAGE))
                 .build();
     }
+
+    private void assertTimeoutError(DmaapResponse response) {
+        assertThat(response.failed()).isTrue();
+        assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE);
+    }
 }
 
index 9b318d7..76d7a38 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2020 Nokia. All rights reserved.
+ * Copyright (C) 2020-2021 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -34,7 +34,7 @@ class ClientErrorReasonPresenterTest {
                 + "{\"requestError\":{\"serviceException\":{\"messageId\":\"messageId\",\"text\":\"text\"}}}";
 
         //when
-        String actual = ClientErrorReasonPresenter.present(clientErrorReason);
+        String actual = new ClientErrorReasonPresenter().present(clientErrorReason);
 
         //then
         assertThat(actual).isEqualTo(expected);
@@ -48,7 +48,7 @@ class ClientErrorReasonPresenterTest {
                 + "{\"requestError\":{\"serviceException\":{\"messageId\":\"messageId\",\"text\":\"text\",\"variables\":[\"v1\",\"v2\"]}}}";
 
         //when
-        String actual = ClientErrorReasonPresenter.present(clientErrorReason);
+        String actual = new ClientErrorReasonPresenter().present(clientErrorReason);
 
         //then
         assertThat(actual).isEqualTo(expected);
index f29bfa2..2825a87 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2019-2020 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -39,6 +39,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpR
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
 import reactor.core.publisher.Flux;
@@ -69,9 +70,11 @@ class MessageRouterPublisherImplTest {
     private static final Duration TIMEOUT = Duration.ofSeconds(5);
     private static final String TOPIC_URL = "https://dmaap-mr/TOPIC";
     private static final int MAX_BATCH_SIZE = 3;
-    public static final String TIMEOUT_ERROR_MESSAGE_HEADER = "408 Request Timeout";
+    private static final String ERROR_MESSAGE = "Something went wrong";
     private final RxHttpClient httpClient = mock(RxHttpClient.class);
-    private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1));
+    private final ClientErrorReasonPresenter clientErrorReasonPresenter = mock(ClientErrorReasonPresenter.class);
+    private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(
+            httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1), clientErrorReasonPresenter);
     private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
     private final MessageRouterPublishRequest plainPublishRequest = createPublishRequest(TOPIC_URL, ContentType.TEXT_PLAIN);
     private final MessageRouterPublishRequest jsonPublishRequest = createPublishRequest(TOPIC_URL);
@@ -417,7 +420,10 @@ class MessageRouterPublisherImplTest {
         final List<String> plainMessage = List.of("I", "like", "cookies");
 
         final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage);
-        given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.error(ReadTimeoutException.INSTANCE));
+        given(clientErrorReasonPresenter.present(any()))
+                .willReturn(ERROR_MESSAGE);
+        given(httpClient.call(any(HttpRequest.class)))
+                .willReturn(Mono.error(ReadTimeoutException.INSTANCE));
 
         // when
         final Flux<MessageRouterPublishResponse> responses = cut
@@ -439,9 +445,12 @@ class MessageRouterPublisherImplTest {
         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
 
         final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
+        given(clientErrorReasonPresenter.present(any()))
+                .willReturn(ERROR_MESSAGE);
         given(httpClient.call(any(HttpRequest.class)))
                 .willReturn(Mono.just(successHttpResponse))
                 .willReturn(Mono.error(ReadTimeoutException.INSTANCE));
+
         // when
         final Flux<MessageRouterPublishResponse> responses = cut
                 .put(jsonPublishRequest, doubleJsonMessageBatch);
@@ -463,9 +472,12 @@ class MessageRouterPublisherImplTest {
         final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
 
         final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
+        given(clientErrorReasonPresenter.present(any()))
+                .willReturn(ERROR_MESSAGE);
         given(httpClient.call(any(HttpRequest.class)))
                 .willReturn(Mono.error(ReadTimeoutException.INSTANCE))
                 .willReturn(Mono.just(successHttpResponse));
+
         // when
         final Flux<MessageRouterPublishResponse> responses = cut
                 .put(jsonPublishRequest, doubleJsonMessageBatch);
@@ -540,7 +552,7 @@ class MessageRouterPublisherImplTest {
     private void assertTimeoutError(MessageRouterPublishResponse response) {
         assertThat(response.failed()).isTrue();
         assertThat(response.items()).isEmpty();
-        assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE_HEADER);
+        assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE);
     }
 
     private void verifySingleResponse(List<? extends JsonElement> threeMessages,
index 1f97001..0396eff 100644 (file)
@@ -35,6 +35,7 @@ import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouter
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.*;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
@@ -47,10 +48,12 @@ import reactor.core.publisher.Mono;
  */
 class MessageRouterSubscriberImplTest {
 
+    private static final String ERROR_MESSAGE = "Something went wrong";
     private final RxHttpClient httpClient = mock(RxHttpClient.class);
+    private final ClientErrorReasonPresenter clientErrorReasonPresenter = mock(ClientErrorReasonPresenter.class);
     private final MessageRouterSubscriberConfig clientConfig = MessageRouterSubscriberConfig.createDefault();
     private final MessageRouterSubscriber
-            cut = new MessageRouterSubscriberImpl(httpClient, clientConfig.gsonInstance());
+            cut = new MessageRouterSubscriberImpl(httpClient, clientConfig.gsonInstance(),clientErrorReasonPresenter);
 
     private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
     private final MessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
@@ -136,7 +139,10 @@ class MessageRouterSubscriberImplTest {
     void getWithProperRequest_shouldReturnTimeoutError() {
 
         // given
-        given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.error(ReadTimeoutException.INSTANCE));
+        given(clientErrorReasonPresenter.present(any()))
+                .willReturn(ERROR_MESSAGE);
+        given(httpClient.call(any(HttpRequest.class)))
+                .willReturn(Mono.error(ReadTimeoutException.INSTANCE));
 
         // when
         final Mono<MessageRouterSubscribeResponse> responses = cut
@@ -145,7 +151,7 @@ class MessageRouterSubscriberImplTest {
 
         // then
         assertThat(response.failed()).isTrue();
-        assertThat(response.failReason()).contains("408 Request Timeout");
+        assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE);
         assertThat(response.hasElements()).isFalse();
 
 
@@ -156,4 +162,4 @@ class MessageRouterSubscriberImplTest {
                 mrRequest.consumerGroup(), mrRequest.consumerId()));
         assertThat(httpRequest.body()).isNull();
     }
-}
\ No newline at end of file
+}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfigTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapRetryConfigTest.java
new file mode 100644 (file)
index 0000000..da3a88f
--- /dev/null
@@ -0,0 +1,71 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class DmaapRetryConfigTest {
+    @Test
+    void shouldSuccessfullyCreateObject() {
+        DmaapRetryConfig retryConfig = ImmutableDmaapRetryConfig.builder()
+                .retryIntervalInSeconds(1)
+                .retryCount(0)
+                .build();
+
+        assertThat(retryConfig.retryIntervalInSeconds()).isOne();
+        assertThat(retryConfig.retryCount()).isZero();
+    }
+
+    @Test
+    void shouldSuccessfullyCreateObjectForDefaults() {
+        DmaapRetryConfig retryConfig = ImmutableDmaapRetryConfig.builder().build();
+
+        assertThat(retryConfig.retryIntervalInSeconds()).isOne();
+        assertThat(retryConfig.retryCount()).isEqualTo(3);
+    }
+
+    @Test
+    void shouldThrowInvalidArgumentExceptionForInvalidRetryInterval() {
+        assertThrows(IllegalArgumentException.class, () -> withRetryInterval(0));
+        assertThrows(IllegalArgumentException.class, () -> withRetryInterval(-3));
+    }
+
+    @Test
+    void shouldThrowInvalidArgumentExceptionForInvalidRetryCount() {
+        assertThrows(IllegalArgumentException.class, () -> withRetryCount(-1));
+        assertThrows(IllegalArgumentException.class, () -> withRetryCount(-3));
+    }
+
+    private void withRetryInterval(int ri) {
+        ImmutableDmaapRetryConfig.builder()
+                .retryIntervalInSeconds(ri)
+                .build();
+    }
+
+    private void withRetryCount(int rc) {
+        ImmutableDmaapRetryConfig.builder()
+                .retryCount(rc)
+                .build();
+    }
+}
index ab6641c..26eb176 100644 (file)
@@ -67,11 +67,11 @@ services:
       - zookeeper
       - kafka
 
-  toxiproxy:
-    image: shopify/toxiproxy:2.1.4
+  mockserver:
+    image: mockserver/mockserver:mockserver-5.11.2
+    command: -serverPort 1090 -proxyRemotePort 3904 -proxyRemoteHost dmaap
     ports:
-      - "8474:8474"
-      - "8666:8666"
+      - "1080:1090"
     networks:
       - net
     depends_on:
index 77b842d..d0bdf41 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2019-2020 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
+
 package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
 
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vavr.collection.HashSet;
 import io.vavr.collection.Stream;
 import io.vavr.control.Option;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RetryConfig;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.netty.http.client.HttpClient;
 import reactor.netty.http.client.HttpClient.ResponseReceiver;
 import reactor.netty.http.client.HttpClientRequest;
 import reactor.netty.http.client.HttpClientResponse;
+import reactor.util.retry.Retry;
+import reactor.util.retry.RetryBackoffSpec;
 
 import java.util.stream.Collectors;
 
@@ -39,17 +46,23 @@ public class RxHttpClient {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(RxHttpClient.class);
     private final HttpClient httpClient;
+    private RetryConfig retryConfig;
 
     RxHttpClient(HttpClient httpClient) {
         this.httpClient = httpClient;
     }
 
+    RxHttpClient(HttpClient httpClient, RetryConfig retryConfig) {
+        this(httpClient);
+        this.retryConfig = retryConfig;
+    }
+
     public Mono<HttpResponse> call(HttpRequest request) {
-        return prepareRequest(request)
-                .responseSingle((resp, content) ->
-                        content.asByteArray()
-                                .defaultIfEmpty(new byte[0])
-                                .map(bytes -> new NettyHttpResponse(request.url(), resp.status(), bytes)));
+        Mono<HttpResponse> httpResponseMono = response(request);
+        return Option.of(retryConfig)
+                .map(rc -> retryConfig(rc, request.diagnosticContext()))
+                .map(httpResponseMono::retryWhen)
+                .getOrElse(() -> httpResponseMono);
     }
 
     ResponseReceiver<?> prepareRequest(HttpRequest request) {
@@ -65,6 +78,27 @@ public class RxHttpClient {
         return prepareBody(request, theClient);
     }
 
+    private Mono<HttpResponse> response(HttpRequest request) {
+        return prepareRequest(request)
+                .responseSingle((resp, content) -> mapResponse(request.url(), resp.status(), content));
+    }
+
+    private Mono<HttpResponse> mapResponse(String url, HttpResponseStatus status, reactor.netty.ByteBufMono content) {
+        if (shouldRetry(status.code())) {
+            return Mono.error(new RetryConfig.RetryableException());
+        }
+        return content.asByteArray()
+                .defaultIfEmpty(new byte[0])
+                .map(bytes -> new NettyHttpResponse(url, status, bytes));
+    }
+
+    private boolean shouldRetry(int code) {
+        return Option.of(retryConfig)
+                .map(RetryConfig::retryableHttpResponseCodes)
+                .getOrElse(HashSet::empty)
+                .contains(code);
+    }
+
     private ResponseReceiver<?> prepareBody(HttpRequest request, HttpClient theClient) {
         if (request.body() == null) {
             return prepareBodyWithoutContents(request, theClient);
@@ -79,7 +113,7 @@ public class RxHttpClient {
         return theClient
                 .headers(hdrs -> hdrs.set(HttpHeaders.TRANSFER_ENCODING_TYPE, HttpHeaders.CHUNKED))
                 .request(request.method().asNetty())
-                .send(request.body().contents())
+                .send(Flux.from(request.body().contents()))
                 .uri(request.url());
     }
 
@@ -87,7 +121,7 @@ public class RxHttpClient {
         return theClient
                 .headers(hdrs -> hdrs.set(HttpHeaders.CONTENT_LENGTH, request.body().length().toString()))
                 .request(request.method().asNetty())
-                .send(request.body().contents())
+                .send(Flux.from(request.body().contents()))
                 .uri(request.url());
     }
 
@@ -114,4 +148,22 @@ public class RxHttpClient {
         context.withSlf4jMdc(LOGGER.isDebugEnabled(),
                 () -> LOGGER.debug("Response status: {}", httpClientResponse.status()));
     }
+
+    private RetryBackoffSpec retryConfig(RetryConfig retryConfig, RequestDiagnosticContext context) {
+        RetryBackoffSpec retry = Retry
+                .fixedDelay(retryConfig.retryCount(), retryConfig.retryInterval())
+                .doBeforeRetry(retrySignal -> context.withSlf4jMdc(
+                        LOGGER.isTraceEnabled(), () -> LOGGER.trace("Retry: {}", retrySignal)))
+                .filter(ex -> isRetryable(retryConfig, ex));
+
+        return Option.of(retryConfig.onRetryExhaustedException())
+                .map(ex -> retry.onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> ex))
+                .getOrElse(retry);
+    }
+
+    private boolean isRetryable(RetryConfig retryConfig, Throwable ex) {
+        return retryConfig.retryableExceptions()
+                .toStream()
+                .exists(clazz -> clazz.isAssignableFrom(ex.getClass()));
+    }
 }
index 9b23f1d..118df52 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,7 +21,9 @@
 package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
 
 import io.netty.handler.ssl.SslContext;
+import io.vavr.control.Option;
 import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RxHttpClientConfig;
 import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
 import org.onap.dcaegen2.services.sdk.security.ssl.SslFactory;
 import org.onap.dcaegen2.services.sdk.security.ssl.TrustStoreKeys;
@@ -42,23 +44,53 @@ public final class RxHttpClientFactory {
         return new RxHttpClient(HttpClient.create());
     }
 
+    public static RxHttpClient create(RxHttpClientConfig config) {
+        return createWithConfig(HttpClient.create(), config);
+    }
 
     public static RxHttpClient create(SecurityKeys securityKeys) {
         final SslContext context = SSL_FACTORY.createSecureClientContext(securityKeys);
         return create(context);
     }
 
+    public static RxHttpClient create(SecurityKeys securityKeys, RxHttpClientConfig config) {
+        final SslContext context = SSL_FACTORY.createSecureClientContext(securityKeys);
+        return create(context, config);
+    }
+
     public static RxHttpClient create(TrustStoreKeys trustStoreKeys) {
         final SslContext context = SSL_FACTORY.createSecureClientContext(trustStoreKeys);
         return create(context);
     }
 
+    public static RxHttpClient create(TrustStoreKeys trustStoreKeys, RxHttpClientConfig config) {
+        final SslContext context = SSL_FACTORY.createSecureClientContext(trustStoreKeys);
+        return create(context, config);
+    }
+
     public static RxHttpClient createInsecure() {
         final SslContext context = SSL_FACTORY.createInsecureClientContext();
         return create(context);
     }
 
+    public static RxHttpClient createInsecure(RxHttpClientConfig config) {
+        final SslContext context = SSL_FACTORY.createInsecureClientContext();
+        return create(context, config);
+    }
+
     private static RxHttpClient create(@NotNull SslContext sslContext) {
-        return new RxHttpClient(HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext)));
+        HttpClient secure = HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext));
+        return new RxHttpClient(secure);
+    }
+
+    private static RxHttpClient create(@NotNull SslContext sslContext, RxHttpClientConfig config) {
+        HttpClient secure = HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext));
+        return createWithConfig(secure, config);
+    }
+
+    private static RxHttpClient createWithConfig(HttpClient httpClient, RxHttpClientConfig config) {
+        return Option.of(config.retryConfig())
+                .map(retryConfig -> new RxHttpClient(httpClient, retryConfig))
+                .getOrElse(() -> new RxHttpClient(httpClient));
     }
 }
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RetryConfig.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RetryConfig.java
new file mode 100644 (file)
index 0000000..a0ae199
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config;
+
+import io.vavr.collection.HashSet;
+import io.vavr.collection.Set;
+import org.immutables.value.Value;
+import org.jetbrains.annotations.Nullable;
+
+import java.time.Duration;
+
+@Value.Immutable
+public interface RetryConfig {
+
+    int retryCount();
+
+    Duration retryInterval();
+
+    @Value.Default
+    default Set<Integer> retryableHttpResponseCodes() {
+        return HashSet.empty();
+    }
+
+    @Value.Default
+    default Set<Class<? extends Throwable>> customRetryableExceptions() {
+        return HashSet.empty();
+    }
+
+    @Value.Derived
+    default Set<Class<? extends Throwable>> retryableExceptions() {
+        Set<Class<? extends Throwable>> result = customRetryableExceptions();
+        if (retryableHttpResponseCodes().nonEmpty()) {
+            result = result.add(RetryableException.class);
+        }
+        return result;
+    }
+
+    @Nullable RuntimeException onRetryExhaustedException();
+
+    class RetryableException extends RuntimeException {}
+}
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RxHttpClientConfig.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/config/RxHttpClientConfig.java
new file mode 100644 (file)
index 0000000..78a88a4
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config;
+
+import org.immutables.value.Value;
+import org.jetbrains.annotations.Nullable;
+
+@Value.Immutable
+public interface RxHttpClientConfig {
+    @Nullable RetryConfig retryConfig();
+}
index 4795b00..8ac0d1d 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test;
 
 import io.vavr.CheckedFunction0;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
+import io.vavr.Tuple3;
+import io.vavr.control.Try;
 import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,6 +32,13 @@ import reactor.netty.http.server.HttpServer;
 import reactor.netty.http.server.HttpServerResponse;
 import reactor.netty.http.server.HttpServerRoutes;
 
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since February 2019
@@ -63,11 +67,26 @@ public class DummyHttpServer {
         return responses[state.getAndIncrement()];
     }
 
+    public static Publisher<Void> sendInOrderWithDelay(AtomicInteger counter, Tuple3<HttpServerResponse, Integer, Duration>... responses) {
+        Tuple3<HttpServerResponse, Integer, Duration> tuple = responses[counter.get()];
+        HttpServerResponse httpServerResponse = tuple._1;
+        Integer statusCode = tuple._2;
+        long timeout = tuple._3.toMillis();
+        Try.run(() -> Thread.sleep(timeout));
+        counter.incrementAndGet();
+        return sendString(httpServerResponse.status(statusCode), Mono.just("OK"));
+    }
+
+    public static Publisher<Void> sendWithDelay(HttpServerResponse response, int statusCode, Duration timeout) {
+        Try.run(() -> Thread.sleep(timeout.toMillis()));
+        return sendString(response.status(statusCode), Mono.just("OK"));
+    }
+
     public static Publisher<Void> sendResource(HttpServerResponse httpServerResponse, String resourcePath) {
         return sendString(httpServerResponse, Mono.fromCallable(() -> readResource(resourcePath)));
     }
 
-    public static Publisher<Void> sendError(HttpServerResponse httpServerResponse, int statusCode, String message){
+    public static Publisher<Void> sendError(HttpServerResponse httpServerResponse, int statusCode, String message) {
         return sendString(httpServerResponse.status(statusCode), Mono.just(message));
     }
 
@@ -79,6 +98,11 @@ public class DummyHttpServer {
         server.disposeNow();
     }
 
+    public DummyHttpServer closeAndGet() {
+        server.disposeNow();
+        return this;
+    }
+
     public String host() {
         return server.host();
     }
index 6f3a090..daf04c6 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2019-2020 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -22,33 +22,55 @@ package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
 
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.timeout.ReadTimeoutException;
+import io.vavr.Tuple;
+import io.vavr.collection.HashSet;
 import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRetryConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRxHttpClientConfig;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.HttpException;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
+import java.net.ConnectException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendInOrderWithDelay;
 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
 
 class RxHttpClientIT {
 
     private static final Duration TIMEOUT = Duration.ofHours(5);
-    private final RxHttpClient cut = RxHttpClientFactory.create();
-    private static DummyHttpServer httpServer;
-
-    @BeforeAll
-    static void setUpClass() {
-        httpServer = DummyHttpServer.start(routes -> routes
-                .get("/sample-get", (req, resp) -> sendString(resp, Mono.just("OK")))
-                .get("/delayed-get", (req, resp) -> sendString(resp, Mono.just("OK").delayElement(Duration.ofMinutes(1))))
+    private static final Duration NO_DELAY = Duration.ofSeconds(0);
+    private static final int RETRY_COUNT = 1;
+    private static final int EXPECTED_REQUESTS_WHEN_RETRY = RETRY_COUNT + 1;
+    private static final DummyHttpServer HTTP_SERVER = initialize();
+    private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet();
+    private static final Mono<String> OK = Mono.just("OK");
+    private static final Duration RETRY_INTERVAL = Duration.ofMillis(1);
+    private static AtomicInteger REQUEST_COUNTER;
+
+    private static DummyHttpServer initialize() {
+        return DummyHttpServer.start(routes -> routes
+                .get("/sample-get", (req, resp) -> sendString(resp, OK))
+                .get("/delay-get", (req, resp) ->
+                        sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 200, Duration.ofSeconds(3))))
                 .get("/sample-get-500", (req, resp) -> resp.status(HttpResponseStatus.INTERNAL_SERVER_ERROR).send())
+                .get("/retry-get-500", (req, resp) ->
+                        sendInOrderWithDelay(REQUEST_COUNTER,
+                                Tuple.of(resp, 500, NO_DELAY), Tuple.of(resp, 500, NO_DELAY)))
+                .get("/retry-get-400", (req, resp) ->
+                        sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 400, NO_DELAY)))
+                .get("/retry-get-500-200", (req, resp) ->
+                        sendInOrderWithDelay(REQUEST_COUNTER,
+                                Tuple.of(resp, 500, NO_DELAY), Tuple.of(resp, 200, NO_DELAY)))
+                .get("/retry-get-200", (req, resp) ->
+                        sendInOrderWithDelay(REQUEST_COUNTER, Tuple.of(resp, 200, NO_DELAY)))
                 .post("/headers-post", (req, resp) -> resp
                         .sendString(Mono.just(req.requestHeaders().toString())))
                 .post("/echo-post", (req, resp) -> resp.send(req.receive().retain()))
@@ -57,12 +79,7 @@ class RxHttpClientIT {
 
     @AfterAll
     static void tearDownClass() {
-        httpServer.close();
-    }
-
-    private ImmutableHttpRequest.Builder requestFor(String path) throws MalformedURLException {
-        return ImmutableHttpRequest.builder()
-                .url(new URL("http", httpServer.host(), httpServer.port(), path).toString());
+        HTTP_SERVER.close();
     }
 
     @Test
@@ -71,6 +88,7 @@ class RxHttpClientIT {
         final HttpRequest httpRequest = requestFor("/sample-get")
                 .method(HttpMethod.GET)
                 .build();
+        final RxHttpClient cut = RxHttpClientFactory.create();
 
         // when
         final Mono<String> bodyAsString = cut.call(httpRequest)
@@ -90,6 +108,7 @@ class RxHttpClientIT {
         final HttpRequest httpRequest = requestFor("/sample-get-500")
                 .method(HttpMethod.GET)
                 .build();
+        final RxHttpClient cut = RxHttpClientFactory.create();
 
         // when
         final Mono<String> bodyAsString = cut.call(httpRequest)
@@ -110,6 +129,7 @@ class RxHttpClientIT {
                 .method(HttpMethod.POST)
                 .body(RequestBody.fromString(requestBody))
                 .build();
+        final RxHttpClient cut = RxHttpClientFactory.create();
 
         // when
         final Mono<String> bodyAsString = cut.call(httpRequest)
@@ -131,6 +151,7 @@ class RxHttpClientIT {
                 .method(HttpMethod.POST)
                 .body(RequestBody.chunkedFromString(Mono.just(requestBody)))
                 .build();
+        final RxHttpClient cut = RxHttpClientFactory.create();
 
         // when
         final Mono<String> bodyAsString = cut.call(httpRequest)
@@ -155,6 +176,7 @@ class RxHttpClientIT {
                 .method(HttpMethod.POST)
                 .body(RequestBody.fromString(requestBody))
                 .build();
+        final RxHttpClient cut = RxHttpClientFactory.create();
 
         // when
         final Mono<String> bodyAsString = cut.call(httpRequest)
@@ -174,10 +196,12 @@ class RxHttpClientIT {
     @Test
     void getWithTimeoutError() throws Exception {
         // given
-        final HttpRequest httpRequest = requestFor("/delayed-get")
+        REQUEST_COUNTER = new AtomicInteger();
+        final HttpRequest httpRequest = requestFor("/delay-get")
                 .method(HttpMethod.GET)
-                .timeout(Duration.ofSeconds(1))
+                .timeout(Duration.ofMillis(1))
                 .build();
+        final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder().build());
 
         // when
         final Mono<HttpResponse> response = cut.call(httpRequest);
@@ -186,5 +210,208 @@ class RxHttpClientIT {
         StepVerifier.create(response)
                 .expectError(ReadTimeoutException.class)
                 .verify(TIMEOUT);
+        assertNoServerResponse();
+    }
+
+    @Test
+    void getWithRetryExhaustedExceptionWhenClosedServer() throws Exception {
+        // given
+        REQUEST_COUNTER = new AtomicInteger();
+        final HttpRequest httpRequest = requestForClosedServer("/sample-get")
+                .method(HttpMethod.GET)
+                .build();
+        final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
+                .retryConfig(defaultRetryConfig()
+                        .customRetryableExceptions(HashSet.of(ConnectException.class))
+                        .build())
+                .build());
+
+        // when
+        final Mono<HttpResponse> response = cut.call(httpRequest);
+
+        // then
+        StepVerifier.create(response)
+                .expectError(IllegalStateException.class)
+                .verify(TIMEOUT);
+        assertNoServerResponse();
+    }
+
+    @Test
+    void getWithCustomRetryExhaustedExceptionWhenClosedServer() throws Exception {
+        // given
+        REQUEST_COUNTER = new AtomicInteger();
+        final HttpRequest httpRequest = requestForClosedServer("/sample-get")
+                .method(HttpMethod.GET)
+                .build();
+        final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
+                .retryConfig(defaultRetryConfig()
+                        .customRetryableExceptions(HashSet.of(ConnectException.class))
+                        .onRetryExhaustedException(ReadTimeoutException.INSTANCE)
+                        .build())
+                .build());
+
+        // when
+        final Mono<HttpResponse> response = cut.call(httpRequest);
+
+        // then
+        StepVerifier.create(response)
+                .expectError(ReadTimeoutException.class)
+                .verify(TIMEOUT);
+        assertNoServerResponse();
+    }
+
+    @Test
+    void getWithRetryExhaustedExceptionWhen500() throws Exception {
+        // given
+        REQUEST_COUNTER = new AtomicInteger();
+        final HttpRequest httpRequest = requestFor("/retry-get-500")
+                .method(HttpMethod.GET)
+                .build();
+        final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
+                .retryConfig(defaultRetryConfig()
+                        .retryableHttpResponseCodes(HashSet.of(500))
+                        .build())
+                .build());
+
+        // when
+        final Mono<HttpResponse> response = cut.call(httpRequest);
+
+        // then
+        StepVerifier.create(response)
+                .expectError(IllegalStateException.class)
+                .verify(TIMEOUT);
+        assertRetry();
+    }
+
+    @Test
+    void getWithCustomRetryExhaustedExceptionWhen500() throws Exception {
+        // given
+        REQUEST_COUNTER = new AtomicInteger();
+        final HttpRequest httpRequest = requestFor("/retry-get-500")
+                .method(HttpMethod.GET)
+                .build();
+        final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
+                .retryConfig(defaultRetryConfig()
+                        .onRetryExhaustedException(ReadTimeoutException.INSTANCE)
+                        .retryableHttpResponseCodes(HashSet.of(500))
+                        .build())
+                .build());
+
+        // when
+        final Mono<HttpResponse> response = cut.call(httpRequest);
+
+        // then
+        StepVerifier.create(response)
+                .expectError(ReadTimeoutException.class)
+                .verify(TIMEOUT);
+        assertRetry();
+    }
+
+    @Test
+    void getWithRetryWhen500AndThen200() throws Exception {
+        // given
+        REQUEST_COUNTER = new AtomicInteger();
+        final HttpRequest httpRequest = requestFor("/retry-get-500-200")
+                .method(HttpMethod.GET)
+                .build();
+        final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
+                .retryConfig(defaultRetryConfig()
+                        .retryableHttpResponseCodes(HashSet.of(500))
+                        .build())
+                .build());
+
+        // when
+        final Mono<String> bodyAsString = cut.call(httpRequest)
+                .doOnNext(HttpResponse::throwIfUnsuccessful)
+                .map(HttpResponse::bodyAsString);
+
+        // then
+        StepVerifier.create(bodyAsString)
+                .expectNext("OK")
+                .expectComplete()
+                .verify(TIMEOUT);
+        assertRetry();
+    }
+
+    @Test
+    void getWithoutRetryWhen200() throws Exception {
+        // given
+        REQUEST_COUNTER = new AtomicInteger();
+        final HttpRequest httpRequest = requestFor("/retry-get-200")
+                .method(HttpMethod.GET)
+                .build();
+        final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
+                .retryConfig(defaultRetryConfig()
+                        .retryableHttpResponseCodes(HashSet.of(500))
+                        .build())
+                .build());
+
+        // when
+        final Mono<String> bodyAsString = cut.call(httpRequest)
+                .doOnNext(HttpResponse::throwIfUnsuccessful)
+                .map(HttpResponse::bodyAsString);
+
+        // then
+        StepVerifier.create(bodyAsString)
+                .expectNext("OK")
+                .expectComplete()
+                .verify(TIMEOUT);
+        assertNoRetry();
+    }
+
+    @Test
+    void getWithoutRetryWhen400() throws Exception {
+        // given
+        REQUEST_COUNTER = new AtomicInteger();
+        final HttpRequest httpRequest = requestFor("/retry-get-400")
+                .method(HttpMethod.GET)
+                .build();
+        final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
+                .retryConfig(defaultRetryConfig()
+                        .retryableHttpResponseCodes(HashSet.of(500))
+                        .build())
+                .build());
+
+        // when
+        Mono<HttpResponse> result = cut.call(httpRequest);
+
+        // then
+        StepVerifier.create(result)
+                .consumeNextWith(this::assert400)
+                .expectComplete()
+                .verify(TIMEOUT);
+        assertNoRetry();
+    }
+
+    private ImmutableHttpRequest.Builder requestFor(String path) throws MalformedURLException {
+        return ImmutableHttpRequest.builder()
+                .url(new URL("http", HTTP_SERVER.host(), HTTP_SERVER.port(), path).toString());
+    }
+
+    private ImmutableHttpRequest.Builder requestForClosedServer(String path) throws MalformedURLException {
+        return ImmutableHttpRequest.builder()
+                .url(new URL("http", DISPOSED_HTTP_SERVER.host(), DISPOSED_HTTP_SERVER.port(), path).toString());
+    }
+
+    private ImmutableRetryConfig.Builder defaultRetryConfig() {
+        return ImmutableRetryConfig.builder()
+                .retryCount(RETRY_COUNT)
+                .retryInterval(RETRY_INTERVAL);
+    }
+
+    private void assertRetry() {
+        assertThat(REQUEST_COUNTER.get()).isEqualTo(EXPECTED_REQUESTS_WHEN_RETRY);
+    }
+
+    private void assertNoRetry() {
+        assertThat(REQUEST_COUNTER.get()).isOne();
+    }
+
+    private void assertNoServerResponse() {
+        assertThat(REQUEST_COUNTER.get()).isZero();
+    }
+    
+    private void assert400(HttpResponse httpResponse) {
+        assertThat(httpResponse.statusCode()).isEqualTo(400);
     }
 }