Changes for configurable kafka ports 15/49015/1
authordglFromAtt <dgl@research.att.com>
Thu, 24 May 2018 08:29:30 +0000 (04:29 -0400)
committerdglFromAtt <dgl@research.att.com>
Thu, 24 May 2018 08:29:43 +0000 (04:29 -0400)
Change-Id: I6b646785584aea75809aa2dae4a36e701e313058
Signed-off-by: dglFromAtt <dgl@research.att.com>
Issue-ID: DMAAP-506

pom.xml
src/main/java/org/onap/dmaap/dbcapi/model/MR_Cluster.java
src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java
src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java
src/test/java/org/onap/dmaap/dbcapi/model/MirrorMakerTest.java
version.properties

diff --git a/pom.xml b/pom.xml
index 34cce77..a22096d 100644 (file)
--- a/pom.xml
+++ b/pom.xml
                <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
                <jettyVersion>9.3.9.v20160517</jettyVersion> 
                <eelf.version>0.0.1</eelf.version>
-               <artifact.version>1.0.7-SNAPSHOT</artifact.version>
+               <artifact.version>1.0.8-SNAPSHOT</artifact.version>
                <!-- SONAR -->
                <jacoco.version>0.7.7.201606060606</jacoco.version>
                <sonar-jacoco-listeners.version>3.2</sonar-jacoco-listeners.version>
index a6827a9..166fc21 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.dmaap.dbcapi.model;
 
 import javax.xml.bind.annotation.XmlRootElement;
 
+import org.onap.dmaap.dbcapi.util.DmaapConfig;
 import org.onap.dmaap.dbcapi.util.DmaapTimestamp;
 
 
@@ -34,32 +35,53 @@ public class MR_Cluster extends DmaapObject {
        private DmaapTimestamp lastMod;
        private String  topicProtocol;
        private String  topicPort;
+       private String  replicationGroup;
+       private String  sourceReplicationPort;
+       private String  targetReplicationPort;
 
        
        // TODO: make this a system property
-       private static String defaultTopicProtocol = "https";
-       private static String defaultTopicPort = "3905";
+       private static  String defaultTopicProtocol;
+       private static   String defaultTopicPort;
+       private static  String defaultReplicationGroup;
+       private static  String defaultSourceReplicationPort;
+       private static  String defaultTargetReplicationPort;
        
-
+       private static void setDefaults() {
+               boolean been_here = false;
+               if ( been_here ) {
+                       return;
+               }
+               DmaapConfig dc = (DmaapConfig)DmaapConfig.getConfig();
+               defaultTopicProtocol = dc.getProperty("MR.TopicProtocol", "https");
+               defaultTopicPort = dc.getProperty( "MR.TopicPort", "3905");
+               defaultReplicationGroup = dc.getProperty( "MR.ReplicationGroup", "" );
+               defaultSourceReplicationPort = dc.getProperty( "MR.SourceReplicationPort", "2181");
+               defaultTargetReplicationPort = dc.getProperty( "MR.TargetReplicationPort", "9092");
+               been_here = true;
+       }
 
 
        public MR_Cluster() {
+               setDefaults();
                this.topicProtocol = defaultTopicProtocol;
                this.topicPort = defaultTopicPort;
+               this.replicationGroup = null;
+               this.sourceReplicationPort = defaultSourceReplicationPort;
+               this.targetReplicationPort = defaultTargetReplicationPort;
                this.lastMod = new DmaapTimestamp();
                this.lastMod.mark();
 
                debugLogger.debug( "MR_Cluster constructor " + this.lastMod );
                
        }
-       
 
-       
        // new style constructor
        public MR_Cluster( String dLN,
                        String f,
                        String prot,
-                       String port ) {
+                       String port) {
+               setDefaults();
                this.dcaeLocationName = dLN;
                this.fqdn = f;
 
@@ -73,13 +95,59 @@ public class MR_Cluster extends DmaapObject {
                } else {
                        this.topicPort = port;
                }
+
+               this.replicationGroup = defaultReplicationGroup;
+               this.sourceReplicationPort = defaultSourceReplicationPort;
+               this.targetReplicationPort = defaultTargetReplicationPort;
+
+               this.lastMod = new DmaapTimestamp();
+               this.lastMod.mark();
                
-               
+               debugLogger.debug( "MR_Cluster constructor w initialization complete" + this.lastMod.getVal() );
+       }
+
+       public MR_Cluster( String dLN,
+                       String f,
+                       String prot,
+                       String port,
+                       String repGroup,
+                       String sourceRepPort,
+                       String targetRepPort ) {
+               setDefaults();
+               this.dcaeLocationName = dLN;
+               this.fqdn = f;
+
+               if ( prot == null || prot.isEmpty() ) {
+                       this.topicProtocol = defaultTopicProtocol;
+               } else {
+                       this.topicProtocol = prot;
+               }
+               if ( port == null || port.isEmpty() ) {
+                       this.topicPort = defaultTopicPort;
+               } else {
+                       this.topicPort = port;
+               }
+               if ( repGroup == null || repGroup.isEmpty() ) {
+                       this.replicationGroup = defaultReplicationGroup;
+               } else {
+                       this.replicationGroup = repGroup;
+               }
+               if ( sourceRepPort == null || sourceRepPort.isEmpty()) {
+                       this.sourceReplicationPort = defaultSourceReplicationPort;
+               } else {
+                       this.sourceReplicationPort = sourceRepPort;
+               }
+               if ( targetRepPort == null || targetRepPort.isEmpty()) {
+                       this.targetReplicationPort = defaultTargetReplicationPort;
+               } else {
+                       this.targetReplicationPort = targetRepPort;
+               }
+                               
                this.lastMod = new DmaapTimestamp();
                this.lastMod.mark();
                
                debugLogger.debug( "MR_Cluster constructor w initialization complete" + this.lastMod.getVal() );
-}
+       }
        public String getDcaeLocationName() {
                return dcaeLocationName;
        }
@@ -113,6 +181,39 @@ public class MR_Cluster extends DmaapObject {
                this.topicPort = topicPort;
        }
 
+       public String getReplicationGroup() {
+               return replicationGroup;
+       }
+
+       public void setReplicationGroup(String replicationGroup) {
+               this.replicationGroup = replicationGroup;
+       }
+
+
+
+
+       public String getSourceReplicationPort() {
+               return sourceReplicationPort;
+       }
+
+
+
+       public void setSourceReplicationPort(String sourceReplicationPort) {
+               this.sourceReplicationPort = sourceReplicationPort;
+       }
+
+
+
+       public String getTargetReplicationPort() {
+               return targetReplicationPort;
+       }
+
+
+
+       public void setTargetReplicationPort(String targetReplicationPort) {
+               this.targetReplicationPort = targetReplicationPort;
+       }
+
 
 
        public String genTopicURL(String overideFqdn, String topic) {
index b1a2d3c..1e381b8 100644 (file)
@@ -137,11 +137,11 @@ public class MirrorMaker extends DmaapObject {
                                }
                        }
         */
-       public String createMirrorMaker() {
+       public String createMirrorMaker( String consumerPort, String producerPort ) {
                StringBuilder str = new StringBuilder( "{ \"messageID\": \"" + MirrorMakerService.genTransactionId() + "\", \"createMirrorMaker\": {"  );
                str.append( " \"name\": \"" + this.getMmName() + "\", " );
-               str.append( " \"consumer\": \"" + this.sourceCluster + ":2181\", " );
-               str.append( " \"producer\": \"" + this.targetCluster + ":9092\" ");
+               str.append( " \"consumer\": \"" + this.sourceCluster + ":" + consumerPort + "\", " );
+               str.append( " \"producer\": \"" + this.targetCluster + ":" + producerPort + "\" ");
                
                str.append( " } }" );
                
index 29010b6..a73d981 100644 (file)
@@ -64,6 +64,9 @@ public class MirrorMakerService extends BaseLoggingClass {
                DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
                String provUser = p.getProperty("MM.ProvUserMechId");
                String provUserPwd = decryptor.decrypt(p.getProperty( "MM.ProvUserPwd", "notSet" ));
+               String defaultProducerPort = p.getProperty( "MM.KafkaProducerPort", "9092");
+               String defaultConsumerPort = p.getProperty( "MM.KafkaConsumerPort", "2181");
+       
                prov = new MrTopicConnection( provUser, provUserPwd );
 
                String centralFqdn = p.getProperty("MR.CentralCname", "notSet");
@@ -76,7 +79,7 @@ public class MirrorMakerService extends BaseLoggingClass {
                // but only send 1 message so MM Agents can read it relying on kafka delivery
                for( MR_Cluster central: clusters.getCentralClusters() ) {
                        prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn  );
-                       ApiError resp = prov.doPostMessage(mm.createMirrorMaker());
+                       ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort ));
                        if ( ! resp.is2xx() ) {
        
                                errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage());
index ba4b028..8b632ed 100644 (file)
@@ -79,6 +79,8 @@ public class MirrorMakerTest {
                String f = "org.onap.interestingTopic";
                String c1 =  "cluster1.onap.org";
                String c2 =  "cluster2.onap.org";
+               String p1 = "9092";
+               String p2 = "2081";
                MirrorMaker t = new MirrorMaker( c1, c2 );
                String m = t.getMmName();
 
@@ -94,7 +96,7 @@ public class MirrorMakerTest {
 
                s = t.updateWhiteList();
 
-               s = t.createMirrorMaker();
+               s = t.createMirrorMaker(p1, p2);
 
                t.delVector( f, c1, c2 );
 
index 618e64c..f4cdd52 100644 (file)
@@ -27,7 +27,7 @@
 
 major=1
 minor=0
-patch=7
+patch=8
 base_version=${major}.${minor}.${patch}
 
 # Release must be completed with git revision # in Jenkins