private static String centralCname;
+
public TopicService(){
DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
defaultGlobalMrHost = p.getProperty("MR.globalHost", "global.host.not.set");
centralCname = p.getProperty("MR.CentralCname");
- logger.info( "TopicService properties: CentralCname=" + centralCname + " defaultGlobarlMrHost=" + defaultGlobalMrHost );
+
+
+ logger.info( "TopicService properties: CentralCname=" + centralCname +
+ " defaultGlobarlMrHost=" + defaultGlobalMrHost );
}
public Map<String, Topic> getTopics() {
}
public List<Topic> getAllTopics() {
+ return getAllTopics( true );
+ }
+ public List<Topic> getAllTopicsWithoutClients() {
+ return getAllTopics(false);
+ }
+
+ private List<Topic> getAllTopics( Boolean withClients ) {
ArrayList<Topic> topics = new ArrayList<Topic>(mr_topics.values());
- for( Topic topic: topics ) {
- topic.setClients( clientService.getAllMrClients(topic.getFqtn()));
+ if ( withClients ) {
+ for( Topic topic: topics ) {
+ topic.setClients( clientService.getAllMrClients(topic.getFqtn()));
+ }
}
return topics;
}
return t;
}
- public Topic addTopic( Topic topic, ApiError err ) {
+ public Topic addTopic( Topic topic, ApiError err, Boolean useExisting ) {
logger.info( "Entry: addTopic");
logger.info( "Topic name=" + topic.getTopicName() + " fqtnStyle=" + topic.getFqtnStyle() );
String nFqtn = topic.genFqtn();
logger.info( "FQTN=" + nFqtn );
- if ( getTopic( nFqtn, err ) != null ) {
+ Topic pTopic = getTopic( nFqtn, err );
+ if ( pTopic != null ) {
String t = "topic already exists: " + nFqtn;
logger.info( t );
+ if ( useExisting ) {
+ err.setCode(Status.OK.getStatusCode());
+ return pTopic;
+ }
err.setMessage( t );
err.setFields( "fqtn");
err.setCode(Status.CONFLICT.getStatusCode());
err.reset(); // err filled with NOT_FOUND is expected case, but don't want to litter...
topic.setFqtn( nFqtn );
-
+
AafService aaf = new AafService(ServiceType.AAF_TopicMgr);
- String t = dmaap.getTopicNsRoot() + "." + dmaap.getDmaapName() + ".mr.topic";
+
+ String t = dmaapSvc.getTopicPerm();
+
String instance = ":topic." + topic.getFqtn();
String[] actions = { "pub", "sub", "view" };
TopicService ts = new TopicService();
ApiError err = new ApiError();
- ts.addTopic(bridgeAdminTopic, err);
+ ts.addTopic(bridgeAdminTopic, err, true);
if ( err.is2xx() || err.getCode() == 409 ){
err.setCode(200);
if ( source != null && target != null ) {
try {
logger.info( "Create a MM from " + source + " to " + target );
- MirrorMaker mm = bridge.getMirrorMaker( source, target);
- if ( mm == null ) {
- mm = new MirrorMaker(source, target);
- }
+ MirrorMaker mm = bridge.getNextMM( source, target, topic.getFqtn());
mm.addTopic(topic.getFqtn());
bridge.updateMirrorMaker(mm);
} catch ( Exception ex ) {
}
+
/*
* Prior to 1707, we only supported EDGE_TO_CENTRAL replication.
* This was determined automatically based on presence of edge publishers and central subscribers.