first commit for new repo
[sdc/sdc-titan-cassandra.git] / src / main / java / com / thinkaurelius / titan / diskstorage / cassandra / thrift / CassandraThriftStoreManager.java
1 package com.thinkaurelius.titan.diskstorage.cassandra.thrift;
2
3 import static com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction.getTx;
4
5 import java.nio.ByteBuffer;
6 import java.util.ArrayList;
7 import java.util.HashMap;
8 import java.util.LinkedList;
9 import java.util.List;
10 import java.util.Map;
11 import java.util.concurrent.TimeUnit;
12
13 import com.thinkaurelius.titan.diskstorage.EntryMetaData;
14 import com.thinkaurelius.titan.diskstorage.*;
15 import com.thinkaurelius.titan.diskstorage.cassandra.utils.CassandraHelper;
16 import com.thinkaurelius.titan.diskstorage.configuration.ConfigElement;
17 import com.thinkaurelius.titan.diskstorage.configuration.ConfigNamespace;
18 import com.thinkaurelius.titan.diskstorage.configuration.ConfigOption;
19 import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
20 import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyRange;
21 import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions;
22 import com.thinkaurelius.titan.util.system.NetworkUtil;
23
24 import org.apache.cassandra.dht.AbstractByteOrderedPartitioner;
25 import org.apache.cassandra.dht.ByteOrderedPartitioner;
26 import org.apache.cassandra.dht.IPartitioner;
27 import org.apache.cassandra.dht.Token;
28 import org.apache.cassandra.thrift.*;
29 import org.apache.cassandra.utils.FBUtilities;
30 import org.apache.commons.pool.impl.GenericKeyedObjectPool;
31 import org.apache.thrift.TException;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 import com.google.common.base.Preconditions;
36 import com.google.common.collect.ImmutableMap;
37 import com.thinkaurelius.titan.diskstorage.BackendException;
38 import com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager;
39 import com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool.CTConnection;
40 import com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool.CTConnectionFactory;
41 import com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool.CTConnectionPool;
42 import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KCVMutation;
43 import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction;
44 import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
45
46 import static com.thinkaurelius.titan.diskstorage.configuration.ConfigOption.disallowEmpty;
47
/**
 * This class creates {@link CassandraThriftKeyColumnValueStore}s and
 * handles Cassandra-backed allocation of vertex IDs for Titan (when so
 * configured).
 *
 * @author Dan LaRocque &lt;dalaro@hopcount.org&gt;
 */
55 @PreInitializeConfigOptions
56 public class CassandraThriftStoreManager extends AbstractCassandraStoreManager {
57
58     public enum PoolExhaustedAction {
59         BLOCK(GenericKeyedObjectPool.WHEN_EXHAUSTED_BLOCK),
60         FAIL(GenericKeyedObjectPool.WHEN_EXHAUSTED_FAIL),
61         GROW(GenericKeyedObjectPool.WHEN_EXHAUSTED_GROW);
62
63         private final byte b;
64
65         PoolExhaustedAction(byte b) {
66             this.b = b;
67         }
68
69         public byte getByte() {
70             return b;
71         }
72     }
73
74     private static final Logger log = LoggerFactory.getLogger(CassandraThriftStoreManager.class);
75
76     public static final ConfigNamespace THRIFT_NS =
77             new ConfigNamespace(AbstractCassandraStoreManager.CASSANDRA_NS, "thrift",
78                     "Options for Titan's own Thrift Cassandra backend");
79
80     public static final ConfigNamespace CPOOL_NS =
81             new ConfigNamespace(THRIFT_NS, "cpool", "Options for the Apache commons-pool connection manager");
82
83     public static final ConfigOption<String> CPOOL_WHEN_EXHAUSTED =
84             new ConfigOption<>(CPOOL_NS, "when-exhausted",
85             "What to do when clients concurrently request more active connections than are allowed " +
86             "by the pool.  The value must be one of BLOCK, FAIL, or GROW.",
87             ConfigOption.Type.MASKABLE, String.class, PoolExhaustedAction.BLOCK.toString(),
88             disallowEmpty(String.class));
89
90     public static final ConfigOption<Integer> CPOOL_MAX_TOTAL =
91             new ConfigOption<Integer>(CPOOL_NS, "max-total",
92             "Max number of allowed Thrift connections, idle or active (-1 to leave undefined)",
93             ConfigOption.Type.MASKABLE, -1);
94
95     public static final ConfigOption<Integer> CPOOL_MAX_ACTIVE =
96             new ConfigOption<Integer>(CPOOL_NS, "max-active",
97             "Maximum number of concurrently in-use connections (-1 to leave undefined)",
98             ConfigOption.Type.MASKABLE, 16);
99
100     public static final ConfigOption<Integer> CPOOL_MAX_IDLE =
101             new ConfigOption<Integer>(CPOOL_NS, "max-idle",
102             "Maximum number of concurrently idle connections (-1 to leave undefined)",
103             ConfigOption.Type.MASKABLE, 4);
104
105     public static final ConfigOption<Integer> CPOOL_MIN_IDLE =
106             new ConfigOption<Integer>(CPOOL_NS, "min-idle",
107             "Minimum number of idle connections the pool attempts to maintain",
108             ConfigOption.Type.MASKABLE, 0);
109
110     // Wart: allowing -1 like commons-pool's convention precludes using StandardDuration
111     public static final ConfigOption<Long> CPOOL_MAX_WAIT =
112             new ConfigOption<Long>(CPOOL_NS, "max-wait",
113             "Maximum number of milliseconds to block when " + ConfigElement.getPath(CPOOL_WHEN_EXHAUSTED) +
114             " is set to BLOCK.  Has no effect when set to actions besides BLOCK.  Set to -1 to wait indefinitely.",
115             ConfigOption.Type.MASKABLE, -1L);
116
117     // Wart: allowing -1 like commons-pool's convention precludes using StandardDuration
118     public static final ConfigOption<Long> CPOOL_EVICTOR_PERIOD =
119             new ConfigOption<Long>(CPOOL_NS, "evictor-period",
120             "Approximate number of milliseconds between runs of the idle connection evictor.  " +
121             "Set to -1 to never run the idle connection evictor.",
122             ConfigOption.Type.MASKABLE, 30L * 1000L);
123
124     // Wart: allowing -1 like commons-pool's convention precludes using StandardDuration
125     public static final ConfigOption<Long> CPOOL_MIN_EVICTABLE_IDLE_TIME =
126             new ConfigOption<Long>(CPOOL_NS, "min-evictable-idle-time",
127             "Minimum number of milliseconds a connection must be idle before it is eligible for " +
128             "eviction.  See also " + ConfigElement.getPath(CPOOL_EVICTOR_PERIOD) + ".  Set to -1 to never evict " +
129             "idle connections.", ConfigOption.Type.MASKABLE, 60L * 1000L);
130
131     public static final ConfigOption<Boolean> CPOOL_IDLE_TESTS =
132             new ConfigOption<Boolean>(CPOOL_NS, "idle-test",
133             "Whether the idle connection evictor validates idle connections and drops those that fail to validate",
134             ConfigOption.Type.MASKABLE, false);
135
136     public static final ConfigOption<Integer> CPOOL_IDLE_TESTS_PER_EVICTION_RUN =
137             new ConfigOption<Integer>(CPOOL_NS, "idle-tests-per-eviction-run",
138             "When the value is negative, e.g. -n, roughly one nth of the idle connections are tested per run.  " +
139             "When the value is positive, e.g. n, the min(idle-count, n) connections are tested per run.",
140             ConfigOption.Type.MASKABLE, 0);
141
142
143     private final Map<String, CassandraThriftKeyColumnValueStore> openStores;
144     private final CTConnectionPool pool;
145     private final Deployment deployment;
146
147     public CassandraThriftStoreManager(Configuration config) throws BackendException {
148         super(config);
149
150         /*
151          * This is eventually passed to Thrift's TSocket constructor. The
152          * constructor parameter is of type int.
153          */
154         int thriftTimeoutMS = (int)config.get(GraphDatabaseConfiguration.CONNECTION_TIMEOUT).toMillis();
155
156         CTConnectionFactory.Config factoryConfig = new CTConnectionFactory.Config(hostnames, port, username, password)
157                                                                             .setTimeoutMS(thriftTimeoutMS)
158                                                                             .setFrameSize(thriftFrameSizeBytes);
159
160         if (config.get(SSL_ENABLED)) {
161             factoryConfig.setSSLTruststoreLocation(config.get(SSL_TRUSTSTORE_LOCATION));
162             factoryConfig.setSSLTruststorePassword(config.get(SSL_TRUSTSTORE_PASSWORD));
163         }
164
165         final PoolExhaustedAction poolExhaustedAction = ConfigOption.getEnumValue(
166                 config.get(CPOOL_WHEN_EXHAUSTED), PoolExhaustedAction.class);
167
168         CTConnectionPool p = new CTConnectionPool(factoryConfig.build());
169         p.setTestOnBorrow(true);
170         p.setTestOnReturn(true);
171         p.setTestWhileIdle(config.get(CPOOL_IDLE_TESTS));
172         p.setNumTestsPerEvictionRun(config.get(CPOOL_IDLE_TESTS_PER_EVICTION_RUN));
173         p.setWhenExhaustedAction(poolExhaustedAction.getByte());
174         p.setMaxActive(config.get(CPOOL_MAX_ACTIVE));
175         p.setMaxTotal(config.get(CPOOL_MAX_TOTAL)); // maxTotal limits active + idle
176         p.setMaxIdle(config.get(CPOOL_MAX_IDLE));
177         p.setMinIdle(config.get(CPOOL_MIN_IDLE));
178         p.setMaxWait(config.get(CPOOL_MAX_WAIT));
179         p.setTimeBetweenEvictionRunsMillis(config.get(CPOOL_EVICTOR_PERIOD));
180         p.setMinEvictableIdleTimeMillis(config.get(CPOOL_MIN_EVICTABLE_IDLE_TIME));
181         this.pool = p;
182
183         this.openStores = new HashMap<String, CassandraThriftKeyColumnValueStore>();
184
185         // Only watch the ring and change endpoints with BOP
186         if (getCassandraPartitioner() instanceof ByteOrderedPartitioner) {
187             deployment = (hostnames.length == 1)// mark deployment as local only in case we have byte ordered partitioner and local connection
188                           ? (NetworkUtil.isLocalConnection(hostnames[0])) ? Deployment.LOCAL : Deployment.REMOTE
189                           : Deployment.REMOTE;
190         } else {
191             deployment = Deployment.REMOTE;
192         }
193     }
194
195     @Override
196     public Deployment getDeployment() {
197         return deployment;
198     }
199
200     @Override
201     @SuppressWarnings("unchecked")
202     public IPartitioner getCassandraPartitioner() throws BackendException {
203         CTConnection conn = null;
204         try {
205             conn = pool.borrowObject(SYSTEM_KS);
206             return FBUtilities.newPartitioner(conn.getClient().describe_partitioner());
207         } catch (Exception e) {
208             throw new TemporaryBackendException(e);
209         } finally {
210             pool.returnObjectUnsafe(SYSTEM_KS, conn);
211         }
212     }
213
214     @Override
215     public String toString() {
216         return "thriftCassandra" + super.toString();
217     }
218
219     @Override
220     public void close() throws BackendException {
221         openStores.clear();
222         closePool();
223     }
224
225     @Override
226     public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException {
227         Preconditions.checkNotNull(mutations);
228
229         final MaskedTimestamp commitTime = new MaskedTimestamp(txh);
230
231         ConsistencyLevel consistency = getTx(txh).getWriteConsistencyLevel().getThrift();
232
233         // Generate Thrift-compatible batch_mutate() datastructure
234         // key -> cf -> cassmutation
235         int size = 0;
236         for (Map<StaticBuffer, KCVMutation> mutation : mutations.values()) size += mutation.size();
237         Map<ByteBuffer, Map<String, List<org.apache.cassandra.thrift.Mutation>>> batch =
238                 new HashMap<ByteBuffer, Map<String, List<org.apache.cassandra.thrift.Mutation>>>(size);
239
240
241         for (Map.Entry<String, Map<StaticBuffer, KCVMutation>> keyMutation : mutations.entrySet()) {
242             String columnFamily = keyMutation.getKey();
243             for (Map.Entry<StaticBuffer, KCVMutation> mutEntry : keyMutation.getValue().entrySet()) {
244                 ByteBuffer keyBB = mutEntry.getKey().asByteBuffer();
245
246                 // Get or create the single Cassandra Mutation object responsible for this key
247                 Map<String, List<org.apache.cassandra.thrift.Mutation>> cfmutation = batch.get(keyBB);
248                 if (cfmutation == null) {
249                     cfmutation = new HashMap<String, List<org.apache.cassandra.thrift.Mutation>>(3); // Most mutations only modify the edgeStore and indexStore
250                     batch.put(keyBB, cfmutation);
251                 }
252
253                 KCVMutation mutation = mutEntry.getValue();
254                 List<org.apache.cassandra.thrift.Mutation> thriftMutation =
255                         new ArrayList<org.apache.cassandra.thrift.Mutation>(mutations.size());
256
257                 if (mutation.hasDeletions()) {
258                     for (StaticBuffer buf : mutation.getDeletions()) {
259                         Deletion d = new Deletion();
260                         SlicePredicate sp = new SlicePredicate();
261                         sp.addToColumn_names(buf.as(StaticBuffer.BB_FACTORY));
262                         d.setPredicate(sp);
263                         d.setTimestamp(commitTime.getDeletionTime(times));
264                         org.apache.cassandra.thrift.Mutation m = new org.apache.cassandra.thrift.Mutation();
265                         m.setDeletion(d);
266                         thriftMutation.add(m);
267                     }
268                 }
269
270                 if (mutation.hasAdditions()) {
271                     for (Entry ent : mutation.getAdditions()) {
272                         ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
273                         Column column = new Column(ent.getColumnAs(StaticBuffer.BB_FACTORY));
274                         column.setValue(ent.getValueAs(StaticBuffer.BB_FACTORY));
275
276                         column.setTimestamp(commitTime.getAdditionTime(times));
277
278                         Integer ttl = (Integer) ent.getMetaData().get(EntryMetaData.TTL);
279                         if (null != ttl && ttl > 0) {
280                             column.setTtl(ttl);
281                         }
282
283                         cosc.setColumn(column);
284                         org.apache.cassandra.thrift.Mutation m = new org.apache.cassandra.thrift.Mutation();
285                         m.setColumn_or_supercolumn(cosc);
286                         thriftMutation.add(m);
287                     }
288                 }
289
290                 cfmutation.put(columnFamily, thriftMutation);
291             }
292         }
293
294         CTConnection conn = null;
295         try {
296             conn = pool.borrowObject(keySpaceName);
297             Cassandra.Client client = conn.getClient();
298             if (atomicBatch) {
299                 client.atomic_batch_mutate(batch, consistency);
300             } else {
301                 client.batch_mutate(batch, consistency);
302             }
303         } catch (Exception ex) {
304             throw CassandraThriftKeyColumnValueStore.convertException(ex);
305         } finally {
306             pool.returnObjectUnsafe(keySpaceName, conn);
307         }
308
309         sleepAfterWrite(txh, commitTime);
310     }
311
312     @Override // TODO: *BIG FAT WARNING* 'synchronized is always *bad*, change openStores to use ConcurrentLinkedHashMap
313     public synchronized CassandraThriftKeyColumnValueStore openDatabase(final String name, StoreMetaData.Container metaData) throws BackendException {
314         if (openStores.containsKey(name))
315             return openStores.get(name);
316
317         ensureColumnFamilyExists(keySpaceName, name);
318
319         CassandraThriftKeyColumnValueStore store = new CassandraThriftKeyColumnValueStore(keySpaceName, name, this, pool);
320         openStores.put(name, store);
321         return store;
322     }
323
324     @Override
325     public List<KeyRange> getLocalKeyPartition() throws BackendException {
326         CTConnection conn = null;
327         IPartitioner partitioner = getCassandraPartitioner();
328
329         if (!(partitioner instanceof AbstractByteOrderedPartitioner))
330             throw new UnsupportedOperationException("getLocalKeyPartition() only supported by byte ordered partitioner.");
331
332         Token.TokenFactory tokenFactory = partitioner.getTokenFactory();
333
334         try {
335             // Resist the temptation to describe SYSTEM_KS.  It has no ring.
336             // Instead, we'll create our own keyspace (or check that it exists), then describe it.
337             ensureKeyspaceExists(keySpaceName);
338
339             conn = pool.borrowObject(keySpaceName);
340             List<TokenRange> ranges  = conn.getClient().describe_ring(keySpaceName);
341             List<KeyRange> keyRanges = new ArrayList<KeyRange>(ranges.size());
342
343             for (TokenRange range : ranges) {
344                 if (!NetworkUtil.hasLocalAddress(range.endpoints))
345                     continue;
346
347                 keyRanges.add(CassandraHelper.transformRange(tokenFactory.fromString(range.start_token), tokenFactory.fromString(range.end_token)));
348             }
349
350             return keyRanges;
351         } catch (Exception e) {
352             throw CassandraThriftKeyColumnValueStore.convertException(e);
353         } finally {
354             pool.returnObjectUnsafe(keySpaceName, conn);
355         }
356     }
357
358     /**
359      * Connect to Cassandra via Thrift on the specified host and port and attempt to truncate the named keyspace.
360      * <p/>
361      * This is a utility method intended mainly for testing. It is
362      * equivalent to issuing 'truncate <cf>' for each of the column families in keyspace using
363      * the cassandra-cli tool.
364      * <p/>
365      * Using truncate is better for a number of reasons, most significantly because it doesn't
366      * involve any schema modifications which can take time to propagate across the cluster such
367      * leaves nodes in the inconsistent state and could result in read/write failures.
368      * Any schema modifications are discouraged until there is no traffic to Keyspace or ColumnFamilies.
369      *
370      * @throws com.thinkaurelius.titan.diskstorage.BackendException if any checked Thrift or UnknownHostException is thrown in the body of this method
371      */
372     public void clearStorage() throws BackendException {
373         openStores.clear();
374         final String lp = "ClearStorage: "; // "log prefix"
375         /*
376          * log4j is capable of automatically writing the name of a method that
377          * generated a log message, but the docs warn that "generating caller
378          * location information is extremely slow and should be avoided unless
379          * execution speed is not an issue."
380          */
381
382         CTConnection conn = null;
383         try {
384             conn = pool.borrowObject(SYSTEM_KS);
385             Cassandra.Client client = conn.getClient();
386
387             KsDef ksDef;
388             try {
389                 client.set_keyspace(keySpaceName);
390                 ksDef = client.describe_keyspace(keySpaceName);
391             } catch (NotFoundException e) {
392                 log.debug(lp + "Keyspace {} does not exist, not attempting to truncate.", keySpaceName);
393                 return;
394             } catch (InvalidRequestException e) {
395                 log.debug(lp + "InvalidRequestException when attempting to describe keyspace {}, not attempting to truncate.", keySpaceName);
396                 return;
397             }
398
399
400             if (null == ksDef) {
401                 log.debug(lp + "Received null KsDef for keyspace {}; not truncating its CFs", keySpaceName);
402                 return;
403             }
404
405             List<CfDef> cfDefs = ksDef.getCf_defs();
406
407             if (null == cfDefs) {
408                 log.debug(lp + "Received empty CfDef list for keyspace {}; not truncating CFs", keySpaceName);
409                 return;
410             }
411
412             for (CfDef cfDef : ksDef.getCf_defs()) {
413                 client.truncate(cfDef.name);
414                 log.info(lp + "Truncated CF {} in keyspace {}", cfDef.name, keySpaceName);
415             }
416
417             /*
418              * Clearing the CTConnectionPool is unnecessary. This method
419              * removes no keyspaces. All open Cassandra connections will
420              * remain valid.
421              */
422         } catch (Exception e) {
423             throw new TemporaryBackendException(e);
424         } finally {
425             if (conn != null && conn.getClient() != null) {
426                 try {
427                     conn.getClient().set_keyspace(SYSTEM_KS);
428                 } catch (InvalidRequestException e) {
429                     log.warn("Failed to reset keyspace", e);
430                 } catch (TException e) {
431                     log.warn("Failed to reset keyspace", e);
432                 }
433             }
434             pool.returnObjectUnsafe(SYSTEM_KS, conn);
435         }
436     }
437
438     private KsDef ensureKeyspaceExists(String keyspaceName) throws TException, BackendException {
439         CTConnection connection = null;
440
441         try {
442             connection = pool.borrowObject(SYSTEM_KS);
443             Cassandra.Client client = connection.getClient();
444
445             try {
446                 // Side effect: throws Exception if keyspaceName doesn't exist
447                 client.set_keyspace(keyspaceName); // Don't remove
448                 client.set_keyspace(SYSTEM_KS);
449                 log.debug("Found existing keyspace {}", keyspaceName);
450             } catch (InvalidRequestException e) {
451                 // Keyspace didn't exist; create it
452                 log.debug("Creating keyspace {}...", keyspaceName);
453
454                 KsDef ksdef = new KsDef().setName(keyspaceName)
455                         .setCf_defs(new LinkedList<CfDef>()) // cannot be null but can be empty
456                         .setStrategy_class(storageConfig.get(REPLICATION_STRATEGY))
457                         .setStrategy_options(strategyOptions);
458
459                 client.set_keyspace(SYSTEM_KS);
460                 try {
461                     client.system_add_keyspace(ksdef);
462                     retrySetKeyspace(keyspaceName, client);
463                     log.debug("Created keyspace {}", keyspaceName);
464                 } catch (InvalidRequestException ire) {
465                     log.error("system_add_keyspace failed for keyspace=" + keyspaceName, ire);
466                     throw ire;
467                 }
468
469             }
470
471             return client.describe_keyspace(keyspaceName);
472         } catch (Exception e) {
473             throw new TemporaryBackendException(e);
474         } finally {
475             pool.returnObjectUnsafe(SYSTEM_KS, connection);
476         }
477     }
478
479     private void retrySetKeyspace(String ksName, Cassandra.Client client) throws BackendException {
480         final long end = System.currentTimeMillis() + (60L * 1000L);
481
482         while (System.currentTimeMillis() <= end) {
483             try {
484                 client.set_keyspace(ksName);
485                 return;
486             } catch (Exception e) {
487                 log.warn("Exception when changing to keyspace {} after creating it", ksName, e);
488                 try {
489                     Thread.sleep(1000L);
490                 } catch (InterruptedException ie) {
491                     throw new PermanentBackendException("Unexpected interrupt (shutting down?)", ie);
492                 }
493             }
494         }
495
496         throw new PermanentBackendException("Could change to keyspace " + ksName + " after creating it");
497     }
498
499     private void ensureColumnFamilyExists(String ksName, String cfName) throws BackendException {
500         ensureColumnFamilyExists(ksName, cfName, "org.apache.cassandra.db.marshal.BytesType");
501     }
502
503     private void ensureColumnFamilyExists(String ksName, String cfName, String comparator) throws BackendException {
504         CTConnection conn = null;
505         try {
506             KsDef keyspaceDef = ensureKeyspaceExists(ksName);
507
508             conn = pool.borrowObject(ksName);
509             Cassandra.Client client = conn.getClient();
510
511             log.debug("Looking up metadata on keyspace {}...", ksName);
512
513             boolean foundColumnFamily = false;
514             for (CfDef cfDef : keyspaceDef.getCf_defs()) {
515                 String curCfName = cfDef.getName();
516                 if (curCfName.equals(cfName))
517                     foundColumnFamily = true;
518             }
519
520             if (!foundColumnFamily) {
521                 createColumnFamily(client, ksName, cfName, comparator);
522             } else {
523                 log.debug("Keyspace {} and ColumnFamily {} were found.", ksName, cfName);
524             }
525         } catch (SchemaDisagreementException e) {
526             throw new TemporaryBackendException(e);
527         } catch (Exception e) {
528             throw new PermanentBackendException(e);
529         } finally {
530             pool.returnObjectUnsafe(ksName, conn);
531         }
532     }
533
534     private void createColumnFamily(Cassandra.Client client,
535                                     String ksName,
536                                     String cfName,
537                                     String comparator) throws BackendException {
538
539         CfDef createColumnFamily = new CfDef();
540         createColumnFamily.setName(cfName);
541         createColumnFamily.setKeyspace(ksName);
542         createColumnFamily.setComparator_type(comparator);
543
544         ImmutableMap.Builder<String, String> compressionOptions = new ImmutableMap.Builder<String, String>();
545
546         if (compressionEnabled) {
547             compressionOptions.put("sstable_compression", compressionClass)
548                     .put("chunk_length_kb", Integer.toString(compressionChunkSizeKB));
549         }
550
551         createColumnFamily.setCompression_options(compressionOptions.build());
552
553         // Hard-coded caching settings
554         if (cfName.startsWith(Backend.EDGESTORE_NAME)) {
555             createColumnFamily.setCaching("keys_only");
556         } else if (cfName.startsWith(Backend.INDEXSTORE_NAME)) {
557             createColumnFamily.setCaching("rows_only");
558         }
559
560         log.debug("Adding column family {} to keyspace {}...", cfName, ksName);
561         try {
562             client.system_add_column_family(createColumnFamily);
563         } catch (SchemaDisagreementException e) {
564             throw new TemporaryBackendException("Error in setting up column family", e);
565         } catch (Exception e) {
566             throw new PermanentBackendException(e);
567         }
568
569         log.debug("Added column family {} to keyspace {}.", cfName, ksName);
570     }
571
572     @Override
573     public Map<String, String> getCompressionOptions(String cf) throws BackendException {
574         CTConnection conn = null;
575         Map<String, String> result = null;
576
577         try {
578             conn = pool.borrowObject(keySpaceName);
579             Cassandra.Client client = conn.getClient();
580
581             KsDef ksDef = client.describe_keyspace(keySpaceName);
582
583             for (CfDef cfDef : ksDef.getCf_defs()) {
584                 if (null != cfDef && cfDef.getName().equals(cf)) {
585                     result = cfDef.getCompression_options();
586                     break;
587                 }
588             }
589
590             return result;
591         } catch (InvalidRequestException e) {
592             log.debug("Keyspace {} does not exist", keySpaceName);
593             return null;
594         } catch (Exception e) {
595             throw new TemporaryBackendException(e);
596         } finally {
597             pool.returnObjectUnsafe(keySpaceName, conn);
598         }
599     }
600
601     private void closePool() {
602         /*
603          * pool.close() does not affect borrowed connections.
604          *
605          * Connections currently borrowed by some thread which are
606          * talking to the old host will eventually be destroyed by
607          * CTConnectionFactory#validateObject() returning false when
608          * those connections are returned to the pool.
609          */
610         try {
611             pool.close();
612             log.info("Closed Thrift connection pooler.");
613         } catch (Exception e) {
614             log.warn("Failed to close connection pooler.  "
615                     + "We might be leaking Cassandra connections.", e);
616             // There's still hope: CTConnectionFactory#validateObject()
617             // will be called on borrow() and might tear down the
618             // connections that close() failed to tear down
619         }
620     }
621 }