Provision MM at target MR
[dmaap/dbcapi.git] / src / main / java / org / onap / dmaap / dbcapi / service / MirrorMakerService.java
index da9d822..5d695f4 100644 (file)
@@ -57,6 +57,7 @@ public class MirrorMakerService extends BaseLoggingClass {
        private static String defaultConsumerPort;
        private static String centralFqdn;
        private int maxTopicsPerMM;
+       private boolean mmPerMR;
        
        public MirrorMakerService() {
                super();
@@ -68,6 +69,7 @@ public class MirrorMakerService extends BaseLoggingClass {
                defaultConsumerPort = p.getProperty( "MR.TargetReplicationPort", "2181");       
                centralFqdn = p.getProperty("MR.CentralCname", "notSet");
                maxTopicsPerMM = Integer.valueOf( p.getProperty( "MaxTopicsPerMM", "5"));
+               mmPerMR = "true".equalsIgnoreCase(p.getProperty("MirrorMakerPerMR", "true"));
        }
 
        // will create a MM on MMagent if needed
@@ -79,34 +81,43 @@ public class MirrorMakerService extends BaseLoggingClass {
        
                DmaapService dmaap = new DmaapService();
                MR_ClusterService clusters = new MR_ClusterService();
-       
-               // in 1610, MM should only exist for edge-to-central
-               //  we use a cname for the central MR cluster that is active, and provision on agent topic on that target
-               // but only send 1 message so MM Agents can read it relying on kafka delivery
-               for( MR_Cluster central: clusters.getCentralClusters() ) {
-                       prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn  );
-                       ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort ));
-                       if ( ! resp.is2xx() ) {
-       
-                               errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage());
-                               mm.setStatus(DmaapObject_Status.INVALID);
-                       } else {
-                               prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn );
-                               resp = prov.doPostMessage(mm.getWhitelistUpdateJSON());
-                               if ( ! resp.is2xx()) {
-                                       errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR,"MR Bridge", Integer.toString(resp.getCode()), resp.getMessage());
-                                       mm.setStatus(DmaapObject_Status.INVALID);
-                               } else {
-                                       mm.setStatus(DmaapObject_Status.VALID);
-                               }
-                       }
-                       
-                       // we only want to send one message even if there are multiple central clusters
-                       break;
+               MR_Cluster target_cluster = null;
+               String override = null;
                
-               } 
+               if ( ! mmPerMR ) {
+                       // in ECOMP, MM Agent is only deployed at central, so this case is needed for backwards compatibility
+                       //  we use a cname for the central MR cluster that is active, and provision on agent topic on that target
+                       // but only send 1 message so MM Agents can read it relying on kafka delivery
+                       for( MR_Cluster cluster: clusters.getCentralClusters() ) {
+
+                               target_cluster = cluster;
+                               override = centralFqdn;
+                               // we only want to send one message even if there are multiple central clusters
+                               break;
+                       
+                       } 
+               } else {
+                       // In ONAP deployment architecture, the MM Agent is deployed with each target MR
+                       target_cluster = clusters.getMr_ClusterByFQDN(mm.getTargetCluster());
+                       override = null;
+               }
                
+               prov.makeTopicConnection(target_cluster, dmaap.getBridgeAdminFqtn(), override  );
+               ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort ));
+               if ( ! resp.is2xx() ) {
 
+                       errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage());
+                       mm.setStatus(DmaapObject_Status.INVALID);
+               } else {
+                       prov.makeTopicConnection(target_cluster, dmaap.getBridgeAdminFqtn(), override );
+                       resp = prov.doPostMessage(mm.getWhitelistUpdateJSON());
+                       if ( ! resp.is2xx()) {
+                               errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR,"MR Bridge", Integer.toString(resp.getCode()), resp.getMessage());
+                               mm.setStatus(DmaapObject_Status.INVALID);
+                       } else {
+                               mm.setStatus(DmaapObject_Status.VALID);
+                       }
+               }
 
                mm.setLastMod();
                return mirrors.put( mm.getMmName(), mm);