From: Ulka Ingale Date: Mon, 2 Mar 2026 13:29:47 +0000 (+0100) Subject: Flushing should not fail distribution X-Git-Url: https://gerrit.onap.org/r/gitweb?a=commitdiff_plain;h=7fdd8c6640d62a0d1a3c7e0945337438edb2182c;p=sdc%2Fsdc-distribution-client.git Flushing should not fail distribution - Fix NotificationSender logic - Ensure correct handling of Kafka notification - Improve stability and error handling - Code refactored and handled test cases - Bump version to 2.2.1 Issue-ID: SDC-4795 Change-Id: I486aae3fe796d9706102c66b85e894aee5a6c5ae Signed-off-by: Ulka Ingale --- diff --git a/docs/tox.ini b/docs/tox.ini index 46075fa..096294e 100644 --- a/docs/tox.ini +++ b/docs/tox.ini @@ -7,7 +7,6 @@ skipsdist = true basepython = python3.8 deps = -r{toxinidir}/requirements-docs.txt - -chttps://releases.openstack.org/constraints/upper/yoga -chttps://git.onap.org/doc/plain/etc/upper-constraints.onap.txt?h=master commands = sphinx-build -W -q -b html -n -d {envtmpdir}/doctrees {toxinidir} {toxinidir}/_build/html @@ -16,7 +15,6 @@ commands = basepython = python3.8 deps = -r{toxinidir}/requirements-docs.txt - -chttps://releases.openstack.org/constraints/upper/yoga -chttps://git.onap.org/doc/plain/etc/upper-constraints.onap.txt?h=master commands = sphinx-build -W -q -b linkcheck -d {envtmpdir}/doctrees {toxinidir} {toxinidir}/_build/linkcheck @@ -25,7 +23,6 @@ commands = basepython = python3.8 deps = -r{toxinidir}/requirements-docs.txt - -chttps://releases.openstack.org/constraints/upper/yoga -chttps://git.onap.org/doc/plain/etc/upper-constraints.onap.txt?h=master commands = sphinx-build -W -q -b spelling -d {envtmpdir}/doctrees {toxinidir} {toxinidir}/_build/spellcheck diff --git a/pom.xml b/pom.xml index 09a03b6..b6f69be 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ org.onap.sdc.sdc-distribution-client sdc-main-distribution-client - 2.2.0-SNAPSHOT + 2.2.1-SNAPSHOT pom sdc-sdc-distribution-client diff --git a/sdc-distribution-ci/pom.xml b/sdc-distribution-ci/pom.xml index 2144063..59af6d0 100644 --- a/sdc-distribution-ci/pom.xml +++ b/sdc-distribution-ci/pom.xml @@ -7,7 +7,7 @@ org.onap.sdc.sdc-distribution-client sdc-main-distribution-client - 2.2.0-SNAPSHOT + 2.2.1-SNAPSHOT sdc-distribution-ci diff --git a/sdc-distribution-client-api/pom.xml b/sdc-distribution-client-api/pom.xml index 724abcb..89c83ed 100644 --- a/sdc-distribution-client-api/pom.xml +++ b/sdc-distribution-client-api/pom.xml @@ -6,7 +6,7 @@ org.onap.sdc.sdc-distribution-client sdc-main-distribution-client - 2.2.0-SNAPSHOT + 2.2.1-SNAPSHOT sdc-distribution-client-api diff --git a/sdc-distribution-client/pom.xml b/sdc-distribution-client/pom.xml index 732d659..1447902 100644 --- a/sdc-distribution-client/pom.xml +++ b/sdc-distribution-client/pom.xml @@ -6,7 +6,7 @@ org.onap.sdc.sdc-distribution-client sdc-main-distribution-client - 2.2.0-SNAPSHOT + 2.2.1-SNAPSHOT sdc-distribution-client @@ -26,7 +26,7 @@ org.onap.sdc.sdc-distribution-client sdc-distribution-client-api - 2.2.0-SNAPSHOT + 2.2.1-SNAPSHOT diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java index 6638cea..fe7e9ba 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java @@ -20,7 +20,12 @@ package org.onap.sdc.utils; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; import org.onap.sdc.api.results.DistributionActionResultEnum; import org.onap.sdc.api.results.IDistributionClientResult; @@ -32,7 +37,6 @@ import org.slf4j.LoggerFactory; public class NotificationSender { private static final Logger log = LoggerFactory.getLogger(NotificationSender.class); - private static final long SLEEP_TIME = 1; private final SdcKafkaProducer producer; public NotificationSender(SdcKafkaProducer producer) { @@ -42,27 +46,40 @@ public class NotificationSender { public IDistributionClientResult send(String topic, String status) { log.info("DistributionClient - sendStatus"); DistributionClientResultImpl distributionResult; + boolean sendFailed = false; try { log.debug("Publisher server list: {}", producer.getMsgBusAddresses()); - log.info("Trying to send status: {} \n to topic {}", status, producer.getTopicName()); - producer.send(topic, "MyPartitionKey", status); - TimeUnit.SECONDS.sleep(SLEEP_TIME); - } catch (KafkaException | InterruptedException e) { - log.error("DistributionClient - sendStatus. Failed to send status", e); - } finally { - distributionResult = closeProducer(); - } - return distributionResult; - } + log.info("Trying to send status: {} \n to topic {}", status, producer.getTopicName()); + Future future = producer.send(topic, "MyPartitionKey", status); + RecordMetadata md = future.get(10, TimeUnit.SECONDS); + log.debug("Kafka ack received. topic={}, partition={}, offset={}, ts={}", + md.topic(), md.partition(), md.offset(), md.timestamp()); + + } catch ( InterruptedException ie) { + Thread.currentThread().interrupt(); + sendFailed = true; + log.error("DistributionClient - sendStatus interrupted while waiting for Kafka ack", ie); + } catch (TimeoutException | ExecutionException | KafkaException e) { + sendFailed = true; + log.error("DistributionClient - sendStatus failed to send to Kafka", e); + } finally { + + try { + producer.flush(); + } catch (KafkaException e) { + log.error("DistributionClient - flush encountered an error", e); + } + + if (sendFailed) { + distributionResult = new DistributionClientResultImpl( + DistributionActionResultEnum.FAIL, "Failed to send status"); + } else { + distributionResult = new DistributionClientResultImpl( + DistributionActionResultEnum.SUCCESS, "Messages successfully sent"); + } - private DistributionClientResultImpl closeProducer() { - DistributionClientResultImpl distributionResult = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "Failed to send status"); - try { - producer.flush(); - distributionResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "Messages successfully sent"); - } catch (KafkaException | IllegalArgumentException e) { - log.error("DistributionClient - sendDownloadStatus. Failed to send messages and close publisher.", e); } return distributionResult; } + } diff --git a/sdc-distribution-client/src/test/java/org/onap/sdc/utils/NotificationSenderTest.java b/sdc-distribution-client/src/test/java/org/onap/sdc/utils/NotificationSenderTest.java index 3f4b545..5cba31b 100644 --- a/sdc-distribution-client/src/test/java/org/onap/sdc/utils/NotificationSenderTest.java +++ b/sdc-distribution-client/src/test/java/org/onap/sdc/utils/NotificationSenderTest.java @@ -20,15 +20,18 @@ package org.onap.sdc.utils; -import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Matchers.anyString; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import nl.altindag.log.LogCaptor; + +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; import org.junit.jupiter.api.Test; import org.onap.sdc.api.results.DistributionActionResultEnum; @@ -42,14 +45,15 @@ class NotificationSenderTest { private final DistributionClientResultImpl successResponse = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "Messages successfully sent"); - private final DistributionClientResultImpl generalErrorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "Failed to send status"); private final SdcKafkaProducer producer = mock(SdcKafkaProducer.class); private final NotificationSender validNotificationSender = new NotificationSender(producer); @Test void whenPublisherIsValidAndNoExceptionsAreThrownShouldReturnSuccessStatus() { - //given - when(producer.send(anyString(), anyString(), anyString())).thenReturn(mock(Future.class)); + //given + RecordMetadata md = mock(RecordMetadata.class); + Future future = CompletableFuture.completedFuture(md); + when(producer.send(anyString(), anyString(), anyString())).thenReturn(future); //when IDistributionClientResult result = validNotificationSender.send("mytopic", status); @@ -59,18 +63,26 @@ class NotificationSenderTest { } @Test - void whenPublisherCouldNotSendShouldReturnGeneralErrorStatus() { - //given - when(producer.send(anyString(), anyString(), anyString())).thenReturn(mock(Future.class)); - doThrow(KafkaException.class) + void shouldReturnSuccessWhenSendSucceedsEvenIfFlushFails() throws Exception { + //given + RecordMetadata md = mock(RecordMetadata.class); + Future future = CompletableFuture.completedFuture(md); + when(producer.send(anyString(), anyString(), anyString())).thenReturn(future); + doThrow(new KafkaException("flush failed")) .when(producer) .flush(); - //when + + //when IDistributionClientResult result = validNotificationSender.send("mytopic", status); - //then - assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult()); + // then + // As send() already succeeded and Kafka acknowledged the message, a failed flush should NOT cause a failure. + // flush() is best-effort; failing it does not mean the notification failed. + + assertEquals( + DistributionActionResultEnum.SUCCESS,result.getDistributionActionResult() + ); } @Test @@ -78,13 +90,19 @@ class NotificationSenderTest { LogCaptor logCaptor = LogCaptor.forClass(NotificationSender.class); //given - when(producer.send(anyString(), anyString(), anyString())).thenThrow(new KafkaException()); + when(producer.send(anyString(), anyString(), anyString())).thenThrow(new KafkaException("send failed")); //when validNotificationSender.send("mytopic", status); //then - assertThat(logCaptor.getLogs()).contains("DistributionClient - sendStatus. Failed to send status"); + + assertTrue( + logCaptor.getLogs().stream().anyMatch( + msg -> msg.contains("DistributionClient - sendStatus failed to send to Kafka") + ), + "Expected log message not found" + ); } } diff --git a/version.properties b/version.properties index d9cd58b..d17bbdf 100644 --- a/version.properties +++ b/version.properties @@ -5,7 +5,7 @@ major=2 minor=2 -patch=0 +patch=1 base_version=${major}.${minor}.${patch}