2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.openecomp.sdc.be.dao.cassandra;
22 import com.datastax.driver.core.Cluster;
23 import com.datastax.driver.core.Session;
24 import com.datastax.driver.core.SocketOptions;
25 import com.datastax.driver.core.policies.ConstantReconnectionPolicy;
26 import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
27 import com.datastax.driver.core.policies.DefaultRetryPolicy;
28 import com.datastax.driver.core.policies.LoadBalancingPolicy;
29 import com.datastax.driver.core.policies.TokenAwarePolicy;
30 import com.datastax.driver.mapping.Mapper;
31 import com.datastax.driver.mapping.MappingManager;
32 import fj.data.Either;
33 import java.util.List;
34 import javax.annotation.PreDestroy;
35 import org.apache.commons.lang3.tuple.ImmutablePair;
36 import org.openecomp.sdc.be.config.ConfigurationManager;
37 import org.openecomp.sdc.common.log.enums.EcompLoggerErrorCode;
38 import org.openecomp.sdc.common.log.wrappers.Logger;
39 import org.springframework.stereotype.Component;
// Spring component registered under the bean name "cassandra-client".
// Holds the DataStax driver Cluster that the rest of this class builds, connects through and closes.
41 @Component("cassandra-client")
42 public class CassandraClient {
// Class-wide logger. NOTE(review): declared static but not final — confirm nothing reassigns it.
44 private static Logger logger = Logger.getLogger(CassandraClient.class.getName());
// Driver cluster handle; assigned from clusterBuilder.build() in the constructor and null-checked in connect().
45 private Cluster cluster;
// Connection flag exposed through isConnected(). NOTE(review): no assignment to this field is
// visible in this extract (the embedded line numbering has gaps) — verify where it is set.
46 private boolean isConnected;
// Builds the driver Cluster from the Cassandra section of the application configuration.
// Visible steps: read hosts, port and reconnect timeout; create a builder with a
// ConstantReconnectionPolicy and the default retry policy; register every host as a contact
// point; apply socket options, authentication, SSL and local-DC settings; build the cluster.
// NOTE(review): the embedded numbering skips lines 49-50, 52, 65, 68 and 70-71, so some statements
// (presumably the `try {` opener and closing braces) are not shown in this extract.
48 public CassandraClient() {
// Declared up front so the catch handler below can still log the host list.
51 List<String> cassandraHosts = null;
53 cassandraHosts = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().getCassandraHosts();
54 Integer cassandraPort = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().getCassandraPort();
55 Long reconnectTimeout = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().getReconnectTimeout();
56 logger.debug("creating cluster to hosts:{} port:{} with reconnect timeout:{}", cassandraHosts, cassandraPort, reconnectTimeout);
// reconnectTimeout is passed straight through as the constant reconnection delay.
// Assumes the configured value is in milliseconds — TODO confirm against the driver docs and config.
57 Cluster.Builder clusterBuilder = Cluster.builder().withReconnectionPolicy(new ConstantReconnectionPolicy(reconnectTimeout))
58 .withRetryPolicy(DefaultRetryPolicy.INSTANCE);
// Every configured host becomes a contact point; withPort is invoked once per host with the same port value.
59 cassandraHosts.forEach(host -> clusterBuilder.addContactPoint(host).withPort(cassandraPort));
// Optional settings are delegated to the private helpers, each of which reads its own configuration values.
60 setSocketOptions(clusterBuilder);
61 enableAuthentication(clusterBuilder);
62 enableSsl(clusterBuilder);
63 setLocalDc(clusterBuilder);
64 cluster = clusterBuilder.build();
// Failure path: only the host list is logged, at info level; the caught exception `e` does not
// appear in the visible log call. NOTE(review): consider including `e` so the cause is not lost.
66 } catch (Exception e) {
67 logger.info("** CassandraClient isn't connected to {}", cassandraHosts);
// Emitted after the catch handler, so presumably also logged when the build failed — confirm against the full file.
69 logger.info("** CassandraClient created");
// Applies the optional socket timeouts from the Cassandra configuration to the cluster builder.
// A fresh SocketOptions is always attached to the builder; each timeout is only overridden when
// the corresponding configuration value is non-null.
// clusterBuilder - the builder that receives the socket options.
72 private void setSocketOptions(Cluster.Builder clusterBuilder) {
73 SocketOptions socketOptions = new SocketOptions();
74 Integer socketConnectTimeout = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
75 .getSocketConnectTimeout();
// Connect timeout: the value is handed to setConnectTimeoutMillis unchanged.
76 if (socketConnectTimeout != null) {
77 logger.info("SocketConnectTimeout was provided, setting Cassandra client to use SocketConnectTimeout: {} .", socketConnectTimeout);
78 socketOptions.setConnectTimeoutMillis(socketConnectTimeout);
80 Integer socketReadTimeout = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().getSocketReadTimeout();
// Read timeout: the value is handed to setReadTimeoutMillis unchanged.
81 if (socketReadTimeout != null) {
82 logger.info("SocketReadTimeout was provided, setting Cassandra client to use SocketReadTimeout: {} .", socketReadTimeout);
83 socketOptions.setReadTimeoutMillis(socketReadTimeout);
85 clusterBuilder.withSocketOptions(socketOptions);
// Configures the load-balancing policy when a local data center name is configured.
// With a non-null name, a TokenAwarePolicy wrapping a DCAwareRoundRobinPolicy pinned to that
// data center is installed on the builder.
// clusterBuilder - the builder that receives the load-balancing policy.
88 private void setLocalDc(Cluster.Builder clusterBuilder) {
89 String localDataCenter = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().getLocalDataCenter();
90 if (localDataCenter != null) {
// NOTE(review): the log text below contains typos ("Cassndra clint"); it is a runtime string, left unchanged here.
91 logger.info("localDatacenter was provided, setting Cassndra clint to use datacenter: {} as local.", localDataCenter);
92 LoadBalancingPolicy tokenAwarePolicy = new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().withLocalDc(localDataCenter).build());
93 clusterBuilder.withLoadBalancingPolicy(tokenAwarePolicy);
// The embedded numbering skips lines 94-95 here; presumably they hold the `else` branch and the
// opening of a logger call whose message argument follows.
// NOTE(review): this message starts with "localDatacenter was provided" yet describes the driver's
// fallback behaviour — it looks like it should read "was not provided". Confirm and fix in the full file.
96 "localDatacenter was provided, the driver will use the datacenter of the first contact point that was reached at initialization");
// Enables SSL on the cluster builder using the truststore named in the Cassandra configuration.
// clusterBuilder - the builder on which withSSL() is called.
// NOTE(review): the embedded numbering skips lines 102 and 108; presumably they hold the check of
// the `ssl` flag and the `else` that separates the error log from the SSL setup — confirm.
100 private void enableSsl(Cluster.Builder clusterBuilder) {
101 boolean ssl = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().isSsl();
103 String truststorePath = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().getTruststorePath();
104 String truststorePassword = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
105 .getTruststorePassword();
// Both truststore values are required; a missing one is reported at error level.
106 if (truststorePath == null || truststorePassword == null) {
107 logger.error("ssl is enabled but truststorePath or truststorePassword were not supplied.");
// The truststore is supplied through the standard javax.net.ssl system properties, which are
// JVM-wide: any other SSL client in the same process that relies on the default truststore sees them too.
109 System.setProperty("javax.net.ssl.trustStore", truststorePath);
110 System.setProperty("javax.net.ssl.trustStorePassword", truststorePassword);
111 clusterBuilder.withSSL();
// Sets username/password credentials on the cluster builder from the Cassandra configuration.
// clusterBuilder - the builder on which withCredentials(username, password) is called.
// NOTE(review): the embedded numbering skips lines 118 and 123; presumably they hold the check of
// the `authenticate` flag and the `else` that separates the error log from withCredentials — confirm.
116 private void enableAuthentication(Cluster.Builder clusterBuilder) {
117 boolean authenticate = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().isAuthenticate();
119 String username = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().getUsername();
120 String password = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().getPassword();
// Both credentials are required; a missing one is reported at error level (the values themselves are not logged).
121 if (username == null || password == null) {
122 logger.error("authentication is enabled but username or password were not supplied.");
124 clusterBuilder.withCredentials(username, password);
130 * @param keyspace - key space to connect
// Opens a session on the given keyspace and pairs it with a new MappingManager.
// keyspace - name of the keyspace passed to cluster.connect.
// Returns, as visible below:
//   left(session, mappingManager)   when cluster.connect yields a non-null session;
//   right(KEYSPACE_NOT_CONNECTED)   when the session is null or the connect attempt throws;
//   right(CLUSTER_NOT_CONNECTED)    when no cluster was built (cluster == null).
// NOTE(review): the embedded numbering skips lines 135, 140 and 142; presumably they hold the
// `try {` opener and the `else` around the null-session return — confirm against the full file.
133 public Either<ImmutablePair<Session, MappingManager>, CassandraOperationStatus> connect(String keyspace) {
134 if (cluster != null) {
136 Session session = cluster.connect(keyspace);
137 if (session != null) {
138 MappingManager manager = new MappingManager(session);
139 return Either.left(new ImmutablePair<>(session, manager));
141 return Either.right(CassandraOperationStatus.KEYSPACE_NOT_CONNECTED);
// NOTE(review): catching Throwable also traps Errors (e.g. OutOfMemoryError), and the failure is
// only logged at debug level — consider narrowing the catch and raising the log level.
143 } catch (Throwable e) {
144 logger.debug("Failed to connect to keyspace [{}], error :", keyspace, e);
145 return Either.right(CassandraOperationStatus.KEYSPACE_NOT_CONNECTED);
148 return Either.right(CassandraOperationStatus.CLUSTER_NOT_CONNECTED);
// Persists an entity through the object mapper obtained from the supplied MappingManager.
// entity  - the object to store (also included in the error log on failure).
// clazz   - mapped class used to look up the Mapper.
// manager - MappingManager that supplies the Mapper.
// Returns CLUSTER_NOT_CONNECTED from the early guard, GENERAL_ERROR when an exception is caught,
// OK otherwise.
// NOTE(review): the embedded numbering skips lines 152, 154-155 and 157, so the guard condition,
// the `try {` opener and the statement that actually uses `mapper` are not visible in this extract.
151 public <T> CassandraOperationStatus save(T entity, Class<T> clazz, MappingManager manager) {
153 return CassandraOperationStatus.CLUSTER_NOT_CONNECTED;
156 Mapper<T> mapper = manager.mapper(clazz);
// Failures are logged at error level with the DATA_ERROR code and mapped to GENERAL_ERROR instead of propagating.
158 } catch (Exception e) {
159 logger.error(EcompLoggerErrorCode.DATA_ERROR, CassandraClient.class.getName(), "Failed to save entity [{}], error :", entity, e);
160 return CassandraOperationStatus.GENERAL_ERROR;
162 return CassandraOperationStatus.OK;
// Loads a single entity by id through the object mapper obtained from the supplied MappingManager.
// id      - key passed to Mapper.get.
// clazz   - mapped class used to look up the Mapper.
// manager - MappingManager that supplies the Mapper.
// Returns, as visible below:
//   left(entity)                   when the mapper returns a non-null result;
//   right(NOT_FOUND)               when the mapper returns null;
//   right(GENERAL_ERROR)           when an exception is caught;
//   right(CLUSTER_NOT_CONNECTED)   from the early guard.
// NOTE(review): the embedded numbering skips lines 166, 168-169 and 174, so the guard condition
// and the `try {` opener are not visible in this extract.
165 public <T> Either<T, CassandraOperationStatus> getById(String id, Class<T> clazz, MappingManager manager) {
167 return Either.right(CassandraOperationStatus.CLUSTER_NOT_CONNECTED);
170 Mapper<T> mapper = manager.mapper(clazz);
171 T result = mapper.get(id);
172 if (result == null) {
173 return Either.right(CassandraOperationStatus.NOT_FOUND);
175 return Either.left(result);
// NOTE(review): the failure is logged at debug level only, so it may be invisible with default logging.
176 } catch (Exception e) {
177 logger.debug("Failed to get by Id [{}], error :", id, e);
178 return Either.right(CassandraOperationStatus.GENERAL_ERROR);
// Deletes an entity by id through the object mapper obtained from the supplied MappingManager.
// id      - key of the entity to remove (also included in the debug log on failure).
// clazz   - mapped class used to look up the Mapper.
// manager - MappingManager that supplies the Mapper.
// Returns CLUSTER_NOT_CONNECTED from the early guard, GENERAL_ERROR when an exception is caught,
// OK otherwise.
// NOTE(review): the embedded numbering skips lines 183, 185-186 and 188, so the guard condition,
// the `try {` opener and the statement that actually uses `mapper` are not visible in this extract.
182 public <T> CassandraOperationStatus delete(String id, Class<T> clazz, MappingManager manager) {
184 return CassandraOperationStatus.CLUSTER_NOT_CONNECTED;
187 Mapper<T> mapper = manager.mapper(clazz);
// NOTE(review): the failure is logged at debug level only, so it may be invisible with default logging.
189 } catch (Exception e) {
190 logger.debug("Failed to delete by id [{}], error :", id, e);
191 return CassandraOperationStatus.GENERAL_ERROR;
193 return CassandraOperationStatus.OK;
// Reports the client's connection state as a boolean.
// NOTE(review): only the signature is visible in this extract (embedded lines 197-199 are
// missing); presumably it returns the `isConnected` field — confirm against the full file.
196 public boolean isConnected() {
// Shuts the client down and logs that the cluster was closed.
// NOTE(review): embedded lines 202-204 are missing from this extract, so the statements that
// actually close the cluster (and any condition guarding them) are not visible. Line 200 is also
// missing; the file imports javax.annotation.PreDestroy, so this is presumably the annotated
// shutdown hook — confirm against the full file.
201 public void closeClient() {
// Logged unconditionally as far as this extract shows.
205 logger.info("** CassandraClient cluster closed");