Handling of MirrorMaker properties
[dmaap/dbcapi.git] / src / main / java / org / onap / dmaap / dbcapi / service / MirrorMakerService.java
index 6701328..413590f 100644 (file)
@@ -34,8 +34,8 @@ import java.util.Map;
 
 //import org.openecomp.dmaapbc.aaf.AndrewDecryptor;
 import org.onap.dmaap.dbcapi.aaf.AafDecrypt;
-import org.onap.dmaap.dbcapi.aaf.client.MrTopicConnection;
-import org.onap.dmaap.dbcapi.aaf.database.DatabaseClass;
+import org.onap.dmaap.dbcapi.client.MrTopicConnection;
+import org.onap.dmaap.dbcapi.database.DatabaseClass;
 import org.onap.dmaap.dbcapi.logging.BaseLoggingClass;
 import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum;
 import org.onap.dmaap.dbcapi.model.ApiError;
@@ -51,23 +51,30 @@ public class MirrorMakerService extends BaseLoggingClass {
        private static MrTopicConnection prov;
        private static AafDecrypt decryptor;
        
+       private static String provUser;
+       private static String provUserPwd;
+       private static String defaultProducerPort;
+       private static String defaultConsumerPort;
+       private static String centralFqdn;
+       
        public MirrorMakerService() {
                super();
-               
                decryptor = new AafDecrypt();
+               DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
+               provUser = p.getProperty("MM.ProvUserMechId");
+               provUserPwd = decryptor.decrypt(p.getProperty( "MM.ProvUserPwd", "notSet" ));
+               defaultProducerPort = p.getProperty( "MR.SourceReplicationPort", "9092");
+               defaultConsumerPort = p.getProperty( "MR.TargetReplicationPort", "2181");       
+               centralFqdn = p.getProperty("MR.CentralCname", "notSet");
        }
 
        // will create a MM on MMagent if needed
        // will update the MMagent whitelist with all topics for this MM
        public MirrorMaker updateMirrorMaker( MirrorMaker mm ) {
                logger.info( "updateMirrorMaker");
-               DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
-               String provUser = p.getProperty("MM.ProvUserMechId");
-               String provUserPwd = decryptor.decrypt(p.getProperty( "MM.ProvUserPwd", "notSet" ));
+       
                prov = new MrTopicConnection( provUser, provUserPwd );
-
-               String centralFqdn = p.getProperty("MR.CentralCname", "notSet");
-               
+       
                DmaapService dmaap = new DmaapService();
                MR_ClusterService clusters = new MR_ClusterService();
        
@@ -76,7 +83,7 @@ public class MirrorMakerService extends BaseLoggingClass {
                // but only send 1 message so MM Agents can read it relying on kafka delivery
                for( MR_Cluster central: clusters.getCentralClusters() ) {
                        prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn  );
-                       ApiError resp = prov.doPostMessage(mm.createMirrorMaker());
+                       ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort ));
                        if ( ! resp.is2xx() ) {
        
                                errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage());
@@ -111,11 +118,6 @@ public class MirrorMakerService extends BaseLoggingClass {
                return mirrors.get(key);
        }
        
-       /*public MirrorMaker updateMirrorMaker( MirrorMaker mm ) {
-               logger.info( "updateMirrorMaker");
-               return mirrors.put( mm.getMmName(), mm);
-       }
-       */
        
        public void delMirrorMaker( MirrorMaker mm ) {
                logger.info( "delMirrorMaker");