Alternative MR replication method 01/57601/1
authordglFromAtt <dgl@research.att.com>
Wed, 25 Jul 2018 20:29:35 +0000 (16:29 -0400)
committerdglFromAtt <dgl@research.att.com>
Wed, 25 Jul 2018 20:31:25 +0000 (16:31 -0400)
This introduces a new set of replication rules for topics:
REPLICATION_EDGE_TO_FQDN
REPLICATION_FQDN_TO_EDGE
REPLICATION_EDFE_TO_FQDN_TO_GLOBAL
REPLICTION_GLOBAL_TO_FQDN_TO_EDGE

This will allow for some apps to design their own message replication
path for resiliency because it will use the FQDN of the cluster
instead of the using the CNAME.

Also contains:
- introduce (optional) replicationGroup field to mr_cluster
- some cleanup of unused code.

Change-Id: I063fb343af3f7bc17c88190563920afd84737ada
Signed-off-by: dglFromAtt <dgl@research.att.com>
Issue-ID: DMAAP-542

17 files changed:
pom.xml
src/main/java/org/onap/dmaap/dbcapi/database/DatabaseClass.java
src/main/java/org/onap/dmaap/dbcapi/database/TableHandler.java
src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java
src/main/java/org/onap/dmaap/dbcapi/model/ReplicationType.java
src/main/java/org/onap/dmaap/dbcapi/model/ReplicationVector.java [deleted file]
src/main/java/org/onap/dmaap/dbcapi/resources/BridgeResource.java
src/main/java/org/onap/dmaap/dbcapi/resources/TopicResource.java
src/main/java/org/onap/dmaap/dbcapi/service/MR_ClientService.java
src/main/java/org/onap/dmaap/dbcapi/service/MR_ClusterService.java
src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java
src/main/java/org/onap/dmaap/dbcapi/util/Graph.java
src/main/resources/schema_9.sql [new file with mode: 0644]
src/test/java/org/onap/dmaap/dbcapi/model/MirrorMakerTest.java
src/test/java/org/onap/dmaap/dbcapi/model/ReplicationVectorTest.java [deleted file]
src/test/java/org/onap/dmaap/dbcapi/util/GraphTest.java
version.properties

diff --git a/pom.xml b/pom.xml
index 4855f75..c3f39b2 100644 (file)
--- a/pom.xml
+++ b/pom.xml
                <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
                <jettyVersion>9.3.9.v20160517</jettyVersion> 
                <eelf.version>0.0.1</eelf.version>
-               <artifact.version>1.0.10-SNAPSHOT</artifact.version>
+               <artifact.version>1.0.11-SNAPSHOT</artifact.version>
                <!-- SONAR -->
                <jacoco.version>0.7.7.201606060606</jacoco.version>
                <sonar-jacoco-listeners.version>3.2</sonar-jacoco-listeners.version>
index 9b7c8ff..f0a5582 100644 (file)
@@ -47,40 +47,8 @@ public class DatabaseClass extends BaseLoggingClass {
        
        private static long lastTime = 0L;
        
-       private static class MirrorVectorHandler implements DBFieldHandler.SqlOp {
-               public Object get(ResultSet rs, int index) throws Exception {
-                       String val = rs.getString(index);
-                       if (val == null) {
-                               return(null);
-                       }
-                       Set<ReplicationVector> rv = new HashSet<ReplicationVector>();
-                       for (String s: val.split(",")) {
-                               String[] f = s.split(";");
-                               if (f.length < 3) {
-                                       continue;
-                               }
-                               rv.add(new ReplicationVector(DBFieldHandler.funesc(f[0]), DBFieldHandler.funesc(f[1]), DBFieldHandler.funesc(f[2])));
-                       }
-                       return(rv);
-               }
-               public void set(PreparedStatement ps, int index, Object val) throws Exception {
-                       if (val == null) {
-                               ps.setString(index, null);
-                               return;
-                       }
-                       Set xv = (Set)val;
-                       StringBuffer sb = new StringBuffer();
-                       String sep = "";
-                       for (Object o: xv) {
-                               ReplicationVector rv = (ReplicationVector)o;
-                               sb.append(sep).append(DBFieldHandler.fesc(rv.getFqtn())).append(';').append(DBFieldHandler.fesc(rv.getSourceCluster())).append(';').append(DBFieldHandler.fesc(rv.getTargetCluster()));
-                               sep = ",";
-                       }
-                       ps.setString(index, sb.toString());
-               }
-       }
 
-       // modified version of MirrorVectorHandler for Topics
+
        private static class MirrorTopicsHandler implements DBFieldHandler.SqlOp {
                public Object get(ResultSet rs, int index) throws Exception {
                        String val = rs.getString(index);
@@ -192,8 +160,7 @@ public class DatabaseClass extends BaseLoggingClass {
                                mr_clusters = new DBMap<MR_Cluster>(MR_Cluster.class, "mr_cluster", "dcae_location_name");
                                feeds = new DBMap<Feed>(Feed.class, "feed", "feed_id");
                                TableHandler.setSpecialCase("topic", "replication_case", new TopicReplicationTypeHandler());
-                               topics = new DBMap<Topic>(Topic.class, "topic", "fqtn");
-                               //TableHandler.setSpecialCase("mirror_maker", "vectors", new MirrorVectorHandler());
+                               topics = new DBMap<Topic>(Topic.class, "topic", "fqtn");                        
                                TableHandler.setSpecialCase("mirror_maker", "topics", new MirrorTopicsHandler());
                                mirrors = new DBMap<MirrorMaker>(MirrorMaker.class, "mirror_maker", "mm_name");
                        } catch (Exception e) {
index 66539ed..c83b6be 100644 (file)
 
 package org.onap.dmaap.dbcapi.database;
 
-import java.util.*;
 import java.lang.reflect.*;
-import java.sql.*;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Vector;
 
 class TableHandler<C>  {
        protected ConnectionFactory cf;
index 9f6f402..6447123 100644 (file)
@@ -36,7 +36,6 @@ public class MirrorMaker extends DmaapObject {
        private String  mmName;
        private ArrayList<String> topics;  //re-using this var name for backwards DB compatibility
        
-       private Set<ReplicationVector> vectors;
 
        
        public MirrorMaker(){
@@ -47,7 +46,6 @@ public class MirrorMaker extends DmaapObject {
                sourceCluster = source;
                targetCluster = target;
                mmName = genKey(source, target);
-               vectors = new HashSet<ReplicationVector>();
                topics = new ArrayList<String>();
 
        }
@@ -61,36 +59,6 @@ public class MirrorMaker extends DmaapObject {
        }
 
        
-       public void addVector( String fqtn, String source, String target ) {
-               logger.info( "addVector: fqtn=" + fqtn + " source=" + source + " target=" + target );
-               if ( ! sourceCluster.equals( source ) ){
-                       errorLogger.error( DmaapbcLogMessageEnum.MM_CIRCULAR_REF,  source,  sourceCluster );
-               }
-               vectors.add(new ReplicationVector( fqtn, source, target ));
-       }
-       
-       public void delVector( String fqtn, String source, String target ) {
-               vectors.remove(new ReplicationVector( fqtn, source, target));
-       }
-
-       
-       
-       public String toJSON() {
-               StringBuilder str = new StringBuilder( "{ \"source\": " + sourceCluster + ",\"topics\": ["  );
-               int numTargets = 0;
-               for (ReplicationVector rv: vectors) {
-                       if ( numTargets > 0 ) {
-                               str.append( ",");
-                       }
-                       str.append( " \"target\": " + rv.getTargetCluster() + ", \"topic\": " + rv.getFqtn());
-                       numTargets++;
-               }
-               str.append( "] }" );
-               
-               return str.toString();
-       }
-               
-       
        // returns the JSON for MM message containing which Topics to replicate
        /* 
         * example:
@@ -166,18 +134,11 @@ public class MirrorMaker extends DmaapObject {
        }
 
 
-       public Set<ReplicationVector> getVectors() {
-               return vectors;
-       }
-
-       public void setVectors(Set<ReplicationVector> vectors) {
-               this.vectors = vectors;
-       }
        public ArrayList<String> getTopics() {
                return topics;
        }
 
-       //public void setVectors(Set<ReplicationVector> vectors) {
+       
        public void setTopics(ArrayList<String> topics) {
                this.topics = topics;
        }
index 49b93a6..5d5b6c6 100644 (file)
@@ -34,7 +34,13 @@ public enum ReplicationType {
        REPLICATION_CENTRAL_TO_EDGE(20),
        REPLICATION_CENTRAL_TO_GLOBAL(21),
        REPLICATION_GLOBAL_TO_CENTRAL(30),
-       REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE(120);
+       REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE(120),
+       REPLICATION_EDGE_TO_FQDN(40),
+       REPLICATION_FQDN_TO_EDGE(41),
+       REPLICATION_FQDN_TO_GLOBAL(50),
+       REPLICATION_GLOBAL_TO_FQDN(51),
+       REPLICATION_EDGE_TO_FQDN_TO_GLOBAL(130),
+       REPLICATION_GLOBAL_TO_FQDN_TO_EDGE (140);
 
     private int value;
     private static Map map = new HashMap<>();
@@ -69,10 +75,30 @@ public enum ReplicationType {
        }
 
        public boolean involvesGlobal() {
-               if ( this.compareTo(REPLICATION_CENTRAL_TO_GLOBAL) == 0 ||
-                               this.compareTo(REPLICATION_GLOBAL_TO_CENTRAL) == 0 ||
-                               this.compareTo(REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL) == 0 ||
-                               this.compareTo(REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE) == 0) {
+       
+               
+               if ( ( this.compareTo(REPLICATION_CENTRAL_TO_GLOBAL) == 0 ) ||
+                        ( this.compareTo(REPLICATION_GLOBAL_TO_CENTRAL) == 0 ) ||
+                        ( this.compareTo(REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL) == 0 ) ||
+                        ( this.compareTo(REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE) == 0 ) ||
+                        ( this.compareTo(REPLICATION_EDGE_TO_FQDN_TO_GLOBAL) == 0 ) ||
+                        ( this.compareTo(REPLICATION_GLOBAL_TO_FQDN_TO_EDGE) == 0 ) ||
+                        ( this.compareTo(REPLICATION_FQDN_TO_GLOBAL) == 0 ) ||
+                        ( this.compareTo(REPLICATION_GLOBAL_TO_FQDN) == 0 ) ) {
+                       return true;
+               }
+               return false;
+       }
+       
+       public boolean involvesFQDN() {
+               if ( 
+                               ( this.compareTo(REPLICATION_EDGE_TO_FQDN) == 0 ) ||
+                               ( this.compareTo(REPLICATION_EDGE_TO_FQDN_TO_GLOBAL) == 0 ) ||
+                               ( this.compareTo(REPLICATION_GLOBAL_TO_FQDN_TO_EDGE) == 0 ) ||
+                               ( this.compareTo(REPLICATION_FQDN_TO_GLOBAL) == 0 ) ||
+                               ( this.compareTo(REPLICATION_GLOBAL_TO_FQDN) == 0 ) ||
+                               ( this.compareTo(REPLICATION_FQDN_TO_EDGE) == 0 ) 
+                               ) {
                        return true;
                }
                return false;
diff --git a/src/main/java/org/onap/dmaap/dbcapi/model/ReplicationVector.java b/src/main/java/org/onap/dmaap/dbcapi/model/ReplicationVector.java
deleted file mode 100644 (file)
index add3998..0000000
+++ /dev/null
@@ -1,100 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dmaap.dbcapi.model;
-
-public class ReplicationVector {
-       
-       public enum ReplicationVector_Status {
-               EMPTY,
-               NEW,
-               STAGED,
-               VALID,
-               INVALID,
-               INVALID_DUP,
-               DELETED
-       }
-
-       String  fqtn;
-       String  sourceCluster;
-       String  targetCluster;
-       ReplicationVector_Status status;
-       
-       public ReplicationVector(){
-               
-       }
-       
-       public ReplicationVector(String fqtn, String sourceCluster,
-                       String targetCluster) {
-               super();
-               this.fqtn = fqtn;
-               this.sourceCluster = sourceCluster;
-               this.targetCluster = targetCluster;
-       }
-
-       public String getFqtn() {
-               return fqtn;
-       }
-
-       public void setFqtn(String fqtn) {
-               this.fqtn = fqtn;
-       }
-
-       public String getSourceCluster() {
-               return sourceCluster;
-       }
-
-       public void setSourceCluster(String sourceCluster) {
-               this.sourceCluster = sourceCluster;
-       }
-
-       public String getTargetCluster() {
-               return targetCluster;
-       }
-
-       public void setTargetCluster(String targetCluster) {
-               this.targetCluster = targetCluster;
-       }
-       
-       public int hashCode() {
-               StringBuilder tmp = new StringBuilder( this.fqtn );
-               tmp.append(this.sourceCluster);
-               tmp.append(this.targetCluster);
-               
-               return tmp.toString().hashCode();
-       }
-       private static boolean xeq(String s1, String s2) {
-               if (s1 == null) {
-                       return(s2 == null);
-               } else {
-                       return(s1.equals(s2));
-               }
-       }
-       public boolean equals(Object o) {
-               if (o == this) {
-                       return(true);
-               }
-               if (!(o instanceof ReplicationVector)) {
-                       return(false);
-               }
-               ReplicationVector x = (ReplicationVector)o;
-               return(xeq(fqtn, x.fqtn) && xeq(sourceCluster, x.sourceCluster) && xeq(targetCluster, x.targetCluster));
-       }
-}
index 1a6310c..aab1cac 100644 (file)
 
 package org.onap.dmaap.dbcapi.resources;
 
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
-
 import java.util.List;
 
 import javax.ws.rs.Consumes;
 import javax.ws.rs.GET;
 import javax.ws.rs.PUT;
-import javax.ws.rs.HeaderParam;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import javax.ws.rs.core.UriInfo;
 import javax.ws.rs.core.Response.Status;
 
-import org.onap.dmaap.dbcapi.authentication.AuthenticationErrorException;
 import org.onap.dmaap.dbcapi.logging.BaseLoggingClass;
-import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum;
 import org.onap.dmaap.dbcapi.model.ApiError;
 import org.onap.dmaap.dbcapi.model.BrTopic;
-import org.onap.dmaap.dbcapi.model.DcaeLocation;
 import org.onap.dmaap.dbcapi.model.Dmaap;
 import org.onap.dmaap.dbcapi.model.MirrorMaker;
 import org.onap.dmaap.dbcapi.service.ApiService;
 import org.onap.dmaap.dbcapi.service.MirrorMakerService;
 
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+
 @Path("/bridge")
 @Api( value= "bridge", description = "Endpoint for retreiving MR Bridge metrics" )
 @Consumes(MediaType.APPLICATION_JSON)
@@ -77,7 +71,7 @@ public class BridgeResource extends BaseLoggingClass {
                        BrTopic brTopic = new BrTopic();
                        
                        logger.info( "getBridgeTopics():" + " source=" + source + ", target=" + target);
-       //              System.out.println("getBridgedTopics() " + "source=" + source + ", target=" + target );
+       
                        if (source != null && target != null) {         // get topics between 2 bridged locations
                                brTopic.setBrSource(source);
                                brTopic.setBrTarget(target);
index 8221813..95f9d33 100644 (file)
@@ -120,7 +120,7 @@ public class TopicResource extends BaseLoggingClass {
                topic.setLastMod();
                
                Topic mrc =  mr_topicService.addTopic(topic, check.getErr());
-               if ( mrc != null && mrc.isStatusValid() ) {
+               if ( mrc != null && check.getErr().is2xx() ) {
                        return check.success(Status.CREATED.getStatusCode(), mrc);
                }
                return check.error();
index 40f86b6..5bd62cb 100644 (file)
@@ -57,10 +57,12 @@ public class MR_ClientService extends BaseLoggingClass{
        private Map<String, Topic> topics = DatabaseClass.getTopics();
        private Map<String, DcaeLocation> locations = DatabaseClass.getDcaeLocations();
        private DmaapService dmaap = new DmaapService();
+       private String centralCname;
        
        public MR_ClientService() {
                DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
-               
+       
+               centralCname = p.getProperty("MR.CentralCname", "MRcname.not.set");
                deleteLevel = Integer.valueOf(p.getProperty("MR.ClientDeleteLevel", "0" ));
        }
        
@@ -132,12 +134,12 @@ public class MR_ClientService extends BaseLoggingClass{
                }
                String centralFqdn = null;
                DcaeLocation candidate = locations.get(client.getDcaeLocationName());
-               if ( candidate != null && candidate.isCentral() ) {
-                       DmaapConfig p = ( DmaapConfig)DmaapConfig.getConfig();
-                       centralFqdn = p.getProperty("MR.CentralCname");
-               }
+
                MR_Cluster cluster = clusters.get( client.getDcaeLocationName());
-               if (  cluster != null ) {
+               if (  cluster != null && candidate != null ) {
+                       if ( candidate.isCentral() && ! topic.getReplicationCase().involvesFQDN() ) {
+                               centralFqdn = centralCname;
+                       }
                        client.setTopicURL(cluster.genTopicURL(centralFqdn, client.getFqtn()));
                        if ( centralFqdn == null ) {
                                client.setStatus( addTopicToCluster( cluster, topic, err));
@@ -148,8 +150,8 @@ public class MR_ClientService extends BaseLoggingClass{
                        
                        } else {
                                MR_ClusterService clusters = new MR_ClusterService();   
-                               // in 1610, MM should only exist for edge-to-central
-                               //  we use a cname for the central target
+                               //  MM should only exist for edge-to-central
+                               //  we use a cname for the central target (default resiliency with no replicationGroup set)
                                // but still need to provision topics on all central MRs
                                for( MR_Cluster central: clusters.getCentralClusters() ) {
                                        client.setStatus( addTopicToCluster( central, topic, err));
@@ -161,7 +163,7 @@ public class MR_ClientService extends BaseLoggingClass{
                        }
                        
                } else {
-                       logger.info( "Client references a dcaeLocation that doesn't exist:" + client.getDcaeLocationName());
+                       logger.warn( "Client references a dcaeLocation that doesn't exist:" + client.getDcaeLocationName());
                        client.setStatus( DmaapObject_Status.STAGED);
                        //return null;
                }
index ed57279..e2c661b 100644 (file)
 package org.onap.dmaap.dbcapi.service;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import javax.ws.rs.core.Response.Status;
 
@@ -32,6 +34,7 @@ import org.onap.dmaap.dbcapi.model.ApiError;
 import org.onap.dmaap.dbcapi.model.DcaeLocation;
 import org.onap.dmaap.dbcapi.model.MR_Cluster;
 import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status;
+import org.onap.dmaap.dbcapi.service.DcaeLocationService;
 import org.onap.dmaap.dbcapi.util.DmaapConfig;
 
 public class MR_ClusterService extends BaseLoggingClass {
@@ -73,6 +76,10 @@ public class MR_ClusterService extends BaseLoggingClass {
                return null;
        }
        
+       public MR_Cluster getMr_ClusterByLoc( String loc ) {
+               return mr_clusters.get( loc );
+       }
+       
        public List<MR_Cluster> getCentralClusters() {
                DcaeLocationService locations = new DcaeLocationService();
                List<MR_Cluster> result = new ArrayList<MR_Cluster>();
@@ -87,6 +94,20 @@ public class MR_ClusterService extends BaseLoggingClass {
                }
                return result;
        }       
+       
+       // builds the set of unique cluster groups
+       public Set<String> getGroups() {
+               Set<String> result = new HashSet<String>();
+               for( MR_Cluster c: mr_clusters.values() ) {
+                       try {
+                               result.add(c.getReplicationGroup());
+                       } catch ( NullPointerException npe ) {
+                               logger.warn( "Failed to add Group for cluster:" + c.getDcaeLocationName() );
+                       }
+               }
+               return result;
+       }       
+
 
        public MR_Cluster addMr_Cluster( MR_Cluster cluster, ApiError apiError ) {
                logger.info( "Entry: addMr_Cluster");
index eed5022..3943419 100644 (file)
@@ -25,23 +25,25 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import javax.ws.rs.core.Response.Status;
 
 import org.onap.dmaap.dbcapi.aaf.AafService;
-import org.onap.dmaap.dbcapi.aaf.DmaapPerm;
 import org.onap.dmaap.dbcapi.aaf.AafService.ServiceType;
+import org.onap.dmaap.dbcapi.aaf.DmaapPerm;
 import org.onap.dmaap.dbcapi.database.DatabaseClass;
 import org.onap.dmaap.dbcapi.logging.BaseLoggingClass;
 import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum;
 import org.onap.dmaap.dbcapi.model.ApiError;
+import org.onap.dmaap.dbcapi.model.DcaeLocation;
 import org.onap.dmaap.dbcapi.model.Dmaap;
+import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status;
 import org.onap.dmaap.dbcapi.model.MR_Client;
 import org.onap.dmaap.dbcapi.model.MR_Cluster;
 import org.onap.dmaap.dbcapi.model.MirrorMaker;
 import org.onap.dmaap.dbcapi.model.ReplicationType;
 import org.onap.dmaap.dbcapi.model.Topic;
-import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status;
 import org.onap.dmaap.dbcapi.util.DmaapConfig;
 import org.onap.dmaap.dbcapi.util.Fqdn;
 import org.onap.dmaap.dbcapi.util.Graph;
@@ -54,16 +56,21 @@ public class TopicService extends BaseLoggingClass {
        private static String defaultGlobalMrHost;
        
        private Map<String, Topic> mr_topics = DatabaseClass.getTopics();
-       private Map<String, MR_Cluster> clusters = DatabaseClass.getMr_clusters();
        
        private static DmaapService dmaapSvc = new DmaapService();
        private static Dmaap dmaap = new DmaapService().getDmaap();
        private MR_ClientService clientService = new MR_ClientService();
+       private MR_ClusterService clusters = new MR_ClusterService();
+       private DcaeLocationService locations = new DcaeLocationService();
        private MirrorMakerService      bridge = new MirrorMakerService();
+       
+       private static String centralCname;
 
        public TopicService(){
                DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
                defaultGlobalMrHost = p.getProperty("MR.globalHost", "global.host.not.set");
+               centralCname = p.getProperty("MR.CentralCname");
+               logger.info( "TopicService properties: CentralCname=" + centralCname + "   defaultGlobarlMrHost=" + defaultGlobalMrHost );
        }
        
        public Map<String, Topic> getTopics() {                 
@@ -165,9 +172,12 @@ public class TopicService extends BaseLoggingClass {
                Topic ntopic = checkForBridge( topic, err );
                if ( ntopic == null ) {
                        topic.setStatus( DmaapObject_Status.INVALID);
-                       return null;
+                       if ( ! err.is2xx()) {
+                               return null;
+                       }
                }
 
+               
                mr_topics.put( nFqtn, ntopic );
 
                err.setCode(Status.OK.getStatusCode());
@@ -183,7 +193,9 @@ public class TopicService extends BaseLoggingClass {
                Topic ntopic = checkForBridge( topic, err );
                if ( ntopic == null ) {
                        topic.setStatus( DmaapObject_Status.INVALID);
-                       return null;
+                       if ( ! err.is2xx() ) {
+                               return null;
+                       }
                }
                mr_topics.put( ntopic.getFqtn(), ntopic );
                err.setCode(Status.OK.getStatusCode());
@@ -252,63 +264,147 @@ public class TopicService extends BaseLoggingClass {
                        return topic;   
                }
                
-               boolean anythingWrong = false;                          
-               String centralFqdn = new String();
-               Graph graph = new Graph( topic.getClients(), true );
+               boolean anythingWrong = false;
+               
+               Set<String> groups = clusters.getGroups();
+               for ( String g : groups ) {
+                       anythingWrong |= buildBridge( topic, err, g );
+               }
+               if ( anythingWrong ) {
+                       topic.setStatus( DmaapObject_Status.INVALID);
+                       if ( ! err.is2xx() ) {
+                               return null;
+                       }       
+               } else {
+                       topic.setStatus( DmaapObject_Status.VALID);
+               }
+               return topic;
+       }
+               
+       private boolean buildBridge( Topic topic, ApiError err, String group ) {
+
+               boolean anythingWrong = false;
+               Graph graph;
+               if ( group == null || group.isEmpty() ) {
+                       graph = new Graph( topic.getClients(), true );
+               } else {
+                       graph = new Graph( topic.getClients(), true, group );
+               }
+               MR_Cluster groupCentralCluster = null;
                
-               if ( graph.isHasCentral() ) {
-                       DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
-                       centralFqdn = p.getProperty("MR.CentralCname");
-                       logger.info( "CentralCname=" + centralFqdn );
+               if ( graph.isEmpty() ) {
+                       return false;
+               } else if ( group == null &&  topic.getReplicationCase().involvesFQDN() ) {
+                       return false;
+               } else if ( ! graph.hasCentral() ) {
+                       logger.warn( "Topic " + topic.getFqtn() + " wants to be " + topic.getReplicationCase() + " but has no central clients");
+                       return true;
                } else {
-                       logger.warn( "Topic " + topic.getFqtn() + " wants to be " + topic.getReplicationCase() + " but has no cental clients");
+                       groupCentralCluster = clusters.getMr_ClusterByLoc(graph.getCentralLoc());
                }
-               Collection<String> locations = graph.getKeys();
-               for( String loc : locations ) {
+               Collection<String> clientLocations = graph.getKeys();
+               for( String loc : clientLocations ) {
                        logger.info( "loc=" + loc );
-                       MR_Cluster cluster = clusters.get(loc);
+                       DcaeLocation location = locations.getDcaeLocation(loc);
+                       MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc);
                        logger.info( "cluster=" + cluster );
 
                        
                                
                        String source = null;
                        String target = null;
+                       
                        /*
-                        * all replication rules have 1 bridge...
+                        * Provision Edge to Central bridges...
                         */
-                       switch( topic.getReplicationCase() ) {
-                       case REPLICATION_EDGE_TO_CENTRAL:
-                       case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL:  // NOTE: this is for E2C portion only
-                               if ( graph.isHasCentral() &&  graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
+                       if ( ! location.isCentral()  && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName()) ) {
+                               switch( topic.getReplicationCase() ) {
+                               case REPLICATION_EDGE_TO_CENTRAL:
+                               case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL:  // NOTE: this is for E2C portion only
+                                       source = cluster.getFqdn();
+                                       target = centralCname;
                                        break;
-                               }
-                               source = cluster.getFqdn();
-                               target = centralFqdn;
-                               break;
-                       case REPLICATION_CENTRAL_TO_EDGE:
-                               if ( graph.isHasCentral() &&  graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
-                                       continue;
-                               }
-                               source = centralFqdn;
-                               target = cluster.getFqdn();
-                               break;
-                       case REPLICATION_CENTRAL_TO_GLOBAL:
-                               if ( graph.isHasCentral() &&  ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
+                               case REPLICATION_CENTRAL_TO_EDGE:
+                               case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE:  // NOTE: this is for C2E portion only
+                                       source = centralCname;
+                                       target = cluster.getFqdn();
+                                       break;
+                               case REPLICATION_CENTRAL_TO_GLOBAL:
+                               case REPLICATION_GLOBAL_TO_CENTRAL:
+                               case REPLICATION_FQDN_TO_GLOBAL:
+                               case REPLICATION_GLOBAL_TO_FQDN:
+                                       break;
+
+                               case REPLICATION_EDGE_TO_FQDN:
+                               case REPLICATION_EDGE_TO_FQDN_TO_GLOBAL:  // NOTE: this is for E2C portion only
+                                       source = cluster.getFqdn();
+                                       target = groupCentralCluster.getFqdn();
+                                       break;
+                               case REPLICATION_FQDN_TO_EDGE:
+                               case REPLICATION_GLOBAL_TO_FQDN_TO_EDGE:  // NOTE: this is for F2E portion only
+                                       source = groupCentralCluster.getFqdn();
+                                       target = cluster.getFqdn();
+                                       break;
+
+                               default:
+                                       logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() );
+                                       anythingWrong = true;
+                                       err.setCode(400);
+                                       err.setFields("topic=" + topic.genFqtn() + " replicationCase="
+                                                       + topic.getReplicationCase() );
+                                       err.setMessage("Unexpected value for ReplicationType");
                                        continue;
                                }
-                               source = centralFqdn;
-                               target = topic.getGlobalMrURL();
-                               break;
-                       case REPLICATION_GLOBAL_TO_CENTRAL:
-                       case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE:  // NOTE: this is for G2C portion only
-                               if ( graph.isHasCentral() &&  ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
+
+                       } else if ( location.isCentral() && graph.getCentralLoc().equals(cluster.getDcaeLocationName()) ) {
+                               /*
+                                * Provision Central to Global bridges
+                                */
+                               switch( topic.getReplicationCase() ) {
+
+                               case REPLICATION_CENTRAL_TO_GLOBAL:
+                               case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL:
+                                       source = centralCname;
+                                       target = topic.getGlobalMrURL();
+                                       break;
+                               case REPLICATION_GLOBAL_TO_CENTRAL:
+                               case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE:  // NOTE: this is for G2C portion only
+                                       source = topic.getGlobalMrURL();
+                                       target = centralCname;
+                                       break;
+
+                               case REPLICATION_EDGE_TO_FQDN_TO_GLOBAL:  // NOTE: this is for E2F portion only
+                                       source = groupCentralCluster.getFqdn();
+                                       target = topic.getGlobalMrURL();
+                                       break;
+
+                               case REPLICATION_FQDN_TO_GLOBAL:
+                                       source = groupCentralCluster.getFqdn();
+                                       target = topic.getGlobalMrURL();
+                                       break;
+                                       
+                               case REPLICATION_GLOBAL_TO_FQDN:
+                               case REPLICATION_GLOBAL_TO_FQDN_TO_EDGE:  // NOTE: this is for G2F portion only
+                                       source = topic.getGlobalMrURL();
+                                       target = groupCentralCluster.getFqdn();
+                                       break;
+
+                               case REPLICATION_FQDN_TO_EDGE:
+                               case REPLICATION_EDGE_TO_FQDN:
+                               case REPLICATION_EDGE_TO_CENTRAL:
+                               case REPLICATION_CENTRAL_TO_EDGE:
+                                       break;
+                               default:
+                                       logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() );
+                                       anythingWrong = true;
+                                       err.setCode(400);
+                                       err.setFields("topic=" + topic.genFqtn() + " replicationCase="
+                                                       + topic.getReplicationCase() );
+                                       err.setMessage("Unexpected value for ReplicationType");
                                        continue;
-                               }
-                               source = topic.getGlobalMrURL();
-                               target = centralFqdn;
-                               break;
-                       default:
-                               logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() );
+                               }                               
+                       } else {
+                               logger.warn( "dcaeLocation " + loc + " is neither Edge nor Central so no mmagent provisioning was done");
                                anythingWrong = true;
                                continue;
                        }
@@ -328,65 +424,12 @@ public class TopicService extends BaseLoggingClass {
                                        anythingWrong = true;
                                        break;
                                }
-                       }
-                       
-                       
-                       /*
-                        * some replication rules have a 2nd bridge!
-                        */
-                       source = target = null;
-                       switch( topic.getReplicationCase() ) {
-                       case REPLICATION_EDGE_TO_CENTRAL:
-                       case REPLICATION_CENTRAL_TO_EDGE:
-                       case REPLICATION_CENTRAL_TO_GLOBAL:
-                       case REPLICATION_GLOBAL_TO_CENTRAL:
-                               continue;
-                       case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL:  // NOTE: this is for C2G portion only
-                               if ( graph.isHasCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
-                                       continue;
-                               }
-                               source = centralFqdn;
-                               target = topic.getGlobalMrURL();
-                               break;
-       
-                       case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE:  // NOTE: this is for C2E portion only
-                               if ( graph.isHasCentral() &&  graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
-                                       continue;
-                               }
-                               source = centralFqdn;
-                               target = cluster.getFqdn();
-                               break;
-                       default:
-                               logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() );
-                               anythingWrong = true;
-                               break;
-                       }
-                       if ( source != null && target != null ) {
-                               try { 
-                                       logger.info( "Create a MM from " + source + " to " + target );
-                                       MirrorMaker mm = bridge.getMirrorMaker( source, target);
-                                       if ( mm == null ) {
-                                               mm = new MirrorMaker(source, target);
-                                       }
-                                       mm.addTopic(topic.getFqtn());
-                                       bridge.updateMirrorMaker(mm);
-                               } catch ( Exception ex ) {
-                                       err.setCode(500);
-                                       err.setFields( "mirror_maker.topic");
-                                       err.setMessage("Unexpected condition: " + ex );
-                                       anythingWrong = true;
-                                       break;
-                               }       
-                       }
+                       }                       
+
                        
                }
-               if ( anythingWrong ) {
-                       topic.setStatus( DmaapObject_Status.INVALID);
-                       return null;
-               }
-       
-               topic.setStatus( DmaapObject_Status.VALID);
-               return topic;
+               return  anythingWrong;
+
        }
        
        /*
@@ -402,7 +445,7 @@ public class TopicService extends BaseLoggingClass {
                        Graph graph = new Graph( topic.getClients(), false );
                        
                        String centralFqdn = new String();
-                       if ( graph.isHasCentral() ) {
+                       if ( graph.hasCentral() ) {
                                DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
                                centralFqdn = p.getProperty("MR.CentralCname");
                        }
@@ -410,12 +453,12 @@ public class TopicService extends BaseLoggingClass {
                        Collection<String> locations = graph.getKeys();
                        for( String loc : locations ) {
                                logger.info( "loc=" + loc );
-                               MR_Cluster cluster = clusters.get(loc);
+                               MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc);
                                if ( cluster == null ) {
                                        logger.info( "No MR cluster for location " + loc );
                                        continue;
                                }
-                               if ( graph.isHasCentral() &&  ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
+                               if ( graph.hasCentral() &&  ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
                                        logger.info( "Detected case for EDGE_TO_CENTRAL from " + cluster.getFqdn() + " to " + centralFqdn );
                                        return ReplicationType.REPLICATION_EDGE_TO_CENTRAL;
                                        
index f86569d..a7700a1 100644 (file)
@@ -29,6 +29,8 @@ import java.util.Map;
 import org.onap.dmaap.dbcapi.database.DatabaseClass;
 import org.onap.dmaap.dbcapi.model.DcaeLocation;
 import org.onap.dmaap.dbcapi.model.MR_Client;
+import org.onap.dmaap.dbcapi.model.MR_Cluster;
+import org.onap.dmaap.dbcapi.service.MR_ClusterService;
 
 
 public class Graph {
@@ -49,21 +51,38 @@ public class Graph {
        public Graph( List<MR_Client> clients, boolean strict ) {
                if ( clients == null )
                        return;
+               initGraph( clients, strict, "" );
+               return;
+
+       }
+       public Graph( List<MR_Client> clients, boolean strict, String group ) {
+               if ( clients == null )
+                       return;
+               initGraph( clients, strict, group );
+               return;
+       }
+       
+       private void initGraph(List<MR_Client> clients, boolean strict, String group ) {
+               MR_ClusterService clusters = new MR_ClusterService();
                this.graph = new HashMap<String, String>();
                this.hasCentral = false;
                for( MR_Client client: clients ) {
                        if ( ! strict || client.isStatusValid()) {
                                String loc = client.getDcaeLocationName();
-                               for( String action : client.getAction() ){
-                                       DcaeLocation dcaeLoc = locations.get(loc);
+                               DcaeLocation dcaeLoc = locations.get(loc);
+                               if ( dcaeLoc == null ) continue;
+                               MR_Cluster c = clusters.getMr_ClusterByLoc(loc);
+                               if ( group != null &&  ! group.isEmpty() && ! group.equals(c.getReplicationGroup())) continue;
+                               
+                               for( String action : client.getAction() ){                      
                                        if ( ! action.equals("view") && dcaeLoc != null ) {
-                                               graph.put(loc, dcaeLoc.getDcaeLayer());
+                                               String layer = dcaeLoc.getDcaeLayer();
+                                               if ( layer != null && layer.contains(centralDcaeLayerName) ) {
+                                                       this.hasCentral = true;
+                                               }
+                                               graph.put(loc, layer);
                                        }
                                }
-                               String layer = graph.get(loc);
-                               if ( layer != null && layer.contains(centralDcaeLayerName) ) {
-                                       this.hasCentral = true;
-                               }
        
                        }               
                }               
@@ -88,7 +107,7 @@ public class Graph {
        public Collection<String> getKeys() {
                return graph.keySet();
        }
-       public boolean isHasCentral() {
+       public boolean hasCentral() {
                return hasCentral;
        }
        public void setHasCentral(boolean hasCentral) {
@@ -106,6 +125,9 @@ public class Graph {
                }
                return null;
        }
+       public boolean isEmpty() {
+               return graph.isEmpty();
+       }
        
        
 }
diff --git a/src/main/resources/schema_9.sql b/src/main/resources/schema_9.sql
new file mode 100644 (file)
index 0000000..ac43d78
--- /dev/null
@@ -0,0 +1,27 @@
+---
+-- ============LICENSE_START=======================================================
+-- OpenECOMP - org.onap.dbcapi
+-- ================================================================================
+-- Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+-- ================================================================================
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+-- 
+--      http://www.apache.org/licenses/LICENSE-2.0
+-- 
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+-- ============LICENSE_END=========================================================
+---
+
+
+@alter table mr_cluster
+       add column      replication_group       varchar(100)    
+;
+
+
+update dmaapbc_sch_ver set version = 9 where version = 8;
index 8b632ed..547bfc9 100644 (file)
@@ -83,8 +83,8 @@ public class MirrorMakerTest {
                String p2 = "2081";
                MirrorMaker t = new MirrorMaker( c1, c2 );
                String m = t.getMmName();
+       
 
-               t.addVector( f, c1, c2 );
                ArrayList<String> topics = new ArrayList<String>();
                topics.add( f );
                t.setTopics( topics );
@@ -92,14 +92,10 @@ public class MirrorMakerTest {
 
                int i = t.getTopicCount();
 
-               String s = t.toJSON();
-
-               s = t.updateWhiteList();
+               String s = t.updateWhiteList();
 
                s = t.createMirrorMaker(p1, p2);
 
-               t.delVector( f, c1, c2 );
-
        }
 
 }
diff --git a/src/test/java/org/onap/dmaap/dbcapi/model/ReplicationVectorTest.java b/src/test/java/org/onap/dmaap/dbcapi/model/ReplicationVectorTest.java
deleted file mode 100644 (file)
index dde3b49..0000000
+++ /dev/null
@@ -1,92 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dmaap.dbcapi.model;
-
-import static org.junit.Assert.*;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.dmaap.dbcapi.testframework.ReflectionHarness;
-
-import java.util.ArrayList;
-
-
-public class ReplicationVectorTest {
-
-       private static final String  fmt = "%24s: %s%n";
-
-       ReflectionHarness rh = new ReflectionHarness();
-
-
-       @Before
-       public void setUp() throws Exception {
-       }
-
-       @After
-       public void tearDown() throws Exception {
-       }
-
-
-       @Test
-       public void test1() {
-
-
-               rh.reflect( "org.onap.dmaap.dbcapi.model.ReplicationVector", "get", null );     
-       
-       }
-       @Test
-       public void test2() {
-
-               String v = "Validate";
-               rh.reflect( "org.onap.dmaap.dbcapi.model.ReplicationVector", "set", v );
-       }
-
-       @Test
-       public void test3() {
-               String f = "org.onap.interestingTopic";
-               String c1 =  "cluster1.onap.org";
-               String c2 =  "cluster2.onap.org";
-               ReplicationVector t = new ReplicationVector( f, c1, c2 );
-
-
-               assertTrue( f.equals( t.getFqtn() ));
-               assertTrue( c1.equals( t.getSourceCluster() ));
-               assertTrue( c2.equals( t.getTargetCluster() ));
-       }
-
-
-       @Test
-       public void test4() {
-               String f = "org.onap.interestingTopic";
-               String c1 =  "cluster1.onap.org";
-               String c2 =  "cluster2.onap.org";
-               ReplicationVector t = new ReplicationVector( f, c1, c2 );
-
-               int i = t.hashCode();
-
-               ReplicationVector t2 = new ReplicationVector(f, c1, c2 );
-
-               assertTrue( t.equals( t2 ));
-               assertTrue( t.equals( t ));
-               assertTrue( ! t.equals( f ));
-       }
-
-}
index 7cedfac..449746a 100644 (file)
@@ -89,7 +89,7 @@ public class GraphTest {
 
                s = g.getCentralLoc();          
                g.setHasCentral( true );
-               g.isHasCentral();
+               g.hasCentral();
 
                hm = g.getGraph();
 
index 5325c31..344bfdf 100644 (file)
@@ -27,7 +27,7 @@
 
 major=1
 minor=0
-patch=10
+patch=11
 base_version=${major}.${minor}.${patch}
 
 # Release must be completed with git revision # in Jenkins