first commit for new repo
[sdc/sdc-titan-cassandra.git] / src / main / java / com / thinkaurelius / titan / diskstorage / cassandra / astyanax / AstyanaxStoreManager.java
1 package com.thinkaurelius.titan.diskstorage.cassandra.astyanax;
2
3 import com.google.common.base.Preconditions;
4 import com.google.common.base.Supplier;
5 import com.google.common.collect.ImmutableMap;
6 import com.netflix.astyanax.AstyanaxContext;
7 import com.netflix.astyanax.Cluster;
8 import com.netflix.astyanax.ColumnListMutation;
9 import com.netflix.astyanax.Keyspace;
10 import com.netflix.astyanax.MutationBatch;
11 import com.netflix.astyanax.connectionpool.Host;
12 import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
13 import com.netflix.astyanax.connectionpool.RetryBackoffStrategy;
14 import com.netflix.astyanax.connectionpool.SSLConnectionContext;
15 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
16 import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
17 import com.netflix.astyanax.connectionpool.impl.ConnectionPoolType;
18 import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
19 import com.netflix.astyanax.connectionpool.impl.ExponentialRetryBackoffStrategy;
20 import com.netflix.astyanax.connectionpool.impl.SimpleAuthenticationCredentials;
21 import com.netflix.astyanax.ddl.ColumnFamilyDefinition;
22 import com.netflix.astyanax.ddl.KeyspaceDefinition;
23 import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
24 import com.netflix.astyanax.model.ColumnFamily;
25 import com.netflix.astyanax.retry.RetryPolicy;
26 import com.netflix.astyanax.thrift.ThriftFamilyFactory;
27 import com.thinkaurelius.titan.diskstorage.BackendException;
28 import com.thinkaurelius.titan.diskstorage.Entry;
29 import com.thinkaurelius.titan.diskstorage.EntryMetaData;
30 import com.thinkaurelius.titan.diskstorage.PermanentBackendException;
31 import com.thinkaurelius.titan.diskstorage.StaticBuffer;
32 import com.thinkaurelius.titan.diskstorage.StoreMetaData;
33 import com.thinkaurelius.titan.diskstorage.TemporaryBackendException;
34 import com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager;
35 import com.thinkaurelius.titan.diskstorage.configuration.ConfigNamespace;
36 import com.thinkaurelius.titan.diskstorage.configuration.ConfigOption;
37 import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
38 import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KCVMutation;
39 import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyRange;
40 import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction;
41 import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions;
42 import org.apache.cassandra.dht.IPartitioner;
43 import org.apache.cassandra.dht.Token;
44 import org.apache.cassandra.exceptions.ConfigurationException;
45 import org.apache.cassandra.utils.FBUtilities;
46 import org.apache.commons.lang.StringUtils;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 import java.lang.reflect.Constructor;
51 import java.nio.ByteBuffer;
52 import java.util.HashMap;
53 import java.util.List;
54 import java.util.Map;
55 import java.util.concurrent.TimeUnit;
56
57 import static com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction.getTx;
58
59 @PreInitializeConfigOptions
60 public class AstyanaxStoreManager extends AbstractCassandraStoreManager {
61
62     private static final Logger log = LoggerFactory.getLogger(AstyanaxStoreManager.class);
63
64     //################### ASTYANAX SPECIFIC CONFIGURATION OPTIONS ######################
65
66     public static final ConfigNamespace ASTYANAX_NS =
67             new ConfigNamespace(CASSANDRA_NS, "astyanax", "Astyanax-specific Cassandra options");
68
69     /**
70      * Default name for the Cassandra cluster
71      * <p/>
72      */
73     public static final ConfigOption<String> CLUSTER_NAME =
74             new ConfigOption<String>(ASTYANAX_NS, "cluster-name",
75             "Default name for the Cassandra cluster",
76             ConfigOption.Type.MASKABLE, "Titan Cluster");
77
78     /**
79      * Maximum pooled connections per host.
80      * <p/>
81      */
82     public static final ConfigOption<Integer> MAX_CONNECTIONS_PER_HOST =
83             new ConfigOption<Integer>(ASTYANAX_NS, "max-connections-per-host",
84             "Maximum pooled connections per host",
85             ConfigOption.Type.MASKABLE, 32);
86
87     /**
88      * Maximum open connections allowed in the pool (counting all hosts).
89      * <p/>
90      */
91     public static final ConfigOption<Integer> MAX_CONNECTIONS =
92             new ConfigOption<Integer>(ASTYANAX_NS, "max-connections",
93             "Maximum open connections allowed in the pool (counting all hosts)",
94             ConfigOption.Type.MASKABLE, -1);
95
96     /**
97      * Maximum number of operations allowed per connection before the connection is closed.
98      * <p/>
99      */
100     public static final ConfigOption<Integer> MAX_OPERATIONS_PER_CONNECTION =
101             new ConfigOption<Integer>(ASTYANAX_NS, "max-operations-per-connection",
102             "Maximum number of operations allowed per connection before the connection is closed",
103             ConfigOption.Type.MASKABLE, 100 * 1000);
104
105     /**
106      * Maximum pooled "cluster" connections per host.
107      * <p/>
108      * These connections are mostly idle and only used for DDL operations
109      * (like creating keyspaces).  Titan doesn't need many of these connections
110      * in ordinary operation.
111      */
112     public static final ConfigOption<Integer> MAX_CLUSTER_CONNECTIONS_PER_HOST =
113             new ConfigOption<Integer>(ASTYANAX_NS, "max-cluster-connections-per-host",
114             "Maximum pooled \"cluster\" connections per host",
115             ConfigOption.Type.MASKABLE, 3);
116
117     /**
118      * How Astyanax discovers Cassandra cluster nodes. This must be one of the
119      * values of the Astyanax NodeDiscoveryType enum.
120      * <p/>
121      */
122     public static final ConfigOption<String> NODE_DISCOVERY_TYPE =
123             new ConfigOption<String>(ASTYANAX_NS, "node-discovery-type",
124             "How Astyanax discovers Cassandra cluster nodes",
125             ConfigOption.Type.MASKABLE, "RING_DESCRIBE");
126
127     /**
128      * Astyanax specific host supplier useful only when discovery type set to DISCOVERY_SERVICE or TOKEN_AWARE.
129      * Excepts fully qualified class name which extends google.common.base.Supplier<List<Host>>.
130      */
131     public static final ConfigOption<String> HOST_SUPPLIER =
132             new ConfigOption<String>(ASTYANAX_NS, "host-supplier",
133             "Host supplier to use when discovery type is set to DISCOVERY_SERVICE or TOKEN_AWARE",
134             ConfigOption.Type.MASKABLE, String.class);
135
136     /**
137      * Astyanax's connection pooler implementation. This must be one of the
138      * values of the Astyanax ConnectionPoolType enum.
139      * <p/>
140      */
141     public static final ConfigOption<String> CONNECTION_POOL_TYPE =
142             new ConfigOption<String>(ASTYANAX_NS, "connection-pool-type",
143             "Astyanax's connection pooler implementation",
144             ConfigOption.Type.MASKABLE, "TOKEN_AWARE");
145
146     /**
147      * In Astyanax, RetryPolicy and RetryBackoffStrategy sound and look similar
148      * but are used for distinct purposes. RetryPolicy is for retrying failed
149      * operations. RetryBackoffStrategy is for retrying attempts to talk to
150      * uncommunicative hosts. This config option controls RetryPolicy.
151      */
152     public static final ConfigOption<String> RETRY_POLICY =
153             new ConfigOption<String>(ASTYANAX_NS, "retry-policy",
154             "Astyanax's retry policy implementation with configuration parameters",
155             ConfigOption.Type.MASKABLE, "com.netflix.astyanax.retry.BoundedExponentialBackoff,100,25000,8");
156
157     /**
158      * If non-null, this must be the fully-qualified classname (i.e. the
159      * complete package name, a dot, and then the class name) of an
160      * implementation of Astyanax's RetryBackoffStrategy interface. This string
161      * may be followed by a sequence of integers, separated from the full
162      * classname and from each other by commas; in this case, the integers are
163      * cast to native Java ints and passed to the class constructor as
164      * arguments. Here's an example setting that would instantiate an Astyanax
165      * FixedRetryBackoffStrategy with an delay interval of 1s and suspend time
166      * of 5s:
167      * <p/>
168      * <code>
169      * com.netflix.astyanax.connectionpool.impl.FixedRetryBackoffStrategy,1000,5000
170      * </code>
171      * <p/>
172      * If null, then Astyanax uses its default strategy, which is an
173      * ExponentialRetryBackoffStrategy instance. The instance parameters take
174      * Astyanax's built-in default values, which can be overridden via the
175      * following config keys:
176      * <ul>
177      * <li>{@link #RETRY_DELAY_SLICE}</li>
178      * <li>{@link #RETRY_MAX_DELAY_SLICE}</li>
179      * <li>{@link #RETRY_SUSPEND_WINDOW}</li>
180      * </ul>
181      * <p/>
182      * In Astyanax, RetryPolicy and RetryBackoffStrategy sound and look similar
183      * but are used for distinct purposes. RetryPolicy is for retrying failed
184      * operations. RetryBackoffStrategy is for retrying attempts to talk to
185      * uncommunicative hosts. This config option controls RetryBackoffStrategy.
186      */
187     public static final ConfigOption<String> RETRY_BACKOFF_STRATEGY =
188             new ConfigOption<String>(ASTYANAX_NS, "retry-backoff-strategy",
189             "Astyanax's retry backoff strategy with configuration parameters",
190             ConfigOption.Type.MASKABLE, "com.netflix.astyanax.connectionpool.impl.FixedRetryBackoffStrategy,1000,5000");
191
192     /**
193      * Controls the retryDelaySlice parameter on Astyanax
194      * ConnectionPoolConfigurationImpl objects, which is in turn used by
195      * ExponentialRetryBackoffStrategy. See the code for
196      * {@link ConnectionPoolConfigurationImpl},
197      * {@link ExponentialRetryBackoffStrategy}, and the javadoc for
198      * {@link #RETRY_BACKOFF_STRATEGY} for more information.
199      * <p/>
200      * This parameter is not meaningful for and has no effect on
201      * FixedRetryBackoffStrategy.
202      */
203     public static final ConfigOption<Integer> RETRY_DELAY_SLICE =
204             new ConfigOption<Integer>(ASTYANAX_NS, "retry-delay-slice",
205             "Astyanax's connection pool \"retryDelaySlice\" parameter",
206             ConfigOption.Type.MASKABLE, ConnectionPoolConfigurationImpl.DEFAULT_RETRY_DELAY_SLICE);
207     /**
208      * Controls the retryMaxDelaySlice parameter on Astyanax
209      * ConnectionPoolConfigurationImpl objects, which is in turn used by
210      * ExponentialRetryBackoffStrategy. See the code for
211      * {@link ConnectionPoolConfigurationImpl},
212      * {@link ExponentialRetryBackoffStrategy}, and the javadoc for
213      * {@link #RETRY_BACKOFF_STRATEGY} for more information.
214      * <p/>
215      * This parameter is not meaningful for and has no effect on
216      * FixedRetryBackoffStrategy.
217      */
218     public static final ConfigOption<Integer> RETRY_MAX_DELAY_SLICE =
219             new ConfigOption<Integer>(ASTYANAX_NS, "retry-max-delay-slice",
220             "Astyanax's connection pool \"retryMaxDelaySlice\" parameter",
221             ConfigOption.Type.MASKABLE, ConnectionPoolConfigurationImpl.DEFAULT_RETRY_MAX_DELAY_SLICE);
222
223     /**
224      * Controls the retrySuspendWindow parameter on Astyanax
225      * ConnectionPoolConfigurationImpl objects, which is in turn used by
226      * ExponentialRetryBackoffStrategy. See the code for
227      * {@link ConnectionPoolConfigurationImpl},
228      * {@link ExponentialRetryBackoffStrategy}, and the javadoc for
229      * {@link #RETRY_BACKOFF_STRATEGY} for more information.
230      * <p/>
231      * This parameter is not meaningful for and has no effect on
232      * FixedRetryBackoffStrategy.
233      */
234     public static final ConfigOption<Integer> RETRY_SUSPEND_WINDOW =
235             new ConfigOption<Integer>(ASTYANAX_NS, "retry-suspend-window",
236             "Astyanax's connection pool \"retryMaxDelaySlice\" parameter",
237             ConfigOption.Type.MASKABLE, ConnectionPoolConfigurationImpl.DEFAULT_RETRY_SUSPEND_WINDOW);
238
239     /**
240      * Controls the frame size of thrift sockets created by Astyanax.
241      */
242     public static final ConfigOption<Integer> THRIFT_FRAME_SIZE =
243             new ConfigOption<Integer>(ASTYANAX_NS, "frame-size",
244             "The thrift frame size in mega bytes", ConfigOption.Type.MASKABLE, 15);
245
246     public static final ConfigOption<String> LOCAL_DATACENTER =
247             new ConfigOption<String>(ASTYANAX_NS, "local-datacenter",
248             "The name of the local or closest Cassandra datacenter.  When set and not whitespace, " +
249             "this value will be passed into ConnectionPoolConfigurationImpl.setLocalDatacenter. " +
250             "When unset or set to whitespace, setLocalDatacenter will not be invoked.",
251             /* It's between either LOCAL or MASKABLE.  MASKABLE could be useful for cases where
252                all the Titan instances are closest to the same Cassandra DC. */
253             ConfigOption.Type.MASKABLE, String.class);
254
255     private final String clusterName;
256
257     private final AstyanaxContext<Keyspace> keyspaceContext;
258     private final AstyanaxContext<Cluster> clusterContext;
259
260     private final RetryPolicy retryPolicy;
261
262     private final int retryDelaySlice;
263     private final int retryMaxDelaySlice;
264     private final int retrySuspendWindow;
265     private final RetryBackoffStrategy retryBackoffStrategy;
266
267     private final String localDatacenter;
268
269     private final Map<String, AstyanaxKeyColumnValueStore> openStores;
270
271     public AstyanaxStoreManager(Configuration config) throws BackendException {
272         super(config);
273
274         this.clusterName = config.get(CLUSTER_NAME);
275
276         retryDelaySlice = config.get(RETRY_DELAY_SLICE);
277         retryMaxDelaySlice = config.get(RETRY_MAX_DELAY_SLICE);
278         retrySuspendWindow = config.get(RETRY_SUSPEND_WINDOW);
279         retryBackoffStrategy = getRetryBackoffStrategy(config.get(RETRY_BACKOFF_STRATEGY));
280         retryPolicy = getRetryPolicy(config.get(RETRY_POLICY));
281
282         localDatacenter = config.has(LOCAL_DATACENTER) ?
283                 config.get(LOCAL_DATACENTER) : "";
284
285         final int maxConnsPerHost = config.get(MAX_CONNECTIONS_PER_HOST);
286
287         final int maxClusterConnsPerHost = config.get(MAX_CLUSTER_CONNECTIONS_PER_HOST);
288
289         this.clusterContext = createCluster(getContextBuilder(config, maxClusterConnsPerHost, "Cluster"));
290
291         ensureKeyspaceExists(clusterContext.getClient());
292
293         this.keyspaceContext = getContextBuilder(config, maxConnsPerHost, "Keyspace").buildKeyspace(ThriftFamilyFactory.getInstance());
294         this.keyspaceContext.start();
295
296         openStores = new HashMap<String, AstyanaxKeyColumnValueStore>(8);
297     }
298
299     @Override
300     public Deployment getDeployment() {
301         return Deployment.REMOTE; // TODO
302     }
303
304     @Override
305     @SuppressWarnings("unchecked")
306     public IPartitioner getCassandraPartitioner() throws BackendException {
307         Cluster cl = clusterContext.getClient();
308         try {
309             return FBUtilities.newPartitioner(cl.describePartitioner());
310         } catch (ConnectionException e) {
311             throw new TemporaryBackendException(e);
312         } catch (ConfigurationException e) {
313             throw new PermanentBackendException(e);
314         }
315     }
316
317     @Override
318     public String toString() {
319         return "astyanax" + super.toString();
320     }
321
322     @Override
323     public void close() {
324         // Shutdown the Astyanax contexts
325         openStores.clear();
326         keyspaceContext.shutdown();
327         clusterContext.shutdown();
328     }
329
330     @Override
331     public synchronized AstyanaxKeyColumnValueStore openDatabase(String name, StoreMetaData.Container metaData) throws BackendException {
332         if (openStores.containsKey(name)) return openStores.get(name);
333         else {
334             ensureColumnFamilyExists(name);
335             AstyanaxKeyColumnValueStore store = new AstyanaxKeyColumnValueStore(name, keyspaceContext.getClient(), this, retryPolicy);
336             openStores.put(name, store);
337             return store;
338         }
339     }
340
341     @Override
342     public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> batch, StoreTransaction txh) throws BackendException {
343         MutationBatch m = keyspaceContext.getClient().prepareMutationBatch().withAtomicBatch(atomicBatch)
344                 .setConsistencyLevel(getTx(txh).getWriteConsistencyLevel().getAstyanax())
345                 .withRetryPolicy(retryPolicy.duplicate());
346
347         final MaskedTimestamp commitTime = new MaskedTimestamp(txh);
348
349         for (Map.Entry<String, Map<StaticBuffer, KCVMutation>> batchentry : batch.entrySet()) {
350             String storeName = batchentry.getKey();
351             Preconditions.checkArgument(openStores.containsKey(storeName), "Store cannot be found: " + storeName);
352
353             ColumnFamily<ByteBuffer, ByteBuffer> columnFamily = openStores.get(storeName).getColumnFamily();
354
355             Map<StaticBuffer, KCVMutation> mutations = batchentry.getValue();
356             for (Map.Entry<StaticBuffer, KCVMutation> ent : mutations.entrySet()) {
357                 // The CLMs for additions and deletions are separated because
358                 // Astyanax's operation timestamp cannot be set on a per-delete
359                 // or per-addition basis.
360                 KCVMutation titanMutation = ent.getValue();
361                 ByteBuffer key = ent.getKey().asByteBuffer();
362
363                 if (titanMutation.hasDeletions()) {
364                     ColumnListMutation<ByteBuffer> dels = m.withRow(columnFamily, key);
365                     dels.setTimestamp(commitTime.getDeletionTime(times));
366
367                     for (StaticBuffer b : titanMutation.getDeletions())
368                         dels.deleteColumn(b.as(StaticBuffer.BB_FACTORY));
369                 }
370
371                 if (titanMutation.hasAdditions()) {
372                     ColumnListMutation<ByteBuffer> upds = m.withRow(columnFamily, key);
373                     upds.setTimestamp(commitTime.getAdditionTime(times));
374
375                     for (Entry e : titanMutation.getAdditions()) {
376                         Integer ttl = (Integer) e.getMetaData().get(EntryMetaData.TTL);
377
378                         if (null != ttl && ttl > 0) {
379                             upds.putColumn(e.getColumnAs(StaticBuffer.BB_FACTORY), e.getValueAs(StaticBuffer.BB_FACTORY), ttl);
380                         } else {
381                             upds.putColumn(e.getColumnAs(StaticBuffer.BB_FACTORY), e.getValueAs(StaticBuffer.BB_FACTORY));
382                         }
383                     }
384                 }
385             }
386         }
387
388         try {
389             m.execute();
390         } catch (ConnectionException e) {
391             throw new TemporaryBackendException(e);
392         }
393
394         sleepAfterWrite(txh, commitTime);
395     }
396
397     @Override
398     public List<KeyRange> getLocalKeyPartition() throws BackendException {
399         throw new UnsupportedOperationException();
400     }
401
402     @Override
403     public void clearStorage() throws BackendException {
404         try {
405             Cluster cluster = clusterContext.getClient();
406
407             Keyspace ks = cluster.getKeyspace(keySpaceName);
408
409             // Not a big deal if Keyspace doesn't not exist (dropped manually by user or tests).
410             // This is called on per test setup basis to make sure that previous test cleaned
411             // everything up, so first invocation would always fail as Keyspace doesn't yet exist.
412             if (ks == null)
413                 return;
414
415             for (ColumnFamilyDefinition cf : cluster.describeKeyspace(keySpaceName).getColumnFamilyList()) {
416                 ks.truncateColumnFamily(new ColumnFamily<Object, Object>(cf.getName(), null, null));
417             }
418         } catch (ConnectionException e) {
419             throw new PermanentBackendException(e);
420         }
421     }
422
423     private void ensureColumnFamilyExists(String name) throws BackendException {
424         ensureColumnFamilyExists(name, "org.apache.cassandra.db.marshal.BytesType");
425     }
426
427     private void ensureColumnFamilyExists(String name, String comparator) throws BackendException {
428         Cluster cl = clusterContext.getClient();
429         try {
430             KeyspaceDefinition ksDef = cl.describeKeyspace(keySpaceName);
431             boolean found = false;
432             if (null != ksDef) {
433                 for (ColumnFamilyDefinition cfDef : ksDef.getColumnFamilyList()) {
434                     found |= cfDef.getName().equals(name);
435                 }
436             }
437             if (!found) {
438                 ColumnFamilyDefinition cfDef =
439                         cl.makeColumnFamilyDefinition()
440                                 .setName(name)
441                                 .setKeyspace(keySpaceName)
442                                 .setComparatorType(comparator);
443
444                 ImmutableMap.Builder<String, String> compressionOptions = new ImmutableMap.Builder<String, String>();
445
446                 if (compressionEnabled) {
447                     compressionOptions.put("sstable_compression", compressionClass)
448                             .put("chunk_length_kb", Integer.toString(compressionChunkSizeKB));
449                 }
450
451                 cl.addColumnFamily(cfDef.setCompressionOptions(compressionOptions.build()));
452             }
453         } catch (ConnectionException e) {
454             throw new TemporaryBackendException(e);
455         }
456     }
457
458     private static AstyanaxContext<Cluster> createCluster(AstyanaxContext.Builder cb) {
459         AstyanaxContext<Cluster> clusterCtx = cb.buildCluster(ThriftFamilyFactory.getInstance());
460         clusterCtx.start();
461
462         return clusterCtx;
463     }
464
465     private AstyanaxContext.Builder getContextBuilder(Configuration config, int maxConnsPerHost, String usedFor) {
466
467         final ConnectionPoolType poolType = ConnectionPoolType.valueOf(config.get(CONNECTION_POOL_TYPE));
468
469         final NodeDiscoveryType discType = NodeDiscoveryType.valueOf(config.get(NODE_DISCOVERY_TYPE));
470
471         final int maxConnections = config.get(MAX_CONNECTIONS);
472
473         final int maxOperationsPerConnection = config.get(MAX_OPERATIONS_PER_CONNECTION);
474
475         final int connectionTimeout = (int) connectionTimeoutMS.toMillis();
476
477         ConnectionPoolConfigurationImpl cpool =
478                 new ConnectionPoolConfigurationImpl(usedFor + "TitanConnectionPool")
479                         .setPort(port)
480                         .setMaxOperationsPerConnection(maxOperationsPerConnection)
481                         .setMaxConnsPerHost(maxConnsPerHost)
482                         .setRetryDelaySlice(retryDelaySlice)
483                         .setRetryMaxDelaySlice(retryMaxDelaySlice)
484                         .setRetrySuspendWindow(retrySuspendWindow)
485                         .setSocketTimeout(connectionTimeout)
486                         .setConnectTimeout(connectionTimeout)
487                         .setSeeds(StringUtils.join(hostnames, ","));
488
489         if (null != retryBackoffStrategy) {
490             cpool.setRetryBackoffStrategy(retryBackoffStrategy);
491             log.debug("Custom RetryBackoffStrategy {}", cpool.getRetryBackoffStrategy());
492         } else {
493             log.debug("Default RetryBackoffStrategy {}", cpool.getRetryBackoffStrategy());
494         }
495
496         if (StringUtils.isNotBlank(localDatacenter)) {
497             cpool.setLocalDatacenter(localDatacenter);
498             log.debug("Set local datacenter: {}", cpool.getLocalDatacenter());
499         }
500
501         AstyanaxConfigurationImpl aconf =
502                 new AstyanaxConfigurationImpl()
503                         .setConnectionPoolType(poolType)
504                         .setDiscoveryType(discType)
505                         .setTargetCassandraVersion("1.2")
506                         .setMaxThriftSize(thriftFrameSizeBytes);
507
508         if (0 < maxConnections) {
509             cpool.setMaxConns(maxConnections);
510         }
511
512         if (hasAuthentication()) {
513             cpool.setAuthenticationCredentials(new SimpleAuthenticationCredentials(username, password));
514         }
515
516         if (config.get(SSL_ENABLED)) {
517             cpool.setSSLConnectionContext(new SSLConnectionContext(config.get(SSL_TRUSTSTORE_LOCATION), config.get(SSL_TRUSTSTORE_PASSWORD)));
518         }
519
520         AstyanaxContext.Builder ctxBuilder = new AstyanaxContext.Builder();
521
522         // Standard context builder options
523         ctxBuilder
524             .forCluster(clusterName)
525             .forKeyspace(keySpaceName)
526             .withAstyanaxConfiguration(aconf)
527             .withConnectionPoolConfiguration(cpool)
528             .withConnectionPoolMonitor(new CountingConnectionPoolMonitor());
529
530         // Conditional context builder option: host supplier
531         if (config.has(HOST_SUPPLIER)) {
532             String hostSupplier = config.get(HOST_SUPPLIER);
533             Supplier<List<Host>> supplier = null;
534             if (hostSupplier != null) {
535                 try {
536                     supplier = (Supplier<List<Host>>) Class.forName(hostSupplier).newInstance();
537                     ctxBuilder.withHostSupplier(supplier);
538                 } catch (Exception e) {
539                     log.warn("Problem with host supplier class " + hostSupplier + ", going to use default.", e);
540                 }
541             }
542         }
543
544         return ctxBuilder;
545     }
546
547     private void ensureKeyspaceExists(Cluster cl) throws BackendException {
548         KeyspaceDefinition ksDef;
549
550         try {
551             ksDef = cl.describeKeyspace(keySpaceName);
552
553             if (null != ksDef && ksDef.getName().equals(keySpaceName)) {
554                 log.debug("Found keyspace {}", keySpaceName);
555                 return;
556             }
557         } catch (ConnectionException e) {
558             log.debug("Failed to describe keyspace {}", keySpaceName);
559         }
560
561         log.debug("Creating keyspace {}...", keySpaceName);
562         try {
563             ksDef = cl.makeKeyspaceDefinition()
564                     .setName(keySpaceName)
565                     .setStrategyClass(storageConfig.get(REPLICATION_STRATEGY))
566                     .setStrategyOptions(strategyOptions);
567             cl.addKeyspace(ksDef);
568
569             log.debug("Created keyspace {}", keySpaceName);
570         } catch (ConnectionException e) {
571             log.debug("Failed to create keyspace {}", keySpaceName);
572             throw new TemporaryBackendException(e);
573         }
574     }
575
576     private static RetryBackoffStrategy getRetryBackoffStrategy(String desc) throws PermanentBackendException {
577         if (null == desc)
578             return null;
579
580         String[] tokens = desc.split(",");
581         String policyClassName = tokens[0];
582         int argCount = tokens.length - 1;
583         Integer[] args = new Integer[argCount];
584
585         for (int i = 1; i < tokens.length; i++) {
586             args[i - 1] = Integer.valueOf(tokens[i]);
587         }
588
589         try {
590             RetryBackoffStrategy rbs = instantiate(policyClassName, args, desc);
591             log.debug("Instantiated RetryBackoffStrategy object {} from config string \"{}\"", rbs, desc);
592             return rbs;
593         } catch (Exception e) {
594             throw new PermanentBackendException("Failed to instantiate Astyanax RetryBackoffStrategy implementation", e);
595         }
596     }
597
598     private static RetryPolicy getRetryPolicy(String serializedRetryPolicy) throws BackendException {
599         String[] tokens = serializedRetryPolicy.split(",");
600         String policyClassName = tokens[0];
601         int argCount = tokens.length - 1;
602         Integer[] args = new Integer[argCount];
603         for (int i = 1; i < tokens.length; i++) {
604             args[i - 1] = Integer.valueOf(tokens[i]);
605         }
606
607         try {
608             RetryPolicy rp = instantiate(policyClassName, args, serializedRetryPolicy);
609             log.debug("Instantiated RetryPolicy object {} from config string \"{}\"", rp, serializedRetryPolicy);
610             return rp;
611         } catch (Exception e) {
612             throw new PermanentBackendException("Failed to instantiate Astyanax Retry Policy class", e);
613         }
614     }
615
616     @SuppressWarnings("unchecked")
617     private static <V> V instantiate(String policyClassName, Integer[] args, String raw) throws Exception {
618         for (Constructor<?> con : Class.forName(policyClassName).getConstructors()) {
619             Class<?>[] parameterTypes = con.getParameterTypes();
620
621             // match constructor by number of arguments first
622             if (args.length != parameterTypes.length)
623                 continue;
624
625             // check if the constructor parameter types are compatible with argument types (which are integer)
626             // note that we allow long.class arguments too because integer is cast to long by runtime.
627             boolean intsOrLongs = true;
628             for (Class<?> pc : parameterTypes) {
629                 if (!pc.equals(int.class) && !pc.equals(long.class)) {
630                     intsOrLongs = false;
631                     break;
632                 }
633             }
634
635             // we found a constructor with required number of parameters but times didn't match, let's carry on
636             if (!intsOrLongs)
637                 continue;
638
639             if (log.isDebugEnabled())
640                 log.debug("About to instantiate class {} with {} arguments", con.toString(), args.length);
641
642             return (V) con.newInstance(args);
643         }
644
645         throw new Exception("Failed to identify a class matching the Astyanax Retry Policy config string \"" + raw + "\"");
646     }
647
648     @Override
649     public Map<String, String> getCompressionOptions(String cf) throws BackendException {
650         try {
651             Keyspace k = keyspaceContext.getClient();
652
653             KeyspaceDefinition kdef = k.describeKeyspace();
654
655             if (null == kdef) {
656                 throw new PermanentBackendException("Keyspace " + k.getKeyspaceName() + " is undefined");
657             }
658
659             ColumnFamilyDefinition cfdef = kdef.getColumnFamily(cf);
660
661             if (null == cfdef) {
662                 throw new PermanentBackendException("Column family " + cf + " is undefined");
663             }
664
665             return cfdef.getCompressionOptions();
666         } catch (ConnectionException e) {
667             throw new PermanentBackendException(e);
668         }
669     }
670 }
671
672