From ad29261e05ff057134d48b7d6a99da1cd07849e0 Mon Sep 17 00:00:00 2001 From: dglFromAtt Date: Thu, 24 May 2018 04:29:30 -0400 Subject: [PATCH] Changes for configurable kafka ports Change-Id: I6b646785584aea75809aa2dae4a36e701e313058 Signed-off-by: dglFromAtt Issue-ID: DMAAP-506 --- pom.xml | 2 +- .../org/onap/dmaap/dbcapi/model/MR_Cluster.java | 117 +++++++++++++++++++-- .../org/onap/dmaap/dbcapi/model/MirrorMaker.java | 6 +- .../dmaap/dbcapi/service/MirrorMakerService.java | 5 +- .../onap/dmaap/dbcapi/model/MirrorMakerTest.java | 4 +- version.properties | 2 +- 6 files changed, 121 insertions(+), 15 deletions(-) diff --git a/pom.xml b/pom.xml index 34cce77..a22096d 100644 --- a/pom.xml +++ b/pom.xml @@ -344,7 +344,7 @@ UTF-8 9.3.9.v20160517 0.0.1 - 1.0.7-SNAPSHOT + 1.0.8-SNAPSHOT 0.7.7.201606060606 3.2 diff --git a/src/main/java/org/onap/dmaap/dbcapi/model/MR_Cluster.java b/src/main/java/org/onap/dmaap/dbcapi/model/MR_Cluster.java index a6827a9..166fc21 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/model/MR_Cluster.java +++ b/src/main/java/org/onap/dmaap/dbcapi/model/MR_Cluster.java @@ -22,6 +22,7 @@ package org.onap.dmaap.dbcapi.model; import javax.xml.bind.annotation.XmlRootElement; +import org.onap.dmaap.dbcapi.util.DmaapConfig; import org.onap.dmaap.dbcapi.util.DmaapTimestamp; @@ -34,32 +35,53 @@ public class MR_Cluster extends DmaapObject { private DmaapTimestamp lastMod; private String topicProtocol; private String topicPort; + private String replicationGroup; + private String sourceReplicationPort; + private String targetReplicationPort; // TODO: make this a system property - private static String defaultTopicProtocol = "https"; - private static String defaultTopicPort = "3905"; + private static String defaultTopicProtocol; + private static String defaultTopicPort; + private static String defaultReplicationGroup; + private static String defaultSourceReplicationPort; + private static String defaultTargetReplicationPort; - + private static void setDefaults() { + boolean been_here = false; + if ( been_here ) { + return; + } + DmaapConfig dc = (DmaapConfig)DmaapConfig.getConfig(); + defaultTopicProtocol = dc.getProperty("MR.TopicProtocol", "https"); + defaultTopicPort = dc.getProperty( "MR.TopicPort", "3905"); + defaultReplicationGroup = dc.getProperty( "MR.ReplicationGroup", "" ); + defaultSourceReplicationPort = dc.getProperty( "MR.SourceReplicationPort", "2181"); + defaultTargetReplicationPort = dc.getProperty( "MR.TargetReplicationPort", "9092"); + been_here = true; + } public MR_Cluster() { + setDefaults(); this.topicProtocol = defaultTopicProtocol; this.topicPort = defaultTopicPort; + this.replicationGroup = null; + this.sourceReplicationPort = defaultSourceReplicationPort; + this.targetReplicationPort = defaultTargetReplicationPort; this.lastMod = new DmaapTimestamp(); this.lastMod.mark(); debugLogger.debug( "MR_Cluster constructor " + this.lastMod ); } - - // new style constructor public MR_Cluster( String dLN, String f, String prot, - String port ) { + String port) { + setDefaults(); this.dcaeLocationName = dLN; this.fqdn = f; @@ -73,13 +95,59 @@ public class MR_Cluster extends DmaapObject { } else { this.topicPort = port; } + + this.replicationGroup = defaultReplicationGroup; + this.sourceReplicationPort = defaultSourceReplicationPort; + this.targetReplicationPort = defaultTargetReplicationPort; + + this.lastMod = new DmaapTimestamp(); + this.lastMod.mark(); - + debugLogger.debug( "MR_Cluster constructor w initialization complete" + this.lastMod.getVal() ); + } + + public MR_Cluster( String dLN, + String f, + String prot, + String port, + String repGroup, + String sourceRepPort, + String targetRepPort ) { + setDefaults(); + this.dcaeLocationName = dLN; + this.fqdn = f; + + if ( prot == null || prot.isEmpty() ) { + this.topicProtocol = defaultTopicProtocol; + } else { + this.topicProtocol = prot; + } + if ( port == null || port.isEmpty() ) { + this.topicPort = defaultTopicPort; + } else { + this.topicPort = port; + } + if ( repGroup == null || repGroup.isEmpty() ) { + this.replicationGroup = defaultReplicationGroup; + } else { + this.replicationGroup = repGroup; + } + if ( sourceRepPort == null || sourceRepPort.isEmpty()) { + this.sourceReplicationPort = defaultSourceReplicationPort; + } else { + this.sourceReplicationPort = sourceRepPort; + } + if ( targetRepPort == null || targetRepPort.isEmpty()) { + this.targetReplicationPort = defaultTargetReplicationPort; + } else { + this.targetReplicationPort = targetRepPort; + } + this.lastMod = new DmaapTimestamp(); this.lastMod.mark(); debugLogger.debug( "MR_Cluster constructor w initialization complete" + this.lastMod.getVal() ); -} + } public String getDcaeLocationName() { return dcaeLocationName; } @@ -113,6 +181,39 @@ public class MR_Cluster extends DmaapObject { this.topicPort = topicPort; } + public String getReplicationGroup() { + return replicationGroup; + } + + public void setReplicationGroup(String replicationGroup) { + this.replicationGroup = replicationGroup; + } + + + + + public String getSourceReplicationPort() { + return sourceReplicationPort; + } + + + + public void setSourceReplicationPort(String sourceReplicationPort) { + this.sourceReplicationPort = sourceReplicationPort; + } + + + + public String getTargetReplicationPort() { + return targetReplicationPort; + } + + + + public void setTargetReplicationPort(String targetReplicationPort) { + this.targetReplicationPort = targetReplicationPort; + } + public String genTopicURL(String overideFqdn, String topic) { diff --git a/src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java b/src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java index b1a2d3c..1e381b8 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java +++ b/src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java @@ -137,11 +137,11 @@ public class MirrorMaker extends DmaapObject { } } */ - public String createMirrorMaker() { + public String createMirrorMaker( String consumerPort, String producerPort ) { StringBuilder str = new StringBuilder( "{ \"messageID\": \"" + MirrorMakerService.genTransactionId() + "\", \"createMirrorMaker\": {" ); str.append( " \"name\": \"" + this.getMmName() + "\", " ); - str.append( " \"consumer\": \"" + this.sourceCluster + ":2181\", " ); - str.append( " \"producer\": \"" + this.targetCluster + ":9092\" "); + str.append( " \"consumer\": \"" + this.sourceCluster + ":" + consumerPort + "\", " ); + str.append( " \"producer\": \"" + this.targetCluster + ":" + producerPort + "\" "); str.append( " } }" ); diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java b/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java index 29010b6..a73d981 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java +++ b/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java @@ -64,6 +64,9 @@ public class MirrorMakerService extends BaseLoggingClass { DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig(); String provUser = p.getProperty("MM.ProvUserMechId"); String provUserPwd = decryptor.decrypt(p.getProperty( "MM.ProvUserPwd", "notSet" )); + String defaultProducerPort = p.getProperty( "MM.KafkaProducerPort", "9092"); + String defaultConsumerPort = p.getProperty( "MM.KafkaConsumerPort", "2181"); + prov = new MrTopicConnection( provUser, provUserPwd ); String centralFqdn = p.getProperty("MR.CentralCname", "notSet"); @@ -76,7 +79,7 @@ public class MirrorMakerService extends BaseLoggingClass { // but only send 1 message so MM Agents can read it relying on kafka delivery for( MR_Cluster central: clusters.getCentralClusters() ) { prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn ); - ApiError resp = prov.doPostMessage(mm.createMirrorMaker()); + ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort )); if ( ! resp.is2xx() ) { errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage()); diff --git a/src/test/java/org/onap/dmaap/dbcapi/model/MirrorMakerTest.java b/src/test/java/org/onap/dmaap/dbcapi/model/MirrorMakerTest.java index ba4b028..8b632ed 100644 --- a/src/test/java/org/onap/dmaap/dbcapi/model/MirrorMakerTest.java +++ b/src/test/java/org/onap/dmaap/dbcapi/model/MirrorMakerTest.java @@ -79,6 +79,8 @@ public class MirrorMakerTest { String f = "org.onap.interestingTopic"; String c1 = "cluster1.onap.org"; String c2 = "cluster2.onap.org"; + String p1 = "9092"; + String p2 = "2081"; MirrorMaker t = new MirrorMaker( c1, c2 ); String m = t.getMmName(); @@ -94,7 +96,7 @@ public class MirrorMakerTest { s = t.updateWhiteList(); - s = t.createMirrorMaker(); + s = t.createMirrorMaker(p1, p2); t.delVector( f, c1, c2 ); diff --git a/version.properties b/version.properties index 618e64c..f4cdd52 100644 --- a/version.properties +++ b/version.properties @@ -27,7 +27,7 @@ major=1 minor=0 -patch=7 +patch=8 base_version=${major}.${minor}.${patch} # Release must be completed with git revision # in Jenkins -- 2.16.6