Make Cassandra port configurable.
[sdc.git] / catalog-dao / src / main / java / org / openecomp / sdc / be / dao / cassandra / CassandraClient.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.dao.cassandra;
22
23 import com.datastax.driver.core.Cluster;
24 import com.datastax.driver.core.Session;
25 import com.datastax.driver.core.SocketOptions;
26 import com.datastax.driver.core.policies.*;
27 import com.datastax.driver.mapping.Mapper;
28 import com.datastax.driver.mapping.MappingManager;
29 import fj.data.Either;
30 import org.apache.commons.lang3.tuple.ImmutablePair;
31 import org.openecomp.sdc.be.config.ConfigurationManager;
32 import org.openecomp.sdc.common.log.wrappers.Logger;
33 import org.springframework.stereotype.Component;
34
35 import javax.annotation.PreDestroy;
36 import java.util.List;
37
38 @Component("cassandra-client")
39 public class CassandraClient {
40         private static Logger logger = Logger.getLogger(CassandraClient.class.getName());
41
42         private Cluster cluster;
43         private boolean isConnected;
44
45         public CassandraClient() {
46                 super();
47                 isConnected = false;
48                 List<String> cassandraHosts = null;
49                 try {
50                         cassandraHosts = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
51                                         .getCassandraHosts();
52                         Integer cassandraPort = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
53                                         .getCassandraPort();
54                         Long reconnectTimeout = ConfigurationManager.getConfigurationManager().getConfiguration()
55                                         .getCassandraConfig().getReconnectTimeout();
56                         logger.debug("creating cluster to hosts:{} port:{} with reconnect timeout:{}", cassandraHosts, cassandraPort, reconnectTimeout);
57                         Cluster.Builder clusterBuilder = Cluster.builder()
58                                         .withReconnectionPolicy(new ConstantReconnectionPolicy(reconnectTimeout))
59                                         .withRetryPolicy(DefaultRetryPolicy.INSTANCE);
60
61                         cassandraHosts.forEach(host -> clusterBuilder.addContactPoint(host).withPort(cassandraPort));
62                         setSocketOptions(clusterBuilder);
63                         enableAuthentication(clusterBuilder);
64                         enableSsl(clusterBuilder);
65                         setLocalDc(clusterBuilder);
66
67                         cluster = clusterBuilder.build();
68                         isConnected = true;
69                 } catch (Exception e) {
70                         logger.info("** CassandraClient isn't connected to {}", cassandraHosts);
71                 }
72
73                 logger.info("** CassandraClient created");
74         }
75
76         private void setSocketOptions(Cluster.Builder clusterBuilder) {
77                 SocketOptions socketOptions =new SocketOptions();
78                 Integer socketConnectTimeout = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().getSocketConnectTimeout();
79                 if( socketConnectTimeout!=null ){
80                         logger.info("SocketConnectTimeout was provided, setting Cassandra client to use SocketConnectTimeout: {} .",socketConnectTimeout);
81                         socketOptions.setConnectTimeoutMillis(socketConnectTimeout);
82                 }
83                 Integer socketReadTimeout = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().getSocketReadTimeout();
84                 if( socketReadTimeout != null ){
85                         logger.info("SocketReadTimeout was provided, setting Cassandra client to use SocketReadTimeout: {} .",socketReadTimeout);
86                         socketOptions.setReadTimeoutMillis(socketReadTimeout);
87                 }
88                 clusterBuilder.withSocketOptions(socketOptions);
89         }
90
91         private void setLocalDc(Cluster.Builder clusterBuilder) {
92                 String localDataCenter = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
93                                 .getLocalDataCenter();
94                 if (localDataCenter != null) {
95                         logger.info("localDatacenter was provided, setting Cassndra clint to use datacenter: {} as local.",
96                                         localDataCenter);
97                         LoadBalancingPolicy tokenAwarePolicy = new TokenAwarePolicy(
98                                         DCAwareRoundRobinPolicy.builder().withLocalDc(localDataCenter).build());
99                         clusterBuilder.withLoadBalancingPolicy(tokenAwarePolicy);
100                 } else {
101                         logger.info(
102                                         "localDatacenter was provided,  the driver will use the datacenter of the first contact point that was reached at initialization");
103                 }
104         }
105
106         private void enableSsl(Cluster.Builder clusterBuilder) {
107                 boolean ssl = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().isSsl();
108                 if (ssl) {
109                         String truststorePath = ConfigurationManager.getConfigurationManager().getConfiguration()
110                                         .getCassandraConfig().getTruststorePath();
111                         String truststorePassword = ConfigurationManager.getConfigurationManager().getConfiguration()
112                                         .getCassandraConfig().getTruststorePassword();
113                         if (truststorePath == null || truststorePassword == null) {
114                                 logger.error("ssl is enabled but truststorePath or truststorePassword were not supplied.");
115                         } else {
116                                 System.setProperty("javax.net.ssl.trustStore", truststorePath);
117                                 System.setProperty("javax.net.ssl.trustStorePassword", truststorePassword);
118                                 clusterBuilder.withSSL();
119                         }
120
121                 }
122         }
123
124         private void enableAuthentication(Cluster.Builder clusterBuilder) {
125                 boolean authenticate = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
126                                 .isAuthenticate();
127                 if (authenticate) {
128                         String username = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
129                                         .getUsername();
130                         String password = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig()
131                                         .getPassword();
132                         if (username == null || password == null) {
133                                 logger.error("authentication is enabled but username or password were not supplied.");
134                         } else {
135                                 clusterBuilder.withCredentials(username, password);
136                         }
137
138                 }
139         }
140
141         /**
142          * 
143          * @param keyspace
144          *            - key space to connect
145          * @return
146          */
147         public Either<ImmutablePair<Session, MappingManager>, CassandraOperationStatus> connect(String keyspace) {
148                 if (cluster != null) {
149                         try {
150                                 Session session = cluster.connect(keyspace);
151                                 if (session != null) {
152                                         MappingManager manager = new MappingManager(session);
153                                         return Either.left(new ImmutablePair<>(session, manager));
154                                 } else {
155                                         return Either.right(CassandraOperationStatus.KEYSPACE_NOT_CONNECTED);
156                                 }
157                         } catch (Throwable e) {
158                                 logger.debug("Failed to connect to keyspace [{}], error :", keyspace, e);
159                                 return Either.right(CassandraOperationStatus.KEYSPACE_NOT_CONNECTED);
160                         }
161                 }
162                 return Either.right(CassandraOperationStatus.CLUSTER_NOT_CONNECTED);
163         }
164
165         public <T> CassandraOperationStatus save(T entity, Class<T> clazz, MappingManager manager) {
166                 if (!isConnected) {
167                         return CassandraOperationStatus.CLUSTER_NOT_CONNECTED;
168                 }
169                 try {
170                         Mapper<T> mapper = manager.mapper(clazz);
171                         mapper.save(entity);
172                 } catch (Exception e) {
173                         logger.debug("Failed to save entity [{}], error :", entity, e);
174                         return CassandraOperationStatus.GENERAL_ERROR;
175                 }
176                 return CassandraOperationStatus.OK;
177         }
178
179         public <T> Either<T, CassandraOperationStatus> getById(String id, Class<T> clazz, MappingManager manager) {
180                 if (!isConnected) {
181                         return Either.right(CassandraOperationStatus.CLUSTER_NOT_CONNECTED);
182                 }
183                 try {
184                         Mapper<T> mapper = manager.mapper(clazz);
185                         T result = mapper.get(id);
186                         if (result == null) {
187                                 return Either.right(CassandraOperationStatus.NOT_FOUND);
188                         }
189                         return Either.left(result);
190                 } catch (Exception e) {
191                         logger.debug("Failed to get by Id [{}], error :", id, e);
192                         return Either.right(CassandraOperationStatus.GENERAL_ERROR);
193                 }
194         }
195
196         public <T> CassandraOperationStatus delete(String id, Class<T> clazz, MappingManager manager) {
197                 if (!isConnected) {
198                         return CassandraOperationStatus.CLUSTER_NOT_CONNECTED;
199                 }
200                 try {
201                         Mapper<T> mapper = manager.mapper(clazz);
202                         mapper.delete(id);
203                 } catch (Exception e) {
204                         logger.debug("Failed to delete by id [{}], error :", id, e);
205                         return CassandraOperationStatus.GENERAL_ERROR;
206                 }
207                 return CassandraOperationStatus.OK;
208         }
209
210         public boolean isConnected() {
211                 return isConnected;
212         }
213
214         @PreDestroy
215         public void closeClient() {
216                 if (isConnected) {
217                         cluster.close();
218                 }
219                 logger.info("** CassandraClient cluster closed");
220         }
221 }