//import org.openecomp.dmaapbc.aaf.AndrewDecryptor;
import org.onap.dmaap.dbcapi.aaf.AafDecrypt;
-import org.onap.dmaap.dbcapi.aaf.client.MrTopicConnection;
-import org.onap.dmaap.dbcapi.aaf.database.DatabaseClass;
+import org.onap.dmaap.dbcapi.client.MrTopicConnection;
+import org.onap.dmaap.dbcapi.database.DatabaseClass;
import org.onap.dmaap.dbcapi.logging.BaseLoggingClass;
import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum;
import org.onap.dmaap.dbcapi.model.ApiError;
private static MrTopicConnection prov;
private static AafDecrypt decryptor;
+ private static String provUser;
+ private static String provUserPwd;
+ private static String defaultProducerPort;
+ private static String defaultConsumerPort;
+ private static String centralFqdn;
+ private int maxTopicsPerMM;
+
public MirrorMakerService() {
super();
-
decryptor = new AafDecrypt();
+ DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
+ provUser = p.getProperty("MM.ProvUserMechId");
+ provUserPwd = decryptor.decrypt(p.getProperty( "MM.ProvUserPwd", "notSet" ));
+ defaultProducerPort = p.getProperty( "MR.SourceReplicationPort", "9092");
+ defaultConsumerPort = p.getProperty( "MR.TargetReplicationPort", "2181");
+ centralFqdn = p.getProperty("MR.CentralCname", "notSet");
+ maxTopicsPerMM = Integer.valueOf( p.getProperty( "MaxTopicsPerMM", "5"));
}
// will create a MM on MMagent if needed
// will update the MMagent whitelist with all topics for this MM
public MirrorMaker updateMirrorMaker( MirrorMaker mm ) {
logger.info( "updateMirrorMaker");
- DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
- String provUser = p.getProperty("MM.ProvUserMechId");
- String provUserPwd = decryptor.decrypt(p.getProperty( "MM.ProvUserPwd", "notSet" ));
+
prov = new MrTopicConnection( provUser, provUserPwd );
-
- String centralFqdn = p.getProperty("MR.CentralCname", "notSet");
-
+
DmaapService dmaap = new DmaapService();
MR_ClusterService clusters = new MR_ClusterService();
// but only send 1 message so MM Agents can read it relying on kafka delivery
for( MR_Cluster central: clusters.getCentralClusters() ) {
prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn );
- ApiError resp = prov.doPostMessage(mm.createMirrorMaker());
+ ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort ));
if ( ! resp.is2xx() ) {
errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage());
mm.setStatus(DmaapObject_Status.INVALID);
} else {
prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn );
- resp = prov.doPostMessage(mm.updateWhiteList());
+ resp = prov.doPostMessage(mm.getWhitelistUpdateJSON());
if ( ! resp.is2xx()) {
errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR,"MR Bridge", Integer.toString(resp.getCode()), resp.getMessage());
mm.setStatus(DmaapObject_Status.INVALID);
mm.setLastMod();
return mirrors.put( mm.getMmName(), mm);
}
+ public MirrorMaker getMirrorMaker( String part1, String part2, int index ) {
+ String targetPart;
+
+ // original mm names did not have any index, so leave off index 0 for
+ // backwards compatibility
+ if ( index == 0 ) {
+ targetPart = part2;
+ } else {
+ targetPart = part2 + "_" + index;
+ }
+ logger.info( "getMirrorMaker using " + part1 + " and " + targetPart );
+ return mirrors.get(MirrorMaker.genKey(part1, targetPart));
+ }
public MirrorMaker getMirrorMaker( String part1, String part2 ) {
logger.info( "getMirrorMaker using " + part1 + " and " + part2 );
return mirrors.get(MirrorMaker.genKey(part1, part2));
return mirrors.get(key);
}
- /*public MirrorMaker updateMirrorMaker( MirrorMaker mm ) {
- logger.info( "updateMirrorMaker");
- return mirrors.put( mm.getMmName(), mm);
- }
- */
public void delMirrorMaker( MirrorMaker mm ) {
logger.info( "delMirrorMaker");
return ret;
}
+
+ public MirrorMaker getNextMM( String source, String target, String fqtn ) {
+ int i = 0;
+ MirrorMaker mm = null;
+ while( mm == null ) {
+
+ mm = this.getMirrorMaker( source, target, i);
+ if ( mm == null ) {
+ mm = new MirrorMaker(source, target, i);
+ }
+ if ( mm.getTopics().contains(fqtn) ) {
+ break;
+ }
+ if ( mm.getTopicCount() >= maxTopicsPerMM ) {
+ logger.info( "getNextMM: MM " + mm.getMmName() + " has " + mm.getTopicCount() + " topics. Moving to next MM");
+ i++;
+ mm = null;
+ }
+ }
+
+
+ return mm;
+ }
+
+ public MirrorMaker splitMM( MirrorMaker orig ) {
+
+ String source = orig.getSourceCluster();
+ String target = orig.getTargetCluster();
+
+
+ ArrayList<String> whitelist = orig.getTopics();
+ while( whitelist.size() > maxTopicsPerMM ) {
+
+ int last = whitelist.size() - 1;
+ String topic = whitelist.get(last);
+ whitelist.remove(last);
+ MirrorMaker mm = this.getNextMM( source, target, "aValueThatShouldNotMatchAnything" );
+ mm.addTopic(topic);
+ this.updateMirrorMaker(mm);
+ }
+
+ orig.setTopics(whitelist);
+
+ return orig;
+
+ }
}