The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).
+## [1.8.6] - 07/06/2021
+### Added
+ - [DCAEGEN2-2827] (https://jira.onap.org/browse/DCAEGEN2-2827) - Handle 429 error Too Many Requests
+
## [1.8.5] - 02/06/2021
### Added
- [DCAEGEN2-2752] (https://jira.onap.org/browse/DCAEGEN2-2752) - Update CBS-Client to read policy configuration from a file exposed by policy-sidecar container
<groupId>org.onap.dcaegen2.services</groupId>
<artifactId>sdk</artifactId>
- <version>1.8.5-SNAPSHOT</version>
+ <version>1.8.6-SNAPSHOT</version>
<name>dcaegen2-services-sdk</name>
<description>Common SDK repo for all DCAE Services</description>
<parent>
<groupId>org.onap.dcaegen2.services.sdk</groupId>
<artifactId>dcaegen2-services-sdk-rest-services</artifactId>
- <version>1.8.5-SNAPSHOT</version>
+ <version>1.8.6-SNAPSHOT</version>
</parent>
<groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
<parent>
<groupId>org.onap.dcaegen2.services.sdk</groupId>
<artifactId>dcaegen2-services-sdk-rest-services</artifactId>
- <version>1.8.5-SNAPSHOT</version>
+ <version>1.8.6-SNAPSHOT</version>
</parent>
<groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
.header("408 Request Timeout")
.text("Client timeout exception occurred, Error code is %1")
.messageId("SVC0001")
- .variables(Collections.singletonList("408")).build();
+ .variables(Collections.singletonList("408"))
+ .build();
public static final ClientErrorReason SERVICE_UNAVAILABLE = ImmutableClientErrorReason.builder()
.header("503 Service unavailable")
.messageId("SVC2001")
.build();
+ public static final ClientErrorReason CONNECTION_POLL_LIMIT = ImmutableClientErrorReason.builder()
+ .header("429 Too Many Requests")
+ .text("Pending acquire queue has reached its maximum size")
+ .messageId("SVC2000")
+ .variables(Collections.singletonList("429"))
+ .build();
}
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException;
import java.net.ConnectException;
import java.time.Duration;
e -> LOGGER.error("Timeout exception occurred when sending items to DMaaP MR", e))
.onErrorResume(ReadTimeoutException.class, e -> buildErrorResponse(ClientErrorReasons.TIMEOUT))
.doOnError(ConnectException.class, e -> LOGGER.error("DMaaP MR is unavailable, {}", e.getMessage()))
+ .onErrorResume(PoolAcquirePendingLimitException.class, e -> buildErrorResponse(ClientErrorReasons.CONNECTION_POLL_LIMIT))
.onErrorResume(ConnectException.class, e -> buildErrorResponse(ClientErrorReasons.SERVICE_UNAVAILABLE))
.onErrorResume(RetryableException.class, e -> Mono.just(buildResponse(e.getResponse(), batch)));
}
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config;
import org.immutables.value.Value;
+import reactor.netty.resources.ConnectionProvider;
@Value.Immutable
public interface DmaapConnectionPoolConfig {
@Value.Default
default int connectionPool(){
- return 16;
+ return ConnectionProvider.DEFAULT_POOL_MAX_CONNECTIONS;
}
@Value.Default
default int maxLifeTime(){
+ "}"
+ "}"
+ "}";
+ private static final String CONNECTION_POLL_LIMIT_MESSAGE = "429 Too Many Requests\n"
+ + "{"
+ + "\"requestError\":"
+ + "{"
+ + "\"serviceException\":"
+ + "{"
+ + "\"messageId\":\"SVC2000\","
+ + "\"text\":\"Pending acquire queue has reached its maximum size\","
+ + "\"variables\":[\"429\"]"
+ + "}"
+ + "}"
+ + "}";
private final MessageRouterPublisher publisher = DmaapClientFactory
.createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
.withHeader("Authorization" ,"Basic dXNlcm5hbWU6cGFzc3dvcmQ="), VerificationTimes.exactly(1));
}
+ @Test
+ void publisher_shouldHandleError429WhenConnectionPollLimitsHasBeenReached() {
+ //given
+ final String topic = "TOPIC17";
+ final String topicUrl = String.format("%s/%s", PROXY_MOCK_EVENTS_PATH, topic);
+
+ final List<String> twoJsonMessages = List.of("{\"message\":\"message1\"}",
+ "{\"differentMessage\":\"message2\"}");
+ final Flux<JsonElement> plainBatch = plainBatch(twoJsonMessages);
+
+ final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl, Duration.ofSeconds(1));
+
+ final MessageRouterPublishResponse expectedResponse = errorPublishResponse(
+ CONNECTION_POLL_LIMIT_MESSAGE);
+
+ final String path = String.format("/events/%s", topic);
+
+ //maxConnectionPoll + pendingAcquireMaxCount(default 2*maxConnectionPoll)
+ final int maxNumberOfConcurrentRequest = 3;
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.exactly(maxNumberOfConcurrentRequest))
+ .respond(response().withStatusCode(429).withDelay(TimeUnit.SECONDS,1));
+
+ MOCK_SERVER_CLIENT
+ .when(request().withPath(path), Times.once())
+ .respond(response().withStatusCode(200));
+
+ final MessageRouterPublisher publisher = DmaapClientFactory.createMessageRouterPublisher(connectionPoolConfiguration());
+
+ //when
+ final Flux<MessageRouterPublishResponse> result = publisher.put(publishRequest, plainBatch);
+
+ for(int i = 0; i < maxNumberOfConcurrentRequest; i++) {
+ publisher.put(publishRequest, plainBatch).subscribe();
+ }
+
+ //then
+ StepVerifier.create(result)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify();
+ }
private MessageRouterPublisherConfig retryConfig(int retryInterval, int retryCount) {
return ImmutableMessageRouterPublisherConfig.builder()
private MessageRouterPublisherConfig connectionPoolConfiguration() {
return ImmutableMessageRouterPublisherConfig.builder()
.connectionPoolConfig(ImmutableDmaapConnectionPoolConfig.builder()
- .connectionPool(10)
+ .connectionPool(1)
.maxIdleTime(10)
.maxLifeTime(20)
.build())
private MessageRouterPublisherConfig connectionPoolAndRetryConfiguration() {
return ImmutableMessageRouterPublisherConfig.builder()
.connectionPoolConfig(ImmutableDmaapConnectionPoolConfig.builder()
- .connectionPool(10)
+ .connectionPool(1)
.maxIdleTime(10)
.maxLifeTime(20)
.build())
private static final String FAILING_WITH_403_RESP_PATH = "/events/TOPIC403";
private static final String FAILING_WITH_404_RESP_PATH = "/events/TOPIC404";
private static final String FAILING_WITH_500_RESP_PATH = "/events/TOPIC500";
+ private static final String FAILING_WITH_429_RESP_PATH = "/events/TOPIC429";
private static final Duration TIMEOUT = Duration.ofSeconds(10);
private static final Flux<JsonPrimitive> messageBatch = Flux.just("ala", "ma", "kota")
.map(JsonPrimitive::new);
.post(FAILING_WITH_403_RESP_PATH, (req, resp) -> sendError(resp, 403, ERROR_MESSAGE))
.post(FAILING_WITH_404_RESP_PATH, (req, resp) -> sendError(resp, 404, ERROR_MESSAGE))
.post(FAILING_WITH_500_RESP_PATH, (req, resp) -> sendError(resp, 500, ERROR_MESSAGE))
+ .post(FAILING_WITH_429_RESP_PATH, (req, resp) -> sendError(resp, 429, ERROR_MESSAGE))
);
}
FAILING_WITH_401_RESP_PATH + "," + "401 Unauthorized",
FAILING_WITH_403_RESP_PATH + "," + "403 Forbidden",
FAILING_WITH_404_RESP_PATH + "," + "404 Not Found",
- FAILING_WITH_500_RESP_PATH + "," + "500 Internal Server Error"
+ FAILING_WITH_500_RESP_PATH + "," + "500 Internal Server Error",
+ FAILING_WITH_429_RESP_PATH + "," + "429 Too Many Requests"
})
void publisher_shouldHandleError(String failingPath, String failReason) {
//given
<parent>
<groupId>org.onap.dcaegen2.services.sdk</groupId>
<artifactId>dcaegen2-services-sdk-rest-services</artifactId>
- <version>1.8.5-SNAPSHOT</version>
+ <version>1.8.6-SNAPSHOT</version>
</parent>
<groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
<parent>
<groupId>org.onap.dcaegen2.services.sdk</groupId>
<artifactId>dcaegen2-services-sdk-rest-services</artifactId>
- <version>1.8.5-SNAPSHOT</version>
+ <version>1.8.6-SNAPSHOT</version>
</parent>
<groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
<parent>
<groupId>org.onap.dcaegen2.services</groupId>
<artifactId>sdk</artifactId>
- <version>1.8.5-SNAPSHOT</version>
+ <version>1.8.6-SNAPSHOT</version>
</parent>
<groupId>org.onap.dcaegen2.services.sdk</groupId>
<parent>
<groupId>org.onap.dcaegen2.services.sdk.security</groupId>
<artifactId>dcaegen2-services-sdk-security</artifactId>
- <version>1.8.5-SNAPSHOT</version>
+ <version>1.8.6-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.onap.dcaegen2.services</groupId>
<artifactId>sdk</artifactId>
- <version>1.8.5-SNAPSHOT</version>
+ <version>1.8.6-SNAPSHOT</version>
</parent>
<groupId>org.onap.dcaegen2.services.sdk.security</groupId>
<parent>
<groupId>org.onap.dcaegen2.services.sdk.security</groupId>
<artifactId>dcaegen2-services-sdk-security</artifactId>
- <version>1.8.5-SNAPSHOT</version>
+ <version>1.8.6-SNAPSHOT</version>
</parent>
<artifactId>ssl</artifactId>
<parent>
<groupId>org.onap.dcaegen2.services.sdk</groupId>
<artifactId>dcaegen2-services-sdk-services</artifactId>
- <version>1.8.5-SNAPSHOT</version>
+ <version>1.8.6-SNAPSHOT</version>
</parent>
<artifactId>dcaegen2-services-sdk-services-common</artifactId>
<parent>
<groupId>org.onap.dcaegen2.services.sdk</groupId>
<artifactId>dcaegen2-services-sdk-services</artifactId>
- <version>1.8.5-SNAPSHOT</version>
+ <version>1.8.6-SNAPSHOT</version>
</parent>
<artifactId>dcaegen2-services-sdk-services-external-schema-manager</artifactId>
<parent>
<groupId>org.onap.dcaegen2.services.sdk</groupId>
<artifactId>dcaegen2-services-sdk-services</artifactId>
- <version>1.8.5-SNAPSHOT</version>
+ <version>1.8.6-SNAPSHOT</version>
</parent>
<artifactId>dcaegen2-services-sdk-services-hvvesclient</artifactId>
<parent>
<groupId>org.onap.dcaegen2.services.sdk</groupId>
<artifactId>hvvesclient-producer</artifactId>
- <version>1.8.5-SNAPSHOT</version>
+ <version>1.8.6-SNAPSHOT</version>
</parent>
<artifactId>hvvesclient-producer-api</artifactId>
<parent>
<groupId>org.onap.dcaegen2.services.sdk</groupId>
<artifactId>hvvesclient-producer</artifactId>
- <version>1.8.5-SNAPSHOT</version>
+ <version>1.8.6-SNAPSHOT</version>
</parent>
<artifactId>hvvesclient-producer-ct</artifactId>
<parent>
<groupId>org.onap.dcaegen2.services.sdk</groupId>
<artifactId>hvvesclient-producer</artifactId>
- <version>1.8.5-SNAPSHOT</version>
+ <version>1.8.6-SNAPSHOT</version>
</parent>
<artifactId>hvvesclient-producer-impl</artifactId>
<parent>
<groupId>org.onap.dcaegen2.services.sdk</groupId>
<artifactId>dcaegen2-services-sdk-services-hvvesclient</artifactId>
- <version>1.8.5-SNAPSHOT</version>
+ <version>1.8.6-SNAPSHOT</version>
</parent>
<artifactId>hvvesclient-producer</artifactId>
<parent>
<artifactId>dcaegen2-services-sdk-services-hvvesclient</artifactId>
<groupId>org.onap.dcaegen2.services.sdk</groupId>
- <version>1.8.5-SNAPSHOT</version>
+ <version>1.8.6-SNAPSHOT</version>
</parent>
<name>High Volume VES Collector Client :: Protobuf</name>
<parent>
<groupId>org.onap.dcaegen2.services</groupId>
<artifactId>sdk</artifactId>
- <version>1.8.5-SNAPSHOT</version>
+ <version>1.8.6-SNAPSHOT</version>
</parent>
<groupId>org.onap.dcaegen2.services.sdk</groupId>
<parent>
<groupId>org.onap.dcaegen2.services.sdk</groupId>
<artifactId>dcaegen2-services-sdk-standardization</artifactId>
- <version>1.8.5-SNAPSHOT</version>
+ <version>1.8.6-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<parent>
<artifactId>dcaegen2-sdk-moher-api</artifactId>
<groupId>org.onap.dcaegen2.services.sdk</groupId>
- <version>1.8.5-SNAPSHOT</version>
+ <version>1.8.6-SNAPSHOT</version>
</parent>
<name>Monitoring and Healthcheck :: Health state</name>
<parent>
<artifactId>dcaegen2-sdk-moher-api</artifactId>
<groupId>org.onap.dcaegen2.services.sdk</groupId>
- <version>1.8.5-SNAPSHOT</version>
+ <version>1.8.6-SNAPSHOT</version>
</parent>
<name>Monitoring and Healthcheck :: Metrics</name>
<parent>
<artifactId>dcaegen2-services-sdk-standardization</artifactId>
<groupId>org.onap.dcaegen2.services.sdk</groupId>
- <version>1.8.5-SNAPSHOT</version>
+ <version>1.8.6-SNAPSHOT</version>
</parent>
<name>Monitoring and Healthcheck</name>
<parent>
<artifactId>dcaegen2-sdk-moher-api</artifactId>
<groupId>org.onap.dcaegen2.services.sdk</groupId>
- <version>1.8.5-SNAPSHOT</version>
+ <version>1.8.6-SNAPSHOT</version>
</parent>
<name>Monitoring and Healthcheck :: Server Adapters</name>
<parent>
<artifactId>dcaegen2-sdk-moher-server-adapters</artifactId>
<groupId>org.onap.dcaegen2.services.sdk</groupId>
- <version>1.8.5-SNAPSHOT</version>
+ <version>1.8.6-SNAPSHOT</version>
</parent>
<name>Monitoring and Healthcheck :: Server Adapters :: Reactor Netty</name>
<parent>
<artifactId>dcaegen2-sdk-moher-server-adapters</artifactId>
<groupId>org.onap.dcaegen2.services.sdk</groupId>
- <version>1.8.5-SNAPSHOT</version>
+ <version>1.8.6-SNAPSHOT</version>
</parent>
<name>Monitoring and Healthcheck :: Server Adapters :: Spring Webflux</name>
<parent>
<groupId>org.onap.dcaegen2.services</groupId>
<artifactId>sdk</artifactId>
- <version>1.8.5-SNAPSHOT</version>
+ <version>1.8.6-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
major=1
minor=8
-patch=5
+patch=6
base_version=${major}.${minor}.${patch}
release_version=${base_version}
snapshot_version=${base_version}-SNAPSHOT