//import org.openecomp.dmaapbc.aaf.AndrewDecryptor;
import org.onap.dmaap.dbcapi.aaf.AafDecrypt;
-import org.onap.dmaap.dbcapi.aaf.client.MrTopicConnection;
-import org.onap.dmaap.dbcapi.aaf.database.DatabaseClass;
+import org.onap.dmaap.dbcapi.client.MrTopicConnection;
+import org.onap.dmaap.dbcapi.database.DatabaseClass;
import org.onap.dmaap.dbcapi.logging.BaseLoggingClass;
import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum;
import org.onap.dmaap.dbcapi.model.ApiError;
private static MrTopicConnection prov;
private static AafDecrypt decryptor;
+ private static String provUser;
+ private static String provUserPwd;
+ private static String defaultProducerPort;
+ private static String defaultConsumerPort;
+ private static String centralFqdn;
+
public MirrorMakerService() {
super();
-
decryptor = new AafDecrypt();
+ DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
+ provUser = p.getProperty("MM.ProvUserMechId");
+ provUserPwd = decryptor.decrypt(p.getProperty( "MM.ProvUserPwd", "notSet" ));
+ defaultProducerPort = p.getProperty( "MR.SourceReplicationPort", "9092");
+ defaultConsumerPort = p.getProperty( "MR.TargetReplicationPort", "2181");
+ centralFqdn = p.getProperty("MR.CentralCname", "notSet");
}
// will create a MM on MMagent if needed
// will update the MMagent whitelist with all topics for this MM
public MirrorMaker updateMirrorMaker( MirrorMaker mm ) {
logger.info( "updateMirrorMaker");
- DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
- String provUser = p.getProperty("MM.ProvUserMechId");
- String provUserPwd = decryptor.decrypt(p.getProperty( "MM.ProvUserPwd", "notSet" ));
+
prov = new MrTopicConnection( provUser, provUserPwd );
-
- String centralFqdn = p.getProperty("MR.CentralCname", "notSet");
-
+
DmaapService dmaap = new DmaapService();
MR_ClusterService clusters = new MR_ClusterService();
// but only send 1 message so MM Agents can read it relying on kafka delivery
for( MR_Cluster central: clusters.getCentralClusters() ) {
prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn );
- ApiError resp = prov.doPostMessage(mm.createMirrorMaker());
+ ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort ));
if ( ! resp.is2xx() ) {
errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage());
return mirrors.get(key);
}
- /*public MirrorMaker updateMirrorMaker( MirrorMaker mm ) {
- logger.info( "updateMirrorMaker");
- return mirrors.put( mm.getMmName(), mm);
- }
- */
public void delMirrorMaker( MirrorMaker mm ) {
logger.info( "delMirrorMaker");