Updates for more code coverage
[dmaap/dbcapi.git] / src / main / java / org / onap / dmaap / dbcapi / service / TopicService.java
index d3c849c..cb81619 100644 (file)
@@ -3,6 +3,8 @@
  * org.onap.dmaap
  * ================================================================================
  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ *
+ * Modifications Copyright (C) 2019 IBM.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -25,24 +27,30 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import javax.ws.rs.core.Response.Status;
 
+import org.onap.dmaap.dbcapi.aaf.AafNamespace;
+import org.onap.dmaap.dbcapi.aaf.AafRole;
 import org.onap.dmaap.dbcapi.aaf.AafService;
-import org.onap.dmaap.dbcapi.aaf.DmaapPerm;
+import org.onap.dmaap.dbcapi.aaf.DmaapGrant;
 import org.onap.dmaap.dbcapi.aaf.AafService.ServiceType;
+import org.onap.dmaap.dbcapi.aaf.DmaapPerm;
 import org.onap.dmaap.dbcapi.database.DatabaseClass;
 import org.onap.dmaap.dbcapi.logging.BaseLoggingClass;
 import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum;
 import org.onap.dmaap.dbcapi.model.ApiError;
+import org.onap.dmaap.dbcapi.model.DcaeLocation;
 import org.onap.dmaap.dbcapi.model.Dmaap;
+import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status;
 import org.onap.dmaap.dbcapi.model.MR_Client;
 import org.onap.dmaap.dbcapi.model.MR_Cluster;
 import org.onap.dmaap.dbcapi.model.MirrorMaker;
 import org.onap.dmaap.dbcapi.model.ReplicationType;
 import org.onap.dmaap.dbcapi.model.Topic;
-import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status;
 import org.onap.dmaap.dbcapi.util.DmaapConfig;
+import org.onap.dmaap.dbcapi.util.Fqdn;
 import org.onap.dmaap.dbcapi.util.Graph;
 
 public class TopicService extends BaseLoggingClass {
@@ -53,16 +61,30 @@ public class TopicService extends BaseLoggingClass {
        private static String defaultGlobalMrHost;
        
        private Map<String, Topic> mr_topics = DatabaseClass.getTopics();
-       private Map<String, MR_Cluster> clusters = DatabaseClass.getMr_clusters();
        
        private static DmaapService dmaapSvc = new DmaapService();
-       private static Dmaap dmaap = new DmaapService().getDmaap();
        private MR_ClientService clientService = new MR_ClientService();
+       private MR_ClusterService clusters = new MR_ClusterService();
+       private DcaeLocationService locations = new DcaeLocationService();
        private MirrorMakerService      bridge = new MirrorMakerService();
+       
+       private static String centralCname;
+       private static boolean createTopicRoles;
+       private boolean strictGraph = true;
+
 
        public TopicService(){
                DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
                defaultGlobalMrHost = p.getProperty("MR.globalHost", "global.host.not.set");
+               centralCname = p.getProperty("MR.CentralCname");
+               createTopicRoles = "true".equalsIgnoreCase(p.getProperty("aaf.CreateTopicRoles", "true"));
+               String unit_test = p.getProperty( "UnitTest", "No" );
+               if ( unit_test.equals( "Yes" ) ) {
+                       strictGraph = false;
+               }
+               logger.info( "TopicService properties: CentralCname=" + centralCname + 
+                               "   defaultGlobarlMrHost=" + defaultGlobalMrHost +
+                               " createTopicRoles=" + createTopicRoles );
        }
        
        public Map<String, Topic> getTopics() {                 
@@ -70,9 +92,18 @@ public class TopicService extends BaseLoggingClass {
        }
                
        public List<Topic> getAllTopics() {
-               ArrayList<Topic> topics = new ArrayList<Topic>(mr_topics.values());
-               for( Topic topic: topics ) {
-                       topic.setClients( clientService.getAllMrClients(topic.getFqtn()));
+               return getAllTopics( true );
+       }
+       public List<Topic> getAllTopicsWithoutClients() {
+               return getAllTopics(false);
+       }
+       
+       private List<Topic> getAllTopics( Boolean withClients ) {
+               ArrayList<Topic> topics = new ArrayList<>(mr_topics.values());
+               if ( withClients ) {
+                       for( Topic topic: topics ) {
+                               topic.setClients( clientService.getAllMrClients(topic.getFqtn()));
+                       }
                }
                return topics;
        }
@@ -91,15 +122,117 @@ public class TopicService extends BaseLoggingClass {
                apiError.setCode(Status.OK.getStatusCode());
                return t;
        }
+       
+       private void aafTopicSetup(Topic topic, ApiError err ) {
+               
+               String nsr = dmaapSvc.getDmaap().getTopicNsRoot();
+               if ( nsr == null ) {
+                       err.setCode(500);
+                       err.setMessage("Unable to establish AAF namespace root: (check /dmaap object)"  );
+                       err.setFields("topicNsRoot");
+                       return;
+               }
+
+               // establish AAF Connection using TopicMgr identity
+               AafService aaf = new AafService(ServiceType.AAF_TopicMgr);
+               
+               AafRole pubRole = null;
+               AafRole subRole = null;
+               
+               // creating Topic Roles was not an original feature.
+               // For backwards compatibility, only do this if the feature is enabled.
+               // Also, if the namespace of the topic is a foreign namespace, (i.e. not the same as our root ns)
+               // then we likely don't have permission to create sub-ns and Roles so don't try.
+               if ( createTopicRoles && topic.getFqtn().startsWith(nsr)) {
+                       // create AAF namespace for this topic
+                       AafNamespace ns = new AafNamespace( topic.getFqtn(), aaf.getIdentity());
+                       {
+                               int rc = aaf.addNamespace( ns );
+                               if ( rc != 201 && rc != 409 ) {
+                                       err.setCode(500);
+                                       err.setMessage("Unexpected response from AAF:" + rc );
+                                       err.setFields("namespace:" + topic.getFqtn() + " identity="+ aaf.getIdentity());
+                                       return;
+                               }
+                       }
+                       
+                       // create AAF Roles for MR clients of this topic
+                       String rn = "publisher";
+                       pubRole = new AafRole( topic.getFqtn(), rn );
+                       int rc = aaf.addRole( pubRole );
+                       if ( rc != 201 && rc != 409 ) {
+                               err.setCode(500);
+                               err.setMessage("Unexpected response from AAF:" + rc );
+                               err.setFields("topic:" + topic.getFqtn() + " role="+ rn);
+                               return;
+                       }
+                       topic.setPublisherRole( pubRole.getFullyQualifiedRole() );
+                       
+                       rn = "subscriber";
+                       subRole = new AafRole( topic.getFqtn(), rn );
+                       rc = aaf.addRole( subRole );
+                       if ( rc != 201 && rc != 409 ) {
+                               err.setCode(500);
+                               err.setMessage("Unexpected response from AAF:" + rc );
+                               err.setFields("topic:" + topic.getFqtn() + " role="+ rn);
+                               return;
+                       }
+                       topic.setSubscriberRole( subRole.getFullyQualifiedRole() );
+               }
+       
+               // create AAF perms checked by MR
+               String instance = ":topic." + topic.getFqtn();
+               String[] actions = { "pub", "sub", "view" };
+               String t = dmaapSvc.getTopicPerm();
+               for ( String action : actions ){
+                       DmaapPerm perm = new DmaapPerm( t, instance, action );
+                       int rc = aaf.addPerm( perm );
+                       if ( rc != 201 && rc != 409 ) {
+                               err.setCode(500);
+                               err.setMessage("Unexpected response from AAF:" + rc );
+                               err.setFields("t="+t + " instance="+ instance + " action="+ action);
+                               return;
+                       }
+                       if ( createTopicRoles ) {
+                               // Grant perms to our default Roles
+                               if ( action.equals( "pub") || action.equals( "view") ) {
+                                       DmaapGrant g = new DmaapGrant( perm, pubRole.getFullyQualifiedRole() );
+                                       rc = aaf.addGrant( g );
+                                       if ( rc != 201 && rc != 409 ) {
+                                               err.setCode(rc);
+                                               err.setMessage( "Grant of " + perm.toString() + " failed for " + pubRole.getFullyQualifiedRole() );
+                                               logger.warn( err.getMessage());
+                                               return;
+                                       } 
+                               }
+                               if ( action.equals( "sub") || action.equals( "view") ) {
+                                       DmaapGrant g = new DmaapGrant( perm, subRole.getFullyQualifiedRole() );
+                                       rc = aaf.addGrant( g );
+                                       if ( rc != 201 && rc != 409 ) {
+                                               err.setCode(rc);
+                                               err.setMessage( "Grant of " + perm.toString() + " failed for " + subRole.getFullyQualifiedRole() );
+                                               logger.warn( err.getMessage());
+                                               return;
+                                       } 
+                               }
+                       }
+
+               }
+       }
 
-       public Topic addTopic( Topic topic, ApiError err ) {
+       public Topic addTopic( Topic topic, ApiError err, Boolean useExisting ) {
                logger.info( "Entry: addTopic");
                logger.info( "Topic name=" + topic.getTopicName() + " fqtnStyle=" + topic.getFqtnStyle() );
                String nFqtn =  topic.genFqtn();
                logger.info( "FQTN=" + nFqtn );
-               if ( getTopic( nFqtn, err ) != null ) {
+               Topic pTopic = getTopic( nFqtn, err );
+               if ( pTopic != null ) {
                        String t = "topic already exists: " + nFqtn;
                        logger.info( t );
+                       if (  useExisting ) {
+                               err.setCode(Status.OK.getStatusCode());
+                               return pTopic;
+                       }
                        err.setMessage( t );
                        err.setFields( "fqtn");
                        err.setCode(Status.CONFLICT.getStatusCode());
@@ -108,19 +241,23 @@ public class TopicService extends BaseLoggingClass {
                err.reset();  // err filled with NOT_FOUND is expected case, but don't want to litter...
 
                topic.setFqtn( nFqtn );
+               
+               aafTopicSetup( topic, err );
+               if ( err.getCode() >= 400 ) {
+                       return null;
+               }       
 
-               AafService aaf = new AafService(ServiceType.AAF_TopicMgr);
-               String t = dmaap.getTopicNsRoot() + "." + dmaap.getDmaapName() + ".mr.topic";
-               String instance = ":topic." + topic.getFqtn();
-
-               String[] actions = { "pub", "sub", "view" };
-               for ( String action : actions ){
-                       DmaapPerm perm = new DmaapPerm( t, instance, action );
-                       int rc = aaf.addPerm( perm );
-                       if ( rc != 201 && rc != 409 ) {
+               if ( topic.getReplicationCase().involvesGlobal() ) {
+                       if ( topic.getGlobalMrURL() == null ) {
+                               topic.setGlobalMrURL(defaultGlobalMrHost);
+                       }
+                       if ( ! Fqdn.isValid( topic.getGlobalMrURL())) {
+                               logger.error( "GlobalMR FQDN not valid: " + topic.getGlobalMrURL());
+                               topic.setStatus( DmaapObject_Status.INVALID);
                                err.setCode(500);
-                               err.setMessage("Unexpected response from AAF:" + rc );
-                               err.setFields("t="+t + " instance="+ instance + " action="+ action);
+                               err.setMessage("Value is not a valid FQDN:" +  topic.getGlobalMrURL() );
+                               err.setFields("globalMrURL");
+       
                                return null;
                        }
                }
@@ -137,6 +274,7 @@ public class TopicService extends BaseLoggingClass {
                                logger.info( "c fqtn=" + c.getFqtn() + " ID=" + c.getMrClientId() + " url=" + c.getTopicURL());
                                MR_Client nc = new MR_Client( c.getDcaeLocationName(), topic.getFqtn(), c.getClientRole(), c.getAction());
                                nc.setFqtn(topic.getFqtn());
+                               nc.setClientIdentity( c.getClientIdentity());
                                logger.info( "nc fqtn=" + nc.getFqtn() + " ID=" + nc.getMrClientId() + " url=" + nc.getTopicURL());
                                clients2.add( clientService.addMr_Client(nc, topic, err));
                                if ( ! err.is2xx()) {
@@ -146,17 +284,16 @@ public class TopicService extends BaseLoggingClass {
 
                        topic.setClients(clients2);
                }
-               if ( topic.getReplicationCase().involvesGlobal() ) {
-                       if ( topic.getGlobalMrURL() == null ) {
-                               topic.setGlobalMrURL(defaultGlobalMrHost);
-                       }
-               }
+
                Topic ntopic = checkForBridge( topic, err );
                if ( ntopic == null ) {
                        topic.setStatus( DmaapObject_Status.INVALID);
-                       return null;
+                       if ( ! err.is2xx()) {
+                               return null;
+                       }
                }
 
+               
                mr_topics.put( nFqtn, ntopic );
 
                err.setCode(Status.OK.getStatusCode());
@@ -165,16 +302,24 @@ public class TopicService extends BaseLoggingClass {
        
                
        public Topic updateTopic( Topic topic, ApiError err ) {
-               logger.info( "Entry: updateTopic");
+               logger.info( "updateTopic: entry");
+               logger.info( "updateTopic: topic=" + topic);
+               logger.info( "updateTopic: fqtn=" + topic.getFqtn() );
                if ( topic.getFqtn().isEmpty()) {
                        return null;
                }
+               logger.info( "updateTopic: call checkForBridge");
                Topic ntopic = checkForBridge( topic, err );
                if ( ntopic == null ) {
                        topic.setStatus( DmaapObject_Status.INVALID);
-                       return null;
+                       if ( ! err.is2xx() ) {
+                               return null;
+                       }
+               }
+               if(ntopic != null) {
+                       logger.info( "updateTopic: call put");
+                       mr_topics.put( ntopic.getFqtn(), ntopic );
                }
-               mr_topics.put( ntopic.getFqtn(), ntopic );
                err.setCode(Status.OK.getStatusCode());
                return ntopic;
        }
@@ -206,10 +351,11 @@ public class TopicService extends BaseLoggingClass {
                String mmAgentRole = p.getProperty("MM.AgentRole");
                String[] Roles = { mmProvRole, mmAgentRole };
                String[] actions = { "view", "pub", "sub" };
-               Topic bridgeAdminTopic = new Topic();
+               Topic bridgeAdminTopic = new Topic().init();
                bridgeAdminTopic.setTopicName( dmaapSvc.getBridgeAdminFqtn() );
                bridgeAdminTopic.setTopicDescription( "RESERVED topic for MirroMaker Provisioning");
                bridgeAdminTopic.setOwner( "DBC" );
+               
                ArrayList<MR_Client> clients = new ArrayList<MR_Client>();
                for( String role: Roles ) {
                        MR_Client client = new MR_Client();
@@ -222,7 +368,7 @@ public class TopicService extends BaseLoggingClass {
                
                TopicService ts = new TopicService();
                ApiError err = new ApiError();
-               ts.addTopic(bridgeAdminTopic, err);
+               ts.addTopic(bridgeAdminTopic, err, true);
                
                if ( err.is2xx() || err.getCode() == 409 ){
                        err.setCode(200);
@@ -235,79 +381,167 @@ public class TopicService extends BaseLoggingClass {
        
        
        public Topic checkForBridge( Topic topic, ApiError err ) {
-               
+               logger.info( "checkForBridge: entry");
+               logger.info( "fqtn=" + topic.getFqtn() + "replicatonType=" + topic.getReplicationCase());
                if ( topic.getReplicationCase() == ReplicationType.REPLICATION_NONE ) {
                        topic.setStatus( DmaapObject_Status.VALID);
                        return topic;   
                }
                
-               boolean anythingWrong = false;                          
-               String centralFqdn = new String();
-               Graph graph = new Graph( topic.getClients(), true );
+               boolean anythingWrong = false;
+               
+               Set<String> groups = clusters.getGroups();
+               for ( String g : groups ) {
+                       logger.info( "buildBridge for " + topic.getFqtn() + " on group" + g);
+                       anythingWrong |= buildBridge( topic, err, g );
+               }
+               if ( anythingWrong ) {
+                       topic.setStatus( DmaapObject_Status.INVALID);
+                       if ( ! err.is2xx() ) {
+                               return null;
+                       }       
+               } else {
+                       topic.setStatus( DmaapObject_Status.VALID);
+               }
+               return topic;
+       }
+               
+       private boolean buildBridge( Topic topic, ApiError err, String group ) {
+               logger.info( "buildBridge: entry");
+               boolean anythingWrong = false;
+               Graph graph;
+               logger.info( "buildBridge: strictGraph=" + strictGraph );
+               if ( group == null || group.isEmpty() ) {
+                       graph = new Graph( topic.getClients(), strictGraph );
+               } else {
+                       graph = new Graph( topic.getClients(), strictGraph, group );
+               }
+               logger.info( "buildBridge: graph=" + graph );
+               MR_Cluster groupCentralCluster = null;
+               
                
-               if ( graph.isHasCentral() ) {
-                       DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
-                       centralFqdn = p.getProperty("MR.CentralCname");
-                       logger.info( "CentralCname=" + centralFqdn );
+               if ( graph.isEmpty() ) {
+                       logger.info( "buildBridge: graph is empty.  return false" );
+                       return false;
+               } else if ( group == null &&  topic.getReplicationCase().involvesFQDN() ) {
+                       logger.info( "buildBridge: group is null and replicationCaseInvolvesFQDN. return false" );
+                       return false;
+               } else if ( ! graph.hasCentral() ) {
+                       logger.warn( "Topic " + topic.getFqtn() + " wants to be " + topic.getReplicationCase() + " but has no central clients");
+                       return true;
                } else {
-                       logger.warn( "Topic " + topic.getFqtn() + " wants to be " + topic.getReplicationCase() + " but has no cental clients");
+                       groupCentralCluster = clusters.getMr_ClusterByLoc(graph.getCentralLoc());
                }
-               Collection<String> locations = graph.getKeys();
-               for( String loc : locations ) {
+               Collection<String> clientLocations = graph.getKeys();
+               for( String loc : clientLocations ) {
                        logger.info( "loc=" + loc );
-                       MR_Cluster cluster = clusters.get(loc);
+                       DcaeLocation location = locations.getDcaeLocation(loc);
+                       MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc);
                        logger.info( "cluster=" + cluster );
 
                        
                                
                        String source = null;
                        String target = null;
+                       
                        /*
-                        * all replication rules have 1 bridge...
+                        * Provision Edge to Central bridges...
                         */
-                       switch( topic.getReplicationCase() ) {
-                       case REPLICATION_EDGE_TO_CENTRAL:
-                       case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL:  // NOTE: this is for E2C portion only
-                               if ( graph.isHasCentral() &&  graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
+                       if ( ! location.isCentral()  && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName()) ) {
+                               switch( topic.getReplicationCase() ) {
+                               case REPLICATION_EDGE_TO_CENTRAL:
+                               case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL:  // NOTE: this is for E2C portion only
+                                       source = cluster.getFqdn();
+                                       target = centralCname;
                                        break;
-                               }
-                               source = cluster.getFqdn();
-                               target = centralFqdn;
-                               break;
-                       case REPLICATION_CENTRAL_TO_EDGE:
-                               if ( graph.isHasCentral() &&  graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
-                                       continue;
-                               }
-                               source = centralFqdn;
-                               target = cluster.getFqdn();
-                               break;
-                       case REPLICATION_CENTRAL_TO_GLOBAL:
-                               if ( graph.isHasCentral() &&  ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
+                               case REPLICATION_CENTRAL_TO_EDGE:
+                               case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE:  // NOTE: this is for C2E portion only
+                                       source = centralCname;
+                                       target = cluster.getFqdn();
+                                       break;
+                               case REPLICATION_CENTRAL_TO_GLOBAL:
+                               case REPLICATION_GLOBAL_TO_CENTRAL:
+                               case REPLICATION_FQDN_TO_GLOBAL:
+                               case REPLICATION_GLOBAL_TO_FQDN:
+                                       break;
+
+                               case REPLICATION_EDGE_TO_FQDN:
+                               case REPLICATION_EDGE_TO_FQDN_TO_GLOBAL:  // NOTE: this is for E2C portion only
+                                       source = cluster.getFqdn();
+                                       target = groupCentralCluster.getFqdn();
+                                       break;
+                               case REPLICATION_FQDN_TO_EDGE:
+                               case REPLICATION_GLOBAL_TO_FQDN_TO_EDGE:  // NOTE: this is for F2E portion only
+                                       source = groupCentralCluster.getFqdn();
+                                       target = cluster.getFqdn();
+                                       break;
+
+                               default:
+                                       logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() );
+                                       anythingWrong = true;
+                                       err.setCode(400);
+                                       err.setFields("topic=" + topic.genFqtn() + " replicationCase="
+                                                       + topic.getReplicationCase() );
+                                       err.setMessage("Unexpected value for ReplicationType");
                                        continue;
                                }
-                               source = centralFqdn;
-                               target = topic.getGlobalMrURL();
-                               break;
-                       case REPLICATION_GLOBAL_TO_CENTRAL:
-                       case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE:  // NOTE: this is for G2C portion only
-                               if ( graph.isHasCentral() &&  ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
+
+                       } else if ( location.isCentral() && graph.getCentralLoc().equals(cluster.getDcaeLocationName()) ) {
+                               /*
+                                * Provision Central to Global bridges
+                                */
+                               switch( topic.getReplicationCase() ) {
+
+                               case REPLICATION_CENTRAL_TO_GLOBAL:
+                               case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL:
+                                       source = centralCname;
+                                       target = topic.getGlobalMrURL();
+                                       break;
+                               case REPLICATION_GLOBAL_TO_CENTRAL:
+                               case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE:  // NOTE: this is for G2C portion only
+                                       source = topic.getGlobalMrURL();
+                                       target = centralCname;
+                                       break;
+
+                               case REPLICATION_EDGE_TO_FQDN_TO_GLOBAL:  // NOTE: this is for E2F portion only
+                                       source = groupCentralCluster.getFqdn();
+                                       target = topic.getGlobalMrURL();
+                                       break;
+
+                               case REPLICATION_FQDN_TO_GLOBAL:
+                                       source = groupCentralCluster.getFqdn();
+                                       target = topic.getGlobalMrURL();
+                                       break;
+                                       
+                               case REPLICATION_GLOBAL_TO_FQDN:
+                               case REPLICATION_GLOBAL_TO_FQDN_TO_EDGE:  // NOTE: this is for G2F portion only
+                                       source = topic.getGlobalMrURL();
+                                       target = groupCentralCluster.getFqdn();
+                                       break;
+
+                               case REPLICATION_FQDN_TO_EDGE:
+                               case REPLICATION_EDGE_TO_FQDN:
+                               case REPLICATION_EDGE_TO_CENTRAL:
+                               case REPLICATION_CENTRAL_TO_EDGE:
+                                       break;
+                               default:
+                                       logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() );
+                                       anythingWrong = true;
+                                       err.setCode(400);
+                                       err.setFields("topic=" + topic.genFqtn() + " replicationCase="
+                                                       + topic.getReplicationCase() );
+                                       err.setMessage("Unexpected value for ReplicationType");
                                        continue;
-                               }
-                               source = topic.getGlobalMrURL();
-                               target = centralFqdn;
-                               break;
-                       default:
-                               logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() );
+                               }                               
+                       } else {
+                               logger.warn( "dcaeLocation " + loc + " is neither Edge nor Central so no mmagent provisioning was done");
                                anythingWrong = true;
                                continue;
                        }
                        if ( source != null && target != null ) {
                                try { 
                                        logger.info( "Create a MM from " + source + " to " + target );
-                                       MirrorMaker mm = bridge.getMirrorMaker( source, target);
-                                       if ( mm == null ) {
-                                               mm = new MirrorMaker(source, target);
-                                       }
+                                       MirrorMaker mm = bridge.getNextMM( source, target, topic.getFqtn());
                                        mm.addTopic(topic.getFqtn());
                                        bridge.updateMirrorMaker(mm);
                                } catch ( Exception ex ) {
@@ -317,67 +551,15 @@ public class TopicService extends BaseLoggingClass {
                                        anythingWrong = true;
                                        break;
                                }
-                       }
-                       
-                       
-                       /*
-                        * some replication rules have a 2nd bridge!
-                        */
-                       source = target = null;
-                       switch( topic.getReplicationCase() ) {
-                       case REPLICATION_EDGE_TO_CENTRAL:
-                       case REPLICATION_CENTRAL_TO_EDGE:
-                       case REPLICATION_CENTRAL_TO_GLOBAL:
-                       case REPLICATION_GLOBAL_TO_CENTRAL:
-                               continue;
-                       case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL:  // NOTE: this is for C2G portion only
-                               if ( graph.isHasCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
-                                       continue;
-                               }
-                               source = centralFqdn;
-                               target = topic.getGlobalMrURL();
-                               break;
-       
-                       case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE:  // NOTE: this is for C2E portion only
-                               if ( graph.isHasCentral() &&  graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
-                                       continue;
-                               }
-                               source = centralFqdn;
-                               target = cluster.getFqdn();
-                               break;
-                       default:
-                               logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() );
-                               anythingWrong = true;
-                               break;
-                       }
-                       if ( source != null && target != null ) {
-                               try { 
-                                       logger.info( "Create a MM from " + source + " to " + target );
-                                       MirrorMaker mm = bridge.getMirrorMaker( source, target);
-                                       if ( mm == null ) {
-                                               mm = new MirrorMaker(source, target);
-                                       }
-                                       mm.addTopic(topic.getFqtn());
-                                       bridge.updateMirrorMaker(mm);
-                               } catch ( Exception ex ) {
-                                       err.setCode(500);
-                                       err.setFields( "mirror_maker.topic");
-                                       err.setMessage("Unexpected condition: " + ex );
-                                       anythingWrong = true;
-                                       break;
-                               }       
-                       }
+                       }                       
+
                        
                }
-               if ( anythingWrong ) {
-                       topic.setStatus( DmaapObject_Status.INVALID);
-                       return null;
-               }
-       
-               topic.setStatus( DmaapObject_Status.VALID);
-               return topic;
+               return  anythingWrong;
+
        }
        
+       
        /*
         * Prior to 1707, we only supported EDGE_TO_CENTRAL replication.
         * This was determined automatically based on presence of edge publishers and central subscribers.
@@ -391,7 +573,7 @@ public class TopicService extends BaseLoggingClass {
                        Graph graph = new Graph( topic.getClients(), false );
                        
                        String centralFqdn = new String();
-                       if ( graph.isHasCentral() ) {
+                       if ( graph.hasCentral() ) {
                                DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
                                centralFqdn = p.getProperty("MR.CentralCname");
                        }
@@ -399,12 +581,12 @@ public class TopicService extends BaseLoggingClass {
                        Collection<String> locations = graph.getKeys();
                        for( String loc : locations ) {
                                logger.info( "loc=" + loc );
-                               MR_Cluster cluster = clusters.get(loc);
+                               MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc);
                                if ( cluster == null ) {
                                        logger.info( "No MR cluster for location " + loc );
                                        continue;
                                }
-                               if ( graph.isHasCentral() &&  ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
+                               if ( graph.hasCentral() &&  ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
                                        logger.info( "Detected case for EDGE_TO_CENTRAL from " + cluster.getFqdn() + " to " + centralFqdn );
                                        return ReplicationType.REPLICATION_EDGE_TO_CENTRAL;