add configurable default partitions and replicas 42/74542/2 1.1.15
authorsunil unnava <sunil.unnava@att.com>
Tue, 11 Dec 2018 21:25:26 +0000 (16:25 -0500)
committersunil unnava <su622b@att.com>
Tue, 11 Dec 2018 21:37:38 +0000 (21:37 +0000)
Issue-ID: DMAAP-903
Change-Id: Iabb3da85c3e42ddf68d049e6a8164449f7be8296
Signed-off-by: sunil unnava <sunil.unnava@att.com>
pom.xml
src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java
src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java
src/test/java/org/onap/dmaap/mr/cambria/service/impl/TopicServiceImplTest.java
version.properties

diff --git a/pom.xml b/pom.xml
index c213101..cb47037 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -14,7 +14,7 @@
        <modelVersion>4.0.0</modelVersion>
        <groupId>org.onap.dmaap.messagerouter.msgrtr</groupId>
        <artifactId>msgrtr</artifactId>
-       <version>1.1.14-SNAPSHOT</version>
+       <version>1.1.15-SNAPSHOT</version>
        <packaging>jar</packaging>
        <name>dmaap-messagerouter-msgrtr</name>
        <description>Message Router - Restful interface built for kafka</description>
index 2ec323e..93374fb 100644 (file)
@@ -34,6 +34,7 @@ import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -85,7 +86,7 @@ public class Kafka011Consumer implements Consumer {
                
                String consumerTimeOut = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
                                "consumer.timeout");
-               if (null != consumerTimeOut) {
+               if (StringUtils.isNotEmpty(consumerTimeOut)) {
                        consumerPollTimeOut = Integer.parseInt(consumerTimeOut);
                }
                synchronized (kConsumer) {
index f2ba222..626828b 100644 (file)
@@ -26,6 +26,7 @@ package org.onap.dmaap.dmf.mr.service.impl;
 
 import java.io.IOException;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.http.HttpStatus;
 import org.json.JSONArray;
 import org.json.JSONException;
@@ -281,14 +282,28 @@ public class TopicServiceImpl implements TopicService {
                        final String desc = topicBean.getTopicDescription();
                        int partition = topicBean.getPartitionCount();
                        // int replica = topicBean.getReplicationCount();
+                       String defaultPartitions = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+                                       "default.partitions");
+                       String defaultReplicas = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+                                       "default.replicas");
                        if (partition == 0) {
+                               if(StringUtils.isNotEmpty(defaultPartitions)){
+                                       partition=Integer.parseInt(defaultPartitions);  
+                               }
+                               else{
                                partition = 1;
+                               }
                        }
                        final int partitions = partition;
 
                        int replica = topicBean.getReplicationCount();
                        if (replica == 0) {
+                               if(StringUtils.isNotEmpty(defaultReplicas)){
+                                       replica=Integer.parseInt(defaultReplicas);      
+                               }
+                               else{
                                replica = 1;
+                               }
                        }
                        final int replicas = replica;
                        boolean transactionEnabled = topicBean.isTransactionEnabled();
index e5d3233..7cbdf79 100644 (file)
@@ -141,6 +141,10 @@ public class TopicServiceImplTest {
 
                when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "enforced.topic.name.AAF"))
                                .thenReturn("enfTopicName");
+               when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.partitions"))
+               .thenReturn("1");
+               when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.replicas"))
+               .thenReturn("1");
 
                when(httpServReq.getHeader("AppName")).thenReturn("MyApp");
                when(dmaapContext.getRequest()).thenReturn(httpServReq);
@@ -165,7 +169,11 @@ public class TopicServiceImplTest {
 
                when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "enforced.topic.name.AAF"))
                                .thenReturn("enfTopicName");
-
+               when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.partitions"))
+               .thenReturn("1");
+               when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.replicas"))
+               .thenReturn("1");
+               
                when(dmaapContext.getRequest()).thenReturn(httpServReq);
                when(dmaapContext.getResponse()).thenReturn(httpServRes);
 
@@ -208,6 +216,10 @@ public class TopicServiceImplTest {
 
                when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "enforced.topic.name.AAF"))
                                .thenReturn("enfTopicName");
+               when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.partitions"))
+               .thenReturn("1");
+               when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.replicas"))
+               .thenReturn("1");
 
                when(httpServReq.getHeader("Authorization")).thenReturn(null);
                when(dmaapContext.getRequest()).thenReturn(httpServReq);
@@ -232,6 +244,10 @@ public class TopicServiceImplTest {
 
                when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "enforced.topic.name.AAF"))
                                .thenReturn("enfTopicName");
+               when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.partitions"))
+               .thenReturn("1");
+               when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.replicas"))
+               .thenReturn("1");
 
                when(httpServReq.getHeader("Authorization")).thenReturn("Authorization");
                when(dmaapContext.getRequest()).thenReturn(httpServReq);
index c9b51c7..aabaad9 100644 (file)
@@ -27,7 +27,7 @@
 
 major=1
 minor=1
-patch=14
+patch=15
 
 base_version=${major}.${minor}.${patch}