1 package com.thinkaurelius.titan.diskstorage.cassandra;
5 import com.google.common.collect.ImmutableMap;
6 import com.thinkaurelius.titan.core.TitanException;
7 import com.thinkaurelius.titan.diskstorage.BackendException;
8 import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig;
9 import com.thinkaurelius.titan.diskstorage.common.DistributedStoreManager;
10 import com.thinkaurelius.titan.diskstorage.configuration.ConfigElement;
11 import com.thinkaurelius.titan.diskstorage.configuration.ConfigNamespace;
12 import com.thinkaurelius.titan.diskstorage.configuration.ConfigOption;
13 import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
14 import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
15 import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StandardStoreFeatures;
16 import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreFeatures;
17 import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction;
18 import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
19 import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions;
21 import static com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration.*;
23 import org.apache.cassandra.dht.IPartitioner;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
29 * @author Matthias Broecheler (me@matthiasb.com)
31 @PreInitializeConfigOptions
32 public abstract class AbstractCassandraStoreManager extends DistributedStoreManager implements KeyColumnValueStoreManager {
34 public enum Partitioner {
38 public static Partitioner getPartitioner(IPartitioner partitioner) {
39 return getPartitioner(partitioner.getClass().getSimpleName());
42 public static Partitioner getPartitioner(String className) {
43 if (className.endsWith("RandomPartitioner") || className.endsWith("Murmur3Partitioner"))
44 return Partitioner.RANDOM;
45 else if (className.endsWith("ByteOrderedPartitioner")) return Partitioner.BYTEORDER;
46 else throw new IllegalArgumentException("Unsupported partitioner: " + className);
50 //################### CASSANDRA SPECIFIC CONFIGURATION OPTIONS ######################
52 public static final ConfigNamespace CASSANDRA_NS =
53 new ConfigNamespace(GraphDatabaseConfiguration.STORAGE_NS, "cassandra", "Cassandra storage backend options");
55 public static final ConfigOption<String> CASSANDRA_KEYSPACE =
56 new ConfigOption<String>(CASSANDRA_NS, "keyspace",
57 "The name of Titan's keyspace. It will be created if it does not exist.",
58 ConfigOption.Type.LOCAL, "titan");
60 // Consistency Levels and Atomic Batch
61 public static final ConfigOption<String> CASSANDRA_READ_CONSISTENCY =
62 new ConfigOption<String>(CASSANDRA_NS, "read-consistency-level",
63 "The consistency level of read operations against Cassandra",
64 ConfigOption.Type.MASKABLE, "LOCAL_QUORUM");
66 public static final ConfigOption<String> CASSANDRA_WRITE_CONSISTENCY =
67 new ConfigOption<String>(CASSANDRA_NS, "write-consistency-level",
68 "The consistency level of write operations against Cassandra",
69 ConfigOption.Type.MASKABLE, "LOCAL_QUORUM");
71 public static final ConfigOption<Boolean> ATOMIC_BATCH_MUTATE =
72 new ConfigOption<Boolean>(CASSANDRA_NS, "atomic-batch-mutate",
73 "True to use Cassandra atomic batch mutation, false to use non-atomic batches",
74 ConfigOption.Type.MASKABLE, true);
77 public static final ConfigOption<Integer> REPLICATION_FACTOR =
78 new ConfigOption<Integer>(CASSANDRA_NS, "replication-factor",
79 "The number of data replicas (including the original copy) that should be kept. " +
80 "This is only meaningful for storage backends that natively support data replication.",
81 ConfigOption.Type.GLOBAL_OFFLINE, 1);
83 public static final ConfigOption<String> REPLICATION_STRATEGY =
84 new ConfigOption<String>(CASSANDRA_NS, "replication-strategy-class",
85 "The replication strategy to use for Titan keyspace",
86 ConfigOption.Type.FIXED, "org.apache.cassandra.locator.SimpleStrategy");
88 public static final ConfigOption<String[]> REPLICATION_OPTIONS =
89 new ConfigOption<String[]>(CASSANDRA_NS, "replication-strategy-options",
90 "Replication strategy options, e.g. factor or replicas per datacenter. This list is interpreted as a " +
91 "map. It must have an even number of elements in [key,val,key,val,...] form. A replication_factor set " +
92 "here takes precedence over one set with " + ConfigElement.getPath(REPLICATION_FACTOR),
93 ConfigOption.Type.FIXED, String[].class);
96 public static final ConfigOption<Boolean> CF_COMPRESSION =
97 new ConfigOption<Boolean>(CASSANDRA_NS, "compression",
98 "Whether the storage backend should use compression when storing the data", ConfigOption.Type.FIXED, true);
100 public static final ConfigOption<String> CF_COMPRESSION_TYPE =
101 new ConfigOption<String>(CASSANDRA_NS, "compression-type",
102 "The sstable_compression value Titan uses when creating column families. " +
103 "This accepts any value allowed by Cassandra's sstable_compression option. " +
104 "Leave this unset to disable sstable_compression on Titan-created CFs.",
105 ConfigOption.Type.MASKABLE, "LZ4Compressor");
107 public static final ConfigOption<Integer> CF_COMPRESSION_BLOCK_SIZE =
108 new ConfigOption<Integer>(CASSANDRA_NS, "compression-block-size",
109 "The size of the compression blocks in kilobytes", ConfigOption.Type.FIXED, 64);
112 public static final ConfigNamespace SSL_NS =
113 new ConfigNamespace(CASSANDRA_NS, "ssl", "Configuration options for SSL");
115 public static final ConfigNamespace SSL_TRUSTSTORE_NS =
116 new ConfigNamespace(SSL_NS, "truststore", "Configuration options for SSL Truststore.");
118 public static final ConfigOption<Boolean> SSL_ENABLED =
119 new ConfigOption<Boolean>(SSL_NS, "enabled",
120 "Controls use of the SSL connection to Cassandra", ConfigOption.Type.LOCAL, false);
122 public static final ConfigOption<String> SSL_TRUSTSTORE_LOCATION =
123 new ConfigOption<String>(SSL_TRUSTSTORE_NS, "location",
124 "Marks the location of the SSL Truststore.", ConfigOption.Type.LOCAL, "");
126 public static final ConfigOption<String> SSL_TRUSTSTORE_PASSWORD =
127 new ConfigOption<String>(SSL_TRUSTSTORE_NS, "password",
128 "The password to access SSL Truststore.", ConfigOption.Type.LOCAL, "");
131 public static final ConfigOption<Integer> THRIFT_FRAME_SIZE_MB =
132 new ConfigOption<>(CASSANDRA_NS, "frame-size-mb",
133 "The thrift frame size in megabytes", ConfigOption.Type.MASKABLE, 15);
136 * The default Thrift port used by Cassandra. Set
137 * {@link GraphDatabaseConfiguration#STORAGE_PORT} to override.
141 public static final int PORT_DEFAULT = 9160;
143 public static final String SYSTEM_KS = "system";
145 protected final String keySpaceName;
146 protected final Map<String, String> strategyOptions;
148 protected final boolean compressionEnabled;
149 protected final int compressionChunkSizeKB;
150 protected final String compressionClass;
152 protected final boolean atomicBatch;
154 protected final int thriftFrameSizeBytes;
156 private volatile StoreFeatures features = null;
157 private Partitioner partitioner = null;
159 private static final Logger log =
160 LoggerFactory.getLogger(AbstractCassandraStoreManager.class);
162 public AbstractCassandraStoreManager(Configuration config) {
163 super(config, PORT_DEFAULT);
165 this.keySpaceName = config.get(CASSANDRA_KEYSPACE);
166 this.compressionEnabled = config.get(CF_COMPRESSION);
167 this.compressionChunkSizeKB = config.get(CF_COMPRESSION_BLOCK_SIZE);
168 this.compressionClass = config.get(CF_COMPRESSION_TYPE);
169 this.atomicBatch = config.get(ATOMIC_BATCH_MUTATE);
170 this.thriftFrameSizeBytes = config.get(THRIFT_FRAME_SIZE_MB) * 1024 * 1024;
172 // SSL truststore location sanity check
173 if (config.get(SSL_ENABLED) && config.get(SSL_TRUSTSTORE_LOCATION).isEmpty())
174 throw new IllegalArgumentException(SSL_TRUSTSTORE_LOCATION.getName() + " could not be empty when SSL is enabled.");
176 if (config.has(REPLICATION_OPTIONS)) {
177 String[] options = config.get(REPLICATION_OPTIONS);
179 if (options.length % 2 != 0)
180 throw new IllegalArgumentException(REPLICATION_OPTIONS.getName() + " should have even number of elements.");
182 Map<String, String> converted = new HashMap<String, String>(options.length / 2);
184 for (int i = 0; i < options.length; i += 2) {
185 converted.put(options[i], options[i + 1]);
188 this.strategyOptions = ImmutableMap.copyOf(converted);
190 this.strategyOptions = ImmutableMap.of("replication_factor", String.valueOf(config.get(REPLICATION_FACTOR)));
194 public final Partitioner getPartitioner() {
195 if (partitioner == null) {
197 partitioner = Partitioner.getPartitioner(getCassandraPartitioner());
198 } catch (BackendException e) {
199 throw new TitanException("Could not connect to Cassandra to read partitioner information. Please check the connection", e);
202 assert partitioner != null;
206 public abstract IPartitioner getCassandraPartitioner() throws BackendException;
209 public StoreTransaction beginTransaction(final BaseTransactionConfig config) {
210 return new CassandraTransaction(config);
214 public String toString() {
215 return "[" + keySpaceName + "@" + super.toString() + "]";
219 public StoreFeatures getFeatures() {
221 if (features == null) {
223 Configuration global = GraphDatabaseConfiguration.buildGraphConfiguration()
224 .set(CASSANDRA_READ_CONSISTENCY, "LOCAL_QUORUM")
225 .set(CASSANDRA_WRITE_CONSISTENCY, "LOCAL_QUORUM")
226 .set(METRICS_PREFIX, GraphDatabaseConfiguration.METRICS_SYSTEM_PREFIX_DEFAULT);
228 Configuration local = GraphDatabaseConfiguration.buildGraphConfiguration()
229 .set(CASSANDRA_READ_CONSISTENCY, "LOCAL_QUORUM")
230 .set(CASSANDRA_WRITE_CONSISTENCY, "LOCAL_QUORUM")
231 .set(METRICS_PREFIX, GraphDatabaseConfiguration.METRICS_SYSTEM_PREFIX_DEFAULT);
233 StandardStoreFeatures.Builder fb = new StandardStoreFeatures.Builder();
235 fb.batchMutation(true).distributed(true);
236 fb.timestamps(true).cellTTL(true);
237 fb.keyConsistent(global, local);
241 switch (getPartitioner()) {
244 fb.keyOrdered(keyOrdered).orderedScan(false).unorderedScan(true);
249 fb.keyOrdered(keyOrdered).orderedScan(true).unorderedScan(false);
253 throw new IllegalArgumentException("Unrecognized partitioner: " + getPartitioner());
256 switch (getDeployment()) {
262 fb.multiQuery(true).localKeyPartition(keyOrdered);
266 fb.multiQuery(false).localKeyPartition(keyOrdered);
270 throw new IllegalArgumentException("Unrecognized deployment mode: " + getDeployment());
273 features = fb.build();
280 * Returns a map of compression options for the column family {@code cf}.
281 * The contents of the returned map must be identical to the contents of the
283 * {@link org.apache.cassandra.thrift.CfDef#getCompression_options()}, even
284 * for implementations of this method that don't use Thrift.
286 * @param cf the name of the column family for which to return compression
288 * @return map of compression option names to compression option values
289 * @throws com.thinkaurelius.titan.diskstorage.BackendException if reading from Cassandra fails
291 public abstract Map<String, String> getCompressionOptions(String cf) throws BackendException;
293 public String getName() {
294 return getClass().getSimpleName() + keySpaceName;