1 package com.thinkaurelius.titan.diskstorage.cassandra.thrift;
3 import static com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction.getTx;
5 import java.nio.ByteBuffer;
6 import java.util.ArrayList;
7 import java.util.HashMap;
8 import java.util.LinkedList;
11 import java.util.concurrent.TimeUnit;
13 import com.thinkaurelius.titan.diskstorage.EntryMetaData;
14 import com.thinkaurelius.titan.diskstorage.*;
15 import com.thinkaurelius.titan.diskstorage.cassandra.utils.CassandraHelper;
16 import com.thinkaurelius.titan.diskstorage.configuration.ConfigElement;
17 import com.thinkaurelius.titan.diskstorage.configuration.ConfigNamespace;
18 import com.thinkaurelius.titan.diskstorage.configuration.ConfigOption;
19 import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
20 import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyRange;
21 import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions;
22 import com.thinkaurelius.titan.util.system.NetworkUtil;
24 import org.apache.cassandra.dht.AbstractByteOrderedPartitioner;
25 import org.apache.cassandra.dht.ByteOrderedPartitioner;
26 import org.apache.cassandra.dht.IPartitioner;
27 import org.apache.cassandra.dht.Token;
28 import org.apache.cassandra.thrift.*;
29 import org.apache.cassandra.utils.FBUtilities;
30 import org.apache.commons.pool.impl.GenericKeyedObjectPool;
31 import org.apache.thrift.TException;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
35 import com.google.common.base.Preconditions;
36 import com.google.common.collect.ImmutableMap;
37 import com.thinkaurelius.titan.diskstorage.BackendException;
38 import com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager;
39 import com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool.CTConnection;
40 import com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool.CTConnectionFactory;
41 import com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool.CTConnectionPool;
42 import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KCVMutation;
43 import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction;
44 import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
46 import static com.thinkaurelius.titan.diskstorage.configuration.ConfigOption.disallowEmpty;
49 * This class creates {@see CassandraThriftKeyColumnValueStore}s and
50 * handles Cassandra-backed allocation of vertex IDs for Titan (when so
53 * @author Dan LaRocque <dalaro@hopcount.org>
55 @PreInitializeConfigOptions
56 public class CassandraThriftStoreManager extends AbstractCassandraStoreManager {
58 public enum PoolExhaustedAction {
59 BLOCK(GenericKeyedObjectPool.WHEN_EXHAUSTED_BLOCK),
60 FAIL(GenericKeyedObjectPool.WHEN_EXHAUSTED_FAIL),
61 GROW(GenericKeyedObjectPool.WHEN_EXHAUSTED_GROW);
65 PoolExhaustedAction(byte b) {
69 public byte getByte() {
74 private static final Logger log = LoggerFactory.getLogger(CassandraThriftStoreManager.class);
76 public static final ConfigNamespace THRIFT_NS =
77 new ConfigNamespace(AbstractCassandraStoreManager.CASSANDRA_NS, "thrift",
78 "Options for Titan's own Thrift Cassandra backend");
80 public static final ConfigNamespace CPOOL_NS =
81 new ConfigNamespace(THRIFT_NS, "cpool", "Options for the Apache commons-pool connection manager");
83 public static final ConfigOption<String> CPOOL_WHEN_EXHAUSTED =
84 new ConfigOption<>(CPOOL_NS, "when-exhausted",
85 "What to do when clients concurrently request more active connections than are allowed " +
86 "by the pool. The value must be one of BLOCK, FAIL, or GROW.",
87 ConfigOption.Type.MASKABLE, String.class, PoolExhaustedAction.BLOCK.toString(),
88 disallowEmpty(String.class));
90 public static final ConfigOption<Integer> CPOOL_MAX_TOTAL =
91 new ConfigOption<Integer>(CPOOL_NS, "max-total",
92 "Max number of allowed Thrift connections, idle or active (-1 to leave undefined)",
93 ConfigOption.Type.MASKABLE, -1);
95 public static final ConfigOption<Integer> CPOOL_MAX_ACTIVE =
96 new ConfigOption<Integer>(CPOOL_NS, "max-active",
97 "Maximum number of concurrently in-use connections (-1 to leave undefined)",
98 ConfigOption.Type.MASKABLE, 16);
100 public static final ConfigOption<Integer> CPOOL_MAX_IDLE =
101 new ConfigOption<Integer>(CPOOL_NS, "max-idle",
102 "Maximum number of concurrently idle connections (-1 to leave undefined)",
103 ConfigOption.Type.MASKABLE, 4);
105 public static final ConfigOption<Integer> CPOOL_MIN_IDLE =
106 new ConfigOption<Integer>(CPOOL_NS, "min-idle",
107 "Minimum number of idle connections the pool attempts to maintain",
108 ConfigOption.Type.MASKABLE, 0);
110 // Wart: allowing -1 like commons-pool's convention precludes using StandardDuration
111 public static final ConfigOption<Long> CPOOL_MAX_WAIT =
112 new ConfigOption<Long>(CPOOL_NS, "max-wait",
113 "Maximum number of milliseconds to block when " + ConfigElement.getPath(CPOOL_WHEN_EXHAUSTED) +
114 " is set to BLOCK. Has no effect when set to actions besides BLOCK. Set to -1 to wait indefinitely.",
115 ConfigOption.Type.MASKABLE, -1L);
117 // Wart: allowing -1 like commons-pool's convention precludes using StandardDuration
118 public static final ConfigOption<Long> CPOOL_EVICTOR_PERIOD =
119 new ConfigOption<Long>(CPOOL_NS, "evictor-period",
120 "Approximate number of milliseconds between runs of the idle connection evictor. " +
121 "Set to -1 to never run the idle connection evictor.",
122 ConfigOption.Type.MASKABLE, 30L * 1000L);
124 // Wart: allowing -1 like commons-pool's convention precludes using StandardDuration
125 public static final ConfigOption<Long> CPOOL_MIN_EVICTABLE_IDLE_TIME =
126 new ConfigOption<Long>(CPOOL_NS, "min-evictable-idle-time",
127 "Minimum number of milliseconds a connection must be idle before it is eligible for " +
128 "eviction. See also " + ConfigElement.getPath(CPOOL_EVICTOR_PERIOD) + ". Set to -1 to never evict " +
129 "idle connections.", ConfigOption.Type.MASKABLE, 60L * 1000L);
131 public static final ConfigOption<Boolean> CPOOL_IDLE_TESTS =
132 new ConfigOption<Boolean>(CPOOL_NS, "idle-test",
133 "Whether the idle connection evictor validates idle connections and drops those that fail to validate",
134 ConfigOption.Type.MASKABLE, false);
136 public static final ConfigOption<Integer> CPOOL_IDLE_TESTS_PER_EVICTION_RUN =
137 new ConfigOption<Integer>(CPOOL_NS, "idle-tests-per-eviction-run",
138 "When the value is negative, e.g. -n, roughly one nth of the idle connections are tested per run. " +
139 "When the value is positive, e.g. n, the min(idle-count, n) connections are tested per run.",
140 ConfigOption.Type.MASKABLE, 0);
143 private final Map<String, CassandraThriftKeyColumnValueStore> openStores;
144 private final CTConnectionPool pool;
145 private final Deployment deployment;
147 public CassandraThriftStoreManager(Configuration config) throws BackendException {
151 * This is eventually passed to Thrift's TSocket constructor. The
152 * constructor parameter is of type int.
154 int thriftTimeoutMS = (int)config.get(GraphDatabaseConfiguration.CONNECTION_TIMEOUT).toMillis();
156 CTConnectionFactory.Config factoryConfig = new CTConnectionFactory.Config(hostnames, port, username, password)
157 .setTimeoutMS(thriftTimeoutMS)
158 .setFrameSize(thriftFrameSizeBytes);
160 if (config.get(SSL_ENABLED)) {
161 factoryConfig.setSSLTruststoreLocation(config.get(SSL_TRUSTSTORE_LOCATION));
162 factoryConfig.setSSLTruststorePassword(config.get(SSL_TRUSTSTORE_PASSWORD));
165 final PoolExhaustedAction poolExhaustedAction = ConfigOption.getEnumValue(
166 config.get(CPOOL_WHEN_EXHAUSTED), PoolExhaustedAction.class);
168 CTConnectionPool p = new CTConnectionPool(factoryConfig.build());
169 p.setTestOnBorrow(true);
170 p.setTestOnReturn(true);
171 p.setTestWhileIdle(config.get(CPOOL_IDLE_TESTS));
172 p.setNumTestsPerEvictionRun(config.get(CPOOL_IDLE_TESTS_PER_EVICTION_RUN));
173 p.setWhenExhaustedAction(poolExhaustedAction.getByte());
174 p.setMaxActive(config.get(CPOOL_MAX_ACTIVE));
175 p.setMaxTotal(config.get(CPOOL_MAX_TOTAL)); // maxTotal limits active + idle
176 p.setMaxIdle(config.get(CPOOL_MAX_IDLE));
177 p.setMinIdle(config.get(CPOOL_MIN_IDLE));
178 p.setMaxWait(config.get(CPOOL_MAX_WAIT));
179 p.setTimeBetweenEvictionRunsMillis(config.get(CPOOL_EVICTOR_PERIOD));
180 p.setMinEvictableIdleTimeMillis(config.get(CPOOL_MIN_EVICTABLE_IDLE_TIME));
183 this.openStores = new HashMap<String, CassandraThriftKeyColumnValueStore>();
185 // Only watch the ring and change endpoints with BOP
186 if (getCassandraPartitioner() instanceof ByteOrderedPartitioner) {
187 deployment = (hostnames.length == 1)// mark deployment as local only in case we have byte ordered partitioner and local connection
188 ? (NetworkUtil.isLocalConnection(hostnames[0])) ? Deployment.LOCAL : Deployment.REMOTE
191 deployment = Deployment.REMOTE;
196 public Deployment getDeployment() {
201 @SuppressWarnings("unchecked")
202 public IPartitioner getCassandraPartitioner() throws BackendException {
203 CTConnection conn = null;
205 conn = pool.borrowObject(SYSTEM_KS);
206 return FBUtilities.newPartitioner(conn.getClient().describe_partitioner());
207 } catch (Exception e) {
208 throw new TemporaryBackendException(e);
210 pool.returnObjectUnsafe(SYSTEM_KS, conn);
215 public String toString() {
216 return "thriftCassandra" + super.toString();
220 public void close() throws BackendException {
226 public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException {
227 Preconditions.checkNotNull(mutations);
229 final MaskedTimestamp commitTime = new MaskedTimestamp(txh);
231 ConsistencyLevel consistency = getTx(txh).getWriteConsistencyLevel().getThrift();
233 // Generate Thrift-compatible batch_mutate() datastructure
234 // key -> cf -> cassmutation
236 for (Map<StaticBuffer, KCVMutation> mutation : mutations.values()) size += mutation.size();
237 Map<ByteBuffer, Map<String, List<org.apache.cassandra.thrift.Mutation>>> batch =
238 new HashMap<ByteBuffer, Map<String, List<org.apache.cassandra.thrift.Mutation>>>(size);
241 for (Map.Entry<String, Map<StaticBuffer, KCVMutation>> keyMutation : mutations.entrySet()) {
242 String columnFamily = keyMutation.getKey();
243 for (Map.Entry<StaticBuffer, KCVMutation> mutEntry : keyMutation.getValue().entrySet()) {
244 ByteBuffer keyBB = mutEntry.getKey().asByteBuffer();
246 // Get or create the single Cassandra Mutation object responsible for this key
247 Map<String, List<org.apache.cassandra.thrift.Mutation>> cfmutation = batch.get(keyBB);
248 if (cfmutation == null) {
249 cfmutation = new HashMap<String, List<org.apache.cassandra.thrift.Mutation>>(3); // Most mutations only modify the edgeStore and indexStore
250 batch.put(keyBB, cfmutation);
253 KCVMutation mutation = mutEntry.getValue();
254 List<org.apache.cassandra.thrift.Mutation> thriftMutation =
255 new ArrayList<org.apache.cassandra.thrift.Mutation>(mutations.size());
257 if (mutation.hasDeletions()) {
258 for (StaticBuffer buf : mutation.getDeletions()) {
259 Deletion d = new Deletion();
260 SlicePredicate sp = new SlicePredicate();
261 sp.addToColumn_names(buf.as(StaticBuffer.BB_FACTORY));
263 d.setTimestamp(commitTime.getDeletionTime(times));
264 org.apache.cassandra.thrift.Mutation m = new org.apache.cassandra.thrift.Mutation();
266 thriftMutation.add(m);
270 if (mutation.hasAdditions()) {
271 for (Entry ent : mutation.getAdditions()) {
272 ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
273 Column column = new Column(ent.getColumnAs(StaticBuffer.BB_FACTORY));
274 column.setValue(ent.getValueAs(StaticBuffer.BB_FACTORY));
276 column.setTimestamp(commitTime.getAdditionTime(times));
278 Integer ttl = (Integer) ent.getMetaData().get(EntryMetaData.TTL);
279 if (null != ttl && ttl > 0) {
283 cosc.setColumn(column);
284 org.apache.cassandra.thrift.Mutation m = new org.apache.cassandra.thrift.Mutation();
285 m.setColumn_or_supercolumn(cosc);
286 thriftMutation.add(m);
290 cfmutation.put(columnFamily, thriftMutation);
294 CTConnection conn = null;
296 conn = pool.borrowObject(keySpaceName);
297 Cassandra.Client client = conn.getClient();
299 client.atomic_batch_mutate(batch, consistency);
301 client.batch_mutate(batch, consistency);
303 } catch (Exception ex) {
304 throw CassandraThriftKeyColumnValueStore.convertException(ex);
306 pool.returnObjectUnsafe(keySpaceName, conn);
309 sleepAfterWrite(txh, commitTime);
312 @Override // TODO: *BIG FAT WARNING* 'synchronized is always *bad*, change openStores to use ConcurrentLinkedHashMap
313 public synchronized CassandraThriftKeyColumnValueStore openDatabase(final String name, StoreMetaData.Container metaData) throws BackendException {
314 if (openStores.containsKey(name))
315 return openStores.get(name);
317 ensureColumnFamilyExists(keySpaceName, name);
319 CassandraThriftKeyColumnValueStore store = new CassandraThriftKeyColumnValueStore(keySpaceName, name, this, pool);
320 openStores.put(name, store);
325 public List<KeyRange> getLocalKeyPartition() throws BackendException {
326 CTConnection conn = null;
327 IPartitioner partitioner = getCassandraPartitioner();
329 if (!(partitioner instanceof AbstractByteOrderedPartitioner))
330 throw new UnsupportedOperationException("getLocalKeyPartition() only supported by byte ordered partitioner.");
332 Token.TokenFactory tokenFactory = partitioner.getTokenFactory();
335 // Resist the temptation to describe SYSTEM_KS. It has no ring.
336 // Instead, we'll create our own keyspace (or check that it exists), then describe it.
337 ensureKeyspaceExists(keySpaceName);
339 conn = pool.borrowObject(keySpaceName);
340 List<TokenRange> ranges = conn.getClient().describe_ring(keySpaceName);
341 List<KeyRange> keyRanges = new ArrayList<KeyRange>(ranges.size());
343 for (TokenRange range : ranges) {
344 if (!NetworkUtil.hasLocalAddress(range.endpoints))
347 keyRanges.add(CassandraHelper.transformRange(tokenFactory.fromString(range.start_token), tokenFactory.fromString(range.end_token)));
351 } catch (Exception e) {
352 throw CassandraThriftKeyColumnValueStore.convertException(e);
354 pool.returnObjectUnsafe(keySpaceName, conn);
359 * Connect to Cassandra via Thrift on the specified host and port and attempt to truncate the named keyspace.
361 * This is a utility method intended mainly for testing. It is
362 * equivalent to issuing 'truncate <cf>' for each of the column families in keyspace using
363 * the cassandra-cli tool.
365 * Using truncate is better for a number of reasons, most significantly because it doesn't
366 * involve any schema modifications which can take time to propagate across the cluster such
367 * leaves nodes in the inconsistent state and could result in read/write failures.
368 * Any schema modifications are discouraged until there is no traffic to Keyspace or ColumnFamilies.
370 * @throws com.thinkaurelius.titan.diskstorage.BackendException if any checked Thrift or UnknownHostException is thrown in the body of this method
372 public void clearStorage() throws BackendException {
374 final String lp = "ClearStorage: "; // "log prefix"
376 * log4j is capable of automatically writing the name of a method that
377 * generated a log message, but the docs warn that "generating caller
378 * location information is extremely slow and should be avoided unless
379 * execution speed is not an issue."
382 CTConnection conn = null;
384 conn = pool.borrowObject(SYSTEM_KS);
385 Cassandra.Client client = conn.getClient();
389 client.set_keyspace(keySpaceName);
390 ksDef = client.describe_keyspace(keySpaceName);
391 } catch (NotFoundException e) {
392 log.debug(lp + "Keyspace {} does not exist, not attempting to truncate.", keySpaceName);
394 } catch (InvalidRequestException e) {
395 log.debug(lp + "InvalidRequestException when attempting to describe keyspace {}, not attempting to truncate.", keySpaceName);
401 log.debug(lp + "Received null KsDef for keyspace {}; not truncating its CFs", keySpaceName);
405 List<CfDef> cfDefs = ksDef.getCf_defs();
407 if (null == cfDefs) {
408 log.debug(lp + "Received empty CfDef list for keyspace {}; not truncating CFs", keySpaceName);
412 for (CfDef cfDef : ksDef.getCf_defs()) {
413 client.truncate(cfDef.name);
414 log.info(lp + "Truncated CF {} in keyspace {}", cfDef.name, keySpaceName);
418 * Clearing the CTConnectionPool is unnecessary. This method
419 * removes no keyspaces. All open Cassandra connections will
422 } catch (Exception e) {
423 throw new TemporaryBackendException(e);
425 if (conn != null && conn.getClient() != null) {
427 conn.getClient().set_keyspace(SYSTEM_KS);
428 } catch (InvalidRequestException e) {
429 log.warn("Failed to reset keyspace", e);
430 } catch (TException e) {
431 log.warn("Failed to reset keyspace", e);
434 pool.returnObjectUnsafe(SYSTEM_KS, conn);
438 private KsDef ensureKeyspaceExists(String keyspaceName) throws TException, BackendException {
439 CTConnection connection = null;
442 connection = pool.borrowObject(SYSTEM_KS);
443 Cassandra.Client client = connection.getClient();
446 // Side effect: throws Exception if keyspaceName doesn't exist
447 client.set_keyspace(keyspaceName); // Don't remove
448 client.set_keyspace(SYSTEM_KS);
449 log.debug("Found existing keyspace {}", keyspaceName);
450 } catch (InvalidRequestException e) {
451 // Keyspace didn't exist; create it
452 log.debug("Creating keyspace {}...", keyspaceName);
454 KsDef ksdef = new KsDef().setName(keyspaceName)
455 .setCf_defs(new LinkedList<CfDef>()) // cannot be null but can be empty
456 .setStrategy_class(storageConfig.get(REPLICATION_STRATEGY))
457 .setStrategy_options(strategyOptions);
459 client.set_keyspace(SYSTEM_KS);
461 client.system_add_keyspace(ksdef);
462 retrySetKeyspace(keyspaceName, client);
463 log.debug("Created keyspace {}", keyspaceName);
464 } catch (InvalidRequestException ire) {
465 log.error("system_add_keyspace failed for keyspace=" + keyspaceName, ire);
471 return client.describe_keyspace(keyspaceName);
472 } catch (Exception e) {
473 throw new TemporaryBackendException(e);
475 pool.returnObjectUnsafe(SYSTEM_KS, connection);
479 private void retrySetKeyspace(String ksName, Cassandra.Client client) throws BackendException {
480 final long end = System.currentTimeMillis() + (60L * 1000L);
482 while (System.currentTimeMillis() <= end) {
484 client.set_keyspace(ksName);
486 } catch (Exception e) {
487 log.warn("Exception when changing to keyspace {} after creating it", ksName, e);
490 } catch (InterruptedException ie) {
491 throw new PermanentBackendException("Unexpected interrupt (shutting down?)", ie);
496 throw new PermanentBackendException("Could change to keyspace " + ksName + " after creating it");
499 private void ensureColumnFamilyExists(String ksName, String cfName) throws BackendException {
500 ensureColumnFamilyExists(ksName, cfName, "org.apache.cassandra.db.marshal.BytesType");
503 private void ensureColumnFamilyExists(String ksName, String cfName, String comparator) throws BackendException {
504 CTConnection conn = null;
506 KsDef keyspaceDef = ensureKeyspaceExists(ksName);
508 conn = pool.borrowObject(ksName);
509 Cassandra.Client client = conn.getClient();
511 log.debug("Looking up metadata on keyspace {}...", ksName);
513 boolean foundColumnFamily = false;
514 for (CfDef cfDef : keyspaceDef.getCf_defs()) {
515 String curCfName = cfDef.getName();
516 if (curCfName.equals(cfName))
517 foundColumnFamily = true;
520 if (!foundColumnFamily) {
521 createColumnFamily(client, ksName, cfName, comparator);
523 log.debug("Keyspace {} and ColumnFamily {} were found.", ksName, cfName);
525 } catch (SchemaDisagreementException e) {
526 throw new TemporaryBackendException(e);
527 } catch (Exception e) {
528 throw new PermanentBackendException(e);
530 pool.returnObjectUnsafe(ksName, conn);
534 private void createColumnFamily(Cassandra.Client client,
537 String comparator) throws BackendException {
539 CfDef createColumnFamily = new CfDef();
540 createColumnFamily.setName(cfName);
541 createColumnFamily.setKeyspace(ksName);
542 createColumnFamily.setComparator_type(comparator);
544 ImmutableMap.Builder<String, String> compressionOptions = new ImmutableMap.Builder<String, String>();
546 if (compressionEnabled) {
547 compressionOptions.put("sstable_compression", compressionClass)
548 .put("chunk_length_kb", Integer.toString(compressionChunkSizeKB));
551 createColumnFamily.setCompression_options(compressionOptions.build());
553 // Hard-coded caching settings
554 if (cfName.startsWith(Backend.EDGESTORE_NAME)) {
555 createColumnFamily.setCaching("keys_only");
556 } else if (cfName.startsWith(Backend.INDEXSTORE_NAME)) {
557 createColumnFamily.setCaching("rows_only");
560 log.debug("Adding column family {} to keyspace {}...", cfName, ksName);
562 client.system_add_column_family(createColumnFamily);
563 } catch (SchemaDisagreementException e) {
564 throw new TemporaryBackendException("Error in setting up column family", e);
565 } catch (Exception e) {
566 throw new PermanentBackendException(e);
569 log.debug("Added column family {} to keyspace {}.", cfName, ksName);
573 public Map<String, String> getCompressionOptions(String cf) throws BackendException {
574 CTConnection conn = null;
575 Map<String, String> result = null;
578 conn = pool.borrowObject(keySpaceName);
579 Cassandra.Client client = conn.getClient();
581 KsDef ksDef = client.describe_keyspace(keySpaceName);
583 for (CfDef cfDef : ksDef.getCf_defs()) {
584 if (null != cfDef && cfDef.getName().equals(cf)) {
585 result = cfDef.getCompression_options();
591 } catch (InvalidRequestException e) {
592 log.debug("Keyspace {} does not exist", keySpaceName);
594 } catch (Exception e) {
595 throw new TemporaryBackendException(e);
597 pool.returnObjectUnsafe(keySpaceName, conn);
601 private void closePool() {
603 * pool.close() does not affect borrowed connections.
605 * Connections currently borrowed by some thread which are
606 * talking to the old host will eventually be destroyed by
607 * CTConnectionFactory#validateObject() returning false when
608 * those connections are returned to the pool.
612 log.info("Closed Thrift connection pooler.");
613 } catch (Exception e) {
614 log.warn("Failed to close connection pooler. "
615 + "We might be leaking Cassandra connections.", e);
616 // There's still hope: CTConnectionFactory#validateObject()
617 // will be called on borrow() and might tear down the
618 // connections that close() failed to tear down