first commit for new repo
[sdc/sdc-titan-cassandra.git] / src / main / java / com / thinkaurelius / titan / diskstorage / cassandra / AbstractCassandraStoreManager.java
1 package com.thinkaurelius.titan.diskstorage.cassandra;
2
3 import java.util.*;
4
5 import com.google.common.collect.ImmutableMap;
6 import com.thinkaurelius.titan.core.TitanException;
7 import com.thinkaurelius.titan.diskstorage.BackendException;
8 import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig;
9 import com.thinkaurelius.titan.diskstorage.common.DistributedStoreManager;
10 import com.thinkaurelius.titan.diskstorage.configuration.ConfigElement;
11 import com.thinkaurelius.titan.diskstorage.configuration.ConfigNamespace;
12 import com.thinkaurelius.titan.diskstorage.configuration.ConfigOption;
13 import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
14 import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
15 import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StandardStoreFeatures;
16 import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreFeatures;
17 import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction;
18 import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
19 import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions;
20
21 import static com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration.*;
22
23 import org.apache.cassandra.dht.IPartitioner;
24
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 /**
29  * @author Matthias Broecheler (me@matthiasb.com)
30  */
31 @PreInitializeConfigOptions
32 public abstract class AbstractCassandraStoreManager extends DistributedStoreManager implements KeyColumnValueStoreManager {
33
34     public enum Partitioner {
35
36         RANDOM, BYTEORDER;
37
38         public static Partitioner getPartitioner(IPartitioner partitioner) {
39             return getPartitioner(partitioner.getClass().getSimpleName());
40         }
41
42         public static Partitioner getPartitioner(String className) {
43             if (className.endsWith("RandomPartitioner") || className.endsWith("Murmur3Partitioner"))
44                 return Partitioner.RANDOM;
45             else if (className.endsWith("ByteOrderedPartitioner")) return Partitioner.BYTEORDER;
46             else throw new IllegalArgumentException("Unsupported partitioner: " + className);
47         }
48     }
49
50     //################### CASSANDRA SPECIFIC CONFIGURATION OPTIONS ######################
51
52     public static final ConfigNamespace CASSANDRA_NS =
53             new ConfigNamespace(GraphDatabaseConfiguration.STORAGE_NS, "cassandra", "Cassandra storage backend options");
54
55     public static final ConfigOption<String> CASSANDRA_KEYSPACE =
56             new ConfigOption<String>(CASSANDRA_NS, "keyspace",
57                     "The name of Titan's keyspace.  It will be created if it does not exist.",
58                     ConfigOption.Type.LOCAL, "titan");
59
60     // Consistency Levels and Atomic Batch
61     public static final ConfigOption<String> CASSANDRA_READ_CONSISTENCY =
62             new ConfigOption<String>(CASSANDRA_NS, "read-consistency-level",
63             "The consistency level of read operations against Cassandra",
64             ConfigOption.Type.MASKABLE, "LOCAL_QUORUM");
65
66     public static final ConfigOption<String> CASSANDRA_WRITE_CONSISTENCY =
67             new ConfigOption<String>(CASSANDRA_NS, "write-consistency-level",
68             "The consistency level of write operations against Cassandra",
69             ConfigOption.Type.MASKABLE, "LOCAL_QUORUM");
70
71     public static final ConfigOption<Boolean> ATOMIC_BATCH_MUTATE =
72             new ConfigOption<Boolean>(CASSANDRA_NS, "atomic-batch-mutate",
73             "True to use Cassandra atomic batch mutation, false to use non-atomic batches",
74             ConfigOption.Type.MASKABLE, true);
75
76     // Replication
77     public static final ConfigOption<Integer> REPLICATION_FACTOR =
78             new ConfigOption<Integer>(CASSANDRA_NS, "replication-factor",
79             "The number of data replicas (including the original copy) that should be kept. " +
80                     "This is only meaningful for storage backends that natively support data replication.",
81             ConfigOption.Type.GLOBAL_OFFLINE, 1);
82
83     public static final ConfigOption<String> REPLICATION_STRATEGY =
84             new ConfigOption<String>(CASSANDRA_NS, "replication-strategy-class",
85             "The replication strategy to use for Titan keyspace",
86             ConfigOption.Type.FIXED, "org.apache.cassandra.locator.SimpleStrategy");
87
88     public static final ConfigOption<String[]> REPLICATION_OPTIONS =
89             new ConfigOption<String[]>(CASSANDRA_NS, "replication-strategy-options",
90             "Replication strategy options, e.g. factor or replicas per datacenter.  This list is interpreted as a " +
91             "map.  It must have an even number of elements in [key,val,key,val,...] form.  A replication_factor set " +
92             "here takes precedence over one set with " + ConfigElement.getPath(REPLICATION_FACTOR),
93             ConfigOption.Type.FIXED, String[].class);
94
95     // Compression
96     public static final ConfigOption<Boolean> CF_COMPRESSION =
97             new ConfigOption<Boolean>(CASSANDRA_NS, "compression",
98             "Whether the storage backend should use compression when storing the data", ConfigOption.Type.FIXED, true);
99
100     public static final ConfigOption<String> CF_COMPRESSION_TYPE =
101             new ConfigOption<String>(CASSANDRA_NS, "compression-type",
102             "The sstable_compression value Titan uses when creating column families. " +
103             "This accepts any value allowed by Cassandra's sstable_compression option. " +
104             "Leave this unset to disable sstable_compression on Titan-created CFs.",
105             ConfigOption.Type.MASKABLE, "LZ4Compressor");
106
107     public static final ConfigOption<Integer> CF_COMPRESSION_BLOCK_SIZE =
108             new ConfigOption<Integer>(CASSANDRA_NS, "compression-block-size",
109             "The size of the compression blocks in kilobytes", ConfigOption.Type.FIXED, 64);
110
111     // SSL
112     public static final ConfigNamespace SSL_NS =
113             new ConfigNamespace(CASSANDRA_NS, "ssl", "Configuration options for SSL");
114
115     public static final ConfigNamespace SSL_TRUSTSTORE_NS =
116             new ConfigNamespace(SSL_NS, "truststore", "Configuration options for SSL Truststore.");
117
118     public static final ConfigOption<Boolean> SSL_ENABLED =
119             new ConfigOption<Boolean>(SSL_NS, "enabled",
120             "Controls use of the SSL connection to Cassandra", ConfigOption.Type.LOCAL, false);
121
122     public static final ConfigOption<String> SSL_TRUSTSTORE_LOCATION =
123             new ConfigOption<String>(SSL_TRUSTSTORE_NS, "location",
124             "Marks the location of the SSL Truststore.", ConfigOption.Type.LOCAL, "");
125
126     public static final ConfigOption<String> SSL_TRUSTSTORE_PASSWORD =
127             new ConfigOption<String>(SSL_TRUSTSTORE_NS, "password",
128             "The password to access SSL Truststore.", ConfigOption.Type.LOCAL, "");
129
130     // Thrift transport
131     public static final ConfigOption<Integer> THRIFT_FRAME_SIZE_MB =
132             new ConfigOption<>(CASSANDRA_NS, "frame-size-mb",
133             "The thrift frame size in megabytes", ConfigOption.Type.MASKABLE, 15);
134
135     /**
136      * The default Thrift port used by Cassandra. Set
137      * {@link GraphDatabaseConfiguration#STORAGE_PORT} to override.
138      * <p>
139      * Value = {@value}
140      */
141     public static final int PORT_DEFAULT = 9160;
142
143     public static final String SYSTEM_KS = "system";
144
145     protected final String keySpaceName;
146     protected final Map<String, String> strategyOptions;
147
148     protected final boolean compressionEnabled;
149     protected final int compressionChunkSizeKB;
150     protected final String compressionClass;
151
152     protected final boolean atomicBatch;
153
154     protected final int thriftFrameSizeBytes;
155
156     private volatile StoreFeatures features = null;
157     private Partitioner partitioner = null;
158
159     private static final Logger log =
160             LoggerFactory.getLogger(AbstractCassandraStoreManager.class);
161
162     public AbstractCassandraStoreManager(Configuration config) {
163         super(config, PORT_DEFAULT);
164
165         this.keySpaceName = config.get(CASSANDRA_KEYSPACE);
166         this.compressionEnabled = config.get(CF_COMPRESSION);
167         this.compressionChunkSizeKB = config.get(CF_COMPRESSION_BLOCK_SIZE);
168         this.compressionClass = config.get(CF_COMPRESSION_TYPE);
169         this.atomicBatch = config.get(ATOMIC_BATCH_MUTATE);
170         this.thriftFrameSizeBytes = config.get(THRIFT_FRAME_SIZE_MB) * 1024 * 1024;
171
172         // SSL truststore location sanity check
173         if (config.get(SSL_ENABLED) && config.get(SSL_TRUSTSTORE_LOCATION).isEmpty())
174             throw new IllegalArgumentException(SSL_TRUSTSTORE_LOCATION.getName() + " could not be empty when SSL is enabled.");
175
176         if (config.has(REPLICATION_OPTIONS)) {
177             String[] options = config.get(REPLICATION_OPTIONS);
178
179             if (options.length % 2 != 0)
180                 throw new IllegalArgumentException(REPLICATION_OPTIONS.getName() + " should have even number of elements.");
181
182             Map<String, String> converted = new HashMap<String, String>(options.length / 2);
183
184             for (int i = 0; i < options.length; i += 2) {
185                 converted.put(options[i], options[i + 1]);
186             }
187
188             this.strategyOptions = ImmutableMap.copyOf(converted);
189         } else {
190             this.strategyOptions = ImmutableMap.of("replication_factor", String.valueOf(config.get(REPLICATION_FACTOR)));
191         }
192     }
193
194     public final Partitioner getPartitioner() {
195         if (partitioner == null) {
196             try {
197                 partitioner = Partitioner.getPartitioner(getCassandraPartitioner());
198             } catch (BackendException e) {
199                 throw new TitanException("Could not connect to Cassandra to read partitioner information. Please check the connection", e);
200             }
201         }
202         assert partitioner != null;
203         return partitioner;
204     }
205
206     public abstract IPartitioner getCassandraPartitioner() throws BackendException;
207
208     @Override
209     public StoreTransaction beginTransaction(final BaseTransactionConfig config) {
210         return new CassandraTransaction(config);
211     }
212
213     @Override
214     public String toString() {
215         return "[" + keySpaceName + "@" + super.toString() + "]";
216     }
217
218     @Override
219     public StoreFeatures getFeatures() {
220
221         if (features == null) {
222
223             Configuration global = GraphDatabaseConfiguration.buildGraphConfiguration()
224                     .set(CASSANDRA_READ_CONSISTENCY, "LOCAL_QUORUM")
225                     .set(CASSANDRA_WRITE_CONSISTENCY, "LOCAL_QUORUM")
226                     .set(METRICS_PREFIX, GraphDatabaseConfiguration.METRICS_SYSTEM_PREFIX_DEFAULT);
227
228             Configuration local = GraphDatabaseConfiguration.buildGraphConfiguration()
229                     .set(CASSANDRA_READ_CONSISTENCY, "LOCAL_QUORUM")
230                     .set(CASSANDRA_WRITE_CONSISTENCY, "LOCAL_QUORUM")
231                     .set(METRICS_PREFIX, GraphDatabaseConfiguration.METRICS_SYSTEM_PREFIX_DEFAULT);
232
233             StandardStoreFeatures.Builder fb = new StandardStoreFeatures.Builder();
234
235             fb.batchMutation(true).distributed(true);
236             fb.timestamps(true).cellTTL(true);
237             fb.keyConsistent(global, local);
238
239             boolean keyOrdered;
240
241             switch (getPartitioner()) {
242                 case RANDOM:
243                     keyOrdered = false;
244                     fb.keyOrdered(keyOrdered).orderedScan(false).unorderedScan(true);
245                     break;
246
247                 case BYTEORDER:
248                     keyOrdered = true;
249                     fb.keyOrdered(keyOrdered).orderedScan(true).unorderedScan(false);
250                     break;
251
252                 default:
253                     throw new IllegalArgumentException("Unrecognized partitioner: " + getPartitioner());
254             }
255
256             switch (getDeployment()) {
257                 case REMOTE:
258                     fb.multiQuery(true);
259                     break;
260
261                 case LOCAL:
262                     fb.multiQuery(true).localKeyPartition(keyOrdered);
263                     break;
264
265                 case EMBEDDED:
266                     fb.multiQuery(false).localKeyPartition(keyOrdered);
267                     break;
268
269                 default:
270                     throw new IllegalArgumentException("Unrecognized deployment mode: " + getDeployment());
271             }
272
273             features = fb.build();
274         }
275
276         return features;
277     }
278
279     /**
280      * Returns a map of compression options for the column family {@code cf}.
281      * The contents of the returned map must be identical to the contents of the
282      * map returned by
283      * {@link org.apache.cassandra.thrift.CfDef#getCompression_options()}, even
284      * for implementations of this method that don't use Thrift.
285      *
286      * @param cf the name of the column family for which to return compression
287      *           options
288      * @return map of compression option names to compression option values
289      * @throws com.thinkaurelius.titan.diskstorage.BackendException if reading from Cassandra fails
290      */
291     public abstract Map<String, String> getCompressionOptions(String cf) throws BackendException;
292
293     public String getName() {
294         return getClass().getSimpleName() + keySpaceName;
295     }
296
297 }