// REF: https://wiki.web.att.com/pages/viewpage.action?pageId=519703122
private static String defaultGlobalMrHost;
- private Map<String, Topic> mr_topics = DatabaseClass.getTopics();
+ private Map<String, Topic> mr_topics;
private static DmaapService dmaapSvc = new DmaapService();
- private MR_ClientService clientService = new MR_ClientService();
- private MR_ClusterService clusters = new MR_ClusterService();
- private DcaeLocationService locations = new DcaeLocationService();
- private MirrorMakerService bridge = new MirrorMakerService();
+ private MR_ClientService clientService;
+ private MR_ClusterService clusters;
+ private DcaeLocationService locations;
+ private MirrorMakerService bridge;
private static String centralCname;
private static boolean createTopicRoles;
private boolean strictGraph = true;
+ private boolean mmPerMR;
public TopicService(){
- DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
+ this(DatabaseClass.getTopics(), new MR_ClientService(), (DmaapConfig)DmaapConfig.getConfig(),
+ new MR_ClusterService(), new DcaeLocationService(), new MirrorMakerService());
+
+ }
+
+ TopicService(Map<String, Topic> mr_topics, MR_ClientService clientService, DmaapConfig p,
+ MR_ClusterService clusters, DcaeLocationService locations, MirrorMakerService bridge) {
+ this.mr_topics = mr_topics;
+ this.clientService = clientService;
defaultGlobalMrHost = p.getProperty("MR.globalHost", "global.host.not.set");
centralCname = p.getProperty("MR.CentralCname");
createTopicRoles = "true".equalsIgnoreCase(p.getProperty("aaf.CreateTopicRoles", "true"));
String unit_test = p.getProperty( "UnitTest", "No" );
- if ( unit_test.equals( "Yes" ) ) {
+ if ( "Yes".equals(unit_test)) {
strictGraph = false;
}
- logger.info( "TopicService properties: CentralCname=" + centralCname +
+ mmPerMR = "true".equalsIgnoreCase(p.getProperty("MirrorMakerPerMR", "true"));
+ logger.info( "TopicService properties: CentralCname=" + centralCname +
" defaultGlobarlMrHost=" + defaultGlobalMrHost +
- " createTopicRoles=" + createTopicRoles );
+ " createTopicRoles=" + createTopicRoles +
+ " mmPerMR=" + mmPerMR );
+ this.clusters = clusters;
+ this.locations = locations;
+ this.bridge = bridge;
}
-
- public Map<String, Topic> getTopics() {
+
+ public Map<String, Topic> getTopics() {
return mr_topics;
}
ArrayList<Topic> topics = new ArrayList<>(mr_topics.values());
if ( withClients ) {
for( Topic topic: topics ) {
- topic.setClients( clientService.getAllMrClients(topic.getFqtn()));
+ topic.setClients(clientService.getAllMrClients(topic.getFqtn()));
}
}
return topics;
public Topic checkForBridge( Topic topic, ApiError err ) {
logger.info( "checkForBridge: entry");
- logger.info( "fqtn=" + topic.getFqtn() + "replicatonType=" + topic.getReplicationCase());
+ logger.info( "fqtn=" + topic.getFqtn() + " replicatonType=" + topic.getReplicationCase());
if ( topic.getReplicationCase() == ReplicationType.REPLICATION_NONE ) {
topic.setStatus( DmaapObject_Status.VALID);
return topic;
logger.info( "loc=" + loc );
DcaeLocation location = locations.getDcaeLocation(loc);
MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc);
- logger.info( "cluster=" + cluster );
+ logger.info( "cluster=" + cluster + " at "+ cluster.getDcaeLocationName() );
+ logger.info( "location.isCentral()="+location.isCentral() + " getCentralLoc()=" + graph.getCentralLoc() );
case REPLICATION_EDGE_TO_CENTRAL:
case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL: // NOTE: this is for E2C portion only
source = cluster.getFqdn();
- target = centralCname;
+ target = (mmPerMR)? groupCentralCluster.getFqdn() : centralCname;
+ logger.info( "REPLICATION_EDGE_TO_CENTRAL: source=" + source + " target=" +target );
break;
case REPLICATION_CENTRAL_TO_EDGE:
case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE: // NOTE: this is for C2E portion only
- source = centralCname;
+ source = (mmPerMR) ? groupCentralCluster.getFqdn() : centralCname;
target = cluster.getFqdn();
break;
case REPLICATION_CENTRAL_TO_GLOBAL:
if ( source != null && target != null ) {
try {
logger.info( "Create a MM from " + source + " to " + target );
- MirrorMaker mm = bridge.getNextMM( source, target, topic.getFqtn());
+ MirrorMaker mm = bridge.findNextMM( source, target, topic.getFqtn());
mm.addTopic(topic.getFqtn());
bridge.updateMirrorMaker(mm);
} catch ( Exception ex ) {