2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.dao.cassandra;
23 import com.datastax.driver.core.Cluster;
24 import com.datastax.driver.core.Session;
25 import com.datastax.driver.core.SocketOptions;
26 import com.datastax.driver.core.policies.*;
27 import com.datastax.driver.mapping.Mapper;
28 import com.datastax.driver.mapping.MappingManager;
29 import fj.data.Either;
30 import org.apache.commons.lang3.tuple.ImmutablePair;
31 import org.openecomp.sdc.be.config.ConfigurationManager;
32 import org.openecomp.sdc.common.log.wrappers.Logger;
33 import org.springframework.stereotype.Component;
35 import javax.annotation.PreDestroy;
36 import java.util.List;
38 @Component("cassandra-client")
39 public class CassandraClient {
40 private static Logger logger = Logger.getLogger(CassandraClient.class.getName());
42 private Cluster cluster;
43 private boolean isConnected;
45 public CassandraClient() {
48 List<String> cassandraHosts = null;
50 cassandraHosts = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
52 Long reconnectTimeout = ConfigurationManager.getConfigurationManager().getConfiguration()
53 .getCassandraConfig().getReconnectTimeout();
55 logger.debug("creating cluster to hosts:{} with reconnect timeout:{}", cassandraHosts, reconnectTimeout);
56 Cluster.Builder clusterBuilder = Cluster.builder()
57 .withReconnectionPolicy(new ConstantReconnectionPolicy(reconnectTimeout))
58 .withRetryPolicy(DefaultRetryPolicy.INSTANCE);
60 cassandraHosts.forEach(clusterBuilder::addContactPoint);
61 setSocketOptions(clusterBuilder);
62 enableAuthentication(clusterBuilder);
63 enableSsl(clusterBuilder);
64 setLocalDc(clusterBuilder);
66 cluster = clusterBuilder.build();
68 } catch (Exception e) {
69 logger.info("** CassandraClient isn't connected to {}", cassandraHosts);
72 logger.info("** CassandraClient created");
75 private void setSocketOptions(Cluster.Builder clusterBuilder) {
76 SocketOptions socketOptions =new SocketOptions();
77 Integer socketConnectTimeout = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().getSocketConnectTimeout();
78 if( socketConnectTimeout!=null ){
79 logger.info("SocketConnectTimeout was provided, setting Cassandra client to use SocketConnectTimeout: {} .",socketConnectTimeout);
80 socketOptions.setConnectTimeoutMillis(socketConnectTimeout);
82 Integer socketReadTimeout = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().getSocketReadTimeout();
83 if( socketReadTimeout != null ){
84 logger.info("SocketReadTimeout was provided, setting Cassandra client to use SocketReadTimeout: {} .",socketReadTimeout);
85 socketOptions.setReadTimeoutMillis(socketReadTimeout);
87 clusterBuilder.withSocketOptions(socketOptions);
90 private void setLocalDc(Cluster.Builder clusterBuilder) {
91 String localDataCenter = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
92 .getLocalDataCenter();
93 if (localDataCenter != null) {
94 logger.info("localDatacenter was provided, setting Cassndra clint to use datacenter: {} as local.",
96 LoadBalancingPolicy tokenAwarePolicy = new TokenAwarePolicy(
97 DCAwareRoundRobinPolicy.builder().withLocalDc(localDataCenter).build());
98 clusterBuilder.withLoadBalancingPolicy(tokenAwarePolicy);
101 "localDatacenter was provided, the driver will use the datacenter of the first contact point that was reached at initialization");
105 private void enableSsl(Cluster.Builder clusterBuilder) {
106 boolean ssl = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().isSsl();
108 String truststorePath = ConfigurationManager.getConfigurationManager().getConfiguration()
109 .getCassandraConfig().getTruststorePath();
110 String truststorePassword = ConfigurationManager.getConfigurationManager().getConfiguration()
111 .getCassandraConfig().getTruststorePassword();
112 if (truststorePath == null || truststorePassword == null) {
113 logger.error("ssl is enabled but truststorePath or truststorePassword were not supplied.");
115 System.setProperty("javax.net.ssl.trustStore", truststorePath);
116 System.setProperty("javax.net.ssl.trustStorePassword", truststorePassword);
117 clusterBuilder.withSSL();
123 private void enableAuthentication(Cluster.Builder clusterBuilder) {
124 boolean authenticate = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
127 String username = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
129 String password = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
131 if (username == null || password == null) {
132 logger.error("authentication is enabled but username or password were not supplied.");
134 clusterBuilder.withCredentials(username, password);
143 * - key space to connect
146 public Either<ImmutablePair<Session, MappingManager>, CassandraOperationStatus> connect(String keyspace) {
147 if (cluster != null) {
149 Session session = cluster.connect(keyspace);
150 if (session != null) {
151 MappingManager manager = new MappingManager(session);
152 return Either.left(new ImmutablePair<>(session, manager));
154 return Either.right(CassandraOperationStatus.KEYSPACE_NOT_CONNECTED);
156 } catch (Throwable e) {
157 logger.debug("Failed to connect to keyspace [{}], error ,", keyspace);
158 logger.debug("Exception :", e);
159 return Either.right(CassandraOperationStatus.KEYSPACE_NOT_CONNECTED);
162 return Either.right(CassandraOperationStatus.CLUSTER_NOT_CONNECTED);
165 public <T> CassandraOperationStatus save(T entity, Class<T> clazz, MappingManager manager) {
167 return CassandraOperationStatus.CLUSTER_NOT_CONNECTED;
170 Mapper<T> mapper = manager.mapper(clazz);
172 } catch (Exception e) {
173 logger.debug("Failed to save entity [{}], error :", entity, e);
174 return CassandraOperationStatus.GENERAL_ERROR;
176 return CassandraOperationStatus.OK;
179 public <T> Either<T, CassandraOperationStatus> getById(String id, Class<T> clazz, MappingManager manager) {
181 return Either.right(CassandraOperationStatus.CLUSTER_NOT_CONNECTED);
184 Mapper<T> mapper = manager.mapper(clazz);
185 T result = mapper.get(id);
186 if (result == null) {
187 return Either.right(CassandraOperationStatus.NOT_FOUND);
189 return Either.left(result);
190 } catch (Exception e) {
191 logger.debug("Failed to get by Id [{}], error :", id, e);
192 return Either.right(CassandraOperationStatus.GENERAL_ERROR);
196 public <T> CassandraOperationStatus delete(String id, Class<T> clazz, MappingManager manager) {
198 return CassandraOperationStatus.CLUSTER_NOT_CONNECTED;
201 Mapper<T> mapper = manager.mapper(clazz);
203 } catch (Exception e) {
204 logger.debug("Failed to delete by id [{}], error :", id, e);
205 return CassandraOperationStatus.GENERAL_ERROR;
207 return CassandraOperationStatus.OK;
210 public boolean isConnected() {
215 public void closeClient() {
219 logger.info("** CassandraClient cluster closed");