import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum;
import org.onap.dmaap.dbcapi.model.ApiError;
import org.onap.dmaap.dbcapi.model.DcaeLocation;
-import org.onap.dmaap.dbcapi.model.Dmaap;
import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status;
import org.onap.dmaap.dbcapi.model.MR_Client;
import org.onap.dmaap.dbcapi.model.MR_Cluster;
// REF: https://wiki.web.att.com/pages/viewpage.action?pageId=519703122
private static String defaultGlobalMrHost;
- private Map<String, Topic> mr_topics = DatabaseClass.getTopics();
+ private Map<String, Topic> mr_topics;
private static DmaapService dmaapSvc = new DmaapService();
- private MR_ClientService clientService = new MR_ClientService();
- private MR_ClusterService clusters = new MR_ClusterService();
- private DcaeLocationService locations = new DcaeLocationService();
- private MirrorMakerService bridge = new MirrorMakerService();
+ private MR_ClientService clientService;
+ private MR_ClusterService clusters;
+ private DcaeLocationService locations;
+ private MirrorMakerService bridge;
private static String centralCname;
private static boolean createTopicRoles;
+ private boolean strictGraph = true;
+ private boolean mmPerMR;
public TopicService(){
- DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
+ this(DatabaseClass.getTopics(), new MR_ClientService(), (DmaapConfig)DmaapConfig.getConfig(),
+ new MR_ClusterService(), new DcaeLocationService(), new MirrorMakerService());
+
+ }
+
+ TopicService(Map<String, Topic> mr_topics, MR_ClientService clientService, DmaapConfig p,
+ MR_ClusterService clusters, DcaeLocationService locations, MirrorMakerService bridge) {
+ this.mr_topics = mr_topics;
+ this.clientService = clientService;
defaultGlobalMrHost = p.getProperty("MR.globalHost", "global.host.not.set");
centralCname = p.getProperty("MR.CentralCname");
createTopicRoles = "true".equalsIgnoreCase(p.getProperty("aaf.CreateTopicRoles", "true"));
-
-
- logger.info( "TopicService properties: CentralCname=" + centralCname +
+ String unit_test = p.getProperty( "UnitTest", "No" );
+ if ( "Yes".equals(unit_test)) {
+ strictGraph = false;
+ }
+ mmPerMR = "true".equalsIgnoreCase(p.getProperty("MirrorMakerPerMR", "true"));
+ logger.info( "TopicService properties: CentralCname=" + centralCname +
" defaultGlobarlMrHost=" + defaultGlobalMrHost +
- " createTopicRoles=" + createTopicRoles );
+ " createTopicRoles=" + createTopicRoles +
+ " mmPerMR=" + mmPerMR );
+ this.clusters = clusters;
+ this.locations = locations;
+ this.bridge = bridge;
}
-
- public Map<String, Topic> getTopics() {
+
+ public Map<String, Topic> getTopics() {
return mr_topics;
}
ArrayList<Topic> topics = new ArrayList<>(mr_topics.values());
if ( withClients ) {
for( Topic topic: topics ) {
- topic.setClients( clientService.getAllMrClients(topic.getFqtn()));
+ topic.setClients(clientService.getAllMrClients(topic.getFqtn()));
}
}
return topics;
public Topic updateTopic( Topic topic, ApiError err ) {
- logger.info( "Entry: updateTopic");
+ logger.info( "updateTopic: entry");
+ logger.info( "updateTopic: topic=" + topic);
+ logger.info( "updateTopic: fqtn=" + topic.getFqtn() );
if ( topic.getFqtn().isEmpty()) {
return null;
}
+ logger.info( "updateTopic: call checkForBridge");
Topic ntopic = checkForBridge( topic, err );
if ( ntopic == null ) {
topic.setStatus( DmaapObject_Status.INVALID);
}
}
if(ntopic != null) {
+ logger.info( "updateTopic: call put");
mr_topics.put( ntopic.getFqtn(), ntopic );
}
err.setCode(Status.OK.getStatusCode());
public Topic checkForBridge( Topic topic, ApiError err ) {
-
+ logger.info( "checkForBridge: entry");
+ logger.info( "fqtn=" + topic.getFqtn() + " replicatonType=" + topic.getReplicationCase());
if ( topic.getReplicationCase() == ReplicationType.REPLICATION_NONE ) {
topic.setStatus( DmaapObject_Status.VALID);
return topic;
Set<String> groups = clusters.getGroups();
for ( String g : groups ) {
+ logger.info( "buildBridge for " + topic.getFqtn() + " on group" + g);
anythingWrong |= buildBridge( topic, err, g );
}
if ( anythingWrong ) {
}
private boolean buildBridge( Topic topic, ApiError err, String group ) {
-
+ logger.info( "buildBridge: entry");
boolean anythingWrong = false;
Graph graph;
+ logger.info( "buildBridge: strictGraph=" + strictGraph );
if ( group == null || group.isEmpty() ) {
- graph = new Graph( topic.getClients(), true );
+ graph = new Graph( topic.getClients(), strictGraph );
} else {
- graph = new Graph( topic.getClients(), true, group );
+ graph = new Graph( topic.getClients(), strictGraph, group );
}
+ logger.info( "buildBridge: graph=" + graph );
MR_Cluster groupCentralCluster = null;
+
if ( graph.isEmpty() ) {
+ logger.info( "buildBridge: graph is empty. return false" );
return false;
} else if ( group == null && topic.getReplicationCase().involvesFQDN() ) {
+ logger.info( "buildBridge: group is null and replicationCaseInvolvesFQDN. return false" );
return false;
} else if ( ! graph.hasCentral() ) {
logger.warn( "Topic " + topic.getFqtn() + " wants to be " + topic.getReplicationCase() + " but has no central clients");
logger.info( "loc=" + loc );
DcaeLocation location = locations.getDcaeLocation(loc);
MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc);
- logger.info( "cluster=" + cluster );
+ logger.info( "cluster=" + cluster + " at "+ cluster.getDcaeLocationName() );
+ logger.info( "location.isCentral()="+location.isCentral() + " getCentralLoc()=" + graph.getCentralLoc() );
case REPLICATION_EDGE_TO_CENTRAL:
case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL: // NOTE: this is for E2C portion only
source = cluster.getFqdn();
- target = centralCname;
+ target = (mmPerMR)? groupCentralCluster.getFqdn() : centralCname;
+ logger.info( "REPLICATION_EDGE_TO_CENTRAL: source=" + source + " target=" +target );
break;
case REPLICATION_CENTRAL_TO_EDGE:
case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE: // NOTE: this is for C2E portion only
- source = centralCname;
+ source = (mmPerMR) ? groupCentralCluster.getFqdn() : centralCname;
target = cluster.getFqdn();
break;
case REPLICATION_CENTRAL_TO_GLOBAL:
if ( source != null && target != null ) {
try {
logger.info( "Create a MM from " + source + " to " + target );
- MirrorMaker mm = bridge.getNextMM( source, target, topic.getFqtn());
+ MirrorMaker mm = bridge.findNextMM( source, target, topic.getFqtn());
mm.addTopic(topic.getFqtn());
bridge.updateMirrorMaker(mm);
} catch ( Exception ex ) {