X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdbcapi%2Fservice%2FTopicService.java;h=08e58bec99da81a05ad6efed3ab327a8f60aa4a4;hb=1ef01702fd6e5b1742cb67a6614b97948e263e5f;hp=c5937f4bff1bafb6f7e28fad0310332a739a1e53;hpb=cee37b37655ed5a52078e37c7da51c887e2d73e5;p=dmaap%2Fdbcapi.git diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java b/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java index c5937f4..08e58be 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java +++ b/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java @@ -59,34 +59,48 @@ public class TopicService extends BaseLoggingClass { // REF: https://wiki.web.att.com/pages/viewpage.action?pageId=519703122 private static String defaultGlobalMrHost; - private Map mr_topics = DatabaseClass.getTopics(); + private Map mr_topics; private static DmaapService dmaapSvc = new DmaapService(); - private MR_ClientService clientService = new MR_ClientService(); - private MR_ClusterService clusters = new MR_ClusterService(); - private DcaeLocationService locations = new DcaeLocationService(); - private MirrorMakerService bridge = new MirrorMakerService(); + private MR_ClientService clientService; + private MR_ClusterService clusters; + private DcaeLocationService locations; + private MirrorMakerService bridge; private static String centralCname; private static boolean createTopicRoles; private boolean strictGraph = true; + private boolean mmPerMR; public TopicService(){ - DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig(); + this(DatabaseClass.getTopics(), new MR_ClientService(), (DmaapConfig)DmaapConfig.getConfig(), + new MR_ClusterService(), new DcaeLocationService(), new MirrorMakerService()); + + } + + TopicService(Map mr_topics, MR_ClientService clientService, DmaapConfig p, + MR_ClusterService clusters, DcaeLocationService locations, MirrorMakerService bridge) { + this.mr_topics = mr_topics; + this.clientService = clientService; defaultGlobalMrHost = p.getProperty("MR.globalHost", "global.host.not.set"); centralCname = p.getProperty("MR.CentralCname"); createTopicRoles = "true".equalsIgnoreCase(p.getProperty("aaf.CreateTopicRoles", "true")); String unit_test = p.getProperty( "UnitTest", "No" ); - if ( unit_test.equals( "Yes" ) ) { + if ( "Yes".equals(unit_test)) { strictGraph = false; } - logger.info( "TopicService properties: CentralCname=" + centralCname + + mmPerMR = "true".equalsIgnoreCase(p.getProperty("MirrorMakerPerMR", "true")); + logger.info( "TopicService properties: CentralCname=" + centralCname + " defaultGlobarlMrHost=" + defaultGlobalMrHost + - " createTopicRoles=" + createTopicRoles ); + " createTopicRoles=" + createTopicRoles + + " mmPerMR=" + mmPerMR ); + this.clusters = clusters; + this.locations = locations; + this.bridge = bridge; } - - public Map getTopics() { + + public Map getTopics() { return mr_topics; } @@ -101,7 +115,7 @@ public class TopicService extends BaseLoggingClass { ArrayList topics = new ArrayList<>(mr_topics.values()); if ( withClients ) { for( Topic topic: topics ) { - topic.setClients( clientService.getAllMrClients(topic.getFqtn())); + topic.setClients(clientService.getAllMrClients(topic.getFqtn())); } } return topics; @@ -381,7 +395,7 @@ public class TopicService extends BaseLoggingClass { public Topic checkForBridge( Topic topic, ApiError err ) { logger.info( "checkForBridge: entry"); - logger.info( "fqtn=" + topic.getFqtn() + "replicatonType=" + topic.getReplicationCase()); + logger.info( "fqtn=" + topic.getFqtn() + " replicatonType=" + topic.getReplicationCase()); if ( topic.getReplicationCase() == ReplicationType.REPLICATION_NONE ) { topic.setStatus( DmaapObject_Status.VALID); return topic; @@ -436,7 +450,8 @@ public class TopicService extends BaseLoggingClass { logger.info( "loc=" + loc ); DcaeLocation location = locations.getDcaeLocation(loc); MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc); - logger.info( "cluster=" + cluster ); + logger.info( "cluster=" + cluster + " at "+ cluster.getDcaeLocationName() ); + logger.info( "location.isCentral()="+location.isCentral() + " getCentralLoc()=" + graph.getCentralLoc() ); @@ -451,11 +466,12 @@ public class TopicService extends BaseLoggingClass { case REPLICATION_EDGE_TO_CENTRAL: case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL: // NOTE: this is for E2C portion only source = cluster.getFqdn(); - target = centralCname; + target = (mmPerMR)? groupCentralCluster.getFqdn() : centralCname; + logger.info( "REPLICATION_EDGE_TO_CENTRAL: source=" + source + " target=" +target ); break; case REPLICATION_CENTRAL_TO_EDGE: case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE: // NOTE: this is for C2E portion only - source = centralCname; + source = (mmPerMR) ? groupCentralCluster.getFqdn() : centralCname; target = cluster.getFqdn(); break; case REPLICATION_CENTRAL_TO_GLOBAL: @@ -540,7 +556,7 @@ public class TopicService extends BaseLoggingClass { if ( source != null && target != null ) { try { logger.info( "Create a MM from " + source + " to " + target ); - MirrorMaker mm = bridge.getNextMM( source, target, topic.getFqtn()); + MirrorMaker mm = bridge.findNextMM( source, target, topic.getFqtn()); mm.addTopic(topic.getFqtn()); bridge.updateMirrorMaker(mm); } catch ( Exception ex ) {