Provision MM at target MR 70/84570/1
authordglFromAtt <dgl@research.att.com>
Mon, 8 Apr 2019 21:54:15 +0000 (17:54 -0400)
committerdglFromAtt <dgl@research.att.com>
Mon, 8 Apr 2019 21:54:22 +0000 (17:54 -0400)
Change-Id: Ib4038bee3bbcb4ca0e0efff8266aad6b93c41040
Signed-off-by: dglFromAtt <dgl@research.att.com>
Issue-ID: DMAAP-1145

pom.xml
src/main/java/org/onap/dmaap/dbcapi/client/MrTopicConnection.java
src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java
src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java
version.properties

diff --git a/pom.xml b/pom.xml
index 90cc026..6517cd1 100644 (file)
--- a/pom.xml
+++ b/pom.xml
                <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
                <jettyVersion>9.4.12.RC2</jettyVersion> 
                <eelf.version>1.0.0</eelf.version>
-               <artifact.version>1.0.25-SNAPSHOT</artifact.version>
+               <artifact.version>1.0.26-SNAPSHOT</artifact.version>
                <!-- SONAR -->
                <jacoco.version>0.7.7.201606060606</jacoco.version>
                <sonar-jacoco-listeners.version>3.2</sonar-jacoco-listeners.version>
index a92dbc7..28a9add 100644 (file)
@@ -31,8 +31,10 @@ import java.net.ProtocolException;
 import java.net.URL;
 import java.net.HttpURLConnection;
 
+import javax.net.ssl.HostnameVerifier;
 import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLSession;
 
 import org.apache.commons.codec.binary.Base64;
 import org.onap.dmaap.dbcapi.logging.BaseLoggingClass;
@@ -49,13 +51,14 @@ public class MrTopicConnection extends BaseLoggingClass  {
        private  String mmProvCred; 
        private String unit_test;
        private boolean useAAF;
-
+       private boolean hostnameVerify;
 
        public MrTopicConnection(String user, String pwd ) {
                mmProvCred = new String( user + ":" + pwd );
                DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
         unit_test = p.getProperty( "UnitTest", "No" );
        useAAF= "true".equalsIgnoreCase(p.getProperty("UseAAF", "false"));
+       hostnameVerify= "true".equalsIgnoreCase(p.getProperty("MR.hostnameVerify", "true"));
        }
        
        public boolean makeTopicConnection( MR_Cluster cluster, String topic, String overrideFqdn ) {
@@ -71,13 +74,28 @@ public class MrTopicConnection extends BaseLoggingClass  {
                return makeConnection( topicURL );
        }
 
+       
        private boolean makeSecureConnection( String pURL ) {
                logger.info( "makeConnection to " + pURL );
-       
+               
                try {
+                       HostnameVerifier hostnameVerifier = new HostnameVerifier() {
+                               @Override
+                               public boolean verify( String hostname, SSLSession session ) {
+                                       return true;
+                               }
+                       
+                       };
+       
+               
                        URL u = new URL( pURL );
-                       uc = (HttpsURLConnection) u.openConnection();
+                       uc = (HttpsURLConnection) u.openConnection();                   
                        uc.setInstanceFollowRedirects(false);
+                       if ( ! hostnameVerify ) {
+                               HttpsURLConnection ucs = (HttpsURLConnection) uc;
+                               ucs.setHostnameVerifier(hostnameVerifier);
+                       }
+       
                        logger.info( "open connection to " + pURL );
                        return(true);
                } catch (Exception e) {
index da9d822..5d695f4 100644 (file)
@@ -57,6 +57,7 @@ public class MirrorMakerService extends BaseLoggingClass {
        private static String defaultConsumerPort;
        private static String centralFqdn;
        private int maxTopicsPerMM;
+       private boolean mmPerMR;
        
        public MirrorMakerService() {
                super();
@@ -68,6 +69,7 @@ public class MirrorMakerService extends BaseLoggingClass {
                defaultConsumerPort = p.getProperty( "MR.TargetReplicationPort", "2181");       
                centralFqdn = p.getProperty("MR.CentralCname", "notSet");
                maxTopicsPerMM = Integer.valueOf( p.getProperty( "MaxTopicsPerMM", "5"));
+               mmPerMR = "true".equalsIgnoreCase(p.getProperty("MirrorMakerPerMR", "true"));
        }
 
        // will create a MM on MMagent if needed
@@ -79,34 +81,43 @@ public class MirrorMakerService extends BaseLoggingClass {
        
                DmaapService dmaap = new DmaapService();
                MR_ClusterService clusters = new MR_ClusterService();
-       
-               // in 1610, MM should only exist for edge-to-central
-               //  we use a cname for the central MR cluster that is active, and provision on agent topic on that target
-               // but only send 1 message so MM Agents can read it relying on kafka delivery
-               for( MR_Cluster central: clusters.getCentralClusters() ) {
-                       prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn  );
-                       ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort ));
-                       if ( ! resp.is2xx() ) {
-       
-                               errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage());
-                               mm.setStatus(DmaapObject_Status.INVALID);
-                       } else {
-                               prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn );
-                               resp = prov.doPostMessage(mm.getWhitelistUpdateJSON());
-                               if ( ! resp.is2xx()) {
-                                       errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR,"MR Bridge", Integer.toString(resp.getCode()), resp.getMessage());
-                                       mm.setStatus(DmaapObject_Status.INVALID);
-                               } else {
-                                       mm.setStatus(DmaapObject_Status.VALID);
-                               }
-                       }
-                       
-                       // we only want to send one message even if there are multiple central clusters
-                       break;
+               MR_Cluster target_cluster = null;
+               String override = null;
                
-               } 
+               if ( ! mmPerMR ) {
+                       // in ECOMP, MM Agent is only deployed at central, so this case is needed for backwards compatibility
+                       //  we use a cname for the central MR cluster that is active, and provision on agent topic on that target
+                       // but only send 1 message so MM Agents can read it relying on kafka delivery
+                       for( MR_Cluster cluster: clusters.getCentralClusters() ) {
+
+                               target_cluster = cluster;
+                               override = centralFqdn;
+                               // we only want to send one message even if there are multiple central clusters
+                               break;
+                       
+                       } 
+               } else {
+                       // In ONAP deployment architecture, the MM Agent is deployed with each target MR
+                       target_cluster = clusters.getMr_ClusterByFQDN(mm.getTargetCluster());
+                       override = null;
+               }
                
+               prov.makeTopicConnection(target_cluster, dmaap.getBridgeAdminFqtn(), override  );
+               ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort ));
+               if ( ! resp.is2xx() ) {
 
+                       errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage());
+                       mm.setStatus(DmaapObject_Status.INVALID);
+               } else {
+                       prov.makeTopicConnection(target_cluster, dmaap.getBridgeAdminFqtn(), override );
+                       resp = prov.doPostMessage(mm.getWhitelistUpdateJSON());
+                       if ( ! resp.is2xx()) {
+                               errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR,"MR Bridge", Integer.toString(resp.getCode()), resp.getMessage());
+                               mm.setStatus(DmaapObject_Status.INVALID);
+                       } else {
+                               mm.setStatus(DmaapObject_Status.VALID);
+                       }
+               }
 
                mm.setLastMod();
                return mirrors.put( mm.getMmName(), mm);
index c5937f4..a7991e8 100644 (file)
@@ -70,6 +70,7 @@ public class TopicService extends BaseLoggingClass {
        private static String centralCname;
        private static boolean createTopicRoles;
        private boolean strictGraph = true;
+       private boolean mmPerMR;
 
 
        public TopicService(){
@@ -81,9 +82,11 @@ public class TopicService extends BaseLoggingClass {
                if ( unit_test.equals( "Yes" ) ) {
                        strictGraph = false;
                }
+               mmPerMR = "true".equalsIgnoreCase(p.getProperty("MirrorMakerPerMR", "true"));
                logger.info( "TopicService properties: CentralCname=" + centralCname + 
                                "   defaultGlobarlMrHost=" + defaultGlobalMrHost +
-                               " createTopicRoles=" + createTopicRoles );
+                               " createTopicRoles=" + createTopicRoles +
+                               " mmPerMR=" + mmPerMR );
        }
        
        public Map<String, Topic> getTopics() {                 
@@ -451,11 +454,11 @@ public class TopicService extends BaseLoggingClass {
                                case REPLICATION_EDGE_TO_CENTRAL:
                                case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL:  // NOTE: this is for E2C portion only
                                        source = cluster.getFqdn();
-                                       target = centralCname;
+                                       target = (mmPerMR)? groupCentralCluster.getFqdn() : centralCname;
                                        break;
                                case REPLICATION_CENTRAL_TO_EDGE:
                                case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE:  // NOTE: this is for C2E portion only
-                                       source = centralCname;
+                                       source = (mmPerMR) ? groupCentralCluster.getFqdn() : centralCname;
                                        target = cluster.getFqdn();
                                        break;
                                case REPLICATION_CENTRAL_TO_GLOBAL:
index 1eff060..8a9bf52 100644 (file)
@@ -27,7 +27,7 @@
 
 major=1
 minor=0
-patch=25
+patch=26
 base_version=${major}.${minor}.${patch}
 
 # Release must be completed with git revision # in Jenkins