Flushing should not fail distribution 22/143422/4
authorUlka Ingale <ulka.mahadeo-ingale@capgemini.com>
Mon, 2 Mar 2026 13:29:47 +0000 (14:29 +0100)
committerUlka Ingale <ulka.mahadeo-ingale@capgemini.com>
Tue, 3 Mar 2026 06:33:52 +0000 (07:33 +0100)
- Fix NotificationSender logic
- Ensure correct handling of Kafka notifications
- Improve stability and error handling
- Refactored code and updated test cases
- Bump version to 2.2.1

Issue-ID: SDC-4795
Change-Id: I486aae3fe796d9706102c66b85e894aee5a6c5ae
Signed-off-by: Ulka Ingale <ulka.mahadeo-ingale@capgemini.com>
docs/tox.ini
pom.xml
sdc-distribution-ci/pom.xml
sdc-distribution-client-api/pom.xml
sdc-distribution-client/pom.xml
sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java
sdc-distribution-client/src/test/java/org/onap/sdc/utils/NotificationSenderTest.java
version.properties

index 46075fa..096294e 100644 (file)
@@ -7,7 +7,6 @@ skipsdist = true
 basepython = python3.8
 deps =
     -r{toxinidir}/requirements-docs.txt
-    -chttps://releases.openstack.org/constraints/upper/yoga
     -chttps://git.onap.org/doc/plain/etc/upper-constraints.onap.txt?h=master
 commands =
     sphinx-build -W -q -b html -n -d {envtmpdir}/doctrees {toxinidir} {toxinidir}/_build/html
@@ -16,7 +15,6 @@ commands =
 basepython = python3.8
 deps =
     -r{toxinidir}/requirements-docs.txt
-    -chttps://releases.openstack.org/constraints/upper/yoga
     -chttps://git.onap.org/doc/plain/etc/upper-constraints.onap.txt?h=master
 commands =
     sphinx-build -W -q -b linkcheck -d {envtmpdir}/doctrees {toxinidir} {toxinidir}/_build/linkcheck
@@ -25,7 +23,6 @@ commands =
 basepython = python3.8
 deps =
     -r{toxinidir}/requirements-docs.txt
-    -chttps://releases.openstack.org/constraints/upper/yoga
     -chttps://git.onap.org/doc/plain/etc/upper-constraints.onap.txt?h=master
 commands =
     sphinx-build -W -q -b spelling -d {envtmpdir}/doctrees {toxinidir} {toxinidir}/_build/spellcheck
diff --git a/pom.xml b/pom.xml
index 09a03b6..b6f69be 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
 
        <groupId>org.onap.sdc.sdc-distribution-client</groupId>
        <artifactId>sdc-main-distribution-client</artifactId>
-       <version>2.2.0-SNAPSHOT</version>
+       <version>2.2.1-SNAPSHOT</version>
        <packaging>pom</packaging>
        <name>sdc-sdc-distribution-client</name>
 
index 2144063..59af6d0 100644 (file)
@@ -7,7 +7,7 @@
   <parent>
     <groupId>org.onap.sdc.sdc-distribution-client</groupId>
     <artifactId>sdc-main-distribution-client</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>2.2.1-SNAPSHOT</version>
   </parent>
 
   <artifactId>sdc-distribution-ci</artifactId>
index 724abcb..89c83ed 100644 (file)
@@ -6,7 +6,7 @@
     <parent>\r
         <groupId>org.onap.sdc.sdc-distribution-client</groupId>\r
         <artifactId>sdc-main-distribution-client</artifactId>\r
-        <version>2.2.0-SNAPSHOT</version>\r
+        <version>2.2.1-SNAPSHOT</version>\r
     </parent>\r
 \r
     <name>sdc-distribution-client-api</name>\r
index 732d659..1447902 100644 (file)
@@ -6,7 +6,7 @@
     <parent>
         <groupId>org.onap.sdc.sdc-distribution-client</groupId>
         <artifactId>sdc-main-distribution-client</artifactId>
-        <version>2.2.0-SNAPSHOT</version>
+        <version>2.2.1-SNAPSHOT</version>
     </parent>
 
     <artifactId>sdc-distribution-client</artifactId>
@@ -26,7 +26,7 @@
         <dependency>
             <groupId>org.onap.sdc.sdc-distribution-client</groupId>
             <artifactId>sdc-distribution-client-api</artifactId>
-            <version>2.2.0-SNAPSHOT</version>
+            <version>2.2.1-SNAPSHOT</version>
         </dependency>
         <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
         <dependency>
index 6638cea..fe7e9ba 100644 (file)
 
 package org.onap.sdc.utils;
 
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.onap.sdc.api.results.DistributionActionResultEnum;
 import org.onap.sdc.api.results.IDistributionClientResult;
@@ -32,7 +37,6 @@ import org.slf4j.LoggerFactory;
 public class NotificationSender {
 
     private static final Logger log = LoggerFactory.getLogger(NotificationSender.class);
-    private static final long SLEEP_TIME = 1;
     private final SdcKafkaProducer producer;
 
     public NotificationSender(SdcKafkaProducer producer) {
@@ -42,27 +46,40 @@ public class NotificationSender {
     public IDistributionClientResult send(String topic, String status) {
         log.info("DistributionClient - sendStatus");
         DistributionClientResultImpl distributionResult;
+        boolean sendFailed = false;
         try {
             log.debug("Publisher server list: {}", producer.getMsgBusAddresses());
-            log.info("Trying to send status: {} \n to topic {}", status, producer.getTopicName());
-            producer.send(topic, "MyPartitionKey", status);
-            TimeUnit.SECONDS.sleep(SLEEP_TIME);
-        } catch (KafkaException | InterruptedException e) {
-            log.error("DistributionClient - sendStatus. Failed to send status", e);
-        } finally {
-            distributionResult = closeProducer();
-        }
-        return distributionResult;
-    }
+            log.info("Trying to send status: {} \n to topic {}", status, producer.getTopicName());    
+            Future<RecordMetadata> future = producer.send(topic, "MyPartitionKey", status);
+            RecordMetadata md = future.get(10, TimeUnit.SECONDS);
+            log.debug("Kafka ack received. topic={}, partition={}, offset={}, ts={}",
+                  md.topic(), md.partition(), md.offset(), md.timestamp());
+
+        } catch ( InterruptedException ie) {         
+            Thread.currentThread().interrupt();
+            sendFailed = true;
+            log.error("DistributionClient - sendStatus interrupted while waiting for Kafka ack", ie);
+        } catch (TimeoutException | ExecutionException | KafkaException e) {
+            sendFailed = true;
+            log.error("DistributionClient - sendStatus failed to send to Kafka", e);
+       }  finally {
+            
+                try {
+                        producer.flush();
+                    } catch (KafkaException e) {
+                        log.error("DistributionClient - flush encountered an error", e);
+                    }
+                
+                if (sendFailed) {
+                            distributionResult = new DistributionClientResultImpl(
+                                        DistributionActionResultEnum.FAIL, "Failed to send status");
+                        } else {
+                            distributionResult = new DistributionClientResultImpl(
+                                    DistributionActionResultEnum.SUCCESS, "Messages successfully sent");
+                        }
 
-    private DistributionClientResultImpl closeProducer() {
-        DistributionClientResultImpl distributionResult = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "Failed to send status");
-        try {
-            producer.flush();
-            distributionResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "Messages successfully sent");
-        } catch (KafkaException | IllegalArgumentException e) {
-            log.error("DistributionClient - sendDownloadStatus. Failed to send messages and close publisher.", e);
         }
         return distributionResult;
     }
+
 }
index 3f4b545..5cba31b 100644 (file)
 
 package org.onap.sdc.utils;
 
-import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.Matchers.anyString;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import nl.altindag.log.LogCaptor;
+
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.junit.jupiter.api.Test;
 import org.onap.sdc.api.results.DistributionActionResultEnum;
@@ -42,14 +45,15 @@ class NotificationSenderTest {
 
     private final DistributionClientResultImpl successResponse = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS,
         "Messages successfully sent");
-    private final DistributionClientResultImpl generalErrorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "Failed to send status");
     private final SdcKafkaProducer producer = mock(SdcKafkaProducer.class);
     private final NotificationSender validNotificationSender = new NotificationSender(producer);
 
     @Test
     void whenPublisherIsValidAndNoExceptionsAreThrownShouldReturnSuccessStatus() {
-        //given
-        when(producer.send(anyString(), anyString(), anyString())).thenReturn(mock(Future.class));
+        //given       
+        RecordMetadata md = mock(RecordMetadata.class);
+        Future<RecordMetadata> future = CompletableFuture.completedFuture(md);
+        when(producer.send(anyString(), anyString(), anyString())).thenReturn(future);
 
         //when
         IDistributionClientResult result = validNotificationSender.send("mytopic", status);
@@ -59,18 +63,26 @@ class NotificationSenderTest {
     }
 
     @Test
-    void whenPublisherCouldNotSendShouldReturnGeneralErrorStatus() {
-        //given
-        when(producer.send(anyString(), anyString(), anyString())).thenReturn(mock(Future.class));
-        doThrow(KafkaException.class)
+    void shouldReturnSuccessWhenSendSucceedsEvenIfFlushFails() throws Exception {
+        //given 
+        RecordMetadata md = mock(RecordMetadata.class);
+        Future<RecordMetadata> future = CompletableFuture.completedFuture(md);
+        when(producer.send(anyString(), anyString(), anyString())).thenReturn(future);      
+        doThrow(new KafkaException("flush failed"))
             .when(producer)
             .flush();
 
-        //when
+        
+       //when
         IDistributionClientResult result = validNotificationSender.send("mytopic", status);
 
-        //then
-        assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult());
+        // then
+        // As send() already succeeded and Kafka acknowledged the message, a failed flush should NOT cause a failure.
+        // flush() is best-effort; failing it does not mean the notification failed.
+
+        assertEquals(
+            DistributionActionResultEnum.SUCCESS,result.getDistributionActionResult()
+        );
     }
 
     @Test
@@ -78,13 +90,19 @@ class NotificationSenderTest {
         LogCaptor logCaptor = LogCaptor.forClass(NotificationSender.class);
 
         //given
-        when(producer.send(anyString(), anyString(), anyString())).thenThrow(new KafkaException());
+        when(producer.send(anyString(), anyString(), anyString())).thenThrow(new KafkaException("send failed"));
 
         //when
         validNotificationSender.send("mytopic", status);
 
         //then
-        assertThat(logCaptor.getLogs()).contains("DistributionClient - sendStatus. Failed to send status");
+
+        assertTrue(
+                logCaptor.getLogs().stream().anyMatch(
+                        msg -> msg.contains("DistributionClient - sendStatus failed to send to Kafka")
+                ),
+                "Expected log message not found"
+        );
     }
 
 }
index d9cd58b..d17bbdf 100644 (file)
@@ -5,7 +5,7 @@
 
 major=2
 minor=2
-patch=0
+patch=1
 
 base_version=${major}.${minor}.${patch}