[SDC-DISTRO-CLIENT] Add additional kafka consumer config 01/134001/4
authorefiacor <fiachra.corcoran@est.tech>
Tue, 4 Apr 2023 10:32:00 +0000 (11:32 +0100)
committerefiacor <fiachra.corcoran@est.tech>
Tue, 4 Apr 2023 12:08:10 +0000 (13:08 +0100)
Signed-off-by: efiacor <fiachra.corcoran@est.tech>
Change-Id: Iecd443479258b31d3a4e38699eea0a6e0f423f05
Issue-ID: SDC-4465

docs/conf.yaml [deleted file]
pom.xml
sdc-distribution-ci/pom.xml
sdc-distribution-client/pom.xml
sdc-distribution-client/src/main/java/org/onap/sdc/api/consumer/IConfiguration.java
sdc-distribution-client/src/main/java/org/onap/sdc/impl/Configuration.java
sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java
sdc-distribution-client/src/test/java/org/onap/sdc/utils/SdcKafkaTest.java
sdc-distribution-client/src/test/java/org/onap/sdc/utils/TestConfiguration.java
version.properties

diff --git a/docs/conf.yaml b/docs/conf.yaml
deleted file mode 100644 (file)
index ab59281..0000000
+++ /dev/null
@@ -1,7 +0,0 @@
----
-project_cfg: onap
-project: onap
-
-# Change this to ReleaseBranchName to modify the header
-default-version: latest
-#
diff --git a/pom.xml b/pom.xml
index 5bf46b2..069999c 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
 
        <groupId>org.onap.sdc.sdc-distribution-client</groupId>
        <artifactId>sdc-main-distribution-client</artifactId>
-       <version>2.0.1-SNAPSHOT</version>
+       <version>2.0.2-SNAPSHOT</version>
        <packaging>pom</packaging>
        <name>sdc-sdc-distribution-client</name>
 
index 48e0c9c..30bfa67 100644 (file)
@@ -7,7 +7,7 @@
   <parent>
     <groupId>org.onap.sdc.sdc-distribution-client</groupId>
     <artifactId>sdc-main-distribution-client</artifactId>
-        <version>2.0.1-SNAPSHOT</version>
+        <version>2.0.2-SNAPSHOT</version>
   </parent>
 
   <artifactId>sdc-distribution-ci</artifactId>
index e46c61b..eb72205 100644 (file)
@@ -6,7 +6,7 @@
     <parent>
         <groupId>org.onap.sdc.sdc-distribution-client</groupId>
         <artifactId>sdc-main-distribution-client</artifactId>
-        <version>2.0.1-SNAPSHOT</version>
+        <version>2.0.2-SNAPSHOT</version>
     </parent>
 
     <artifactId>sdc-distribution-client</artifactId>
index 184dca4..a8ce1c7 100644 (file)
@@ -66,6 +66,24 @@ public interface IConfiguration {
         }
     }
 
+    /**
+     * Kafka consumer max.poll.interval.ms
+     *
+     * @return Kafka max.poll.interval.ms. Default is 300 seconds
+     */
+    default int getKafkaConsumerMaxPollInterval() {
+        return 300;
+    }
+
+    /**
+     * Kafka consumer session.timeout.ms
+     *
+     * @return Kafka session.timeout.ms. Default is 45 seconds
+     */
+    default int getKafkaConsumerSessionTimeout() {
+        return 45;
+    }
+
     /**
      * User Name for SDC distribution consumer authentication.
      *
index 24f3225..dd67656 100644 (file)
@@ -30,6 +30,8 @@ public class Configuration implements IConfiguration {
     private final String kafkaSecurityProtocolConfig;
     private final String kafkaSaslMechanism;
     private final String kafkaSaslJaasConfig;
+    private final int kafkaConsumerMaxPollInterval;
+    private final int kafkaConsumerSessionTimeout;
     private String sdcStatusTopicName;
     private String sdcNotificationTopicName;
     private String sdcAddress;
@@ -77,6 +79,8 @@ public class Configuration implements IConfiguration {
         this.httpsProxyHost = other.getHttpsProxyHost();
         this.httpsProxyPort = other.getHttpsProxyPort();
         this.useSystemProxy = other.isUseSystemProxy();
+        this.kafkaConsumerMaxPollInterval = other.getKafkaConsumerMaxPollInterval();
+        this.kafkaConsumerSessionTimeout = other.getKafkaConsumerSessionTimeout();
     }
 
     @Override
@@ -99,6 +103,16 @@ public class Configuration implements IConfiguration {
         return kafkaSaslJaasConfig;
     }
 
+    @Override
+    public int getKafkaConsumerMaxPollInterval() {
+        return kafkaConsumerMaxPollInterval;
+    }
+
+    @Override
+    public int getKafkaConsumerSessionTimeout() {
+        return kafkaConsumerSessionTimeout;
+    }
+
     @Override
     public Boolean isUseHttpsWithSDC() {
         return useHttpsWithSDC;
index 91b41a9..f87b7aa 100644 (file)
@@ -61,6 +61,8 @@ public class SdcKafkaConsumer {
         props.put(ConsumerConfig.CLIENT_ID_CONFIG, configuration.getConsumerID() + "-consumer-" + UUID.randomUUID());
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
         props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
+        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, configuration.getKafkaConsumerMaxPollInterval() * 1000);
+        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, configuration.getKafkaConsumerSessionTimeout() * 1000);
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,  "org.apache.kafka.common.serialization.StringDeserializer");
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
         consumer = new KafkaConsumer<>(props);
index 744e9cc..2037be6 100644 (file)
@@ -31,11 +31,10 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junitpioneer.jupiter.SetEnvironmentVariable;
-import org.onap.sdc.api.consumer.IConfiguration;
 import org.onap.sdc.impl.Configuration;
 import org.onap.sdc.utils.kafka.SdcKafkaConsumer;
 import org.onap.sdc.utils.kafka.SdcKafkaProducer;
@@ -75,6 +74,9 @@ class SdcKafkaTest {
         consumer.subscribe(topicName);
         consumer.poll();
 
+        Assertions.assertEquals(configuration.getKafkaConsumerMaxPollInterval(), 600);
+        Assertions.assertEquals(configuration.getKafkaConsumerSessionTimeout(), 50);
+
         SdcKafkaProducer producer = new SdcKafkaProducer(configuration);
         producer.send(topicName, "blah", "blah");
         producer.send(topicName, "blah", "blah");
index 86f29be..529124e 100644 (file)
@@ -38,6 +38,8 @@ public class TestConfiguration implements IConfiguration {
        private final String kafkaSecurityProtocolConfig;
        private final String kafkaSaslMechanism;
        private final String kafkaSaslJaasConfig;
+       private final int kafkaConsumerMaxPollInterval;
+       private final int kafkaConsumerSessionTimeout;
        private String keyStorePath;
        private String keyStorePassword;
        private boolean activateServerTLSAuth;
@@ -52,30 +54,6 @@ public class TestConfiguration implements IConfiguration {
        private String sdcStatusTopicName;
        private String sdcNotificationTopicName;
 
-       public TestConfiguration(IConfiguration other) {
-               this.sdcAddress = other.getSdcAddress();
-               this.comsumerID = other.getConsumerID();
-               this.consumerGroup = other.getConsumerGroup();
-               this.kafkaSecurityProtocolConfig = other.getKafkaSecurityProtocolConfig();
-               this.kafkaSaslMechanism = other.getKafkaSaslMechanism();
-               this.kafkaSaslJaasConfig = other.getKafkaSaslJaasConfig();
-               this.environmentName = other.getEnvironmentName();
-               this.password = other.getPassword();
-               this.pollingInterval = other.getPollingInterval();
-               this.pollingTimeout = other.getPollingTimeout();
-               this.relevantArtifactTypes = other.getRelevantArtifactTypes();
-               this.user = other.getUser();
-               this.keyStorePath = other.getKeyStorePath();
-               this.keyStorePassword = other.getKeyStorePassword();
-               this.activateServerTLSAuth = other.activateServerTLSAuth();
-               this.isFilterInEmptyResources = other.isFilterInEmptyResources();
-               this.httpProxyHost = other.getHttpProxyHost();
-               this.httpProxyPort = other.getHttpProxyPort();
-               this.httpsProxyHost = other.getHttpsProxyHost();
-               this.httpsProxyPort = other.getHttpsProxyPort();
-               this.useSystemProxy = other.isUseSystemProxy();
-       }
-
        public TestConfiguration() {
                this.sdcAddress = "localhost:8443";
                this.comsumerID = "mso-123456";
@@ -101,6 +79,8 @@ public class TestConfiguration implements IConfiguration {
                this.kafkaSecurityProtocolConfig = "SASL_PLAINTEXT";
                this.kafkaSaslMechanism = "PLAIN";
                this.kafkaSaslJaasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required username=admin password=admin-secret;";
+               this.kafkaConsumerMaxPollInterval = 600;
+               this.kafkaConsumerSessionTimeout = 50;
                this.httpProxyHost = "proxy";
                this.httpProxyPort = 8080;
        }
@@ -125,6 +105,16 @@ public class TestConfiguration implements IConfiguration {
                return kafkaSaslJaasConfig;
        }
 
+       @Override
+       public int getKafkaConsumerMaxPollInterval() {
+               return kafkaConsumerMaxPollInterval;
+       }
+
+       @Override
+       public int getKafkaConsumerSessionTimeout() {
+               return kafkaConsumerSessionTimeout;
+       }
+
        @Override
        public String getUser() {
                return user;
index b81f487..4354315 100644 (file)
@@ -5,7 +5,7 @@
 
 major=2
 minor=0
-patch=1
+patch=2
 
 base_version=${major}.${minor}.${patch}