package org.onap.dmaap.dbcapi.model;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
-import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum;
import org.onap.dmaap.dbcapi.service.MirrorMakerService;
+import java.util.ArrayList;
+
public class MirrorMaker extends DmaapObject {
- static final Logger logger = Logger.getLogger(MirrorMaker.class);
+
private String sourceCluster;
private String targetCluster;
private String mmName;
private ArrayList<String> topics; //re-using this var name for backwards DB compatibility
- private Set<ReplicationVector> vectors;
public MirrorMaker(){
}
-
+ public MirrorMaker(String source, String target, int i) {
+ initMM( source, target );
+ // original mm names did not have any index, so leave off index 0 for
+ // backwards compatibility
+ if ( i != 0 ) {
+ String n = this.getMmName() + "_" + i;
+ this.setMmName(n);
+ }
+ }
public MirrorMaker(String source, String target) {
+ initMM( source, target );
+ }
+
+ private void initMM(String source, String target) {
sourceCluster = source;
targetCluster = target;
mmName = genKey(source, target);
- vectors = new HashSet<ReplicationVector>();
- topics = new ArrayList<String>();
+ topics = new ArrayList<>();
}
}
- public void addVector( String fqtn, String source, String target ) {
- logger.info( "addVector: fqtn=" + fqtn + " source=" + source + " target=" + target );
- if ( ! sourceCluster.equals( source ) ){
- errorLogger.error( DmaapbcLogMessageEnum.MM_CIRCULAR_REF, source, sourceCluster );
- }
- vectors.add(new ReplicationVector( fqtn, source, target ));
- }
-
- public void delVector( String fqtn, String source, String target ) {
- vectors.remove(new ReplicationVector( fqtn, source, target));
- }
-
-
-
- public String toJSON() {
- StringBuilder str = new StringBuilder( "{ \"source\": " + sourceCluster + ",\"topics\": [" );
- int numTargets = 0;
- for (ReplicationVector rv: vectors) {
- if ( numTargets > 0 ) {
- str.append( ",");
- }
- str.append( " \"target\": " + rv.getTargetCluster() + ", \"topic\": " + rv.getFqtn());
- numTargets++;
- }
- str.append( "] }" );
-
- return str.toString();
- }
-
-
// returns the JSON for MM message containing which Topics to replicate
/*
* example:
}
}
*/
- public String updateWhiteList() {
+ public String getWhitelistUpdateJSON() {
StringBuilder str = new StringBuilder( "{ \"messageID\": \"" + MirrorMakerService.genTransactionId() + "\", \"updateWhiteList\": {" );
str.append( " \"name\": \"" + this.getMmName() + "\", \"whitelist\": \"" );
int numTargets = 0;
- //for (ReplicationVector rv: vectors) {
for (String rv: topics) {
if ( numTargets > 0 ) {
str.append( ",");
}
- //str.append( rv.getFqtn() );
str.append( rv );
numTargets++;
}
}
}
*/
- public String createMirrorMaker() {
+ public String createMirrorMaker( String consumerPort, String producerPort ) {
StringBuilder str = new StringBuilder( "{ \"messageID\": \"" + MirrorMakerService.genTransactionId() + "\", \"createMirrorMaker\": {" );
str.append( " \"name\": \"" + this.getMmName() + "\", " );
- str.append( " \"consumer\": \"" + this.sourceCluster + ":2181\", " );
- str.append( " \"producer\": \"" + this.targetCluster + ":9092\" ");
+ str.append( " \"consumer\": \"" + this.sourceCluster + ":" + consumerPort + "\", " );
+ str.append( " \"producer\": \"" + this.targetCluster + ":" + producerPort + "\", ");
- str.append( " } }" );
+ str.append( " \"numStreams\": \"10\" } }" );
return str.toString();
}
}
- public Set<ReplicationVector> getVectors() {
- return vectors;
- }
-
- public void setVectors(Set<ReplicationVector> vectors) {
- this.vectors = vectors;
- }
public ArrayList<String> getTopics() {
return topics;
}
- //public void setVectors(Set<ReplicationVector> vectors) {
+
public void setTopics(ArrayList<String> topics) {
this.topics = topics;
}
public void addTopic( String topic ) {
- topics.add(topic);
+ if ( ! topics.contains(topic)) {
+ topics.add(topic);
+ }
+ logger.info( "Mirrormaker.addTopic: topic=" + topic + " . Now have " + topics.size() + " topics" );
}
public int getTopicCount() {