//import org.openecomp.dmaapbc.aaf.AndrewDecryptor;
import org.onap.dmaap.dbcapi.aaf.AafDecrypt;
-import org.onap.dmaap.dbcapi.aaf.client.MrTopicConnection;
-import org.onap.dmaap.dbcapi.aaf.database.DatabaseClass;
+import org.onap.dmaap.dbcapi.client.MrTopicConnection;
+import org.onap.dmaap.dbcapi.database.DatabaseClass;
import org.onap.dmaap.dbcapi.logging.BaseLoggingClass;
import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum;
import org.onap.dmaap.dbcapi.model.ApiError;
DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
String provUser = p.getProperty("MM.ProvUserMechId");
String provUserPwd = decryptor.decrypt(p.getProperty( "MM.ProvUserPwd", "notSet" ));
+ String defaultProducerPort = p.getProperty( "MM.KafkaProducerPort", "9092");
+ String defaultConsumerPort = p.getProperty( "MM.KafkaConsumerPort", "2181");
+
prov = new MrTopicConnection( provUser, provUserPwd );
String centralFqdn = p.getProperty("MR.CentralCname", "notSet");
// but only send 1 message so MM Agents can read it relying on kafka delivery
for( MR_Cluster central: clusters.getCentralClusters() ) {
prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn );
- ApiError resp = prov.doPostMessage(mm.createMirrorMaker());
+ ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort ));
if ( ! resp.is2xx() ) {
errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage());