first commit for new repo
[sdc/sdc-titan-cassandra.git] / src / main / java / com / thinkaurelius / titan / diskstorage / cassandra / embedded / CassandraEmbeddedKeyColumnValueStore.java
1 package com.thinkaurelius.titan.diskstorage.cassandra.embedded;
2
3 import com.google.common.base.Predicate;
4 import com.google.common.collect.ImmutableMap;
5 import com.google.common.collect.Iterables;
6 import com.google.common.collect.Iterators;
7 import com.thinkaurelius.titan.diskstorage.util.time.TimestampProvider;
8 import com.thinkaurelius.titan.diskstorage.*;
9 import com.thinkaurelius.titan.diskstorage.cassandra.utils.CassandraHelper;
10 import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*;
11 import com.thinkaurelius.titan.diskstorage.util.RecordIterator;
12 import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer;
13 import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry;
14
15 import org.apache.cassandra.config.CFMetaData;
16 import org.apache.cassandra.config.Schema;
17 import org.apache.cassandra.db.*;
18 import org.apache.cassandra.db.ConsistencyLevel;
19 import org.apache.cassandra.db.composites.CellNames;
20 import org.apache.cassandra.db.composites.Composite;
21 import org.apache.cassandra.db.filter.IDiskAtomFilter;
22 import org.apache.cassandra.db.filter.SliceQueryFilter;
23 import org.apache.cassandra.dht.*;
24 import org.apache.cassandra.exceptions.InvalidRequestException;
25 import org.apache.cassandra.exceptions.IsBootstrappingException;
26 import org.apache.cassandra.exceptions.RequestTimeoutException;
27 import org.apache.cassandra.exceptions.UnavailableException;
28 import org.apache.cassandra.service.StorageProxy;
29 import org.apache.cassandra.service.StorageService;
30 import org.apache.cassandra.thrift.SlicePredicate;
31 import org.apache.cassandra.thrift.SliceRange;
32 import org.apache.cassandra.thrift.ThriftValidation;
33 import org.apache.commons.lang.ArrayUtils;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import javax.annotation.Nullable;
38
39 import java.nio.ByteBuffer;
40 import java.util.*;
41 import java.util.concurrent.TimeUnit;
42
43 import static com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction.getTx;
44
45 public class CassandraEmbeddedKeyColumnValueStore implements KeyColumnValueStore {
46
47     private static final Logger log = LoggerFactory.getLogger(CassandraEmbeddedKeyColumnValueStore.class);
48
49     private final String keyspace;
50     private final String columnFamily;
51     private final CassandraEmbeddedStoreManager storeManager;
52     private final TimestampProvider times;
53     private final CassandraEmbeddedGetter entryGetter;
54
55     public CassandraEmbeddedKeyColumnValueStore(
56             String keyspace,
57             String columnFamily,
58             CassandraEmbeddedStoreManager storeManager) throws RuntimeException {
59         this.keyspace = keyspace;
60         this.columnFamily = columnFamily;
61         this.storeManager = storeManager;
62         this.times = this.storeManager.getTimestampProvider();
63         entryGetter = new CassandraEmbeddedGetter(storeManager.getMetaDataSchema(columnFamily),times);
64     }
65
66     @Override
67     public void close() throws BackendException {
68     }
69
70     @Override
71     public void acquireLock(StaticBuffer key, StaticBuffer column,
72                             StaticBuffer expectedValue, StoreTransaction txh) throws BackendException {
73         throw new UnsupportedOperationException();
74     }
75
76     @Override
77     public KeyIterator getKeys(KeyRangeQuery keyRangeQuery, StoreTransaction txh) throws BackendException {
78         IPartitioner partitioner = StorageService.getPartitioner();
79
80         // see rant about this in Astyanax implementation
81         if (partitioner instanceof RandomPartitioner || partitioner instanceof Murmur3Partitioner)
82             throw new PermanentBackendException("This operation is only supported when byte-ordered partitioner is used.");
83
84         return new RowIterator(keyRangeQuery, storeManager.getPageSize(), txh);
85     }
86
87     @Override
88     public KeyIterator getKeys(SliceQuery query, StoreTransaction txh) throws BackendException {
89         return new RowIterator(getMinimumToken(), getMaximumToken(), query, storeManager.getPageSize(), txh);
90     }
91
92
93     /**
94      * Create a RangeSliceCommand and run it against the StorageProxy.
95      * <p>
96      * To match the behavior of the standard Cassandra thrift API endpoint, the
97      * {@code nowMillis} argument should be the number of milliseconds since the
98      * UNIX Epoch (e.g. System.currentTimeMillis() or equivalent obtained
99      * through a {@link TimestampProvider}). This is per
100      * {@link org.apache.cassandra.thrift.CassandraServer#get_range_slices(ColumnParent, SlicePredicate, KeyRange, ConsistencyLevel)},
101      * which passes the server's System.currentTimeMillis() to the
102      * {@code RangeSliceCommand} constructor.
103      */
104     private List<Row> getKeySlice(Token start,
105                                   Token end,
106                                   @Nullable SliceQuery sliceQuery,
107                                   int pageSize,
108                                   long nowMillis) throws BackendException {
109         IPartitioner partitioner = StorageService.getPartitioner();
110
111         SliceRange columnSlice = new SliceRange();
112         if (sliceQuery == null) {
113             columnSlice.setStart(ArrayUtils.EMPTY_BYTE_ARRAY)
114                     .setFinish(ArrayUtils.EMPTY_BYTE_ARRAY)
115                     .setCount(5);
116         } else {
117             columnSlice.setStart(sliceQuery.getSliceStart().asByteBuffer())
118                     .setFinish(sliceQuery.getSliceEnd().asByteBuffer())
119                     .setCount(sliceQuery.hasLimit() ? sliceQuery.getLimit() : Integer.MAX_VALUE);
120         }
121         /* Note: we need to fetch columns for each row as well to remove "range ghosts" */
122         SlicePredicate predicate = new SlicePredicate().setSlice_range(columnSlice);
123
124         RowPosition startPosition = start.minKeyBound(partitioner);
125         RowPosition endPosition = end.minKeyBound(partitioner);
126
127         List<Row> rows;
128
129         try {
130             CFMetaData cfm = Schema.instance.getCFMetaData(keyspace, columnFamily);
131             IDiskAtomFilter filter = ThriftValidation.asIFilter(predicate, cfm, null);
132
133             RangeSliceCommand cmd = new RangeSliceCommand(keyspace, columnFamily, nowMillis, filter, new Bounds<RowPosition>(startPosition, endPosition), pageSize);
134
135             rows = StorageProxy.getRangeSlice(cmd, ConsistencyLevel.QUORUM);
136         } catch (Exception e) {
137             throw new PermanentBackendException(e);
138         }
139
140         return rows;
141     }
142
143     @Override
144     public String getName() {
145         return columnFamily;
146     }
147
148     @Override
149     public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException {
150
151         /**
152          * This timestamp mimics the timestamp used by
153          * {@link org.apache.cassandra.thrift.CassandraServer#get(ByteBuffer,ColumnPath,ConsistencyLevel)}.
154          *
155          * That method passes the server's System.currentTimeMillis() to
156          * {@link ReadCommand#create(String, ByteBuffer, String, long, IDiskAtomFilter)}.
157          * {@code create(...)} in turn passes that timestamp to the SliceFromReadCommand constructor.
158          */
159         final long nowMillis = times.getTime().toEpochMilli();
160         Composite startComposite = CellNames.simpleDense(query.getSliceStart().asByteBuffer());
161         Composite endComposite = CellNames.simpleDense(query.getSliceEnd().asByteBuffer());
162         SliceQueryFilter sqf = new SliceQueryFilter(startComposite, endComposite,
163                 false, query.getLimit() + (query.hasLimit()?1:0));
164         ReadCommand sliceCmd = new SliceFromReadCommand(keyspace, query.getKey().asByteBuffer(), columnFamily, nowMillis, sqf);
165
166         List<Row> slice = read(sliceCmd, getTx(txh).getReadConsistencyLevel().getDB());
167
168         if (null == slice || 0 == slice.size())
169             return EntryList.EMPTY_LIST;
170
171         int sliceSize = slice.size();
172         if (1 < sliceSize)
173             throw new PermanentBackendException("Received " + sliceSize + " rows for single key");
174
175         Row r = slice.get(0);
176
177         if (null == r) {
178             log.warn("Null Row object retrieved from Cassandra StorageProxy");
179             return EntryList.EMPTY_LIST;
180         }
181
182         ColumnFamily cf = r.cf;
183
184         if (null == cf) {
185             log.debug("null ColumnFamily (\"{}\")", columnFamily);
186             return EntryList.EMPTY_LIST;
187         }
188
189         if (cf.isMarkedForDelete())
190             return EntryList.EMPTY_LIST;
191
192         return CassandraHelper.makeEntryList(
193                 Iterables.filter(cf.getSortedColumns(), new FilterDeletedColumns(nowMillis)),
194                 entryGetter,
195                 query.getSliceEnd(),
196                 query.getLimit());
197
198     }
199
200     private class FilterDeletedColumns implements Predicate<Cell> {
201
202         private final long tsMillis;
203         private final int tsSeconds;
204
205         private FilterDeletedColumns(long tsMillis) {
206             this.tsMillis = tsMillis;
207             this.tsSeconds = (int)(this.tsMillis / 1000L);
208         }
209
210         @Override
211         public boolean apply(Cell input) {
212             if (!input.isLive(tsMillis))
213                 return false;
214
215             // Don't do this.  getTimeToLive() is a duration divorced from any particular clock.
216             // For instance, if TTL=10 seconds, getTimeToLive() will have value 10 (not 10 + epoch seconds), and
217             // this will always return false.
218             //if (input instanceof ExpiringCell)
219             //    return tsSeconds < ((ExpiringCell)input).getTimeToLive();
220
221             return true;
222         }
223     }
224
225     @Override
226     public Map<StaticBuffer,EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException {
227         throw new UnsupportedOperationException();
228     }
229
230     @Override
231     public void mutate(StaticBuffer key, List<Entry> additions,
232                        List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException {
233         Map<StaticBuffer, KCVMutation> mutations = ImmutableMap.of(key, new
234                 KCVMutation(additions, deletions));
235         mutateMany(mutations, txh);
236     }
237
238
239     public void mutateMany(Map<StaticBuffer, KCVMutation> mutations,
240                            StoreTransaction txh) throws BackendException {
241         storeManager.mutateMany(ImmutableMap.of(columnFamily, mutations), txh);
242     }
243
244     private static List<Row> read(ReadCommand cmd, org.apache.cassandra.db.ConsistencyLevel clvl) throws BackendException {
245         ArrayList<ReadCommand> cmdHolder = new ArrayList<ReadCommand>(1);
246         cmdHolder.add(cmd);
247         return read(cmdHolder, clvl);
248     }
249
250     private static List<Row> read(List<ReadCommand> cmds, org.apache.cassandra.db.ConsistencyLevel clvl) throws BackendException {
251         try {
252             return StorageProxy.read(cmds, clvl);
253         } catch (UnavailableException e) {
254             throw new TemporaryBackendException(e);
255         } catch (RequestTimeoutException e) {
256             throw new PermanentBackendException(e);
257         } catch (IsBootstrappingException e) {
258             throw new TemporaryBackendException(e);
259         } catch (InvalidRequestException e) {
260             throw new PermanentBackendException(e);
261         }
262     }
263
264     private static class CassandraEmbeddedGetter implements StaticArrayEntry.GetColVal<Cell,ByteBuffer> {
265
266         private final EntryMetaData[] schema;
267         private final TimestampProvider times;
268
269         private CassandraEmbeddedGetter(EntryMetaData[] schema, TimestampProvider times) {
270             this.schema = schema;
271             this.times = times;
272         }
273
274         @Override
275         public ByteBuffer getColumn(Cell element) {
276             return org.apache.cassandra.utils.ByteBufferUtil.clone(element.name().toByteBuffer());
277         }
278
279         @Override
280         public ByteBuffer getValue(Cell element) {
281             return org.apache.cassandra.utils.ByteBufferUtil.clone(element.value());
282         }
283
284         @Override
285         public EntryMetaData[] getMetaSchema(Cell element) {
286             return schema;
287         }
288
289         @Override
290         public Object getMetaData(Cell element, EntryMetaData meta) {
291             switch (meta) {
292                 case TIMESTAMP:
293                     return element.timestamp();
294                 case TTL:
295                     return ((element instanceof ExpiringCell)
296                                     ? ((ExpiringCell) element).getTimeToLive()
297                                     : 0);
298                 default:
299                     throw new UnsupportedOperationException("Unsupported meta data: " + meta);
300             }
301         }
302     }
303
304     private class RowIterator implements KeyIterator {
305         private final Token maximumToken;
306         private final SliceQuery sliceQuery;
307         private final StoreTransaction txh;
308
309         /**
310          * This RowIterator will use this timestamp for its entire lifetime,
311          * even if the iterator runs more than one distinct slice query while
312          * paging. <b>This field must be in units of milliseconds since
313          * the UNIX Epoch</b>.
314          * <p>
315          * This timestamp is passed to three methods/constructors:
316          * <ul>
317          *  <li>{@link org.apache.cassandra.db.Column#isMarkedForDelete(long now)}</li>
318          *  <li>{@link org.apache.cassandra.db.ColumnFamily#hasOnlyTombstones(long)}</li>
319          *  <li>
320          *   the {@link RangeSliceCommand} constructor via the last argument
321          *   to {@link CassandraEmbeddedKeyColumnValueStore#getKeySlice(Token, Token, SliceQuery, int, long)}
322          *  </li>
323          * </ul>
324          * The second list entry just calls the first and almost doesn't deserve
325          * a mention at present, but maybe the implementation will change in the future.
326          * <p>
327          * When this value needs to be compared to TTL seconds expressed in seconds,
328          * Cassandra internals do the conversion.
329          * Consider {@link ExpiringColumn#isMarkedForDelete(long)}, which is implemented,
330          * as of 2.0.6, by the following one-liner:
331          * <p>
332          * {@code return (int) (now / 1000) >= getLocalDeletionTime()}
333          * <p>
334          * The {@code now / 1000} does the conversion from milliseconds to seconds
335          * (the units of getLocalDeletionTime()).
336          */
337         private final long nowMillis;
338
339         private Iterator<Row> keys;
340         private ByteBuffer lastSeenKey = null;
341         private Row currentRow;
342         private int pageSize;
343
344         private boolean isClosed;
345
346         public RowIterator(KeyRangeQuery keyRangeQuery, int pageSize, StoreTransaction txh) throws BackendException {
347             this(StorageService.getPartitioner().getToken(keyRangeQuery.getKeyStart().asByteBuffer()),
348                     StorageService.getPartitioner().getToken(keyRangeQuery.getKeyEnd().asByteBuffer()),
349                     keyRangeQuery,
350                     pageSize,
351                     txh);
352         }
353
354         public RowIterator(Token minimum, Token maximum, SliceQuery sliceQuery, int pageSize, StoreTransaction txh) throws BackendException {
355             this.pageSize = pageSize;
356             this.sliceQuery = sliceQuery;
357             this.maximumToken = maximum;
358             this.txh = txh;
359             this.nowMillis = times.getTime().toEpochMilli();
360             this.keys = getRowsIterator(getKeySlice(minimum, maximum, sliceQuery, pageSize, nowMillis));
361         }
362
363         @Override
364         public boolean hasNext() {
365             try {
366                 return hasNextInternal();
367             } catch (BackendException e) {
368                 throw new RuntimeException(e);
369             }
370         }
371
372         @Override
373         public StaticBuffer next() {
374             ensureOpen();
375
376             if (!hasNext())
377                 throw new NoSuchElementException();
378
379             currentRow = keys.next();
380             ByteBuffer currentKey = currentRow.key.getKey().duplicate();
381
382             try {
383                 return StaticArrayBuffer.of(currentKey);
384             } finally {
385                 lastSeenKey = currentKey;
386             }
387         }
388
389         @Override
390         public void close() {
391             isClosed = true;
392         }
393
394         @Override
395         public void remove() {
396             throw new UnsupportedOperationException();
397         }
398
399         @Override
400         public RecordIterator<Entry> getEntries() {
401             ensureOpen();
402
403             if (sliceQuery == null)
404                 throw new IllegalStateException("getEntries() requires SliceQuery to be set.");
405
406             return new RecordIterator<Entry>() {
407                 final Iterator<Entry> columns = CassandraHelper.makeEntryIterator(
408                         Iterables.filter(currentRow.cf.getSortedColumns(), new FilterDeletedColumns(nowMillis)),
409                         entryGetter,
410                         sliceQuery.getSliceEnd(),
411                         sliceQuery.getLimit());
412
413                  //cfToEntries(currentRow.cf, sliceQuery).iterator();
414
415                 @Override
416                 public boolean hasNext() {
417                     ensureOpen();
418                     return columns.hasNext();
419                 }
420
421                 @Override
422                 public Entry next() {
423                     ensureOpen();
424                     return columns.next();
425                 }
426
427                 @Override
428                 public void close() {
429                     isClosed = true;
430                 }
431
432                 @Override
433                 public void remove() {
434                     throw new UnsupportedOperationException();
435                 }
436             };
437
438         }
439
440         private final boolean hasNextInternal() throws BackendException {
441             ensureOpen();
442
443             if (keys == null)
444                 return false;
445
446             boolean hasNext = keys.hasNext();
447
448             if (!hasNext && lastSeenKey != null) {
449                 Token lastSeenToken = StorageService.getPartitioner().getToken(lastSeenKey.duplicate());
450
451                 // let's check if we reached key upper bound already so we can skip one useless call to Cassandra
452                 if (maximumToken != getMinimumToken() && lastSeenToken.equals(maximumToken)) {
453                     return false;
454                 }
455
456                 List<Row> newKeys = getKeySlice(StorageService.getPartitioner().getToken(lastSeenKey), maximumToken, sliceQuery, pageSize, nowMillis);
457
458                 keys = getRowsIterator(newKeys, lastSeenKey);
459                 hasNext = keys.hasNext();
460             }
461
462             return hasNext;
463         }
464
465         private void ensureOpen() {
466             if (isClosed)
467                 throw new IllegalStateException("Iterator has been closed.");
468         }
469
470         private Iterator<Row> getRowsIterator(List<Row> rows) {
471             if (rows == null)
472                 return null;
473
474             return Iterators.filter(rows.iterator(), new Predicate<Row>() {
475                 @Override
476                 public boolean apply(@Nullable Row row) {
477                     // The hasOnlyTombstones(x) call below ultimately calls Column.isMarkedForDelete(x)
478                     return !(row == null || row.cf == null || row.cf.isMarkedForDelete() || row.cf.hasOnlyTombstones(nowMillis));
479                 }
480             });
481         }
482
483         private Iterator<Row> getRowsIterator(List<Row> rows, final ByteBuffer exceptKey) {
484             Iterator<Row> rowIterator = getRowsIterator(rows);
485
486             if (rowIterator == null)
487                 return null;
488
489             return Iterators.filter(rowIterator, new Predicate<Row>() {
490                 @Override
491                 public boolean apply(@Nullable Row row) {
492                     return row != null && !row.key.getKey().equals(exceptKey);
493                 }
494             });
495         }
496     }
497
498     private static Token getMinimumToken() throws PermanentBackendException {
499         IPartitioner partitioner = StorageService.getPartitioner();
500
501         if (partitioner instanceof RandomPartitioner) {
502             return ((RandomPartitioner) partitioner).getMinimumToken();
503         } else if (partitioner instanceof Murmur3Partitioner) {
504             return ((Murmur3Partitioner) partitioner).getMinimumToken();
505         } else if (partitioner instanceof ByteOrderedPartitioner) {
506             //TODO: This makes the assumption that its an EdgeStore (i.e. 8 byte keys)
507             return new BytesToken(com.thinkaurelius.titan.diskstorage.util.ByteBufferUtil.zeroByteBuffer(8));
508         } else {
509             throw new PermanentBackendException("Unsupported partitioner: " + partitioner);
510         }
511     }
512
513     private static Token getMaximumToken() throws PermanentBackendException {
514         IPartitioner partitioner = StorageService.getPartitioner();
515
516         if (partitioner instanceof RandomPartitioner) {
517             return new BigIntegerToken(RandomPartitioner.MAXIMUM);
518         } else if (partitioner instanceof Murmur3Partitioner) {
519             return new LongToken(Murmur3Partitioner.MAXIMUM);
520         } else if (partitioner instanceof ByteOrderedPartitioner) {
521             //TODO: This makes the assumption that its an EdgeStore (i.e. 8 byte keys)
522             return new BytesToken(com.thinkaurelius.titan.diskstorage.util.ByteBufferUtil.oneByteBuffer(8));
523         } else {
524             throw new PermanentBackendException("Unsupported partitioner: " + partitioner);
525         }
526     }
527 }