import javax.annotation.PreDestroy;
-import com.datastax.driver.core.SocketOptions;
+import com.datastax.driver.core.policies.*;
import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.openecomp.sdc.be.config.Configuration;
import org.openecomp.sdc.be.config.ConfigurationManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
-import com.datastax.driver.core.policies.ConstantReconnectionPolicy;
-import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
-import com.datastax.driver.core.policies.DefaultRetryPolicy;
-import com.datastax.driver.core.policies.LoadBalancingPolicy;
-import com.datastax.driver.core.policies.TokenAwarePolicy;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
private Cluster cluster;
private boolean isConnected;
- private Configuration.CassandrConfig configuration;
public CassandraClient() {
super();
isConnected = false;
List<String> cassandraHosts = null;
try {
-
- this.configuration = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig();
- cassandraHosts = configuration.getCassandraHosts();
- Long reconnectTimeout = configuration.getReconnectTimeout();
-
+ cassandraHosts = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
+ .getCassandraHosts();
+ Long reconnectTimeout = ConfigurationManager.getConfigurationManager().getConfiguration()
+ .getCassandraConfig().getReconnectTimeout();
logger.debug("creating cluster to hosts:{} with reconnect timeout:{}", cassandraHosts, reconnectTimeout);
Cluster.Builder clusterBuilder = Cluster.builder()
.withReconnectionPolicy(new ConstantReconnectionPolicy(reconnectTimeout))
.withRetryPolicy(DefaultRetryPolicy.INSTANCE);
-
cassandraHosts.forEach(host -> clusterBuilder.addContactPoint(host));
- setSocketOptions(clusterBuilder);
enableAuthentication(clusterBuilder);
enableSsl(clusterBuilder);
setLocalDc(clusterBuilder);
logger.info("** CassandraClient created");
}
- private void setSocketOptions(Cluster.Builder clusterBuilder) {
- SocketOptions socketOptions =new SocketOptions();
- Integer socketConnectTimeout = this.configuration.getSocketConnectTimeout();
- if( socketConnectTimeout!=null ){
- logger.info("SocketConnectTimeout was provided, setting Cassandra client to use SocketConnectTimeout: {} .",socketConnectTimeout);
- socketOptions.setConnectTimeoutMillis(socketConnectTimeout);
- }
- Integer socketReadTimeout = this.configuration.getSocketReadTimeout();
- if( socketReadTimeout != null ){
- logger.info("SocketReadTimeout was provided, setting Cassandra client to use SocketReadTimeout: {} .",socketReadTimeout);
- socketOptions.setReadTimeoutMillis(socketReadTimeout);
- }
- clusterBuilder.withSocketOptions(socketOptions);
- }
-
private void setLocalDc(Cluster.Builder clusterBuilder) {
String localDataCenter = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
.getLocalDataCenter();
if (localDataCenter != null) {
- logger.info("localDatacenter was provided, setting Cassandra client to use datacenter: {} as local.",
+ logger.info("localDatacenter was provided, setting Cassndra clint to use datacenter: {} as local.",
localDataCenter);
LoadBalancingPolicy tokenAwarePolicy = new TokenAwarePolicy(
DCAwareRoundRobinPolicy.builder().withLocalDc(localDataCenter).build());