Added MirrorMaker unit tests
[dmaap/dbcapi.git] / src / main / java / org / onap / dmaap / dbcapi / service / MirrorMakerService.java
index 413590f..7c4b2ce 100644 (file)
@@ -51,21 +51,33 @@ public class MirrorMakerService extends BaseLoggingClass {
        private static MrTopicConnection prov;
        private static AafDecrypt decryptor;
        
+       static final String PROV_USER_PROPERTY = "MM.ProvUserMechId";
+       static final String PROV_PWD_PROPERTY = "MM.ProvUserPwd";
+       static final String PROV_PWD_DEFAULT = "pwdNotSet";
+       static final String SOURCE_REPLICATION_PORT_PROPERTY = "MR.SourceReplicationPort";
+       static final String SOURCE_REPLICATION_PORT_DEFAULT = "9092";
+       static final String TARGET_REPLICATION_PORT_PROPERTY = "MR.TargetReplicationPort";
+       static final String TARGET_REPLICATION_PORT_DEFAULT = "2181";
+       
        private static String provUser;
        private static String provUserPwd;
        private static String defaultProducerPort;
        private static String defaultConsumerPort;
        private static String centralFqdn;
+       private int maxTopicsPerMM;
+       private boolean mmPerMR;
        
        public MirrorMakerService() {
                super();
                decryptor = new AafDecrypt();
                DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
-               provUser = p.getProperty("MM.ProvUserMechId");
-               provUserPwd = decryptor.decrypt(p.getProperty( "MM.ProvUserPwd", "notSet" ));
-               defaultProducerPort = p.getProperty( "MR.SourceReplicationPort", "9092");
-               defaultConsumerPort = p.getProperty( "MR.TargetReplicationPort", "2181");       
+               provUser = p.getProperty(PROV_USER_PROPERTY);
+               provUserPwd = decryptor.decrypt(p.getProperty( PROV_PWD_PROPERTY, PROV_PWD_DEFAULT ));
+               defaultProducerPort = p.getProperty( SOURCE_REPLICATION_PORT_PROPERTY, SOURCE_REPLICATION_PORT_DEFAULT );
+               defaultConsumerPort = p.getProperty( TARGET_REPLICATION_PORT_PROPERTY, TARGET_REPLICATION_PORT_DEFAULT );       
                centralFqdn = p.getProperty("MR.CentralCname", "notSet");
+               maxTopicsPerMM = Integer.valueOf( p.getProperty( "MaxTopicsPerMM", "5"));
+               mmPerMR = "true".equalsIgnoreCase(p.getProperty("MirrorMakerPerMR", "true"));
        }
 
        // will create a MM on MMagent if needed
@@ -77,38 +89,60 @@ public class MirrorMakerService extends BaseLoggingClass {
        
                DmaapService dmaap = new DmaapService();
                MR_ClusterService clusters = new MR_ClusterService();
-       
-               // in 1610, MM should only exist for edge-to-central
-               //  we use a cname for the central MR cluster that is active, and provision on agent topic on that target
-               // but only send 1 message so MM Agents can read it relying on kafka delivery
-               for( MR_Cluster central: clusters.getCentralClusters() ) {
-                       prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn  );
-                       ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort ));
-                       if ( ! resp.is2xx() ) {
-       
-                               errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage());
+               MR_Cluster target_cluster = null;
+               String override = null;
+               
+               if ( ! mmPerMR ) {
+                       // in ECOMP, MM Agent is only deployed at central, so this case is needed for backwards compatibility
+                       //  we use a cname for the central MR cluster that is active, and provision on agent topic on that target
+                       // but only send 1 message so MM Agents can read it relying on kafka delivery
+                       for( MR_Cluster cluster: clusters.getCentralClusters() ) {
+
+                               target_cluster = cluster;
+                               override = centralFqdn;
+                               // we only want to send one message even if there are multiple central clusters
+                               break;
+                       
+                       } 
+               } else {
+                       // In ONAP deployment architecture, the MM Agent is deployed with each target MR
+                       target_cluster = clusters.getMr_ClusterByFQDN(mm.getTargetCluster());
+                       override = null;
+               }
+               
+               prov.makeTopicConnection(target_cluster, dmaap.getBridgeAdminFqtn(), override  );
+               ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort ));
+               if ( ! resp.is2xx() ) {
+
+                       errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage());
+                       mm.setStatus(DmaapObject_Status.INVALID);
+               } else {
+                       prov.makeTopicConnection(target_cluster, dmaap.getBridgeAdminFqtn(), override );
+                       resp = prov.doPostMessage(mm.getWhitelistUpdateJSON());
+                       if ( ! resp.is2xx()) {
+                               errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR,"MR Bridge", Integer.toString(resp.getCode()), resp.getMessage());
                                mm.setStatus(DmaapObject_Status.INVALID);
                        } else {
-                               prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn );
-                               resp = prov.doPostMessage(mm.updateWhiteList());
-                               if ( ! resp.is2xx()) {
-                                       errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR,"MR Bridge", Integer.toString(resp.getCode()), resp.getMessage());
-                                       mm.setStatus(DmaapObject_Status.INVALID);
-                               } else {
-                                       mm.setStatus(DmaapObject_Status.VALID);
-                               }
+                               mm.setStatus(DmaapObject_Status.VALID);
                        }
-                       
-                       // we only want to send one message even if there are multiple central clusters
-                       break;
-               
-               } 
-               
-
+               }
 
                mm.setLastMod();
                return mirrors.put( mm.getMmName(), mm);
        }
+       public MirrorMaker getMirrorMaker( String part1, String part2, int index ) {
+               String targetPart;
+
+               // original mm names did not have any index, so leave off index 0 for
+               // backwards compatibility
+               if ( index == 0 ) {
+                       targetPart = part2;
+               } else {
+                       targetPart = part2 + "_" + index;
+               }
+               logger.info( "getMirrorMaker using " + part1 + " and " + targetPart );
+               return mirrors.get(MirrorMaker.genKey(part1, targetPart));
+       }
        public MirrorMaker getMirrorMaker( String part1, String part2 ) {
                logger.info( "getMirrorMaker using " + part1 + " and " + part2 );
                return mirrors.get(MirrorMaker.genKey(part1, part2));
@@ -139,5 +173,83 @@ public class MirrorMakerService extends BaseLoggingClass {
                
                return ret;
        }
+       
+       public MirrorMaker findNextMM( String source, String target, String fqtn ) {
+               int i = 0;
+               MirrorMaker mm = null;
+               while( mm == null ) {
+                       
+                       mm = this.getMirrorMaker( source, target, i);
+                       if ( mm == null ) {
+                               mm = new MirrorMaker(source, target, i);
+                       }
+                       if ( mm.getTopics().contains(fqtn) ) {
+                               break;
+                       }
+                       if ( mm.getTopicCount() >= maxTopicsPerMM ) {
+                               logger.info( "getNextMM: MM " + mm.getMmName() + " has " + mm.getTopicCount() + " topics.  Moving to next MM");
+                               i++;
+                               mm = null;
+                       }
+               }
+        
+               
+               return mm;
+       }
+
+       public MirrorMaker splitMM( MirrorMaker orig ) {
+               
+               String source = orig.getSourceCluster();
+               String target = orig.getTargetCluster();
+               
+               
+               ArrayList<String> whitelist = orig.getTopics();
+               while( whitelist.size() > maxTopicsPerMM ) {
+                       
+                       int last = whitelist.size() - 1;
+                       String topic = whitelist.get(last);
+                       whitelist.remove(last);
+                       MirrorMaker mm = this.findNextMM( source, target, "aValueThatShouldNotMatchAnything" );
+                       mm.addTopic(topic);     
+                       this.updateMirrorMaker(mm);
+               }
+               
+               orig.setTopics(whitelist);
+
+               return orig;
+               
+       }
+       
+       public static String getProvUser() {
+               return provUser;
+       }
+
+       public static void setProvUser(String provUser) {
+               MirrorMakerService.provUser = provUser;
+       }
+
+       public static String getProvUserPwd() {
+               return provUserPwd;
+       }
+
+       public static void setProvUserPwd(String provUserPwd) {
+               MirrorMakerService.provUserPwd = provUserPwd;
+       }
+
+       public static String getDefaultProducerPort() {
+               return defaultProducerPort;
+       }
+
+       public static void setDefaultProducerPort(String defaultProducerPort) {
+               MirrorMakerService.defaultProducerPort = defaultProducerPort;
+       }
+
+       public static String getDefaultConsumerPort() {
+               return defaultConsumerPort;
+       }
+
+       public static void setDefaultConsumerPort(String defaultConsumerPort) {
+               MirrorMakerService.defaultConsumerPort = defaultConsumerPort;
+       }
 
 }