1 package com.thinkaurelius.titan.diskstorage.cassandra.astyanax;
3 import com.google.common.base.Predicate;
4 import com.google.common.collect.*;
5 import com.netflix.astyanax.ExceptionCallback;
6 import com.netflix.astyanax.Keyspace;
7 import com.netflix.astyanax.connectionpool.OperationResult;
8 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
9 import com.netflix.astyanax.model.*;
10 import com.netflix.astyanax.query.AllRowsQuery;
11 import com.netflix.astyanax.query.RowSliceQuery;
12 import com.netflix.astyanax.retry.RetryPolicy;
13 import com.netflix.astyanax.serializers.ByteBufferSerializer;
14 import com.thinkaurelius.titan.diskstorage.*;
15 import com.thinkaurelius.titan.diskstorage.cassandra.utils.CassandraHelper;
16 import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*;
17 import com.thinkaurelius.titan.diskstorage.util.RecordIterator;
18 import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer;
19 import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry;
21 import javax.annotation.Nullable;
22 import java.nio.ByteBuffer;
25 import static com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager.Partitioner;
26 import static com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction.getTx;
/**
 * {@link KeyColumnValueStore} implementation backed by a single Cassandra column family that is
 * read through an Astyanax {@link Keyspace}. Writes are not executed here; they are forwarded to
 * the owning {@link AstyanaxStoreManager}.
 */
28 public class AstyanaxKeyColumnValueStore implements KeyColumnValueStore {
// Astyanax keyspace handle from which every read query in this class is prepared.
30 private final Keyspace keyspace;
// Name of the backing column family; also reported by getName().
31 private final String columnFamilyName;
// Astyanax column-family descriptor with ByteBuffer row keys and ByteBuffer column names.
32 private final ColumnFamily<ByteBuffer, ByteBuffer> columnFamily;
// Retry policy; the slice and key-range queries attach a duplicate() of it per query.
33 private final RetryPolicy retryPolicy;
// Owning manager: consulted for partitioner, page size and metadata schema, and used for mutations.
34 private final AstyanaxStoreManager storeManager;
// Adapter handed to CassandraHelper so it can read name/value/metadata from Astyanax columns.
35 private final AstyanaxGetter entryGetter;
/**
 * Creates a store bound to one column family.
 *
 * @param columnFamilyName name of the column family this store operates on
 * @param storeManager     manager asked for the column family's metadata schema here and kept
 *                         for later partitioner/page-size lookups and mutations
 * @param retryPolicy      retry policy kept for use by the read queries
 */
37 AstyanaxKeyColumnValueStore(String columnFamilyName,
39 AstyanaxStoreManager storeManager,
40 RetryPolicy retryPolicy) {
41 this.keyspace = keyspace;
42 this.columnFamilyName = columnFamilyName;
43 this.retryPolicy = retryPolicy;
44 this.storeManager = storeManager;
// The getter is built from the metadata schema the manager reports for this column family.
46 entryGetter = new AstyanaxGetter(storeManager.getMetaDataSchema(columnFamilyName));
// Row keys and column names are both handled as raw ByteBuffers.
48 columnFamily = new ColumnFamily<ByteBuffer, ByteBuffer>(
49 this.columnFamilyName,
50 ByteBufferSerializer.get(),
51 ByteBufferSerializer.get());
/**
 * Package-private accessor for the Astyanax column-family descriptor.
 * NOTE(review): presumably returns the {@code columnFamily} field - confirm.
 */
56 ColumnFamily<ByteBuffer, ByteBuffer> getColumnFamily() {
/**
 * Closes this store.
 *
 * @throws BackendException declared by the signature
 */
61 public void close() throws BackendException {
/**
 * Reads the column slice described by {@code query} for the single key the query carries.
 *
 * @param query key together with the column-slice bounds
 * @param txh   transaction handle forwarded to the underlying read
 * @return the entries found for the key, or {@link EntryList#EMPTY_LIST} when the lookup produced no row
 * @throws BackendException if the underlying read fails
 */
66 public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException {
67 Map<StaticBuffer, EntryList> result = getNamesSlice(query.getKey(), query, txh);
// A single-key lookup yields at most one map value; fall back to the empty list when there is none.
68 return Iterables.getOnlyElement(result.values(),EntryList.EMPTY_LIST);
/**
 * Multi-key variant of the slice read; delegates directly to
 * {@code getNamesSlice(List, SliceQuery, StoreTransaction)}.
 *
 * @param keys  row keys to read
 * @param query column-slice bounds applied to every key
 * @param txh   transaction handle forwarded to the underlying read
 * @return map from key to the entries read for it
 * @throws BackendException if the underlying read fails
 */
72 public Map<StaticBuffer, EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException {
73 return getNamesSlice(keys, query, txh);
/**
 * Single-key convenience overload: wraps {@code key} in a one-element list and delegates to the
 * list-based {@code getNamesSlice}.
 *
 * @param key   row key to read
 * @param query column-slice bounds
 * @param txh   transaction handle forwarded to the underlying read
 * @return map from key to the entries read for it
 * @throws BackendException if the underlying read fails
 */
76 public Map<StaticBuffer, EntryList> getNamesSlice(StaticBuffer key,
77 SliceQuery query, StoreTransaction txh) throws BackendException {
78 return getNamesSlice(ImmutableList.of(key),query,txh);
/**
 * Reads the column slice described by {@code query} for all {@code keys} with one Astyanax
 * key-slice query and converts each returned row into an {@link EntryList}.
 *
 * @param keys  row keys to read
 * @param query column-slice bounds and limit
 * @param txh   transaction handle; supplies the read consistency level
 * @return map from row key to the entries read for that row
 * @throws BackendException a {@link TemporaryBackendException} when Astyanax reports a
 *                          {@link ConnectionException}
 */
82 public Map<StaticBuffer, EntryList> getNamesSlice(List<StaticBuffer> keys,
83 SliceQuery query, StoreTransaction txh) throws BackendException {
85 * RowQuery<K,C> should be parameterized as
86 * RowQuery<ByteBuffer,ByteBuffer>. However, this causes the following
87 * compilation error when attempting to call withColumnRange on a
88 * RowQuery<ByteBuffer,ByteBuffer> instance:
90 * java.lang.Error: Unresolved compilation problem: The method
91 * withColumnRange(ByteBuffer, ByteBuffer, boolean, int) is ambiguous
92 * for the type RowQuery<ByteBuffer,ByteBuffer>
94 * The compiler substitutes ByteBuffer=C for both startColumn and
95 * endColumn, compares it to its identical twin with that type
96 * hard-coded, and dies.
// Raw RowSliceQuery on purpose (see the explanation above); consistency level comes from the
// transaction and each query gets its own copy of the retry policy.
99 RowSliceQuery rq = keyspace.prepareQuery(columnFamily)
100 .setConsistencyLevel(getTx(txh).getReadConsistencyLevel().getAstyanax())
101 .withRetryPolicy(retryPolicy.duplicate())
102 .getKeySlice(CassandraHelper.convert(keys));
104 // Thank you, Astyanax, for making builder pattern useful :(
// withColumnRange is invoked as a separate statement rather than chained onto the builder above.
105 rq.withColumnRange(query.getSliceStart().asByteBuffer(),
106 query.getSliceEnd().asByteBuffer(),
108 query.getLimit() + (query.hasLimit()?1:0)); //Add one for potentially removed last column
110 OperationResult<Rows<ByteBuffer, ByteBuffer>> r;
// Unchecked cast is needed because rq is a raw type.
112 r = (OperationResult<Rows<ByteBuffer, ByteBuffer>>) rq.execute();
113 } catch (ConnectionException e) {
// Connection problems are surfaced as temporary backend failures.
114 throw new TemporaryBackendException(e);
117 Rows<ByteBuffer,ByteBuffer> rows = r.getResult();
// Pre-size the result map to the number of rows returned.
118 Map<StaticBuffer, EntryList> result = new HashMap<StaticBuffer, EntryList>(rows.size());
120 for (Row<ByteBuffer, ByteBuffer> row : rows) {
// NOTE(review): the map is keyed by StaticBuffer but this probes with a ByteBuffer, so the
// assertion likely can never fire as written - confirm and compare converted keys instead.
121 assert !result.containsKey(row.getKey());
// The slice end and the original limit are passed along with the columns; presumably the helper
// uses them to drop the extra column requested above - confirm in CassandraHelper.
122 result.put(StaticArrayBuffer.of(row.getKey()),
123 CassandraHelper.makeEntryList(row.getColumns(),entryGetter, query.getSliceEnd(), query.getLimit()));
/**
 * {@link StaticArrayEntry.GetColVal} adapter for Astyanax {@code Column<ByteBuffer>} elements:
 * exposes a column's name, value and selected metadata (timestamp, TTL) as entry components.
 */
129 private static class AstyanaxGetter implements StaticArrayEntry.GetColVal<Column<ByteBuffer>,ByteBuffer> {
// Metadata schema supplied at construction time.
131 private final EntryMetaData[] schema;
133 private AstyanaxGetter(EntryMetaData[] schema) {
134 this.schema = schema;
// The entry's column component is the Astyanax column name.
139 public ByteBuffer getColumn(Column<ByteBuffer> element) {
140 return element.getName();
// The entry's value component is the column value as a ByteBuffer.
144 public ByteBuffer getValue(Column<ByteBuffer> element) {
145 return element.getByteBufferValue();
// NOTE(review): presumably returns the schema given to the constructor - confirm.
149 public EntryMetaData[] getMetaSchema(Column<ByteBuffer> element) {
// Returns the column's timestamp or TTL depending on meta; any other kind is rejected with an
// UnsupportedOperationException.
154 public Object getMetaData(Column<ByteBuffer> element, EntryMetaData meta) {
157 return element.getTimestamp();
159 return element.getTtl();
161 throw new UnsupportedOperationException("Unsupported meta data: " + meta);
/**
 * Applies additions and deletions to a single row by wrapping them in a {@link KCVMutation} and
 * delegating to {@code mutateMany}.
 *
 * @param key       row key to mutate
 * @param additions entries to add
 * @param deletions column names to delete
 * @param txh       transaction handle forwarded to the mutation
 * @throws BackendException if the delegated mutation fails
 */
167 public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException {
168 mutateMany(ImmutableMap.of(key, new KCVMutation(additions, deletions)), txh);
/**
 * Applies the given per-key mutations to this store's column family by forwarding them, keyed
 * by the column family name, to the store manager.
 *
 * @param mutations mutations to apply, keyed by row key
 * @param txh       transaction handle forwarded to the store manager
 * @throws BackendException if the store manager's mutation fails
 */
171 public void mutateMany(Map<StaticBuffer, KCVMutation> mutations, StoreTransaction txh) throws BackendException {
172 storeManager.mutateMany(ImmutableMap.of(columnFamilyName, mutations), txh);
/**
 * Locking is not implemented by this store.
 *
 * @throws UnsupportedOperationException always
 */
176 public void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue, StoreTransaction txh) throws BackendException {
177 throw new UnsupportedOperationException();
/**
 * Iterates over the keys of all rows in the column family using an Astyanax all-rows query.
 * Only permitted when the store manager reports the RANDOM partitioner.
 *
 * @param sliceQuery optional column range and limit to fetch per row; may be {@code null}
 * @param txh        transaction handle (not referenced by the visible query setup)
 * @return iterator over row keys that skips rows without columns
 * @throws BackendException a {@link PermanentBackendException} when the partitioner is not RANDOM
 *                          or when Astyanax reports a {@link ConnectionException}
 */
181 public KeyIterator getKeys(@Nullable SliceQuery sliceQuery, StoreTransaction txh) throws BackendException {
182 if (storeManager.getPartitioner() != Partitioner.RANDOM)
183 throw new PermanentBackendException("This operation is only allowed when random partitioner (md5 or murmur3) is used.");
// NOTE(review): unlike the slice and key-range reads, no consistency level or retry policy is
// set on this query - confirm that is intentional.
185 AllRowsQuery allRowsQuery = keyspace.prepareQuery(columnFamily).getAllRows();
// Restrict the columns fetched per row only when the caller supplied a slice.
187 if (sliceQuery != null) {
188 allRowsQuery.withColumnRange(sliceQuery.getSliceStart().asByteBuffer(),
189 sliceQuery.getSliceEnd().asByteBuffer(),
191 sliceQuery.getLimit());
194 Rows<ByteBuffer, ByteBuffer> result;
196 /* Note: we need to fetch columns for each row as well to remove "range ghosts" */
197 OperationResult op = allRowsQuery.setRowLimit(storeManager.getPageSize()) // pre-fetch that many rows at a time
198 .setConcurrencyLevel(1) // one execution thread for fetching portion of rows
199 .setExceptionCallback(new ExceptionCallback() {
200 private int retries = 0;
// NOTE(review): verify the meaning of this callback's return value in Astyanax; if true means
// "keep going", returning retries > 2 would abort on the first failures instead of retrying.
203 public boolean onException(ConnectionException e) {
205 return retries > 2; // make 3 re-tries
// Unchecked cast is needed because op is a raw OperationResult.
212 result = ((OperationResult<Rows<ByteBuffer, ByteBuffer>>) op).getResult();
213 } catch (ConnectionException e) {
// NOTE(review): this path reports Permanent, while the other read paths report Temporary.
214 throw new PermanentBackendException(e);
// RowIterator filters out rows that came back without columns.
217 return new RowIterator(result.iterator(), sliceQuery);
/**
 * Iterates over the keys in the range described by {@code query}. Only permitted when the store
 * manager reports the BYTEORDER partitioner.
 *
 * @param query key range plus the column slice to fetch per row
 * @param txh   transaction handle; supplies the read consistency level
 * @return iterator over row keys that excludes the query's end key and rows without columns
 * @throws BackendException a {@link PermanentBackendException} when the partitioner is not
 *                          BYTEORDER; a {@link TemporaryBackendException} when Astyanax reports a
 *                          {@link ConnectionException}
 */
221 public KeyIterator getKeys(KeyRangeQuery query, StoreTransaction txh) throws BackendException {
222 // this query could only be done when byte-ordering partitioner is used
223 // because Cassandra operates on tokens internally which means that even contiguous
224 // range of keys (e.g. time slice) with random partitioner could produce disjoint set of tokens
225 // returning ambiguous results to the user.
226 Partitioner partitioner = storeManager.getPartitioner();
227 if (partitioner != Partitioner.BYTEORDER)
// NOTE(review): the message below is missing a closing parenthesis after "KeyRangeQuery".
228 throw new PermanentBackendException("getKeys(KeyRangeQuery could only be used with byte-ordering partitioner.");
230 ByteBuffer start = query.getKeyStart().asByteBuffer(), end = query.getKeyEnd().asByteBuffer();
// Key-range query between start and end; no token bounds are given and the row count is
// capped only by Integer.MAX_VALUE.
232 RowSliceQuery rowSlice = keyspace.prepareQuery(columnFamily)
233 .setConsistencyLevel(getTx(txh).getReadConsistencyLevel().getAstyanax())
234 .withRetryPolicy(retryPolicy.duplicate())
235 .getKeyRange(start, end, null, null, Integer.MAX_VALUE);
237 // Astyanax is bad at builder pattern :(
238 rowSlice.withColumnRange(query.getSliceStart().asByteBuffer(),
239 query.getSliceEnd().asByteBuffer(),
243 // Omit the query's end key from the final result, if it is present
244 final Rows<ByteBuffer, ByteBuffer> r;
// Unchecked cast is needed because rowSlice is a raw type.
246 r = ((OperationResult<Rows<ByteBuffer, ByteBuffer>>) rowSlice.execute()).getResult();
247 } catch (ConnectionException e) {
248 throw new TemporaryBackendException(e);
// Filter out the row whose key equals the range end.
250 Iterator<Row<ByteBuffer, ByteBuffer>> i =
251 Iterators.filter(r.iterator(), new KeySkipPredicate(query.getKeyEnd().asByteBuffer()));
252 return new RowIterator(i, query);
/**
 * Returns the name of this store.
 *
 * @return the name of the backing column family
 */
256 public String getName() {
257 return columnFamilyName;
/**
 * Row filter used during key iteration: accepts only non-null rows that contain at least one
 * column.
 */
260 private static class KeyIterationPredicate implements Predicate<Row<ByteBuffer, ByteBuffer>> {
262 public boolean apply(@Nullable Row<ByteBuffer, ByteBuffer> row) {
263 return (row != null) && row.getColumns().size() > 0;
/**
 * Row filter that rejects null rows and the row whose key equals a designated key to skip.
 */
267 private static class KeySkipPredicate implements Predicate<Row<ByteBuffer, ByteBuffer>> {
// Key of the row that must be excluded from the results.
269 private final ByteBuffer skip;
/**
 * @param skip key of the row to exclude
 */
271 public KeySkipPredicate(ByteBuffer skip) {
276 public boolean apply(@Nullable Row<ByteBuffer, ByteBuffer> row) {
277 return (row != null) && !row.getKey().equals(skip);
/**
 * {@link KeyIterator} over Astyanax rows. {@code next()} returns the key of the next row that has
 * columns, and {@code getEntries()} exposes the columns of the row most recently returned.
 */
281 private class RowIterator implements KeyIterator {
// Underlying rows, already filtered to those with at least one column.
282 private final Iterator<Row<ByteBuffer, ByteBuffer>> rows;
// Row most recently returned by next(); unset until next() has been called.
283 private Row<ByteBuffer, ByteBuffer> currentRow;
// Slice used to build entry iterators; may be null, in which case getEntries() is rejected.
284 private final SliceQuery sliceQuery;
285 private boolean isClosed;
/**
 * @param rowIter    rows to iterate; rows without columns are filtered out
 * @param sliceQuery slice passed on when building entries; may be {@code null}
 */
287 public RowIterator(Iterator<Row<ByteBuffer, ByteBuffer>> rowIter, SliceQuery sliceQuery) {
288 this.rows = Iterators.filter(rowIter, new KeyIterationPredicate());
289 this.sliceQuery = sliceQuery;
/**
 * Returns an iterator over the entries of the current row.
 * NOTE(review): currentRow is dereferenced below; calling this before any next() looks like it
 * would fail with a NullPointerException - confirm.
 *
 * @throws IllegalStateException if this iterator was created without a slice query
 */
293 public RecordIterator<Entry> getEntries() {
296 if (sliceQuery == null)
297 throw new IllegalStateException("getEntries() requires SliceQuery to be set.");
299 return new RecordIterator<Entry>() {
// Entry iterator over the current row's columns, given the slice end and limit.
300 private final Iterator<Entry> columns =
301 CassandraHelper.makeEntryIterator(currentRow.getColumns(),
303 sliceQuery.getSliceEnd(),sliceQuery.getLimit());
306 public boolean hasNext() {
308 return columns.hasNext();
312 public Entry next() {
314 return columns.next();
318 public void close() {
// Removal is not supported on the entry iterator.
323 public void remove() {
324 throw new UnsupportedOperationException();
330 public boolean hasNext() {
332 return rows.hasNext();
// Advances to the next row and returns its key wrapped as a StaticBuffer.
336 public StaticBuffer next() {
339 currentRow = rows.next();
340 return StaticArrayBuffer.of(currentRow.getKey());
344 public void close() {
// Removal is not supported on the key iterator.
349 public void remove() {
350 throw new UnsupportedOperationException();
// Guard that raises IllegalStateException for a closed iterator
// (presumably checks isClosed - confirm).
353 private void ensureOpen() {
355 throw new IllegalStateException("Iterator has been closed.");