X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdbcapi%2Fservice%2FTopicService.java;h=08e58bec99da81a05ad6efed3ab327a8f60aa4a4;hb=1ef01702fd6e5b1742cb67a6614b97948e263e5f;hp=eeffa5b69d3eddd2066fd57767a1c2b3e80bf570;hpb=b578ddffd8ce8a7eceebd7b4df6ab4a6a9f803ec;p=dmaap%2Fdbcapi.git diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java b/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java index eeffa5b..08e58be 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java +++ b/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java @@ -42,7 +42,6 @@ import org.onap.dmaap.dbcapi.logging.BaseLoggingClass; import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum; import org.onap.dmaap.dbcapi.model.ApiError; import org.onap.dmaap.dbcapi.model.DcaeLocation; -import org.onap.dmaap.dbcapi.model.Dmaap; import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status; import org.onap.dmaap.dbcapi.model.MR_Client; import org.onap.dmaap.dbcapi.model.MR_Cluster; @@ -60,31 +59,48 @@ public class TopicService extends BaseLoggingClass { // REF: https://wiki.web.att.com/pages/viewpage.action?pageId=519703122 private static String defaultGlobalMrHost; - private Map mr_topics = DatabaseClass.getTopics(); + private Map mr_topics; private static DmaapService dmaapSvc = new DmaapService(); - private MR_ClientService clientService = new MR_ClientService(); - private MR_ClusterService clusters = new MR_ClusterService(); - private DcaeLocationService locations = new DcaeLocationService(); - private MirrorMakerService bridge = new MirrorMakerService(); + private MR_ClientService clientService; + private MR_ClusterService clusters; + private DcaeLocationService locations; + private MirrorMakerService bridge; private static String centralCname; private static boolean createTopicRoles; + private boolean strictGraph = true; + private boolean mmPerMR; public TopicService(){ - DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig(); + this(DatabaseClass.getTopics(), new MR_ClientService(), (DmaapConfig)DmaapConfig.getConfig(), + new MR_ClusterService(), new DcaeLocationService(), new MirrorMakerService()); + + } + + TopicService(Map mr_topics, MR_ClientService clientService, DmaapConfig p, + MR_ClusterService clusters, DcaeLocationService locations, MirrorMakerService bridge) { + this.mr_topics = mr_topics; + this.clientService = clientService; defaultGlobalMrHost = p.getProperty("MR.globalHost", "global.host.not.set"); centralCname = p.getProperty("MR.CentralCname"); createTopicRoles = "true".equalsIgnoreCase(p.getProperty("aaf.CreateTopicRoles", "true")); - - - logger.info( "TopicService properties: CentralCname=" + centralCname + + String unit_test = p.getProperty( "UnitTest", "No" ); + if ( "Yes".equals(unit_test)) { + strictGraph = false; + } + mmPerMR = "true".equalsIgnoreCase(p.getProperty("MirrorMakerPerMR", "true")); + logger.info( "TopicService properties: CentralCname=" + centralCname + " defaultGlobarlMrHost=" + defaultGlobalMrHost + - " createTopicRoles=" + createTopicRoles ); + " createTopicRoles=" + createTopicRoles + + " mmPerMR=" + mmPerMR ); + this.clusters = clusters; + this.locations = locations; + this.bridge = bridge; } - - public Map getTopics() { + + public Map getTopics() { return mr_topics; } @@ -99,7 +115,7 @@ public class TopicService extends BaseLoggingClass { ArrayList topics = new ArrayList<>(mr_topics.values()); if ( withClients ) { for( Topic topic: topics ) { - topic.setClients( clientService.getAllMrClients(topic.getFqtn())); + topic.setClients(clientService.getAllMrClients(topic.getFqtn())); } } return topics; @@ -299,10 +315,13 @@ public class TopicService extends BaseLoggingClass { public Topic updateTopic( Topic topic, ApiError err ) { - logger.info( "Entry: updateTopic"); + logger.info( "updateTopic: entry"); + logger.info( "updateTopic: topic=" + topic); + logger.info( "updateTopic: fqtn=" + topic.getFqtn() ); if ( topic.getFqtn().isEmpty()) { return null; } + logger.info( "updateTopic: call checkForBridge"); Topic ntopic = checkForBridge( topic, err ); if ( ntopic == null ) { topic.setStatus( DmaapObject_Status.INVALID); @@ -311,6 +330,7 @@ public class TopicService extends BaseLoggingClass { } } if(ntopic != null) { + logger.info( "updateTopic: call put"); mr_topics.put( ntopic.getFqtn(), ntopic ); } err.setCode(Status.OK.getStatusCode()); @@ -374,7 +394,8 @@ public class TopicService extends BaseLoggingClass { public Topic checkForBridge( Topic topic, ApiError err ) { - + logger.info( "checkForBridge: entry"); + logger.info( "fqtn=" + topic.getFqtn() + " replicatonType=" + topic.getReplicationCase()); if ( topic.getReplicationCase() == ReplicationType.REPLICATION_NONE ) { topic.setStatus( DmaapObject_Status.VALID); return topic; @@ -384,6 +405,7 @@ public class TopicService extends BaseLoggingClass { Set groups = clusters.getGroups(); for ( String g : groups ) { + logger.info( "buildBridge for " + topic.getFqtn() + " on group" + g); anythingWrong |= buildBridge( topic, err, g ); } if ( anythingWrong ) { @@ -398,19 +420,24 @@ public class TopicService extends BaseLoggingClass { } private boolean buildBridge( Topic topic, ApiError err, String group ) { - + logger.info( "buildBridge: entry"); boolean anythingWrong = false; Graph graph; + logger.info( "buildBridge: strictGraph=" + strictGraph ); if ( group == null || group.isEmpty() ) { - graph = new Graph( topic.getClients(), true ); + graph = new Graph( topic.getClients(), strictGraph ); } else { - graph = new Graph( topic.getClients(), true, group ); + graph = new Graph( topic.getClients(), strictGraph, group ); } + logger.info( "buildBridge: graph=" + graph ); MR_Cluster groupCentralCluster = null; + if ( graph.isEmpty() ) { + logger.info( "buildBridge: graph is empty. return false" ); return false; } else if ( group == null && topic.getReplicationCase().involvesFQDN() ) { + logger.info( "buildBridge: group is null and replicationCaseInvolvesFQDN. return false" ); return false; } else if ( ! graph.hasCentral() ) { logger.warn( "Topic " + topic.getFqtn() + " wants to be " + topic.getReplicationCase() + " but has no central clients"); @@ -423,7 +450,8 @@ public class TopicService extends BaseLoggingClass { logger.info( "loc=" + loc ); DcaeLocation location = locations.getDcaeLocation(loc); MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc); - logger.info( "cluster=" + cluster ); + logger.info( "cluster=" + cluster + " at "+ cluster.getDcaeLocationName() ); + logger.info( "location.isCentral()="+location.isCentral() + " getCentralLoc()=" + graph.getCentralLoc() ); @@ -438,11 +466,12 @@ public class TopicService extends BaseLoggingClass { case REPLICATION_EDGE_TO_CENTRAL: case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL: // NOTE: this is for E2C portion only source = cluster.getFqdn(); - target = centralCname; + target = (mmPerMR)? groupCentralCluster.getFqdn() : centralCname; + logger.info( "REPLICATION_EDGE_TO_CENTRAL: source=" + source + " target=" +target ); break; case REPLICATION_CENTRAL_TO_EDGE: case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE: // NOTE: this is for C2E portion only - source = centralCname; + source = (mmPerMR) ? groupCentralCluster.getFqdn() : centralCname; target = cluster.getFqdn(); break; case REPLICATION_CENTRAL_TO_GLOBAL: @@ -527,7 +556,7 @@ public class TopicService extends BaseLoggingClass { if ( source != null && target != null ) { try { logger.info( "Create a MM from " + source + " to " + target ); - MirrorMaker mm = bridge.getNextMM( source, target, topic.getFqtn()); + MirrorMaker mm = bridge.findNextMM( source, target, topic.getFqtn()); mm.addTopic(topic.getFqtn()); bridge.updateMirrorMaker(mm); } catch ( Exception ex ) {