X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdbcapi%2Fservice%2FMirrorMakerService.java;h=7c4b2cee298810749a0eee8e567a18eede7f526e;hb=479c7a5645b6f3f9bf478f925fa2009597871a7b;hp=382e5369cb0e13d47b6edfcd71918350a84642b7;hpb=1813f193a734bf77629d9b9bda7ecf6582581569;p=dmaap%2Fdbcapi.git diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java b/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java index 382e536..7c4b2ce 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java +++ b/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java @@ -51,23 +51,33 @@ public class MirrorMakerService extends BaseLoggingClass { private static MrTopicConnection prov; private static AafDecrypt decryptor; + static final String PROV_USER_PROPERTY = "MM.ProvUserMechId"; + static final String PROV_PWD_PROPERTY = "MM.ProvUserPwd"; + static final String PROV_PWD_DEFAULT = "pwdNotSet"; + static final String SOURCE_REPLICATION_PORT_PROPERTY = "MR.SourceReplicationPort"; + static final String SOURCE_REPLICATION_PORT_DEFAULT = "9092"; + static final String TARGET_REPLICATION_PORT_PROPERTY = "MR.TargetReplicationPort"; + static final String TARGET_REPLICATION_PORT_DEFAULT = "2181"; + private static String provUser; private static String provUserPwd; private static String defaultProducerPort; private static String defaultConsumerPort; private static String centralFqdn; private int maxTopicsPerMM; + private boolean mmPerMR; public MirrorMakerService() { super(); decryptor = new AafDecrypt(); DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig(); - provUser = p.getProperty("MM.ProvUserMechId"); - provUserPwd = decryptor.decrypt(p.getProperty( "MM.ProvUserPwd", "notSet" )); - defaultProducerPort = p.getProperty( "MR.SourceReplicationPort", "9092"); - defaultConsumerPort = p.getProperty( "MR.TargetReplicationPort", "2181"); + provUser = p.getProperty(PROV_USER_PROPERTY); + provUserPwd = decryptor.decrypt(p.getProperty( PROV_PWD_PROPERTY, PROV_PWD_DEFAULT )); + defaultProducerPort = p.getProperty( SOURCE_REPLICATION_PORT_PROPERTY, SOURCE_REPLICATION_PORT_DEFAULT ); + defaultConsumerPort = p.getProperty( TARGET_REPLICATION_PORT_PROPERTY, TARGET_REPLICATION_PORT_DEFAULT ); centralFqdn = p.getProperty("MR.CentralCname", "notSet"); maxTopicsPerMM = Integer.valueOf( p.getProperty( "MaxTopicsPerMM", "5")); + mmPerMR = "true".equalsIgnoreCase(p.getProperty("MirrorMakerPerMR", "true")); } // will create a MM on MMagent if needed @@ -79,34 +89,43 @@ public class MirrorMakerService extends BaseLoggingClass { DmaapService dmaap = new DmaapService(); MR_ClusterService clusters = new MR_ClusterService(); - - // in 1610, MM should only exist for edge-to-central - // we use a cname for the central MR cluster that is active, and provision on agent topic on that target - // but only send 1 message so MM Agents can read it relying on kafka delivery - for( MR_Cluster central: clusters.getCentralClusters() ) { - prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn ); - ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort )); - if ( ! resp.is2xx() ) { - - errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage()); - mm.setStatus(DmaapObject_Status.INVALID); - } else { - prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn ); - resp = prov.doPostMessage(mm.getWhitelistUpdateJSON()); - if ( ! resp.is2xx()) { - errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR,"MR Bridge", Integer.toString(resp.getCode()), resp.getMessage()); - mm.setStatus(DmaapObject_Status.INVALID); - } else { - mm.setStatus(DmaapObject_Status.VALID); - } - } - - // we only want to send one message even if there are multiple central clusters - break; + MR_Cluster target_cluster = null; + String override = null; - } + if ( ! mmPerMR ) { + // in ECOMP, MM Agent is only deployed at central, so this case is needed for backwards compatibility + // we use a cname for the central MR cluster that is active, and provision on agent topic on that target + // but only send 1 message so MM Agents can read it relying on kafka delivery + for( MR_Cluster cluster: clusters.getCentralClusters() ) { + + target_cluster = cluster; + override = centralFqdn; + // we only want to send one message even if there are multiple central clusters + break; + + } + } else { + // In ONAP deployment architecture, the MM Agent is deployed with each target MR + target_cluster = clusters.getMr_ClusterByFQDN(mm.getTargetCluster()); + override = null; + } + prov.makeTopicConnection(target_cluster, dmaap.getBridgeAdminFqtn(), override ); + ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort )); + if ( ! resp.is2xx() ) { + errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage()); + mm.setStatus(DmaapObject_Status.INVALID); + } else { + prov.makeTopicConnection(target_cluster, dmaap.getBridgeAdminFqtn(), override ); + resp = prov.doPostMessage(mm.getWhitelistUpdateJSON()); + if ( ! resp.is2xx()) { + errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR,"MR Bridge", Integer.toString(resp.getCode()), resp.getMessage()); + mm.setStatus(DmaapObject_Status.INVALID); + } else { + mm.setStatus(DmaapObject_Status.VALID); + } + } mm.setLastMod(); return mirrors.put( mm.getMmName(), mm); @@ -155,7 +174,7 @@ public class MirrorMakerService extends BaseLoggingClass { return ret; } - public MirrorMaker getNextMM( String source, String target, String fqtn ) { + public MirrorMaker findNextMM( String source, String target, String fqtn ) { int i = 0; MirrorMaker mm = null; while( mm == null ) { @@ -190,7 +209,7 @@ public class MirrorMakerService extends BaseLoggingClass { int last = whitelist.size() - 1; String topic = whitelist.get(last); whitelist.remove(last); - MirrorMaker mm = this.getNextMM( source, target, topic ); + MirrorMaker mm = this.findNextMM( source, target, "aValueThatShouldNotMatchAnything" ); mm.addTopic(topic); this.updateMirrorMaker(mm); } @@ -200,5 +219,37 @@ public class MirrorMakerService extends BaseLoggingClass { return orig; } + + public static String getProvUser() { + return provUser; + } + + public static void setProvUser(String provUser) { + MirrorMakerService.provUser = provUser; + } + + public static String getProvUserPwd() { + return provUserPwd; + } + + public static void setProvUserPwd(String provUserPwd) { + MirrorMakerService.provUserPwd = provUserPwd; + } + + public static String getDefaultProducerPort() { + return defaultProducerPort; + } + + public static void setDefaultProducerPort(String defaultProducerPort) { + MirrorMakerService.defaultProducerPort = defaultProducerPort; + } + + public static String getDefaultConsumerPort() { + return defaultConsumerPort; + } + + public static void setDefaultConsumerPort(String defaultConsumerPort) { + MirrorMakerService.defaultConsumerPort = defaultConsumerPort; + } }