Fix problem with resource releases when retry more than twice 14/117114/1
authortkogut <tomasz.kogut@nokia.com>
Mon, 25 Jan 2021 16:56:56 +0000 (17:56 +0100)
committertkogut <tomasz.kogut@nokia.com>
Mon, 25 Jan 2021 16:56:56 +0000 (17:56 +0100)
Issue-ID: DCAEGEN2-1483
Signed-off-by: tkogut <tomasz.kogut@nokia.com>
Change-Id: I8fc58e035226c7c2d499b23f0f31df7bbdb147d4

rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RequestBody.java
rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java

index f6ef94b..159fc59 100644 (file)
@@ -27,7 +27,6 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockserver.client.MockServerClient;
-import org.mockserver.matchers.TimeToLive;
 import org.mockserver.matchers.Times;
 import org.mockserver.verify.VerificationTimes;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
@@ -352,7 +351,9 @@ class MessageRouterPublisherIT {
         MOCK_SERVER_CLIENT
                 .when(request().withPath(path), Times.once())
                 .respond(response().withStatusCode(404));
-        final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig());
+
+        final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(
+                retryConfig(1, 1));
 
         //when
         final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
@@ -381,8 +382,9 @@ class MessageRouterPublisherIT {
         final String path = String.format("/events/%s", topic);
         MOCK_SERVER_CLIENT
                 .when(request().withPath(path), Times.once())
-                .respond(response().withDelay(TimeUnit.SECONDS, 10));
-        final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig());
+                .respond(response().withDelay(TimeUnit.SECONDS, 2));
+        final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(
+                retryConfig(1, 1));
 
         //when
         final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
@@ -396,11 +398,51 @@ class MessageRouterPublisherIT {
         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
     }
 
-    private MessageRouterPublisherConfig retryConfig() {
+    @Test
+    void publisher_shouldRetryManyTimesAndSuccessfullyPublish() {
+        final String topic = "TOPIC13";
+        final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
+
+        final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
+                "{\"differentMessage\":\"message2\"}");
+        final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessages);
+        final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessages);
+
+        final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
+        final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
+
+        final String path = String.format("/events/%s", topic);
+        MOCK_SERVER_CLIENT
+                .when(request().withPath(path), Times.once())
+                .respond(response().withDelay(TimeUnit.SECONDS, 2));
+        MOCK_SERVER_CLIENT
+                .when(request().withPath(path), Times.once())
+                .respond(response().withStatusCode(404));
+        MOCK_SERVER_CLIENT
+                .when(request().withPath(path), Times.once())
+                .respond(response().withDelay(TimeUnit.SECONDS, 2));
+        MOCK_SERVER_CLIENT
+                .when(request().withPath(path), Times.once())
+                .respond(response().withStatusCode(500));
+        final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig(1, 5));
+
+        //when
+        final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
+
+        //then
+        StepVerifier.create(result)
+                .expectNext(expectedResponse)
+                .expectComplete()
+                .verify();
+
+        MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
+    }
+
+    private MessageRouterPublisherConfig retryConfig(int retryInterval, int retryCount) {
         return ImmutableMessageRouterPublisherConfig.builder()
                 .retryConfig(ImmutableDmaapRetryConfig.builder()
-                        .retryIntervalInSeconds(1)
-                        .retryCount(1)
+                        .retryIntervalInSeconds(retryInterval)
+                        .retryCount(retryCount)
                         .build())
                 .build();
     }
index 1f4e499..15c3bd8 100644 (file)
@@ -245,7 +245,7 @@ class MessageRouterSubscriberIT {
         final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
         MOCK_SERVER_CLIENT
                 .when(request().withPath(path), Times.once())
-                .respond(response().withDelay(TimeUnit.SECONDS, 5));
+                .respond(response().withDelay(TimeUnit.SECONDS, 2));
 
         //when
         Mono<MessageRouterSubscribeResponse> response = subscriber
@@ -278,7 +278,8 @@ class MessageRouterSubscriberIT {
         MOCK_SERVER_CLIENT
                 .when(request().withPath(path), Times.once())
                 .respond(response().withStatusCode(404));
-        final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig());
+        final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
+                retryConfig(1, 1));
 
         //when
         registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
@@ -314,8 +315,9 @@ class MessageRouterSubscriberIT {
         final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
         MOCK_SERVER_CLIENT
                 .when(request().withPath(path), Times.once())
-                .respond(response().withDelay(TimeUnit.SECONDS, 10));
-        final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig());
+                .respond(response().withDelay(TimeUnit.SECONDS, 2));
+        final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
+                retryConfig(1, 1));
 
         //when
         registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
@@ -333,11 +335,58 @@ class MessageRouterSubscriberIT {
         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
     }
 
-    private MessageRouterSubscriberConfig retryConfig() {
+    @Test
+    void subscriber_shouldRetryManyTimesAndSuccessfullySubscribe() {
+        //given
+        final String topic = "TOPIC8";
+        final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
+        final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+        final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl);
+        final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(
+                proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
+
+        final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
+        final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
+        final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
+        final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
+
+        final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
+        MOCK_SERVER_CLIENT
+                .when(request().withPath(path), Times.once())
+                .respond(response().withDelay(TimeUnit.SECONDS, 2));
+        MOCK_SERVER_CLIENT
+                .when(request().withPath(path), Times.once())
+                .respond(response().withStatusCode(404));
+        MOCK_SERVER_CLIENT
+                .when(request().withPath(path), Times.once())
+                .respond(response().withDelay(TimeUnit.SECONDS, 2));
+        MOCK_SERVER_CLIENT
+                .when(request().withPath(path), Times.once())
+                .respond(response().withStatusCode(500));
+        final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
+                retryConfig(1, 5));
+
+        //when
+        registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
+                createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID));
+        Mono<MessageRouterSubscribeResponse> response = publisher
+                .put(publishRequest, jsonMessageBatch)
+                .then(subscriber.get(subscribeRequest));
+
+        //then
+        StepVerifier.create(response)
+                .expectNext(expectedResponse)
+                .expectComplete()
+                .verify();
+
+        MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
+    }
+
+    private MessageRouterSubscriberConfig retryConfig(int retryInterval, int retryCount) {
         return ImmutableMessageRouterSubscriberConfig.builder()
                 .retryConfig(ImmutableDmaapRetryConfig.builder()
-                        .retryIntervalInSeconds(1)
-                        .retryCount(1)
+                        .retryIntervalInSeconds(retryInterval)
+                        .retryCount(retryCount)
                         .build())
                 .build();
     }
index d427ee5..46f9431 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -30,6 +30,7 @@ import org.jetbrains.annotations.Nullable;
 import org.reactivestreams.Publisher;
 import reactor.core.publisher.Mono;
 import reactor.netty.ByteBufFlux;
+import reactor.netty.ByteBufMono;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -58,12 +59,9 @@ public interface RequestBody {
     }
 
     static RequestBody fromString(String contents, Charset charset) {
-        ByteBuf encodedContents = ByteBufAllocator.DEFAULT.buffer();
-        encodedContents.writeCharSequence(contents, charset);
-
         return ImmutableRequestBody.builder()
-                .length(encodedContents.readableBytes())
-                .contents(Mono.just(encodedContents.retain()))
+                .length(contents.length())
+                .contents(ByteBufMono.fromString(Mono.just(contents), charset, ByteBufAllocator.DEFAULT))
                 .build();
     }
 
index d0bdf41..76bde27 100644 (file)
@@ -28,7 +28,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RetryCo
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.netty.http.client.HttpClient;
 import reactor.netty.http.client.HttpClient.ResponseReceiver;
@@ -113,7 +112,7 @@ public class RxHttpClient {
         return theClient
                 .headers(hdrs -> hdrs.set(HttpHeaders.TRANSFER_ENCODING_TYPE, HttpHeaders.CHUNKED))
                 .request(request.method().asNetty())
-                .send(Flux.from(request.body().contents()))
+                .send(request.body().contents())
                 .uri(request.url());
     }
 
@@ -121,7 +120,7 @@ public class RxHttpClient {
         return theClient
                 .headers(hdrs -> hdrs.set(HttpHeaders.CONTENT_LENGTH, request.body().length().toString()))
                 .request(request.method().asNetty())
-                .send(Flux.from(request.body().contents()))
+                .send(request.body().contents())
                 .uri(request.url());
     }