tine out configuration not used
[sdc.git] / catalog-dao / src / main / java / org / openecomp / sdc / be / dao / cassandra / CassandraClient.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.dao.cassandra;
22
23 import java.util.List;
24
25 import javax.annotation.PreDestroy;
26
27 import com.datastax.driver.core.SocketOptions;
28 import org.apache.commons.lang3.tuple.ImmutablePair;
29 import org.openecomp.sdc.be.config.ConfigurationManager;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32 import org.springframework.stereotype.Component;
33
34 import com.datastax.driver.core.Cluster;
35 import com.datastax.driver.core.Session;
36 import com.datastax.driver.core.policies.ConstantReconnectionPolicy;
37 import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
38 import com.datastax.driver.core.policies.DefaultRetryPolicy;
39 import com.datastax.driver.core.policies.LoadBalancingPolicy;
40 import com.datastax.driver.core.policies.TokenAwarePolicy;
41 import com.datastax.driver.mapping.Mapper;
42 import com.datastax.driver.mapping.MappingManager;
43
44 import fj.data.Either;
45
46 @Component("cassandra-client")
47 public class CassandraClient {
48         private static Logger logger = LoggerFactory.getLogger(CassandraClient.class.getName());
49
50         private Cluster cluster;
51         private boolean isConnected;
52
53         public CassandraClient() {
54                 super();
55                 isConnected = false;
56                 List<String> cassandraHosts = null;
57                 try {
58                         cassandraHosts = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
59                                         .getCassandraHosts();
60                         Long reconnectTimeout = ConfigurationManager.getConfigurationManager().getConfiguration()
61                                         .getCassandraConfig().getReconnectTimeout();
62
63                         logger.debug("creating cluster to hosts:{} with reconnect timeout:{}", cassandraHosts, reconnectTimeout);
64                         Cluster.Builder clusterBuilder = Cluster.builder()
65                                         .withReconnectionPolicy(new ConstantReconnectionPolicy(reconnectTimeout))
66                                         .withRetryPolicy(DefaultRetryPolicy.INSTANCE);
67
68                         cassandraHosts.forEach(host -> clusterBuilder.addContactPoint(host));
69                         setSocketOptions(clusterBuilder);
70                         enableAuthentication(clusterBuilder);
71                         enableSsl(clusterBuilder);
72                         setLocalDc(clusterBuilder);
73
74                         cluster = clusterBuilder.build();
75                         isConnected = true;
76                 } catch (Exception e) {
77                         logger.info("** CassandraClient isn't connected to {}", cassandraHosts);
78                 }
79
80                 logger.info("** CassandraClient created");
81         }
82
83         private void setSocketOptions(Cluster.Builder clusterBuilder) {
84                 SocketOptions socketOptions =new SocketOptions();
85                 Integer socketConnectTimeout = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().getSocketConnectTimeout();
86                 if( socketConnectTimeout!=null ){
87                         logger.info("SocketConnectTimeout was provided, setting Cassandra client to use SocketConnectTimeout: {} .",socketConnectTimeout);
88                         socketOptions.setConnectTimeoutMillis(socketConnectTimeout);
89                 }
90                 Integer socketReadTimeout = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().getSocketReadTimeout();
91                 if( socketReadTimeout != null ){
92                         logger.info("SocketReadTimeout was provided, setting Cassandra client to use SocketReadTimeout: {} .",socketReadTimeout);
93                         socketOptions.setReadTimeoutMillis(socketReadTimeout);
94                 }
95                 clusterBuilder.withSocketOptions(socketOptions);
96         }
97
98         private void setLocalDc(Cluster.Builder clusterBuilder) {
99                 String localDataCenter = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
100                                 .getLocalDataCenter();
101                 if (localDataCenter != null) {
102                         logger.info("localDatacenter was provided, setting Cassndra clint to use datacenter: {} as local.",
103                                         localDataCenter);
104                         LoadBalancingPolicy tokenAwarePolicy = new TokenAwarePolicy(
105                                         DCAwareRoundRobinPolicy.builder().withLocalDc(localDataCenter).build());
106                         clusterBuilder.withLoadBalancingPolicy(tokenAwarePolicy);
107                 } else {
108                         logger.info(
109                                         "localDatacenter was provided,  the driver will use the datacenter of the first contact point that was reached at initialization");
110                 }
111         }
112
113         private void enableSsl(Cluster.Builder clusterBuilder) {
114                 boolean ssl = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().isSsl();
115                 if (ssl) {
116                         String truststorePath = ConfigurationManager.getConfigurationManager().getConfiguration()
117                                         .getCassandraConfig().getTruststorePath();
118                         String truststorePassword = ConfigurationManager.getConfigurationManager().getConfiguration()
119                                         .getCassandraConfig().getTruststorePassword();
120                         if (truststorePath == null || truststorePassword == null) {
121                                 logger.error("ssl is enabled but truststorePath or truststorePassword were not supplied.");
122                         } else {
123                                 System.setProperty("javax.net.ssl.trustStore", truststorePath);
124                                 System.setProperty("javax.net.ssl.trustStorePassword", truststorePassword);
125                                 clusterBuilder.withSSL();
126                         }
127
128                 }
129         }
130
131         private void enableAuthentication(Cluster.Builder clusterBuilder) {
132                 boolean authenticate = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
133                                 .isAuthenticate();
134                 if (authenticate) {
135                         String username = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
136                                         .getUsername();
137                         String password = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
138                                         .getPassword();
139                         if (username == null || password == null) {
140                                 logger.error("authentication is enabled but username or password were not supplied.");
141                         } else {
142                                 clusterBuilder.withCredentials(username, password);
143                         }
144
145                 }
146         }
147
148         /**
149          * 
150          * @param keyspace
151          *            - key space to connect
152          * @return
153          */
154         public Either<ImmutablePair<Session, MappingManager>, CassandraOperationStatus> connect(String keyspace) {
155                 if (cluster != null) {
156                         try {
157                                 Session session = cluster.connect(keyspace);
158                                 if (session != null) {
159                                         MappingManager manager = new MappingManager(session);
160                                         return Either.left(new ImmutablePair<Session, MappingManager>(session, manager));
161                                 } else {
162                                         return Either.right(CassandraOperationStatus.KEYSPACE_NOT_CONNECTED);
163                                 }
164                         } catch (Throwable e) {
165                                 logger.debug("Failed to connect to keyspace [{}], error :", keyspace, e);
166                                 return Either.right(CassandraOperationStatus.KEYSPACE_NOT_CONNECTED);
167                         }
168                 }
169                 return Either.right(CassandraOperationStatus.CLUSTER_NOT_CONNECTED);
170         }
171
172         public <T> CassandraOperationStatus save(T entity, Class<T> clazz, MappingManager manager) {
173                 if (!isConnected) {
174                         return CassandraOperationStatus.CLUSTER_NOT_CONNECTED;
175                 }
176                 try {
177                         Mapper<T> mapper = manager.mapper(clazz);
178                         mapper.save(entity);
179                 } catch (Exception e) {
180                         logger.debug("Failed to save entity [{}], error :", entity, e);
181                         return CassandraOperationStatus.GENERAL_ERROR;
182                 }
183                 return CassandraOperationStatus.OK;
184         }
185
186         public <T> Either<T, CassandraOperationStatus> getById(String id, Class<T> clazz, MappingManager manager) {
187                 if (!isConnected) {
188                         return Either.right(CassandraOperationStatus.CLUSTER_NOT_CONNECTED);
189                 }
190                 try {
191                         Mapper<T> mapper = manager.mapper(clazz);
192                         T result = mapper.get(id);
193                         if (result == null) {
194                                 return Either.right(CassandraOperationStatus.NOT_FOUND);
195                         }
196                         return Either.left(result);
197                 } catch (Exception e) {
198                         logger.debug("Failed to get by Id [{}], error :", id, e);
199                         return Either.right(CassandraOperationStatus.GENERAL_ERROR);
200                 }
201         }
202
203         public <T> CassandraOperationStatus delete(String id, Class<T> clazz, MappingManager manager) {
204                 if (!isConnected) {
205                         return CassandraOperationStatus.CLUSTER_NOT_CONNECTED;
206                 }
207                 try {
208                         Mapper<T> mapper = manager.mapper(clazz);
209                         mapper.delete(id);
210                 } catch (Exception e) {
211                         logger.debug("Failed to delete by id [{}], error :", id, e);
212                         return CassandraOperationStatus.GENERAL_ERROR;
213                 }
214                 return CassandraOperationStatus.OK;
215         }
216
217         public boolean isConnected() {
218                 return isConnected;
219         }
220
221         @PreDestroy
222         public void closeClient() {
223                 if (isConnected) {
224                         cluster.close();
225                 }
226                 logger.info("** CassandraClient cluster closed");
227         }
228 }