1 package com.thinkaurelius.titan.diskstorage.cassandra.astyanax;
3 import com.google.common.base.Preconditions;
4 import com.google.common.base.Supplier;
5 import com.google.common.collect.ImmutableMap;
6 import com.netflix.astyanax.AstyanaxContext;
7 import com.netflix.astyanax.Cluster;
8 import com.netflix.astyanax.ColumnListMutation;
9 import com.netflix.astyanax.Keyspace;
10 import com.netflix.astyanax.MutationBatch;
11 import com.netflix.astyanax.connectionpool.Host;
12 import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
13 import com.netflix.astyanax.connectionpool.RetryBackoffStrategy;
14 import com.netflix.astyanax.connectionpool.SSLConnectionContext;
15 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
16 import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
17 import com.netflix.astyanax.connectionpool.impl.ConnectionPoolType;
18 import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
19 import com.netflix.astyanax.connectionpool.impl.ExponentialRetryBackoffStrategy;
20 import com.netflix.astyanax.connectionpool.impl.SimpleAuthenticationCredentials;
21 import com.netflix.astyanax.ddl.ColumnFamilyDefinition;
22 import com.netflix.astyanax.ddl.KeyspaceDefinition;
23 import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
24 import com.netflix.astyanax.model.ColumnFamily;
25 import com.netflix.astyanax.retry.RetryPolicy;
26 import com.netflix.astyanax.thrift.ThriftFamilyFactory;
27 import com.thinkaurelius.titan.diskstorage.BackendException;
28 import com.thinkaurelius.titan.diskstorage.Entry;
29 import com.thinkaurelius.titan.diskstorage.EntryMetaData;
30 import com.thinkaurelius.titan.diskstorage.PermanentBackendException;
31 import com.thinkaurelius.titan.diskstorage.StaticBuffer;
32 import com.thinkaurelius.titan.diskstorage.StoreMetaData;
33 import com.thinkaurelius.titan.diskstorage.TemporaryBackendException;
34 import com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager;
35 import com.thinkaurelius.titan.diskstorage.configuration.ConfigNamespace;
36 import com.thinkaurelius.titan.diskstorage.configuration.ConfigOption;
37 import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
38 import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KCVMutation;
39 import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyRange;
40 import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction;
41 import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions;
42 import org.apache.cassandra.dht.IPartitioner;
43 import org.apache.cassandra.dht.Token;
44 import org.apache.cassandra.exceptions.ConfigurationException;
45 import org.apache.cassandra.utils.FBUtilities;
46 import org.apache.commons.lang.StringUtils;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
50 import java.lang.reflect.Constructor;
51 import java.nio.ByteBuffer;
52 import java.util.HashMap;
53 import java.util.List;
55 import java.util.concurrent.TimeUnit;
57 import static com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction.getTx;
59 @PreInitializeConfigOptions
60 public class AstyanaxStoreManager extends AbstractCassandraStoreManager {
62 private static final Logger log = LoggerFactory.getLogger(AstyanaxStoreManager.class);
64 //################### ASTYANAX SPECIFIC CONFIGURATION OPTIONS ######################
66 public static final ConfigNamespace ASTYANAX_NS =
67 new ConfigNamespace(CASSANDRA_NS, "astyanax", "Astyanax-specific Cassandra options");
70 * Default name for the Cassandra cluster
73 public static final ConfigOption<String> CLUSTER_NAME =
74 new ConfigOption<String>(ASTYANAX_NS, "cluster-name",
75 "Default name for the Cassandra cluster",
76 ConfigOption.Type.MASKABLE, "Titan Cluster");
79 * Maximum pooled connections per host.
82 public static final ConfigOption<Integer> MAX_CONNECTIONS_PER_HOST =
83 new ConfigOption<Integer>(ASTYANAX_NS, "max-connections-per-host",
84 "Maximum pooled connections per host",
85 ConfigOption.Type.MASKABLE, 32);
88 * Maximum open connections allowed in the pool (counting all hosts).
91 public static final ConfigOption<Integer> MAX_CONNECTIONS =
92 new ConfigOption<Integer>(ASTYANAX_NS, "max-connections",
93 "Maximum open connections allowed in the pool (counting all hosts)",
94 ConfigOption.Type.MASKABLE, -1);
97 * Maximum number of operations allowed per connection before the connection is closed.
100 public static final ConfigOption<Integer> MAX_OPERATIONS_PER_CONNECTION =
101 new ConfigOption<Integer>(ASTYANAX_NS, "max-operations-per-connection",
102 "Maximum number of operations allowed per connection before the connection is closed",
103 ConfigOption.Type.MASKABLE, 100 * 1000);
106 * Maximum pooled "cluster" connections per host.
108 * These connections are mostly idle and only used for DDL operations
109 * (like creating keyspaces). Titan doesn't need many of these connections
110 * in ordinary operation.
112 public static final ConfigOption<Integer> MAX_CLUSTER_CONNECTIONS_PER_HOST =
113 new ConfigOption<Integer>(ASTYANAX_NS, "max-cluster-connections-per-host",
114 "Maximum pooled \"cluster\" connections per host",
115 ConfigOption.Type.MASKABLE, 3);
118 * How Astyanax discovers Cassandra cluster nodes. This must be one of the
119 * values of the Astyanax NodeDiscoveryType enum.
122 public static final ConfigOption<String> NODE_DISCOVERY_TYPE =
123 new ConfigOption<String>(ASTYANAX_NS, "node-discovery-type",
124 "How Astyanax discovers Cassandra cluster nodes",
125 ConfigOption.Type.MASKABLE, "RING_DESCRIBE");
128 * Astyanax specific host supplier useful only when discovery type set to DISCOVERY_SERVICE or TOKEN_AWARE.
129 * Excepts fully qualified class name which extends google.common.base.Supplier<List<Host>>.
131 public static final ConfigOption<String> HOST_SUPPLIER =
132 new ConfigOption<String>(ASTYANAX_NS, "host-supplier",
133 "Host supplier to use when discovery type is set to DISCOVERY_SERVICE or TOKEN_AWARE",
134 ConfigOption.Type.MASKABLE, String.class);
137 * Astyanax's connection pooler implementation. This must be one of the
138 * values of the Astyanax ConnectionPoolType enum.
141 public static final ConfigOption<String> CONNECTION_POOL_TYPE =
142 new ConfigOption<String>(ASTYANAX_NS, "connection-pool-type",
143 "Astyanax's connection pooler implementation",
144 ConfigOption.Type.MASKABLE, "TOKEN_AWARE");
147 * In Astyanax, RetryPolicy and RetryBackoffStrategy sound and look similar
148 * but are used for distinct purposes. RetryPolicy is for retrying failed
149 * operations. RetryBackoffStrategy is for retrying attempts to talk to
150 * uncommunicative hosts. This config option controls RetryPolicy.
152 public static final ConfigOption<String> RETRY_POLICY =
153 new ConfigOption<String>(ASTYANAX_NS, "retry-policy",
154 "Astyanax's retry policy implementation with configuration parameters",
155 ConfigOption.Type.MASKABLE, "com.netflix.astyanax.retry.BoundedExponentialBackoff,100,25000,8");
158 * If non-null, this must be the fully-qualified classname (i.e. the
159 * complete package name, a dot, and then the class name) of an
160 * implementation of Astyanax's RetryBackoffStrategy interface. This string
161 * may be followed by a sequence of integers, separated from the full
162 * classname and from each other by commas; in this case, the integers are
163 * cast to native Java ints and passed to the class constructor as
164 * arguments. Here's an example setting that would instantiate an Astyanax
165 * FixedRetryBackoffStrategy with an delay interval of 1s and suspend time
169 * com.netflix.astyanax.connectionpool.impl.FixedRetryBackoffStrategy,1000,5000
172 * If null, then Astyanax uses its default strategy, which is an
173 * ExponentialRetryBackoffStrategy instance. The instance parameters take
174 * Astyanax's built-in default values, which can be overridden via the
175 * following config keys:
177 * <li>{@link #RETRY_DELAY_SLICE}</li>
178 * <li>{@link #RETRY_MAX_DELAY_SLICE}</li>
179 * <li>{@link #RETRY_SUSPEND_WINDOW}</li>
182 * In Astyanax, RetryPolicy and RetryBackoffStrategy sound and look similar
183 * but are used for distinct purposes. RetryPolicy is for retrying failed
184 * operations. RetryBackoffStrategy is for retrying attempts to talk to
185 * uncommunicative hosts. This config option controls RetryBackoffStrategy.
187 public static final ConfigOption<String> RETRY_BACKOFF_STRATEGY =
188 new ConfigOption<String>(ASTYANAX_NS, "retry-backoff-strategy",
189 "Astyanax's retry backoff strategy with configuration parameters",
190 ConfigOption.Type.MASKABLE, "com.netflix.astyanax.connectionpool.impl.FixedRetryBackoffStrategy,1000,5000");
193 * Controls the retryDelaySlice parameter on Astyanax
194 * ConnectionPoolConfigurationImpl objects, which is in turn used by
195 * ExponentialRetryBackoffStrategy. See the code for
196 * {@link ConnectionPoolConfigurationImpl},
197 * {@link ExponentialRetryBackoffStrategy}, and the javadoc for
198 * {@link #RETRY_BACKOFF_STRATEGY} for more information.
200 * This parameter is not meaningful for and has no effect on
201 * FixedRetryBackoffStrategy.
203 public static final ConfigOption<Integer> RETRY_DELAY_SLICE =
204 new ConfigOption<Integer>(ASTYANAX_NS, "retry-delay-slice",
205 "Astyanax's connection pool \"retryDelaySlice\" parameter",
206 ConfigOption.Type.MASKABLE, ConnectionPoolConfigurationImpl.DEFAULT_RETRY_DELAY_SLICE);
208 * Controls the retryMaxDelaySlice parameter on Astyanax
209 * ConnectionPoolConfigurationImpl objects, which is in turn used by
210 * ExponentialRetryBackoffStrategy. See the code for
211 * {@link ConnectionPoolConfigurationImpl},
212 * {@link ExponentialRetryBackoffStrategy}, and the javadoc for
213 * {@link #RETRY_BACKOFF_STRATEGY} for more information.
215 * This parameter is not meaningful for and has no effect on
216 * FixedRetryBackoffStrategy.
218 public static final ConfigOption<Integer> RETRY_MAX_DELAY_SLICE =
219 new ConfigOption<Integer>(ASTYANAX_NS, "retry-max-delay-slice",
220 "Astyanax's connection pool \"retryMaxDelaySlice\" parameter",
221 ConfigOption.Type.MASKABLE, ConnectionPoolConfigurationImpl.DEFAULT_RETRY_MAX_DELAY_SLICE);
224 * Controls the retrySuspendWindow parameter on Astyanax
225 * ConnectionPoolConfigurationImpl objects, which is in turn used by
226 * ExponentialRetryBackoffStrategy. See the code for
227 * {@link ConnectionPoolConfigurationImpl},
228 * {@link ExponentialRetryBackoffStrategy}, and the javadoc for
229 * {@link #RETRY_BACKOFF_STRATEGY} for more information.
231 * This parameter is not meaningful for and has no effect on
232 * FixedRetryBackoffStrategy.
234 public static final ConfigOption<Integer> RETRY_SUSPEND_WINDOW =
235 new ConfigOption<Integer>(ASTYANAX_NS, "retry-suspend-window",
236 "Astyanax's connection pool \"retryMaxDelaySlice\" parameter",
237 ConfigOption.Type.MASKABLE, ConnectionPoolConfigurationImpl.DEFAULT_RETRY_SUSPEND_WINDOW);
240 * Controls the frame size of thrift sockets created by Astyanax.
242 public static final ConfigOption<Integer> THRIFT_FRAME_SIZE =
243 new ConfigOption<Integer>(ASTYANAX_NS, "frame-size",
244 "The thrift frame size in mega bytes", ConfigOption.Type.MASKABLE, 15);
246 public static final ConfigOption<String> LOCAL_DATACENTER =
247 new ConfigOption<String>(ASTYANAX_NS, "local-datacenter",
248 "The name of the local or closest Cassandra datacenter. When set and not whitespace, " +
249 "this value will be passed into ConnectionPoolConfigurationImpl.setLocalDatacenter. " +
250 "When unset or set to whitespace, setLocalDatacenter will not be invoked.",
251 /* It's between either LOCAL or MASKABLE. MASKABLE could be useful for cases where
252 all the Titan instances are closest to the same Cassandra DC. */
253 ConfigOption.Type.MASKABLE, String.class);
255 private final String clusterName;
257 private final AstyanaxContext<Keyspace> keyspaceContext;
258 private final AstyanaxContext<Cluster> clusterContext;
260 private final RetryPolicy retryPolicy;
262 private final int retryDelaySlice;
263 private final int retryMaxDelaySlice;
264 private final int retrySuspendWindow;
265 private final RetryBackoffStrategy retryBackoffStrategy;
267 private final String localDatacenter;
269 private final Map<String, AstyanaxKeyColumnValueStore> openStores;
271 public AstyanaxStoreManager(Configuration config) throws BackendException {
274 this.clusterName = config.get(CLUSTER_NAME);
276 retryDelaySlice = config.get(RETRY_DELAY_SLICE);
277 retryMaxDelaySlice = config.get(RETRY_MAX_DELAY_SLICE);
278 retrySuspendWindow = config.get(RETRY_SUSPEND_WINDOW);
279 retryBackoffStrategy = getRetryBackoffStrategy(config.get(RETRY_BACKOFF_STRATEGY));
280 retryPolicy = getRetryPolicy(config.get(RETRY_POLICY));
282 localDatacenter = config.has(LOCAL_DATACENTER) ?
283 config.get(LOCAL_DATACENTER) : "";
285 final int maxConnsPerHost = config.get(MAX_CONNECTIONS_PER_HOST);
287 final int maxClusterConnsPerHost = config.get(MAX_CLUSTER_CONNECTIONS_PER_HOST);
289 this.clusterContext = createCluster(getContextBuilder(config, maxClusterConnsPerHost, "Cluster"));
291 ensureKeyspaceExists(clusterContext.getClient());
293 this.keyspaceContext = getContextBuilder(config, maxConnsPerHost, "Keyspace").buildKeyspace(ThriftFamilyFactory.getInstance());
294 this.keyspaceContext.start();
296 openStores = new HashMap<String, AstyanaxKeyColumnValueStore>(8);
300 public Deployment getDeployment() {
301 return Deployment.REMOTE; // TODO
305 @SuppressWarnings("unchecked")
306 public IPartitioner getCassandraPartitioner() throws BackendException {
307 Cluster cl = clusterContext.getClient();
309 return FBUtilities.newPartitioner(cl.describePartitioner());
310 } catch (ConnectionException e) {
311 throw new TemporaryBackendException(e);
312 } catch (ConfigurationException e) {
313 throw new PermanentBackendException(e);
318 public String toString() {
319 return "astyanax" + super.toString();
323 public void close() {
324 // Shutdown the Astyanax contexts
326 keyspaceContext.shutdown();
327 clusterContext.shutdown();
331 public synchronized AstyanaxKeyColumnValueStore openDatabase(String name, StoreMetaData.Container metaData) throws BackendException {
332 if (openStores.containsKey(name)) return openStores.get(name);
334 ensureColumnFamilyExists(name);
335 AstyanaxKeyColumnValueStore store = new AstyanaxKeyColumnValueStore(name, keyspaceContext.getClient(), this, retryPolicy);
336 openStores.put(name, store);
342 public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> batch, StoreTransaction txh) throws BackendException {
343 MutationBatch m = keyspaceContext.getClient().prepareMutationBatch().withAtomicBatch(atomicBatch)
344 .setConsistencyLevel(getTx(txh).getWriteConsistencyLevel().getAstyanax())
345 .withRetryPolicy(retryPolicy.duplicate());
347 final MaskedTimestamp commitTime = new MaskedTimestamp(txh);
349 for (Map.Entry<String, Map<StaticBuffer, KCVMutation>> batchentry : batch.entrySet()) {
350 String storeName = batchentry.getKey();
351 Preconditions.checkArgument(openStores.containsKey(storeName), "Store cannot be found: " + storeName);
353 ColumnFamily<ByteBuffer, ByteBuffer> columnFamily = openStores.get(storeName).getColumnFamily();
355 Map<StaticBuffer, KCVMutation> mutations = batchentry.getValue();
356 for (Map.Entry<StaticBuffer, KCVMutation> ent : mutations.entrySet()) {
357 // The CLMs for additions and deletions are separated because
358 // Astyanax's operation timestamp cannot be set on a per-delete
359 // or per-addition basis.
360 KCVMutation titanMutation = ent.getValue();
361 ByteBuffer key = ent.getKey().asByteBuffer();
363 if (titanMutation.hasDeletions()) {
364 ColumnListMutation<ByteBuffer> dels = m.withRow(columnFamily, key);
365 dels.setTimestamp(commitTime.getDeletionTime(times));
367 for (StaticBuffer b : titanMutation.getDeletions())
368 dels.deleteColumn(b.as(StaticBuffer.BB_FACTORY));
371 if (titanMutation.hasAdditions()) {
372 ColumnListMutation<ByteBuffer> upds = m.withRow(columnFamily, key);
373 upds.setTimestamp(commitTime.getAdditionTime(times));
375 for (Entry e : titanMutation.getAdditions()) {
376 Integer ttl = (Integer) e.getMetaData().get(EntryMetaData.TTL);
378 if (null != ttl && ttl > 0) {
379 upds.putColumn(e.getColumnAs(StaticBuffer.BB_FACTORY), e.getValueAs(StaticBuffer.BB_FACTORY), ttl);
381 upds.putColumn(e.getColumnAs(StaticBuffer.BB_FACTORY), e.getValueAs(StaticBuffer.BB_FACTORY));
390 } catch (ConnectionException e) {
391 throw new TemporaryBackendException(e);
394 sleepAfterWrite(txh, commitTime);
398 public List<KeyRange> getLocalKeyPartition() throws BackendException {
399 throw new UnsupportedOperationException();
403 public void clearStorage() throws BackendException {
405 Cluster cluster = clusterContext.getClient();
407 Keyspace ks = cluster.getKeyspace(keySpaceName);
409 // Not a big deal if Keyspace doesn't not exist (dropped manually by user or tests).
410 // This is called on per test setup basis to make sure that previous test cleaned
411 // everything up, so first invocation would always fail as Keyspace doesn't yet exist.
415 for (ColumnFamilyDefinition cf : cluster.describeKeyspace(keySpaceName).getColumnFamilyList()) {
416 ks.truncateColumnFamily(new ColumnFamily<Object, Object>(cf.getName(), null, null));
418 } catch (ConnectionException e) {
419 throw new PermanentBackendException(e);
423 private void ensureColumnFamilyExists(String name) throws BackendException {
424 ensureColumnFamilyExists(name, "org.apache.cassandra.db.marshal.BytesType");
427 private void ensureColumnFamilyExists(String name, String comparator) throws BackendException {
428 Cluster cl = clusterContext.getClient();
430 KeyspaceDefinition ksDef = cl.describeKeyspace(keySpaceName);
431 boolean found = false;
433 for (ColumnFamilyDefinition cfDef : ksDef.getColumnFamilyList()) {
434 found |= cfDef.getName().equals(name);
438 ColumnFamilyDefinition cfDef =
439 cl.makeColumnFamilyDefinition()
441 .setKeyspace(keySpaceName)
442 .setComparatorType(comparator);
444 ImmutableMap.Builder<String, String> compressionOptions = new ImmutableMap.Builder<String, String>();
446 if (compressionEnabled) {
447 compressionOptions.put("sstable_compression", compressionClass)
448 .put("chunk_length_kb", Integer.toString(compressionChunkSizeKB));
451 cl.addColumnFamily(cfDef.setCompressionOptions(compressionOptions.build()));
453 } catch (ConnectionException e) {
454 throw new TemporaryBackendException(e);
458 private static AstyanaxContext<Cluster> createCluster(AstyanaxContext.Builder cb) {
459 AstyanaxContext<Cluster> clusterCtx = cb.buildCluster(ThriftFamilyFactory.getInstance());
465 private AstyanaxContext.Builder getContextBuilder(Configuration config, int maxConnsPerHost, String usedFor) {
467 final ConnectionPoolType poolType = ConnectionPoolType.valueOf(config.get(CONNECTION_POOL_TYPE));
469 final NodeDiscoveryType discType = NodeDiscoveryType.valueOf(config.get(NODE_DISCOVERY_TYPE));
471 final int maxConnections = config.get(MAX_CONNECTIONS);
473 final int maxOperationsPerConnection = config.get(MAX_OPERATIONS_PER_CONNECTION);
475 final int connectionTimeout = (int) connectionTimeoutMS.toMillis();
477 ConnectionPoolConfigurationImpl cpool =
478 new ConnectionPoolConfigurationImpl(usedFor + "TitanConnectionPool")
480 .setMaxOperationsPerConnection(maxOperationsPerConnection)
481 .setMaxConnsPerHost(maxConnsPerHost)
482 .setRetryDelaySlice(retryDelaySlice)
483 .setRetryMaxDelaySlice(retryMaxDelaySlice)
484 .setRetrySuspendWindow(retrySuspendWindow)
485 .setSocketTimeout(connectionTimeout)
486 .setConnectTimeout(connectionTimeout)
487 .setSeeds(StringUtils.join(hostnames, ","));
489 if (null != retryBackoffStrategy) {
490 cpool.setRetryBackoffStrategy(retryBackoffStrategy);
491 log.debug("Custom RetryBackoffStrategy {}", cpool.getRetryBackoffStrategy());
493 log.debug("Default RetryBackoffStrategy {}", cpool.getRetryBackoffStrategy());
496 if (StringUtils.isNotBlank(localDatacenter)) {
497 cpool.setLocalDatacenter(localDatacenter);
498 log.debug("Set local datacenter: {}", cpool.getLocalDatacenter());
501 AstyanaxConfigurationImpl aconf =
502 new AstyanaxConfigurationImpl()
503 .setConnectionPoolType(poolType)
504 .setDiscoveryType(discType)
505 .setTargetCassandraVersion("1.2")
506 .setMaxThriftSize(thriftFrameSizeBytes);
508 if (0 < maxConnections) {
509 cpool.setMaxConns(maxConnections);
512 if (hasAuthentication()) {
513 cpool.setAuthenticationCredentials(new SimpleAuthenticationCredentials(username, password));
516 if (config.get(SSL_ENABLED)) {
517 cpool.setSSLConnectionContext(new SSLConnectionContext(config.get(SSL_TRUSTSTORE_LOCATION), config.get(SSL_TRUSTSTORE_PASSWORD)));
520 AstyanaxContext.Builder ctxBuilder = new AstyanaxContext.Builder();
522 // Standard context builder options
524 .forCluster(clusterName)
525 .forKeyspace(keySpaceName)
526 .withAstyanaxConfiguration(aconf)
527 .withConnectionPoolConfiguration(cpool)
528 .withConnectionPoolMonitor(new CountingConnectionPoolMonitor());
530 // Conditional context builder option: host supplier
531 if (config.has(HOST_SUPPLIER)) {
532 String hostSupplier = config.get(HOST_SUPPLIER);
533 Supplier<List<Host>> supplier = null;
534 if (hostSupplier != null) {
536 supplier = (Supplier<List<Host>>) Class.forName(hostSupplier).newInstance();
537 ctxBuilder.withHostSupplier(supplier);
538 } catch (Exception e) {
539 log.warn("Problem with host supplier class " + hostSupplier + ", going to use default.", e);
547 private void ensureKeyspaceExists(Cluster cl) throws BackendException {
548 KeyspaceDefinition ksDef;
551 ksDef = cl.describeKeyspace(keySpaceName);
553 if (null != ksDef && ksDef.getName().equals(keySpaceName)) {
554 log.debug("Found keyspace {}", keySpaceName);
557 } catch (ConnectionException e) {
558 log.debug("Failed to describe keyspace {}", keySpaceName);
561 log.debug("Creating keyspace {}...", keySpaceName);
563 ksDef = cl.makeKeyspaceDefinition()
564 .setName(keySpaceName)
565 .setStrategyClass(storageConfig.get(REPLICATION_STRATEGY))
566 .setStrategyOptions(strategyOptions);
567 cl.addKeyspace(ksDef);
569 log.debug("Created keyspace {}", keySpaceName);
570 } catch (ConnectionException e) {
571 log.debug("Failed to create keyspace {}", keySpaceName);
572 throw new TemporaryBackendException(e);
576 private static RetryBackoffStrategy getRetryBackoffStrategy(String desc) throws PermanentBackendException {
580 String[] tokens = desc.split(",");
581 String policyClassName = tokens[0];
582 int argCount = tokens.length - 1;
583 Integer[] args = new Integer[argCount];
585 for (int i = 1; i < tokens.length; i++) {
586 args[i - 1] = Integer.valueOf(tokens[i]);
590 RetryBackoffStrategy rbs = instantiate(policyClassName, args, desc);
591 log.debug("Instantiated RetryBackoffStrategy object {} from config string \"{}\"", rbs, desc);
593 } catch (Exception e) {
594 throw new PermanentBackendException("Failed to instantiate Astyanax RetryBackoffStrategy implementation", e);
598 private static RetryPolicy getRetryPolicy(String serializedRetryPolicy) throws BackendException {
599 String[] tokens = serializedRetryPolicy.split(",");
600 String policyClassName = tokens[0];
601 int argCount = tokens.length - 1;
602 Integer[] args = new Integer[argCount];
603 for (int i = 1; i < tokens.length; i++) {
604 args[i - 1] = Integer.valueOf(tokens[i]);
608 RetryPolicy rp = instantiate(policyClassName, args, serializedRetryPolicy);
609 log.debug("Instantiated RetryPolicy object {} from config string \"{}\"", rp, serializedRetryPolicy);
611 } catch (Exception e) {
612 throw new PermanentBackendException("Failed to instantiate Astyanax Retry Policy class", e);
616 @SuppressWarnings("unchecked")
617 private static <V> V instantiate(String policyClassName, Integer[] args, String raw) throws Exception {
618 for (Constructor<?> con : Class.forName(policyClassName).getConstructors()) {
619 Class<?>[] parameterTypes = con.getParameterTypes();
621 // match constructor by number of arguments first
622 if (args.length != parameterTypes.length)
625 // check if the constructor parameter types are compatible with argument types (which are integer)
626 // note that we allow long.class arguments too because integer is cast to long by runtime.
627 boolean intsOrLongs = true;
628 for (Class<?> pc : parameterTypes) {
629 if (!pc.equals(int.class) && !pc.equals(long.class)) {
635 // we found a constructor with required number of parameters but times didn't match, let's carry on
639 if (log.isDebugEnabled())
640 log.debug("About to instantiate class {} with {} arguments", con.toString(), args.length);
642 return (V) con.newInstance(args);
645 throw new Exception("Failed to identify a class matching the Astyanax Retry Policy config string \"" + raw + "\"");
649 public Map<String, String> getCompressionOptions(String cf) throws BackendException {
651 Keyspace k = keyspaceContext.getClient();
653 KeyspaceDefinition kdef = k.describeKeyspace();
656 throw new PermanentBackendException("Keyspace " + k.getKeyspaceName() + " is undefined");
659 ColumnFamilyDefinition cfdef = kdef.getColumnFamily(cf);
662 throw new PermanentBackendException("Column family " + cf + " is undefined");
665 return cfdef.getCompressionOptions();
666 } catch (ConnectionException e) {
667 throw new PermanentBackendException(e);