+++ /dev/null
----
-project_cfg: onap
-project: onap
-
-# Change this to ReleaseBranchName to modify the header
-default-version: latest
-#
<groupId>org.onap.sdc.sdc-distribution-client</groupId>
<artifactId>sdc-main-distribution-client</artifactId>
- <version>2.0.1-SNAPSHOT</version>
+ <version>2.0.2-SNAPSHOT</version>
<packaging>pom</packaging>
<name>sdc-sdc-distribution-client</name>
<parent>
<groupId>org.onap.sdc.sdc-distribution-client</groupId>
<artifactId>sdc-main-distribution-client</artifactId>
- <version>2.0.1-SNAPSHOT</version>
+ <version>2.0.2-SNAPSHOT</version>
</parent>
<artifactId>sdc-distribution-ci</artifactId>
<parent>
<groupId>org.onap.sdc.sdc-distribution-client</groupId>
<artifactId>sdc-main-distribution-client</artifactId>
- <version>2.0.1-SNAPSHOT</version>
+ <version>2.0.2-SNAPSHOT</version>
</parent>
<artifactId>sdc-distribution-client</artifactId>
}
}
+ /**
+ * Kafka consumer max.poll.interval.ms
+ *
+ * @return Kafka max.poll.interval.ms. Default is 300 seconds
+ */
+ default int getKafkaConsumerMaxPollInterval() {
+ return 300;
+ }
+
+ /**
+ * Kafka consumer session.timeout.ms
+ *
+ * @return Kafka session.timeout.ms. Default is 45 seconds
+ */
+ default int getKafkaConsumerSessionTimeout() {
+ return 45;
+ }
+
/**
* User Name for SDC distribution consumer authentication.
*
private final String kafkaSecurityProtocolConfig;
private final String kafkaSaslMechanism;
private final String kafkaSaslJaasConfig;
+ private final int kafkaConsumerMaxPollInterval;
+ private final int kafkaConsumerSessionTimeout;
private String sdcStatusTopicName;
private String sdcNotificationTopicName;
private String sdcAddress;
this.httpsProxyHost = other.getHttpsProxyHost();
this.httpsProxyPort = other.getHttpsProxyPort();
this.useSystemProxy = other.isUseSystemProxy();
+ this.kafkaConsumerMaxPollInterval = other.getKafkaConsumerMaxPollInterval();
+ this.kafkaConsumerSessionTimeout = other.getKafkaConsumerSessionTimeout();
}
@Override
return kafkaSaslJaasConfig;
}
+ @Override
+ public int getKafkaConsumerMaxPollInterval() {
+ return kafkaConsumerMaxPollInterval;
+ }
+
+ @Override
+ public int getKafkaConsumerSessionTimeout() {
+ return kafkaConsumerSessionTimeout;
+ }
+
@Override
public Boolean isUseHttpsWithSDC() {
return useHttpsWithSDC;
props.put(ConsumerConfig.CLIENT_ID_CONFIG, configuration.getConsumerID() + "-consumer-" + UUID.randomUUID());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
+ props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, configuration.getKafkaConsumerMaxPollInterval() * 1000);
+ props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, configuration.getKafkaConsumerSessionTimeout() * 1000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);
import java.util.List;
import java.util.Properties;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junitpioneer.jupiter.SetEnvironmentVariable;
-import org.onap.sdc.api.consumer.IConfiguration;
import org.onap.sdc.impl.Configuration;
import org.onap.sdc.utils.kafka.SdcKafkaConsumer;
import org.onap.sdc.utils.kafka.SdcKafkaProducer;
consumer.subscribe(topicName);
consumer.poll();
+ Assertions.assertEquals(configuration.getKafkaConsumerMaxPollInterval(), 600);
+ Assertions.assertEquals(configuration.getKafkaConsumerSessionTimeout(), 50);
+
SdcKafkaProducer producer = new SdcKafkaProducer(configuration);
producer.send(topicName, "blah", "blah");
producer.send(topicName, "blah", "blah");
private final String kafkaSecurityProtocolConfig;
private final String kafkaSaslMechanism;
private final String kafkaSaslJaasConfig;
+ private final int kafkaConsumerMaxPollInterval;
+ private final int kafkaConsumerSessionTimeout;
private String keyStorePath;
private String keyStorePassword;
private boolean activateServerTLSAuth;
private String sdcStatusTopicName;
private String sdcNotificationTopicName;
- public TestConfiguration(IConfiguration other) {
- this.sdcAddress = other.getSdcAddress();
- this.comsumerID = other.getConsumerID();
- this.consumerGroup = other.getConsumerGroup();
- this.kafkaSecurityProtocolConfig = other.getKafkaSecurityProtocolConfig();
- this.kafkaSaslMechanism = other.getKafkaSaslMechanism();
- this.kafkaSaslJaasConfig = other.getKafkaSaslJaasConfig();
- this.environmentName = other.getEnvironmentName();
- this.password = other.getPassword();
- this.pollingInterval = other.getPollingInterval();
- this.pollingTimeout = other.getPollingTimeout();
- this.relevantArtifactTypes = other.getRelevantArtifactTypes();
- this.user = other.getUser();
- this.keyStorePath = other.getKeyStorePath();
- this.keyStorePassword = other.getKeyStorePassword();
- this.activateServerTLSAuth = other.activateServerTLSAuth();
- this.isFilterInEmptyResources = other.isFilterInEmptyResources();
- this.httpProxyHost = other.getHttpProxyHost();
- this.httpProxyPort = other.getHttpProxyPort();
- this.httpsProxyHost = other.getHttpsProxyHost();
- this.httpsProxyPort = other.getHttpsProxyPort();
- this.useSystemProxy = other.isUseSystemProxy();
- }
-
public TestConfiguration() {
this.sdcAddress = "localhost:8443";
this.comsumerID = "mso-123456";
this.kafkaSecurityProtocolConfig = "SASL_PLAINTEXT";
this.kafkaSaslMechanism = "PLAIN";
this.kafkaSaslJaasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required username=admin password=admin-secret;";
+ this.kafkaConsumerMaxPollInterval = 600;
+ this.kafkaConsumerSessionTimeout = 50;
this.httpProxyHost = "proxy";
this.httpProxyPort = 8080;
}
return kafkaSaslJaasConfig;
}
+ @Override
+ public int getKafkaConsumerMaxPollInterval() {
+ return kafkaConsumerMaxPollInterval;
+ }
+
+ @Override
+ public int getKafkaConsumerSessionTimeout() {
+ return kafkaConsumerSessionTimeout;
+ }
+
@Override
public String getUser() {
return user;
major=2
minor=0
-patch=1
+patch=2
base_version=${major}.${minor}.${patch}