Make Cassandra port configurable.
[sdc.git] / catalog-dao / src / main / java / org / openecomp / sdc / be / dao / cassandra / CassandraClient.java
index 38606d0..028247d 100644 (file)
 
 package org.openecomp.sdc.be.dao.cassandra;
 
-import java.util.List;
-
-import javax.annotation.PreDestroy;
-
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.openecomp.sdc.be.config.ConfigurationManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.Session;
-import com.datastax.driver.core.policies.ConstantReconnectionPolicy;
-import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
-import com.datastax.driver.core.policies.DefaultRetryPolicy;
-import com.datastax.driver.core.policies.LoadBalancingPolicy;
-import com.datastax.driver.core.policies.TokenAwarePolicy;
+import com.datastax.driver.core.SocketOptions;
+import com.datastax.driver.core.policies.*;
 import com.datastax.driver.mapping.Mapper;
 import com.datastax.driver.mapping.MappingManager;
-
 import fj.data.Either;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.openecomp.sdc.be.config.ConfigurationManager;
+import org.openecomp.sdc.common.log.wrappers.Logger;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PreDestroy;
+import java.util.List;
 
 @Component("cassandra-client")
 public class CassandraClient {
-       private static Logger logger = LoggerFactory.getLogger(CassandraClient.class.getName());
+       private static Logger logger = Logger.getLogger(CassandraClient.class.getName());
 
        private Cluster cluster;
        private boolean isConnected;
@@ -56,15 +49,17 @@ public class CassandraClient {
                try {
                        cassandraHosts = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
                                        .getCassandraHosts();
+                       Integer cassandraPort = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
+                                       .getCassandraPort();
                        Long reconnectTimeout = ConfigurationManager.getConfigurationManager().getConfiguration()
                                        .getCassandraConfig().getReconnectTimeout();
-
-                       logger.debug("creating cluster to hosts:{} with reconnect timeout:{}", cassandraHosts, reconnectTimeout);
+                       logger.debug("creating cluster to hosts:{} port:{} with reconnect timeout:{}", cassandraHosts, cassandraPort, reconnectTimeout);
                        Cluster.Builder clusterBuilder = Cluster.builder()
                                        .withReconnectionPolicy(new ConstantReconnectionPolicy(reconnectTimeout))
                                        .withRetryPolicy(DefaultRetryPolicy.INSTANCE);
 
-                       cassandraHosts.forEach(host -> clusterBuilder.addContactPoint(host));
+                       cassandraHosts.forEach(host -> clusterBuilder.addContactPoint(host).withPort(cassandraPort));
+                       setSocketOptions(clusterBuilder);
                        enableAuthentication(clusterBuilder);
                        enableSsl(clusterBuilder);
                        setLocalDc(clusterBuilder);
@@ -78,6 +73,21 @@ public class CassandraClient {
                logger.info("** CassandraClient created");
        }
 
+       private void setSocketOptions(Cluster.Builder clusterBuilder) {
+               SocketOptions socketOptions =new SocketOptions();
+               Integer socketConnectTimeout = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().getSocketConnectTimeout();
+               if( socketConnectTimeout!=null ){
+                       logger.info("SocketConnectTimeout was provided, setting Cassandra client to use SocketConnectTimeout: {} .",socketConnectTimeout);
+                       socketOptions.setConnectTimeoutMillis(socketConnectTimeout);
+               }
+               Integer socketReadTimeout = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().getSocketReadTimeout();
+               if( socketReadTimeout != null ){
+                       logger.info("SocketReadTimeout was provided, setting Cassandra client to use SocketReadTimeout: {} .",socketReadTimeout);
+                       socketOptions.setReadTimeoutMillis(socketReadTimeout);
+               }
+               clusterBuilder.withSocketOptions(socketOptions);
+       }
+
        private void setLocalDc(Cluster.Builder clusterBuilder) {
                String localDataCenter = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
                                .getLocalDataCenter();
@@ -140,7 +150,7 @@ public class CassandraClient {
                                Session session = cluster.connect(keyspace);
                                if (session != null) {
                                        MappingManager manager = new MappingManager(session);
-                                       return Either.left(new ImmutablePair<Session, MappingManager>(session, manager));
+                                       return Either.left(new ImmutablePair<>(session, manager));
                                } else {
                                        return Either.right(CassandraOperationStatus.KEYSPACE_NOT_CONNECTED);
                                }