Limit number of topics per mmagent whitelist
[dmaap/dbcapi.git] / src / main / java / org / onap / dmaap / dbcapi / service / TopicService.java
index 6e7b744..a633982 100644 (file)
@@ -66,10 +66,12 @@ public class TopicService extends BaseLoggingClass {
        
        private static String centralCname;
 
+
        public TopicService(){
                DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
                defaultGlobalMrHost = p.getProperty("MR.globalHost", "global.host.not.set");
                centralCname = p.getProperty("MR.CentralCname");
+
                
                logger.info( "TopicService properties: CentralCname=" + centralCname + 
                                "   defaultGlobarlMrHost=" + defaultGlobalMrHost  );
@@ -80,9 +82,18 @@ public class TopicService extends BaseLoggingClass {
        }
                
        public List<Topic> getAllTopics() {
+               return getAllTopics( true );
+       }
+       public List<Topic> getAllTopicsWithoutClients() {
+               return getAllTopics(false);
+       }
+       
+       private List<Topic> getAllTopics( Boolean withClients ) {
                ArrayList<Topic> topics = new ArrayList<Topic>(mr_topics.values());
-               for( Topic topic: topics ) {
-                       topic.setClients( clientService.getAllMrClients(topic.getFqtn()));
+               if ( withClients ) {
+                       for( Topic topic: topics ) {
+                               topic.setClients( clientService.getAllMrClients(topic.getFqtn()));
+                       }
                }
                return topics;
        }
@@ -102,14 +113,19 @@ public class TopicService extends BaseLoggingClass {
                return t;
        }
 
-       public Topic addTopic( Topic topic, ApiError err ) {
+       public Topic addTopic( Topic topic, ApiError err, Boolean useExisting ) {
                logger.info( "Entry: addTopic");
                logger.info( "Topic name=" + topic.getTopicName() + " fqtnStyle=" + topic.getFqtnStyle() );
                String nFqtn =  topic.genFqtn();
                logger.info( "FQTN=" + nFqtn );
-               if ( getTopic( nFqtn, err ) != null ) {
+               Topic pTopic = getTopic( nFqtn, err );
+               if ( pTopic != null ) {
                        String t = "topic already exists: " + nFqtn;
                        logger.info( t );
+                       if (  useExisting ) {
+                               err.setCode(Status.OK.getStatusCode());
+                               return pTopic;
+                       }
                        err.setMessage( t );
                        err.setFields( "fqtn");
                        err.setCode(Status.CONFLICT.getStatusCode());
@@ -251,7 +267,7 @@ public class TopicService extends BaseLoggingClass {
                
                TopicService ts = new TopicService();
                ApiError err = new ApiError();
-               ts.addTopic(bridgeAdminTopic, err);
+               ts.addTopic(bridgeAdminTopic, err, true);
                
                if ( err.is2xx() || err.getCode() == 409 ){
                        err.setCode(200);
@@ -417,10 +433,7 @@ public class TopicService extends BaseLoggingClass {
                        if ( source != null && target != null ) {
                                try { 
                                        logger.info( "Create a MM from " + source + " to " + target );
-                                       MirrorMaker mm = bridge.getMirrorMaker( source, target);
-                                       if ( mm == null ) {
-                                               mm = new MirrorMaker(source, target);
-                                       }
+                                       MirrorMaker mm = bridge.getNextMM( source, target);
                                        mm.addTopic(topic.getFqtn());
                                        bridge.updateMirrorMaker(mm);
                                } catch ( Exception ex ) {
@@ -438,6 +451,7 @@ public class TopicService extends BaseLoggingClass {
 
        }
        
+       
        /*
         * Prior to 1707, we only supported EDGE_TO_CENTRAL replication.
         * This was determined automatically based on presence of edge publishers and central subscribers.