1 package com.thinkaurelius.titan.diskstorage.cassandra.thrift;
3 import com.google.common.base.Preconditions;
4 import com.google.common.base.Predicate;
5 import com.google.common.collect.ImmutableList;
6 import com.google.common.collect.ImmutableMap;
7 import com.google.common.collect.Iterables;
8 import com.google.common.collect.Iterators;
9 import com.thinkaurelius.titan.diskstorage.*;
10 import com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool.CTConnection;
11 import com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool.CTConnectionPool;
12 import com.thinkaurelius.titan.diskstorage.cassandra.utils.CassandraHelper;
13 import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*;
14 import com.thinkaurelius.titan.diskstorage.util.*;
15 import org.apache.cassandra.dht.*;
16 import org.apache.cassandra.thrift.*;
17 import org.apache.cassandra.thrift.ConsistencyLevel;
18 import org.apache.commons.lang.ArrayUtils;
19 import org.apache.thrift.TException;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
23 import javax.annotation.Nullable;
24 import java.nio.ByteBuffer;
26 import java.util.regex.Matcher;
27 import java.util.regex.Pattern;
29 import static com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction.getTx;
32 * A Titan {@code KeyColumnValueStore} backed by Cassandra.
33 * This uses the Cassandra Thrift API.
35 * @author Dan LaRocque <dalaro@hopcount.org>
36 * @see CassandraThriftStoreManager
38 public class CassandraThriftKeyColumnValueStore implements KeyColumnValueStore {
40 private static final Logger logger =
41 LoggerFactory.getLogger(CassandraThriftKeyColumnValueStore.class);
43 private static final Pattern BROKEN_BYTE_TOKEN_PATTERN = Pattern.compile("^Token\\(bytes\\[(.+)\\]\\)$");
46 private final CassandraThriftStoreManager storeManager;
47 private final String keyspace;
48 private final String columnFamily;
49 private final CTConnectionPool pool;
50 private final ThriftGetter entryGetter;
/**
 * Creates a store bound to one column family of one keyspace.
 *
 * @param keyspace     keyspace name handed to the connection pool when borrowing connections
 * @param columnFamily column family this store reads from and writes to
 * @param storeManager owning manager; also supplies the entry meta-data schema
 * @param pool         Thrift connection pool used for every request
 */
public CassandraThriftKeyColumnValueStore(String keyspace,
                                          String columnFamily,
                                          CassandraThriftStoreManager storeManager,
                                          CTConnectionPool pool) {
    this.keyspace = keyspace;
    this.columnFamily = columnFamily;
    this.storeManager = storeManager;
    this.pool = pool;
    // The getter needs the per-column-family meta-data schema known to the manager.
    this.entryGetter = new ThriftGetter(storeManager.getMetaDataSchema(columnFamily));
}
62 * Call Cassandra's Thrift get_slice() method.
64 * When columnEnd equals columnStart and either startInclusive
65 * or endInclusive is false (or both are false), then this
66 * method returns an empty list without making any Thrift calls.
68 * If columnEnd = columnStart + 1, and both startInclusive and
69 * endInclusive are false, then the arguments effectively form
70 * an empty interval. In this case, as in the one previous,
71 * an empty list is returned. However, it may not necessarily
72 * be handled efficiently; a Thrift call might still be made
73 * before returning the empty list.
75 * @throws com.thinkaurelius.titan.diskstorage.BackendException
76 * when columnEnd < columnStart
79 public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException {
80 Map<StaticBuffer, EntryList> result = getNamesSlice(query.getKey(), query, txh);
81 return Iterables.getOnlyElement(result.values(), EntryList.EMPTY_LIST);
85 public Map<StaticBuffer, EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException {
86 return getNamesSlice(keys, query, txh);
89 public Map<StaticBuffer, EntryList> getNamesSlice(StaticBuffer key,
90 SliceQuery query, StoreTransaction txh) throws BackendException {
91 return getNamesSlice(ImmutableList.of(key),query,txh);
94 public Map<StaticBuffer, EntryList> getNamesSlice(List<StaticBuffer> keys,
96 StoreTransaction txh) throws BackendException {
97 ColumnParent parent = new ColumnParent(columnFamily);
99 * Cassandra cannot handle columnStart = columnEnd.
100 * Cassandra's Thrift getSlice() throws InvalidRequestException
101 * if columnStart = columnEnd.
103 if (query.getSliceStart().compareTo(query.getSliceEnd()) >= 0) {
104 // Check for invalid arguments where columnEnd < columnStart
105 if (query.getSliceEnd().compareTo(query.getSliceStart())<0) {
106 throw new PermanentBackendException("columnStart=" + query.getSliceStart() +
107 " is greater than columnEnd=" + query.getSliceEnd() + ". " +
108 "columnStart must be less than or equal to columnEnd");
110 if (0 != query.getSliceStart().length() && 0 != query.getSliceEnd().length()) {
111 logger.debug("Return empty list due to columnEnd==columnStart and neither empty");
112 return KCVSUtil.emptyResults(keys);
116 assert query.getSliceStart().compareTo(query.getSliceEnd()) < 0;
117 ConsistencyLevel consistency = getTx(txh).getReadConsistencyLevel().getThrift();
118 SlicePredicate predicate = new SlicePredicate();
119 SliceRange range = new SliceRange();
120 range.setCount(query.getLimit() + (query.hasLimit()?1:0)); //Add one for potentially removed last column
121 range.setStart(query.getSliceStart().asByteBuffer());
122 range.setFinish(query.getSliceEnd().asByteBuffer());
123 predicate.setSlice_range(range);
125 CTConnection conn = null;
127 conn = pool.borrowObject(keyspace);
128 Cassandra.Client client = conn.getClient();
129 Map<ByteBuffer, List<ColumnOrSuperColumn>> rows = client.multiget_slice(CassandraHelper.convert(keys),
135 * The final size of the "result" List may be at most rows.size().
136 * However, "result" could also be up to two elements smaller than
137 * rows.size(), depending on startInclusive and endInclusive
139 Map<StaticBuffer, EntryList> results = new HashMap<StaticBuffer, EntryList>();
141 for (ByteBuffer key : rows.keySet()) {
142 results.put(StaticArrayBuffer.of(key),
143 CassandraHelper.makeEntryList(rows.get(key), entryGetter, query.getSliceEnd(), query.getLimit()));
147 } catch (Exception e) {
148 throw convertException(e);
150 pool.returnObjectUnsafe(keyspace, conn);
// Adapter that reads column name, value and meta-data out of a Thrift
// ColumnOrSuperColumn for StaticArrayEntry.GetColVal consumers.
154 private static class ThriftGetter implements StaticArrayEntry.GetColVal<ColumnOrSuperColumn,ByteBuffer> {
// Meta-data schema supplied by the creator of this getter.
156 private final EntryMetaData[] schema;
158 private ThriftGetter(EntryMetaData[] schema) {
159 this.schema = schema;
// Returns the buffer holding the name of the wrapped Thrift column.
163 public ByteBuffer getColumn(ColumnOrSuperColumn element) {
164 return element.getColumn().bufferForName();
// Returns the buffer holding the value of the wrapped Thrift column.
168 public ByteBuffer getValue(ColumnOrSuperColumn element) {
169 return element.getColumn().bufferForValue();
// NOTE(review): body not visible in this chunk; presumably returns the schema field - confirm.
173 public EntryMetaData[] getMetaSchema(ColumnOrSuperColumn element) {
// Resolves one meta-data item of the column: the visible branches return the
// column timestamp or its TTL, and anything else is rejected with
// UnsupportedOperationException. NOTE(review): the selecting switch/case
// lines are not visible in this chunk - confirm which EntryMetaData
// constants map to which branch.
178 public Object getMetaData(ColumnOrSuperColumn element, EntryMetaData meta) {
181 return element.getColumn().getTimestamp();
183 return element.getColumn().getTtl();
185 throw new UnsupportedOperationException("Unsupported meta data: " + meta);
// Closes this store. NOTE(review): the body is not visible in this chunk;
// presumably a no-op, since the read paths shown here return their
// connection to the pool after each call - confirm.
191 public void close() {
196 public void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue,
197 StoreTransaction txh) throws BackendException {
198 throw new UnsupportedOperationException();
202 public KeyIterator getKeys(@Nullable SliceQuery sliceQuery, StoreTransaction txh) throws BackendException {
203 final IPartitioner partitioner = storeManager.getCassandraPartitioner();
205 if (!(partitioner instanceof RandomPartitioner) && !(partitioner instanceof Murmur3Partitioner))
206 throw new PermanentBackendException("This operation is only allowed when random partitioner (md5 or murmur3) is used.");
209 return new AllTokensIterator(partitioner, sliceQuery, storeManager.getPageSize());
210 } catch (Exception e) {
211 throw convertException(e);
216 public KeyIterator getKeys(KeyRangeQuery keyRangeQuery, StoreTransaction txh) throws BackendException {
217 final IPartitioner partitioner = storeManager.getCassandraPartitioner();
219 // see rant about the reason of this limitation in Astyanax implementation of this method.
220 if (!(partitioner instanceof AbstractByteOrderedPartitioner))
221 throw new PermanentBackendException("This operation is only allowed when byte-ordered partitioner is used.");
224 return new KeyRangeIterator(partitioner, keyRangeQuery, storeManager.getPageSize(),
225 keyRangeQuery.getKeyStart().asByteBuffer(),
226 keyRangeQuery.getKeyEnd().asByteBuffer());
227 } catch (Exception e) {
228 throw convertException(e);
// Returns the name of this store. NOTE(review): the return statement is not
// visible in this chunk; presumably the column family name - confirm.
233 public String getName() {
238 public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException {
239 Map<StaticBuffer, KCVMutation> mutations = ImmutableMap.of(key, new KCVMutation(additions, deletions));
240 mutateMany(mutations, txh);
243 public void mutateMany(Map<StaticBuffer, KCVMutation> mutations, StoreTransaction txh) throws BackendException {
244 storeManager.mutateMany(ImmutableMap.of(columnFamily, mutations), txh);
247 static BackendException convertException(Throwable e) {
248 if (e instanceof TException) {
249 return new PermanentBackendException(e);
250 } else if (e instanceof TimedOutException) {
251 return new TemporaryBackendException(e);
252 } else if (e instanceof UnavailableException) {
253 return new TemporaryBackendException(e);
254 } else if (e instanceof InvalidRequestException) {
255 return new PermanentBackendException(e);
257 return new PermanentBackendException(e);
262 public String toString() {
263 return "CassandraThriftKeyColumnValueStore[ks="
264 + keyspace + ", cf=" + columnFamily + "]";
268 private List<KeySlice> getKeySlice(ByteBuffer startKey,
270 SliceQuery columnSlice,
271 int count) throws BackendException {
272 return getRangeSlices(new org.apache.cassandra.thrift.KeyRange().setStart_key(startKey).setEnd_key(endKey).setCount(count), columnSlice);
275 private <T extends Token> List<KeySlice> getTokenSlice(T startToken, T endToken,
276 SliceQuery sliceQuery, int count) throws BackendException {
278 String st = sanitizeBrokenByteToken(startToken);
279 String et = sanitizeBrokenByteToken(endToken);
281 org.apache.cassandra.thrift.KeyRange kr = new org.apache.cassandra.thrift.KeyRange().setStart_token(st).setEnd_token(et).setCount(count);
283 return getRangeSlices(kr, sliceQuery);
// Produces the string form of a token for use in a Thrift KeyRange.
// The visible code starts from tok.toString(), special-cases BytesToken,
// and matches strings beginning with "T" against BROKEN_BYTE_TOKEN_PATTERN
// ("Token(bytes[...])"); it warns on an unrecognised format and logs at
// debug level when the string was rewritten.
// NOTE(review): the return statements and the rewrite itself are not
// visible in this chunk - presumably non-BytesToken strings are returned
// unchanged and the bracketed payload replaces the wrapper; confirm.
286 private String sanitizeBrokenByteToken(Token tok) {
288 * Background: https://issues.apache.org/jira/browse/CASSANDRA-5566
290 * This check is useful for compatibility with Cassandra server versions
// Default string form; only BytesToken instances are candidates for rewriting.
293 String st = tok.toString();
294 if (!(tok instanceof BytesToken))
297 // Do a cheap 1-character startsWith before unleashing the regex
298 if (st.startsWith("T")) {
299 Matcher m = BROKEN_BYTE_TOKEN_PATTERN.matcher(st);
301 logger.warn("Unknown token string format: \"{}\"", st);
305 logger.debug("Rewrote token string: \"{}\" -> \"{}\"", old, st);
// Runs a Thrift get_range_slices call for keyRange on this store's column
// family and returns the resulting rows, dropping rows that carry no columns.
// Failures are translated through convertException; a borrowed connection
// is handed back to the pool only when one was actually obtained.
311 private List<KeySlice> getRangeSlices(org.apache.cassandra.thrift.KeyRange keyRange, @Nullable SliceQuery sliceQuery) throws BackendException {
312 SliceRange sliceRange = new SliceRange();
// Without a slice query the column range is unbounded (empty start and finish).
// NOTE(review): the column count used in this branch is set on a line not
// visible in this chunk - confirm its value.
314 if (sliceQuery == null) {
315 sliceRange.setStart(ArrayUtils.EMPTY_BYTE_ARRAY)
316 .setFinish(ArrayUtils.EMPTY_BYTE_ARRAY)
// With a slice query, use its bounds; an unlimited query asks for Integer.MAX_VALUE columns.
319 sliceRange.setStart(sliceQuery.getSliceStart().asByteBuffer())
320 .setFinish(sliceQuery.getSliceEnd().asByteBuffer())
321 .setCount((sliceQuery.hasLimit()) ? sliceQuery.getLimit() : Integer.MAX_VALUE);
325 CTConnection connection = null;
327 connection = pool.borrowObject(keyspace);
329 List<KeySlice> slices =
330 connection.getClient().get_range_slices(new ColumnParent(columnFamily),
332 .setSlice_range(sliceRange),
// NOTE(review): consistency is hard-coded to QUORUM here, whereas
// getNamesSlice takes the read consistency level from the transaction -
// confirm whether that difference is intended.
334 ConsistencyLevel.QUORUM);
336 for (KeySlice s : slices) {
337 logger.debug("Key {}", ByteBufferUtil.toString(s.key, "-"));
340 /* Note: we need to fetch columns for each row as well to remove "range ghosts" */
341 List<KeySlice> result = new ArrayList<>(slices.size());
342 KeyIterationPredicate pred = new KeyIterationPredicate();
// NOTE(review): the loop body is not visible in this chunk; presumably it
// keeps only the rows accepted by pred - confirm.
343 for (KeySlice ks : slices)
347 } catch (Exception e) {
348 throw convertException(e);
350 if (connection != null)
351 pool.returnObjectUnsafe(keyspace, connection);
355 private static class KeyIterationPredicate implements Predicate<KeySlice> {
358 public boolean apply(@Nullable KeySlice row) {
359 return (row != null) && row.getColumns().size() > 0;
364 * Slices rows and columns using tokens. Recall that the partitioner turns
365 * keys into tokens. For instance, under RandomPartitioner, tokens are the
366 * MD5 hashes of keys.
// Key iterator that fetches rows one page at a time through getTokenSlice
// (see getNextKeySlices) and serves them from the buffered page in ksIter.
// NOTE(review): several lines of this class are not visible in this chunk;
// the comments below describe only what the visible lines establish.
368 public class AbstractBufferedRowIter implements KeyIterator {
// Number of rows requested per page, and the column slice loaded with each row.
370 private final int pageSize;
371 private final SliceQuery columnSlice;
373 private boolean isClosed;
374 private boolean seenEnd;
// Rows of the page currently being consumed; subclasses may pre-fill it.
375 protected Iterator<KeySlice> ksIter;
// Row most recently returned by next(); getEntries() reads its columns.
376 private KeySlice mostRecentRow;
378 private final IPartitioner partitioner;
// Paging state: token and key of the last row of the previous page, and the fixed end token.
379 private Token nextStartToken;
380 private final Token endToken;
381 private ByteBuffer nextStartKey;
383 private boolean omitEndToken;
// Stores the paging parameters and starts with an empty buffer, not closed and end not seen.
385 public AbstractBufferedRowIter(IPartitioner partitioner,
386 SliceQuery columnSlice, int pageSize, Token startToken, Token endToken, boolean omitEndToken) {
387 this.pageSize = pageSize;
388 this.partitioner = partitioner;
389 this.nextStartToken = startToken;
390 this.endToken = endToken;
391 this.columnSlice = columnSlice;
393 this.seenEnd = false;
394 this.isClosed = false;
395 this.ksIter = Iterators.emptyIterator();
396 this.mostRecentRow = null;
397 this.omitEndToken = omitEndToken;
// Fetches the next page when the current one is exhausted and the end has
// not been seen; a BackendException from the fetch is rethrown wrapped in
// a RuntimeException.
401 public boolean hasNext() {
404 if (!ksIter.hasNext() && !seenEnd) {
406 ksIter = rebuffer().iterator();
407 } catch (BackendException e) {
408 throw new RuntimeException(e);
412 return ksIter.hasNext();
// Takes the next buffered row, remembers it as mostRecentRow and returns its key.
// NOTE(review): the condition guarding the NoSuchElementException is not visible here.
416 public StaticBuffer next() {
420 throw new NoSuchElementException();
422 mostRecentRow = ksIter.next();
424 Preconditions.checkNotNull(mostRecentRow);
425 return StaticArrayBuffer.of(mostRecentRow.bufferForKey());
429 public void close() {
// Removal is not supported.
434 public void remove() {
435 throw new UnsupportedOperationException();
// Returns an iterator over the columns of the row last returned by next(),
// built by CassandraHelper.makeEntryIterator with the slice end and limit
// of columnSlice.
// NOTE(review): columnSlice is dereferenced here, but getKeys(SliceQuery)
// accepts a @Nullable query and forwards it to this class - a null query
// would fail at this point; confirm whether callers guarantee non-null.
439 public RecordIterator<Entry> getEntries() {
442 return new RecordIterator<Entry>() {
443 final Iterator<Entry> columns =
444 CassandraHelper.makeEntryIterator(mostRecentRow.getColumns(),
445 entryGetter, columnSlice.getSliceEnd(),
446 columnSlice.getLimit());
449 public boolean hasNext() {
451 return columns.hasNext();
455 public Entry next() {
457 return columns.next();
461 public void close() {
466 public void remove() {
467 throw new UnsupportedOperationException();
// Throws IllegalStateException; NOTE(review): the guarding condition is not
// visible here, presumably a check of isClosed - confirm.
472 private void ensureOpen() {
474 throw new IllegalStateException("Iterator has been closed.");
477 private void closeIterator() {
// Loads the next page; must not be called once the end has been seen.
483 private List<KeySlice> rebuffer() throws BackendException {
485 Preconditions.checkArgument(!seenEnd);
487 return checkFreshSlices(getNextKeySlices());
// Post-processes a freshly fetched page: an empty page yields an empty
// list; otherwise the paging position advances to the key and token of
// the page's last row. The last row is removed on a path inside the
// "token equals endToken" check.
// NOTE(review): the lines that set seenEnd, consult omitEndToken and
// return the page are not visible in this chunk - confirm the exact
// conditions.
490 protected List<KeySlice> checkFreshSlices(List<KeySlice> ks) {
492 if (0 == ks.size()) {
494 return Collections.emptyList();
497 nextStartKey = ks.get(ks.size() - 1).bufferForKey();
498 nextStartToken = partitioner.getToken(nextStartKey);
500 if (nextStartToken.equals(endToken)) {
503 ks.remove(ks.size() - 1);
// Requests the next page of up to pageSize rows between nextStartToken and endToken.
509 protected final List<KeySlice> getNextKeySlices() throws BackendException {
510 return getTokenSlice(nextStartToken, endToken, columnSlice, pageSize);
514 private final class AllTokensIterator extends AbstractBufferedRowIter {
515 public AllTokensIterator(IPartitioner partitioner, SliceQuery columnSlice, int pageSize) {
516 super(partitioner, columnSlice, pageSize, partitioner.getMinimumToken(), partitioner.getMinimumToken(), false);
520 private final class KeyRangeIterator extends AbstractBufferedRowIter {
521 public KeyRangeIterator(IPartitioner partitioner, SliceQuery columnSlice,
522 int pageSize, ByteBuffer startKey, ByteBuffer endKey) throws BackendException {
523 super(partitioner, columnSlice, pageSize, partitioner.getToken(startKey), partitioner.getToken(endKey), true);
525 Preconditions.checkArgument(partitioner instanceof AbstractByteOrderedPartitioner);
527 // Get first slice with key range instead of token range. Token
528 // ranges are start-exclusive, key ranges are start-inclusive. Both
529 // are end-inclusive. If we don't make the call below, then we will
530 // erroneously miss startKey.
531 List<KeySlice> ks = getKeySlice(startKey, endKey, columnSlice, pageSize);
533 this.ksIter = checkFreshSlices(ks).iterator();