basepython = python3.8
deps =
-r{toxinidir}/requirements-docs.txt
- -chttps://releases.openstack.org/constraints/upper/yoga
-chttps://git.onap.org/doc/plain/etc/upper-constraints.onap.txt?h=master
commands =
sphinx-build -W -q -b html -n -d {envtmpdir}/doctrees {toxinidir} {toxinidir}/_build/html
basepython = python3.8
deps =
-r{toxinidir}/requirements-docs.txt
- -chttps://releases.openstack.org/constraints/upper/yoga
-chttps://git.onap.org/doc/plain/etc/upper-constraints.onap.txt?h=master
commands =
sphinx-build -W -q -b linkcheck -d {envtmpdir}/doctrees {toxinidir} {toxinidir}/_build/linkcheck
basepython = python3.8
deps =
-r{toxinidir}/requirements-docs.txt
- -chttps://releases.openstack.org/constraints/upper/yoga
-chttps://git.onap.org/doc/plain/etc/upper-constraints.onap.txt?h=master
commands =
sphinx-build -W -q -b spelling -d {envtmpdir}/doctrees {toxinidir} {toxinidir}/_build/spellcheck
<groupId>org.onap.sdc.sdc-distribution-client</groupId>
<artifactId>sdc-main-distribution-client</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.2.1-SNAPSHOT</version>
<packaging>pom</packaging>
<name>sdc-sdc-distribution-client</name>
<parent>
<groupId>org.onap.sdc.sdc-distribution-client</groupId>
<artifactId>sdc-main-distribution-client</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.2.1-SNAPSHOT</version>
</parent>
<artifactId>sdc-distribution-ci</artifactId>
<parent>\r
<groupId>org.onap.sdc.sdc-distribution-client</groupId>\r
<artifactId>sdc-main-distribution-client</artifactId>\r
- <version>2.2.0-SNAPSHOT</version>\r
+ <version>2.2.1-SNAPSHOT</version>\r
</parent>\r
\r
<name>sdc-distribution-client-api</name>\r
<parent>
<groupId>org.onap.sdc.sdc-distribution-client</groupId>
<artifactId>sdc-main-distribution-client</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.2.1-SNAPSHOT</version>
</parent>
<artifactId>sdc-distribution-client</artifactId>
<dependency>
<groupId>org.onap.sdc.sdc-distribution-client</groupId>
<artifactId>sdc-distribution-client-api</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.2.1-SNAPSHOT</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
package org.onap.sdc.utils;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.onap.sdc.api.results.DistributionActionResultEnum;
import org.onap.sdc.api.results.IDistributionClientResult;
public class NotificationSender {
private static final Logger log = LoggerFactory.getLogger(NotificationSender.class);
- private static final long SLEEP_TIME = 1;
private final SdcKafkaProducer producer;
public NotificationSender(SdcKafkaProducer producer) {
public IDistributionClientResult send(String topic, String status) {
log.info("DistributionClient - sendStatus");
DistributionClientResultImpl distributionResult;
+ boolean sendFailed = false;
try {
log.debug("Publisher server list: {}", producer.getMsgBusAddresses());
- log.info("Trying to send status: {} \n to topic {}", status, producer.getTopicName());
- producer.send(topic, "MyPartitionKey", status);
- TimeUnit.SECONDS.sleep(SLEEP_TIME);
- } catch (KafkaException | InterruptedException e) {
- log.error("DistributionClient - sendStatus. Failed to send status", e);
- } finally {
- distributionResult = closeProducer();
- }
- return distributionResult;
- }
+ log.info("Trying to send status: {} \n to topic {}", status, producer.getTopicName());
+ Future<RecordMetadata> future = producer.send(topic, "MyPartitionKey", status);
+ RecordMetadata md = future.get(10, TimeUnit.SECONDS);
+ log.debug("Kafka ack received. topic={}, partition={}, offset={}, ts={}",
+ md.topic(), md.partition(), md.offset(), md.timestamp());
+
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ sendFailed = true;
+ log.error("DistributionClient - sendStatus interrupted while waiting for Kafka ack", ie);
+ } catch (TimeoutException | ExecutionException | KafkaException e) {
+ sendFailed = true;
+ log.error("DistributionClient - sendStatus failed to send to Kafka", e);
+ } finally {
+
+ try {
+ producer.flush();
+ } catch (KafkaException e) {
+ log.error("DistributionClient - flush encountered an error", e);
+ }
+
+ if (sendFailed) {
+ distributionResult = new DistributionClientResultImpl(
+ DistributionActionResultEnum.FAIL, "Failed to send status");
+ } else {
+ distributionResult = new DistributionClientResultImpl(
+ DistributionActionResultEnum.SUCCESS, "Messages successfully sent");
+ }
- private DistributionClientResultImpl closeProducer() {
- DistributionClientResultImpl distributionResult = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "Failed to send status");
- try {
- producer.flush();
- distributionResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "Messages successfully sent");
- } catch (KafkaException | IllegalArgumentException e) {
- log.error("DistributionClient - sendDownloadStatus. Failed to send messages and close publisher.", e);
}
return distributionResult;
}
+
}
package org.onap.sdc.utils;
-import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.Matchers.anyString;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import nl.altindag.log.LogCaptor;
+
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.junit.jupiter.api.Test;
import org.onap.sdc.api.results.DistributionActionResultEnum;
private final DistributionClientResultImpl successResponse = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS,
"Messages successfully sent");
- private final DistributionClientResultImpl generalErrorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "Failed to send status");
private final SdcKafkaProducer producer = mock(SdcKafkaProducer.class);
private final NotificationSender validNotificationSender = new NotificationSender(producer);
@Test
void whenPublisherIsValidAndNoExceptionsAreThrownShouldReturnSuccessStatus() {
- //given
- when(producer.send(anyString(), anyString(), anyString())).thenReturn(mock(Future.class));
+ //given
+ RecordMetadata md = mock(RecordMetadata.class);
+ Future<RecordMetadata> future = CompletableFuture.completedFuture(md);
+ when(producer.send(anyString(), anyString(), anyString())).thenReturn(future);
//when
IDistributionClientResult result = validNotificationSender.send("mytopic", status);
}
@Test
- void whenPublisherCouldNotSendShouldReturnGeneralErrorStatus() {
- //given
- when(producer.send(anyString(), anyString(), anyString())).thenReturn(mock(Future.class));
- doThrow(KafkaException.class)
+ void shouldReturnSuccessWhenSendSucceedsEvenIfFlushFails() throws Exception {
+ //given
+ RecordMetadata md = mock(RecordMetadata.class);
+ Future<RecordMetadata> future = CompletableFuture.completedFuture(md);
+ when(producer.send(anyString(), anyString(), anyString())).thenReturn(future);
+ doThrow(new KafkaException("flush failed"))
.when(producer)
.flush();
- //when
+
+ //when
IDistributionClientResult result = validNotificationSender.send("mytopic", status);
- //then
- assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult());
+ // then
+ // As send() already succeeded and Kafka acknowledged the message, a failed flush should NOT cause a failure.
+ // flush() is best-effort; failing it does not mean the notification failed.
+
+ assertEquals(
+ DistributionActionResultEnum.SUCCESS, result.getDistributionActionResult()
+ );
}
@Test
LogCaptor logCaptor = LogCaptor.forClass(NotificationSender.class);
//given
- when(producer.send(anyString(), anyString(), anyString())).thenThrow(new KafkaException());
+ when(producer.send(anyString(), anyString(), anyString())).thenThrow(new KafkaException("send failed"));
//when
validNotificationSender.send("mytopic", status);
//then
- assertThat(logCaptor.getLogs()).contains("DistributionClient - sendStatus. Failed to send status");
+
+ assertTrue(
+ logCaptor.getLogs().stream().anyMatch(
+ msg -> msg.contains("DistributionClient - sendStatus failed to send to Kafka")
+ ),
+ "Expected log message not found"
+ );
}
}
major=2
minor=2
-patch=0
+patch=1
base_version=${major}.${minor}.${patch}