2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
19 * Modifications copyright (c) 2018 Nokia
20 * ================================================================================
22 package org.openecomp.sdc.be.dao.cassandra.schema;
24 import com.datastax.driver.core.Cluster;
25 import com.datastax.driver.core.Metadata;
26 import com.datastax.driver.core.ProtocolVersion;
27 import com.datastax.driver.core.Session;
28 import com.datastax.driver.core.SocketOptions;
29 import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
30 import com.datastax.driver.core.policies.LoadBalancingPolicy;
31 import com.datastax.driver.core.policies.TokenAwarePolicy;
32 import org.openecomp.sdc.be.config.Configuration;
33 import org.openecomp.sdc.be.config.ConfigurationManager;
34 import org.openecomp.sdc.common.log.wrappers.Logger;
36 import java.util.List;
37 import java.util.function.Supplier;
39 public class SdcSchemaUtils {
41 private static Logger log = Logger.getLogger(SdcSchemaUtils.class.getName());
42 private Cluster cluster;
43 private boolean isConnected;
47 public SdcSchemaUtils() {
51 cluster = createCluster();
53 } catch (Exception e) {
54 log.info("** CassandraClient isn't connected. error is {}", e);
57 log.info("** cluster created");
61 * the method creates the cluster object using the supplied cassandra nodes
62 * in the configuration
64 * @return cluster object our null in case of an invalid configuration
68 public Cluster createCluster() {
69 final Configuration.CassandrConfig config = getCassandraConfig();
70 List<String> nodes = config.getCassandraHosts();
71 Integer cassandraPort = config.getCassandraPort();
72 if (nodes == null || cassandraPort == null) {
73 log.info("no nodes or port were supplied in configuration.");
76 log.info("connecting to node:{} port{}.", nodes, cassandraPort);
77 Cluster.Builder clusterBuilder = Cluster.builder();
78 nodes.forEach(node -> clusterBuilder.addContactPoint(node).withPort(cassandraPort));
80 clusterBuilder.withMaxSchemaAgreementWaitSeconds(60);
82 setSocketOptions(clusterBuilder, config);
83 if(!enableAuthentication(clusterBuilder, config)){
87 if(!enableSsl(clusterBuilder, config)){
90 setLocalDc(clusterBuilder, config);
92 return clusterBuilder.build();
99 public Session connect() {
100 Session session = null;
101 if (cluster != null) {
103 session = cluster.connect();
105 } catch (Throwable e) {
106 log.debug("Failed to connect cluster, error :", e);
113 public Metadata getMetadata(){
114 if (cluster != null){
115 return cluster.getMetadata();
120 private void setLocalDc(Cluster.Builder clusterBuilder, Configuration.CassandrConfig config) {
121 String localDataCenter = config.getLocalDataCenter();
122 if (localDataCenter != null) {
123 log.info("localDatacenter was provided, setting Cassndra clint to use datacenter: {} as local.",
125 LoadBalancingPolicy tokenAwarePolicy = new TokenAwarePolicy(
126 DCAwareRoundRobinPolicy.builder().withLocalDc(localDataCenter).build());
127 clusterBuilder.withLoadBalancingPolicy(tokenAwarePolicy);
130 "localDatacenter was provided, the driver will use the datacenter of the first contact point that was reached at initialization");
134 private boolean enableSsl(Cluster.Builder clusterBuilder, Configuration.CassandrConfig config) {
135 boolean ssl = config.isSsl();
137 String truststorePath = config.getTruststorePath();
138 String truststorePassword = config.getTruststorePassword();
139 if (truststorePath == null || truststorePassword == null) {
140 log.error("ssl is enabled but truststorePath or truststorePassword were not supplied.");
143 System.setProperty("javax.net.ssl.trustStore", truststorePath);
144 System.setProperty("javax.net.ssl.trustStorePassword", truststorePassword);
145 clusterBuilder.withSSL();
153 private void setSocketOptions(Cluster.Builder clusterBuilder, Configuration.CassandrConfig config) {
154 SocketOptions socketOptions =new SocketOptions();
155 Integer socketConnectTimeout = config.getSocketConnectTimeout();
156 if( socketConnectTimeout!=null ){
157 log.info("SocketConnectTimeout was provided, setting Cassandra client to use SocketConnectTimeout: {} .",socketConnectTimeout);
158 socketOptions.setConnectTimeoutMillis(socketConnectTimeout);
160 clusterBuilder.withSocketOptions(socketOptions);
163 private boolean enableAuthentication(Cluster.Builder clusterBuilder, Configuration.CassandrConfig config) {
164 boolean authenticate = config.isAuthenticate();
167 String username = config.getUsername();
168 String password = config.getPassword();
169 if (username == null || password == null) {
170 log.error("authentication is enabled but username or password were not supplied.");
173 clusterBuilder.withCredentials(username, password);
180 public boolean executeStatement(String statement) {
181 return executeStatement(this::createCluster, statement);
184 public boolean executeStatements(String ... statements) {
185 return executeStatements(this::createCluster, statements);
188 boolean executeStatement(Supplier<Cluster> clusterSupplier, String statement) {
189 return executeStatements(clusterSupplier, statement);
192 boolean executeStatements(Supplier<Cluster> clusterSupplier, String ... statements) {
193 try(Cluster cluster = clusterSupplier.get();
194 Session session = cluster.connect()) {
195 for (String statement : statements) {
196 session.execute(statement);
199 } catch (RuntimeException e) {
200 log.error("could not execute statements", e);
205 Configuration.CassandrConfig getCassandraConfig() {
206 return ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig();
210 public void closeCluster() {
214 log.info("** CassandraClient cluster closed");