2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.dao.cassandra;
23 import java.util.List;
25 import javax.annotation.PreDestroy;
27 import com.datastax.driver.core.policies.*;
28 import org.apache.commons.lang3.tuple.ImmutablePair;
29 import org.openecomp.sdc.be.config.ConfigurationManager;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32 import org.springframework.stereotype.Component;
34 import com.datastax.driver.core.Cluster;
35 import com.datastax.driver.core.Session;
36 import com.datastax.driver.mapping.Mapper;
37 import com.datastax.driver.mapping.MappingManager;
39 import fj.data.Either;
// Spring-managed component, registered under the bean name "cassandra-client", that owns a
// DataStax Cluster and exposes connect/save/getById/delete helpers around it.
// NOTE(review): this listing has original line numbers fused into each line and omits some
// original lines, so not every statement of the class is visible here.
41 @Component("cassandra-client")
42 public class CassandraClient {
// Class-wide SLF4J logger, looked up by the class's name.
// NOTE(review): the field is not final, so it could be reassigned — consider `static final`.
43 private static Logger logger = LoggerFactory.getLogger(CassandraClient.class.getName());
// Cluster handle; assigned from clusterBuilder.build() in the constructor and null-checked
// in connect().
45 private Cluster cluster;
// Connection flag. Its assignments are not visible in this listing — presumably written by
// the constructor and returned by isConnected(); TODO confirm.
46 private boolean isConnected;
// Builds the Cassandra Cluster from the settings exposed by ConfigurationManager.
// Visible steps: read the host list and the reconnect timeout, create a builder with a
// constant reconnection policy and the driver's default retry policy, register every host
// as a contact point, apply authentication, SSL and local-data-center settings, then build.
// NOTE(review): several original lines (e.g. the `try {` opener that matches the catch
// below) are missing from this listing.
48 public CassandraClient() {
// Declared before the try scope so the catch handler can still log it; it is still null
// there if the configuration lookup itself failed.
51 List<String> cassandraHosts = null;
// The tail of this call chain is on a line not shown here.
53 cassandraHosts = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
55 Long reconnectTimeout = ConfigurationManager.getConfigurationManager().getConfiguration()
56 .getCassandraConfig().getReconnectTimeout();
58 logger.debug("creating cluster to hosts:{} with reconnect timeout:{}", cassandraHosts, reconnectTimeout);
// The configured timeout is used as the constant delay between reconnection attempts —
// presumably milliseconds; TODO confirm against the driver documentation.
59 Cluster.Builder clusterBuilder = Cluster.builder()
60 .withReconnectionPolicy(new ConstantReconnectionPolicy(reconnectTimeout))
61 .withRetryPolicy(DefaultRetryPolicy.INSTANCE);
// Each configured host becomes a contact point. A null host list would throw here;
// presumably that is absorbed by the catch below — TODO confirm the try scope.
63 cassandraHosts.forEach(host -> clusterBuilder.addContactPoint(host));
64 enableAuthentication(clusterBuilder);
65 enableSsl(clusterBuilder);
66 setLocalDc(clusterBuilder);
68 cluster = clusterBuilder.build();
// Any failure is reduced to a single info-level line.
// NOTE(review): the caught exception `e` is not passed to the logger on the visible line,
// so the cause and stack trace are lost — consider logging `e` as the last argument.
70 } catch (Exception e) {
71 logger.info("** CassandraClient isn't connected to {}", cassandraHosts);
// Emitted after the try/catch — presumably on both the success and the failure path;
// TODO confirm against the lines missing from this listing.
74 logger.info("** CassandraClient created");
// Configures the load-balancing policy from the configured local data center.
// When a local data center is set, the builder gets a TokenAwarePolicy wrapping a
// DCAwareRoundRobinPolicy pinned to that data center.
77 private void setLocalDc(Cluster.Builder clusterBuilder) {
78 String localDataCenter = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
79 .getLocalDataCenter();
80 if (localDataCenter != null) {
// NOTE(review): the message below contains typos ("Cassndra clint"); it is a runtime string
// and is left unchanged here. Its argument is on a line not shown in this listing.
81 logger.info("localDatacenter was provided, setting Cassndra clint to use datacenter: {} as local.",
83 LoadBalancingPolicy tokenAwarePolicy = new TokenAwarePolicy(
84 DCAwareRoundRobinPolicy.builder().withLocalDc(localDataCenter).build());
85 clusterBuilder.withLoadBalancingPolicy(tokenAwarePolicy);
// NOTE(review): the logger call and branch header that own this message are not visible.
// The text says "was provided", the same as the non-null branch above; if this is the
// fallback for a missing localDataCenter it should probably read "was not provided" —
// TODO confirm and fix the wording.
88 "localDatacenter was provided, the driver will use the datacenter of the first contact point that was reached at initialization");
// Applies the SSL settings from the Cassandra configuration to the cluster builder.
// Visible behavior: reads the ssl flag, the truststore path and the truststore password;
// logs an error when either truststore value is null; sets the JVM truststore system
// properties and calls withSSL() on the builder.
// NOTE(review): the branch headers (where `ssl` is tested, and whatever separates the error
// path from the setup path) are on lines not shown in this listing.
92 private void enableSsl(Cluster.Builder clusterBuilder) {
93 boolean ssl = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().isSsl();
95 String truststorePath = ConfigurationManager.getConfigurationManager().getConfiguration()
96 .getCassandraConfig().getTruststorePath();
97 String truststorePassword = ConfigurationManager.getConfigurationManager().getConfiguration()
98 .getCassandraConfig().getTruststorePassword();
99 if (truststorePath == null || truststorePassword == null) {
100 logger.error("ssl is enabled but truststorePath or truststorePassword were not supplied.");
// System properties are JVM-wide, so this truststore applies to the whole process rather
// than only to this cluster.
// NOTE(review): the truststore password stays readable in a system property afterwards.
102 System.setProperty("javax.net.ssl.trustStore", truststorePath);
103 System.setProperty("javax.net.ssl.trustStorePassword", truststorePassword);
104 clusterBuilder.withSSL();
// Applies credentials from the Cassandra configuration to the cluster builder.
// Visible behavior: reads an authenticate flag, a username and a password; logs an error
// when the username or the password is null; passes both to withCredentials().
// NOTE(review): the three configuration call chains below each continue on a line not shown
// in this listing, and so do the branch headers (the test of `authenticate` and whatever
// separates the error path from the withCredentials call).
110 private void enableAuthentication(Cluster.Builder clusterBuilder) {
111 boolean authenticate = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
114 String username = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
116 String password = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
118 if (username == null || password == null) {
119 logger.error("authentication is enabled but username or password were not supplied.");
121 clusterBuilder.withCredentials(username, password);
130 * - key space to connect
// Opens a session on the given keyspace and pairs it with a MappingManager.
// Returns Either.left(session, manager) on success; Either.right(KEYSPACE_NOT_CONNECTED)
// when the session is null or connecting throws; Either.right(CLUSTER_NOT_CONNECTED) as the
// final return, which is the path taken when no cluster was built.
// NOTE(review): the `try {` opener and the else/closing lines are missing from this listing.
133 public Either<ImmutablePair<Session, MappingManager>, CassandraOperationStatus> connect(String keyspace) {
134 if (cluster != null) {
136 Session session = cluster.connect(keyspace);
137 if (session != null) {
138 MappingManager manager = new MappingManager(session);
139 return Either.left(new ImmutablePair<Session, MappingManager>(session, manager));
141 return Either.right(CassandraOperationStatus.KEYSPACE_NOT_CONNECTED);
// NOTE(review): catching Throwable also intercepts Errors (e.g. OutOfMemoryError), and the
// failure is only logged at debug level — consider narrowing the type and raising the level.
143 } catch (Throwable e) {
// The trailing `e` has no matching placeholder, so SLF4J logs it as the exception.
144 logger.debug("Failed to connect to keyspace [{}], error :", keyspace, e);
145 return Either.right(CassandraOperationStatus.KEYSPACE_NOT_CONNECTED);
148 return Either.right(CassandraOperationStatus.CLUSTER_NOT_CONNECTED);
// Persists an entity through the driver's object mapper for the given class.
// Returns OK on success, GENERAL_ERROR when an exception is caught, and
// CLUSTER_NOT_CONNECTED from an early guard.
// NOTE(review): the guard condition, the `try {` opener and the mapper call that performs the
// save are on lines not shown in this listing — presumably `!isConnected` and
// `mapper.save(entity)`; TODO confirm.
151 public <T> CassandraOperationStatus save(T entity, Class<T> clazz, MappingManager manager) {
153 return CassandraOperationStatus.CLUSTER_NOT_CONNECTED;
156 Mapper<T> mapper = manager.mapper(clazz);
158 } catch (Exception e) {
// The failure is logged at debug level only; the caller just sees GENERAL_ERROR.
159 logger.debug("Failed to save entity [{}], error :", entity, e);
160 return CassandraOperationStatus.GENERAL_ERROR;
162 return CassandraOperationStatus.OK;
// Loads an entity of the given class by its id through the driver's object mapper.
// Returns Either.left(entity) when found, Either.right(NOT_FOUND) when the mapper yields
// null, Either.right(GENERAL_ERROR) when an exception is caught, and
// Either.right(CLUSTER_NOT_CONNECTED) from an early guard.
// NOTE(review): the guard condition and the `try {` opener are on lines not shown in this
// listing — presumably the guard tests `!isConnected`; TODO confirm.
165 public <T> Either<T, CassandraOperationStatus> getById(String id, Class<T> clazz, MappingManager manager) {
167 return Either.right(CassandraOperationStatus.CLUSTER_NOT_CONNECTED);
170 Mapper<T> mapper = manager.mapper(clazz);
171 T result = mapper.get(id);
// A null result is reported as NOT_FOUND rather than returned to the caller.
172 if (result == null) {
173 return Either.right(CassandraOperationStatus.NOT_FOUND);
175 return Either.left(result);
176 } catch (Exception e) {
// The failure is logged at debug level only; the caller just sees GENERAL_ERROR.
177 logger.debug("Failed to get by Id [{}], error :", id, e);
178 return Either.right(CassandraOperationStatus.GENERAL_ERROR);
// Deletes the entity of the given class identified by id through the driver's object mapper.
// Returns OK on success, GENERAL_ERROR when an exception is caught, and
// CLUSTER_NOT_CONNECTED from an early guard.
// NOTE(review): the guard condition, the `try {` opener and the mapper call that performs the
// delete are on lines not shown in this listing — presumably `!isConnected` and
// `mapper.delete(id)`; TODO confirm.
182 public <T> CassandraOperationStatus delete(String id, Class<T> clazz, MappingManager manager) {
184 return CassandraOperationStatus.CLUSTER_NOT_CONNECTED;
187 Mapper<T> mapper = manager.mapper(clazz);
189 } catch (Exception e) {
// The failure is logged at debug level only; the caller just sees GENERAL_ERROR.
190 logger.debug("Failed to delete by id [{}], error :", id, e);
191 return CassandraOperationStatus.GENERAL_ERROR;
193 return CassandraOperationStatus.OK;
// Reports the client's connection state as a boolean.
// NOTE(review): the method body is not visible in this listing — presumably it returns the
// `isConnected` field; TODO confirm.
196 public boolean isConnected() {
// Shutdown hook for the client; the only visible statement logs that the cluster was closed.
// NOTE(review): the lines around this method are missing from the listing — presumably it is
// annotated with @PreDestroy (imported by this file) and closes `cluster` before logging;
// TODO confirm, and confirm whether the log line is also emitted when nothing was closed.
201 public void closeClient() {
205 logger.info("** CassandraClient cluster closed");