Catalog alignment
[sdc.git] / catalog-dao / src / main / java / org / openecomp / sdc / be / dao / cassandra / schema / SdcSchemaUtils.java
index e6b091b..3d68980 100644 (file)
 package org.openecomp.sdc.be.dao.cassandra.schema;
 
 import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.ProtocolVersion;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.SocketOptions;
+import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.TokenAwarePolicy;
 import org.openecomp.sdc.be.config.Configuration;
 import org.openecomp.sdc.be.config.ConfigurationManager;
 import org.openecomp.sdc.common.log.wrappers.Logger;
@@ -34,12 +39,31 @@ import java.util.function.Supplier;
 public class SdcSchemaUtils {
 
     private static Logger log = Logger.getLogger(SdcSchemaUtils.class.getName());
+    private Cluster cluster;
+    private boolean isConnected;  
+    
+    
+
+    public SdcSchemaUtils() {
+        super();
+        try {
+            isConnected = false;
+            cluster =  createCluster();
+            isConnected = true;
+        } catch (Exception e) {
+            log.info("** CassandraClient isn't connected. error is {}", e);
+        }
+
+        log.info("** cluster created");
+    }
 
     /**
      * the method creates the cluster object using the supplied cassandra nodes
      * in the configuration
      *
      * @return cluster object our null in case of an invalid configuration
+     * 
+     * 
      */
     public Cluster createCluster() {
         final Configuration.CassandrConfig config = getCassandraConfig();
@@ -53,41 +77,104 @@ public class SdcSchemaUtils {
         Cluster.Builder clusterBuilder = Cluster.builder();
         nodes.forEach(node -> clusterBuilder.addContactPoint(node).withPort(cassandraPort));
 
-        clusterBuilder.withMaxSchemaAgreementWaitSeconds(60);
-
-        if (config.isAuthenticate()) {
-            String username = config.getUsername();
-            String password = config.getPassword();
-            if (username == null || password == null) {
-                log.info("authentication is enabled but username or password were not supplied.");
-                return null;
+       clusterBuilder.withMaxSchemaAgreementWaitSeconds(60); 
+             
+       setSocketOptions(clusterBuilder, config);
+        if(!enableAuthentication(clusterBuilder, config)){
+            return null;
+        }
+        
+        if(!enableSsl(clusterBuilder, config)){
+            return null;
+        }
+        setLocalDc(clusterBuilder, config);
+        
+        return clusterBuilder.build();
+    }
+    
+    /**
+     * 
+     * @return
+     */
+    public Session  connect() {
+        Session session = null;
+        if (cluster != null) {
+            try {
+                session = cluster.connect();
+               
+            } catch (Throwable e) {
+                log.debug("Failed to connect cluster, error :",  e);
+               
             }
-            clusterBuilder.withCredentials(username, password);
         }
-        if (config.isSsl()) {
+        return session;
+    }
+    
+    public Metadata getMetadata(){
+        if (cluster != null){
+            return cluster.getMetadata();
+        }
+        return null;
+    }
+    
+    private void setLocalDc(Cluster.Builder clusterBuilder, Configuration.CassandrConfig config) {
+        String localDataCenter = config.getLocalDataCenter();
+        if (localDataCenter != null) {
+            log.info("localDatacenter was provided, setting Cassndra clint to use datacenter: {} as local.",
+                    localDataCenter);
+            LoadBalancingPolicy tokenAwarePolicy = new TokenAwarePolicy(
+                    DCAwareRoundRobinPolicy.builder().withLocalDc(localDataCenter).build());
+            clusterBuilder.withLoadBalancingPolicy(tokenAwarePolicy);
+        } else {
+            log.info(
+                    "localDatacenter was provided,  the driver will use the datacenter of the first contact point that was reached at initialization");
+        }
+    }
+    
+    private boolean enableSsl(Cluster.Builder clusterBuilder, Configuration.CassandrConfig config) {
+        boolean ssl = config.isSsl();
+        if (ssl) {
             String truststorePath = config.getTruststorePath();
             String truststorePassword = config.getTruststorePassword();
             if (truststorePath == null || truststorePassword == null) {
-                log.info("ssl is enabled but truststorePath or truststorePassword were not supplied.");
-                return null;
+                log.error("ssl is enabled but truststorePath or truststorePassword were not supplied.");
+                return false;
+            } else {
+                System.setProperty("javax.net.ssl.trustStore", truststorePath);
+                System.setProperty("javax.net.ssl.trustStorePassword", truststorePassword);
+                clusterBuilder.withSSL();
             }
-            System.setProperty("javax.net.ssl.trustStore", truststorePath);
-            System.setProperty("javax.net.ssl.trustStorePassword", truststorePassword);
-            clusterBuilder.withSSL();
+
         }
+        return true;
+    }
+    
+    
+    private void setSocketOptions(Cluster.Builder clusterBuilder, Configuration.CassandrConfig config) {
         SocketOptions socketOptions =new SocketOptions();
         Integer socketConnectTimeout = config.getSocketConnectTimeout();
         if( socketConnectTimeout!=null ){
             log.info("SocketConnectTimeout was provided, setting Cassandra client to use SocketConnectTimeout: {} .",socketConnectTimeout);
             socketOptions.setConnectTimeoutMillis(socketConnectTimeout);
         }
-        Integer socketReadTimeout = config.getSocketReadTimeout();
-        if( socketReadTimeout != null ){
-            log.info("SocketReadTimeout was provided, setting Cassandra client to use SocketReadTimeout: {} .",socketReadTimeout);
-            socketOptions.setReadTimeoutMillis(socketReadTimeout);
-        }
         clusterBuilder.withSocketOptions(socketOptions);
-        return clusterBuilder.build();
+    }
+    
+    private boolean enableAuthentication(Cluster.Builder clusterBuilder, Configuration.CassandrConfig config) {
+        boolean authenticate = config.isAuthenticate();
+       
+        if (authenticate) {
+            String username = config.getUsername();
+            String password = config.getPassword();
+            if (username == null || password == null) {
+                log.error("authentication is enabled but username or password were not supplied.");
+                return false;
+            } else {
+                clusterBuilder.withCredentials(username, password);
+            }
+
+        }
+        return true;
     }
 
     public boolean executeStatement(String statement) {
@@ -118,5 +205,13 @@ public class SdcSchemaUtils {
     Configuration.CassandrConfig getCassandraConfig() {
         return ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig();
     }
+    
+    
+    public void closeCluster() {
+        if (isConnected) {
+            cluster.close();
+        }
+        log.info("** CassandraClient cluster closed");
+    }
 
 }