package org.openecomp.sdc.be.dao.cassandra;
-import java.util.List;
-
-import javax.annotation.PreDestroy;
-
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.openecomp.sdc.be.config.ConfigurationManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
-import com.datastax.driver.core.policies.ConstantReconnectionPolicy;
-import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
-import com.datastax.driver.core.policies.DefaultRetryPolicy;
-import com.datastax.driver.core.policies.LoadBalancingPolicy;
-import com.datastax.driver.core.policies.TokenAwarePolicy;
+import com.datastax.driver.core.SocketOptions;
+import com.datastax.driver.core.policies.*;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
-
import fj.data.Either;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.openecomp.sdc.be.config.ConfigurationManager;
+import org.openecomp.sdc.common.log.wrappers.Logger;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PreDestroy;
+import java.util.List;
@Component("cassandra-client")
public class CassandraClient {
- private static Logger logger = LoggerFactory.getLogger(CassandraClient.class.getName());
+ private static Logger logger = Logger.getLogger(CassandraClient.class.getName());
private Cluster cluster;
private boolean isConnected;
try {
cassandraHosts = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
.getCassandraHosts();
+ Integer cassandraPort = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
+ .getCassandraPort();
Long reconnectTimeout = ConfigurationManager.getConfigurationManager().getConfiguration()
.getCassandraConfig().getReconnectTimeout();
-
- logger.debug("creating cluster to hosts:{} with reconnect timeout:{}", cassandraHosts, reconnectTimeout);
+ logger.debug("creating cluster to hosts:{} port:{} with reconnect timeout:{}", cassandraHosts, cassandraPort, reconnectTimeout);
Cluster.Builder clusterBuilder = Cluster.builder()
.withReconnectionPolicy(new ConstantReconnectionPolicy(reconnectTimeout))
.withRetryPolicy(DefaultRetryPolicy.INSTANCE);
- cassandraHosts.forEach(host -> clusterBuilder.addContactPoint(host));
+ cassandraHosts.forEach(host -> clusterBuilder.addContactPoint(host).withPort(cassandraPort));
+ setSocketOptions(clusterBuilder);
enableAuthentication(clusterBuilder);
enableSsl(clusterBuilder);
setLocalDc(clusterBuilder);
logger.info("** CassandraClient created");
}
+ private void setSocketOptions(Cluster.Builder clusterBuilder) {
+ SocketOptions socketOptions =new SocketOptions();
+ Integer socketConnectTimeout = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().getSocketConnectTimeout();
+ if( socketConnectTimeout!=null ){
+ logger.info("SocketConnectTimeout was provided, setting Cassandra client to use SocketConnectTimeout: {} .",socketConnectTimeout);
+ socketOptions.setConnectTimeoutMillis(socketConnectTimeout);
+ }
+ Integer socketReadTimeout = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().getSocketReadTimeout();
+ if( socketReadTimeout != null ){
+ logger.info("SocketReadTimeout was provided, setting Cassandra client to use SocketReadTimeout: {} .",socketReadTimeout);
+ socketOptions.setReadTimeoutMillis(socketReadTimeout);
+ }
+ clusterBuilder.withSocketOptions(socketOptions);
+ }
+
private void setLocalDc(Cluster.Builder clusterBuilder) {
String localDataCenter = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
.getLocalDataCenter();
Session session = cluster.connect(keyspace);
if (session != null) {
MappingManager manager = new MappingManager(session);
- return Either.left(new ImmutablePair<Session, MappingManager>(session, manager));
+ return Either.left(new ImmutablePair<>(session, manager));
} else {
return Either.right(CassandraOperationStatus.KEYSPACE_NOT_CONNECTED);
}