private static String defaultConsumerPort;
private static String centralFqdn;
private int maxTopicsPerMM;
+ private boolean mmPerMR;
public MirrorMakerService() {
super();
defaultConsumerPort = p.getProperty( "MR.TargetReplicationPort", "2181");
centralFqdn = p.getProperty("MR.CentralCname", "notSet");
maxTopicsPerMM = Integer.valueOf( p.getProperty( "MaxTopicsPerMM", "5"));
+ mmPerMR = "true".equalsIgnoreCase(p.getProperty("MirrorMakerPerMR", "true"));
}
// will create a MM on MMagent if needed
DmaapService dmaap = new DmaapService();
MR_ClusterService clusters = new MR_ClusterService();
-
- // in 1610, MM should only exist for edge-to-central
- // we use a cname for the central MR cluster that is active, and provision on agent topic on that target
- // but only send 1 message so MM Agents can read it relying on kafka delivery
- for( MR_Cluster central: clusters.getCentralClusters() ) {
- prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn );
- ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort ));
- if ( ! resp.is2xx() ) {
-
- errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage());
- mm.setStatus(DmaapObject_Status.INVALID);
- } else {
- prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn );
- resp = prov.doPostMessage(mm.getWhitelistUpdateJSON());
- if ( ! resp.is2xx()) {
- errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR,"MR Bridge", Integer.toString(resp.getCode()), resp.getMessage());
- mm.setStatus(DmaapObject_Status.INVALID);
- } else {
- mm.setStatus(DmaapObject_Status.VALID);
- }
- }
-
- // we only want to send one message even if there are multiple central clusters
- break;
+ MR_Cluster target_cluster = null;
+ String override = null;
- }
+ if ( ! mmPerMR ) {
+ // in ECOMP, MM Agent is only deployed at central, so this case is needed for backwards compatibility
+ // we use a cname for the central MR cluster that is active, and provision on agent topic on that target
+ // but only send 1 message so MM Agents can read it relying on kafka delivery
+ for( MR_Cluster cluster: clusters.getCentralClusters() ) {
+
+ target_cluster = cluster;
+ override = centralFqdn;
+ // we only want to send one message even if there are multiple central clusters
+ break;
+
+ }
+ } else {
+ // In ONAP deployment architecture, the MM Agent is deployed with each target MR
+ target_cluster = clusters.getMr_ClusterByFQDN(mm.getTargetCluster());
+ override = null;
+ }
+ prov.makeTopicConnection(target_cluster, dmaap.getBridgeAdminFqtn(), override );
+ ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort ));
+ if ( ! resp.is2xx() ) {
+ errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage());
+ mm.setStatus(DmaapObject_Status.INVALID);
+ } else {
+ prov.makeTopicConnection(target_cluster, dmaap.getBridgeAdminFqtn(), override );
+ resp = prov.doPostMessage(mm.getWhitelistUpdateJSON());
+ if ( ! resp.is2xx()) {
+ errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR,"MR Bridge", Integer.toString(resp.getCode()), resp.getMessage());
+ mm.setStatus(DmaapObject_Status.INVALID);
+ } else {
+ mm.setStatus(DmaapObject_Status.VALID);
+ }
+ }
mm.setLastMod();
return mirrors.put( mm.getMmName(), mm);