Merge "DcaeLocation tests and refactor"
[dmaap/dbcapi.git] / src / main / java / org / onap / dmaap / dbcapi / service / TopicService.java
index c5937f4..68dfd51 100644 (file)
@@ -70,6 +70,7 @@ public class TopicService extends BaseLoggingClass {
        private static String centralCname;
        private static boolean createTopicRoles;
        private boolean strictGraph = true;
+       private boolean mmPerMR;
 
 
        public TopicService(){
@@ -81,9 +82,11 @@ public class TopicService extends BaseLoggingClass {
                if ( unit_test.equals( "Yes" ) ) {
                        strictGraph = false;
                }
+               mmPerMR = "true".equalsIgnoreCase(p.getProperty("MirrorMakerPerMR", "true"));
                logger.info( "TopicService properties: CentralCname=" + centralCname + 
                                "   defaultGlobarlMrHost=" + defaultGlobalMrHost +
-                               " createTopicRoles=" + createTopicRoles );
+                               " createTopicRoles=" + createTopicRoles +
+                               " mmPerMR=" + mmPerMR );
        }
        
        public Map<String, Topic> getTopics() {                 
@@ -381,7 +384,7 @@ public class TopicService extends BaseLoggingClass {
        
        public Topic checkForBridge( Topic topic, ApiError err ) {
                logger.info( "checkForBridge: entry");
-               logger.info( "fqtn=" + topic.getFqtn() + "replicatonType=" + topic.getReplicationCase());
+               logger.info( "fqtn=" + topic.getFqtn() + " replicatonType=" + topic.getReplicationCase());
                if ( topic.getReplicationCase() == ReplicationType.REPLICATION_NONE ) {
                        topic.setStatus( DmaapObject_Status.VALID);
                        return topic;   
@@ -436,7 +439,8 @@ public class TopicService extends BaseLoggingClass {
                        logger.info( "loc=" + loc );
                        DcaeLocation location = locations.getDcaeLocation(loc);
                        MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc);
-                       logger.info( "cluster=" + cluster );
+                       logger.info( "cluster=" + cluster + " at "+ cluster.getDcaeLocationName() );
+                       logger.info( "location.isCentral()="+location.isCentral() + " getCentralLoc()=" + graph.getCentralLoc() );
 
                        
                                
@@ -451,11 +455,12 @@ public class TopicService extends BaseLoggingClass {
                                case REPLICATION_EDGE_TO_CENTRAL:
                                case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL:  // NOTE: this is for E2C portion only
                                        source = cluster.getFqdn();
-                                       target = centralCname;
+                                       target = (mmPerMR)? groupCentralCluster.getFqdn() : centralCname;
+                                       logger.info( "REPLICATION_EDGE_TO_CENTRAL: source=" + source + " target=" +target );
                                        break;
                                case REPLICATION_CENTRAL_TO_EDGE:
                                case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE:  // NOTE: this is for C2E portion only
-                                       source = centralCname;
+                                       source = (mmPerMR) ? groupCentralCluster.getFqdn() : centralCname;
                                        target = cluster.getFqdn();
                                        break;
                                case REPLICATION_CENTRAL_TO_GLOBAL:
@@ -540,7 +545,7 @@ public class TopicService extends BaseLoggingClass {
                        if ( source != null && target != null ) {
                                try { 
                                        logger.info( "Create a MM from " + source + " to " + target );
-                                       MirrorMaker mm = bridge.getNextMM( source, target, topic.getFqtn());
+                                       MirrorMaker mm = bridge.findNextMM( source, target, topic.getFqtn());
                                        mm.addTopic(topic.getFqtn());
                                        bridge.updateMirrorMaker(mm);
                                } catch ( Exception ex ) {