2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.ci.tests.utils.cassandra;
23 import com.datastax.driver.core.*;
24 import com.datastax.driver.core.policies.*;
25 import com.datastax.driver.core.querybuilder.QueryBuilder;
26 import com.datastax.driver.core.querybuilder.Select;
27 import com.datastax.driver.core.querybuilder.Select.Where;
28 import org.javatuples.Pair;
29 import org.openecomp.sdc.be.resources.data.auditing.AuditingTypesConstants;
30 import org.openecomp.sdc.ci.tests.utils.Utils;
31 import org.openecomp.sdc.common.datastructure.AuditingFieldsKey;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
35 import java.io.FileNotFoundException;
36 import java.util.ArrayList;
37 import java.util.Collection;
38 import java.util.List;
40 public final class CassandraUtils {
41 private static Logger logger = LoggerFactory.getLogger(CassandraUtils.class.getName());
43 protected static Cluster cluster = null;
44 protected static Session session;
46 public static void initConnection(String keyspace) throws FileNotFoundException {
47 List<String> cassandraHosts = new ArrayList<>();
49 cassandraHosts.add(Utils.getConfig().getCassandraHost());
50 long reconnectTimeout = 30000;
52 logger.debug("creating cluster to hosts:{} with reconnect timeout:{}", cassandraHosts, reconnectTimeout);
53 Cluster.Builder clusterBuilder = Cluster.builder()
54 .withReconnectionPolicy(new ConstantReconnectionPolicy(reconnectTimeout))
55 .withRetryPolicy(DefaultRetryPolicy.INSTANCE);
57 cassandraHosts.forEach(host -> clusterBuilder.addContactPoint(host));
58 enableAuthentication(clusterBuilder);
59 enableSsl(clusterBuilder);
60 setLocalDc(clusterBuilder);
62 cluster = clusterBuilder.build();
63 session = cluster.connect(keyspace);
64 } catch (Exception e) {
65 logger.info("** CassandraClient isn't connected to {}", cassandraHosts);
69 private static void enableAuthentication(Cluster.Builder clusterBuilder) throws FileNotFoundException {
70 boolean authenticate = Utils.getConfig().getCassandraAuthenticate();
72 String username = Utils.getConfig().getCassandraUsername();
73 String password = Utils.getConfig().getCassandraPassword();
74 if (username == null || password == null) {
75 logger.error("authentication is enabled but username or password were not supplied.");
77 clusterBuilder.withCredentials(username, password);
83 private static void enableSsl(Cluster.Builder clusterBuilder) throws FileNotFoundException {
84 boolean ssl = Utils.getConfig().getCassandraSsl();
86 String truststorePath = Utils.getConfig().getCassandraTruststorePath();
87 String truststorePassword = Utils.getConfig().getCassandraTruststorePassword();
88 if (truststorePath == null || truststorePassword == null) {
89 logger.error("ssl is enabled but truststorePath or truststorePassword were not supplied.");
91 System.setProperty("javax.net.ssl.trustStore", truststorePath);
92 System.setProperty("javax.net.ssl.trustStorePassword", truststorePassword);
93 clusterBuilder.withSSL();
99 private static void setLocalDc(Cluster.Builder clusterBuilder) throws FileNotFoundException {
100 String localDataCenter = Utils.getConfig().getLocalDataCenter();
101 if (localDataCenter != null) {
102 logger.info("localDatacenter was provided, setting Cassndra clint to use datacenter: {} as local.", localDataCenter);
103 LoadBalancingPolicy tokenAwarePolicy = new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().withLocalDc(localDataCenter).build());
104 clusterBuilder.withLoadBalancingPolicy(tokenAwarePolicy);
106 logger.info("localDatacenter was provided, the driver will use the datacenter of the first contact point that was reached at initialization");
110 public static void truncateTable(String keyspace, String tableName) throws FileNotFoundException {
112 if (session == null || session.isClosed()) {
113 initConnection(keyspace);
116 try (Cluster cluster = CassandraUtils.cluster){
118 if (session != null) {
119 session.execute(QueryBuilder.truncate(keyspace, tableName));
120 logger.debug("The table {}.{} was cleaned", keyspace, tableName);
122 throw new RuntimeException("Keyspace " + keyspace + " not connected");
127 public static void close() {
128 if (cluster != null) {
133 public static void truncateAllKeyspaces() throws FileNotFoundException {
134 // truncateAllTables(AuditingTypesConstants.ARTIFACT_KEYSPACE);
135 truncateAllTables(AuditingTypesConstants.AUDIT_KEYSPACE);
138 public static void truncateAllTables(String keyspace) throws FileNotFoundException {
140 if (session == null || session.isClosed()) {
141 initConnection(keyspace);
145 if (session != null) {
146 Metadata metadata = cluster.getMetadata();
147 KeyspaceMetadata keyspaceMetadata = metadata.getKeyspace(keyspace);
148 if (keyspaceMetadata != null) {
149 Collection<TableMetadata> tables = keyspaceMetadata.getTables();
150 tables.forEach(table -> {
151 session.execute(QueryBuilder.truncate(table));
152 logger.debug("Table trunceted - {}", table.getName());
156 throw new RuntimeException("Keyspace " + keyspace + " not connected");
160 if (cluster != null) {
166 public static List<Row> fetchFromTable(String keyspace, String tableName, List<Pair<AuditingFieldsKey, String>> fields) throws FileNotFoundException {
168 List<Pair<String, String>> fieldsConverted = new ArrayList<>();
170 // fields.forEach(pair -> {
171 // Pair<String, String> newPair = new Pair(pair.getValue0().getDisplayName(), pair.getValue1());
172 // fieldsConverted.add(newPair);
175 fields.forEach(pair -> {
176 Pair<String, String> newPair;
177 if (pair.getValue0() == AuditingFieldsKey.AUDIT_DISTRIBUTION_RESOURCE_URL) {
178 newPair = new Pair<String, String>("RESOURE_URL", pair.getValue1());
181 newPair = new Pair<String, String>(pair.getValue0().getDisplayName(), pair.getValue1());
183 fieldsConverted.add(newPair);
187 return fetchFromTableQuery(keyspace, tableName, fieldsConverted);
190 public static List<Row> fetchFromTableQuery(String keyspace, String tableName, List<Pair<String, String>> fields)
191 throws FileNotFoundException {
193 if (session == null || session.isClosed()) {
194 initConnection(keyspace);
198 if (session != null) {
199 Select select = QueryBuilder.select().all().from(keyspace, tableName);
200 if (fields != null) {
201 // Set<Entry<AuditingFieldsKey, String>> entrySet =
202 // fields.entrySet();
204 boolean multiple = (fields.size() > 1) ? true : false;
208 for (Pair<String, String> pair : fields) {
211 where = select.where(QueryBuilder.eq(pair.getValue0(), pair.getValue1()));
213 where.and(QueryBuilder.eq(pair.getValue0(), pair.getValue1()));
217 select.allowFiltering();
222 List<Row> rows = session.execute(select).all();
223 for (Row row : rows) {
224 logger.debug("{}", row);
229 // if (cluster != null) {