Support kafka attributes
[dmaap/dbcapi.git] / src / main / java / org / onap / dmaap / dbcapi / service / TopicService.java
index 6a93a21..49966d7 100644 (file)
@@ -66,10 +66,12 @@ public class TopicService extends BaseLoggingClass {
        
        private static String centralCname;
 
+
        public TopicService(){
                DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
                defaultGlobalMrHost = p.getProperty("MR.globalHost", "global.host.not.set");
                centralCname = p.getProperty("MR.CentralCname");
+
                
                logger.info( "TopicService properties: CentralCname=" + centralCname + 
                                "   defaultGlobarlMrHost=" + defaultGlobalMrHost  );
@@ -80,9 +82,18 @@ public class TopicService extends BaseLoggingClass {
        }
                
        public List<Topic> getAllTopics() {
+               return getAllTopics( true );
+       }
+       public List<Topic> getAllTopicsWithoutClients() {
+               return getAllTopics(false);
+       }
+       
+       private List<Topic> getAllTopics( Boolean withClients ) {
                ArrayList<Topic> topics = new ArrayList<Topic>(mr_topics.values());
-               for( Topic topic: topics ) {
-                       topic.setClients( clientService.getAllMrClients(topic.getFqtn()));
+               if ( withClients ) {
+                       for( Topic topic: topics ) {
+                               topic.setClients( clientService.getAllMrClients(topic.getFqtn()));
+                       }
                }
                return topics;
        }
@@ -240,10 +251,11 @@ public class TopicService extends BaseLoggingClass {
                String mmAgentRole = p.getProperty("MM.AgentRole");
                String[] Roles = { mmProvRole, mmAgentRole };
                String[] actions = { "view", "pub", "sub" };
-               Topic bridgeAdminTopic = new Topic();
+               Topic bridgeAdminTopic = new Topic().init();
                bridgeAdminTopic.setTopicName( dmaapSvc.getBridgeAdminFqtn() );
                bridgeAdminTopic.setTopicDescription( "RESERVED topic for MirroMaker Provisioning");
                bridgeAdminTopic.setOwner( "DBC" );
+               
                ArrayList<MR_Client> clients = new ArrayList<MR_Client>();
                for( String role: Roles ) {
                        MR_Client client = new MR_Client();
@@ -422,10 +434,7 @@ public class TopicService extends BaseLoggingClass {
                        if ( source != null && target != null ) {
                                try { 
                                        logger.info( "Create a MM from " + source + " to " + target );
-                                       MirrorMaker mm = bridge.getMirrorMaker( source, target);
-                                       if ( mm == null ) {
-                                               mm = new MirrorMaker(source, target);
-                                       }
+                                       MirrorMaker mm = bridge.getNextMM( source, target, topic.getFqtn());
                                        mm.addTopic(topic.getFqtn());
                                        bridge.updateMirrorMaker(mm);
                                } catch ( Exception ex ) {
@@ -443,6 +452,7 @@ public class TopicService extends BaseLoggingClass {
 
        }
        
+       
        /*
         * Prior to 1707, we only supported EDGE_TO_CENTRAL replication.
         * This was determined automatically based on presence of edge publishers and central subscribers.