From 27b65d2fadd596e224256ac68d76d67acc2603b7 Mon Sep 17 00:00:00 2001 From: dglFromAtt Date: Mon, 8 Apr 2019 17:54:15 -0400 Subject: [PATCH] Provision MM at target MR Change-Id: Ib4038bee3bbcb4ca0e0efff8266aad6b93c41040 Signed-off-by: dglFromAtt Issue-ID: DMAAP-1145 --- pom.xml | 2 +- .../dmaap/dbcapi/client/MrTopicConnection.java | 24 +++++++-- .../dmaap/dbcapi/service/MirrorMakerService.java | 61 +++++++++++++--------- .../onap/dmaap/dbcapi/service/TopicService.java | 9 ++-- version.properties | 2 +- 5 files changed, 65 insertions(+), 33 deletions(-) diff --git a/pom.xml b/pom.xml index 90cc026..6517cd1 100644 --- a/pom.xml +++ b/pom.xml @@ -387,7 +387,7 @@ UTF-8 9.4.12.RC2 1.0.0 - 1.0.25-SNAPSHOT + 1.0.26-SNAPSHOT 0.7.7.201606060606 3.2 diff --git a/src/main/java/org/onap/dmaap/dbcapi/client/MrTopicConnection.java b/src/main/java/org/onap/dmaap/dbcapi/client/MrTopicConnection.java index a92dbc7..28a9add 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/client/MrTopicConnection.java +++ b/src/main/java/org/onap/dmaap/dbcapi/client/MrTopicConnection.java @@ -31,8 +31,10 @@ import java.net.ProtocolException; import java.net.URL; import java.net.HttpURLConnection; +import javax.net.ssl.HostnameVerifier; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLException; +import javax.net.ssl.SSLSession; import org.apache.commons.codec.binary.Base64; import org.onap.dmaap.dbcapi.logging.BaseLoggingClass; @@ -49,13 +51,14 @@ public class MrTopicConnection extends BaseLoggingClass { private String mmProvCred; private String unit_test; private boolean useAAF; - + private boolean hostnameVerify; public MrTopicConnection(String user, String pwd ) { mmProvCred = new String( user + ":" + pwd ); DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig(); unit_test = p.getProperty( "UnitTest", "No" ); useAAF= "true".equalsIgnoreCase(p.getProperty("UseAAF", "false")); + hostnameVerify= "true".equalsIgnoreCase(p.getProperty("MR.hostnameVerify", "true")); } public boolean makeTopicConnection( MR_Cluster cluster, String topic, String overrideFqdn ) { @@ -71,13 +74,28 @@ public class MrTopicConnection extends BaseLoggingClass { return makeConnection( topicURL ); } + private boolean makeSecureConnection( String pURL ) { logger.info( "makeConnection to " + pURL ); - + try { + HostnameVerifier hostnameVerifier = new HostnameVerifier() { + @Override + public boolean verify( String hostname, SSLSession session ) { + return true; + } + + }; + + URL u = new URL( pURL ); - uc = (HttpsURLConnection) u.openConnection(); + uc = (HttpsURLConnection) u.openConnection(); uc.setInstanceFollowRedirects(false); + if ( ! hostnameVerify ) { + HttpsURLConnection ucs = (HttpsURLConnection) uc; + ucs.setHostnameVerifier(hostnameVerifier); + } + logger.info( "open connection to " + pURL ); return(true); } catch (Exception e) { diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java b/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java index da9d822..5d695f4 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java +++ b/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java @@ -57,6 +57,7 @@ public class MirrorMakerService extends BaseLoggingClass { private static String defaultConsumerPort; private static String centralFqdn; private int maxTopicsPerMM; + private boolean mmPerMR; public MirrorMakerService() { super(); @@ -68,6 +69,7 @@ public class MirrorMakerService extends BaseLoggingClass { defaultConsumerPort = p.getProperty( "MR.TargetReplicationPort", "2181"); centralFqdn = p.getProperty("MR.CentralCname", "notSet"); maxTopicsPerMM = Integer.valueOf( p.getProperty( "MaxTopicsPerMM", "5")); + mmPerMR = "true".equalsIgnoreCase(p.getProperty("MirrorMakerPerMR", "true")); } // will create a MM on MMagent if needed @@ -79,34 +81,43 @@ public class MirrorMakerService extends BaseLoggingClass { DmaapService dmaap = new DmaapService(); MR_ClusterService clusters = new MR_ClusterService(); - - // in 1610, MM should only exist for edge-to-central - // we use a cname for the central MR cluster that is active, and provision on agent topic on that target - // but only send 1 message so MM Agents can read it relying on kafka delivery - for( MR_Cluster central: clusters.getCentralClusters() ) { - prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn ); - ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort )); - if ( ! resp.is2xx() ) { - - errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage()); - mm.setStatus(DmaapObject_Status.INVALID); - } else { - prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn ); - resp = prov.doPostMessage(mm.getWhitelistUpdateJSON()); - if ( ! resp.is2xx()) { - errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR,"MR Bridge", Integer.toString(resp.getCode()), resp.getMessage()); - mm.setStatus(DmaapObject_Status.INVALID); - } else { - mm.setStatus(DmaapObject_Status.VALID); - } - } - - // we only want to send one message even if there are multiple central clusters - break; + MR_Cluster target_cluster = null; + String override = null; - } + if ( ! mmPerMR ) { + // in ECOMP, MM Agent is only deployed at central, so this case is needed for backwards compatibility + // we use a cname for the central MR cluster that is active, and provision on agent topic on that target + // but only send 1 message so MM Agents can read it relying on kafka delivery + for( MR_Cluster cluster: clusters.getCentralClusters() ) { + + target_cluster = cluster; + override = centralFqdn; + // we only want to send one message even if there are multiple central clusters + break; + + } + } else { + // In ONAP deployment architecture, the MM Agent is deployed with each target MR + target_cluster = clusters.getMr_ClusterByFQDN(mm.getTargetCluster()); + override = null; + } + prov.makeTopicConnection(target_cluster, dmaap.getBridgeAdminFqtn(), override ); + ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort )); + if ( ! resp.is2xx() ) { + errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage()); + mm.setStatus(DmaapObject_Status.INVALID); + } else { + prov.makeTopicConnection(target_cluster, dmaap.getBridgeAdminFqtn(), override ); + resp = prov.doPostMessage(mm.getWhitelistUpdateJSON()); + if ( ! resp.is2xx()) { + errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR,"MR Bridge", Integer.toString(resp.getCode()), resp.getMessage()); + mm.setStatus(DmaapObject_Status.INVALID); + } else { + mm.setStatus(DmaapObject_Status.VALID); + } + } mm.setLastMod(); return mirrors.put( mm.getMmName(), mm); diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java b/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java index c5937f4..a7991e8 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java +++ b/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java @@ -70,6 +70,7 @@ public class TopicService extends BaseLoggingClass { private static String centralCname; private static boolean createTopicRoles; private boolean strictGraph = true; + private boolean mmPerMR; public TopicService(){ @@ -81,9 +82,11 @@ public class TopicService extends BaseLoggingClass { if ( unit_test.equals( "Yes" ) ) { strictGraph = false; } + mmPerMR = "true".equalsIgnoreCase(p.getProperty("MirrorMakerPerMR", "true")); logger.info( "TopicService properties: CentralCname=" + centralCname + " defaultGlobarlMrHost=" + defaultGlobalMrHost + - " createTopicRoles=" + createTopicRoles ); + " createTopicRoles=" + createTopicRoles + + " mmPerMR=" + mmPerMR ); } public Map getTopics() { @@ -451,11 +454,11 @@ public class TopicService extends BaseLoggingClass { case REPLICATION_EDGE_TO_CENTRAL: case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL: // NOTE: this is for E2C portion only source = cluster.getFqdn(); - target = centralCname; + target = (mmPerMR)? groupCentralCluster.getFqdn() : centralCname; break; case REPLICATION_CENTRAL_TO_EDGE: case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE: // NOTE: this is for C2E portion only - source = centralCname; + source = (mmPerMR) ? groupCentralCluster.getFqdn() : centralCname; target = cluster.getFqdn(); break; case REPLICATION_CENTRAL_TO_GLOBAL: diff --git a/version.properties b/version.properties index 1eff060..8a9bf52 100644 --- a/version.properties +++ b/version.properties @@ -27,7 +27,7 @@ major=1 minor=0 -patch=25 +patch=26 base_version=${major}.${minor}.${patch} # Release must be completed with git revision # in Jenkins -- 2.16.6