Junits for TopicService
[dmaap/dbcapi.git] / src / main / java / org / onap / dmaap / dbcapi / service / TopicService.java
index c5937f4..08e58be 100644 (file)
@@ -59,34 +59,48 @@ public class TopicService extends BaseLoggingClass {
        // REF: https://wiki.web.att.com/pages/viewpage.action?pageId=519703122
        private static String defaultGlobalMrHost;
        
-       private Map<String, Topic> mr_topics = DatabaseClass.getTopics();
+       private Map<String, Topic> mr_topics;
        
        private static DmaapService dmaapSvc = new DmaapService();
-       private MR_ClientService clientService = new MR_ClientService();
-       private MR_ClusterService clusters = new MR_ClusterService();
-       private DcaeLocationService locations = new DcaeLocationService();
-       private MirrorMakerService      bridge = new MirrorMakerService();
+       private MR_ClientService clientService;
+       private MR_ClusterService clusters;
+       private DcaeLocationService locations;
+       private MirrorMakerService      bridge;
        
        private static String centralCname;
        private static boolean createTopicRoles;
        private boolean strictGraph = true;
+       private boolean mmPerMR;
 
 
        public TopicService(){
-               DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
+               this(DatabaseClass.getTopics(), new MR_ClientService(), (DmaapConfig)DmaapConfig.getConfig(),
+                               new MR_ClusterService(), new DcaeLocationService(), new MirrorMakerService());
+
+       }
+
+       TopicService(Map<String, Topic> mr_topics,  MR_ClientService clientService, DmaapConfig p,
+                                MR_ClusterService clusters, DcaeLocationService locations, MirrorMakerService  bridge) {
+               this.mr_topics = mr_topics;
+               this.clientService = clientService;
                defaultGlobalMrHost = p.getProperty("MR.globalHost", "global.host.not.set");
                centralCname = p.getProperty("MR.CentralCname");
                createTopicRoles = "true".equalsIgnoreCase(p.getProperty("aaf.CreateTopicRoles", "true"));
                String unit_test = p.getProperty( "UnitTest", "No" );
-               if ( unit_test.equals( "Yes" ) ) {
+               if ( "Yes".equals(unit_test)) {
                        strictGraph = false;
                }
-               logger.info( "TopicService properties: CentralCname=" + centralCname + 
+               mmPerMR = "true".equalsIgnoreCase(p.getProperty("MirrorMakerPerMR", "true"));
+               logger.info( "TopicService properties: CentralCname=" + centralCname +
                                "   defaultGlobarlMrHost=" + defaultGlobalMrHost +
-                               " createTopicRoles=" + createTopicRoles );
+                               " createTopicRoles=" + createTopicRoles +
+                               " mmPerMR=" + mmPerMR );
+               this.clusters = clusters;
+               this.locations = locations;
+               this.bridge = bridge;
        }
-       
-       public Map<String, Topic> getTopics() {                 
+
+       public Map<String, Topic> getTopics() {
                return mr_topics;
        }
                
@@ -101,7 +115,7 @@ public class TopicService extends BaseLoggingClass {
                ArrayList<Topic> topics = new ArrayList<>(mr_topics.values());
                if ( withClients ) {
                        for( Topic topic: topics ) {
-                               topic.setClients( clientService.getAllMrClients(topic.getFqtn()));
+                               topic.setClients(clientService.getAllMrClients(topic.getFqtn()));
                        }
                }
                return topics;
@@ -381,7 +395,7 @@ public class TopicService extends BaseLoggingClass {
        
        public Topic checkForBridge( Topic topic, ApiError err ) {
                logger.info( "checkForBridge: entry");
-               logger.info( "fqtn=" + topic.getFqtn() + "replicatonType=" + topic.getReplicationCase());
+               logger.info( "fqtn=" + topic.getFqtn() + " replicatonType=" + topic.getReplicationCase());
                if ( topic.getReplicationCase() == ReplicationType.REPLICATION_NONE ) {
                        topic.setStatus( DmaapObject_Status.VALID);
                        return topic;   
@@ -436,7 +450,8 @@ public class TopicService extends BaseLoggingClass {
                        logger.info( "loc=" + loc );
                        DcaeLocation location = locations.getDcaeLocation(loc);
                        MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc);
-                       logger.info( "cluster=" + cluster );
+                       logger.info( "cluster=" + cluster + " at "+ cluster.getDcaeLocationName() );
+                       logger.info( "location.isCentral()="+location.isCentral() + " getCentralLoc()=" + graph.getCentralLoc() );
 
                        
                                
@@ -451,11 +466,12 @@ public class TopicService extends BaseLoggingClass {
                                case REPLICATION_EDGE_TO_CENTRAL:
                                case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL:  // NOTE: this is for E2C portion only
                                        source = cluster.getFqdn();
-                                       target = centralCname;
+                                       target = (mmPerMR)? groupCentralCluster.getFqdn() : centralCname;
+                                       logger.info( "REPLICATION_EDGE_TO_CENTRAL: source=" + source + " target=" +target );
                                        break;
                                case REPLICATION_CENTRAL_TO_EDGE:
                                case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE:  // NOTE: this is for C2E portion only
-                                       source = centralCname;
+                                       source = (mmPerMR) ? groupCentralCluster.getFqdn() : centralCname;
                                        target = cluster.getFqdn();
                                        break;
                                case REPLICATION_CENTRAL_TO_GLOBAL:
@@ -540,7 +556,7 @@ public class TopicService extends BaseLoggingClass {
                        if ( source != null && target != null ) {
                                try { 
                                        logger.info( "Create a MM from " + source + " to " + target );
-                                       MirrorMaker mm = bridge.getNextMM( source, target, topic.getFqtn());
+                                       MirrorMaker mm = bridge.findNextMM( source, target, topic.getFqtn());
                                        mm.addTopic(topic.getFqtn());
                                        bridge.updateMirrorMaker(mm);
                                } catch ( Exception ex ) {