Add timeout for Subscriber(dmaap-client) 74/116674/2
authorPawel <pawel.kasperkiewicz@nokia.com>
Tue, 5 Jan 2021 12:40:47 +0000 (13:40 +0100)
committerPawel <pawel.kasperkiewicz@nokia.com>
Thu, 7 Jan 2021 14:43:58 +0000 (15:43 +0100)
Issue-ID: DCAEGEN2-1483
Signed-off-by: Pawel <pawel.kasperkiewicz@nokia.com>
Change-Id: Ib733af0541a1aad84691a2db97c1e495f0162866

rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapRequest.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/MessageRouterTestsUtils.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java

index 72c0bad..f7ccf4f 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2019-2020 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -26,8 +26,11 @@ import com.google.gson.Gson;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonParser;
+import io.netty.handler.timeout.ReadTimeoutException;
 import io.vavr.collection.List;
 import java.nio.charset.StandardCharsets;
+
+import io.vavr.control.Option;
 import org.jetbrains.annotations.NotNull;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
@@ -35,6 +38,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReason;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasons;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
@@ -59,16 +65,22 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
     @Override
     public Mono<MessageRouterSubscribeResponse> get(MessageRouterSubscribeRequest request) {
         LOGGER.debug("Requesting new items from DMaaP MR: {}", request);
-        return httpClient.call(buildGetHttpRequest(request)).map(this::buildGetResponse);
+        return httpClient.call(buildGetHttpRequest(request))
+                .map(this::buildGetResponse)
+                .doOnError(ReadTimeoutException.class, e -> LOGGER.error("Timeout exception occurred when subscribe items from DMaaP MR", e))
+                .onErrorResume(ReadTimeoutException.class, e -> createErrorResponse(ClientErrorReasons.TIMEOUT));
     }
 
 
     private @NotNull HttpRequest buildGetHttpRequest(MessageRouterSubscribeRequest request) {
-        return ImmutableHttpRequest.builder()
+        ImmutableHttpRequest.Builder requestBuilder = ImmutableHttpRequest.builder()
                 .method(HttpMethod.GET)
                 .url(buildSubscribeUrl(request))
-                .diagnosticContext(request.diagnosticContext().withNewInvocationId())
-                .build();
+                .diagnosticContext(request.diagnosticContext().withNewInvocationId());
+
+        return Option.of(request.timeoutConfig())
+                .map(timeoutConfig -> requestBuilder.timeout(timeoutConfig.getTimeout()).build())
+                .getOrElse(requestBuilder::build);
     }
 
     private @NotNull MessageRouterSubscribeResponse buildGetResponse(HttpResponse httpResponse) {
@@ -90,4 +102,11 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
         return String.format("%s/%s/%s", request.sourceDefinition().topicUrl(), request.consumerGroup(),
                 request.consumerId());
     }
+
+    private Mono<MessageRouterSubscribeResponse> createErrorResponse(ClientErrorReason clientErrorReason) {
+        String failReason = ClientErrorReasonPresenter.present(clientErrorReason);
+        return Mono.just(ImmutableMessageRouterSubscribeResponse.builder()
+                .failReason(failReason)
+                .build());
+    }
 }
index 212d8f2..95c5e7d 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,6 +21,8 @@
 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model;
 
 import org.immutables.value.Value;
+import org.jetbrains.annotations.Nullable;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.TimeoutConfig;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 
 /**
@@ -32,4 +34,6 @@ public interface DmaapRequest {
     default RequestDiagnosticContext diagnosticContext() {
         return RequestDiagnosticContext.create();
     }
+
+    @Nullable TimeoutConfig timeoutConfig();
 }
index 4490c79..ccd516d 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2019-2020 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model;
 
 import org.immutables.value.Value;
-import org.jetbrains.annotations.Nullable;
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.TimeoutConfig;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -35,8 +33,6 @@ public interface MessageRouterPublishRequest extends DmaapRequest {
 
     MessageRouterSink sinkDefinition();
 
-    @Nullable TimeoutConfig timeoutConfig();
-
     @Value.Default
     default ContentType contentType() {
         return ContentType.APPLICATION_JSON;
index d94639a..2b8027c 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2019-2020 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -71,19 +71,37 @@ public final class MessageRouterTestsUtils {
 
     public static MessageRouterSubscribeRequest createMRSubscribeRequest(String topicUrl,
                                                                          String consumerGroup, String consumerId) {
-        ImmutableMessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
-                .name("the topic")
-                .topicUrl(topicUrl)
+
+        return ImmutableMessageRouterSubscribeRequest
+                .builder()
+                .sourceDefinition(getImmutableMessageRouterSource(topicUrl))
+                .consumerGroup(consumerGroup)
+                .consumerId(consumerId)
                 .build();
+    }
+
+    public static MessageRouterSubscribeRequest createMRSubscribeRequest(String topicUrl,
+                                                                         String consumerGroup, String consumerId,
+                                                                         Duration timeout) {
 
         return ImmutableMessageRouterSubscribeRequest
                 .builder()
-                .sourceDefinition(sourceDefinition)
+                .timeoutConfig(ImmutableTimeoutConfig.builder()
+                        .timeout(timeout)
+                        .build())
+                .sourceDefinition(getImmutableMessageRouterSource(topicUrl))
                 .consumerGroup(consumerGroup)
                 .consumerId(consumerId)
                 .build();
     }
 
+    private static ImmutableMessageRouterSource getImmutableMessageRouterSource(String topicUrl) {
+        return ImmutableMessageRouterSource.builder()
+                .name("the topic")
+                .topicUrl(topicUrl)
+                .build();
+    }
+
     public static List<JsonElement> getAsJsonElements(List<String> messages) {
         return messages.map(JsonParser::parseString);
     }
@@ -109,9 +127,10 @@ public final class MessageRouterTestsUtils {
     }
 
     public static MessageRouterSubscribeResponse errorSubscribeResponse(String failReasonFormat, Object... formatArgs) {
+        String failReason = formatArgs.length == 0 ? failReasonFormat : String.format(failReasonFormat, formatArgs);
         return ImmutableMessageRouterSubscribeResponse
                 .builder()
-                .failReason(String.format(failReasonFormat, formatArgs))
+                .failReason(failReason)
                 .build();
     }
 
index 7a31209..bd161aa 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
 
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
+import eu.rekawek.toxiproxy.Proxy;
+import eu.rekawek.toxiproxy.ToxiproxyClient;
 import io.vavr.collection.List;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -37,8 +39,11 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
+import java.io.IOException;
 import java.time.Duration;
+import java.util.concurrent.TimeUnit;
 
+import static eu.rekawek.toxiproxy.model.ToxicDirection.DOWNSTREAM;
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createPublishRequest;
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.errorSubscribeResponse;
@@ -48,11 +53,18 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageR
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.registerTopic;
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.successSubscribeResponse;
 
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.DMAAP_SERVICE_NAME;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.LOCALHOST;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_SERVICE_EXPOSED_PORT;
+
 @Testcontainers
 class MessageRouterSubscriberIT {
     private static final Duration TIMEOUT = Duration.ofSeconds(10);
     private static final String CONSUMER_GROUP = "group1";
     private static final String CONSUMER_ID = "consumer200";
+    private static String PROXY_EVENTS_PATH;
+    private static Proxy DMAAP_PROXY;
     private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
             "{" +
             "\"mrstatus\":3001," +
@@ -60,6 +72,18 @@ class MessageRouterSubscriberIT {
             "\"message\":\"No such topic exists.-[%s]\"," +
             "\"status\":404" +
             "}";
+    private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n"
+            + "{"
+            + "\"requestError\":"
+            + "{"
+            + "\"serviceException\":"
+            + "{"
+            + "\"messageId\":\"SVC0001\","
+            + "\"text\":\"Client timeout exception occurred, Error code is %1\","
+            + "\"variables\":[\"408\"]"
+            + "}"
+            + "}"
+            + "}";
 
     @Container
     private static final DockerComposeContainer CONTAINER = DMaapContainer.createContainerInstance();
@@ -73,14 +97,16 @@ class MessageRouterSubscriberIT {
 
 
     @BeforeAll
-    static void setUp() {
-        EVENTS_PATH = String.format("http://%s:%d/events",
-                CONTAINER.getServiceHost(DMaapContainer.DMAAP_SERVICE_NAME,
-                        DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT),
-                CONTAINER.getServicePort(DMaapContainer.DMAAP_SERVICE_NAME,
-                        DMaapContainer.DMAAP_SERVICE_EXPOSED_PORT));
+    static void setUp() throws IOException {
+        EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
+        PROXY_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_SERVICE_EXPOSED_PORT);
+
+        DMAAP_PROXY = new ToxiproxyClient().createProxy("dmaapProxy",
+                String.format("[::]:%s", PROXY_SERVICE_EXPOSED_PORT),
+                String.format("%s:%d", DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT));
     }
 
+
     @Test
     void subscriber_shouldHandleNoSuchTopicException() {
         //given
@@ -207,6 +233,30 @@ class MessageRouterSubscriberIT {
                 .verify(TIMEOUT);
     }
 
+    @Test
+    void subscriber_shouldHandleTimeoutException() throws IOException {
+        //given
+        final String topic = "newTopic";
+        final MessageRouterSubscribeRequest mrSubscribeRequest = createMRSubscribeRequest(
+                String.format("%s/%s", PROXY_EVENTS_PATH, topic), CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
+        final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(
+                TIMEOUT_ERROR_MESSAGE);
 
+        final String toxic = "latency-toxic";
+        DMAAP_PROXY.toxics()
+                .latency(toxic, DOWNSTREAM, TimeUnit.SECONDS.toMillis(5));
 
+        //when
+        Mono<MessageRouterSubscribeResponse> response = subscriber
+                .get(mrSubscribeRequest);
+
+        //then
+        StepVerifier.create(response)
+                .expectNext(expectedResponse)
+                .expectComplete()
+                .verify(TIMEOUT);
+
+        //cleanup
+        DMAAP_PROXY.toxics().get(toxic).remove();
+    }
 }
index ef2cb5e..1f97001 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
 import com.google.gson.JsonSyntaxException;
+import io.netty.handler.timeout.ReadTimeoutException;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
@@ -48,7 +49,8 @@ class MessageRouterSubscriberImplTest {
 
     private final RxHttpClient httpClient = mock(RxHttpClient.class);
     private final MessageRouterSubscriberConfig clientConfig = MessageRouterSubscriberConfig.createDefault();
-    private final MessageRouterSubscriber cut = new MessageRouterSubscriberImpl(httpClient, clientConfig.gsonInstance());
+    private final MessageRouterSubscriber
+            cut = new MessageRouterSubscriberImpl(httpClient, clientConfig.gsonInstance());
 
     private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
     private final MessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
@@ -129,4 +131,29 @@ class MessageRouterSubscriberImplTest {
         // then
         assertThatExceptionOfType(JsonSyntaxException.class).isThrownBy(() -> cut.get(mrRequest).block());
     }
+
+    @Test
+    void getWithProperRequest_shouldReturnTimeoutError() {
+
+        // given
+        given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.error(ReadTimeoutException.INSTANCE));
+
+        // when
+        final Mono<MessageRouterSubscribeResponse> responses = cut
+                .get(mrRequest);
+        final MessageRouterSubscribeResponse response = responses.block();
+
+        // then
+        assertThat(response.failed()).isTrue();
+        assertThat(response.failReason()).contains("408 Request Timeout");
+        assertThat(response.hasElements()).isFalse();
+
+
+        verify(httpClient).call(httpRequestArgumentCaptor.capture());
+        final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
+        assertThat(httpRequest.method()).isEqualTo(HttpMethod.GET);
+        assertThat(httpRequest.url()).isEqualTo(String.format("%s/%s/%s", sourceDefinition.topicUrl(),
+                mrRequest.consumerGroup(), mrRequest.consumerId()));
+        assertThat(httpRequest.body()).isNull();
+    }
 }
\ No newline at end of file