Provision topics on mr_clusters
[dmaap/dbcapi.git] / src / main / java / org / onap / dmaap / dbcapi / service / MR_ClusterService.java
index 2dae579..dea5d83 100644 (file)
 package org.onap.dmaap.dbcapi.service;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import javax.ws.rs.core.Response.Status;
 
+import org.onap.dmaap.dbcapi.client.MrProvConnection;
 import org.onap.dmaap.dbcapi.database.DatabaseClass;
 import org.onap.dmaap.dbcapi.logging.BaseLoggingClass;
 import org.onap.dmaap.dbcapi.model.ApiError;
 import org.onap.dmaap.dbcapi.model.DcaeLocation;
 import org.onap.dmaap.dbcapi.model.MR_Cluster;
+import org.onap.dmaap.dbcapi.model.Topic;
 import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status;
+import org.onap.dmaap.dbcapi.service.DcaeLocationService;
 import org.onap.dmaap.dbcapi.util.DmaapConfig;
 
 public class MR_ClusterService extends BaseLoggingClass {
@@ -73,17 +78,39 @@ public class MR_ClusterService extends BaseLoggingClass {
                return null;
        }
        
+       public MR_Cluster getMr_ClusterByLoc( String loc ) {
+               return mr_clusters.get( loc );
+       }
+       
        public List<MR_Cluster> getCentralClusters() {
                DcaeLocationService locations = new DcaeLocationService();
                List<MR_Cluster> result = new ArrayList<MR_Cluster>();
                for( MR_Cluster c: mr_clusters.values() ) {
-                       if ( locations.getDcaeLocation(c.getDcaeLocationName()).isCentral() ) {
-                               result.add(c);
+                       try {
+                               if ( locations.getDcaeLocation(c.getDcaeLocationName()).isCentral() ) {
+                                       result.add(c);
+                               }
+                       } catch ( NullPointerException npe ) {
+                               logger.warn( "Failed test isCentral for location:" + c.getDcaeLocationName() );
+                       }
+               }
+               return result;
+       }       
+       
+       // builds the set of unique cluster groups
+       public Set<String> getGroups() {
+               Set<String> result = new HashSet<String>();
+               for( MR_Cluster c: mr_clusters.values() ) {
+                       try {
+                               result.add(c.getReplicationGroup());
+                       } catch ( NullPointerException npe ) {
+                               logger.warn( "Failed to add Group for cluster:" + c.getDcaeLocationName() );
                        }
                }
                return result;
        }       
 
+
        public MR_Cluster addMr_Cluster( MR_Cluster cluster, ApiError apiError ) {
                logger.info( "Entry: addMr_Cluster");
                MR_Cluster mrc = mr_clusters.get( cluster.getDcaeLocationName() );
@@ -94,7 +121,7 @@ public class MR_ClusterService extends BaseLoggingClass {
                        return null;
                }
                cluster.setLastMod();
-               cluster.setStatus(DmaapObject_Status.VALID);
+               cluster.setStatus( addTopicsToCluster( cluster, apiError ) );
                mr_clusters.put( cluster.getDcaeLocationName(), cluster );
                DcaeLocationService svc = new DcaeLocationService();
                DcaeLocation loc = svc.getDcaeLocation( cluster.getDcaeLocationName() );
@@ -120,6 +147,7 @@ public class MR_ClusterService extends BaseLoggingClass {
                        return null;
                }
                cluster.setLastMod();
+               cluster.setStatus( addTopicsToCluster( cluster, apiError ) );
                mr_clusters.put( cluster.getDcaeLocationName(), cluster );
                DcaeLocationService svc = new DcaeLocationService();
                DcaeLocation loc = svc.getDcaeLocation( cluster.getDcaeLocationName() );
@@ -137,6 +165,7 @@ public class MR_ClusterService extends BaseLoggingClass {
                                mr_clusters.put( cluster.getDcaeLocationName(), cluster );
                        }
                }
+               
                apiError.setCode(200);
                return cluster;
        }
@@ -153,5 +182,22 @@ public class MR_ClusterService extends BaseLoggingClass {
                return mr_clusters.remove(key);
        }       
        
-
+       private DmaapObject_Status addTopicsToCluster( MR_Cluster cluster, ApiError err  ){
+               
+               TopicService ts = new TopicService();
+               MrProvConnection prov = new MrProvConnection();
+               List<Topic>  topics = ts.getAllTopicsWithoutClients();
+               for( Topic topic: topics ) {
+                       logger.info( "POST topic " + topic.getFqtn() + " to cluster " + cluster.getFqdn() + " in loc " + cluster.getDcaeLocationName());
+                       if ( prov.makeTopicConnection(cluster)) {
+                               String resp = prov.doPostTopic(topic, err);
+                               logger.info( "response code: " + err.getCode() );
+                               if ( ! err.is2xx() && ! (err.getCode() == 409) ) {
+                                       return DmaapObject_Status.INVALID;
+                               } 
+                       }
+               }
+               
+               return DmaapObject_Status.VALID;
+       }
 }