X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdbcapi%2Fservice%2FMirrorMakerService.java;h=5d695f44d3ede846c5daec6be43f8a32a43bc754;hb=27b65d2fadd596e224256ac68d76d67acc2603b7;hp=a73d98126d474627d29c8ea157307339fabdfb49;hpb=ad29261e05ff057134d48b7d6a99da1cd07849e0;p=dmaap%2Fdbcapi.git diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java b/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java index a73d981..5d695f4 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java +++ b/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java @@ -51,60 +51,90 @@ public class MirrorMakerService extends BaseLoggingClass { private static MrTopicConnection prov; private static AafDecrypt decryptor; + private static String provUser; + private static String provUserPwd; + private static String defaultProducerPort; + private static String defaultConsumerPort; + private static String centralFqdn; + private int maxTopicsPerMM; + private boolean mmPerMR; + public MirrorMakerService() { super(); - decryptor = new AafDecrypt(); + DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig(); + provUser = p.getProperty("MM.ProvUserMechId"); + provUserPwd = decryptor.decrypt(p.getProperty( "MM.ProvUserPwd", "notSet" )); + defaultProducerPort = p.getProperty( "MR.SourceReplicationPort", "9092"); + defaultConsumerPort = p.getProperty( "MR.TargetReplicationPort", "2181"); + centralFqdn = p.getProperty("MR.CentralCname", "notSet"); + maxTopicsPerMM = Integer.valueOf( p.getProperty( "MaxTopicsPerMM", "5")); + mmPerMR = "true".equalsIgnoreCase(p.getProperty("MirrorMakerPerMR", "true")); } // will create a MM on MMagent if needed // will update the MMagent whitelist with all topics for this MM public MirrorMaker updateMirrorMaker( MirrorMaker mm ) { logger.info( "updateMirrorMaker"); - DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig(); - String provUser = p.getProperty("MM.ProvUserMechId"); - String provUserPwd = decryptor.decrypt(p.getProperty( "MM.ProvUserPwd", "notSet" )); - String defaultProducerPort = p.getProperty( "MM.KafkaProducerPort", "9092"); - String defaultConsumerPort = p.getProperty( "MM.KafkaConsumerPort", "2181"); prov = new MrTopicConnection( provUser, provUserPwd ); - - String centralFqdn = p.getProperty("MR.CentralCname", "notSet"); - + DmaapService dmaap = new DmaapService(); MR_ClusterService clusters = new MR_ClusterService(); - - // in 1610, MM should only exist for edge-to-central - // we use a cname for the central MR cluster that is active, and provision on agent topic on that target - // but only send 1 message so MM Agents can read it relying on kafka delivery - for( MR_Cluster central: clusters.getCentralClusters() ) { - prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn ); - ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort )); - if ( ! resp.is2xx() ) { - - errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage()); + MR_Cluster target_cluster = null; + String override = null; + + if ( ! mmPerMR ) { + // in ECOMP, MM Agent is only deployed at central, so this case is needed for backwards compatibility + // we use a cname for the central MR cluster that is active, and provision on agent topic on that target + // but only send 1 message so MM Agents can read it relying on kafka delivery + for( MR_Cluster cluster: clusters.getCentralClusters() ) { + + target_cluster = cluster; + override = centralFqdn; + // we only want to send one message even if there are multiple central clusters + break; + + } + } else { + // In ONAP deployment architecture, the MM Agent is deployed with each target MR + target_cluster = clusters.getMr_ClusterByFQDN(mm.getTargetCluster()); + override = null; + } + + prov.makeTopicConnection(target_cluster, dmaap.getBridgeAdminFqtn(), override ); + ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort )); + if ( ! resp.is2xx() ) { + + errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage()); + mm.setStatus(DmaapObject_Status.INVALID); + } else { + prov.makeTopicConnection(target_cluster, dmaap.getBridgeAdminFqtn(), override ); + resp = prov.doPostMessage(mm.getWhitelistUpdateJSON()); + if ( ! resp.is2xx()) { + errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR,"MR Bridge", Integer.toString(resp.getCode()), resp.getMessage()); mm.setStatus(DmaapObject_Status.INVALID); } else { - prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn ); - resp = prov.doPostMessage(mm.updateWhiteList()); - if ( ! resp.is2xx()) { - errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR,"MR Bridge", Integer.toString(resp.getCode()), resp.getMessage()); - mm.setStatus(DmaapObject_Status.INVALID); - } else { - mm.setStatus(DmaapObject_Status.VALID); - } + mm.setStatus(DmaapObject_Status.VALID); } - - // we only want to send one message even if there are multiple central clusters - break; - - } - - + } mm.setLastMod(); return mirrors.put( mm.getMmName(), mm); } + public MirrorMaker getMirrorMaker( String part1, String part2, int index ) { + String targetPart; + + // original mm names did not have any index, so leave off index 0 for + // backwards compatibility + if ( index == 0 ) { + targetPart = part2; + } else { + targetPart = part2 + "_" + index; + } + logger.info( "getMirrorMaker using " + part1 + " and " + targetPart ); + return mirrors.get(MirrorMaker.genKey(part1, targetPart)); + } public MirrorMaker getMirrorMaker( String part1, String part2 ) { logger.info( "getMirrorMaker using " + part1 + " and " + part2 ); return mirrors.get(MirrorMaker.genKey(part1, part2)); @@ -114,11 +144,6 @@ public class MirrorMakerService extends BaseLoggingClass { return mirrors.get(key); } - /*public MirrorMaker updateMirrorMaker( MirrorMaker mm ) { - logger.info( "updateMirrorMaker"); - return mirrors.put( mm.getMmName(), mm); - } - */ public void delMirrorMaker( MirrorMaker mm ) { logger.info( "delMirrorMaker"); @@ -140,5 +165,51 @@ public class MirrorMakerService extends BaseLoggingClass { return ret; } + + public MirrorMaker getNextMM( String source, String target, String fqtn ) { + int i = 0; + MirrorMaker mm = null; + while( mm == null ) { + + mm = this.getMirrorMaker( source, target, i); + if ( mm == null ) { + mm = new MirrorMaker(source, target, i); + } + if ( mm.getTopics().contains(fqtn) ) { + break; + } + if ( mm.getTopicCount() >= maxTopicsPerMM ) { + logger.info( "getNextMM: MM " + mm.getMmName() + " has " + mm.getTopicCount() + " topics. Moving to next MM"); + i++; + mm = null; + } + } + + + return mm; + } + + public MirrorMaker splitMM( MirrorMaker orig ) { + + String source = orig.getSourceCluster(); + String target = orig.getTargetCluster(); + + + ArrayList whitelist = orig.getTopics(); + while( whitelist.size() > maxTopicsPerMM ) { + + int last = whitelist.size() - 1; + String topic = whitelist.get(last); + whitelist.remove(last); + MirrorMaker mm = this.getNextMM( source, target, "aValueThatShouldNotMatchAnything" ); + mm.addTopic(topic); + this.updateMirrorMaker(mm); + } + + orig.setTopics(whitelist); + + return orig; + + } }