Alternative MR replication method
[dmaap/dbcapi.git] / src / main / java / org / onap / dmaap / dbcapi / service / TopicService.java
index eed5022..3943419 100644 (file)
@@ -25,23 +25,25 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import javax.ws.rs.core.Response.Status;
 
 import org.onap.dmaap.dbcapi.aaf.AafService;
-import org.onap.dmaap.dbcapi.aaf.DmaapPerm;
 import org.onap.dmaap.dbcapi.aaf.AafService.ServiceType;
+import org.onap.dmaap.dbcapi.aaf.DmaapPerm;
 import org.onap.dmaap.dbcapi.database.DatabaseClass;
 import org.onap.dmaap.dbcapi.logging.BaseLoggingClass;
 import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum;
 import org.onap.dmaap.dbcapi.model.ApiError;
+import org.onap.dmaap.dbcapi.model.DcaeLocation;
 import org.onap.dmaap.dbcapi.model.Dmaap;
+import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status;
 import org.onap.dmaap.dbcapi.model.MR_Client;
 import org.onap.dmaap.dbcapi.model.MR_Cluster;
 import org.onap.dmaap.dbcapi.model.MirrorMaker;
 import org.onap.dmaap.dbcapi.model.ReplicationType;
 import org.onap.dmaap.dbcapi.model.Topic;
-import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status;
 import org.onap.dmaap.dbcapi.util.DmaapConfig;
 import org.onap.dmaap.dbcapi.util.Fqdn;
 import org.onap.dmaap.dbcapi.util.Graph;
@@ -54,16 +56,21 @@ public class TopicService extends BaseLoggingClass {
        private static String defaultGlobalMrHost;
        
        private Map<String, Topic> mr_topics = DatabaseClass.getTopics();
-       private Map<String, MR_Cluster> clusters = DatabaseClass.getMr_clusters();
        
        private static DmaapService dmaapSvc = new DmaapService();
        private static Dmaap dmaap = new DmaapService().getDmaap();
        private MR_ClientService clientService = new MR_ClientService();
+       private MR_ClusterService clusters = new MR_ClusterService();
+       private DcaeLocationService locations = new DcaeLocationService();
        private MirrorMakerService      bridge = new MirrorMakerService();
+       
+       private static String centralCname;
 
        public TopicService(){
                DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
                defaultGlobalMrHost = p.getProperty("MR.globalHost", "global.host.not.set");
+               centralCname = p.getProperty("MR.CentralCname");
+               logger.info( "TopicService properties: CentralCname=" + centralCname + "   defaultGlobarlMrHost=" + defaultGlobalMrHost );
        }
        
        public Map<String, Topic> getTopics() {                 
@@ -165,9 +172,12 @@ public class TopicService extends BaseLoggingClass {
                Topic ntopic = checkForBridge( topic, err );
                if ( ntopic == null ) {
                        topic.setStatus( DmaapObject_Status.INVALID);
-                       return null;
+                       if ( ! err.is2xx()) {
+                               return null;
+                       }
                }
 
+               
                mr_topics.put( nFqtn, ntopic );
 
                err.setCode(Status.OK.getStatusCode());
@@ -183,7 +193,9 @@ public class TopicService extends BaseLoggingClass {
                Topic ntopic = checkForBridge( topic, err );
                if ( ntopic == null ) {
                        topic.setStatus( DmaapObject_Status.INVALID);
-                       return null;
+                       if ( ! err.is2xx() ) {
+                               return null;
+                       }
                }
                mr_topics.put( ntopic.getFqtn(), ntopic );
                err.setCode(Status.OK.getStatusCode());
@@ -252,63 +264,147 @@ public class TopicService extends BaseLoggingClass {
                        return topic;   
                }
                
-               boolean anythingWrong = false;                          
-               String centralFqdn = new String();
-               Graph graph = new Graph( topic.getClients(), true );
+               boolean anythingWrong = false;
+               
+               Set<String> groups = clusters.getGroups();
+               for ( String g : groups ) {
+                       anythingWrong |= buildBridge( topic, err, g );
+               }
+               if ( anythingWrong ) {
+                       topic.setStatus( DmaapObject_Status.INVALID);
+                       if ( ! err.is2xx() ) {
+                               return null;
+                       }       
+               } else {
+                       topic.setStatus( DmaapObject_Status.VALID);
+               }
+               return topic;
+       }
+               
+       private boolean buildBridge( Topic topic, ApiError err, String group ) {
+
+               boolean anythingWrong = false;
+               Graph graph;
+               if ( group == null || group.isEmpty() ) {
+                       graph = new Graph( topic.getClients(), true );
+               } else {
+                       graph = new Graph( topic.getClients(), true, group );
+               }
+               MR_Cluster groupCentralCluster = null;
                
-               if ( graph.isHasCentral() ) {
-                       DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
-                       centralFqdn = p.getProperty("MR.CentralCname");
-                       logger.info( "CentralCname=" + centralFqdn );
+               if ( graph.isEmpty() ) {
+                       return false;
+               } else if ( group == null &&  topic.getReplicationCase().involvesFQDN() ) {
+                       return false;
+               } else if ( ! graph.hasCentral() ) {
+                       logger.warn( "Topic " + topic.getFqtn() + " wants to be " + topic.getReplicationCase() + " but has no central clients");
+                       return true;
                } else {
-                       logger.warn( "Topic " + topic.getFqtn() + " wants to be " + topic.getReplicationCase() + " but has no cental clients");
+                       groupCentralCluster = clusters.getMr_ClusterByLoc(graph.getCentralLoc());
                }
-               Collection<String> locations = graph.getKeys();
-               for( String loc : locations ) {
+               Collection<String> clientLocations = graph.getKeys();
+               for( String loc : clientLocations ) {
                        logger.info( "loc=" + loc );
-                       MR_Cluster cluster = clusters.get(loc);
+                       DcaeLocation location = locations.getDcaeLocation(loc);
+                       MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc);
                        logger.info( "cluster=" + cluster );
 
                        
                                
                        String source = null;
                        String target = null;
+                       
                        /*
-                        * all replication rules have 1 bridge...
+                        * Provision Edge to Central bridges...
                         */
-                       switch( topic.getReplicationCase() ) {
-                       case REPLICATION_EDGE_TO_CENTRAL:
-                       case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL:  // NOTE: this is for E2C portion only
-                               if ( graph.isHasCentral() &&  graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
+                       if ( ! location.isCentral()  && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName()) ) {
+                               switch( topic.getReplicationCase() ) {
+                               case REPLICATION_EDGE_TO_CENTRAL:
+                               case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL:  // NOTE: this is for E2C portion only
+                                       source = cluster.getFqdn();
+                                       target = centralCname;
                                        break;
-                               }
-                               source = cluster.getFqdn();
-                               target = centralFqdn;
-                               break;
-                       case REPLICATION_CENTRAL_TO_EDGE:
-                               if ( graph.isHasCentral() &&  graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
-                                       continue;
-                               }
-                               source = centralFqdn;
-                               target = cluster.getFqdn();
-                               break;
-                       case REPLICATION_CENTRAL_TO_GLOBAL:
-                               if ( graph.isHasCentral() &&  ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
+                               case REPLICATION_CENTRAL_TO_EDGE:
+                               case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE:  // NOTE: this is for C2E portion only
+                                       source = centralCname;
+                                       target = cluster.getFqdn();
+                                       break;
+                               case REPLICATION_CENTRAL_TO_GLOBAL:
+                               case REPLICATION_GLOBAL_TO_CENTRAL:
+                               case REPLICATION_FQDN_TO_GLOBAL:
+                               case REPLICATION_GLOBAL_TO_FQDN:
+                                       break;
+
+                               case REPLICATION_EDGE_TO_FQDN:
+                               case REPLICATION_EDGE_TO_FQDN_TO_GLOBAL:  // NOTE: this is for E2C portion only
+                                       source = cluster.getFqdn();
+                                       target = groupCentralCluster.getFqdn();
+                                       break;
+                               case REPLICATION_FQDN_TO_EDGE:
+                               case REPLICATION_GLOBAL_TO_FQDN_TO_EDGE:  // NOTE: this is for F2E portion only
+                                       source = groupCentralCluster.getFqdn();
+                                       target = cluster.getFqdn();
+                                       break;
+
+                               default:
+                                       logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() );
+                                       anythingWrong = true;
+                                       err.setCode(400);
+                                       err.setFields("topic=" + topic.genFqtn() + " replicationCase="
+                                                       + topic.getReplicationCase() );
+                                       err.setMessage("Unexpected value for ReplicationType");
                                        continue;
                                }
-                               source = centralFqdn;
-                               target = topic.getGlobalMrURL();
-                               break;
-                       case REPLICATION_GLOBAL_TO_CENTRAL:
-                       case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE:  // NOTE: this is for G2C portion only
-                               if ( graph.isHasCentral() &&  ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
+
+                       } else if ( location.isCentral() && graph.getCentralLoc().equals(cluster.getDcaeLocationName()) ) {
+                               /*
+                                * Provision Central to Global bridges
+                                */
+                               switch( topic.getReplicationCase() ) {
+
+                               case REPLICATION_CENTRAL_TO_GLOBAL:
+                               case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL:
+                                       source = centralCname;
+                                       target = topic.getGlobalMrURL();
+                                       break;
+                               case REPLICATION_GLOBAL_TO_CENTRAL:
+                               case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE:  // NOTE: this is for G2C portion only
+                                       source = topic.getGlobalMrURL();
+                                       target = centralCname;
+                                       break;
+
+                               case REPLICATION_EDGE_TO_FQDN_TO_GLOBAL:  // NOTE: this is for E2F portion only
+                                       source = groupCentralCluster.getFqdn();
+                                       target = topic.getGlobalMrURL();
+                                       break;
+
+                               case REPLICATION_FQDN_TO_GLOBAL:
+                                       source = groupCentralCluster.getFqdn();
+                                       target = topic.getGlobalMrURL();
+                                       break;
+                                       
+                               case REPLICATION_GLOBAL_TO_FQDN:
+                               case REPLICATION_GLOBAL_TO_FQDN_TO_EDGE:  // NOTE: this is for G2F portion only
+                                       source = topic.getGlobalMrURL();
+                                       target = groupCentralCluster.getFqdn();
+                                       break;
+
+                               case REPLICATION_FQDN_TO_EDGE:
+                               case REPLICATION_EDGE_TO_FQDN:
+                               case REPLICATION_EDGE_TO_CENTRAL:
+                               case REPLICATION_CENTRAL_TO_EDGE:
+                                       break;
+                               default:
+                                       logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() );
+                                       anythingWrong = true;
+                                       err.setCode(400);
+                                       err.setFields("topic=" + topic.genFqtn() + " replicationCase="
+                                                       + topic.getReplicationCase() );
+                                       err.setMessage("Unexpected value for ReplicationType");
                                        continue;
-                               }
-                               source = topic.getGlobalMrURL();
-                               target = centralFqdn;
-                               break;
-                       default:
-                               logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() );
+                               }                               
+                       } else {
+                               logger.warn( "dcaeLocation " + loc + " is neither Edge nor Central so no mmagent provisioning was done");
                                anythingWrong = true;
                                continue;
                        }
@@ -328,65 +424,12 @@ public class TopicService extends BaseLoggingClass {
                                        anythingWrong = true;
                                        break;
                                }
-                       }
-                       
-                       
-                       /*
-                        * some replication rules have a 2nd bridge!
-                        */
-                       source = target = null;
-                       switch( topic.getReplicationCase() ) {
-                       case REPLICATION_EDGE_TO_CENTRAL:
-                       case REPLICATION_CENTRAL_TO_EDGE:
-                       case REPLICATION_CENTRAL_TO_GLOBAL:
-                       case REPLICATION_GLOBAL_TO_CENTRAL:
-                               continue;
-                       case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL:  // NOTE: this is for C2G portion only
-                               if ( graph.isHasCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
-                                       continue;
-                               }
-                               source = centralFqdn;
-                               target = topic.getGlobalMrURL();
-                               break;
-       
-                       case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE:  // NOTE: this is for C2E portion only
-                               if ( graph.isHasCentral() &&  graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
-                                       continue;
-                               }
-                               source = centralFqdn;
-                               target = cluster.getFqdn();
-                               break;
-                       default:
-                               logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() );
-                               anythingWrong = true;
-                               break;
-                       }
-                       if ( source != null && target != null ) {
-                               try { 
-                                       logger.info( "Create a MM from " + source + " to " + target );
-                                       MirrorMaker mm = bridge.getMirrorMaker( source, target);
-                                       if ( mm == null ) {
-                                               mm = new MirrorMaker(source, target);
-                                       }
-                                       mm.addTopic(topic.getFqtn());
-                                       bridge.updateMirrorMaker(mm);
-                               } catch ( Exception ex ) {
-                                       err.setCode(500);
-                                       err.setFields( "mirror_maker.topic");
-                                       err.setMessage("Unexpected condition: " + ex );
-                                       anythingWrong = true;
-                                       break;
-                               }       
-                       }
+                       }                       
+
                        
                }
-               if ( anythingWrong ) {
-                       topic.setStatus( DmaapObject_Status.INVALID);
-                       return null;
-               }
-       
-               topic.setStatus( DmaapObject_Status.VALID);
-               return topic;
+               return  anythingWrong;
+
        }
        
        /*
@@ -402,7 +445,7 @@ public class TopicService extends BaseLoggingClass {
                        Graph graph = new Graph( topic.getClients(), false );
                        
                        String centralFqdn = new String();
-                       if ( graph.isHasCentral() ) {
+                       if ( graph.hasCentral() ) {
                                DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
                                centralFqdn = p.getProperty("MR.CentralCname");
                        }
@@ -410,12 +453,12 @@ public class TopicService extends BaseLoggingClass {
                        Collection<String> locations = graph.getKeys();
                        for( String loc : locations ) {
                                logger.info( "loc=" + loc );
-                               MR_Cluster cluster = clusters.get(loc);
+                               MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc);
                                if ( cluster == null ) {
                                        logger.info( "No MR cluster for location " + loc );
                                        continue;
                                }
-                               if ( graph.isHasCentral() &&  ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
+                               if ( graph.hasCentral() &&  ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
                                        logger.info( "Detected case for EDGE_TO_CENTRAL from " + cluster.getFqdn() + " to " + centralFqdn );
                                        return ReplicationType.REPLICATION_EDGE_TO_CENTRAL;