<modelVersion>4.0.0</modelVersion>
<groupId>org.onap.dmaap.messagerouter.msgrtr</groupId>
<artifactId>msgrtr</artifactId>
- <version>1.1.14-SNAPSHOT</version>
+ <version>1.1.15-SNAPSHOT</version>
<packaging>jar</packaging>
<name>dmaap-messagerouter-msgrtr</name>
<description>Message Router - Restful interface built for kafka</description>
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
String consumerTimeOut = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
"consumer.timeout");
- if (null != consumerTimeOut) {
+ if (StringUtils.isNotEmpty(consumerTimeOut)) {
consumerPollTimeOut = Integer.parseInt(consumerTimeOut);
}
synchronized (kConsumer) {
import java.io.IOException;
+import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpStatus;
import org.json.JSONArray;
import org.json.JSONException;
final String desc = topicBean.getTopicDescription();
int partition = topicBean.getPartitionCount();
// int replica = topicBean.getReplicationCount();
+ String defaultPartitions = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ "default.partitions");
+ String defaultReplicas = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ "default.replicas");
if (partition == 0) {
+ if(StringUtils.isNotEmpty(defaultPartitions)){
+ partition=Integer.parseInt(defaultPartitions);
+ }
+ else{
partition = 1;
+ }
}
final int partitions = partition;
int replica = topicBean.getReplicationCount();
if (replica == 0) {
+ if(StringUtils.isNotEmpty(defaultReplicas)){
+ replica=Integer.parseInt(defaultReplicas);
+ }
+ else{
replica = 1;
+ }
}
final int replicas = replica;
boolean transactionEnabled = topicBean.isTransactionEnabled();
when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "enforced.topic.name.AAF"))
.thenReturn("enfTopicName");
+ when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.partitions"))
+ .thenReturn("1");
+ when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.replicas"))
+ .thenReturn("1");
when(httpServReq.getHeader("AppName")).thenReturn("MyApp");
when(dmaapContext.getRequest()).thenReturn(httpServReq);
when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "enforced.topic.name.AAF"))
.thenReturn("enfTopicName");
-
+ when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.partitions"))
+ .thenReturn("1");
+ when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.replicas"))
+ .thenReturn("1");
+
when(dmaapContext.getRequest()).thenReturn(httpServReq);
when(dmaapContext.getResponse()).thenReturn(httpServRes);
when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "enforced.topic.name.AAF"))
.thenReturn("enfTopicName");
+ when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.partitions"))
+ .thenReturn("1");
+ when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.replicas"))
+ .thenReturn("1");
when(httpServReq.getHeader("Authorization")).thenReturn(null);
when(dmaapContext.getRequest()).thenReturn(httpServReq);
when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "enforced.topic.name.AAF"))
.thenReturn("enfTopicName");
+ when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.partitions"))
+ .thenReturn("1");
+ when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.replicas"))
+ .thenReturn("1");
when(httpServReq.getHeader("Authorization")).thenReturn("Authorization");
when(dmaapContext.getRequest()).thenReturn(httpServReq);
major=1
minor=1
-patch=14
+patch=15
base_version=${major}.${minor}.${patch}