From: dglFromAtt Date: Sat, 10 Nov 2018 18:29:08 +0000 (-0500) Subject: Limit number of topics per mmagent whitelist X-Git-Tag: 1.0.26~93 X-Git-Url: https://gerrit.onap.org/r/gitweb?p=dmaap%2Fdbcapi.git;a=commitdiff_plain;h=1360b9df89a422d51ef40644ea5f9cf52cb84c6f Limit number of topics per mmagent whitelist Change-Id: Id91106072aa2cc843414db78d568ccd1ecd69657 Signed-off-by: dglFromAtt Issue-ID: DMAAP-880 --- diff --git a/src/main/java/org/onap/dmaap/dbcapi/database/DBFieldHandler.java b/src/main/java/org/onap/dmaap/dbcapi/database/DBFieldHandler.java index 59d610c..5ec55d4 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/database/DBFieldHandler.java +++ b/src/main/java/org/onap/dmaap/dbcapi/database/DBFieldHandler.java @@ -24,6 +24,8 @@ import java.lang.reflect.*; import java.sql.*; import java.util.*; +import org.onap.dmaap.dbcapi.model.*; + import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; @@ -47,7 +49,7 @@ public class DBFieldHandler { try { objget = c.getMethod("is" + camelcase); } catch (Exception e) { - errorLogger.error("Error", e); + errorLogger.warn("No 'is' method for " + c.getName() + " so trying 'get' method"); objget = c.getMethod("get" + camelcase); } objset = c.getMethod("set" + camelcase, objget.getReturnType()); diff --git a/src/main/java/org/onap/dmaap/dbcapi/model/BrTopic.java b/src/main/java/org/onap/dmaap/dbcapi/model/BrTopic.java index a7041d0..7de42f1 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/model/BrTopic.java +++ b/src/main/java/org/onap/dmaap/dbcapi/model/BrTopic.java @@ -30,6 +30,7 @@ public class BrTopic { private String brSource; private String brTarget; + private String mmAgentName; private int topicCount; // no-op constructor used by framework @@ -60,6 +61,14 @@ public class BrTopic { this.topicCount = topicCount; } + public String getMmAgentName() { + return mmAgentName; + } + + public void setMmAgentName(String mmAgentName) { + this.mmAgentName = mmAgentName; + } + } diff --git a/src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java b/src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java index 6447123..e693afe 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java +++ b/src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java @@ -29,7 +29,7 @@ import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum; import org.onap.dmaap.dbcapi.service.MirrorMakerService; public class MirrorMaker extends DmaapObject { - static final Logger logger = Logger.getLogger(MirrorMaker.class); + private String sourceCluster; private String targetCluster; @@ -41,8 +41,20 @@ public class MirrorMaker extends DmaapObject { public MirrorMaker(){ } - + public MirrorMaker(String source, String target, int i) { + initMM( source, target ); + // original mm names did not have any index, so leave off index 0 for + // backwards compatibility + if ( i != 0 ) { + String n = this.getMmName() + "_" + i; + this.setMmName(n); + } + } public MirrorMaker(String source, String target) { + initMM( source, target ); + } + + private void initMM(String source, String target) { sourceCluster = source; targetCluster = target; mmName = genKey(source, target); @@ -72,7 +84,7 @@ public class MirrorMaker extends DmaapObject { } } */ - public String updateWhiteList() { + public String getWhitelistUpdateJSON() { StringBuilder str = new StringBuilder( "{ \"messageID\": \"" + MirrorMakerService.genTransactionId() + "\", \"updateWhiteList\": {" ); str.append( " \"name\": \"" + this.getMmName() + "\", \"whitelist\": \"" ); int numTargets = 0; @@ -109,9 +121,9 @@ public class MirrorMaker extends DmaapObject { StringBuilder str = new StringBuilder( "{ \"messageID\": \"" + MirrorMakerService.genTransactionId() + "\", \"createMirrorMaker\": {" ); str.append( " \"name\": \"" + this.getMmName() + "\", " ); str.append( " \"consumer\": \"" + this.sourceCluster + ":" + consumerPort + "\", " ); - str.append( " \"producer\": \"" + this.targetCluster + ":" + producerPort + "\" "); + str.append( " \"producer\": \"" + this.targetCluster + ":" + producerPort + "\", "); - str.append( " } }" ); + str.append( " \"numStreams\": \"10\" } }" ); return str.toString(); } diff --git a/src/main/java/org/onap/dmaap/dbcapi/resources/BridgeResource.java b/src/main/java/org/onap/dmaap/dbcapi/resources/BridgeResource.java index aab1cac..f1466ee 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/resources/BridgeResource.java +++ b/src/main/java/org/onap/dmaap/dbcapi/resources/BridgeResource.java @@ -56,108 +56,126 @@ public class BridgeResource extends BaseLoggingClass { @GET @ApiOperation( value = "return BrTopic details", - notes = "Returns array of `BrTopic` objects. If source and target query params are specified, only report on that bridge. If detail param is true, list topics names, else just a count is returned", + notes = "Returns array of `BrTopic` objects. If source and target query params are specified, only report on that bridge. " + + "If detail param is true, list topics names, else just a count is returned.", response = BrTopic.class) @ApiResponses( value = { @ApiResponse( code = 200, message = "Success", response = Dmaap.class), @ApiResponse( code = 400, message = "Error", response = ApiError.class ) }) - public Response getBridgedTopics(@QueryParam("source") String source, - @QueryParam("target") String target, + public Response getBridgedTopics(@QueryParam("mmagent") String mmagent, @QueryParam("detail") Boolean detailFlag ){ ApiService check = new ApiService(); + + if ( mmagent == null ) { + return check.success(getMMcounts(Boolean.TRUE.equals(detailFlag))); + + } + logger.info( "getBridgeTopics():" + " mmagent=" + mmagent); if ( ! Boolean.TRUE.equals(detailFlag)) { BrTopic brTopic = new BrTopic(); - logger.info( "getBridgeTopics():" + " source=" + source + ", target=" + target); - - if (source != null && target != null) { // get topics between 2 bridged locations - brTopic.setBrSource(source); - brTopic.setBrTarget(target); - MirrorMaker mm = mmService.getMirrorMaker(source, target); - if ( mm != null ) { - brTopic.setTopicCount( mm.getTopicCount() ); - } - - logger.info( "topicCount [2 locations]: " + brTopic.getTopicCount() ); - } - else if (source == null && target == null ) { - List mmList = mmService.getAllMirrorMakers(); - brTopic.setBrSource("all"); - brTopic.setBrTarget("all"); - int totCnt = 0; - for( String key: mmList ) { - int mCnt = 0; - MirrorMaker mm = mmService.getMirrorMaker(key); - if ( mm != null ) { - mCnt = mm.getTopicCount(); - } - logger.info( "Count for "+ key + ": " + mCnt); - totCnt += mCnt; - } - - logger.info( "topicCount [all locations]: " + totCnt ); - brTopic.setTopicCount(totCnt); - - } - else { - - logger.error( "source or target is missing"); - check.setCode(Status.BAD_REQUEST.getStatusCode()); - check.setMessage("Either both source and target or neither must be provided"); - return check.error(); - } - return check.success(brTopic); - } else { - + // get topics between 2 bridged locations + + MirrorMaker mm = mmService.getMirrorMaker(mmagent); + if ( mm == null ) { + return check.notFound(); + } + + brTopic.setTopicCount( mm.getTopicCount() ); + brTopic.setBrSource( mm.getSourceCluster()); + brTopic.setBrTarget( mm.getTargetCluster()); + brTopic.setMmAgentName(mm.getMmName()); - logger.info( "getBridgeTopics() detail:" + " source=" + source + ", target=" + target); - - if (source != null && target != null) { // get topics between 2 bridged locations - - MirrorMaker mm = mmService.getMirrorMaker(source, target); - if ( mm == null ) { - return check.notFound(); - } - - return check.success(mm); - } + logger.info( "topicCount [2 locations]: " + brTopic.getTopicCount() ); + + return check.success(brTopic); + } else { + logger.info( "getBridgeTopics() detail:" + " mmagent=" + mmagent); + // get topics between 2 bridged locations + MirrorMaker mm = mmService.getMirrorMaker(mmagent); + if ( mm == null ) { + return check.notFound(); + } - else { + return check.success(mm); + } + } - logger.error( "source and target are required when detail=true"); - check.setCode(Status.BAD_REQUEST.getStatusCode()); - check.setMessage("source and target are required when detail=true"); - return check.error(); + private BrTopic[] getMMcounts( Boolean showDetail ) { + + List mmList = mmService.getAllMirrorMakers(); + int s = 1; + if ( showDetail ) { + s = mmList.size() + 1; + } + BrTopic[] brTopic = new BrTopic[s]; + + int totCnt = 0; + s = 0; + for( String key: mmList ) { + int mCnt = 0; + MirrorMaker mm = mmService.getMirrorMaker(key); + if ( mm != null ) { + mCnt = mm.getTopicCount(); + } + logger.info( "Count for "+ key + ": " + mCnt); + totCnt += mCnt; + if (showDetail) { + brTopic[s] = new BrTopic(); + brTopic[s].setBrSource( mm.getSourceCluster()); + brTopic[s].setBrTarget(mm.getTargetCluster()); + brTopic[s].setMmAgentName(mm.getMmName()); + brTopic[s].setTopicCount(mm.getTopicCount()); + s++; } } + + logger.info( "topicCount [all locations]: " + totCnt ); + brTopic[s] = new BrTopic(); + brTopic[s].setBrSource("all"); + brTopic[s].setBrTarget("all"); + brTopic[s].setMmAgentName("n/a"); + brTopic[s].setTopicCount(totCnt); + return brTopic; } @PUT @ApiOperation( value = "update MirrorMaker details", - notes = "replace the topic list for a specific Bridge. Use JSON Body for value to replace whitelist, but if refreshFlag param is true, simply refresh using existing whitelist", + notes = "replace the topic list for a specific Bridge. Use JSON Body for value to replace whitelist, " + + "but if refreshFlag param is true, simply refresh using existing whitelist." + + "If split param is true, spread whitelist over smaller mmagents.", response = MirrorMaker.class) @ApiResponses( value = { @ApiResponse( code = 200, message = "Success", response = Dmaap.class), @ApiResponse( code = 400, message = "Error", response = ApiError.class ) }) - public Response putBridgedTopics(@QueryParam("source") String source, - @QueryParam("target") String target, + public Response putBridgedTopics(@QueryParam("mmagent") String mmagent, @QueryParam("refresh") Boolean refreshFlag, + @QueryParam("split") Boolean splitFlag, MirrorMaker newBridge ){ ApiService check = new ApiService(); - logger.info( "putBridgeTopics() detail:" + " source=" + source + ", target=" + target); + logger.info( "putBridgeTopics() mmagent:" + mmagent ); - if (source != null && target != null) { // get topics between 2 bridged locations + if ( mmagent != null ) { // put topics between 2 bridged locations - MirrorMaker mm = mmService.getMirrorMaker(source, target); + MirrorMaker mm = mmService.getMirrorMaker(mmagent); if ( mm == null ) { return check.notFound(); } - if ( refreshFlag != null && refreshFlag == false ) { - logger.info( "setting whitelist from message body"); + + if ( splitFlag != null && splitFlag == true ) { + mm = mmService.splitMM( mm ); + } else if ( refreshFlag == null || refreshFlag == false ) { + logger.info( "setting whitelist from message body containing mmName=" + newBridge.getMmName()); + if ( ! mmagent.equals(newBridge.getMmName()) ){ + logger.error( "mmagent query param does not match mmName in body"); + check.setCode(Status.BAD_REQUEST.getStatusCode()); + check.setMessage("mmagent query param does not match mmName in body"); + return check.error(); + } mm.setTopics( newBridge.getTopics() ); } else { logger.info( "refreshing whitelist from memory"); @@ -168,9 +186,9 @@ public class BridgeResource extends BaseLoggingClass { else { - logger.error( "source and target are required when detail=true"); + logger.error( "mmagent is required for PUT"); check.setCode(Status.BAD_REQUEST.getStatusCode()); - check.setMessage("source and target are required when detail=true"); + check.setMessage("mmagent is required for PUT"); return check.error(); } diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java b/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java index 413590f..8acc4f3 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java +++ b/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java @@ -56,6 +56,7 @@ public class MirrorMakerService extends BaseLoggingClass { private static String defaultProducerPort; private static String defaultConsumerPort; private static String centralFqdn; + private int maxTopicsPerMM; public MirrorMakerService() { super(); @@ -66,6 +67,7 @@ public class MirrorMakerService extends BaseLoggingClass { defaultProducerPort = p.getProperty( "MR.SourceReplicationPort", "9092"); defaultConsumerPort = p.getProperty( "MR.TargetReplicationPort", "2181"); centralFqdn = p.getProperty("MR.CentralCname", "notSet"); + maxTopicsPerMM = Integer.valueOf( p.getProperty( "MaxTopicsPerMM", "5")); } // will create a MM on MMagent if needed @@ -90,7 +92,7 @@ public class MirrorMakerService extends BaseLoggingClass { mm.setStatus(DmaapObject_Status.INVALID); } else { prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn ); - resp = prov.doPostMessage(mm.updateWhiteList()); + resp = prov.doPostMessage(mm.getWhitelistUpdateJSON()); if ( ! resp.is2xx()) { errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR,"MR Bridge", Integer.toString(resp.getCode()), resp.getMessage()); mm.setStatus(DmaapObject_Status.INVALID); @@ -109,6 +111,19 @@ public class MirrorMakerService extends BaseLoggingClass { mm.setLastMod(); return mirrors.put( mm.getMmName(), mm); } + public MirrorMaker getMirrorMaker( String part1, String part2, int index ) { + String targetPart; + + // original mm names did not have any index, so leave off index 0 for + // backwards compatibility + if ( index == 0 ) { + targetPart = part2; + } else { + targetPart = part2 + "_" + index; + } + logger.info( "getMirrorMaker using " + part1 + " and " + targetPart ); + return mirrors.get(MirrorMaker.genKey(part1, targetPart)); + } public MirrorMaker getMirrorMaker( String part1, String part2 ) { logger.info( "getMirrorMaker using " + part1 + " and " + part2 ); return mirrors.get(MirrorMaker.genKey(part1, part2)); @@ -139,5 +154,48 @@ public class MirrorMakerService extends BaseLoggingClass { return ret; } + + public MirrorMaker getNextMM( String source, String target ) { + int i = 0; + MirrorMaker mm = null; + while( mm == null ) { + + mm = this.getMirrorMaker( source, target, i); + if ( mm == null ) { + mm = new MirrorMaker(source, target, i); + } + if ( mm.getTopicCount() >= maxTopicsPerMM ) { + logger.info( "getNextMM: MM " + mm.getMmName() + " has " + mm.getTopicCount() + " topics. Moving to next MM"); + i++; + mm = null; + } + } + + + return mm; + } + + public MirrorMaker splitMM( MirrorMaker orig ) { + + int index = 1; + String source = orig.getSourceCluster(); + String target = orig.getTargetCluster(); + + + ArrayList whitelist = orig.getTopics(); + while( whitelist.size() > maxTopicsPerMM ) { + MirrorMaker mm = this.getNextMM( source, target ); + int last = whitelist.size() - 1; + String topic = whitelist.get(last); + whitelist.remove(last); + mm.addTopic(topic); + this.updateMirrorMaker(mm); + } + + orig.setTopics(whitelist); + + return orig; + + } } diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java b/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java index e4bc96c..a633982 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java +++ b/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java @@ -66,10 +66,12 @@ public class TopicService extends BaseLoggingClass { private static String centralCname; + public TopicService(){ DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig(); defaultGlobalMrHost = p.getProperty("MR.globalHost", "global.host.not.set"); centralCname = p.getProperty("MR.CentralCname"); + logger.info( "TopicService properties: CentralCname=" + centralCname + " defaultGlobarlMrHost=" + defaultGlobalMrHost ); @@ -431,10 +433,7 @@ public class TopicService extends BaseLoggingClass { if ( source != null && target != null ) { try { logger.info( "Create a MM from " + source + " to " + target ); - MirrorMaker mm = bridge.getMirrorMaker( source, target); - if ( mm == null ) { - mm = new MirrorMaker(source, target); - } + MirrorMaker mm = bridge.getNextMM( source, target); mm.addTopic(topic.getFqtn()); bridge.updateMirrorMaker(mm); } catch ( Exception ex ) { @@ -452,6 +451,7 @@ public class TopicService extends BaseLoggingClass { } + /* * Prior to 1707, we only supported EDGE_TO_CENTRAL replication. * This was determined automatically based on presence of edge publishers and central subscribers. diff --git a/src/test/java/org/onap/dmaap/dbcapi/model/MirrorMakerTest.java b/src/test/java/org/onap/dmaap/dbcapi/model/MirrorMakerTest.java index 547bfc9..39de2be 100644 --- a/src/test/java/org/onap/dmaap/dbcapi/model/MirrorMakerTest.java +++ b/src/test/java/org/onap/dmaap/dbcapi/model/MirrorMakerTest.java @@ -92,7 +92,7 @@ public class MirrorMakerTest { int i = t.getTopicCount(); - String s = t.updateWhiteList(); + String s = t.getWhitelistUpdateJSON(); s = t.createMirrorMaker(p1, p2);