Return last exception instead of timeout exception when retry exhausted.
Handle no connection exception when sending requests to dmaap-mr.
Issue-ID: DCAEGEN2-1483
Signed-off-by: tkogut <tomasz.kogut@nokia.com>
Change-Id: Ibe318fa349b79999a5c8054e04e72e444a42ea78
import java.time.Duration;
-import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig.ON_RETRY_EXHAUSTED_EXCEPTION;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig.RETRYABLE_EXCEPTIONS;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapRetryConfig.RETRYABLE_HTTP_CODES;
.retryCount(rc.retryCount())
.retryableHttpResponseCodes(RETRYABLE_HTTP_CODES)
.customRetryableExceptions(RETRYABLE_EXCEPTIONS)
- .onRetryExhaustedException(ON_RETRY_EXHAUSTED_EXCEPTION)
.build())
.getOrNull();
}
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2020 Nokia. All rights reserved.
+ * Copyright (C) 2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
.messageId("SVC0001")
.variables(Collections.singletonList("408")).build();
+ public static final ClientErrorReason SERVICE_UNAVAILABLE = ImmutableClientErrorReason.builder()
+ .header("503 Service unavailable")
+ .text("DMaaP MR is unavailable")
+ .messageId("SVC2001")
+ .build();
+
}
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReason;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import java.net.ConnectException;
import java.time.Duration;
import java.util.stream.Collectors;
LOGGER.trace("The items to be sent: {}", batch);
return httpClient.call(buildHttpRequest(request, createBody(batch, request.contentType())))
.map(httpResponse -> buildResponse(httpResponse, batch))
- .doOnError(ReadTimeoutException.class, e -> LOGGER.error("Timeout exception occurred when sending items to DMaaP MR", e))
- .onErrorResume(ReadTimeoutException.class, e -> createErrorResponse(ClientErrorReasons.TIMEOUT));
+ .doOnError(ReadTimeoutException.class,
+ e -> LOGGER.error("Timeout exception occurred when sending items to DMaaP MR", e))
+ .onErrorResume(ReadTimeoutException.class, e -> buildErrorResponse(ClientErrorReasons.TIMEOUT))
+ .doOnError(ConnectException.class, e -> LOGGER.error("DMaaP MR is unavailable, {}", e.getMessage()))
+ .onErrorResume(ConnectException.class, e -> buildErrorResponse(ClientErrorReasons.SERVICE_UNAVAILABLE))
+ .onErrorResume(RetryableException.class, e -> Mono.just(buildResponse(e.getResponse(), batch)));
}
private @NotNull RequestBody createBody(List<? extends JsonElement> subItems, ContentType contentType) {
: builder.failReason(extractFailReason(httpResponse)).build();
}
- private Mono<MessageRouterPublishResponse> createErrorResponse(ClientErrorReason clientErrorReason) {
+ private Mono<MessageRouterPublishResponse> buildErrorResponse(ClientErrorReason clientErrorReason) {
String failReason = clientErrorReasonPresenter.present(clientErrorReason);
return Mono.just(ImmutableMessageRouterPublishResponse.builder()
.failReason(failReason)
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReason;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
+import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason;
.map(this::buildGetResponse)
.doOnError(ReadTimeoutException.class,
e -> LOGGER.error("Timeout exception occurred when subscribe items from DMaaP MR", e))
- .onErrorResume(ReadTimeoutException.class, e -> createErrorResponse(ClientErrorReasons.TIMEOUT));
+ .onErrorResume(ReadTimeoutException.class, e -> buildErrorResponse(ClientErrorReasons.TIMEOUT))
+ .doOnError(ConnectException.class, e -> LOGGER.error("DMaaP MR is unavailable, {}", e.getMessage()))
+ .onErrorResume(ConnectException.class, e -> buildErrorResponse(ClientErrorReasons.SERVICE_UNAVAILABLE))
+ .onErrorResume(RetryableException.class, e -> Mono.just(buildGetResponse(e.getResponse())));
}
-
private @NotNull HttpRequest buildGetHttpRequest(MessageRouterSubscribeRequest request) {
ImmutableHttpRequest.Builder requestBuilder = ImmutableHttpRequest.builder()
.method(HttpMethod.GET)
request.consumerId());
}
- private Mono<MessageRouterSubscribeResponse> createErrorResponse(ClientErrorReason clientErrorReason) {
+ private Mono<MessageRouterSubscribeResponse> buildErrorResponse(ClientErrorReason clientErrorReason) {
String failReason = clientErrorReasonPresenter.present(clientErrorReason);
return Mono.just(ImmutableMessageRouterSubscribeResponse.builder()
.failReason(failReason)
public interface DmaapRetryConfig {
Set<Class<? extends Throwable>> RETRYABLE_EXCEPTIONS = HashSet.of(ReadTimeoutException.class, ConnectException.class);
- RuntimeException ON_RETRY_EXHAUSTED_EXCEPTION = ReadTimeoutException.INSTANCE;
Set<Integer> RETRYABLE_HTTP_CODES = HashSet.of(404, 408, 413, 429, 500, 502, 503, 504);
@Value.Default
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
}
+ @Test
+ void publisher_shouldHandleLastRetryError500() {
+ final String topic = "TOPIC14";
+ final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
+
+ final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
+ "{\"differentMessage\":\"message2\"}");
+ final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessages);
+
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
+ final String responseMessage = "Response Message";
+ final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
+ "500 Internal Server Error\n%s", responseMessage);
+
+ final String path = String.format("/events/%s", topic);
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withStatusCode(404));
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withStatusCode(500).withBody(responseMessage));
+ final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig(1, 1));
+
+ //when
+ final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
+
+ //then
+ StepVerifier.create(result)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify();
+
+ MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
+ }
+
private MessageRouterPublisherConfig retryConfig(int retryInterval, int retryCount) {
return ImmutableMessageRouterPublisherConfig.builder()
.retryConfig(ImmutableDmaapRetryConfig.builder()
import com.google.gson.JsonElement;
import com.google.gson.JsonPrimitive;
import io.vavr.collection.List;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
private static final String ERROR_MESSAGE = "Something went wrong";
private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout";
+ private static final String CONNECTION_ERROR_MESSAGE = "503 Service unavailable";
private static final String SUCCESS_RESP_TOPIC_PATH = "/events/TOPIC";
private static final String DELAY_RESP_TOPIC_PATH = "/events/DELAY";
private static final String FAILING_WITH_400_RESP_PATH = "/events/TOPIC400";
private static final Flux<JsonPrimitive> messageBatch = Flux.just("ala", "ma", "kota")
.map(JsonPrimitive::new);
private static final List<String> messageBatchItems = List.of("ala", "ma", "kota");
-
- private static DummyHttpServer server;
+ private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet();
+ private static final DummyHttpServer SERVER = initialize();
private MessageRouterPublisher sut = DmaapClientFactory
.createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
-
- @BeforeAll
- static void setUp() {
- server = DummyHttpServer.start(routes -> routes
+ private static DummyHttpServer initialize() {
+ return DummyHttpServer.start(routes -> routes
.post(SUCCESS_RESP_TOPIC_PATH, (req, resp) -> sendString(resp, Mono.just("OK")))
.post(DELAY_RESP_TOPIC_PATH, (req, resp) -> sendWithDelay(resp, 200, TIMEOUT))
.post(FAILING_WITH_400_RESP_PATH, (req, resp) -> sendError(resp, 400, ERROR_MESSAGE))
@Test
void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() {
//given
- final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH);
+ final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH, SERVER);
final List<JsonElement> expectedItems = messageBatchItems.map(JsonPrimitive::new);
//when
})
void publisher_shouldHandleError(String failingPath, String failReason) {
//given
- final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(failingPath);
+ final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(failingPath, SERVER);
final MessageRouterPublishResponse expectedResponse = createErrorResponse(failReason);
//when
.verify(TIMEOUT);
}
- private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath) {
- final MessageRouterSink sinkDefinition = createMRSink(topicPath);
+ @Test
+ void publisher_shouldHandleConnectionError() {
+ //given
+ final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(
+ SUCCESS_RESP_TOPIC_PATH, DISPOSED_HTTP_SERVER);
+
+ //when
+ final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
+
+ //then
+ StepVerifier.create(result)
+ .consumeNextWith(this::assertConnectionError)
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
+ private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath, DummyHttpServer dummyHttpServer) {
+ final MessageRouterSink sinkDefinition = createMRSink(topicPath, dummyHttpServer);
return ImmutableMessageRouterPublishRequest.builder()
.sinkDefinition(sinkDefinition)
.contentType(ContentType.TEXT_PLAIN)
}
private static MessageRouterPublishRequest createTextPlainMRRequest(String topicPath, Duration timeout) {
- final MessageRouterSink sinkDefinition = createMRSink(topicPath);
+ final MessageRouterSink sinkDefinition = createMRSink(topicPath, SERVER);
return ImmutableMessageRouterPublishRequest.builder()
.sinkDefinition(sinkDefinition)
.contentType(ContentType.TEXT_PLAIN)
.build();
}
- private static MessageRouterSink createMRSink(String topicPath) {
+ private static MessageRouterSink createMRSink(String topicPath, DummyHttpServer dummyHttpServer) {
return ImmutableMessageRouterSink.builder()
.name("the topic")
.topicUrl(String.format("http://%s:%d%s",
- server.host(),
- server.port(),
+ dummyHttpServer.host(),
+ dummyHttpServer.port(),
topicPath)
)
.build();
assertThat(response.failed()).isTrue();
assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE);
}
+
+ private void assertConnectionError(DmaapResponse response) {
+ assertThat(response.failed()).isTrue();
+ assertThat(response.failReason()).startsWith(CONNECTION_ERROR_MESSAGE);
+ }
}
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import io.vavr.collection.List;
+import io.vavr.control.Try;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
}
+ @Test
+ void subscriber_shouldHandleLastRetryError500() {
+ //given
+ final String topic = "TOPIC9";
+ final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(
+ proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID);
+ final String responseMessage = "Response Message";
+ final MessageRouterSubscribeResponse expectedResponse = errorSubscribeResponse(
+ "500 Internal Server Error\n%s", responseMessage);
+
+ final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withStatusCode(404));
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withStatusCode(500).withBody(responseMessage));
+ final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
+ retryConfig(1, 1));
+
+ //when
+ Mono<MessageRouterSubscribeResponse> response = subscriber.get(subscribeRequest);
+
+ //then
+ StepVerifier.create(response)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify();
+
+ MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
+ }
+
private MessageRouterSubscriberConfig retryConfig(int retryInterval, int retryCount) {
return ImmutableMessageRouterSubscriberConfig.builder()
.retryConfig(ImmutableDmaapRetryConfig.builder()
import com.google.gson.JsonElement;
import com.google.gson.JsonPrimitive;
import io.vavr.collection.List;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
class MessageRouterSubscriberTest {
private static final Duration TIMEOUT = Duration.ofSeconds(10);
private static final String ERROR_MESSAGE = "Something went wrong";
+ private static final String CONNECTION_ERROR_MESSAGE = "503 Service unavailable";
private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout";
private static final String CONSUMER_GROUP = "group1";
private static final String SUCCESS_CONSUMER_ID = "consumer200";
private static final String FAILING_WITH_500_RESP_PATH = String
.format("%s/%s", CONSUMER_PATH, FAILING_WITH_500_CONSUMER_ID);
- private static MessageRouterSubscribeRequest mrSuccessRequest;
- private static MessageRouterSubscribeRequest mrFailingRequest;
+ private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet();
+ private static final DummyHttpServer SERVER = initialize();
+
private MessageRouterSubscriber sut = DmaapClientFactory
.createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
- private static MessageRouterSource sourceDefinition;
-
+ private static MessageRouterSource sourceDefinition = createMessageRouterSource(SERVER);
+ private static MessageRouterSource failingSourceDefinition = createMessageRouterSource(DISPOSED_HTTP_SERVER);
+ private static MessageRouterSubscribeRequest mrSuccessRequest = createSuccessRequest(sourceDefinition);
+ private static MessageRouterSubscribeRequest mrFailingRequest = createFailingRequest(FAILING_WITH_500_CONSUMER_ID);
- @BeforeAll
- static void setUp() {
- DummyHttpServer server = DummyHttpServer.start(routes -> routes
+ private static DummyHttpServer initialize() {
+ return DummyHttpServer.start(routes -> routes
.get(SUCCESS_RESP_PATH, (req, resp) ->
sendResource(resp, "/sample-mr-subscribe-response.json"))
.get(DELAY_RESP_PATH, (req, resp) -> sendWithDelay(resp, 200, TIMEOUT))
.get(FAILING_WITH_409_RESP_PATH, (req, resp) -> sendError(resp, 409, ERROR_MESSAGE))
.get(FAILING_WITH_429_RESP_PATH, (req, resp) -> sendError(resp, 429, ERROR_MESSAGE))
.get(FAILING_WITH_500_RESP_PATH, (req, resp) -> sendError(resp, 500, ERROR_MESSAGE)));
-
- sourceDefinition = createMessageRouterSource(server);
-
- mrSuccessRequest = createSuccessRequest();
-
- mrFailingRequest = createFailingRequest(FAILING_WITH_500_CONSUMER_ID);
}
@Test
.verify(TIMEOUT);
}
+ @Test
+ void subscriber_shouldHandleConnectionError() {
+ MessageRouterSubscribeRequest request = createSuccessRequest(failingSourceDefinition);
+ Mono<MessageRouterSubscribeResponse> response = sut.get(request);
+
+ StepVerifier.create(response)
+ .consumeNextWith(this::assertConnectionError)
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
private static MessageRouterSource createMessageRouterSource(DummyHttpServer server) {
return ImmutableMessageRouterSource.builder()
.name("the topic")
.build();
}
- private static MessageRouterSubscribeRequest createSuccessRequest() {
+ private static MessageRouterSubscribeRequest createSuccessRequest(MessageRouterSource source) {
return ImmutableMessageRouterSubscribeRequest.builder()
- .sourceDefinition(sourceDefinition)
+ .sourceDefinition(source)
.consumerGroup(CONSUMER_GROUP)
.consumerId(SUCCESS_CONSUMER_ID)
.build();
assertThat(response.failed()).isTrue();
assertThat(response.failReason()).startsWith(TIMEOUT_ERROR_MESSAGE);
}
+
+ private void assertConnectionError(DmaapResponse response) {
+ assertThat(response.failed()).isTrue();
+ assertThat(response.failReason()).startsWith(CONNECTION_ERROR_MESSAGE);
+ }
}
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
+import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
private final MessageRouterPublishRequest plainPublishRequest = createPublishRequest(TOPIC_URL, ContentType.TEXT_PLAIN);
private final MessageRouterPublishRequest jsonPublishRequest = createPublishRequest(TOPIC_URL);
private final HttpResponse successHttpResponse = createHttpResponse("OK", 200);
+ private final HttpResponse retryableHttpResponse = createHttpResponse("ERROR", 500);
@Test
void puttingElementsShouldYieldNonChunkedHttpRequest() {
// then
StepVerifier.create(responses)
- .consumeNextWith(this::assertTimeoutError)
+ .consumeNextWith(this::assertFailedResponse)
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
+ @Test
+ void onPut_whenConnectionExceptionOccurs_shouldReturnOneConnectionException() {
+ // given
+ final List<String> plainMessage = List.of("I", "like", "cookies");
+
+ final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage);
+ given(clientErrorReasonPresenter.present(any()))
+ .willReturn(ERROR_MESSAGE);
+ given(httpClient.call(any(HttpRequest.class)))
+ .willReturn(Mono.error(new ConnectException()));
+
+ // when
+ final Flux<MessageRouterPublishResponse> responses = cut
+ .put(plainPublishRequest, plainMessagesMaxBatch);
+
+ // then
+ StepVerifier.create(responses)
+ .consumeNextWith(this::assertFailedResponse)
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
+ @Test
+ void onPut_whenRetryableExceptionOccurs_shouldReturnCertainFailedResponse() {
+ // given
+ final List<String> plainMessage = List.of("I", "like", "cookies");
+
+ final Flux<JsonElement> plainMessagesMaxBatch = plainBatch(plainMessage);
+ given(httpClient.call(any(HttpRequest.class)))
+ .willReturn(Mono.just(retryableHttpResponse));
+
+ // when
+ final Flux<MessageRouterPublishResponse> responses = cut
+ .put(plainPublishRequest, plainMessagesMaxBatch);
+
+ // then
+ StepVerifier.create(responses)
+ .consumeNextWith(this::assertRetryableFailedResponse)
.expectComplete()
.verify(TIMEOUT);
}
// then
StepVerifier.create(responses)
.consumeNextWith(response -> verifySuccessfulResponses(parsedThreeMessages, response))
- .consumeNextWith(this::assertTimeoutError)
+ .consumeNextWith(this::assertFailedResponse)
.expectComplete()
.verify(TIMEOUT);
}
// then
StepVerifier.create(responses)
- .consumeNextWith(this::assertTimeoutError)
+ .consumeNextWith(this::assertFailedResponse)
.consumeNextWith(response -> verifySuccessfulResponses(parsedTwoMessages, response))
.expectComplete()
.verify(TIMEOUT);
}
}
- private void assertTimeoutError(MessageRouterPublishResponse response) {
+ private void assertFailedResponse(MessageRouterPublishResponse response) {
assertThat(response.failed()).isTrue();
assertThat(response.items()).isEmpty();
assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE);
}
+ private void assertRetryableFailedResponse(MessageRouterPublishResponse response) {
+ assertThat(response.failed()).isTrue();
+ assertThat(response.items()).isEmpty();
+ assertThat(response.failReason()).startsWith("500 ERROR");
+ }
+
private void verifySingleResponse(List<? extends JsonElement> threeMessages,
Flux<MessageRouterPublishResponse> responses) {
StepVerifier.create(responses)
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import com.google.gson.JsonSyntaxException;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
import reactor.core.publisher.Mono;
+import java.net.ConnectException;
+
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
* @since May 2019
.url(sourceDefinition.topicUrl())
.rawBody("[]".getBytes())
.build();
+ private final HttpResponse retryableHttpResponse = ImmutableHttpResponse.builder()
+ .statusCode(500)
+ .statusReason("Something braked")
+ .url(sourceDefinition.topicUrl())
+ .rawBody("[]".getBytes())
+ .build();
private final HttpResponse httpResponseWithWrongStatusCode = ImmutableHttpResponse.builder()
.statusCode(301)
.statusReason("Something braked")
assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE);
assertThat(response.hasElements()).isFalse();
+ verify(clientErrorReasonPresenter, times(1)).present(any());
+ verify(httpClient).call(httpRequestArgumentCaptor.capture());
+ final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
+ assertThat(httpRequest.method()).isEqualTo(HttpMethod.GET);
+ assertThat(httpRequest.url()).isEqualTo(String.format("%s/%s/%s", sourceDefinition.topicUrl(),
+ mrRequest.consumerGroup(), mrRequest.consumerId()));
+ assertThat(httpRequest.body()).isNull();
+ }
+
+ @Test
+ void getWithProperRequest_shouldReturnConnectionException() {
+ given(clientErrorReasonPresenter.present(any()))
+ .willReturn(ERROR_MESSAGE);
+ given(httpClient.call(any(HttpRequest.class)))
+ .willReturn(Mono.error(new ConnectException()));
+
+ // when
+ final Mono<MessageRouterSubscribeResponse> responses = cut
+ .get(mrRequest);
+ final MessageRouterSubscribeResponse response = responses.block();
+
+ assertThat(response.failed()).isTrue();
+ assertThat(response.failReason()).isEqualTo(ERROR_MESSAGE);
+ assertThat(response.hasElements()).isFalse();
+
+ verify(clientErrorReasonPresenter, times(1)).present(any());
+ verify(httpClient).call(httpRequestArgumentCaptor.capture());
+ final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
+ assertThat(httpRequest.method()).isEqualTo(HttpMethod.GET);
+ assertThat(httpRequest.url()).isEqualTo(String.format("%s/%s/%s", sourceDefinition.topicUrl(),
+ mrRequest.consumerGroup(), mrRequest.consumerId()));
+ assertThat(httpRequest.body()).isNull();
+ }
+
+ @Test
+ void getWithProperRequest_shouldReturnCertainFailedResponse() {
+ given(httpClient.call(any(HttpRequest.class)))
+ .willReturn(Mono.just(retryableHttpResponse));
+
+ // when
+ final Mono<MessageRouterSubscribeResponse> responses = cut
+ .get(mrRequest);
+ final MessageRouterSubscribeResponse response = responses.block();
+
+ assertThat(response.failed()).isTrue();
+ assertThat(response.failReason()).startsWith("500 Something braked");
+ assertThat(response.hasElements()).isFalse();
verify(httpClient).call(httpRequestArgumentCaptor.capture());
final HttpRequest httpRequest = httpRequestArgumentCaptor.getValue();
import io.vavr.collection.Stream;
import io.vavr.control.Option;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.RetryConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
private Mono<HttpResponse> mapResponse(String url, HttpResponseStatus status, reactor.netty.ByteBufMono content) {
- if (shouldRetry(status.code())) {
- return Mono.error(new RetryConfig.RetryableException());
- }
return content.asByteArray()
.defaultIfEmpty(new byte[0])
- .map(bytes -> new NettyHttpResponse(url, status, bytes));
+ .map(bytes -> new NettyHttpResponse(url, status, bytes))
+ .map(this::validatedResponse);
+ }
+
+ private HttpResponse validatedResponse(HttpResponse response) {
+ if (shouldRetry(response.statusCode())) {
+ throw new RetryableException(response);
+ }
+ return response;
}
private boolean shouldRetry(int code) {
}
private RetryBackoffSpec retryConfig(RetryConfig retryConfig, RequestDiagnosticContext context) {
- RetryBackoffSpec retry = Retry
+ return Retry
.fixedDelay(retryConfig.retryCount(), retryConfig.retryInterval())
.doBeforeRetry(retrySignal -> context.withSlf4jMdc(
LOGGER.isTraceEnabled(), () -> LOGGER.trace("Retry: {}", retrySignal)))
- .filter(ex -> isRetryable(retryConfig, ex));
-
- return Option.of(retryConfig.onRetryExhaustedException())
- .map(ex -> retry.onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> ex))
- .getOrElse(retry);
+ .filter(ex -> isRetryable(retryConfig, ex))
+ .onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> retrySignal.failure());
}
private boolean isRetryable(RetryConfig retryConfig, Throwable ex) {
import io.vavr.collection.HashSet;
import io.vavr.collection.Set;
import org.immutables.value.Value;
-import org.jetbrains.annotations.Nullable;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException;
import java.time.Duration;
}
return result;
}
-
- @Nullable RuntimeException onRetryExhaustedException();
-
- class RetryableException extends RuntimeException {}
}
--- /dev/null
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions;
+
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
+
+public class RetryableException extends RuntimeException {
+
+ private final HttpResponse response;
+
+ public RetryableException(HttpResponse response) {
+ this.response = response;
+ }
+
+ public HttpResponse getResponse() {
+ return response;
+ }
+}
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRetryConfig;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.config.ImmutableRxHttpClientConfig;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.HttpException;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
}
@Test
- void getWithRetryExhaustedExceptionWhenClosedServer() throws Exception {
+ void getWithConnectExceptionWhenClosedServer() throws Exception {
// given
REQUEST_COUNTER = new AtomicInteger();
final HttpRequest httpRequest = requestForClosedServer("/sample-get")
// then
StepVerifier.create(response)
- .expectError(IllegalStateException.class)
+ .expectError(ConnectException.class)
.verify(TIMEOUT);
assertNoServerResponse();
}
@Test
- void getWithCustomRetryExhaustedExceptionWhenClosedServer() throws Exception {
- // given
- REQUEST_COUNTER = new AtomicInteger();
- final HttpRequest httpRequest = requestForClosedServer("/sample-get")
- .method(HttpMethod.GET)
- .build();
- final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
- .retryConfig(defaultRetryConfig()
- .customRetryableExceptions(HashSet.of(ConnectException.class))
- .onRetryExhaustedException(ReadTimeoutException.INSTANCE)
- .build())
- .build());
-
- // when
- final Mono<HttpResponse> response = cut.call(httpRequest);
-
- // then
- StepVerifier.create(response)
- .expectError(ReadTimeoutException.class)
- .verify(TIMEOUT);
- assertNoServerResponse();
- }
-
- @Test
- void getWithRetryExhaustedExceptionWhen500() throws Exception {
+ void getWithRetryableExceptionWhen500() throws Exception {
// given
REQUEST_COUNTER = new AtomicInteger();
final HttpRequest httpRequest = requestFor("/retry-get-500")
// then
StepVerifier.create(response)
- .expectError(IllegalStateException.class)
- .verify(TIMEOUT);
- assertRetry();
- }
-
- @Test
- void getWithCustomRetryExhaustedExceptionWhen500() throws Exception {
- // given
- REQUEST_COUNTER = new AtomicInteger();
- final HttpRequest httpRequest = requestFor("/retry-get-500")
- .method(HttpMethod.GET)
- .build();
- final RxHttpClient cut = RxHttpClientFactory.create(ImmutableRxHttpClientConfig.builder()
- .retryConfig(defaultRetryConfig()
- .onRetryExhaustedException(ReadTimeoutException.INSTANCE)
- .retryableHttpResponseCodes(HashSet.of(500))
- .build())
- .build());
-
- // when
- final Mono<HttpResponse> response = cut.call(httpRequest);
-
- // then
- StepVerifier.create(response)
- .expectError(ReadTimeoutException.class)
+ .expectError(RetryableException.class)
.verify(TIMEOUT);
assertRetry();
}