Sync Integ to Master
[sdc.git] / catalog-dao / src / main / java / org / openecomp / sdc / be / dao / cassandra / CassandraClient.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.dao.cassandra;
22
23 import java.util.List;
24
25 import javax.annotation.PreDestroy;
26
27 import com.datastax.driver.core.policies.*;
28 import org.apache.commons.lang3.tuple.ImmutablePair;
29 import org.openecomp.sdc.be.config.ConfigurationManager;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32 import org.springframework.stereotype.Component;
33
34 import com.datastax.driver.core.Cluster;
35 import com.datastax.driver.core.Session;
36 import com.datastax.driver.mapping.Mapper;
37 import com.datastax.driver.mapping.MappingManager;
38
39 import fj.data.Either;
40
41 @Component("cassandra-client")
42 public class CassandraClient {
43         private static Logger logger = LoggerFactory.getLogger(CassandraClient.class.getName());
44
45         private Cluster cluster;
46         private boolean isConnected;
47
48         public CassandraClient() {
49                 super();
50                 isConnected = false;
51                 List<String> cassandraHosts = null;
52                 try {
53                         cassandraHosts = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
54                                         .getCassandraHosts();
55                         Long reconnectTimeout = ConfigurationManager.getConfigurationManager().getConfiguration()
56                                         .getCassandraConfig().getReconnectTimeout();
57
58                         logger.debug("creating cluster to hosts:{} with reconnect timeout:{}", cassandraHosts, reconnectTimeout);
59                         Cluster.Builder clusterBuilder = Cluster.builder()
60                                         .withReconnectionPolicy(new ConstantReconnectionPolicy(reconnectTimeout))
61                                         .withRetryPolicy(DefaultRetryPolicy.INSTANCE);
62
63                         cassandraHosts.forEach(host -> clusterBuilder.addContactPoint(host));
64                         enableAuthentication(clusterBuilder);
65                         enableSsl(clusterBuilder);
66                         setLocalDc(clusterBuilder);
67
68                         cluster = clusterBuilder.build();
69                         isConnected = true;
70                 } catch (Exception e) {
71                         logger.info("** CassandraClient isn't connected to {}", cassandraHosts);
72                 }
73
74                 logger.info("** CassandraClient created");
75         }
76
77         private void setLocalDc(Cluster.Builder clusterBuilder) {
78                 String localDataCenter = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
79                                 .getLocalDataCenter();
80                 if (localDataCenter != null) {
81                         logger.info("localDatacenter was provided, setting Cassndra clint to use datacenter: {} as local.",
82                                         localDataCenter);
83                         LoadBalancingPolicy tokenAwarePolicy = new TokenAwarePolicy(
84                                         DCAwareRoundRobinPolicy.builder().withLocalDc(localDataCenter).build());
85                         clusterBuilder.withLoadBalancingPolicy(tokenAwarePolicy);
86                 } else {
87                         logger.info(
88                                         "localDatacenter was provided,  the driver will use the datacenter of the first contact point that was reached at initialization");
89                 }
90         }
91
92         private void enableSsl(Cluster.Builder clusterBuilder) {
93                 boolean ssl = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().isSsl();
94                 if (ssl) {
95                         String truststorePath = ConfigurationManager.getConfigurationManager().getConfiguration()
96                                         .getCassandraConfig().getTruststorePath();
97                         String truststorePassword = ConfigurationManager.getConfigurationManager().getConfiguration()
98                                         .getCassandraConfig().getTruststorePassword();
99                         if (truststorePath == null || truststorePassword == null) {
100                                 logger.error("ssl is enabled but truststorePath or truststorePassword were not supplied.");
101                         } else {
102                                 System.setProperty("javax.net.ssl.trustStore", truststorePath);
103                                 System.setProperty("javax.net.ssl.trustStorePassword", truststorePassword);
104                                 clusterBuilder.withSSL();
105                         }
106
107                 }
108         }
109
110         private void enableAuthentication(Cluster.Builder clusterBuilder) {
111                 boolean authenticate = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
112                                 .isAuthenticate();
113                 if (authenticate) {
114                         String username = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
115                                         .getUsername();
116                         String password = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
117                                         .getPassword();
118                         if (username == null || password == null) {
119                                 logger.error("authentication is enabled but username or password were not supplied.");
120                         } else {
121                                 clusterBuilder.withCredentials(username, password);
122                         }
123
124                 }
125         }
126
127         /**
128          * 
129          * @param keyspace
130          *            - key space to connect
131          * @return
132          */
133         public Either<ImmutablePair<Session, MappingManager>, CassandraOperationStatus> connect(String keyspace) {
134                 if (cluster != null) {
135                         try {
136                                 Session session = cluster.connect(keyspace);
137                                 if (session != null) {
138                                         MappingManager manager = new MappingManager(session);
139                                         return Either.left(new ImmutablePair<Session, MappingManager>(session, manager));
140                                 } else {
141                                         return Either.right(CassandraOperationStatus.KEYSPACE_NOT_CONNECTED);
142                                 }
143                         } catch (Throwable e) {
144                                 logger.debug("Failed to connect to keyspace [{}], error :", keyspace, e);
145                                 return Either.right(CassandraOperationStatus.KEYSPACE_NOT_CONNECTED);
146                         }
147                 }
148                 return Either.right(CassandraOperationStatus.CLUSTER_NOT_CONNECTED);
149         }
150
151         public <T> CassandraOperationStatus save(T entity, Class<T> clazz, MappingManager manager) {
152                 if (!isConnected) {
153                         return CassandraOperationStatus.CLUSTER_NOT_CONNECTED;
154                 }
155                 try {
156                         Mapper<T> mapper = manager.mapper(clazz);
157                         mapper.save(entity);
158                 } catch (Exception e) {
159                         logger.debug("Failed to save entity [{}], error :", entity, e);
160                         return CassandraOperationStatus.GENERAL_ERROR;
161                 }
162                 return CassandraOperationStatus.OK;
163         }
164
165         public <T> Either<T, CassandraOperationStatus> getById(String id, Class<T> clazz, MappingManager manager) {
166                 if (!isConnected) {
167                         return Either.right(CassandraOperationStatus.CLUSTER_NOT_CONNECTED);
168                 }
169                 try {
170                         Mapper<T> mapper = manager.mapper(clazz);
171                         T result = mapper.get(id);
172                         if (result == null) {
173                                 return Either.right(CassandraOperationStatus.NOT_FOUND);
174                         }
175                         return Either.left(result);
176                 } catch (Exception e) {
177                         logger.debug("Failed to get by Id [{}], error :", id, e);
178                         return Either.right(CassandraOperationStatus.GENERAL_ERROR);
179                 }
180         }
181
182         public <T> CassandraOperationStatus delete(String id, Class<T> clazz, MappingManager manager) {
183                 if (!isConnected) {
184                         return CassandraOperationStatus.CLUSTER_NOT_CONNECTED;
185                 }
186                 try {
187                         Mapper<T> mapper = manager.mapper(clazz);
188                         mapper.delete(id);
189                 } catch (Exception e) {
190                         logger.debug("Failed to delete by id [{}], error :", id, e);
191                         return CassandraOperationStatus.GENERAL_ERROR;
192                 }
193                 return CassandraOperationStatus.OK;
194         }
195
196         public boolean isConnected() {
197                 return isConnected;
198         }
199
200         @PreDestroy
201         public void closeClient() {
202                 if (isConnected) {
203                         cluster.close();
204                 }
205                 logger.info("** CassandraClient cluster closed");
206         }
207 }