1 package com.thinkaurelius.titan.diskstorage.cassandra.embedded;
3 import com.google.common.base.Predicate;
4 import com.google.common.collect.ImmutableMap;
5 import com.google.common.collect.Iterables;
6 import com.google.common.collect.Iterators;
7 import com.thinkaurelius.titan.diskstorage.util.time.TimestampProvider;
8 import com.thinkaurelius.titan.diskstorage.*;
9 import com.thinkaurelius.titan.diskstorage.cassandra.utils.CassandraHelper;
10 import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*;
11 import com.thinkaurelius.titan.diskstorage.util.RecordIterator;
12 import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer;
13 import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry;
15 import org.apache.cassandra.config.CFMetaData;
16 import org.apache.cassandra.config.Schema;
17 import org.apache.cassandra.db.*;
18 import org.apache.cassandra.db.ConsistencyLevel;
19 import org.apache.cassandra.db.composites.CellNames;
20 import org.apache.cassandra.db.composites.Composite;
21 import org.apache.cassandra.db.filter.IDiskAtomFilter;
22 import org.apache.cassandra.db.filter.SliceQueryFilter;
23 import org.apache.cassandra.dht.*;
24 import org.apache.cassandra.exceptions.InvalidRequestException;
25 import org.apache.cassandra.exceptions.IsBootstrappingException;
26 import org.apache.cassandra.exceptions.RequestTimeoutException;
27 import org.apache.cassandra.exceptions.UnavailableException;
28 import org.apache.cassandra.service.StorageProxy;
29 import org.apache.cassandra.service.StorageService;
30 import org.apache.cassandra.thrift.SlicePredicate;
31 import org.apache.cassandra.thrift.SliceRange;
32 import org.apache.cassandra.thrift.ThriftValidation;
33 import org.apache.commons.lang.ArrayUtils;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
37 import javax.annotation.Nullable;
39 import java.nio.ByteBuffer;
41 import java.util.concurrent.TimeUnit;
43 import static com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction.getTx;
// KeyColumnValueStore implementation for the embedded Cassandra backend.
// Reads in this class are issued directly against Cassandra's StorageProxy; writes are
// delegated to the owning CassandraEmbeddedStoreManager.
45 public class CassandraEmbeddedKeyColumnValueStore implements KeyColumnValueStore {
47 private static final Logger log = LoggerFactory.getLogger(CassandraEmbeddedKeyColumnValueStore.class);
// Keyspace and column family targeted by every read command built in this class.
49 private final String keyspace;
50 private final String columnFamily;
// Owning manager: source of the page size, the timestamp provider, the metadata schema
// and the write path (mutateMany).
51 private final CassandraEmbeddedStoreManager storeManager;
// Clock used to obtain the "now" timestamp handed to Cassandra read commands.
52 private final TimestampProvider times;
// Adapter exposing a Cassandra Cell's column name, value and metadata.
53 private final CassandraEmbeddedGetter entryGetter;
// Creates a store bound to one column family of one keyspace.
// The timestamp provider is taken from the store manager, and the entry getter is built
// from the manager's metadata schema for this column family.
// NOTE(review): "throws RuntimeException" declares an unchecked exception and is redundant.
55 public CassandraEmbeddedKeyColumnValueStore(
58 CassandraEmbeddedStoreManager storeManager) throws RuntimeException {
59 this.keyspace = keyspace;
60 this.columnFamily = columnFamily;
61 this.storeManager = storeManager;
62 this.times = this.storeManager.getTimestampProvider();
63 entryGetter = new CassandraEmbeddedGetter(storeManager.getMetaDataSchema(columnFamily),times);
// Closes this store.
// NOTE(review): appears to be a no-op; confirm that nothing owned by this store needs releasing.
67 public void close() throws BackendException {
71 public void acquireLock(StaticBuffer key, StaticBuffer column,
72 StaticBuffer expectedValue, StoreTransaction txh) throws BackendException {
73 throw new UnsupportedOperationException();
77 public KeyIterator getKeys(KeyRangeQuery keyRangeQuery, StoreTransaction txh) throws BackendException {
78 IPartitioner partitioner = StorageService.getPartitioner();
80 // see rant about this in Astyanax implementation
81 if (partitioner instanceof RandomPartitioner || partitioner instanceof Murmur3Partitioner)
82 throw new PermanentBackendException("This operation is only supported when byte-ordered partitioner is used.");
84 return new RowIterator(keyRangeQuery, storeManager.getPageSize(), txh);
88 public KeyIterator getKeys(SliceQuery query, StoreTransaction txh) throws BackendException {
89 return new RowIterator(getMinimumToken(), getMaximumToken(), query, storeManager.getPageSize(), txh);
94 * Create a RangeSliceCommand and run it against the StorageProxy.
96 * To match the behavior of the standard Cassandra thrift API endpoint, the
97 * {@code nowMillis} argument should be the number of milliseconds since the
98 * UNIX Epoch (e.g. System.currentTimeMillis() or equivalent obtained
99 * through a {@link TimestampProvider}). This is per
100 * {@link org.apache.cassandra.thrift.CassandraServer#get_range_slices(ColumnParent, SlicePredicate, KeyRange, ConsistencyLevel)},
101 * which passes the server's System.currentTimeMillis() to the
102 * {@code RangeSliceCommand} constructor.
// Fetches one page of rows from the StorageProxy for the range between the minimum key
// bounds of the start and end tokens. Columns are restricted by sliceQuery when it is given;
// otherwise an unbounded column range (empty start and finish) is requested.
// Any exception raised while building or running the command is rethrown as a
// PermanentBackendException.
104 private List<Row> getKeySlice(Token start,
106 @Nullable SliceQuery sliceQuery,
108 long nowMillis) throws BackendException {
109 IPartitioner partitioner = StorageService.getPartitioner();
111 SliceRange columnSlice = new SliceRange();
112 if (sliceQuery == null) {
113 columnSlice.setStart(ArrayUtils.EMPTY_BYTE_ARRAY)
114 .setFinish(ArrayUtils.EMPTY_BYTE_ARRAY)
117 columnSlice.setStart(sliceQuery.getSliceStart().asByteBuffer())
118 .setFinish(sliceQuery.getSliceEnd().asByteBuffer())
// An unlimited slice query is expressed to Cassandra as a count of Integer.MAX_VALUE.
119 .setCount(sliceQuery.hasLimit() ? sliceQuery.getLimit() : Integer.MAX_VALUE);
121 /* Note: we need to fetch columns for each row as well to remove "range ghosts" */
122 SlicePredicate predicate = new SlicePredicate().setSlice_range(columnSlice);
// Both tokens are converted to row positions through their minimum key bound.
124 RowPosition startPosition = start.minKeyBound(partitioner);
125 RowPosition endPosition = end.minKeyBound(partitioner);
130 CFMetaData cfm = Schema.instance.getCFMetaData(keyspace, columnFamily);
131 IDiskAtomFilter filter = ThriftValidation.asIFilter(predicate, cfm, null);
133 RangeSliceCommand cmd = new RangeSliceCommand(keyspace, columnFamily, nowMillis, filter, new Bounds<RowPosition>(startPosition, endPosition), pageSize);
// NOTE(review): the consistency level is hard-coded to QUORUM here, while
// getSlice(KeySliceQuery, StoreTransaction) uses the read consistency level of the
// transaction. Confirm that this difference is intended.
135 rows = StorageProxy.getRangeSlice(cmd, ConsistencyLevel.QUORUM);
// NOTE(review): catching Exception reports every failure as permanent, including
// conditions that read(...) in this class classifies as temporary.
136 } catch (Exception e) {
137 throw new PermanentBackendException(e);
// Returns the name of this store.
// NOTE(review): presumably the column family name; confirm against the method body.
144 public String getName() {
// Reads the columns of a single key that fall inside the slice of the query.
// A SliceFromReadCommand is built for the key and executed at the read consistency level of
// the transaction. EntryList.EMPTY_LIST is returned when no row, a null row, a null column
// family, or a column family marked for delete comes back. Otherwise the cells that are live
// as of nowMillis are converted with CassandraHelper.makeEntryList.
// Throws PermanentBackendException with message "Received N rows for single key" when the
// row count check on the result fails.
149 public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException {
152 * This timestamp mimics the timestamp used by
153 * {@link org.apache.cassandra.thrift.CassandraServer#get(ByteBuffer,ColumnPath,ConsistencyLevel)}.
155 * That method passes the server's System.currentTimeMillis() to
156 * {@link ReadCommand#create(String, ByteBuffer, String, long, IDiskAtomFilter)}.
157 * {@code create(...)} in turn passes that timestamp to the SliceFromReadCommand constructor.
159 final long nowMillis = times.getTime().toEpochMilli();
160 Composite startComposite = CellNames.simpleDense(query.getSliceStart().asByteBuffer());
161 Composite endComposite = CellNames.simpleDense(query.getSliceEnd().asByteBuffer());
// One column more than the limit is requested whenever the query has a limit.
// NOTE(review): presumably because the Cassandra slice end is inclusive while the Titan
// slice end is exclusive, so the boundary column may have to be discarded; confirm.
162 SliceQueryFilter sqf = new SliceQueryFilter(startComposite, endComposite,
163 false, query.getLimit() + (query.hasLimit()?1:0));
164 ReadCommand sliceCmd = new SliceFromReadCommand(keyspace, query.getKey().asByteBuffer(), columnFamily, nowMillis, sqf);
166 List<Row> slice = read(sliceCmd, getTx(txh).getReadConsistencyLevel().getDB());
168 if (null == slice || 0 == slice.size())
169 return EntryList.EMPTY_LIST;
171 int sliceSize = slice.size();
173 throw new PermanentBackendException("Received " + sliceSize + " rows for single key");
175 Row r = slice.get(0);
178 log.warn("Null Row object retrieved from Cassandra StorageProxy");
179 return EntryList.EMPTY_LIST;
182 ColumnFamily cf = r.cf;
185 log.debug("null ColumnFamily (\"{}\")", columnFamily);
186 return EntryList.EMPTY_LIST;
189 if (cf.isMarkedForDelete())
190 return EntryList.EMPTY_LIST;
192 return CassandraHelper.makeEntryList(
193 Iterables.filter(cf.getSortedColumns(), new FilterDeletedColumns(nowMillis)),
// Guava Predicate over Cassandra cells that tests liveness (Cell.isLive) against a fixed
// timestamp in milliseconds. It is used with Iterables.filter when cells are turned into entries.
200 private class FilterDeletedColumns implements Predicate<Cell> {
202 private final long tsMillis;
// NOTE(review): tsSeconds is referenced only by the commented-out check further down;
// it is a candidate for removal.
203 private final int tsSeconds;
205 private FilterDeletedColumns(long tsMillis) {
206 this.tsMillis = tsMillis;
207 this.tsSeconds = (int)(this.tsMillis / 1000L);
211 public boolean apply(Cell input) {
212 if (!input.isLive(tsMillis))
215 // Don't do this. getTimeToLive() is a duration divorced from any particular clock.
216 // For instance, if TTL=10 seconds, getTimeToLive() will have value 10 (not 10 + epoch seconds), and
217 // this will always return false.
218 //if (input instanceof ExpiringCell)
219 // return tsSeconds < ((ExpiringCell)input).getTimeToLive();
226 public Map<StaticBuffer,EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException {
227 throw new UnsupportedOperationException();
231 public void mutate(StaticBuffer key, List<Entry> additions,
232 List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException {
233 Map<StaticBuffer, KCVMutation> mutations = ImmutableMap.of(key, new
234 KCVMutation(additions, deletions));
235 mutateMany(mutations, txh);
239 public void mutateMany(Map<StaticBuffer, KCVMutation> mutations,
240 StoreTransaction txh) throws BackendException {
241 storeManager.mutateMany(ImmutableMap.of(columnFamily, mutations), txh);
244 private static List<Row> read(ReadCommand cmd, org.apache.cassandra.db.ConsistencyLevel clvl) throws BackendException {
245 ArrayList<ReadCommand> cmdHolder = new ArrayList<ReadCommand>(1);
247 return read(cmdHolder, clvl);
250 private static List<Row> read(List<ReadCommand> cmds, org.apache.cassandra.db.ConsistencyLevel clvl) throws BackendException {
252 return StorageProxy.read(cmds, clvl);
253 } catch (UnavailableException e) {
254 throw new TemporaryBackendException(e);
255 } catch (RequestTimeoutException e) {
256 throw new PermanentBackendException(e);
257 } catch (IsBootstrappingException e) {
258 throw new TemporaryBackendException(e);
259 } catch (InvalidRequestException e) {
260 throw new PermanentBackendException(e);
// Adapter implementing StaticArrayEntry.GetColVal for Cassandra cells: it exposes the column
// name, the value and per-entry metadata of a Cell.
// getColumn and getValue return clones of the underlying buffers (ByteBufferUtil.clone).
// getMetaData yields either the cell timestamp or, for an ExpiringCell, its time to live;
// any other metadata kind is rejected with an UnsupportedOperationException.
264 private static class CassandraEmbeddedGetter implements StaticArrayEntry.GetColVal<Cell,ByteBuffer> {
266 private final EntryMetaData[] schema;
267 private final TimestampProvider times;
269 private CassandraEmbeddedGetter(EntryMetaData[] schema, TimestampProvider times) {
270 this.schema = schema;
275 public ByteBuffer getColumn(Cell element) {
276 return org.apache.cassandra.utils.ByteBufferUtil.clone(element.name().toByteBuffer());
280 public ByteBuffer getValue(Cell element) {
281 return org.apache.cassandra.utils.ByteBufferUtil.clone(element.value());
285 public EntryMetaData[] getMetaSchema(Cell element) {
290 public Object getMetaData(Cell element, EntryMetaData meta) {
293 return element.timestamp();
295 return ((element instanceof ExpiringCell)
296 ? ((ExpiringCell) element).getTimeToLive()
299 throw new UnsupportedOperationException("Unsupported meta data: " + meta);
// KeyIterator that pages through rows with getKeySlice, using one fixed timestamp (nowMillis)
// for every page it fetches.
//
// Overview of the members below:
// - The KeyRangeQuery constructor converts the start and end keys to tokens with the active
//   partitioner and delegates to the token-based constructor.
// - The token-based constructor captures the timestamp and eagerly fetches the first page.
// - hasNext() delegates to hasNextInternal() and rethrows a BackendException wrapped in a
//   RuntimeException.
// - next() advances to the next row, records its key in lastSeenKey (the point from which
//   paging resumes) and returns the key as a StaticBuffer.
// - getEntries() exposes the live columns of the row most recently returned by next(), bounded
//   by the slice end and limit; it throws IllegalStateException when no SliceQuery was supplied.
// - remove() is unsupported, both here and on the iterator returned by getEntries().
304 private class RowIterator implements KeyIterator {
305 private final Token maximumToken;
306 private final SliceQuery sliceQuery;
307 private final StoreTransaction txh;
310 * This RowIterator will use this timestamp for its entire lifetime,
311 * even if the iterator runs more than one distinct slice query while
312 * paging. <b>This field must be in units of milliseconds since
313 * the UNIX Epoch</b>.
315 * This timestamp is passed to three methods/constructors:
317 * <li>{@link org.apache.cassandra.db.Column#isMarkedForDelete(long now)}</li>
318 * <li>{@link org.apache.cassandra.db.ColumnFamily#hasOnlyTombstones(long)}</li>
320 * the {@link RangeSliceCommand} constructor via the last argument
321 * to {@link CassandraEmbeddedKeyColumnValueStore#getKeySlice(Token, Token, SliceQuery, int, long)}
324 * The second list entry just calls the first and almost doesn't deserve
325 * a mention at present, but maybe the implementation will change in the future.
327 * When this value needs to be compared to TTL seconds expressed in seconds,
328 * Cassandra internals do the conversion.
329 * Consider {@link ExpiringColumn#isMarkedForDelete(long)}, which is implemented,
330 * as of 2.0.6, by the following one-liner:
332 * {@code return (int) (now / 1000) >= getLocalDeletionTime()}
334 * The {@code now / 1000} does the conversion from milliseconds to seconds
335 * (the units of getLocalDeletionTime()).
337 private final long nowMillis;
339 private Iterator<Row> keys;
340 private ByteBuffer lastSeenKey = null;
341 private Row currentRow;
342 private int pageSize;
344 private boolean isClosed;
346 public RowIterator(KeyRangeQuery keyRangeQuery, int pageSize, StoreTransaction txh) throws BackendException {
347 this(StorageService.getPartitioner().getToken(keyRangeQuery.getKeyStart().asByteBuffer()),
348 StorageService.getPartitioner().getToken(keyRangeQuery.getKeyEnd().asByteBuffer()),
354 public RowIterator(Token minimum, Token maximum, SliceQuery sliceQuery, int pageSize, StoreTransaction txh) throws BackendException {
355 this.pageSize = pageSize;
356 this.sliceQuery = sliceQuery;
357 this.maximumToken = maximum;
359 this.nowMillis = times.getTime().toEpochMilli();
360 this.keys = getRowsIterator(getKeySlice(minimum, maximum, sliceQuery, pageSize, nowMillis));
364 public boolean hasNext() {
366 return hasNextInternal();
367 } catch (BackendException e) {
368 throw new RuntimeException(e);
373 public StaticBuffer next() {
377 throw new NoSuchElementException();
379 currentRow = keys.next();
380 ByteBuffer currentKey = currentRow.key.getKey().duplicate();
383 return StaticArrayBuffer.of(currentKey);
385 lastSeenKey = currentKey;
390 public void close() {
395 public void remove() {
396 throw new UnsupportedOperationException();
400 public RecordIterator<Entry> getEntries() {
403 if (sliceQuery == null)
404 throw new IllegalStateException("getEntries() requires SliceQuery to be set.");
406 return new RecordIterator<Entry>() {
407 final Iterator<Entry> columns = CassandraHelper.makeEntryIterator(
408 Iterables.filter(currentRow.cf.getSortedColumns(), new FilterDeletedColumns(nowMillis)),
410 sliceQuery.getSliceEnd(),
411 sliceQuery.getLimit());
413 //cfToEntries(currentRow.cf, sliceQuery).iterator();
416 public boolean hasNext() {
418 return columns.hasNext();
422 public Entry next() {
424 return columns.next();
428 public void close() {
433 public void remove() {
434 throw new UnsupportedOperationException();
440 private final boolean hasNextInternal() throws BackendException {
446 boolean hasNext = keys.hasNext();
448 if (!hasNext && lastSeenKey != null) {
449 Token lastSeenToken = StorageService.getPartitioner().getToken(lastSeenKey.duplicate());
451 // let's check if we reached key upper bound already so we can skip one useless call to Cassandra
// NOTE(review): "maximumToken != getMinimumToken()" compares object references, not token
// values. For a ByteOrderedPartitioner getMinimumToken() builds a new BytesToken on each
// call, so this operand is always true there. Consider equals() if a value comparison
// was intended.
452 if (maximumToken != getMinimumToken() && lastSeenToken.equals(maximumToken)) {
// The current page is exhausted: request the next page, starting at the token of the last
// key that was returned.
456 List<Row> newKeys = getKeySlice(StorageService.getPartitioner().getToken(lastSeenKey), maximumToken, sliceQuery, pageSize, nowMillis);
// The new page starts at lastSeenKey, so that row is filtered out to avoid returning it twice.
458 keys = getRowsIterator(newKeys, lastSeenKey);
459 hasNext = keys.hasNext();
465 private void ensureOpen() {
467 throw new IllegalStateException("Iterator has been closed.");
// Wraps a page of rows in an iterator that skips null rows, rows without a column family,
// rows marked for delete, and rows holding only tombstones as of nowMillis.
470 private Iterator<Row> getRowsIterator(List<Row> rows) {
474 return Iterators.filter(rows.iterator(), new Predicate<Row>() {
476 public boolean apply(@Nullable Row row) {
477 // The hasOnlyTombstones(x) call below ultimately calls Column.isMarkedForDelete(x)
478 return !(row == null || row.cf == null || row.cf.isMarkedForDelete() || row.cf.hasOnlyTombstones(nowMillis));
// Same filtering as the one-argument overload, and additionally drops the row whose key
// equals exceptKey.
483 private Iterator<Row> getRowsIterator(List<Row> rows, final ByteBuffer exceptKey) {
484 Iterator<Row> rowIterator = getRowsIterator(rows);
486 if (rowIterator == null)
489 return Iterators.filter(rowIterator, new Predicate<Row>() {
491 public boolean apply(@Nullable Row row) {
492 return row != null && !row.key.getKey().equals(exceptKey);
498 private static Token getMinimumToken() throws PermanentBackendException {
499 IPartitioner partitioner = StorageService.getPartitioner();
501 if (partitioner instanceof RandomPartitioner) {
502 return ((RandomPartitioner) partitioner).getMinimumToken();
503 } else if (partitioner instanceof Murmur3Partitioner) {
504 return ((Murmur3Partitioner) partitioner).getMinimumToken();
505 } else if (partitioner instanceof ByteOrderedPartitioner) {
506 //TODO: This makes the assumption that its an EdgeStore (i.e. 8 byte keys)
507 return new BytesToken(com.thinkaurelius.titan.diskstorage.util.ByteBufferUtil.zeroByteBuffer(8));
509 throw new PermanentBackendException("Unsupported partitioner: " + partitioner);
513 private static Token getMaximumToken() throws PermanentBackendException {
514 IPartitioner partitioner = StorageService.getPartitioner();
516 if (partitioner instanceof RandomPartitioner) {
517 return new BigIntegerToken(RandomPartitioner.MAXIMUM);
518 } else if (partitioner instanceof Murmur3Partitioner) {
519 return new LongToken(Murmur3Partitioner.MAXIMUM);
520 } else if (partitioner instanceof ByteOrderedPartitioner) {
521 //TODO: This makes the assumption that its an EdgeStore (i.e. 8 byte keys)
522 return new BytesToken(com.thinkaurelius.titan.diskstorage.util.ByteBufferUtil.oneByteBuffer(8));
524 throw new PermanentBackendException("Unsupported partitioner: " + partitioner);