X-Git-Url: https://gerrit.onap.org/r/gitweb?p=dmaap%2Fdbcapi.git;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdbcapi%2Fservice%2FTopicService.java;h=3943419d6ede683f779228387f9531767b9683a3;hp=244fe37be916bc3961c11c6bf6b11e80b53e7f3c;hb=1611944a45491e2b8f00606b0aac2cdb0de8dde8;hpb=a05efb7b7b3cfc77f5e3fda11e8434834829f56a diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java b/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java index 244fe37..3943419 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java +++ b/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java @@ -25,24 +25,27 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import javax.ws.rs.core.Response.Status; import org.onap.dmaap.dbcapi.aaf.AafService; -import org.onap.dmaap.dbcapi.aaf.DmaapPerm; import org.onap.dmaap.dbcapi.aaf.AafService.ServiceType; -import org.onap.dmaap.dbcapi.aaf.database.DatabaseClass; +import org.onap.dmaap.dbcapi.aaf.DmaapPerm; +import org.onap.dmaap.dbcapi.database.DatabaseClass; import org.onap.dmaap.dbcapi.logging.BaseLoggingClass; import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum; import org.onap.dmaap.dbcapi.model.ApiError; +import org.onap.dmaap.dbcapi.model.DcaeLocation; import org.onap.dmaap.dbcapi.model.Dmaap; +import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status; import org.onap.dmaap.dbcapi.model.MR_Client; import org.onap.dmaap.dbcapi.model.MR_Cluster; import org.onap.dmaap.dbcapi.model.MirrorMaker; import org.onap.dmaap.dbcapi.model.ReplicationType; import org.onap.dmaap.dbcapi.model.Topic; -import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status; import org.onap.dmaap.dbcapi.util.DmaapConfig; +import org.onap.dmaap.dbcapi.util.Fqdn; import org.onap.dmaap.dbcapi.util.Graph; public class TopicService extends BaseLoggingClass { @@ -53,16 +56,21 @@ public class TopicService extends BaseLoggingClass { private static String defaultGlobalMrHost; private Map mr_topics = DatabaseClass.getTopics(); - private Map clusters = DatabaseClass.getMr_clusters(); private static DmaapService dmaapSvc = new DmaapService(); private static Dmaap dmaap = new DmaapService().getDmaap(); private MR_ClientService clientService = new MR_ClientService(); + private MR_ClusterService clusters = new MR_ClusterService(); + private DcaeLocationService locations = new DcaeLocationService(); private MirrorMakerService bridge = new MirrorMakerService(); + + private static String centralCname; public TopicService(){ DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig(); defaultGlobalMrHost = p.getProperty("MR.globalHost", "global.host.not.set"); + centralCname = p.getProperty("MR.CentralCname"); + logger.info( "TopicService properties: CentralCname=" + centralCname + " defaultGlobarlMrHost=" + defaultGlobalMrHost ); } public Map getTopics() { @@ -94,7 +102,9 @@ public class TopicService extends BaseLoggingClass { public Topic addTopic( Topic topic, ApiError err ) { logger.info( "Entry: addTopic"); - String nFqtn = Topic.genFqtn( topic.getTopicName() ); + logger.info( "Topic name=" + topic.getTopicName() + " fqtnStyle=" + topic.getFqtnStyle() ); + String nFqtn = topic.genFqtn(); + logger.info( "FQTN=" + nFqtn ); if ( getTopic( nFqtn, err ) != null ) { String t = "topic already exists: " + nFqtn; logger.info( t ); @@ -104,7 +114,7 @@ public class TopicService extends BaseLoggingClass { return null; } err.reset(); // err filled with NOT_FOUND is expected case, but don't want to litter... - logger.info( "fqtn: " + nFqtn ); + topic.setFqtn( nFqtn ); AafService aaf = new AafService(ServiceType.AAF_TopicMgr); @@ -122,6 +132,20 @@ public class TopicService extends BaseLoggingClass { return null; } } + if ( topic.getReplicationCase().involvesGlobal() ) { + if ( topic.getGlobalMrURL() == null ) { + topic.setGlobalMrURL(defaultGlobalMrHost); + } + if ( ! Fqdn.isValid( topic.getGlobalMrURL())) { + logger.error( "GlobalMR FQDN not valid: " + topic.getGlobalMrURL()); + topic.setStatus( DmaapObject_Status.INVALID); + err.setCode(500); + err.setMessage("Value is not a valid FQDN:" + topic.getGlobalMrURL() ); + err.setFields("globalMrURL"); + + return null; + } + } if ( topic.getNumClients() > 0 ) { @@ -144,17 +168,16 @@ public class TopicService extends BaseLoggingClass { topic.setClients(clients2); } - if ( topic.getReplicationCase().involvesGlobal() ) { - if ( topic.getGlobalMrURL() == null ) { - topic.setGlobalMrURL(defaultGlobalMrHost); - } - } + Topic ntopic = checkForBridge( topic, err ); if ( ntopic == null ) { topic.setStatus( DmaapObject_Status.INVALID); - return null; + if ( ! err.is2xx()) { + return null; + } } + mr_topics.put( nFqtn, ntopic ); err.setCode(Status.OK.getStatusCode()); @@ -170,7 +193,9 @@ public class TopicService extends BaseLoggingClass { Topic ntopic = checkForBridge( topic, err ); if ( ntopic == null ) { topic.setStatus( DmaapObject_Status.INVALID); - return null; + if ( ! err.is2xx() ) { + return null; + } } mr_topics.put( ntopic.getFqtn(), ntopic ); err.setCode(Status.OK.getStatusCode()); @@ -239,63 +264,147 @@ public class TopicService extends BaseLoggingClass { return topic; } - boolean anythingWrong = false; - String centralFqdn = new String(); - Graph graph = new Graph( topic.getClients(), true ); + boolean anythingWrong = false; + + Set groups = clusters.getGroups(); + for ( String g : groups ) { + anythingWrong |= buildBridge( topic, err, g ); + } + if ( anythingWrong ) { + topic.setStatus( DmaapObject_Status.INVALID); + if ( ! err.is2xx() ) { + return null; + } + } else { + topic.setStatus( DmaapObject_Status.VALID); + } + return topic; + } + + private boolean buildBridge( Topic topic, ApiError err, String group ) { + + boolean anythingWrong = false; + Graph graph; + if ( group == null || group.isEmpty() ) { + graph = new Graph( topic.getClients(), true ); + } else { + graph = new Graph( topic.getClients(), true, group ); + } + MR_Cluster groupCentralCluster = null; - if ( graph.isHasCentral() ) { - DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig(); - centralFqdn = p.getProperty("MR.CentralCname"); - logger.info( "CentralCname=" + centralFqdn ); + if ( graph.isEmpty() ) { + return false; + } else if ( group == null && topic.getReplicationCase().involvesFQDN() ) { + return false; + } else if ( ! graph.hasCentral() ) { + logger.warn( "Topic " + topic.getFqtn() + " wants to be " + topic.getReplicationCase() + " but has no central clients"); + return true; } else { - logger.warn( "Topic " + topic.getFqtn() + " wants to be " + topic.getReplicationCase() + " but has no cental clients"); + groupCentralCluster = clusters.getMr_ClusterByLoc(graph.getCentralLoc()); } - Collection locations = graph.getKeys(); - for( String loc : locations ) { + Collection clientLocations = graph.getKeys(); + for( String loc : clientLocations ) { logger.info( "loc=" + loc ); - MR_Cluster cluster = clusters.get(loc); + DcaeLocation location = locations.getDcaeLocation(loc); + MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc); logger.info( "cluster=" + cluster ); String source = null; String target = null; + /* - * all replication rules have 1 bridge... + * Provision Edge to Central bridges... */ - switch( topic.getReplicationCase() ) { - case REPLICATION_EDGE_TO_CENTRAL: - case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL: // NOTE: this is for E2C portion only - if ( graph.isHasCentral() && graph.getCentralLoc().equals(cluster.getDcaeLocationName())) { + if ( ! location.isCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName()) ) { + switch( topic.getReplicationCase() ) { + case REPLICATION_EDGE_TO_CENTRAL: + case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL: // NOTE: this is for E2C portion only + source = cluster.getFqdn(); + target = centralCname; break; - } - source = cluster.getFqdn(); - target = centralFqdn; - break; - case REPLICATION_CENTRAL_TO_EDGE: - if ( graph.isHasCentral() && graph.getCentralLoc().equals(cluster.getDcaeLocationName())) { - continue; - } - source = centralFqdn; - target = cluster.getFqdn(); - break; - case REPLICATION_CENTRAL_TO_GLOBAL: - if ( graph.isHasCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) { + case REPLICATION_CENTRAL_TO_EDGE: + case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE: // NOTE: this is for C2E portion only + source = centralCname; + target = cluster.getFqdn(); + break; + case REPLICATION_CENTRAL_TO_GLOBAL: + case REPLICATION_GLOBAL_TO_CENTRAL: + case REPLICATION_FQDN_TO_GLOBAL: + case REPLICATION_GLOBAL_TO_FQDN: + break; + + case REPLICATION_EDGE_TO_FQDN: + case REPLICATION_EDGE_TO_FQDN_TO_GLOBAL: // NOTE: this is for E2C portion only + source = cluster.getFqdn(); + target = groupCentralCluster.getFqdn(); + break; + case REPLICATION_FQDN_TO_EDGE: + case REPLICATION_GLOBAL_TO_FQDN_TO_EDGE: // NOTE: this is for F2E portion only + source = groupCentralCluster.getFqdn(); + target = cluster.getFqdn(); + break; + + default: + logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() ); + anythingWrong = true; + err.setCode(400); + err.setFields("topic=" + topic.genFqtn() + " replicationCase=" + + topic.getReplicationCase() ); + err.setMessage("Unexpected value for ReplicationType"); continue; } - source = centralFqdn; - target = topic.getGlobalMrURL(); - break; - case REPLICATION_GLOBAL_TO_CENTRAL: - case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE: // NOTE: this is for G2C portion only - if ( graph.isHasCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) { + + } else if ( location.isCentral() && graph.getCentralLoc().equals(cluster.getDcaeLocationName()) ) { + /* + * Provision Central to Global bridges + */ + switch( topic.getReplicationCase() ) { + + case REPLICATION_CENTRAL_TO_GLOBAL: + case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL: + source = centralCname; + target = topic.getGlobalMrURL(); + break; + case REPLICATION_GLOBAL_TO_CENTRAL: + case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE: // NOTE: this is for G2C portion only + source = topic.getGlobalMrURL(); + target = centralCname; + break; + + case REPLICATION_EDGE_TO_FQDN_TO_GLOBAL: // NOTE: this is for E2F portion only + source = groupCentralCluster.getFqdn(); + target = topic.getGlobalMrURL(); + break; + + case REPLICATION_FQDN_TO_GLOBAL: + source = groupCentralCluster.getFqdn(); + target = topic.getGlobalMrURL(); + break; + + case REPLICATION_GLOBAL_TO_FQDN: + case REPLICATION_GLOBAL_TO_FQDN_TO_EDGE: // NOTE: this is for G2F portion only + source = topic.getGlobalMrURL(); + target = groupCentralCluster.getFqdn(); + break; + + case REPLICATION_FQDN_TO_EDGE: + case REPLICATION_EDGE_TO_FQDN: + case REPLICATION_EDGE_TO_CENTRAL: + case REPLICATION_CENTRAL_TO_EDGE: + break; + default: + logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() ); + anythingWrong = true; + err.setCode(400); + err.setFields("topic=" + topic.genFqtn() + " replicationCase=" + + topic.getReplicationCase() ); + err.setMessage("Unexpected value for ReplicationType"); continue; - } - source = topic.getGlobalMrURL(); - target = centralFqdn; - break; - default: - logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() ); + } + } else { + logger.warn( "dcaeLocation " + loc + " is neither Edge nor Central so no mmagent provisioning was done"); anythingWrong = true; continue; } @@ -315,65 +424,12 @@ public class TopicService extends BaseLoggingClass { anythingWrong = true; break; } - } - - - /* - * some replication rules have a 2nd bridge! - */ - source = target = null; - switch( topic.getReplicationCase() ) { - case REPLICATION_EDGE_TO_CENTRAL: - case REPLICATION_CENTRAL_TO_EDGE: - case REPLICATION_CENTRAL_TO_GLOBAL: - case REPLICATION_GLOBAL_TO_CENTRAL: - continue; - case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL: // NOTE: this is for C2G portion only - if ( graph.isHasCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) { - continue; - } - source = centralFqdn; - target = topic.getGlobalMrURL(); - break; - - case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE: // NOTE: this is for C2E portion only - if ( graph.isHasCentral() && graph.getCentralLoc().equals(cluster.getDcaeLocationName())) { - continue; - } - source = centralFqdn; - target = cluster.getFqdn(); - break; - default: - logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() ); - anythingWrong = true; - break; - } - if ( source != null && target != null ) { - try { - logger.info( "Create a MM from " + source + " to " + target ); - MirrorMaker mm = bridge.getMirrorMaker( source, target); - if ( mm == null ) { - mm = new MirrorMaker(source, target); - } - mm.addTopic(topic.getFqtn()); - bridge.updateMirrorMaker(mm); - } catch ( Exception ex ) { - err.setCode(500); - err.setFields( "mirror_maker.topic"); - err.setMessage("Unexpected condition: " + ex ); - anythingWrong = true; - break; - } - } + } + } - if ( anythingWrong ) { - topic.setStatus( DmaapObject_Status.INVALID); - return null; - } - - topic.setStatus( DmaapObject_Status.VALID); - return topic; + return anythingWrong; + } /* @@ -389,7 +445,7 @@ public class TopicService extends BaseLoggingClass { Graph graph = new Graph( topic.getClients(), false ); String centralFqdn = new String(); - if ( graph.isHasCentral() ) { + if ( graph.hasCentral() ) { DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig(); centralFqdn = p.getProperty("MR.CentralCname"); } @@ -397,12 +453,12 @@ public class TopicService extends BaseLoggingClass { Collection locations = graph.getKeys(); for( String loc : locations ) { logger.info( "loc=" + loc ); - MR_Cluster cluster = clusters.get(loc); + MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc); if ( cluster == null ) { logger.info( "No MR cluster for location " + loc ); continue; } - if ( graph.isHasCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) { + if ( graph.hasCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) { logger.info( "Detected case for EDGE_TO_CENTRAL from " + cluster.getFqdn() + " to " + centralFqdn ); return ReplicationType.REPLICATION_EDGE_TO_CENTRAL;