Limit number of topics per mmagent whitelist 79/72379/1
authordglFromAtt <dgl@research.att.com>
Sat, 10 Nov 2018 18:29:08 +0000 (13:29 -0500)
committerdglFromAtt <dgl@research.att.com>
Sat, 10 Nov 2018 18:29:13 +0000 (13:29 -0500)
Change-Id: Id91106072aa2cc843414db78d568ccd1ecd69657
Signed-off-by: dglFromAtt <dgl@research.att.com>
Issue-ID: DMAAP-880

src/main/java/org/onap/dmaap/dbcapi/database/DBFieldHandler.java
src/main/java/org/onap/dmaap/dbcapi/model/BrTopic.java
src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java
src/main/java/org/onap/dmaap/dbcapi/resources/BridgeResource.java
src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java
src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java
src/test/java/org/onap/dmaap/dbcapi/model/MirrorMakerTest.java

index 59d610c..5ec55d4 100644 (file)
@@ -24,6 +24,8 @@ import java.lang.reflect.*;
 import java.sql.*;
 import java.util.*;
 
+import org.onap.dmaap.dbcapi.model.*;
+
 
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
@@ -47,7 +49,7 @@ public class DBFieldHandler   {
                try {
                        objget = c.getMethod("is" + camelcase);
                } catch (Exception e) {
-                       errorLogger.error("Error", e);
+                       errorLogger.warn("No 'is' method for " + c.getName() + " so trying 'get' method");
                        objget = c.getMethod("get" + camelcase);
                }
                objset = c.getMethod("set" + camelcase, objget.getReturnType());
index a7041d0..7de42f1 100644 (file)
@@ -30,6 +30,7 @@ public class BrTopic {
        
        private String brSource;
        private String brTarget;
+       private String mmAgentName;
        private int topicCount;
        
        // no-op constructor used by framework
@@ -60,6 +61,14 @@ public class BrTopic {
                this.topicCount = topicCount;
        }
 
+       public String getMmAgentName() {
+               return mmAgentName;
+       }
+
+       public void setMmAgentName(String mmAgentName) {
+               this.mmAgentName = mmAgentName;
+       }
+
 
 
 }
index 6447123..e693afe 100644 (file)
@@ -29,7 +29,7 @@ import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum;
 import org.onap.dmaap.dbcapi.service.MirrorMakerService;
 
 public class MirrorMaker extends DmaapObject {
-       static final Logger logger = Logger.getLogger(MirrorMaker.class);
+
 
        private String  sourceCluster;
        private String  targetCluster;
@@ -41,8 +41,20 @@ public class MirrorMaker extends DmaapObject {
        public MirrorMaker(){
                
        }
-
+       public MirrorMaker(String source, String target, int i) {
+               initMM( source, target );
+               // original mm names did not have any index, so leave off index 0 for
+               // backwards compatibility
+               if ( i != 0 ) {
+                       String n = this.getMmName() + "_" + i;
+                       this.setMmName(n);
+               }
+       }
        public MirrorMaker(String source, String target) {
+               initMM( source, target );
+       }
+       
+       private void initMM(String source, String target) {
                sourceCluster = source;
                targetCluster = target;
                mmName = genKey(source, target);
@@ -72,7 +84,7 @@ public class MirrorMaker extends DmaapObject {
                                }
                        }   
         */
-       public String updateWhiteList() {
+       public String getWhitelistUpdateJSON() {
                StringBuilder str = new StringBuilder( "{ \"messageID\": \"" + MirrorMakerService.genTransactionId() + "\", \"updateWhiteList\": {"  );
                str.append( " \"name\": \"" + this.getMmName() + "\", \"whitelist\": \"" );
                int numTargets = 0;
@@ -109,9 +121,9 @@ public class MirrorMaker extends DmaapObject {
                StringBuilder str = new StringBuilder( "{ \"messageID\": \"" + MirrorMakerService.genTransactionId() + "\", \"createMirrorMaker\": {"  );
                str.append( " \"name\": \"" + this.getMmName() + "\", " );
                str.append( " \"consumer\": \"" + this.sourceCluster + ":" + consumerPort + "\", " );
-               str.append( " \"producer\": \"" + this.targetCluster + ":" + producerPort + "\" ");
+               str.append( " \"producer\": \"" + this.targetCluster + ":" + producerPort + "\", ");
                
-               str.append( " } }" );
+               str.append( " \"numStreams\": \"10\" } }" );
                
                return str.toString();
        }
index aab1cac..f1466ee 100644 (file)
@@ -56,108 +56,126 @@ public class BridgeResource extends BaseLoggingClass {
 
        @GET
        @ApiOperation( value = "return BrTopic details", 
-       notes = "Returns array of  `BrTopic` objects. If source and target query params are specified, only report on that bridge.  If detail param is true, list topics names, else just a count is returned", 
+       notes = "Returns array of  `BrTopic` objects. If source and target query params are specified, only report on that bridge.  "
+                       + "If detail param is true, list topics names, else just a count is returned.", 
        response = BrTopic.class)
 @ApiResponses( value = {
     @ApiResponse( code = 200, message = "Success", response = Dmaap.class),
     @ApiResponse( code = 400, message = "Error", response = ApiError.class )
 })
-       public Response getBridgedTopics(@QueryParam("source") String source,
-                                                                       @QueryParam("target") String target,
+       public Response getBridgedTopics(@QueryParam("mmagent") String mmagent,
                                                                        @QueryParam("detail") Boolean detailFlag ){
                ApiService check = new ApiService();
+               
+               if ( mmagent == null ) {
+                       return check.success(getMMcounts(Boolean.TRUE.equals(detailFlag)));
+
+               }
+               logger.info( "getBridgeTopics():" + " mmagent=" + mmagent);
 
                if ( ! Boolean.TRUE.equals(detailFlag)) {
                        BrTopic brTopic = new BrTopic();
                        
-                       logger.info( "getBridgeTopics():" + " source=" + source + ", target=" + target);
-       
-                       if (source != null && target != null) {         // get topics between 2 bridged locations
-                               brTopic.setBrSource(source);
-                               brTopic.setBrTarget(target);
-                               MirrorMaker mm = mmService.getMirrorMaker(source, target);
-                               if ( mm != null ) {             
-                                               brTopic.setTopicCount( mm.getTopicCount() );
-                               } 
-       
-                               logger.info( "topicCount [2 locations]: " + brTopic.getTopicCount() );
-                       }
-                       else if (source == null && target == null ) {
-                               List<String> mmList = mmService.getAllMirrorMakers();
-                               brTopic.setBrSource("all");
-                               brTopic.setBrTarget("all");
-                               int totCnt = 0;
-                               for( String key: mmList ) {
-                                       int mCnt = 0;
-                                       MirrorMaker mm = mmService.getMirrorMaker(key);
-                                       if ( mm != null ) {
-                                               mCnt = mm.getTopicCount();
-                                       }
-                                       logger.info( "Count for "+ key + ": " + mCnt);
-                                       totCnt += mCnt;
-                               }
-                               
-                               logger.info( "topicCount [all locations]: " + totCnt );
-                               brTopic.setTopicCount(totCnt);
-       
-                       }
-                       else {
-       
-                               logger.error( "source or target is missing");
-                               check.setCode(Status.BAD_REQUEST.getStatusCode());
-                               check.setMessage("Either both source and target or neither must be provided");
-                               return check.error();
-                       }
-                       return check.success(brTopic);
-               } else {
-                       
+                       // get topics between 2 bridged locations
+
+                       MirrorMaker mm = mmService.getMirrorMaker(mmagent);
+                       if ( mm == null ) {             
+                               return check.notFound();
+                       } 
+                                       
+                       brTopic.setTopicCount( mm.getTopicCount() );
+                       brTopic.setBrSource( mm.getSourceCluster());
+                       brTopic.setBrTarget( mm.getTargetCluster());
+                       brTopic.setMmAgentName(mm.getMmName());
                        
-                       logger.info( "getBridgeTopics() detail:" + " source=" + source + ", target=" + target);
-       
-                       if (source != null && target != null) {         // get topics between 2 bridged locations
-                               
-                               MirrorMaker mm = mmService.getMirrorMaker(source, target);
-                               if ( mm == null ) {             
-                                       return check.notFound();
-                               } 
-       
-                               return check.success(mm);
-                       }
+                       logger.info( "topicCount [2 locations]: " + brTopic.getTopicCount() );
+               
+                       return check.success(brTopic);
+               } else {        
+                       logger.info( "getBridgeTopics() detail:" + " mmagent=" + mmagent);
+                       // get topics between 2 bridged locations       
+                       MirrorMaker mm = mmService.getMirrorMaker(mmagent);
+                       if ( mm == null ) {             
+                               return check.notFound();
+                       } 
 
-                       else {
+                       return check.success(mm);
+               }
+       }
        
-                               logger.error( "source and target are required when detail=true");
-                               check.setCode(Status.BAD_REQUEST.getStatusCode());
-                               check.setMessage("source and target are required when detail=true");
-                               return check.error();
+       private BrTopic[] getMMcounts( Boolean showDetail ) {
+               
+               List<String> mmList = mmService.getAllMirrorMakers();
+               int s = 1;
+               if ( showDetail ) {
+                       s = mmList.size() + 1;
+               }
+               BrTopic[] brTopic = new BrTopic[s];
+               
+               int totCnt = 0;
+               s = 0;
+               for( String key: mmList ) {
+                       int mCnt = 0;
+                       MirrorMaker mm = mmService.getMirrorMaker(key);
+                       if ( mm != null ) {
+                               mCnt = mm.getTopicCount();
+                       }
+                       logger.info( "Count for "+ key + ": " + mCnt);
+                       totCnt += mCnt;
+                       if (showDetail) {
+                               brTopic[s] =  new BrTopic();
+                               brTopic[s].setBrSource( mm.getSourceCluster());
+                               brTopic[s].setBrTarget(mm.getTargetCluster());
+                               brTopic[s].setMmAgentName(mm.getMmName());
+                               brTopic[s].setTopicCount(mm.getTopicCount());
+                               s++;
                        }
                }
+               
+               logger.info( "topicCount [all locations]: " + totCnt );
+               brTopic[s] =  new BrTopic();
+               brTopic[s].setBrSource("all");
+               brTopic[s].setBrTarget("all");
+               brTopic[s].setMmAgentName("n/a");
+               brTopic[s].setTopicCount(totCnt);
+               return brTopic;
        }
        
        @PUT
        @ApiOperation( value = "update MirrorMaker details", 
-               notes = "replace the topic list for a specific Bridge.  Use JSON Body for value to replace whitelist, but if refreshFlag param is true, simply refresh using existing whitelist", 
+               notes = "replace the topic list for a specific Bridge.  Use JSON Body for value to replace whitelist, "
+                               + "but if refreshFlag param is true, simply refresh using existing whitelist."
+                               + "If split param is true, spread whitelist over smaller mmagents.", 
                response = MirrorMaker.class)
        @ApiResponses( value = {
            @ApiResponse( code = 200, message = "Success", response = Dmaap.class),
            @ApiResponse( code = 400, message = "Error", response = ApiError.class )
        })
-       public Response putBridgedTopics(@QueryParam("source") String source,
-                                                                       @QueryParam("target") String target,
+       public Response putBridgedTopics(@QueryParam("mmagent") String mmagent,
                                                                        @QueryParam("refresh") Boolean refreshFlag,
+                                                                       @QueryParam("split") Boolean splitFlag,
                                                                        MirrorMaker newBridge ){
                ApiService check = new ApiService();    
                        
-               logger.info( "putBridgeTopics() detail:" + " source=" + source + ", target=" + target);
+               logger.info( "putBridgeTopics() mmagent:" +  mmagent );
 
-               if (source != null && target != null) {         // get topics between 2 bridged locations
+               if ( mmagent != null ) {                // put topics between 2 bridged locations
                        
-                       MirrorMaker mm = mmService.getMirrorMaker(source, target);
+                       MirrorMaker mm = mmService.getMirrorMaker(mmagent);
                        if ( mm == null ) {             
                                return check.notFound();
                        } 
-                       if ( refreshFlag != null  &&  refreshFlag == false ) {
-                               logger.info( "setting whitelist from message body");
+                       
+                       if ( splitFlag != null && splitFlag == true ) {
+                               mm = mmService.splitMM( mm );
+                       } else if ( refreshFlag == null  ||  refreshFlag == false ) {
+                               logger.info( "setting whitelist from message body containing mmName=" + newBridge.getMmName());
+                               if ( ! mmagent.equals(newBridge.getMmName()) ){
+                                       logger.error( "mmagent query param does not match mmName in body");
+                                       check.setCode(Status.BAD_REQUEST.getStatusCode());
+                                       check.setMessage("mmagent query param does not match mmName in body");
+                                       return check.error();
+                               }
                                mm.setTopics( newBridge.getTopics() );
                        } else {
                                logger.info( "refreshing whitelist from memory");
@@ -168,9 +186,9 @@ public class BridgeResource extends BaseLoggingClass {
 
                else {
 
-                       logger.error( "source and target are required when detail=true");
+                       logger.error( "mmagent is required for PUT");
                        check.setCode(Status.BAD_REQUEST.getStatusCode());
-                       check.setMessage("source and target are required when detail=true");
+                       check.setMessage("mmagent is required for PUT");
                        return check.error();
                }
 
index 413590f..8acc4f3 100644 (file)
@@ -56,6 +56,7 @@ public class MirrorMakerService extends BaseLoggingClass {
        private static String defaultProducerPort;
        private static String defaultConsumerPort;
        private static String centralFqdn;
+       private int maxTopicsPerMM;
        
        public MirrorMakerService() {
                super();
@@ -66,6 +67,7 @@ public class MirrorMakerService extends BaseLoggingClass {
                defaultProducerPort = p.getProperty( "MR.SourceReplicationPort", "9092");
                defaultConsumerPort = p.getProperty( "MR.TargetReplicationPort", "2181");       
                centralFqdn = p.getProperty("MR.CentralCname", "notSet");
+               maxTopicsPerMM = Integer.valueOf( p.getProperty( "MaxTopicsPerMM", "5"));
        }
 
        // will create a MM on MMagent if needed
@@ -90,7 +92,7 @@ public class MirrorMakerService extends BaseLoggingClass {
                                mm.setStatus(DmaapObject_Status.INVALID);
                        } else {
                                prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn );
-                               resp = prov.doPostMessage(mm.updateWhiteList());
+                               resp = prov.doPostMessage(mm.getWhitelistUpdateJSON());
                                if ( ! resp.is2xx()) {
                                        errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR,"MR Bridge", Integer.toString(resp.getCode()), resp.getMessage());
                                        mm.setStatus(DmaapObject_Status.INVALID);
@@ -109,6 +111,19 @@ public class MirrorMakerService extends BaseLoggingClass {
                mm.setLastMod();
                return mirrors.put( mm.getMmName(), mm);
        }
+       public MirrorMaker getMirrorMaker( String part1, String part2, int index ) {
+               String targetPart;
+
+               // original mm names did not have any index, so leave off index 0 for
+               // backwards compatibility
+               if ( index == 0 ) {
+                       targetPart = part2;
+               } else {
+                       targetPart = part2 + "_" + index;
+               }
+               logger.info( "getMirrorMaker using " + part1 + " and " + targetPart );
+               return mirrors.get(MirrorMaker.genKey(part1, targetPart));
+       }
        public MirrorMaker getMirrorMaker( String part1, String part2 ) {
                logger.info( "getMirrorMaker using " + part1 + " and " + part2 );
                return mirrors.get(MirrorMaker.genKey(part1, part2));
@@ -139,5 +154,48 @@ public class MirrorMakerService extends BaseLoggingClass {
                
                return ret;
        }
+       
+       public MirrorMaker getNextMM( String source, String target ) {
+               int i = 0;
+               MirrorMaker mm = null;
+               while( mm == null ) {
+                       
+                       mm = this.getMirrorMaker( source, target, i);
+                       if ( mm == null ) {
+                               mm = new MirrorMaker(source, target, i);
+                       }
+                       if ( mm.getTopicCount() >= maxTopicsPerMM ) {
+                               logger.info( "getNextMM: MM " + mm.getMmName() + " has " + mm.getTopicCount() + " topics.  Moving to next MM");
+                               i++;
+                               mm = null;
+                       }
+               }
+        
+               
+               return mm;
+       }
+
+       public MirrorMaker splitMM( MirrorMaker orig ) {
+               
+               int index = 1;
+               String source = orig.getSourceCluster();
+               String target = orig.getTargetCluster();
+               
+               
+               ArrayList<String> whitelist = orig.getTopics();
+               while( whitelist.size() > maxTopicsPerMM ) {
+                       MirrorMaker mm = this.getNextMM( source, target );
+                       int last = whitelist.size() - 1;
+                       String topic = whitelist.get(last);
+                       whitelist.remove(last);
+                       mm.addTopic(topic);     
+                       this.updateMirrorMaker(mm);
+               }
+               
+               orig.setTopics(whitelist);
+
+               return orig;
+               
+       }
 
 }
index e4bc96c..a633982 100644 (file)
@@ -66,10 +66,12 @@ public class TopicService extends BaseLoggingClass {
        
        private static String centralCname;
 
+
        public TopicService(){
                DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
                defaultGlobalMrHost = p.getProperty("MR.globalHost", "global.host.not.set");
                centralCname = p.getProperty("MR.CentralCname");
+
                
                logger.info( "TopicService properties: CentralCname=" + centralCname + 
                                "   defaultGlobarlMrHost=" + defaultGlobalMrHost  );
@@ -431,10 +433,7 @@ public class TopicService extends BaseLoggingClass {
                        if ( source != null && target != null ) {
                                try { 
                                        logger.info( "Create a MM from " + source + " to " + target );
-                                       MirrorMaker mm = bridge.getMirrorMaker( source, target);
-                                       if ( mm == null ) {
-                                               mm = new MirrorMaker(source, target);
-                                       }
+                                       MirrorMaker mm = bridge.getNextMM( source, target);
                                        mm.addTopic(topic.getFqtn());
                                        bridge.updateMirrorMaker(mm);
                                } catch ( Exception ex ) {
@@ -452,6 +451,7 @@ public class TopicService extends BaseLoggingClass {
 
        }
        
+       
        /*
         * Prior to 1707, we only supported EDGE_TO_CENTRAL replication.
         * This was determined automatically based on presence of edge publishers and central subscribers.
index 547bfc9..39de2be 100644 (file)
@@ -92,7 +92,7 @@ public class MirrorMakerTest {
 
                int i = t.getTopicCount();
 
-               String s = t.updateWhiteList();
+               String s = t.getWhitelistUpdateJSON();
 
                s = t.createMirrorMaker(p1, p2);