import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockserver.client.MockServerClient;
-import org.mockserver.matchers.TimeToLive;
import org.mockserver.matchers.Times;
import org.mockserver.verify.VerificationTimes;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
MOCK_SERVER_CLIENT
.when(request().withPath(path), Times.once())
.respond(response().withStatusCode(404));
- final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig());
+
+ final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(
+ retryConfig(1, 1));
//when
final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
final String path = String.format("/events/%s", topic);
MOCK_SERVER_CLIENT
.when(request().withPath(path), Times.once())
- .respond(response().withDelay(TimeUnit.SECONDS, 10));
- final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig());
+ .respond(response().withDelay(TimeUnit.SECONDS, 2));
+ final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(
+ retryConfig(1, 1));
//when
final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
}
- private MessageRouterPublisherConfig retryConfig() {
+ @Test
+ void publisher_shouldRetryManyTimesAndSuccessfullyPublish() {
+ final String topic = "TOPIC13";
+ final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
+
+ final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
+ "{\"differentMessage\":\"message2\"}");
+ final List<JsonElement> expectedItems = getAsJsonElements(twoJsonMessages);
+ final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessages);
+
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
+ final MessageRouterPublishResponse expectedResponse = successPublishResponse(expectedItems);
+
+ final String path = String.format("/events/%s", topic);
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withDelay(TimeUnit.SECONDS, 2));
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withStatusCode(404));
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withDelay(TimeUnit.SECONDS, 2));
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withStatusCode(500));
+ final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(retryConfig(1, 5));
+
+ //when
+ final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
+
+ //then
+ StepVerifier.create(result)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify();
+
+ MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
+ }
+
+ private MessageRouterPublisherConfig retryConfig(int retryInterval, int retryCount) {
return ImmutableMessageRouterPublisherConfig.builder()
.retryConfig(ImmutableDmaapRetryConfig.builder()
- .retryIntervalInSeconds(1)
- .retryCount(1)
+ .retryIntervalInSeconds(retryInterval)
+ .retryCount(retryCount)
.build())
.build();
}
final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
MOCK_SERVER_CLIENT
.when(request().withPath(path), Times.once())
- .respond(response().withDelay(TimeUnit.SECONDS, 5));
+ .respond(response().withDelay(TimeUnit.SECONDS, 2));
//when
Mono<MessageRouterSubscribeResponse> response = subscriber
MOCK_SERVER_CLIENT
.when(request().withPath(path), Times.once())
.respond(response().withStatusCode(404));
- final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig());
+ final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
+ retryConfig(1, 1));
//when
registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
MOCK_SERVER_CLIENT
.when(request().withPath(path), Times.once())
- .respond(response().withDelay(TimeUnit.SECONDS, 10));
- final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(retryConfig());
+ .respond(response().withDelay(TimeUnit.SECONDS, 2));
+ final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
+ retryConfig(1, 1));
//when
registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
}
- private MessageRouterSubscriberConfig retryConfig() {
+ @Test
+ void subscriber_shouldRetryManyTimesAndSuccessfullySubscribe() {
+ //given
+ final String topic = "TOPIC8";
+ final String proxyTopicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
+ final String topicUrl = String.format("%s/%s", EVENTS_PATH, topic);
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(proxyTopicUrl);
+ final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(
+ proxyTopicUrl, CONSUMER_GROUP, CONSUMER_ID, Duration.ofSeconds(1));
+
+ final List<String> singleJsonMessage = List.of("{\"message\":\"message1\"}");
+ final List<JsonElement> expectedItems = getAsJsonElements(singleJsonMessage);
+ final Flux<JsonObject> jsonMessageBatch = jsonBatch(singleJsonMessage);
+ final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
+
+ final String path = String.format("/events/%s/%s/%s", topic, CONSUMER_GROUP, CONSUMER_ID);
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withDelay(TimeUnit.SECONDS, 2));
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withStatusCode(404));
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withDelay(TimeUnit.SECONDS, 2));
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withStatusCode(500));
+ final MessageRouterSubscriber subscriber = DmaapClientFactory.createMessageRouterSubscriber(
+ retryConfig(1, 5));
+
+ //when
+ registerTopic(publisher, createPublishRequest(topicUrl), subscriber,
+ createMRSubscribeRequest(topicUrl, CONSUMER_GROUP, CONSUMER_ID));
+ Mono<MessageRouterSubscribeResponse> response = publisher
+ .put(publishRequest, jsonMessageBatch)
+ .then(subscriber.get(subscribeRequest));
+
+ //then
+ StepVerifier.create(response)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify();
+
+ MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
+ }
+
+ private MessageRouterSubscriberConfig retryConfig(int retryInterval, int retryCount) {
return ImmutableMessageRouterSubscriberConfig.builder()
.retryConfig(ImmutableDmaapRetryConfig.builder()
- .retryIntervalInSeconds(1)
- .retryCount(1)
+ .retryIntervalInSeconds(retryInterval)
+ .retryCount(retryCount)
.build())
.build();
}
* ============LICENSE_START====================================
* DCAEGEN2-SERVICES-SDK
* =========================================================
- * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
* =========================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufFlux;
+import reactor.netty.ByteBufMono;
/**
* @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
}
static RequestBody fromString(String contents, Charset charset) {
- ByteBuf encodedContents = ByteBufAllocator.DEFAULT.buffer();
- encodedContents.writeCharSequence(contents, charset);
-
return ImmutableRequestBody.builder()
- .length(encodedContents.readableBytes())
- .contents(Mono.just(encodedContents.retain()))
+ .length(contents.length())
+ .contents(ByteBufMono.fromString(Mono.just(contents), charset, ByteBufAllocator.DEFAULT))
.build();
}
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClient.ResponseReceiver;
return theClient
.headers(hdrs -> hdrs.set(HttpHeaders.TRANSFER_ENCODING_TYPE, HttpHeaders.CHUNKED))
.request(request.method().asNetty())
- .send(Flux.from(request.body().contents()))
+ .send(request.body().contents())
.uri(request.url());
}
return theClient
.headers(hdrs -> hdrs.set(HttpHeaders.CONTENT_LENGTH, request.body().length().toString()))
.request(request.method().asNetty())
- .send(Flux.from(request.body().contents()))
+ .send(request.body().contents())
.uri(request.url());
}