first commit for new repo
[sdc/sdc-titan-cassandra.git] / src / main / java / com / thinkaurelius / titan / diskstorage / cassandra / thrift / CassandraThriftKeyColumnValueStore.java
1 package com.thinkaurelius.titan.diskstorage.cassandra.thrift;
2
3 import com.google.common.base.Preconditions;
4 import com.google.common.base.Predicate;
5 import com.google.common.collect.ImmutableList;
6 import com.google.common.collect.ImmutableMap;
7 import com.google.common.collect.Iterables;
8 import com.google.common.collect.Iterators;
9 import com.thinkaurelius.titan.diskstorage.*;
10 import com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool.CTConnection;
11 import com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool.CTConnectionPool;
12 import com.thinkaurelius.titan.diskstorage.cassandra.utils.CassandraHelper;
13 import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*;
14 import com.thinkaurelius.titan.diskstorage.util.*;
15 import org.apache.cassandra.dht.*;
16 import org.apache.cassandra.thrift.*;
17 import org.apache.cassandra.thrift.ConsistencyLevel;
18 import org.apache.commons.lang.ArrayUtils;
19 import org.apache.thrift.TException;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22
23 import javax.annotation.Nullable;
24 import java.nio.ByteBuffer;
25 import java.util.*;
26 import java.util.regex.Matcher;
27 import java.util.regex.Pattern;
28
29 import static com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction.getTx;
30
31 /**
32  * A Titan {@code KeyColumnValueStore} backed by Cassandra.
33  * This uses the Cassandra Thrift API.
34  *
35  * @author Dan LaRocque <dalaro@hopcount.org>
36  * @see CassandraThriftStoreManager
37  */
38 public class CassandraThriftKeyColumnValueStore implements KeyColumnValueStore {
39
40     private static final Logger logger =
41             LoggerFactory.getLogger(CassandraThriftKeyColumnValueStore.class);
42
43     private static final Pattern BROKEN_BYTE_TOKEN_PATTERN = Pattern.compile("^Token\\(bytes\\[(.+)\\]\\)$");
44
45     // Cassandra access
46     private final CassandraThriftStoreManager storeManager;
47     private final String keyspace;
48     private final String columnFamily;
49     private final CTConnectionPool pool;
50     private final ThriftGetter entryGetter;
51
52     public CassandraThriftKeyColumnValueStore(String keyspace, String columnFamily, CassandraThriftStoreManager storeManager,
53                                               CTConnectionPool pool) {
54         this.storeManager = storeManager;
55         this.keyspace = keyspace;
56         this.columnFamily = columnFamily;
57         this.pool = pool;
58         this.entryGetter = new ThriftGetter(storeManager.getMetaDataSchema(columnFamily));
59     }
60
61     /**
62      * Call Cassandra's Thrift get_slice() method.
63      * <p/>
64      * When columnEnd equals columnStart and either startInclusive
65      * or endInclusive is false (or both are false), then this
66      * method returns an empty list without making any Thrift calls.
67      * <p/>
68      * If columnEnd = columnStart + 1, and both startInclusive and
69      * startExclusive are false, then the arguments effectively form
70      * an empty interval.  In this case, as in the one previous,
71      * an empty list is returned.  However, it may not necessarily
72      * be handled efficiently; a Thrift call might still be made
73      * before returning the empty list.
74      *
75      * @throws com.thinkaurelius.titan.diskstorage.BackendException
76      *          when columnEnd < columnStart
77      */
78     @Override
79     public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException {
80         Map<StaticBuffer, EntryList> result = getNamesSlice(query.getKey(), query, txh);
81         return Iterables.getOnlyElement(result.values(), EntryList.EMPTY_LIST);
82     }
83
84     @Override
85     public Map<StaticBuffer, EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException {
86         return getNamesSlice(keys, query, txh);
87     }
88
89     public Map<StaticBuffer, EntryList> getNamesSlice(StaticBuffer key,
90                                                       SliceQuery query, StoreTransaction txh) throws BackendException {
91         return getNamesSlice(ImmutableList.of(key),query,txh);
92     }
93
94     public Map<StaticBuffer, EntryList> getNamesSlice(List<StaticBuffer> keys,
95                                                       SliceQuery query,
96                                                       StoreTransaction txh) throws BackendException {
97         ColumnParent parent = new ColumnParent(columnFamily);
98         /*
99          * Cassandra cannot handle columnStart = columnEnd.
100                  * Cassandra's Thrift getSlice() throws InvalidRequestException
101                  * if columnStart = columnEnd.
102                  */
103         if (query.getSliceStart().compareTo(query.getSliceEnd()) >= 0) {
104             // Check for invalid arguments where columnEnd < columnStart
105             if (query.getSliceEnd().compareTo(query.getSliceStart())<0) {
106                 throw new PermanentBackendException("columnStart=" + query.getSliceStart() +
107                         " is greater than columnEnd=" + query.getSliceEnd() + ". " +
108                         "columnStart must be less than or equal to columnEnd");
109             }
110             if (0 != query.getSliceStart().length() && 0 != query.getSliceEnd().length()) {
111                 logger.debug("Return empty list due to columnEnd==columnStart and neither empty");
112                 return KCVSUtil.emptyResults(keys);
113             }
114         }
115
116         assert query.getSliceStart().compareTo(query.getSliceEnd()) < 0;
117         ConsistencyLevel consistency = getTx(txh).getReadConsistencyLevel().getThrift();
118         SlicePredicate predicate = new SlicePredicate();
119         SliceRange range = new SliceRange();
120         range.setCount(query.getLimit() + (query.hasLimit()?1:0)); //Add one for potentially removed last column
121         range.setStart(query.getSliceStart().asByteBuffer());
122         range.setFinish(query.getSliceEnd().asByteBuffer());
123         predicate.setSlice_range(range);
124
125         CTConnection conn = null;
126         try {
127             conn = pool.borrowObject(keyspace);
128             Cassandra.Client client = conn.getClient();
129             Map<ByteBuffer, List<ColumnOrSuperColumn>> rows = client.multiget_slice(CassandraHelper.convert(keys),
130                     parent,
131                     predicate,
132                     consistency);
133
134                         /*
135                          * The final size of the "result" List may be at most rows.size().
136                          * However, "result" could also be up to two elements smaller than
137                          * rows.size(), depending on startInclusive and endInclusive
138                          */
139             Map<StaticBuffer, EntryList> results = new HashMap<StaticBuffer, EntryList>();
140
141             for (ByteBuffer key : rows.keySet()) {
142                 results.put(StaticArrayBuffer.of(key),
143                         CassandraHelper.makeEntryList(rows.get(key), entryGetter, query.getSliceEnd(), query.getLimit()));
144             }
145
146             return results;
147         } catch (Exception e) {
148             throw convertException(e);
149         } finally {
150             pool.returnObjectUnsafe(keyspace, conn);
151         }
152     }
153
154     private static class ThriftGetter implements StaticArrayEntry.GetColVal<ColumnOrSuperColumn,ByteBuffer> {
155
156         private final EntryMetaData[] schema;
157
158         private ThriftGetter(EntryMetaData[] schema) {
159             this.schema = schema;
160         }
161
162         @Override
163         public ByteBuffer getColumn(ColumnOrSuperColumn element) {
164             return element.getColumn().bufferForName();
165         }
166
167         @Override
168         public ByteBuffer getValue(ColumnOrSuperColumn element) {
169             return element.getColumn().bufferForValue();
170         }
171
172         @Override
173         public EntryMetaData[] getMetaSchema(ColumnOrSuperColumn element) {
174             return schema;
175         }
176
177         @Override
178         public Object getMetaData(ColumnOrSuperColumn element, EntryMetaData meta) {
179             switch(meta) {
180                 case TIMESTAMP:
181                     return element.getColumn().getTimestamp();
182                 case TTL:
183                     return element.getColumn().getTtl();
184                 default:
185                     throw new UnsupportedOperationException("Unsupported meta data: " + meta);
186             }
187         }
188     }
189
190     @Override
191     public void close() {
192         // Do nothing
193     }
194
195     @Override
196     public void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue,
197                             StoreTransaction txh) throws BackendException {
198         throw new UnsupportedOperationException();
199     }
200
201     @Override
202     public KeyIterator getKeys(@Nullable SliceQuery sliceQuery, StoreTransaction txh) throws BackendException {
203         final IPartitioner partitioner = storeManager.getCassandraPartitioner();
204
205         if (!(partitioner instanceof RandomPartitioner) && !(partitioner instanceof Murmur3Partitioner))
206             throw new PermanentBackendException("This operation is only allowed when random partitioner (md5 or murmur3) is used.");
207
208         try {
209             return new AllTokensIterator(partitioner, sliceQuery, storeManager.getPageSize());
210         } catch (Exception e) {
211             throw convertException(e);
212         }
213     }
214
215     @Override
216     public KeyIterator getKeys(KeyRangeQuery keyRangeQuery, StoreTransaction txh) throws BackendException {
217         final IPartitioner partitioner = storeManager.getCassandraPartitioner();
218
219         // see rant about the reason of this limitation in Astyanax implementation of this method.
220         if (!(partitioner instanceof AbstractByteOrderedPartitioner))
221             throw new PermanentBackendException("This operation is only allowed when byte-ordered partitioner is used.");
222
223         try {
224             return new KeyRangeIterator(partitioner, keyRangeQuery, storeManager.getPageSize(),
225                     keyRangeQuery.getKeyStart().asByteBuffer(),
226                     keyRangeQuery.getKeyEnd().asByteBuffer());
227         } catch (Exception e) {
228             throw convertException(e);
229         }
230     }
231
232     @Override
233     public String getName() {
234         return columnFamily;
235     }
236
237     @Override
238     public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException {
239         Map<StaticBuffer, KCVMutation> mutations = ImmutableMap.of(key, new KCVMutation(additions, deletions));
240         mutateMany(mutations, txh);
241     }
242
243     public void mutateMany(Map<StaticBuffer, KCVMutation> mutations, StoreTransaction txh) throws BackendException {
244         storeManager.mutateMany(ImmutableMap.of(columnFamily, mutations), txh);
245     }
246
247     static BackendException convertException(Throwable e) {
248         if (e instanceof TException) {
249             return new PermanentBackendException(e);
250         } else if (e instanceof TimedOutException) {
251             return new TemporaryBackendException(e);
252         } else if (e instanceof UnavailableException) {
253             return new TemporaryBackendException(e);
254         } else if (e instanceof InvalidRequestException) {
255             return new PermanentBackendException(e);
256         } else {
257             return new PermanentBackendException(e);
258         }
259     }
260
261     @Override
262     public String toString() {
263         return "CassandraThriftKeyColumnValueStore[ks="
264                 + keyspace + ", cf=" + columnFamily + "]";
265     }
266
267
268     private List<KeySlice> getKeySlice(ByteBuffer startKey,
269                                        ByteBuffer endKey,
270                                        SliceQuery columnSlice,
271                                        int count) throws BackendException {
272         return getRangeSlices(new org.apache.cassandra.thrift.KeyRange().setStart_key(startKey).setEnd_key(endKey).setCount(count), columnSlice);
273     }
274
275     private <T extends Token> List<KeySlice> getTokenSlice(T startToken, T endToken,
276             SliceQuery sliceQuery, int count) throws BackendException {
277
278         String st = sanitizeBrokenByteToken(startToken);
279         String et = sanitizeBrokenByteToken(endToken);
280
281         org.apache.cassandra.thrift.KeyRange kr = new org.apache.cassandra.thrift.KeyRange().setStart_token(st).setEnd_token(et).setCount(count);
282
283         return getRangeSlices(kr, sliceQuery);
284     }
285
286     private String sanitizeBrokenByteToken(Token tok) {
287         /*
288          * Background: https://issues.apache.org/jira/browse/CASSANDRA-5566
289          *
290          * This check is useful for compatibility with Cassandra server versions
291          * 1.2.4 and earlier.
292          */
293         String st = tok.toString();
294         if (!(tok instanceof BytesToken))
295             return st;
296
297         // Do a cheap 1-character startsWith before unleashing the regex
298         if (st.startsWith("T")) {
299             Matcher m = BROKEN_BYTE_TOKEN_PATTERN.matcher(st);
300             if (!m.matches()) {
301                 logger.warn("Unknown token string format: \"{}\"", st);
302             } else {
303                 String old = st;
304                 st = m.group(1);
305                 logger.debug("Rewrote token string: \"{}\" -> \"{}\"", old, st);
306             }
307         }
308         return st;
309     }
310
311     private List<KeySlice> getRangeSlices(org.apache.cassandra.thrift.KeyRange keyRange, @Nullable SliceQuery sliceQuery) throws BackendException {
312         SliceRange sliceRange = new SliceRange();
313
314         if (sliceQuery == null) {
315             sliceRange.setStart(ArrayUtils.EMPTY_BYTE_ARRAY)
316                     .setFinish(ArrayUtils.EMPTY_BYTE_ARRAY)
317                     .setCount(5);
318         } else {
319             sliceRange.setStart(sliceQuery.getSliceStart().asByteBuffer())
320                     .setFinish(sliceQuery.getSliceEnd().asByteBuffer())
321                     .setCount((sliceQuery.hasLimit()) ? sliceQuery.getLimit() : Integer.MAX_VALUE);
322         }
323
324
325         CTConnection connection = null;
326         try {
327             connection = pool.borrowObject(keyspace);
328
329             List<KeySlice> slices =
330                     connection.getClient().get_range_slices(new ColumnParent(columnFamily),
331                             new SlicePredicate()
332                                     .setSlice_range(sliceRange),
333                             keyRange,
334                             ConsistencyLevel.QUORUM);
335
336             for (KeySlice s : slices) {
337                 logger.debug("Key {}", ByteBufferUtil.toString(s.key, "-"));
338             }
339
340             /* Note: we need to fetch columns for each row as well to remove "range ghosts" */
341             List<KeySlice> result = new ArrayList<>(slices.size());
342             KeyIterationPredicate pred = new KeyIterationPredicate();
343             for (KeySlice ks : slices)
344                 if (pred.apply(ks))
345                     result.add(ks);
346             return result;
347         } catch (Exception e) {
348             throw convertException(e);
349         } finally {
350             if (connection != null)
351                 pool.returnObjectUnsafe(keyspace, connection);
352         }
353     }
354
355     private static class KeyIterationPredicate implements Predicate<KeySlice> {
356
357         @Override
358         public boolean apply(@Nullable KeySlice row) {
359             return (row != null) && row.getColumns().size() > 0;
360         }
361     }
362
363     /**
364      * Slices rows and columns using tokens. Recall that the partitioner turns
365      * keys into tokens. For instance, under RandomPartitioner, tokens are the
366      * MD5 hashes of keys.
367      */
368     public class AbstractBufferedRowIter implements KeyIterator {
369
370         private final int pageSize;
371         private final SliceQuery columnSlice;
372
373         private boolean isClosed;
374         private boolean seenEnd;
375         protected Iterator<KeySlice> ksIter;
376         private KeySlice mostRecentRow;
377
378         private final IPartitioner partitioner;
379         private Token nextStartToken;
380         private final Token endToken;
381         private ByteBuffer nextStartKey;
382
383         private boolean omitEndToken;
384
385         public AbstractBufferedRowIter(IPartitioner partitioner,
386                 SliceQuery columnSlice, int pageSize, Token startToken, Token endToken, boolean omitEndToken) {
387             this.pageSize = pageSize;
388             this.partitioner = partitioner;
389             this.nextStartToken = startToken;
390             this.endToken = endToken;
391             this.columnSlice = columnSlice;
392
393             this.seenEnd = false;
394             this.isClosed = false;
395             this.ksIter = Iterators.emptyIterator();
396             this.mostRecentRow = null;
397             this.omitEndToken = omitEndToken;
398         }
399
400         @Override
401         public boolean hasNext() {
402             ensureOpen();
403
404             if (!ksIter.hasNext() && !seenEnd) {
405                 try {
406                     ksIter = rebuffer().iterator();
407                 } catch (BackendException e) {
408                     throw new RuntimeException(e);
409                 }
410             }
411
412             return ksIter.hasNext();
413         }
414
415         @Override
416         public StaticBuffer next() {
417             ensureOpen();
418
419             if (!hasNext())
420                 throw new NoSuchElementException();
421
422             mostRecentRow = ksIter.next();
423
424             Preconditions.checkNotNull(mostRecentRow);
425             return StaticArrayBuffer.of(mostRecentRow.bufferForKey());
426         }
427
428         @Override
429         public void close() {
430             closeIterator();
431         }
432
433         @Override
434         public void remove() {
435             throw new UnsupportedOperationException();
436         }
437
438         @Override
439         public RecordIterator<Entry> getEntries() {
440             ensureOpen();
441
442             return new RecordIterator<Entry>() {
443                 final Iterator<Entry> columns =
444                         CassandraHelper.makeEntryIterator(mostRecentRow.getColumns(),
445                                 entryGetter, columnSlice.getSliceEnd(),
446                                 columnSlice.getLimit());
447
448                 @Override
449                 public boolean hasNext() {
450                     ensureOpen();
451                     return columns.hasNext();
452                 }
453
454                 @Override
455                 public Entry next() {
456                     ensureOpen();
457                     return columns.next();
458                 }
459
460                 @Override
461                 public void close() {
462                     closeIterator();
463                 }
464
465                 @Override
466                 public void remove() {
467                     throw new UnsupportedOperationException();
468                 }
469             };
470         }
471
472         private void ensureOpen() {
473             if (isClosed)
474                 throw new IllegalStateException("Iterator has been closed.");
475         }
476
477         private void closeIterator() {
478             if (!isClosed) {
479                 isClosed = true;
480             }
481         }
482
483         private List<KeySlice> rebuffer() throws BackendException {
484
485             Preconditions.checkArgument(!seenEnd);
486
487             return checkFreshSlices(getNextKeySlices());
488         }
489
490         protected List<KeySlice> checkFreshSlices(List<KeySlice> ks) {
491
492             if (0 == ks.size()) {
493                 seenEnd = true;
494                 return Collections.emptyList();
495             }
496
497             nextStartKey = ks.get(ks.size() - 1).bufferForKey();
498             nextStartToken = partitioner.getToken(nextStartKey);
499
500             if (nextStartToken.equals(endToken)) {
501                 seenEnd = true;
502                 if (omitEndToken)
503                     ks.remove(ks.size() - 1);
504             }
505
506             return ks;
507         }
508
509         protected final List<KeySlice> getNextKeySlices() throws BackendException {
510             return getTokenSlice(nextStartToken, endToken, columnSlice, pageSize);
511         }
512     }
513
514     private final class AllTokensIterator extends AbstractBufferedRowIter {
515         public AllTokensIterator(IPartitioner partitioner, SliceQuery columnSlice, int pageSize) {
516             super(partitioner, columnSlice, pageSize, partitioner.getMinimumToken(), partitioner.getMinimumToken(), false);
517         }
518     }
519
520     private final class KeyRangeIterator extends AbstractBufferedRowIter {
521         public KeyRangeIterator(IPartitioner partitioner, SliceQuery columnSlice,
522                 int pageSize, ByteBuffer startKey, ByteBuffer endKey) throws BackendException {
523             super(partitioner, columnSlice, pageSize, partitioner.getToken(startKey), partitioner.getToken(endKey), true);
524
525             Preconditions.checkArgument(partitioner instanceof AbstractByteOrderedPartitioner);
526
527             // Get first slice with key range instead of token range. Token
528             // ranges are start-exclusive, key ranges are start-inclusive. Both
529             // are end-inclusive. If we don't make the call below, then we will
530             // erroneously miss startKey.
531             List<KeySlice> ks = getKeySlice(startKey, endKey, columnSlice, pageSize);
532
533             this.ksIter = checkFreshSlices(ks).iterator();
534         }
535     }
536 }