first commit for new repo
[sdc/sdc-titan-cassandra.git] / src / main / java / com / thinkaurelius / titan / diskstorage / cassandra / astyanax / AstyanaxKeyColumnValueStore.java
1 package com.thinkaurelius.titan.diskstorage.cassandra.astyanax;
2
3 import com.google.common.base.Predicate;
4 import com.google.common.collect.*;
5 import com.netflix.astyanax.ExceptionCallback;
6 import com.netflix.astyanax.Keyspace;
7 import com.netflix.astyanax.connectionpool.OperationResult;
8 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
9 import com.netflix.astyanax.model.*;
10 import com.netflix.astyanax.query.AllRowsQuery;
11 import com.netflix.astyanax.query.RowSliceQuery;
12 import com.netflix.astyanax.retry.RetryPolicy;
13 import com.netflix.astyanax.serializers.ByteBufferSerializer;
14 import com.thinkaurelius.titan.diskstorage.*;
15 import com.thinkaurelius.titan.diskstorage.cassandra.utils.CassandraHelper;
16 import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*;
17 import com.thinkaurelius.titan.diskstorage.util.RecordIterator;
18 import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer;
19 import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry;
20
21 import javax.annotation.Nullable;
22 import java.nio.ByteBuffer;
23 import java.util.*;
24
25 import static com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager.Partitioner;
26 import static com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction.getTx;
27
28 public class AstyanaxKeyColumnValueStore implements KeyColumnValueStore {
29
30     private final Keyspace keyspace;
31     private final String columnFamilyName;
32     private final ColumnFamily<ByteBuffer, ByteBuffer> columnFamily;
33     private final RetryPolicy retryPolicy;
34     private final AstyanaxStoreManager storeManager;
35     private final AstyanaxGetter entryGetter;
36
37     AstyanaxKeyColumnValueStore(String columnFamilyName,
38                                 Keyspace keyspace,
39                                 AstyanaxStoreManager storeManager,
40                                 RetryPolicy retryPolicy) {
41         this.keyspace = keyspace;
42         this.columnFamilyName = columnFamilyName;
43         this.retryPolicy = retryPolicy;
44         this.storeManager = storeManager;
45
46         entryGetter = new AstyanaxGetter(storeManager.getMetaDataSchema(columnFamilyName));
47
48         columnFamily = new ColumnFamily<ByteBuffer, ByteBuffer>(
49                 this.columnFamilyName,
50                 ByteBufferSerializer.get(),
51                 ByteBufferSerializer.get());
52
53     }
54
55
56     ColumnFamily<ByteBuffer, ByteBuffer> getColumnFamily() {
57         return columnFamily;
58     }
59
60     @Override
61     public void close() throws BackendException {
62         //Do nothing
63     }
64
65     @Override
66     public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException {
67         Map<StaticBuffer, EntryList> result = getNamesSlice(query.getKey(), query, txh);
68         return Iterables.getOnlyElement(result.values(),EntryList.EMPTY_LIST);
69     }
70
71     @Override
72     public Map<StaticBuffer, EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException {
73         return getNamesSlice(keys, query, txh);
74     }
75
76     public Map<StaticBuffer, EntryList> getNamesSlice(StaticBuffer key,
77                                                     SliceQuery query, StoreTransaction txh) throws BackendException {
78         return getNamesSlice(ImmutableList.of(key),query,txh);
79     }
80
81
82     public Map<StaticBuffer, EntryList> getNamesSlice(List<StaticBuffer> keys,
83                                                       SliceQuery query, StoreTransaction txh) throws BackendException {
84         /*
85          * RowQuery<K,C> should be parameterized as
86          * RowQuery<ByteBuffer,ByteBuffer>. However, this causes the following
87          * compilation error when attempting to call withColumnRange on a
88          * RowQuery<ByteBuffer,ByteBuffer> instance:
89          *
90          * java.lang.Error: Unresolved compilation problem: The method
91          * withColumnRange(ByteBuffer, ByteBuffer, boolean, int) is ambiguous
92          * for the type RowQuery<ByteBuffer,ByteBuffer>
93          *
94          * The compiler substitutes ByteBuffer=C for both startColumn and
95          * endColumn, compares it to its identical twin with that type
96          * hard-coded, and dies.
97          *
98          */
99         RowSliceQuery rq = keyspace.prepareQuery(columnFamily)
100                 .setConsistencyLevel(getTx(txh).getReadConsistencyLevel().getAstyanax())
101                 .withRetryPolicy(retryPolicy.duplicate())
102                 .getKeySlice(CassandraHelper.convert(keys));
103
104         // Thank you, Astyanax, for making builder pattern useful :(
105         rq.withColumnRange(query.getSliceStart().asByteBuffer(),
106                 query.getSliceEnd().asByteBuffer(),
107                 false,
108                 query.getLimit() + (query.hasLimit()?1:0)); //Add one for potentially removed last column
109
110         OperationResult<Rows<ByteBuffer, ByteBuffer>> r;
111         try {
112             r = (OperationResult<Rows<ByteBuffer, ByteBuffer>>) rq.execute();
113         } catch (ConnectionException e) {
114             throw new TemporaryBackendException(e);
115         }
116
117         Rows<ByteBuffer,ByteBuffer> rows = r.getResult();
118         Map<StaticBuffer, EntryList> result = new HashMap<StaticBuffer, EntryList>(rows.size());
119
120         for (Row<ByteBuffer, ByteBuffer> row : rows) {
121             assert !result.containsKey(row.getKey());
122             result.put(StaticArrayBuffer.of(row.getKey()),
123                   CassandraHelper.makeEntryList(row.getColumns(),entryGetter, query.getSliceEnd(), query.getLimit()));
124         }
125
126         return result;
127     }
128
129     private static class AstyanaxGetter implements StaticArrayEntry.GetColVal<Column<ByteBuffer>,ByteBuffer> {
130
131         private final EntryMetaData[] schema;
132
133         private AstyanaxGetter(EntryMetaData[] schema) {
134             this.schema = schema;
135         }
136
137
138         @Override
139         public ByteBuffer getColumn(Column<ByteBuffer> element) {
140             return element.getName();
141         }
142
143         @Override
144         public ByteBuffer getValue(Column<ByteBuffer> element) {
145             return element.getByteBufferValue();
146         }
147
148         @Override
149         public EntryMetaData[] getMetaSchema(Column<ByteBuffer> element) {
150             return schema;
151         }
152
153         @Override
154         public Object getMetaData(Column<ByteBuffer> element, EntryMetaData meta) {
155             switch(meta) {
156                 case TIMESTAMP:
157                     return element.getTimestamp();
158                 case TTL:
159                     return element.getTtl();
160                 default:
161                     throw new UnsupportedOperationException("Unsupported meta data: " + meta);
162             }
163         }
164     }
165
166     @Override
167     public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException {
168         mutateMany(ImmutableMap.of(key, new KCVMutation(additions, deletions)), txh);
169     }
170
171     public void mutateMany(Map<StaticBuffer, KCVMutation> mutations, StoreTransaction txh) throws BackendException {
172         storeManager.mutateMany(ImmutableMap.of(columnFamilyName, mutations), txh);
173     }
174
175     @Override
176     public void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue, StoreTransaction txh) throws BackendException {
177         throw new UnsupportedOperationException();
178     }
179
180     @Override
181     public KeyIterator getKeys(@Nullable SliceQuery sliceQuery, StoreTransaction txh) throws BackendException {
182         if (storeManager.getPartitioner() != Partitioner.RANDOM)
183             throw new PermanentBackendException("This operation is only allowed when random partitioner (md5 or murmur3) is used.");
184
185         AllRowsQuery allRowsQuery = keyspace.prepareQuery(columnFamily).getAllRows();
186
187         if (sliceQuery != null) {
188             allRowsQuery.withColumnRange(sliceQuery.getSliceStart().asByteBuffer(),
189                     sliceQuery.getSliceEnd().asByteBuffer(),
190                     false,
191                     sliceQuery.getLimit());
192         }
193
194         Rows<ByteBuffer, ByteBuffer> result;
195         try {
196             /* Note: we need to fetch columns for each row as well to remove "range ghosts" */
197             OperationResult op = allRowsQuery.setRowLimit(storeManager.getPageSize()) // pre-fetch that many rows at a time
198                     .setConcurrencyLevel(1) // one execution thread for fetching portion of rows
199                     .setExceptionCallback(new ExceptionCallback() {
200                         private int retries = 0;
201
202                         @Override
203                         public boolean onException(ConnectionException e) {
204                             try {
205                                 return retries > 2; // make 3 re-tries
206                             } finally {
207                                 retries++;
208                             }
209                         }
210                     }).execute();
211
212             result = ((OperationResult<Rows<ByteBuffer, ByteBuffer>>) op).getResult();
213         } catch (ConnectionException e) {
214             throw new PermanentBackendException(e);
215         }
216
217         return new RowIterator(result.iterator(), sliceQuery);
218     }
219
220     @Override
221     public KeyIterator getKeys(KeyRangeQuery query, StoreTransaction txh) throws BackendException {
222         // this query could only be done when byte-ordering partitioner is used
223         // because Cassandra operates on tokens internally which means that even contiguous
224         // range of keys (e.g. time slice) with random partitioner could produce disjoint set of tokens
225         // returning ambiguous results to the user.
226         Partitioner partitioner = storeManager.getPartitioner();
227         if (partitioner != Partitioner.BYTEORDER)
228             throw new PermanentBackendException("getKeys(KeyRangeQuery could only be used with byte-ordering partitioner.");
229
230         ByteBuffer start = query.getKeyStart().asByteBuffer(), end = query.getKeyEnd().asByteBuffer();
231
232         RowSliceQuery rowSlice = keyspace.prepareQuery(columnFamily)
233                 .setConsistencyLevel(getTx(txh).getReadConsistencyLevel().getAstyanax())
234                 .withRetryPolicy(retryPolicy.duplicate())
235                 .getKeyRange(start, end, null, null, Integer.MAX_VALUE);
236
237         // Astyanax is bad at builder pattern :(
238         rowSlice.withColumnRange(query.getSliceStart().asByteBuffer(),
239                 query.getSliceEnd().asByteBuffer(),
240                 false,
241                 query.getLimit());
242
243         // Omit final the query's keyend from the result, if present in result
244         final Rows<ByteBuffer, ByteBuffer> r;
245         try {
246             r = ((OperationResult<Rows<ByteBuffer, ByteBuffer>>) rowSlice.execute()).getResult();
247         } catch (ConnectionException e) {
248             throw new TemporaryBackendException(e);
249         }
250         Iterator<Row<ByteBuffer, ByteBuffer>> i =
251                 Iterators.filter(r.iterator(), new KeySkipPredicate(query.getKeyEnd().asByteBuffer()));
252         return new RowIterator(i, query);
253     }
254
255     @Override
256     public String getName() {
257         return columnFamilyName;
258     }
259
260     private static class KeyIterationPredicate implements Predicate<Row<ByteBuffer, ByteBuffer>> {
261         @Override
262         public boolean apply(@Nullable Row<ByteBuffer, ByteBuffer> row) {
263             return (row != null) && row.getColumns().size() > 0;
264         }
265     }
266
267     private static class KeySkipPredicate implements Predicate<Row<ByteBuffer, ByteBuffer>> {
268
269         private final ByteBuffer skip;
270
271         public KeySkipPredicate(ByteBuffer skip) {
272             this.skip = skip;
273         }
274
275         @Override
276         public boolean apply(@Nullable Row<ByteBuffer, ByteBuffer> row) {
277             return (row != null) && !row.getKey().equals(skip);
278         }
279     }
280
281     private class RowIterator implements KeyIterator {
282         private final Iterator<Row<ByteBuffer, ByteBuffer>> rows;
283         private Row<ByteBuffer, ByteBuffer> currentRow;
284         private final SliceQuery sliceQuery;
285         private boolean isClosed;
286
287         public RowIterator(Iterator<Row<ByteBuffer, ByteBuffer>> rowIter, SliceQuery sliceQuery) {
288             this.rows = Iterators.filter(rowIter, new KeyIterationPredicate());
289             this.sliceQuery = sliceQuery;
290         }
291
292         @Override
293         public RecordIterator<Entry> getEntries() {
294             ensureOpen();
295
296             if (sliceQuery == null)
297                 throw new IllegalStateException("getEntries() requires SliceQuery to be set.");
298
299             return new RecordIterator<Entry>() {
300                 private final Iterator<Entry> columns =
301                         CassandraHelper.makeEntryIterator(currentRow.getColumns(),
302                                 entryGetter,
303                                 sliceQuery.getSliceEnd(),sliceQuery.getLimit());
304
305                 @Override
306                 public boolean hasNext() {
307                     ensureOpen();
308                     return columns.hasNext();
309                 }
310
311                 @Override
312                 public Entry next() {
313                     ensureOpen();
314                     return columns.next();
315                 }
316
317                 @Override
318                 public void close() {
319                     isClosed = true;
320                 }
321
322                 @Override
323                 public void remove() {
324                     throw new UnsupportedOperationException();
325                 }
326             };
327         }
328
329         @Override
330         public boolean hasNext() {
331             ensureOpen();
332             return rows.hasNext();
333         }
334
335         @Override
336         public StaticBuffer next() {
337             ensureOpen();
338
339             currentRow = rows.next();
340             return StaticArrayBuffer.of(currentRow.getKey());
341         }
342
343         @Override
344         public void close() {
345             isClosed = true;
346         }
347
348         @Override
349         public void remove() {
350             throw new UnsupportedOperationException();
351         }
352
353         private void ensureOpen() {
354             if (isClosed)
355                 throw new IllegalStateException("Iterator has been closed.");
356         }
357     }
358 }