Catalog alignment
[sdc.git] / catalog-dao / src / main / java / org / openecomp / sdc / be / dao / cassandra / schema / SdcSchemaUtils.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  * Modifications copyright (c) 2018 Nokia
20  * ================================================================================
21  */
22 package org.openecomp.sdc.be.dao.cassandra.schema;
23
24 import com.datastax.driver.core.Cluster;
25 import com.datastax.driver.core.Metadata;
26 import com.datastax.driver.core.ProtocolVersion;
27 import com.datastax.driver.core.Session;
28 import com.datastax.driver.core.SocketOptions;
29 import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
30 import com.datastax.driver.core.policies.LoadBalancingPolicy;
31 import com.datastax.driver.core.policies.TokenAwarePolicy;
32 import org.openecomp.sdc.be.config.Configuration;
33 import org.openecomp.sdc.be.config.ConfigurationManager;
34 import org.openecomp.sdc.common.log.wrappers.Logger;
35
36 import java.util.List;
37 import java.util.function.Supplier;
38
39 public class SdcSchemaUtils {
40
41     private static Logger log = Logger.getLogger(SdcSchemaUtils.class.getName());
42     private Cluster cluster;
43     private boolean isConnected;  
44     
45     
46
47     public SdcSchemaUtils() {
48         super();
49         try {
50             isConnected = false;
51             cluster =  createCluster();
52             isConnected = true;
53         } catch (Exception e) {
54             log.info("** CassandraClient isn't connected. error is {}", e);
55         }
56
57         log.info("** cluster created");
58     }
59
60     /**
61      * the method creates the cluster object using the supplied cassandra nodes
62      * in the configuration
63      *
64      * @return cluster object our null in case of an invalid configuration
65      * 
66      * 
67      */
68     public Cluster createCluster() {
69         final Configuration.CassandrConfig config = getCassandraConfig();
70         List<String> nodes = config.getCassandraHosts();
71         Integer cassandraPort = config.getCassandraPort();
72         if (nodes == null || cassandraPort == null) {
73             log.info("no nodes or port were supplied in configuration.");
74             return null;
75         }
76         log.info("connecting to node:{} port{}.", nodes, cassandraPort);
77         Cluster.Builder clusterBuilder = Cluster.builder();
78         nodes.forEach(node -> clusterBuilder.addContactPoint(node).withPort(cassandraPort));
79
80        clusterBuilder.withMaxSchemaAgreementWaitSeconds(60); 
81              
82        setSocketOptions(clusterBuilder, config);
83         if(!enableAuthentication(clusterBuilder, config)){
84             return null;
85         }
86         
87         if(!enableSsl(clusterBuilder, config)){
88             return null;
89         }
90         setLocalDc(clusterBuilder, config);
91         
92         return clusterBuilder.build();
93     }
94     
95     /**
96      * 
97      * @return
98      */
99     public Session  connect() {
100         Session session = null;
101         if (cluster != null) {
102             try {
103                 session = cluster.connect();
104                
105             } catch (Throwable e) {
106                 log.debug("Failed to connect cluster, error :",  e);
107                
108             }
109         }
110         return session;
111     }
112     
113     public Metadata getMetadata(){
114         if (cluster != null){
115             return cluster.getMetadata();
116         }
117         return null;
118     }
119     
120     private void setLocalDc(Cluster.Builder clusterBuilder, Configuration.CassandrConfig config) {
121         String localDataCenter = config.getLocalDataCenter();
122         if (localDataCenter != null) {
123             log.info("localDatacenter was provided, setting Cassndra clint to use datacenter: {} as local.",
124                     localDataCenter);
125             LoadBalancingPolicy tokenAwarePolicy = new TokenAwarePolicy(
126                     DCAwareRoundRobinPolicy.builder().withLocalDc(localDataCenter).build());
127             clusterBuilder.withLoadBalancingPolicy(tokenAwarePolicy);
128         } else {
129             log.info(
130                     "localDatacenter was provided,  the driver will use the datacenter of the first contact point that was reached at initialization");
131         }
132     }
133     
134     private boolean enableSsl(Cluster.Builder clusterBuilder, Configuration.CassandrConfig config) {
135         boolean ssl = config.isSsl();
136         if (ssl) {
137             String truststorePath = config.getTruststorePath();
138             String truststorePassword = config.getTruststorePassword();
139             if (truststorePath == null || truststorePassword == null) {
140                 log.error("ssl is enabled but truststorePath or truststorePassword were not supplied.");
141                 return false;
142             } else {
143                 System.setProperty("javax.net.ssl.trustStore", truststorePath);
144                 System.setProperty("javax.net.ssl.trustStorePassword", truststorePassword);
145                 clusterBuilder.withSSL();
146             }
147
148         }
149         return true;
150     }
151     
152     
153     private void setSocketOptions(Cluster.Builder clusterBuilder, Configuration.CassandrConfig config) {
154         SocketOptions socketOptions =new SocketOptions();
155         Integer socketConnectTimeout = config.getSocketConnectTimeout();
156         if( socketConnectTimeout!=null ){
157             log.info("SocketConnectTimeout was provided, setting Cassandra client to use SocketConnectTimeout: {} .",socketConnectTimeout);
158             socketOptions.setConnectTimeoutMillis(socketConnectTimeout);
159         }
160         clusterBuilder.withSocketOptions(socketOptions);
161     }
162     
163     private boolean enableAuthentication(Cluster.Builder clusterBuilder, Configuration.CassandrConfig config) {
164         boolean authenticate = config.isAuthenticate();
165        
166         if (authenticate) {
167             String username = config.getUsername();
168             String password = config.getPassword();
169             if (username == null || password == null) {
170                 log.error("authentication is enabled but username or password were not supplied.");
171                 return false;
172             } else {
173                 clusterBuilder.withCredentials(username, password);
174             }
175
176         }
177         return true;
178     }
179
180     public boolean executeStatement(String statement) {
181         return executeStatement(this::createCluster, statement);
182     }
183
184     public boolean executeStatements(String ... statements) {
185         return executeStatements(this::createCluster, statements);
186     }
187
188     boolean executeStatement(Supplier<Cluster> clusterSupplier, String statement) {
189         return executeStatements(clusterSupplier, statement);
190     }
191
192     boolean executeStatements(Supplier<Cluster> clusterSupplier, String ... statements) {
193         try(Cluster cluster = clusterSupplier.get();
194                 Session session = cluster.connect()) {
195             for (String statement : statements) {
196                 session.execute(statement);
197             }
198             return true;
199         } catch (RuntimeException e) {
200             log.error("could not execute statements", e);
201         }
202         return false;
203     }
204
205     Configuration.CassandrConfig getCassandraConfig() {
206         return ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig();
207     }
208     
209     
210     public void closeCluster() {
211         if (isConnected) {
212             cluster.close();
213         }
214         log.info("** CassandraClient cluster closed");
215     }
216
217 }