Changes for configurable kafka ports
[dmaap/dbcapi.git] / src / main / java / org / onap / dmaap / dbcapi / service / MirrorMakerService.java
index 29010b6..a73d981 100644 (file)
@@ -64,6 +64,9 @@ public class MirrorMakerService extends BaseLoggingClass {
                DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
                String provUser = p.getProperty("MM.ProvUserMechId");
                String provUserPwd = decryptor.decrypt(p.getProperty( "MM.ProvUserPwd", "notSet" ));
+               String defaultProducerPort = p.getProperty( "MM.KafkaProducerPort", "9092");
+               String defaultConsumerPort = p.getProperty( "MM.KafkaConsumerPort", "2181");
+       
                prov = new MrTopicConnection( provUser, provUserPwd );
 
                String centralFqdn = p.getProperty("MR.CentralCname", "notSet");
@@ -76,7 +79,7 @@ public class MirrorMakerService extends BaseLoggingClass {
                // but only send 1 message so MM Agents can read it relying on kafka delivery
                for( MR_Cluster central: clusters.getCentralClusters() ) {
                        prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn  );
-                       ApiError resp = prov.doPostMessage(mm.createMirrorMaker());
+                       ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort ));
                        if ( ! resp.is2xx() ) {
        
                                errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage());