Add timeout for Publisher(dmaap-client) 37/116537/4
authortkogut <tomasz.kogut@nokia.com>
Tue, 29 Dec 2020 14:13:22 +0000 (15:13 +0100)
committertkogut <tomasz.kogut@nokia.com>
Wed, 30 Dec 2020 15:48:26 +0000 (16:48 +0100)
Issue-ID: DCAEGEN2-1483
Signed-off-by: tkogut <tomasz.kogut@nokia.com>
Change-Id: Ia5b7320bc3e491548a1fa1dba2d95843a98f01ae

44 files changed:
pom.xml
rest-services/cbs-client/pom.xml
rest-services/dmaap-client/pom.xml
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientError.java [new file with mode: 0644]
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReason.java [new file with mode: 0644]
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java [new file with mode: 0644]
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java [new file with mode: 0644]
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/RequestError.java [new file with mode: 0644]
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ServiceException.java [new file with mode: 0644]
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/TimeoutConfig.java [new file with mode: 0644]
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenterTest.java [new file with mode: 0644]
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java
rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml
rest-services/http-client/pom.xml
rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpRequest.java
rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java
rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java
rest-services/model/pom.xml
rest-services/pom.xml
security/crypt-password/pom.xml
security/pom.xml
security/ssl/pom.xml
services/external-schema-manager/pom.xml
services/hv-ves-client/pom.xml
services/hv-ves-client/producer/api/pom.xml
services/hv-ves-client/producer/ct/pom.xml
services/hv-ves-client/producer/impl/pom.xml
services/hv-ves-client/producer/pom.xml
services/hv-ves-client/protobuf/pom.xml
services/pom.xml
standardization/api-custom-header/pom.xml
standardization/moher-api/healthstate/pom.xml
standardization/moher-api/metrics/pom.xml
standardization/moher-api/pom.xml
standardization/moher-api/server-adapters/pom.xml
standardization/moher-api/server-adapters/reactor-netty/pom.xml
standardization/moher-api/server-adapters/spring-webflux/pom.xml
standardization/pom.xml

diff --git a/pom.xml b/pom.xml
index 7b6186c..80b0c91 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -11,7 +11,7 @@
 
     <groupId>org.onap.dcaegen2.services</groupId>
     <artifactId>sdk</artifactId>
-    <version>1.5.0-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
 
     <name>dcaegen2-services-sdk</name>
     <description>Common SDK repo for all DCAE Services</description>
@@ -78,6 +78,7 @@
         <spring.boot.version>2.4.0</spring.boot.version>
         <system.rules.version>1.17.2</system.rules.version>
         <openapi4j.version>1.0.3</openapi4j.version>
+        <toxiproxy-java.version>2.1.4</toxiproxy-java.version>
        <sonar.coverage.jacoco.xmlReportPaths>
          ${project.reporting.outputDirectory}/jacoco-ut/jacoco.xml
        </sonar.coverage.jacoco.xmlReportPaths>
index 3b4af49..ea1eae0 100644 (file)
@@ -7,7 +7,7 @@
     <parent>
         <groupId>org.onap.dcaegen2.services.sdk</groupId>
         <artifactId>dcaegen2-services-sdk-rest-services</artifactId>
-        <version>1.5.0-SNAPSHOT</version>
+        <version>1.6.0-SNAPSHOT</version>
     </parent>
 
     <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
index f81b04f..d619590 100644 (file)
@@ -7,7 +7,7 @@
   <parent>
     <groupId>org.onap.dcaegen2.services.sdk</groupId>
     <artifactId>dcaegen2-services-sdk-rest-services</artifactId>
-      <version>1.5.0-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
       <groupId>org.testcontainers</groupId>
       <artifactId>junit-jupiter</artifactId>
     </dependency>
+    <dependency>
+      <groupId>eu.rekawek.toxiproxy</groupId>
+      <artifactId>toxiproxy-java</artifactId>
+      <version>${toxiproxy-java.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientError.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientError.java
new file mode 100644 (file)
index 0000000..57187c8
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+
+@Value.Immutable
+@Gson.TypeAdapters
+public interface ClientError {
+    RequestError requestError();
+}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReason.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReason.java
new file mode 100644 (file)
index 0000000..9754719
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error;
+
+import org.immutables.value.Value;
+import reactor.util.annotation.Nullable;
+
+import java.util.List;
+
+@Value.Immutable
+public interface ClientErrorReason {
+    String header();
+
+    String messageId();
+
+    String text();
+
+    @Nullable List<String> variables();
+}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenter.java
new file mode 100644 (file)
index 0000000..6b22b37
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import io.vavr.control.Option;
+
+public class ClientErrorReasonPresenter {
+
+    private ClientErrorReasonPresenter() { }
+
+    private static final Gson GSON = new GsonBuilder().create();
+    private static final String PATTERN = "%s\n%s";
+
+    public static String present(ClientErrorReason clientErrorReason) {
+        ImmutableServiceException simpleServiceException = ImmutableServiceException.builder()
+                .messageId(clientErrorReason.messageId())
+                .text(clientErrorReason.text())
+                .build();
+        ImmutableServiceException serviceException = Option.of(clientErrorReason.variables())
+                .map(simpleServiceException::withVariables)
+                .getOrElse(simpleServiceException);
+        ImmutableRequestError requestError = ImmutableRequestError.builder()
+                .serviceException(serviceException)
+                .build();
+        ClientError clientError = ImmutableClientError.builder()
+                .requestError(requestError)
+                .build();
+        return String.format(PATTERN, clientErrorReason.header(), GSON.toJson(clientError));
+    }
+}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasons.java
new file mode 100644 (file)
index 0000000..5a51e5f
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error;
+
+import java.util.Collections;
+
+public class ClientErrorReasons {
+
+    private ClientErrorReasons() { }
+
+    public static final ClientErrorReason TIMEOUT = ImmutableClientErrorReason.builder()
+            .header("408 Request Timeout")
+            .text("Client timeout exception occurred, Error code is %1")
+            .messageId("SVC0001")
+            .variables(Collections.singletonList("408")).build();
+
+}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/RequestError.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/RequestError.java
new file mode 100644 (file)
index 0000000..71e673f
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+
+@Value.Immutable
+@Gson.TypeAdapters
+public interface RequestError {
+    ServiceException serviceException();
+}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ServiceException.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ServiceException.java
new file mode 100644 (file)
index 0000000..e99330a
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+import reactor.util.annotation.Nullable;
+
+import java.util.List;
+
+@Value.Immutable
+@Gson.TypeAdapters
+public interface ServiceException {
+
+    String messageId();
+
+    String text();
+
+    @Nullable List<String> variables();
+}
index 191ec64..16068da 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2020 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl;
 
-import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason;
-
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
+import io.netty.handler.timeout.ReadTimeoutException;
 import io.vavr.collection.HashMap;
 import io.vavr.collection.List;
-import java.time.Duration;
-import java.util.stream.Collectors;
+import io.vavr.control.Option;
 import org.jetbrains.annotations.NotNull;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
@@ -38,6 +36,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReason;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasons;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
@@ -47,6 +48,11 @@ import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.time.Duration;
+import java.util.stream.Collectors;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason;
+
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since March 2019
@@ -77,29 +83,34 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
         LOGGER.debug("Sending a batch of {} items to DMaaP MR", batch.size());
         LOGGER.trace("The items to be sent: {}", batch);
         return httpClient.call(buildHttpRequest(request, createBody(batch, request.contentType())))
-                .map(httpResponse -> buildResponse(httpResponse, batch));
+                .map(httpResponse -> buildResponse(httpResponse, batch))
+                .doOnError(ReadTimeoutException.class, e -> LOGGER.error("Timeout exception occurred when sending items to DMaaP MR", e))
+                .onErrorResume(ReadTimeoutException.class, e -> createErrorResponse(ClientErrorReasons.TIMEOUT));
     }
 
     private @NotNull RequestBody createBody(List<? extends JsonElement> subItems, ContentType contentType) {
-        if(contentType == ContentType.APPLICATION_JSON) {
+        if (contentType == ContentType.APPLICATION_JSON) {
             final JsonArray elements = new JsonArray(subItems.size());
             subItems.forEach(elements::add);
             return RequestBody.fromJson(elements);
-        }else if(contentType == ContentType.TEXT_PLAIN){
+        } else if (contentType == ContentType.TEXT_PLAIN) {
             String messages = subItems.map(JsonElement::toString)
                     .collect(Collectors.joining("\n"));
             return RequestBody.fromString(messages);
-        }else throw new IllegalArgumentException("Unsupported content type: " + contentType);
+        } else throw new IllegalArgumentException("Unsupported content type: " + contentType);
     }
 
     private @NotNull HttpRequest buildHttpRequest(MessageRouterPublishRequest request, RequestBody body) {
-        return ImmutableHttpRequest.builder()
+        ImmutableHttpRequest.Builder requestBuilder = ImmutableHttpRequest.builder()
                 .method(HttpMethod.POST)
                 .url(request.sinkDefinition().topicUrl())
                 .diagnosticContext(request.diagnosticContext().withNewInvocationId())
                 .customHeaders(HashMap.of(HttpHeaders.CONTENT_TYPE, request.contentType().toString()))
-                .body(body)
-                .build();
+                .body(body);
+
+        return Option.of(request.timeoutConfig())
+                .map(timeoutConfig -> requestBuilder.timeout(timeoutConfig.getTimeout()).build())
+                .getOrElse(requestBuilder::build);
     }
 
     private MessageRouterPublishResponse buildResponse(
@@ -111,4 +122,11 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
                 ? builder.items(batch).build()
                 : builder.failReason(extractFailReason(httpResponse)).build();
     }
+
+    private Mono<MessageRouterPublishResponse> createErrorResponse(ClientErrorReason clientErrorReason) {
+        String failReason = ClientErrorReasonPresenter.present(clientErrorReason);
+        return Mono.just(ImmutableMessageRouterPublishResponse.builder()
+                .failReason(failReason)
+                .build());
+    }
 }
index 314756d..4490c79 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2020 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model;
 
 import org.immutables.value.Value;
+import org.jetbrains.annotations.Nullable;
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.TimeoutConfig;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -33,6 +35,8 @@ public interface MessageRouterPublishRequest extends DmaapRequest {
 
     MessageRouterSink sinkDefinition();
 
+    @Nullable TimeoutConfig timeoutConfig();
+
     @Value.Default
     default ContentType contentType() {
         return ContentType.APPLICATION_JSON;
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/TimeoutConfig.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/TimeoutConfig.java
new file mode 100644 (file)
index 0000000..413bf8e
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config;
+
+import org.immutables.value.Value;
+
+import java.time.Duration;
+
+@Value.Immutable
+public interface TimeoutConfig {
+
+    @Value.Default
+    default Duration getTimeout() {
+        return Duration.ofSeconds(4);
+    }
+}
index 8561e0b..d94639a 100644 (file)
@@ -28,7 +28,6 @@ import com.google.gson.JsonPrimitive;
 import io.vavr.collection.List;
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
@@ -39,30 +38,39 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRo
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableTimeoutConfig;
 import reactor.core.publisher.Flux;
 
+import java.time.Duration;
+
 
 public final class MessageRouterTestsUtils {
-    private MessageRouterTestsUtils() {}
+    private MessageRouterTestsUtils() {
+    }
 
-    public static MessageRouterPublishRequest createPublishRequest(String topicUrl){
+    public static MessageRouterPublishRequest createPublishRequest(String topicUrl) {
         return createPublishRequest(topicUrl, ContentType.APPLICATION_JSON);
     }
 
-    public static MessageRouterPublishRequest createPublishRequest(String topicUrl, ContentType contentType){
-        MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
-                .name("the topic")
-                .topicUrl(topicUrl)
+    public static MessageRouterPublishRequest createPublishRequest(String topicUrl, Duration timeout) {
+        return ImmutableMessageRouterPublishRequest.builder()
+                .sinkDefinition(createMessageRouterSink(topicUrl))
+                .contentType(ContentType.APPLICATION_JSON)
+                .timeoutConfig(ImmutableTimeoutConfig.builder()
+                        .timeout(timeout)
+                        .build())
                 .build();
+    }
 
+    public static MessageRouterPublishRequest createPublishRequest(String topicUrl, ContentType contentType) {
         return ImmutableMessageRouterPublishRequest.builder()
-                .sinkDefinition(sinkDefinition)
+                .sinkDefinition(createMessageRouterSink(topicUrl))
                 .contentType(contentType)
                 .build();
     }
 
     public static MessageRouterSubscribeRequest createMRSubscribeRequest(String topicUrl,
-            String consumerGroup, String consumerId) {
+                                                                         String consumerGroup, String consumerId) {
         ImmutableMessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
                 .name("the topic")
                 .topicUrl(topicUrl)
@@ -76,52 +84,53 @@ public final class MessageRouterTestsUtils {
                 .build();
     }
 
-    public static List<JsonElement> getAsJsonElements(List<String> messages){
+    public static List<JsonElement> getAsJsonElements(List<String> messages) {
         return messages.map(JsonParser::parseString);
     }
 
-    public static List<JsonObject> getAsJsonObjects(List<String> messages){
+    public static List<JsonObject> getAsJsonObjects(List<String> messages) {
         return getAsJsonElements(messages).map(JsonElement::getAsJsonObject);
     }
 
-    public static List<JsonPrimitive> getAsJsonPrimitives(List<String> messages){
+    public static List<JsonPrimitive> getAsJsonPrimitives(List<String> messages) {
         return getAsJsonElements(messages).map(JsonElement::getAsJsonPrimitive);
     }
 
-    public static JsonObject getAsJsonObject(String item){
+    public static JsonObject getAsJsonObject(String item) {
         return new Gson().fromJson(item, JsonObject.class);
     }
 
-    public static Flux<JsonElement> plainBatch(List<String> messages){
+    public static Flux<JsonElement> plainBatch(List<String> messages) {
         return Flux.fromIterable(getAsJsonElements(messages));
     }
 
-    public static Flux<JsonObject> jsonBatch(List<String> messages){
+    public static Flux<JsonObject> jsonBatch(List<String> messages) {
         return Flux.fromIterable(getAsJsonObjects(messages));
     }
 
-    public static MessageRouterSubscribeResponse errorSubscribeResponse(String failReasonFormat, Object... formatArgs){
+    public static MessageRouterSubscribeResponse errorSubscribeResponse(String failReasonFormat, Object... formatArgs) {
         return ImmutableMessageRouterSubscribeResponse
                 .builder()
                 .failReason(String.format(failReasonFormat, formatArgs))
                 .build();
     }
 
-    public static MessageRouterSubscribeResponse successSubscribeResponse(List<JsonElement> items){
+    public static MessageRouterSubscribeResponse successSubscribeResponse(List<JsonElement> items) {
         return ImmutableMessageRouterSubscribeResponse
                 .builder()
                 .items(items)
                 .build();
     }
 
-    public static MessageRouterPublishResponse errorPublishResponse(String failReasonFormat, Object... formatArgs){
+    public static MessageRouterPublishResponse errorPublishResponse(String failReasonFormat, Object... formatArgs) {
+        String failReason = formatArgs.length == 0 ? failReasonFormat : String.format(failReasonFormat, formatArgs);
         return ImmutableMessageRouterPublishResponse
                 .builder()
-                .failReason(String.format(failReasonFormat, formatArgs))
+                .failReason(failReason)
                 .build();
     }
 
-    public static MessageRouterPublishResponse successPublishResponse(List<JsonElement> items){
+    public static MessageRouterPublishResponse successPublishResponse(List<JsonElement> items) {
         return ImmutableMessageRouterPublishResponse
                 .builder()
                 .items(items)
@@ -129,7 +138,7 @@ public final class MessageRouterTestsUtils {
     }
 
     public static void registerTopic(MessageRouterPublisher publisher, MessageRouterPublishRequest publishRequest,
-            MessageRouterSubscriber subscriber, MessageRouterSubscribeRequest subscribeRequest) {
+                                     MessageRouterSubscriber subscriber, MessageRouterSubscribeRequest subscribeRequest) {
         final List<String> sampleJsonMessages = List.of("{\"message\":\"message1\"}",
                 "{\"differentMessage\":\"message2\"}");
         final Flux<JsonObject> jsonMessageBatch = MessageRouterTestsUtils.jsonBatch(sampleJsonMessages);
@@ -137,4 +146,11 @@ public final class MessageRouterTestsUtils {
         publisher.put(publishRequest, jsonMessageBatch).blockLast();
         subscriber.get(subscribeRequest).block();
     }
+
+    private static ImmutableMessageRouterSink createMessageRouterSink(String topicUrl) {
+        return ImmutableMessageRouterSink.builder()
+                .name("the topic")
+                .topicUrl(topicUrl)
+                .build();
+    }
 }
index a314ccf..494ca62 100644 (file)
@@ -31,6 +31,8 @@ final class DMaapContainer {
             MR_COMPOSE_RESOURCE_NAME);
     static final int DMAAP_SERVICE_EXPOSED_PORT = 3904;
     static final String DMAAP_SERVICE_NAME = "dmaap";
+    static final int PROXY_SERVICE_EXPOSED_PORT = 8666;
+    static final String LOCALHOST = "localhost";
 
     private DMaapContainer() {}
 
index f62359d..24cd2c3 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2020 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
 
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
+import eu.rekawek.toxiproxy.Proxy;
+import eu.rekawek.toxiproxy.ToxiproxyClient;
 import io.vavr.collection.List;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -39,8 +41,11 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
+import java.io.IOException;
 import java.time.Duration;
+import java.util.concurrent.TimeUnit;
 
+import static eu.rekawek.toxiproxy.model.ToxicDirection.DOWNSTREAM;
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorPublishResponse;
@@ -50,6 +55,10 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageR
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic;
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successPublishResponse;
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_NAME;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_SERVICE_EXPOSED_PORT;
 
 @Testcontainers
 class MessageRouterPublisherIT {
@@ -64,23 +73,38 @@ class MessageRouterPublisherIT {
             + "Successfully published number of messages :0."
             + "Expected { to start an object.\",\"status\":400"
             + "}";
+    private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n"
+            + "{"
+            + "\"requestError\":"
+            + "{"
+            + "\"serviceException\":"
+            + "{"
+            + "\"messageId\":\"SVC0001\","
+            + "\"text\":\"Client timeout exception occurred, Error code is %1\","
+            + "\"variables\":[\"408\"]"
+            + "}"
+            + "}"
+            + "}";
+    private static Proxy DMAAP_PROXY;
     private static String EVENTS_PATH;
+    private static String PROXY_EVENTS_PATH;
     private final MessageRouterPublisher publisher = DmaapClientFactory
             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
-    private MessageRouterSubscriber subscriber = DmaapClientFactory
+    private final MessageRouterSubscriber subscriber = DmaapClientFactory
             .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
 
     @BeforeAll
-    static void setUp() {
-        EVENTS_PATH = String.format("http://%s:%d/events",
-                CONTAINER.getServiceHost(DMaapContainer.DMAAP_SERVICE_NAME,
-                        DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT),
-                CONTAINER.getServicePort(DMaapContainer.DMAAP_SERVICE_NAME,
-                        DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT));
+    static void setUp() throws IOException {
+        EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
+        PROXY_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_SERVICE_EXPOSED_PORT);
+
+        DMAAP_PROXY = new ToxiproxyClient().createProxy("dmaapProxy",
+                String.format("[::]:%s", PROXY_SERVICE_EXPOSED_PORT),
+                String.format("%s:%d", DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT));
     }
 
     @Test
-    void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch(){
+    void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() {
         //given
         final String topic = "TOPIC";
         final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
@@ -100,7 +124,7 @@ class MessageRouterPublisherIT {
     }
 
     @Test
-    void publisher_shouldHandleBadRequestError(){
+    void publisher_shouldHandleBadRequestError() {
         //given
         final String topic = "TOPIC2";
         final List<String> threePlainTextMessages = List.of("I", "like", "pizza");
@@ -120,7 +144,7 @@ class MessageRouterPublisherIT {
     }
 
     @Test
-    void publisher_shouldSuccessfullyPublishSingleMessage(){
+    void publisher_shouldSuccessfullyPublishSingleMessage() {
         //given
         final String topic = "TOPIC3";
         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
@@ -145,7 +169,7 @@ class MessageRouterPublisherIT {
     }
 
     @Test
-    void publisher_shouldSuccessfullyPublishMultipleMessages(){
+    void publisher_shouldSuccessfullyPublishMultipleMessages() {
         final String topic = "TOPIC5";
         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
         final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}",
@@ -170,7 +194,7 @@ class MessageRouterPublisherIT {
     }
 
     @Test
-    void publisher_shouldSuccessfullyPublishSingleJsonMessageWithPlainContentType(){
+    void publisher_shouldSuccessfullyPublishSingleJsonMessageWithPlainContentType() {
         //given
         final String topic = "TOPIC6";
         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
@@ -197,7 +221,7 @@ class MessageRouterPublisherIT {
     }
 
     @Test
-    void publisher_shouldSuccessfullyPublishMultipleJsonMessagesWithPlainContentType(){
+    void publisher_shouldSuccessfullyPublishMultipleJsonMessagesWithPlainContentType() {
         //given
         final String topic = "TOPIC7";
         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
@@ -224,7 +248,7 @@ class MessageRouterPublisherIT {
     }
 
     @Test
-    void publisher_shouldSuccessfullyPublishSinglePlainMessageWithPlainContentType(){
+    void publisher_shouldSuccessfullyPublishSinglePlainMessageWithPlainContentType() {
         //given
         final String topic = "TOPIC8";
         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
@@ -251,7 +275,7 @@ class MessageRouterPublisherIT {
     }
 
     @Test
-    void publisher_shouldSuccessfullyPublishMultiplePlainMessagesWithPlainContentType(){
+    void publisher_shouldSuccessfullyPublishMultiplePlainMessagesWithPlainContentType() {
         //given
         final String topic = "TOPIC9";
         final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
@@ -276,4 +300,30 @@ class MessageRouterPublisherIT {
                 .expectComplete()
                 .verify();
     }
+
+    @Test
+    void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() throws IOException {
+        //given
+        final String toxic = "latency-toxic";
+        DMAAP_PROXY.toxics()
+                .latency(toxic, DOWNSTREAM, TimeUnit.SECONDS.toMillis(5));
+        final String topic = "TOPIC10";
+        final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
+        final Flux<JsonObject> messageBatch = jsonBatch(singleJsonMessage);
+        final MessageRouterPublishRequest mrRequest = createPublishRequest(
+                String.format("%s/%s", PROXY_EVENTS_PATH, topic), Duration.ofSeconds(1));
+        final MessageRouterPublishResponse expectedResponse = errorPublishResponse(TIMEOUT_ERROR_MESSAGE);
+
+        //when
+        final Flux<MessageRouterPublishResponse> result = publisher.put(mrRequest, messageBatch);
+
+        //then
+        StepVerifier.create(result)
+                .expectNext(expectedResponse)
+                .expectComplete()
+                .verify(TIMEOUT);
+
+        //cleanup
+        DMAAP_PROXY.toxics().get(toxic).remove();
+    }
 }
index 1268a16..b0a07ed 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2020 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
 
-import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
-import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
-
 import com.google.gson.JsonElement;
 import com.google.gson.JsonPrimitive;
 import io.vavr.collection.List;
-
-import java.time.Duration;
-
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
@@ -44,6 +38,11 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
+import java.time.Duration;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
+import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
+
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since May 2019
@@ -69,18 +68,19 @@ class MessageRouterPublisherTest {
 
     @BeforeAll
     static void setUp() {
-        server = DummyHttpServer.start(routes ->
-                routes.post(SUCCESS_RESP_TOPIC_PATH, (req, resp) -> sendString(resp, Mono.just("OK")))
-                        .post(FAILING_WITH_400_RESP_PATH, (req, resp) ->
-                                sendError(resp, 400, ERROR_MESSAGE))
-                        .post(FAILING_WITH_401_RESP_PATH, (req, resp) ->
-                                sendError(resp, 401, ERROR_MESSAGE))
-                        .post(FAILING_WITH_403_RESP_PATH, (req, resp) ->
-                                sendError(resp, 403, ERROR_MESSAGE))
-                        .post(FAILING_WITH_404_RESP_PATH, (req, resp) ->
-                                sendError(resp, 404, ERROR_MESSAGE))
-                        .post(FAILING_WITH_500_TOPIC_PATH, (req, resp) ->
-                                sendError(resp, 500, ERROR_MESSAGE))
+        server = DummyHttpServer.start(routes -> routes
+                .post(SUCCESS_RESP_TOPIC_PATH, (req, resp) ->
+                        sendString(resp, Mono.just("OK")))
+                .post(FAILING_WITH_400_RESP_PATH, (req, resp) ->
+                        sendError(resp, 400, ERROR_MESSAGE))
+                .post(FAILING_WITH_401_RESP_PATH, (req, resp) ->
+                        sendError(resp, 401, ERROR_MESSAGE))
+                .post(FAILING_WITH_403_RESP_PATH, (req, resp) ->
+                        sendError(resp, 403, ERROR_MESSAGE))
+                .post(FAILING_WITH_404_RESP_PATH, (req, resp) ->
+                        sendError(resp, 404, ERROR_MESSAGE))
+                .post(FAILING_WITH_500_TOPIC_PATH, (req, resp) ->
+                        sendError(resp, 500, ERROR_MESSAGE))
         );
     }
 
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenterTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/error/ClientErrorReasonPresenterTest.java
new file mode 100644 (file)
index 0000000..9b318d7
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class ClientErrorReasonPresenterTest {
+
+    @Test
+    void shouldSuccessfullyPresent() {
+        //given
+        ClientErrorReason clientErrorReason = createSimple();
+        String expected = "header\n"
+                + "{\"requestError\":{\"serviceException\":{\"messageId\":\"messageId\",\"text\":\"text\"}}}";
+
+        //when
+        String actual = ClientErrorReasonPresenter.present(clientErrorReason);
+
+        //then
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    @Test
+    void shouldSuccessfullyPresentWithVariables() {
+        //given
+        ClientErrorReason clientErrorReason = createSimple().withVariables("v1", "v2");
+        String expected = "header\n"
+                + "{\"requestError\":{\"serviceException\":{\"messageId\":\"messageId\",\"text\":\"text\",\"variables\":[\"v1\",\"v2\"]}}}";
+
+        //when
+        String actual = ClientErrorReasonPresenter.present(clientErrorReason);
+
+        //then
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    private ImmutableClientErrorReason createSimple() {
+        return ImmutableClientErrorReason.builder()
+                .header("header")
+                .messageId("messageId")
+                .text("text")
+                .build();
+    }
+}
index 38659ac..f29bfa2 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2020 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl;
 
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.BDDMockito.given;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.*;
-
+import com.google.gson.Gson;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
-import com.google.gson.Gson;
 import com.google.gson.JsonPrimitive;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.handler.timeout.ReadTimeoutException;
 import io.vavr.collection.List;
-import java.nio.charset.StandardCharsets;
-import java.time.Duration;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders;
@@ -54,6 +45,22 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonElements;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonObjects;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.getAsJsonPrimitives;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.jsonBatch;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.plainBatch;
+
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since April 2019
@@ -62,6 +69,7 @@ class MessageRouterPublisherImplTest {
     private static final Duration TIMEOUT = Duration.ofSeconds(5);
     private static final String TOPIC_URL = "https://dmaap-mr/TOPIC";
     private static final int MAX_BATCH_SIZE = 3;
+    public static final String TIMEOUT_ERROR_MESSAGE_HEADER = "408 Request Timeout";
     private final RxHttpClient httpClient = mock(RxHttpClient.class);
     private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1));
     private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
@@ -87,11 +95,11 @@ class MessageRouterPublisherImplTest {
         assertThat(httpRequest.method()).isEqualTo(HttpMethod.POST);
         assertThat(httpRequest.url()).isEqualTo(TOPIC_URL);
         assertThat(httpRequest.body()).isNotNull();
-        assertThat(httpRequest.body().length()).isGreaterThan(0);
+        assertThat(httpRequest.body().length()).isPositive();
     }
 
     @Test
-    void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest(){
+    void onPut_givenJsonMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
         // given
         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
@@ -115,9 +123,8 @@ class MessageRouterPublisherImplTest {
     }
 
 
-
     @Test
-    void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest(){
+    void onPut_givenJsonMessagesWithPlainContentType_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
         // given
         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
         final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
@@ -143,7 +150,7 @@ class MessageRouterPublisherImplTest {
     }
 
     @Test
-    void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest(){
+    void onPut_givenPlainMessages_whenTheirAmountIsNotAboveMaxBatchSize_shouldSendSingleHttpRequest() {
         // given
         final List<String> threePlainMessages = List.of("I", "like", "cookies");
         final List<JsonPrimitive> parsedThreeMessages = getAsJsonPrimitives(threePlainMessages);
@@ -168,7 +175,7 @@ class MessageRouterPublisherImplTest {
     }
 
     @Test
-    void puttingElementsWithoutContentTypeSetShouldUseApplicationJson(){
+    void puttingElementsWithoutContentTypeSetShouldUseApplicationJson() {
         // given
         final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
         final Flux<JsonObject> singleJsonMessageBatch = jsonBatch(threeJsonMessages);
@@ -267,7 +274,7 @@ class MessageRouterPublisherImplTest {
 
         final JsonArray secondRequest = extractNonEmptyJsonRequestBody(httpRequests.get(1));
         assertThat(secondRequest.size()).describedAs("Http request second batch size")
-                .isEqualTo(MAX_BATCH_SIZE-1);
+                .isEqualTo(MAX_BATCH_SIZE - 1);
         assertListsContainSameElements(secondRequest, parsedTwoMessages);
     }
 
@@ -303,7 +310,7 @@ class MessageRouterPublisherImplTest {
         final List<JsonObject> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1))
                 .map(JsonElement::getAsJsonObject);
         assertThat(secondRequest.size()).describedAs("Http request second batch size")
-                .isEqualTo(MAX_BATCH_SIZE-1);
+                .isEqualTo(MAX_BATCH_SIZE - 1);
         assertListsContainSameElements(secondRequest, parsedTwoMessages);
     }
 
@@ -339,7 +346,7 @@ class MessageRouterPublisherImplTest {
         final List<JsonPrimitive> secondRequest = extractNonEmptyPlainRequestBody(httpRequests.get(1))
                 .map(JsonElement::getAsJsonPrimitive);
         assertThat(secondRequest.size()).describedAs("Http request second batch size")
-                .isEqualTo(MAX_BATCH_SIZE-1);
+                .isEqualTo(MAX_BATCH_SIZE - 1);
         assertListsContainSameElements(secondRequest, parsedTwoPlainMessages);
     }
 
@@ -404,12 +411,79 @@ class MessageRouterPublisherImplTest {
         verifyDoubleResponse(parsedThreeMessages, parsedTwoMessages, responses);
     }
 
-    private static List<String> getAsMRJsonMessages(List<String> plainTextMessages){
+    @Test
+    void onPut_whenReadTimeoutExceptionOccurs_shouldReturnOneTimeoutError() {
+        // given
+        final List<String> plainMessage = List.of("I", "like", "cookies");
+
+        final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage);
+        given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.error(ReadTimeoutException.INSTANCE));
+
+        // when
+        final Flux<MessageRouterPublishResponse> responses = cut
+                .put(plainPublishRequest, plainMessagesMaxBatch);
+
+        // then
+        StepVerifier.create(responses)
+                .consumeNextWith(this::assertTimeoutError)
+                .expectComplete()
+                .verify(TIMEOUT);
+    }
+
+    @Test
+    void onPut_whenReadTimeoutExceptionOccursForSecondBatch_shouldReturnOneCorrectResponseAndThenOneTimeoutError() {
+        // given
+        final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
+        final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
+
+        final List<JsonObject> parsedThreeMessages = getAsJsonObjects(threeJsonMessages);
+
+        final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
+        given(httpClient.call(any(HttpRequest.class)))
+                .willReturn(Mono.just(successHttpResponse))
+                .willReturn(Mono.error(ReadTimeoutException.INSTANCE));
+        // when
+        final Flux<MessageRouterPublishResponse> responses = cut
+                .put(jsonPublishRequest, doubleJsonMessageBatch);
+
+        // then
+        StepVerifier.create(responses)
+                .consumeNextWith(response -> verifySuccessfulResponses(parsedThreeMessages, response))
+                .consumeNextWith(this::assertTimeoutError)
+                .expectComplete()
+                .verify(TIMEOUT);
+    }
+
+    @Test
+    void onPut_whenReadTimeoutExceptionOccursForFirstBatch_shouldReturnOneTimeoutErrorAndThenOneCorrectResponse() {
+        // given
+        final List<String> threeJsonMessages = getAsMRJsonMessages(List.of("I", "like", "cookies"));
+        final List<String> twoJsonMessages = getAsMRJsonMessages(List.of("and", "pierogi"));
+
+        final List<JsonObject> parsedTwoMessages = getAsJsonObjects(twoJsonMessages);
+
+        final Flux<JsonObject> doubleJsonMessageBatch = jsonBatch(threeJsonMessages.appendAll(twoJsonMessages));
+        given(httpClient.call(any(HttpRequest.class)))
+                .willReturn(Mono.error(ReadTimeoutException.INSTANCE))
+                .willReturn(Mono.just(successHttpResponse));
+        // when
+        final Flux<MessageRouterPublishResponse> responses = cut
+                .put(jsonPublishRequest, doubleJsonMessageBatch);
+
+        // then
+        StepVerifier.create(responses)
+                .consumeNextWith(this::assertTimeoutError)
+                .consumeNextWith(response -> verifySuccessfulResponses(parsedTwoMessages, response))
+                .expectComplete()
+                .verify(TIMEOUT);
+    }
+
+    private static List<String> getAsMRJsonMessages(List<String> plainTextMessages) {
         return plainTextMessages
                 .map(message -> String.format("{\"message\":\"%s\"}", message));
     }
 
-    private static HttpResponse createHttpResponse(String statusReason, int statusCode){
+    private static HttpResponse createHttpResponse(String statusReason, int statusCode) {
         return ImmutableHttpResponse.builder()
                 .statusCode(statusCode)
                 .url(TOPIC_URL)
@@ -418,7 +492,7 @@ class MessageRouterPublisherImplTest {
                 .build();
     }
 
-    private String collectNonEmptyRequestBody(HttpRequest httpRequest){
+    private String collectNonEmptyRequestBody(HttpRequest httpRequest) {
         final String body = Flux.from(httpRequest.body().contents())
                 .collect(ByteBufAllocator.DEFAULT::compositeBuffer,
                         (byteBufs, buffer) -> byteBufs.addComponent(true, buffer))
@@ -429,11 +503,11 @@ class MessageRouterPublisherImplTest {
         return body;
     }
 
-    private JsonArray extractNonEmptyJsonRequestBody(HttpRequest httpRequest){
+    private JsonArray extractNonEmptyJsonRequestBody(HttpRequest httpRequest) {
         return new Gson().fromJson(collectNonEmptyRequestBody(httpRequest), JsonArray.class);
     }
 
-    private List<JsonElement> extractNonEmptyPlainRequestBody(HttpRequest httpRequest){
+    private List<JsonElement> extractNonEmptyPlainRequestBody(HttpRequest httpRequest) {
         return getAsJsonElements(
                 List.of(
                         collectNonEmptyRequestBody(httpRequest)
@@ -442,8 +516,8 @@ class MessageRouterPublisherImplTest {
         );
     }
 
-    private void assertListsContainSameElements(List<? extends  JsonElement> actualMessages,
-            List<? extends JsonElement> expectedMessages){
+    private void assertListsContainSameElements(List<? extends JsonElement> actualMessages,
+                                                List<? extends JsonElement> expectedMessages) {
         for (int i = 0; i < actualMessages.size(); i++) {
             assertThat(actualMessages.get(i))
                     .describedAs(String.format("Http request element at position %d", i))
@@ -452,7 +526,7 @@ class MessageRouterPublisherImplTest {
     }
 
     private void assertListsContainSameElements(JsonArray actualMessages,
-            List<? extends JsonElement> expectedMessages){
+                                                List<? extends JsonElement> expectedMessages) {
         assertThat(actualMessages.size()).describedAs("Http request batch size")
                 .isEqualTo(expectedMessages.size());
 
@@ -463,38 +537,32 @@ class MessageRouterPublisherImplTest {
         }
     }
 
+    private void assertTimeoutError(MessageRouterPublishResponse response) {
+        assertThat(response.failed()).isTrue();
+        assertThat(response.items()).isEmpty();
+        assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE_HEADER);
+    }
+
     private void verifySingleResponse(List<? extends JsonElement> threeMessages,
-            Flux<MessageRouterPublishResponse> responses) {
+                                      Flux<MessageRouterPublishResponse> responses) {
         StepVerifier.create(responses)
-                .consumeNextWith(response -> {
-                    assertThat(response.successful()).describedAs("successful").isTrue();
-                    assertThat(response.items()).containsExactly(
-                            threeMessages.get(0),
-                            threeMessages.get(1),
-                            threeMessages.get(2));
-                })
+                .consumeNextWith(response -> verifySuccessfulResponses(threeMessages, response))
                 .expectComplete()
                 .verify(TIMEOUT);
     }
 
     private void verifyDoubleResponse(List<? extends JsonElement> threeMessages,
-            List<? extends JsonElement> twoMessages, Flux<MessageRouterPublishResponse> responses) {
-
+                                      List<? extends JsonElement> twoMessages, Flux<MessageRouterPublishResponse> responses) {
         StepVerifier.create(responses)
-                .consumeNextWith(response -> {
-                    assertThat(response.successful()).describedAs("successful").isTrue();
-                    assertThat(response.items()).containsExactly(
-                            threeMessages.get(0),
-                            threeMessages.get(1),
-                            threeMessages.get(2));
-                })
-                .consumeNextWith(response -> {
-                    assertThat(response.successful()).describedAs("successful").isTrue();
-                    assertThat(response.items()).containsExactly(
-                            twoMessages.get(0),
-                            twoMessages.get(1));
-                })
+                .consumeNextWith(response -> verifySuccessfulResponses(threeMessages, response))
+                .consumeNextWith(response -> verifySuccessfulResponses(twoMessages, response))
                 .expectComplete()
                 .verify(TIMEOUT);
     }
-}
\ No newline at end of file
+
+    private void verifySuccessfulResponses(List<? extends JsonElement> threeMessages, MessageRouterPublishResponse response) {
+        assertThat(response.successful()).describedAs("successful").isTrue();
+        JsonElement[] jsonElements = threeMessages.toJavaStream().toArray(JsonElement[]::new);
+        assertThat(response.items()).containsExactly(jsonElements);
+    }
+}
index 20cade0..ab6641c 100644 (file)
@@ -66,6 +66,17 @@ services:
     depends_on:
       - zookeeper
       - kafka
+
+  toxiproxy:
+    image: shopify/toxiproxy:2.1.4
+    ports:
+      - "8474:8474"
+      - "8666:8666"
+    networks:
+      - net
+    depends_on:
+      - dmaap
+
 networks:
   net:
     driver: bridge
index bae2f66..d9a1106 100644 (file)
@@ -28,7 +28,7 @@
     <parent>
         <groupId>org.onap.dcaegen2.services.sdk</groupId>
         <artifactId>dcaegen2-services-sdk-rest-services</artifactId>
-        <version>1.5.0-SNAPSHOT</version>
+        <version>1.6.0-SNAPSHOT</version>
     </parent>
 
     <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
index 33060c9..9d2d1ee 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2020 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -26,6 +26,8 @@ import org.immutables.value.Value;
 import org.jetbrains.annotations.Nullable;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 
+import java.time.Duration;
+
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since March 2019
@@ -37,6 +39,8 @@ public interface HttpRequest {
 
     HttpMethod method();
 
+    @Nullable Duration timeout();
+
     @Nullable RequestBody body();
 
     @Value.Default
index 7ac02bf..77b842d 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2020 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
 package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
 
 import io.vavr.collection.Stream;
+import io.vavr.control.Option;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,11 +53,15 @@ public class RxHttpClient {
     }
 
     ResponseReceiver<?> prepareRequest(HttpRequest request) {
-        final HttpClient theClient = httpClient
+        final HttpClient simpleClient = httpClient
                 .doOnRequest((req, conn) -> logRequest(request.diagnosticContext(), req))
                 .doOnResponse((rsp, conn) -> logResponse(request.diagnosticContext(), rsp))
                 .headers(hdrs -> request.headers().forEach(hdr -> hdrs.set(hdr._1, hdr._2)));
 
+        final HttpClient theClient = Option.of(request.timeout())
+                .map(simpleClient::responseTimeout)
+                .getOrElse(simpleClient);
+
         return prepareBody(request, theClient);
     }
 
index cdddaef..6f3a090 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2020 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
 
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
-
 import io.netty.handler.codec.http.HttpResponseStatus;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.time.Duration;
+import io.netty.handler.timeout.ReadTimeoutException;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -35,6 +30,13 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttp
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.time.Duration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
+
 class RxHttpClientIT {
 
     private static final Duration TIMEOUT = Duration.ofHours(5);
@@ -43,12 +45,13 @@ class RxHttpClientIT {
 
     @BeforeAll
     static void setUpClass() {
-        httpServer = DummyHttpServer.start(routes ->
-                routes.get("/sample-get", (req, resp) -> sendString(resp, Mono.just("OK")))
-                        .get("/sample-get-500", (req, resp) -> resp.status(HttpResponseStatus.INTERNAL_SERVER_ERROR).send())
-                        .post("/headers-post", (req, resp) -> resp
-                                .sendString(Mono.just(req.requestHeaders().toString())))
-                        .post("/echo-post", (req, resp) -> resp.send(req.receive().retain()))
+        httpServer = DummyHttpServer.start(routes -> routes
+                .get("/sample-get", (req, resp) -> sendString(resp, Mono.just("OK")))
+                .get("/delayed-get", (req, resp) -> sendString(resp, Mono.just("OK").delayElement(Duration.ofMinutes(1))))
+                .get("/sample-get-500", (req, resp) -> resp.status(HttpResponseStatus.INTERNAL_SERVER_ERROR).send())
+                .post("/headers-post", (req, resp) -> resp
+                        .sendString(Mono.just(req.requestHeaders().toString())))
+                .post("/echo-post", (req, resp) -> resp.send(req.receive().retain()))
         );
     }
 
@@ -65,7 +68,9 @@ class RxHttpClientIT {
     @Test
     void simpleGet() throws Exception {
         // given
-        final HttpRequest httpRequest = requestFor("/sample-get").method(HttpMethod.GET).build();
+        final HttpRequest httpRequest = requestFor("/sample-get")
+                .method(HttpMethod.GET)
+                .build();
 
         // when
         final Mono<String> bodyAsString = cut.call(httpRequest)
@@ -73,13 +78,18 @@ class RxHttpClientIT {
                 .map(HttpResponse::bodyAsString);
 
         // then
-        StepVerifier.create(bodyAsString).expectNext("OK").expectComplete().verify(TIMEOUT);
+        StepVerifier.create(bodyAsString)
+                .expectNext("OK")
+                .expectComplete()
+                .verify(TIMEOUT);
     }
 
     @Test
     void getWithError() throws Exception {
         // given
-        final HttpRequest httpRequest = requestFor("/sample-get-500").method(HttpMethod.GET).build();
+        final HttpRequest httpRequest = requestFor("/sample-get-500")
+                .method(HttpMethod.GET)
+                .build();
 
         // when
         final Mono<String> bodyAsString = cut.call(httpRequest)
@@ -87,7 +97,9 @@ class RxHttpClientIT {
                 .map(HttpResponse::bodyAsString);
 
         // then
-        StepVerifier.create(bodyAsString).expectError(HttpException.class).verify(TIMEOUT);
+        StepVerifier.create(bodyAsString)
+                .expectError(HttpException.class)
+                .verify(TIMEOUT);
     }
 
     @Test
@@ -158,4 +170,21 @@ class RxHttpClientIT {
                 .expectComplete()
                 .verify(TIMEOUT);
     }
-}
\ No newline at end of file
+
+    @Test
+    void getWithTimeoutError() throws Exception {
+        // given
+        final HttpRequest httpRequest = requestFor("/delayed-get")
+                .method(HttpMethod.GET)
+                .timeout(Duration.ofSeconds(1))
+                .build();
+
+        // when
+        final Mono<HttpResponse> response = cut.call(httpRequest);
+
+        // then
+        StepVerifier.create(response)
+                .expectError(ReadTimeoutException.class)
+                .verify(TIMEOUT);
+    }
+}
index 52887c4..d75c2e1 100644 (file)
@@ -27,7 +27,7 @@
     <parent>
         <groupId>org.onap.dcaegen2.services.sdk</groupId>
         <artifactId>dcaegen2-services-sdk-rest-services</artifactId>
-        <version>1.5.0-SNAPSHOT</version>
+        <version>1.6.0-SNAPSHOT</version>
     </parent>
 
     <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
index dff5a10..5e22eea 100644 (file)
@@ -7,7 +7,7 @@
   <parent>
     <groupId>org.onap.dcaegen2.services</groupId>
     <artifactId>sdk</artifactId>
-      <version>1.5.0-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <groupId>org.onap.dcaegen2.services.sdk</groupId>
index 911545e..959e0f7 100644 (file)
@@ -6,7 +6,7 @@
     <parent>
         <groupId>org.onap.dcaegen2.services.sdk.security</groupId>
         <artifactId>dcaegen2-services-sdk-security</artifactId>
-        <version>1.5.0-SNAPSHOT</version>
+        <version>1.6.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
index f717c33..be9a0b6 100644 (file)
@@ -7,7 +7,7 @@
   <parent>
     <groupId>org.onap.dcaegen2.services</groupId>
     <artifactId>sdk</artifactId>
-      <version>1.5.0-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <groupId>org.onap.dcaegen2.services.sdk.security</groupId>
index a2406f5..0ced1a4 100644 (file)
@@ -6,7 +6,7 @@
   <parent>
     <groupId>org.onap.dcaegen2.services.sdk.security</groupId>
     <artifactId>dcaegen2-services-sdk-security</artifactId>
-      <version>1.5.0-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <artifactId>ssl</artifactId>
index 7bbb664..e270254 100644 (file)
@@ -7,7 +7,7 @@
     <parent>
         <groupId>org.onap.dcaegen2.services.sdk</groupId>
         <artifactId>dcaegen2-services-sdk-services</artifactId>
-        <version>1.5.0-SNAPSHOT</version>
+        <version>1.6.0-SNAPSHOT</version>
     </parent>
 
     <artifactId>dcaegen2-services-sdk-services-external-schema-manager</artifactId>
index bb57225..8ec6d39 100644 (file)
@@ -26,7 +26,7 @@
   <parent>
     <groupId>org.onap.dcaegen2.services.sdk</groupId>
     <artifactId>dcaegen2-services-sdk-services</artifactId>
-      <version>1.5.0-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <artifactId>dcaegen2-services-sdk-services-hvvesclient</artifactId>
index eb15bb7..1900f73 100644 (file)
@@ -26,7 +26,7 @@
     <parent>
         <groupId>org.onap.dcaegen2.services.sdk</groupId>
         <artifactId>hvvesclient-producer</artifactId>
-        <version>1.5.0-SNAPSHOT</version>
+        <version>1.6.0-SNAPSHOT</version>
     </parent>
 
     <artifactId>hvvesclient-producer-api</artifactId>
index 7b2ab31..980570f 100644 (file)
@@ -26,7 +26,7 @@
     <parent>
         <groupId>org.onap.dcaegen2.services.sdk</groupId>
         <artifactId>hvvesclient-producer</artifactId>
-        <version>1.5.0-SNAPSHOT</version>
+        <version>1.6.0-SNAPSHOT</version>
     </parent>
 
     <artifactId>hvvesclient-producer-ct</artifactId>
index 9abb2c4..7d1bd74 100644 (file)
@@ -26,7 +26,7 @@
   <parent>
     <groupId>org.onap.dcaegen2.services.sdk</groupId>
     <artifactId>hvvesclient-producer</artifactId>
-      <version>1.5.0-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <artifactId>hvvesclient-producer-impl</artifactId>
index 66c5ff0..2601eb0 100644 (file)
@@ -26,7 +26,7 @@
   <parent>
     <groupId>org.onap.dcaegen2.services.sdk</groupId>
     <artifactId>dcaegen2-services-sdk-services-hvvesclient</artifactId>
-      <version>1.5.0-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <artifactId>hvvesclient-producer</artifactId>
index c792121..2e0d87e 100644 (file)
@@ -26,7 +26,7 @@
     <parent>
         <artifactId>dcaegen2-services-sdk-services-hvvesclient</artifactId>
         <groupId>org.onap.dcaegen2.services.sdk</groupId>
-        <version>1.5.0-SNAPSHOT</version>
+        <version>1.6.0-SNAPSHOT</version>
     </parent>
 
     <name>High Volume VES Collector Client :: Protobuf</name>
index 6275672..7d9a37d 100644 (file)
@@ -26,7 +26,7 @@
   <parent>
     <groupId>org.onap.dcaegen2.services</groupId>
     <artifactId>sdk</artifactId>
-      <version>1.5.0-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <groupId>org.onap.dcaegen2.services.sdk</groupId>
index c4c2aa4..e9792d2 100644 (file)
@@ -7,7 +7,7 @@
   <parent>
     <groupId>org.onap.dcaegen2.services.sdk</groupId>
     <artifactId>dcaegen2-services-sdk-standardization</artifactId>
-      <version>1.5.0-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
     <relativePath>..</relativePath>
   </parent>
   
index 3ec0f3a..76ab81e 100644 (file)
@@ -25,7 +25,7 @@
     <parent>
         <artifactId>dcaegen2-sdk-moher-api</artifactId>
         <groupId>org.onap.dcaegen2.services.sdk</groupId>
-        <version>1.5.0-SNAPSHOT</version>
+        <version>1.6.0-SNAPSHOT</version>
     </parent>
 
     <name>Monitoring and Healthcheck :: Health state</name>
index 3794d17..0cd8f55 100644 (file)
@@ -26,7 +26,7 @@
     <parent>
         <artifactId>dcaegen2-sdk-moher-api</artifactId>
         <groupId>org.onap.dcaegen2.services.sdk</groupId>
-        <version>1.5.0-SNAPSHOT</version>
+        <version>1.6.0-SNAPSHOT</version>
     </parent>
 
     <name>Monitoring and Healthcheck :: Metrics</name>
index 6351711..37090d6 100644 (file)
@@ -26,7 +26,7 @@
     <parent>
         <artifactId>dcaegen2-services-sdk-standardization</artifactId>
         <groupId>org.onap.dcaegen2.services.sdk</groupId>
-        <version>1.5.0-SNAPSHOT</version>
+        <version>1.6.0-SNAPSHOT</version>
     </parent>
 
     <name>Monitoring and Healthcheck</name>
index 42a9b45..283d816 100644 (file)
@@ -25,7 +25,7 @@
     <parent>
         <artifactId>dcaegen2-sdk-moher-api</artifactId>
         <groupId>org.onap.dcaegen2.services.sdk</groupId>
-        <version>1.5.0-SNAPSHOT</version>
+        <version>1.6.0-SNAPSHOT</version>
     </parent>
 
     <name>Monitoring and Healthcheck :: Server Adapters</name>
index c731124..cb9efb9 100644 (file)
@@ -25,7 +25,7 @@
     <parent>
         <artifactId>dcaegen2-sdk-moher-server-adapters</artifactId>
         <groupId>org.onap.dcaegen2.services.sdk</groupId>
-        <version>1.5.0-SNAPSHOT</version>
+        <version>1.6.0-SNAPSHOT</version>
     </parent>
 
     <name>Monitoring and Healthcheck :: Server Adapters :: Reactor Netty</name>
index 48a9743..34ea3a0 100644 (file)
@@ -25,7 +25,7 @@
     <parent>
         <artifactId>dcaegen2-sdk-moher-server-adapters</artifactId>
         <groupId>org.onap.dcaegen2.services.sdk</groupId>
-        <version>1.5.0-SNAPSHOT</version>
+        <version>1.6.0-SNAPSHOT</version>
     </parent>
 
     <name>Monitoring and Healthcheck :: Server Adapters :: Spring Webflux</name>
index dcf781f..e780834 100644 (file)
@@ -8,7 +8,7 @@
   <parent>
     <groupId>org.onap.dcaegen2.services</groupId>
     <artifactId>sdk</artifactId>
-      <version>1.5.0-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
     <relativePath>..</relativePath>
   </parent>