import org.onap.dmaap.dbcapi.service.MirrorMakerService;
public class MirrorMaker extends DmaapObject {
- static final Logger logger = Logger.getLogger(MirrorMaker.class);
+
private String sourceCluster;
private String targetCluster;
public MirrorMaker(){
}
-
+ public MirrorMaker(String source, String target, int i) {
+ initMM( source, target );
+ // original mm names did not have any index, so leave off index 0 for
+ // backwards compatibility
+ if ( i != 0 ) {
+ String n = this.getMmName() + "_" + i;
+ this.setMmName(n);
+ }
+ }
public MirrorMaker(String source, String target) {
+ initMM( source, target );
+ }
+
+ private void initMM(String source, String target) {
sourceCluster = source;
targetCluster = target;
mmName = genKey(source, target);
}
}
*/
- public String updateWhiteList() {
+ public String getWhitelistUpdateJSON() {
StringBuilder str = new StringBuilder( "{ \"messageID\": \"" + MirrorMakerService.genTransactionId() + "\", \"updateWhiteList\": {" );
str.append( " \"name\": \"" + this.getMmName() + "\", \"whitelist\": \"" );
int numTargets = 0;
StringBuilder str = new StringBuilder( "{ \"messageID\": \"" + MirrorMakerService.genTransactionId() + "\", \"createMirrorMaker\": {" );
str.append( " \"name\": \"" + this.getMmName() + "\", " );
str.append( " \"consumer\": \"" + this.sourceCluster + ":" + consumerPort + "\", " );
- str.append( " \"producer\": \"" + this.targetCluster + ":" + producerPort + "\" ");
+ str.append( " \"producer\": \"" + this.targetCluster + ":" + producerPort + "\", ");
- str.append( " } }" );
+ str.append( " \"numStreams\": \"10\" } }" );
return str.toString();
}
@GET
@ApiOperation( value = "return BrTopic details",
- notes = "Returns array of `BrTopic` objects. If source and target query params are specified, only report on that bridge. If detail param is true, list topics names, else just a count is returned",
+ notes = "Returns array of `BrTopic` objects. If source and target query params are specified, only report on that bridge. "
+ + "If detail param is true, list topics names, else just a count is returned.",
response = BrTopic.class)
@ApiResponses( value = {
@ApiResponse( code = 200, message = "Success", response = Dmaap.class),
@ApiResponse( code = 400, message = "Error", response = ApiError.class )
})
- public Response getBridgedTopics(@QueryParam("source") String source,
- @QueryParam("target") String target,
+ public Response getBridgedTopics(@QueryParam("mmagent") String mmagent,
@QueryParam("detail") Boolean detailFlag ){
ApiService check = new ApiService();
+
+ if ( mmagent == null ) {
+ return check.success(getMMcounts(Boolean.TRUE.equals(detailFlag)));
+
+ }
+ logger.info( "getBridgeTopics():" + " mmagent=" + mmagent);
if ( ! Boolean.TRUE.equals(detailFlag)) {
BrTopic brTopic = new BrTopic();
- logger.info( "getBridgeTopics():" + " source=" + source + ", target=" + target);
-
- if (source != null && target != null) { // get topics between 2 bridged locations
- brTopic.setBrSource(source);
- brTopic.setBrTarget(target);
- MirrorMaker mm = mmService.getMirrorMaker(source, target);
- if ( mm != null ) {
- brTopic.setTopicCount( mm.getTopicCount() );
- }
-
- logger.info( "topicCount [2 locations]: " + brTopic.getTopicCount() );
- }
- else if (source == null && target == null ) {
- List<String> mmList = mmService.getAllMirrorMakers();
- brTopic.setBrSource("all");
- brTopic.setBrTarget("all");
- int totCnt = 0;
- for( String key: mmList ) {
- int mCnt = 0;
- MirrorMaker mm = mmService.getMirrorMaker(key);
- if ( mm != null ) {
- mCnt = mm.getTopicCount();
- }
- logger.info( "Count for "+ key + ": " + mCnt);
- totCnt += mCnt;
- }
-
- logger.info( "topicCount [all locations]: " + totCnt );
- brTopic.setTopicCount(totCnt);
-
- }
- else {
-
- logger.error( "source or target is missing");
- check.setCode(Status.BAD_REQUEST.getStatusCode());
- check.setMessage("Either both source and target or neither must be provided");
- return check.error();
- }
- return check.success(brTopic);
- } else {
-
+ // get topics between 2 bridged locations
+
+ MirrorMaker mm = mmService.getMirrorMaker(mmagent);
+ if ( mm == null ) {
+ return check.notFound();
+ }
+
+ brTopic.setTopicCount( mm.getTopicCount() );
+ brTopic.setBrSource( mm.getSourceCluster());
+ brTopic.setBrTarget( mm.getTargetCluster());
+ brTopic.setMmAgentName(mm.getMmName());
- logger.info( "getBridgeTopics() detail:" + " source=" + source + ", target=" + target);
-
- if (source != null && target != null) { // get topics between 2 bridged locations
-
- MirrorMaker mm = mmService.getMirrorMaker(source, target);
- if ( mm == null ) {
- return check.notFound();
- }
-
- return check.success(mm);
- }
+ logger.info( "topicCount [2 locations]: " + brTopic.getTopicCount() );
+
+ return check.success(brTopic);
+ } else {
+ logger.info( "getBridgeTopics() detail:" + " mmagent=" + mmagent);
+ // get topics between 2 bridged locations
+ MirrorMaker mm = mmService.getMirrorMaker(mmagent);
+ if ( mm == null ) {
+ return check.notFound();
+ }
- else {
+ return check.success(mm);
+ }
+ }
- logger.error( "source and target are required when detail=true");
- check.setCode(Status.BAD_REQUEST.getStatusCode());
- check.setMessage("source and target are required when detail=true");
- return check.error();
+ private BrTopic[] getMMcounts( Boolean showDetail ) {
+
+ List<String> mmList = mmService.getAllMirrorMakers();
+ int s = 1;
+ if ( showDetail ) {
+ s = mmList.size() + 1;
+ }
+ BrTopic[] brTopic = new BrTopic[s];
+
+ int totCnt = 0;
+ s = 0;
+ for( String key: mmList ) {
+ int mCnt = 0;
+ MirrorMaker mm = mmService.getMirrorMaker(key);
+ if ( mm != null ) {
+ mCnt = mm.getTopicCount();
+ }
+ logger.info( "Count for "+ key + ": " + mCnt);
+ totCnt += mCnt;
+ if (showDetail) {
+ brTopic[s] = new BrTopic();
+ brTopic[s].setBrSource( mm.getSourceCluster());
+ brTopic[s].setBrTarget(mm.getTargetCluster());
+ brTopic[s].setMmAgentName(mm.getMmName());
+ brTopic[s].setTopicCount(mm.getTopicCount());
+ s++;
}
}
+
+ logger.info( "topicCount [all locations]: " + totCnt );
+ brTopic[s] = new BrTopic();
+ brTopic[s].setBrSource("all");
+ brTopic[s].setBrTarget("all");
+ brTopic[s].setMmAgentName("n/a");
+ brTopic[s].setTopicCount(totCnt);
+ return brTopic;
}
@PUT
@ApiOperation( value = "update MirrorMaker details",
- notes = "replace the topic list for a specific Bridge. Use JSON Body for value to replace whitelist, but if refreshFlag param is true, simply refresh using existing whitelist",
+ notes = "replace the topic list for a specific Bridge. Use JSON Body for value to replace whitelist, "
+ + "but if refreshFlag param is true, simply refresh using existing whitelist."
+ + "If split param is true, spread whitelist over smaller mmagents.",
response = MirrorMaker.class)
@ApiResponses( value = {
@ApiResponse( code = 200, message = "Success", response = Dmaap.class),
@ApiResponse( code = 400, message = "Error", response = ApiError.class )
})
- public Response putBridgedTopics(@QueryParam("source") String source,
- @QueryParam("target") String target,
+ public Response putBridgedTopics(@QueryParam("mmagent") String mmagent,
@QueryParam("refresh") Boolean refreshFlag,
+ @QueryParam("split") Boolean splitFlag,
MirrorMaker newBridge ){
ApiService check = new ApiService();
- logger.info( "putBridgeTopics() detail:" + " source=" + source + ", target=" + target);
+ logger.info( "putBridgeTopics() mmagent:" + mmagent );
- if (source != null && target != null) { // get topics between 2 bridged locations
+ if ( mmagent != null ) { // put topics between 2 bridged locations
- MirrorMaker mm = mmService.getMirrorMaker(source, target);
+ MirrorMaker mm = mmService.getMirrorMaker(mmagent);
if ( mm == null ) {
return check.notFound();
}
- if ( refreshFlag != null && refreshFlag == false ) {
- logger.info( "setting whitelist from message body");
+
+ if ( splitFlag != null && splitFlag == true ) {
+ mm = mmService.splitMM( mm );
+ } else if ( refreshFlag == null || refreshFlag == false ) {
+ logger.info( "setting whitelist from message body containing mmName=" + newBridge.getMmName());
+ if ( ! mmagent.equals(newBridge.getMmName()) ){
+ logger.error( "mmagent query param does not match mmName in body");
+ check.setCode(Status.BAD_REQUEST.getStatusCode());
+ check.setMessage("mmagent query param does not match mmName in body");
+ return check.error();
+ }
mm.setTopics( newBridge.getTopics() );
} else {
logger.info( "refreshing whitelist from memory");
else {
- logger.error( "source and target are required when detail=true");
+ logger.error( "mmagent is required for PUT");
check.setCode(Status.BAD_REQUEST.getStatusCode());
- check.setMessage("source and target are required when detail=true");
+ check.setMessage("mmagent is required for PUT");
return check.error();
}
private static String defaultProducerPort;
private static String defaultConsumerPort;
private static String centralFqdn;
+ private int maxTopicsPerMM;
public MirrorMakerService() {
super();
defaultProducerPort = p.getProperty( "MR.SourceReplicationPort", "9092");
defaultConsumerPort = p.getProperty( "MR.TargetReplicationPort", "2181");
centralFqdn = p.getProperty("MR.CentralCname", "notSet");
+ maxTopicsPerMM = Integer.valueOf( p.getProperty( "MaxTopicsPerMM", "5"));
}
// will create a MM on MMagent if needed
mm.setStatus(DmaapObject_Status.INVALID);
} else {
prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn );
- resp = prov.doPostMessage(mm.updateWhiteList());
+ resp = prov.doPostMessage(mm.getWhitelistUpdateJSON());
if ( ! resp.is2xx()) {
errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR,"MR Bridge", Integer.toString(resp.getCode()), resp.getMessage());
mm.setStatus(DmaapObject_Status.INVALID);
mm.setLastMod();
return mirrors.put( mm.getMmName(), mm);
}
+ public MirrorMaker getMirrorMaker( String part1, String part2, int index ) {
+ String targetPart;
+
+ // original mm names did not have any index, so leave off index 0 for
+ // backwards compatibility
+ if ( index == 0 ) {
+ targetPart = part2;
+ } else {
+ targetPart = part2 + "_" + index;
+ }
+ logger.info( "getMirrorMaker using " + part1 + " and " + targetPart );
+ return mirrors.get(MirrorMaker.genKey(part1, targetPart));
+ }
public MirrorMaker getMirrorMaker( String part1, String part2 ) {
logger.info( "getMirrorMaker using " + part1 + " and " + part2 );
return mirrors.get(MirrorMaker.genKey(part1, part2));
return ret;
}
+
+ public MirrorMaker getNextMM( String source, String target ) {
+ int i = 0;
+ MirrorMaker mm = null;
+ while( mm == null ) {
+
+ mm = this.getMirrorMaker( source, target, i);
+ if ( mm == null ) {
+ mm = new MirrorMaker(source, target, i);
+ }
+ if ( mm.getTopicCount() >= maxTopicsPerMM ) {
+ logger.info( "getNextMM: MM " + mm.getMmName() + " has " + mm.getTopicCount() + " topics. Moving to next MM");
+ i++;
+ mm = null;
+ }
+ }
+
+
+ return mm;
+ }
+
+ public MirrorMaker splitMM( MirrorMaker orig ) {
+
+ int index = 1;
+ String source = orig.getSourceCluster();
+ String target = orig.getTargetCluster();
+
+
+ ArrayList<String> whitelist = orig.getTopics();
+ while( whitelist.size() > maxTopicsPerMM ) {
+ MirrorMaker mm = this.getNextMM( source, target );
+ int last = whitelist.size() - 1;
+ String topic = whitelist.get(last);
+ whitelist.remove(last);
+ mm.addTopic(topic);
+ this.updateMirrorMaker(mm);
+ }
+
+ orig.setTopics(whitelist);
+
+ return orig;
+
+ }
}