X-Git-Url: https://gerrit.onap.org/r/gitweb?p=dmaap%2Fdbcapi.git;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdbcapi%2Fservice%2FMirrorMakerService.java;h=8acc4f3b281c4e163fdf2b5b7f50f6ceff4c3e84;hp=413590f7349fb02212b693b7f0fcf4bcc17da076;hb=1360b9df89a422d51ef40644ea5f9cf52cb84c6f;hpb=0e39c2d9a88a26693de1cd522766df5894917b3f diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java b/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java index 413590f..8acc4f3 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java +++ b/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java @@ -56,6 +56,7 @@ public class MirrorMakerService extends BaseLoggingClass { private static String defaultProducerPort; private static String defaultConsumerPort; private static String centralFqdn; + private int maxTopicsPerMM; public MirrorMakerService() { super(); @@ -66,6 +67,7 @@ public class MirrorMakerService extends BaseLoggingClass { defaultProducerPort = p.getProperty( "MR.SourceReplicationPort", "9092"); defaultConsumerPort = p.getProperty( "MR.TargetReplicationPort", "2181"); centralFqdn = p.getProperty("MR.CentralCname", "notSet"); + maxTopicsPerMM = Integer.valueOf( p.getProperty( "MaxTopicsPerMM", "5")); } // will create a MM on MMagent if needed @@ -90,7 +92,7 @@ public class MirrorMakerService extends BaseLoggingClass { mm.setStatus(DmaapObject_Status.INVALID); } else { prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn ); - resp = prov.doPostMessage(mm.updateWhiteList()); + resp = prov.doPostMessage(mm.getWhitelistUpdateJSON()); if ( ! resp.is2xx()) { errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR,"MR Bridge", Integer.toString(resp.getCode()), resp.getMessage()); mm.setStatus(DmaapObject_Status.INVALID); @@ -109,6 +111,19 @@ public class MirrorMakerService extends BaseLoggingClass { mm.setLastMod(); return mirrors.put( mm.getMmName(), mm); } + public MirrorMaker getMirrorMaker( String part1, String part2, int index ) { + String targetPart; + + // original mm names did not have any index, so leave off index 0 for + // backwards compatibility + if ( index == 0 ) { + targetPart = part2; + } else { + targetPart = part2 + "_" + index; + } + logger.info( "getMirrorMaker using " + part1 + " and " + targetPart ); + return mirrors.get(MirrorMaker.genKey(part1, targetPart)); + } public MirrorMaker getMirrorMaker( String part1, String part2 ) { logger.info( "getMirrorMaker using " + part1 + " and " + part2 ); return mirrors.get(MirrorMaker.genKey(part1, part2)); @@ -139,5 +154,48 @@ public class MirrorMakerService extends BaseLoggingClass { return ret; } + + public MirrorMaker getNextMM( String source, String target ) { + int i = 0; + MirrorMaker mm = null; + while( mm == null ) { + + mm = this.getMirrorMaker( source, target, i); + if ( mm == null ) { + mm = new MirrorMaker(source, target, i); + } + if ( mm.getTopicCount() >= maxTopicsPerMM ) { + logger.info( "getNextMM: MM " + mm.getMmName() + " has " + mm.getTopicCount() + " topics. Moving to next MM"); + i++; + mm = null; + } + } + + + return mm; + } + + public MirrorMaker splitMM( MirrorMaker orig ) { + + int index = 1; + String source = orig.getSourceCluster(); + String target = orig.getTargetCluster(); + + + ArrayList whitelist = orig.getTopics(); + while( whitelist.size() > maxTopicsPerMM ) { + MirrorMaker mm = this.getNextMM( source, target ); + int last = whitelist.size() - 1; + String topic = whitelist.get(last); + whitelist.remove(last); + mm.addTopic(topic); + this.updateMirrorMaker(mm); + } + + orig.setTopics(whitelist); + + return orig; + + } }