import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.ws.rs.core.Response.Status;
import org.onap.dmaap.dbcapi.aaf.AafService;
-import org.onap.dmaap.dbcapi.aaf.DmaapPerm;
import org.onap.dmaap.dbcapi.aaf.AafService.ServiceType;
-import org.onap.dmaap.dbcapi.aaf.database.DatabaseClass;
+import org.onap.dmaap.dbcapi.aaf.DmaapPerm;
+import org.onap.dmaap.dbcapi.database.DatabaseClass;
import org.onap.dmaap.dbcapi.logging.BaseLoggingClass;
import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum;
import org.onap.dmaap.dbcapi.model.ApiError;
+import org.onap.dmaap.dbcapi.model.DcaeLocation;
import org.onap.dmaap.dbcapi.model.Dmaap;
+import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status;
import org.onap.dmaap.dbcapi.model.MR_Client;
import org.onap.dmaap.dbcapi.model.MR_Cluster;
import org.onap.dmaap.dbcapi.model.MirrorMaker;
import org.onap.dmaap.dbcapi.model.ReplicationType;
import org.onap.dmaap.dbcapi.model.Topic;
-import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status;
import org.onap.dmaap.dbcapi.util.DmaapConfig;
+import org.onap.dmaap.dbcapi.util.Fqdn;
import org.onap.dmaap.dbcapi.util.Graph;
public class TopicService extends BaseLoggingClass {
private static String defaultGlobalMrHost;
private Map<String, Topic> mr_topics = DatabaseClass.getTopics();
- private Map<String, MR_Cluster> clusters = DatabaseClass.getMr_clusters();
private static DmaapService dmaapSvc = new DmaapService();
private static Dmaap dmaap = new DmaapService().getDmaap();
private MR_ClientService clientService = new MR_ClientService();
+ private MR_ClusterService clusters = new MR_ClusterService();
+ private DcaeLocationService locations = new DcaeLocationService();
private MirrorMakerService bridge = new MirrorMakerService();
+
+ private static String centralCname;
+
public TopicService(){
DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
defaultGlobalMrHost = p.getProperty("MR.globalHost", "global.host.not.set");
+ centralCname = p.getProperty("MR.CentralCname");
+
+
+ logger.info( "TopicService properties: CentralCname=" + centralCname +
+ " defaultGlobarlMrHost=" + defaultGlobalMrHost );
}
public Map<String, Topic> getTopics() {
}
public List<Topic> getAllTopics() {
+ return getAllTopics( true );
+ }
+ public List<Topic> getAllTopicsWithoutClients() {
+ return getAllTopics(false);
+ }
+
+ private List<Topic> getAllTopics( Boolean withClients ) {
ArrayList<Topic> topics = new ArrayList<Topic>(mr_topics.values());
- for( Topic topic: topics ) {
- topic.setClients( clientService.getAllMrClients(topic.getFqtn()));
+ if ( withClients ) {
+ for( Topic topic: topics ) {
+ topic.setClients( clientService.getAllMrClients(topic.getFqtn()));
+ }
}
return topics;
}
return t;
}
- public Topic addTopic( Topic topic, ApiError err ) {
+ public Topic addTopic( Topic topic, ApiError err, Boolean useExisting ) {
logger.info( "Entry: addTopic");
logger.info( "Topic name=" + topic.getTopicName() + " fqtnStyle=" + topic.getFqtnStyle() );
String nFqtn = topic.genFqtn();
logger.info( "FQTN=" + nFqtn );
- if ( getTopic( nFqtn, err ) != null ) {
+ Topic pTopic = getTopic( nFqtn, err );
+ if ( pTopic != null ) {
String t = "topic already exists: " + nFqtn;
logger.info( t );
+ if ( useExisting ) {
+ err.setCode(Status.OK.getStatusCode());
+ return pTopic;
+ }
err.setMessage( t );
err.setFields( "fqtn");
err.setCode(Status.CONFLICT.getStatusCode());
err.reset(); // err filled with NOT_FOUND is expected case, but don't want to litter...
topic.setFqtn( nFqtn );
-
+
AafService aaf = new AafService(ServiceType.AAF_TopicMgr);
- String t = dmaap.getTopicNsRoot() + "." + dmaap.getDmaapName() + ".mr.topic";
+
+ String t = dmaapSvc.getTopicPerm();
+
String instance = ":topic." + topic.getFqtn();
String[] actions = { "pub", "sub", "view" };
return null;
}
}
+ if ( topic.getReplicationCase().involvesGlobal() ) {
+ if ( topic.getGlobalMrURL() == null ) {
+ topic.setGlobalMrURL(defaultGlobalMrHost);
+ }
+ if ( ! Fqdn.isValid( topic.getGlobalMrURL())) {
+ logger.error( "GlobalMR FQDN not valid: " + topic.getGlobalMrURL());
+ topic.setStatus( DmaapObject_Status.INVALID);
+ err.setCode(500);
+ err.setMessage("Value is not a valid FQDN:" + topic.getGlobalMrURL() );
+ err.setFields("globalMrURL");
+
+ return null;
+ }
+ }
if ( topic.getNumClients() > 0 ) {
topic.setClients(clients2);
}
- if ( topic.getReplicationCase().involvesGlobal() ) {
- if ( topic.getGlobalMrURL() == null ) {
- topic.setGlobalMrURL(defaultGlobalMrHost);
- }
- }
+
Topic ntopic = checkForBridge( topic, err );
if ( ntopic == null ) {
topic.setStatus( DmaapObject_Status.INVALID);
- return null;
+ if ( ! err.is2xx()) {
+ return null;
+ }
}
+
mr_topics.put( nFqtn, ntopic );
err.setCode(Status.OK.getStatusCode());
Topic ntopic = checkForBridge( topic, err );
if ( ntopic == null ) {
topic.setStatus( DmaapObject_Status.INVALID);
- return null;
+ if ( ! err.is2xx() ) {
+ return null;
+ }
+ }
+ if(ntopic != null) {
+ mr_topics.put( ntopic.getFqtn(), ntopic );
}
- mr_topics.put( ntopic.getFqtn(), ntopic );
err.setCode(Status.OK.getStatusCode());
return ntopic;
}
TopicService ts = new TopicService();
ApiError err = new ApiError();
- ts.addTopic(bridgeAdminTopic, err);
+ ts.addTopic(bridgeAdminTopic, err, true);
if ( err.is2xx() || err.getCode() == 409 ){
err.setCode(200);
return topic;
}
- boolean anythingWrong = false;
- String centralFqdn = new String();
- Graph graph = new Graph( topic.getClients(), true );
+ boolean anythingWrong = false;
+
+ Set<String> groups = clusters.getGroups();
+ for ( String g : groups ) {
+ anythingWrong |= buildBridge( topic, err, g );
+ }
+ if ( anythingWrong ) {
+ topic.setStatus( DmaapObject_Status.INVALID);
+ if ( ! err.is2xx() ) {
+ return null;
+ }
+ } else {
+ topic.setStatus( DmaapObject_Status.VALID);
+ }
+ return topic;
+ }
+
+ private boolean buildBridge( Topic topic, ApiError err, String group ) {
+
+ boolean anythingWrong = false;
+ Graph graph;
+ if ( group == null || group.isEmpty() ) {
+ graph = new Graph( topic.getClients(), true );
+ } else {
+ graph = new Graph( topic.getClients(), true, group );
+ }
+ MR_Cluster groupCentralCluster = null;
- if ( graph.isHasCentral() ) {
- DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
- centralFqdn = p.getProperty("MR.CentralCname");
- logger.info( "CentralCname=" + centralFqdn );
+ if ( graph.isEmpty() ) {
+ return false;
+ } else if ( group == null && topic.getReplicationCase().involvesFQDN() ) {
+ return false;
+ } else if ( ! graph.hasCentral() ) {
+ logger.warn( "Topic " + topic.getFqtn() + " wants to be " + topic.getReplicationCase() + " but has no central clients");
+ return true;
} else {
- logger.warn( "Topic " + topic.getFqtn() + " wants to be " + topic.getReplicationCase() + " but has no cental clients");
+ groupCentralCluster = clusters.getMr_ClusterByLoc(graph.getCentralLoc());
}
- Collection<String> locations = graph.getKeys();
- for( String loc : locations ) {
+ Collection<String> clientLocations = graph.getKeys();
+ for( String loc : clientLocations ) {
logger.info( "loc=" + loc );
- MR_Cluster cluster = clusters.get(loc);
+ DcaeLocation location = locations.getDcaeLocation(loc);
+ MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc);
logger.info( "cluster=" + cluster );
String source = null;
String target = null;
+
/*
- * all replication rules have 1 bridge...
+ * Provision Edge to Central bridges...
*/
- switch( topic.getReplicationCase() ) {
- case REPLICATION_EDGE_TO_CENTRAL:
- case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL: // NOTE: this is for E2C portion only
- if ( graph.isHasCentral() && graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
+ if ( ! location.isCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName()) ) {
+ switch( topic.getReplicationCase() ) {
+ case REPLICATION_EDGE_TO_CENTRAL:
+ case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL: // NOTE: this is for E2C portion only
+ source = cluster.getFqdn();
+ target = centralCname;
break;
- }
- source = cluster.getFqdn();
- target = centralFqdn;
- break;
- case REPLICATION_CENTRAL_TO_EDGE:
- if ( graph.isHasCentral() && graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
- continue;
- }
- source = centralFqdn;
- target = cluster.getFqdn();
- break;
- case REPLICATION_CENTRAL_TO_GLOBAL:
- if ( graph.isHasCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
+ case REPLICATION_CENTRAL_TO_EDGE:
+ case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE: // NOTE: this is for C2E portion only
+ source = centralCname;
+ target = cluster.getFqdn();
+ break;
+ case REPLICATION_CENTRAL_TO_GLOBAL:
+ case REPLICATION_GLOBAL_TO_CENTRAL:
+ case REPLICATION_FQDN_TO_GLOBAL:
+ case REPLICATION_GLOBAL_TO_FQDN:
+ break;
+
+ case REPLICATION_EDGE_TO_FQDN:
+ case REPLICATION_EDGE_TO_FQDN_TO_GLOBAL: // NOTE: this is for E2C portion only
+ source = cluster.getFqdn();
+ target = groupCentralCluster.getFqdn();
+ break;
+ case REPLICATION_FQDN_TO_EDGE:
+ case REPLICATION_GLOBAL_TO_FQDN_TO_EDGE: // NOTE: this is for F2E portion only
+ source = groupCentralCluster.getFqdn();
+ target = cluster.getFqdn();
+ break;
+
+ default:
+ logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() );
+ anythingWrong = true;
+ err.setCode(400);
+ err.setFields("topic=" + topic.genFqtn() + " replicationCase="
+ + topic.getReplicationCase() );
+ err.setMessage("Unexpected value for ReplicationType");
continue;
}
- source = centralFqdn;
- target = topic.getGlobalMrURL();
- break;
- case REPLICATION_GLOBAL_TO_CENTRAL:
- case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE: // NOTE: this is for G2C portion only
- if ( graph.isHasCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
+
+ } else if ( location.isCentral() && graph.getCentralLoc().equals(cluster.getDcaeLocationName()) ) {
+ /*
+ * Provision Central to Global bridges
+ */
+ switch( topic.getReplicationCase() ) {
+
+ case REPLICATION_CENTRAL_TO_GLOBAL:
+ case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL:
+ source = centralCname;
+ target = topic.getGlobalMrURL();
+ break;
+ case REPLICATION_GLOBAL_TO_CENTRAL:
+ case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE: // NOTE: this is for G2C portion only
+ source = topic.getGlobalMrURL();
+ target = centralCname;
+ break;
+
+ case REPLICATION_EDGE_TO_FQDN_TO_GLOBAL: // NOTE: this is for E2F portion only
+ source = groupCentralCluster.getFqdn();
+ target = topic.getGlobalMrURL();
+ break;
+
+ case REPLICATION_FQDN_TO_GLOBAL:
+ source = groupCentralCluster.getFqdn();
+ target = topic.getGlobalMrURL();
+ break;
+
+ case REPLICATION_GLOBAL_TO_FQDN:
+ case REPLICATION_GLOBAL_TO_FQDN_TO_EDGE: // NOTE: this is for G2F portion only
+ source = topic.getGlobalMrURL();
+ target = groupCentralCluster.getFqdn();
+ break;
+
+ case REPLICATION_FQDN_TO_EDGE:
+ case REPLICATION_EDGE_TO_FQDN:
+ case REPLICATION_EDGE_TO_CENTRAL:
+ case REPLICATION_CENTRAL_TO_EDGE:
+ break;
+ default:
+ logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() );
+ anythingWrong = true;
+ err.setCode(400);
+ err.setFields("topic=" + topic.genFqtn() + " replicationCase="
+ + topic.getReplicationCase() );
+ err.setMessage("Unexpected value for ReplicationType");
continue;
- }
- source = topic.getGlobalMrURL();
- target = centralFqdn;
- break;
- default:
- logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() );
+ }
+ } else {
+ logger.warn( "dcaeLocation " + loc + " is neither Edge nor Central so no mmagent provisioning was done");
anythingWrong = true;
continue;
}
if ( source != null && target != null ) {
try {
logger.info( "Create a MM from " + source + " to " + target );
- MirrorMaker mm = bridge.getMirrorMaker( source, target);
- if ( mm == null ) {
- mm = new MirrorMaker(source, target);
- }
+ MirrorMaker mm = bridge.getNextMM( source, target, topic.getFqtn());
mm.addTopic(topic.getFqtn());
bridge.updateMirrorMaker(mm);
} catch ( Exception ex ) {
anythingWrong = true;
break;
}
- }
-
-
- /*
- * some replication rules have a 2nd bridge!
- */
- source = target = null;
- switch( topic.getReplicationCase() ) {
- case REPLICATION_EDGE_TO_CENTRAL:
- case REPLICATION_CENTRAL_TO_EDGE:
- case REPLICATION_CENTRAL_TO_GLOBAL:
- case REPLICATION_GLOBAL_TO_CENTRAL:
- continue;
- case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL: // NOTE: this is for C2G portion only
- if ( graph.isHasCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
- continue;
- }
- source = centralFqdn;
- target = topic.getGlobalMrURL();
- break;
-
- case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE: // NOTE: this is for C2E portion only
- if ( graph.isHasCentral() && graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
- continue;
- }
- source = centralFqdn;
- target = cluster.getFqdn();
- break;
- default:
- logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() );
- anythingWrong = true;
- break;
- }
- if ( source != null && target != null ) {
- try {
- logger.info( "Create a MM from " + source + " to " + target );
- MirrorMaker mm = bridge.getMirrorMaker( source, target);
- if ( mm == null ) {
- mm = new MirrorMaker(source, target);
- }
- mm.addTopic(topic.getFqtn());
- bridge.updateMirrorMaker(mm);
- } catch ( Exception ex ) {
- err.setCode(500);
- err.setFields( "mirror_maker.topic");
- err.setMessage("Unexpected condition: " + ex );
- anythingWrong = true;
- break;
- }
- }
+ }
+
}
- if ( anythingWrong ) {
- topic.setStatus( DmaapObject_Status.INVALID);
- return null;
- }
-
- topic.setStatus( DmaapObject_Status.VALID);
- return topic;
+ return anythingWrong;
+
}
+
/*
* Prior to 1707, we only supported EDGE_TO_CENTRAL replication.
* This was determined automatically based on presence of edge publishers and central subscribers.
Graph graph = new Graph( topic.getClients(), false );
String centralFqdn = new String();
- if ( graph.isHasCentral() ) {
+ if ( graph.hasCentral() ) {
DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
centralFqdn = p.getProperty("MR.CentralCname");
}
Collection<String> locations = graph.getKeys();
for( String loc : locations ) {
logger.info( "loc=" + loc );
- MR_Cluster cluster = clusters.get(loc);
+ MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc);
if ( cluster == null ) {
logger.info( "No MR cluster for location " + loc );
continue;
}
- if ( graph.isHasCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
+ if ( graph.hasCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
logger.info( "Detected case for EDGE_TO_CENTRAL from " + cluster.getFqdn() + " to " + centralFqdn );
return ReplicationType.REPLICATION_EDGE_TO_CENTRAL;