The goal of this document is to provide an overview of the main changes done in CASSANDRA-8099 so it's easier to dive into the patch. This assumes knowledge of the pre-existing code.
CASSANDRA-8099 refactors the abstractions used by the storage engine and as such impacts most of the code of said engine. The changes can be thought of as following two main guidelines:
- the new abstractions are much more iterator-based (i.e. they try harder to avoid materializing everything in memory),
- and they are closer to the CQL representation of things (i.e. the storage engine is aware of more structure, making it able to optimize accordingly).

Note that while those changes have a heavy impact on the actual code, the basic mechanisms of the read and write paths are largely unchanged.
In the following, I'll start by describing the new abstractions introduced by the patch. I'll then provide a quick reference from each existing class to what it becomes in the patch, after which I'll discuss how the refactor handles a number of more specific points. Lastly, the patch introduces some changes to the on-wire and on-disk formats, so I'll discuss those quickly.
Where the existing storage engine mainly handles cells, the new engine groups cells into rows, and rows become the more central building block. A Row is identified by the value of its clustering columns, which are stored in a Clustering object (see below), and it associates a number of cells to each of its non-PK, non-static columns (we'll discuss static columns more specifically later).
The patch distinguishes 2 kinds of columns: simple and complex ones. A simple column can have only 1 cell associated to it (or none), while a complex one can have an arbitrary number of associated cells. Currently, the complex columns are only the non-frozen collections (but we'll have non-frozen UDTs at some point, and who knows what in the future).
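To make this concrete, here's a minimal sketch in Java of the row shape just described; all names here are my own simplification, not the patch's exact API:

import java.util.List;
import java.util.NavigableMap;

// Hypothetical stubs for illustration only; the real classes are richer.
interface Clustering { List<Object> values(); }     // the values of the clustering columns
interface ColumnDefinition { boolean isComplex(); } // complex = non-frozen collection (for now)
interface CellPath { }                              // distinguishes the cells of a complex column
interface Cell { }                                  // described in its own section below

interface Row
{
    Clustering clustering();                        // identifies the row within its partition
    Cell getCell(ColumnDefinition simpleColumn);    // 1 cell (or none) for a simple column
    NavigableMap<CellPath, Cell> getCells(ColumnDefinition complexColumn); // arbitrary number of cells
}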
Like before, we also have to deal with range tombstones. However, instead of dealing with full range tombstones, we generally deal with RangeTombstoneMarkers, each of which is just one of the bounds of a range tombstone (so that a range tombstone is composed of 2 "markers" in practice, its start and its end). I'll discuss the reasoning for this a bit more later. A RangeTombstoneMarker is identified by a Slice.Bound (which is to RT markers what the Clustering is to Row) and simply stores its deletion information.
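As an illustration, here's a simplified sketch (class and field names are mine, not necessarily the patch's) of a range tombstone traveling as its two markers:

// Simplified illustration: a range tombstone deleting the clustering range
// [2, 5] at timestamp 42 is represented by two markers, its start and its end.
final class DeletionTime
{
    final long markedForDeleteAt;
    final int localDeletionTime;
    DeletionTime(long markedForDeleteAt, int localDeletionTime)
    {
        this.markedForDeleteAt = markedForDeleteAt;
        this.localDeletionTime = localDeletionTime;
    }
}

final class RangeTombstoneMarker
{
    final Object[] bound;        // a prefix of clustering values (a Slice.Bound)
    final boolean isStart;       // whether this marker opens or closes the deletion
    final boolean isInclusive;
    final DeletionTime deletion; // the deletion info this marker carries
    RangeTombstoneMarker(Object[] bound, boolean isStart, boolean isInclusive, DeletionTime deletion)
    {
        this.bound = bound;
        this.isStart = isStart;
        this.isInclusive = isInclusive;
        this.deletion = deletion;
    }
}

// The tombstone above would appear in the atom stream as:
//   new RangeTombstoneMarker(new Object[]{ 2 }, true,  true, new DeletionTime(42, nowInSec))
//   ... rows with clustering in [2, 5] that it deletes are simply not emitted ...
//   new RangeTombstoneMarker(new Object[]{ 5 }, false, true, new DeletionTime(42, nowInSec))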
The engine thus mainly works with rows and range tombstone markers, and they are both grouped under the common Atom interface. An "atom" is thus just that: either a row or a range tombstone marker.
Side Note: the "Atom" naming is pretty bad. I've reused it mainly because it plays a similar role to the existing OnDiskAtom, but it's arguably crappy now because a row is definitively not "indivisible". Anyway, renaming suggestions are more than welcome. The only alternative I've come up so far are "Block" or "Element" but I'm not entirely convinced by either.
Atoms are sorted (within a partition). They are ordered by their ClusteringPrefix, which is mainly a common interface for the Clustering of Row and the Slice.Bound of RangeTombstoneMarker. More generally, a ClusteringPrefix is a prefix of the clustering values for the clustering columns of the table involved, with a Clustering being the special case where all values are provided. A Slice.Bound can be a true prefix however, having only some of the clustering values. Further, a Slice.Bound can be either a start or an end bound, and it can be inclusive or exclusive. Sorting makes sure that a start bound sorts before anything it "selects" (and conversely for the end). A Slice is then just 2 Slice.Bounds, a start and an end, and selects anything that sorts between those.
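To make the ordering concrete, here's a small worked example of my own, simplified to int clustering values and inclusive bounds:

// For a table with clustering columns (a, b), the intended order is e.g.:
//   start(a=1) < (a=1, b=0) < (a=1, b=9) < end(a=1) < start(a=2) < ...
// so a Slice(start(a=1), end(a=1)) selects exactly the rows with a=1.
final class PrefixOrderSketch
{
    // Tie-breaker when one prefix is a strict prefix of the other (or they are
    // equal): -1 for an inclusive start bound, 0 for a full Clustering, +1 for
    // an inclusive end bound. Exclusive bounds would simply flip their tie-breaker.
    static int compare(int[] v1, int tie1, int[] v2, int tie2)
    {
        for (int i = 0; i < Math.min(v1.length, v2.length); i++)
        {
            int cmp = Integer.compare(v1[i], v2[i]);
            if (cmp != 0)
                return cmp;
        }
        // Equal on the common prefix: the shorter prefix sorts according to its
        // tie-breaker (a start bound before what it selects, an end bound after).
        if (v1.length != v2.length)
            return v1.length < v2.length ? tie1 : -tie2;
        return Integer.compare(tie1, tie2);
    }
}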
ClusteringPrefixes are compared through the table's ClusteringComparator, which is like our existing table comparator except that it only includes comparators for the clustering column values. In particular, it includes neither the comparator for the column names themselves, nor the post-column-name comparator for collections (the latter being handled through the CellPath, see below).
There is also a Clusterable interface that Atom implements and that simply marks objects that can be compared by a ClusteringComparator, i.e. objects that have a ClusteringPrefix.
A cell holds the information on a single value. It corresponds to a (CQL) column and has a value, a timestamp, and an optional ttl and local deletion time. Further, as said above, complex columns (collections) will have multiple associated cells. Those cells are distinguished by their CellPath, which are compared through a comparator that depends on the column type. The cells of simple columns just have a null cell path.
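A minimal sketch of that cell shape, with assumed rather than exact signatures:

import java.nio.ByteBuffer;

// Hypothetical shape of a cell: one value plus its liveness information, and a
// path that only matters for cells of complex columns.
interface CellPath extends Comparable<CellPath> { } // ordering depends on the column type

interface Cell
{
    ByteBuffer value();
    long timestamp();
    int ttl();                // optional: only meaningful for expiring cells
    int localDeletionTime();  // optional: used for tombstones and expiring cells
    CellPath path();          // null for the cell of a simple column
}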
As often as possible, atoms are manipulated through iterators, namely through AtomIterator, which is an iterator over the atoms of a single partition, and PartitionIterator, which is an iterator of AtomIterators, i.e. an iterator over multiple partitions. In other words, a single partition query fundamentally returns an AtomIterator, while a range query returns a PartitionIterator. Those iterators are closeable and the code has to make sure to always close them, as they will often have resources to clean (like an OpOrder.Group to close or files to release).
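Since these iterators pin resources, callers typically need a double try-with-resources; a hedged sketch (with stubbed-down interfaces) of the pattern:

import java.util.Iterator;

// Hypothetical stubs so the sketch is self-contained.
interface Atom { } // a Row or a RangeTombstoneMarker
interface AtomIterator extends Iterator<Atom>, AutoCloseable { void close(); }
interface PartitionIterator extends Iterator<AtomIterator>, AutoCloseable { void close(); }

final class ConsumeExample
{
    static void consume(PartitionIterator partitions)
    {
        try (PartitionIterator ps = partitions)              // always close the outer iterator
        {
            while (ps.hasNext())
            {
                try (AtomIterator partition = ps.next())     // ... and each per-partition one
                {
                    while (partition.hasNext())
                    {
                        Atom atom = partition.next();        // a row or a range tombstone marker
                        // merge/filter/transform here
                    }
                }
            }
        }
    }
}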
The read path mainly consists in getting atom and partition iterators from sstables and memtables and merging, filtering and transforming them. There are a number of functions to do just that (merging, filtering and transforming) in AtomIterators and PartitionIterators, but there are also a number of classes (RowFilteringAtomIterator, CountingAtomIterator, ...) that wrap one of those iterator types to apply some filtering/transformation.
AtomIterator and PartitionIterator also have their doppelgängers RowIterator and DataIterator, which exist for the sake of making it easier for the upper layers (StorageProxy and above) to deal with deletion. We'll discuss those later.
While we avoid materializing partitions in memory as much as possible (favoring the iterative approach), there are cases where we need/want to hold some subpart of a partition in memory, and we have a generic Partition interface for those. A Partition basically corresponds to materializing an AtomIterator and is thus somewhat equivalent to the existing ColumnFamily (but again, many existing usages of ColumnFamily simply use AtomIterator). Partition is mainly used through the following implementations:
- PartitionUpdate: this is what a Mutation holds and is used to gather updates and apply them to memtables.
- AtomicBTreePartition: this is the direct counterpart of AtomicBTreeColumns, the difference being that the BTree holds rows instead of cells. On updates, we merge the rows together to create a new one.
- CachedPartition: this is used by the row cache.
The ReadCommand class still exists, but instead of being just for single partition reads, it's now a common abstract class for both single partition and range reads. It has 2 subclasses, SinglePartitionReadCommand and PartitionRangeReadCommand, the former of which has 2 subclasses itself: SinglePartitionSliceCommand and SinglePartitionNamesCommand. All ReadCommands have a ColumnFilter and a DataLimits (see below). PartitionRangeReadCommand additionally has a DataRange, which is mostly just a range of partition keys with a PartitionFilter, while SinglePartitionReadCommand has a partition key and a PartitionFilter (see below too).
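Schematically, and with assumed field types rather than the actual declarations:

// Hypothetical stubs so the outline is self-contained.
interface ColumnFilter { }
interface DataLimits { }
interface PartitionFilter { }
interface DataRange { }     // a range of partition keys plus a PartitionFilter
interface DecoratedKey { }

abstract class ReadCommand
{
    ColumnFilter columnFilter; // restrictions needing a 2ndary index or filtering
    DataLimits limits;         // how results are counted and limited
}

class PartitionRangeReadCommand extends ReadCommand
{
    DataRange dataRange;
}

abstract class SinglePartitionReadCommand extends ReadCommand
{
    DecoratedKey partitionKey;
    PartitionFilter partitionFilter;
}

class SinglePartitionSliceCommand extends SinglePartitionReadCommand { }
class SinglePartitionNamesCommand extends SinglePartitionReadCommand { }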
The code to execute those queries locally, which used to be in ColumnFamilyStore and CollationController, is now in those ReadCommand classes. For instance, the CollationController code for names queries is in SinglePartitionNamesCommand, and the code to decide if we use a 2ndary index or not is directly in ReadCommand.executeLocally().
Note that because they share a common class, all ReadCommands actually return a PartitionIterator (an iterator over partitions), even single partition ones (that iterator will just return one or zero results). This actually allows generalizing (and simplifying) the "response resolvers". Instead of having separate resolvers for range and single partition queries, we only have DataResolver and DigestResolver, which work for any read command. This does mean that the patch fixes CASSANDRA-2986, and that we could use digest queries for range queries if we wanted to (not necessarily saying it's a good idea).
ColumnFilter is the new List<IndexExpression>. It holds those column restrictions that can't be directly fulfilled by the PartitionFilter, i.e. those that require either a 2ndary index or filtering.
PartitionFilter is the new IDiskAtomFilter/QueryFilter. There are still 2 variants: SlicePartitionFilter and NamesPartitionFilter. Both variants include the actual columns that are queried (as we don't return full CQL rows anymore), and both can be reversed. A names filter queries a bunch of rows by name, i.e. has a set of Clusterings. A slice filter queries one or more slices of rows. A slice filter does not however include a limit, since that is dealt with by DataLimits, which is in ReadCommand directly.
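In outline (field names here are my own assumptions):

import java.util.SortedSet;

// Hypothetical stubs so the outline is self-contained.
interface Clustering { }
interface Slice { }   // a start and an end Slice.Bound
interface Columns { } // the queried columns

abstract class PartitionFilter
{
    Columns queriedColumns; // we don't return full CQL rows anymore
    boolean reversed;
}

class NamesPartitionFilter extends PartitionFilter
{
    SortedSet<Clustering> names; // the rows queried, by name
}

class SlicePartitionFilter extends PartitionFilter
{
    Slice[] slices;              // one or more slices of rows, in clustering order
}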
DataLimits implements the limits on a query. This is meant to abstract the differences between how we count for thrift and for CQL. Further, for CQL, this allows having a limit per partition, which cleans up how DISTINCT queries are handled and allows for CASSANDRA-7017 (the patch doesn't add support for the PER PARTITION LIMIT syntax of that ticket, but handles it internally otherwise).
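A simplified sketch of my own of the counting this abstracts; the real class also has to deal with the thrift vs CQL counting differences:

// A DISTINCT query is then just a per-partition limit of 1, and a
// CASSANDRA-7017-style query any other per-partition limit.
final class DataLimitsSketch
{
    final int rowLimit;           // overall CQL row limit
    final int perPartitionLimit;  // e.g. 1 for SELECT DISTINCT

    DataLimitsSketch(int rowLimit, int perPartitionLimit)
    {
        this.rowLimit = rowLimit;
        this.perPartitionLimit = perPartitionLimit;
    }

    boolean isExhausted(int rowsCounted)
    {
        return rowsCounted >= rowLimit;
    }

    boolean isExhaustedForPartition(int rowsCountedInPartition)
    {
        return rowsCountedInPartition >= perPartitionLimit;
    }
}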
The code also uses the SliceableAtomIterator abstraction. A SliceableAtomIterator is an AtomIterator into which we basically know how to seek efficiently. In particular, we have an SSTableIterator which is a SliceableAtomIterator. That SSTableIterator replaces both the existing SSTableNamesIterator and SSTableSliceIterator, and the respective PartitionFilters use that SliceableAtomIterator interface to query exactly what they want.
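A hedged sketch of the abstraction (the exact method signature is an assumption):

import java.util.Iterator;

// Hypothetical stubs so the sketch is self-contained.
interface Atom { }
interface Slice { } // a start and an end Slice.Bound
interface AtomIterator extends Iterator<Atom>, AutoCloseable { void close(); }

// On top of plain iteration, a sliceable iterator can seek: a names filter can
// seek to each queried Clustering, a slice filter to each of its Slices.
interface SliceableAtomIterator extends AtomIterator
{
    Iterator<Atom> slice(Slice slice); // the atoms selected by the slice, in order
}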
For quick reference, here's the rough correspondence of old classes to new classes:
- ColumnFamily: for writes, this is handled by PartitionUpdate and AtomicBTreePartition. For reads, this is replaced by AtomIterator (or RowIterator, see the parts on tombstones below). For the row cache, there is a specific CachedPartition (which is actually an interface, the implementing class being ArrayBackedPartition).
- Cell: there is still a Cell class, which is roughly the same thing as the old one, except that instead of having a cell name, cells are now in a row and correspond to a column.
- QueryFilter: doesn't really exist anymore. What it was holding is now in SinglePartitionReadCommand.
- AbstractRangeCommand is now PartitionRangeReadCommand.
- IDiskAtomFilter is now PartitionFilter.
- List<IndexExpression> is now ColumnFilter.
- RowDataResolver, RowDigestResolver and RangeSliceResponseResolver are now DataResolver and DigestResolver.
- Row is now AtomIterator.
- List<Row> is now PartitionIterator (or DataIterator, see the part about tombstones below).
- AbstractCompactedRow and LazilyCompactedRow are not really needed anymore. Their corresponding code is in CompactionIterable.
There are a few aspects worth noting regarding the handling of deleted and shadowed data:
- It's part of the contract of an AtomIterator that it must not shadow its own data. In other words, it should not return a cell that is deleted by one of its own range tombstones, or by its partition level deletion. In practice this means that we get rid of shadowed data quickly (a good thing) and that there is a limited number of places that have to deal with shadowing (merging being one).
- Upper layers of the code (anything above StorageProxy) don't care about deleted data (and deletion information in general). Basically, as soon as we've merged results from the replicas, we don't need tombstones anymore. So, instead of requiring all those upper layers to filter tombstones themselves (which is error prone), we get rid of them as soon as we can, i.e. as soon as we've resolved replica responses (so in the ResponseResolver). To do that, and to make it clear when tombstones have been filtered (and make the code cleaner), we transform an AtomIterator into a RowIterator, both being essentially the same thing except that a RowIterator only returns live data. Which means in particular that it's an iterator of Rows (since an atom is either a row or a range tombstone marker, and we've filtered tombstones). Similarly, a PartitionIterator becomes a DataIterator, which is just an iterator of RowIterators.
- In the existing code a CQL row deletion involves a range tombstone. But as row deletions are pretty frequent and range tombstones have inherent inefficiencies, the patch adds the notion of a row deletion, which is just some optional deletion information on the Row. This can be thought of as just an optimization for range tombstones that span only a single row.
- As mentioned at the beginning of this document, the code splits range tombstones into 2 range tombstone markers, one for each bound. The problem with storing full range tombstones as we currently do is that it makes it harder to merge them efficiently, and in particular to "normalize" overlapping ones. In practice, a given AtomIterator guarantees that there is only one range tombstone to worry about at any given time. In other words, at any point of the iterator, either there is a single open range tombstone or there is none, which makes things easier and more efficient (see the walkthrough after this list). It's worth noting that we still have the RangeTombstone class because for in-memory structures (PartitionUpdate, AtomicBTreePartition, ...) we currently use the existing RangeTombstoneList out of convenience. This is kind of an implementation detail that could change in the future.
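Here's a small walkthrough of my own (bounds and timestamps invented) of why markers make merging overlapping tombstones straightforward:

// Merging sstable A, holding a tombstone over [1, 6] at timestamp 10, with
// sstable B, holding one over [4, 9] at timestamp 20. Working marker by
// marker, the merge only ever tracks the single deletion currently open:
//
//   open(1)@10   -> emit open(1)@10                (deletion @10 now open)
//   open(4)@20   -> emit close(4)@10, open(4)@20   (@20 supersedes @10)
//   close(6)@10  -> swallowed                      (@20 is still the open deletion)
//   close(9)@20  -> emit close(9)@20               (nothing open anymore)
//
// The merged stream is thus already "normalized": at any point there is at
// most one open range tombstone, which is exactly the AtomIterator guarantee
// described above.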
Static columns are handled separately from the rest of the rows. In practice, each Partition/AtomIterator has a separate "static row" that holds the values for all the static columns (that row can of course be empty). That row doesn't really correspond to any CQL row; its content is simply "merged" into the result set in SelectStatement (very much like we already do).
In CQL, a row can exist even if only its PK columns have values. In other words, a Row can be live even if it doesn't have any cells. Currently, we handle this through the "row marker" (i.e. through a specific cell). The patch makes this slightly less hacky by adding a timestamp (and potentially a ttl) to each row, which can be thought of as the timestamp (and ttl) of the PK columns.
Further, when we query only some specific columns of a row, we need to add a row (with nulls) to the result set if the row is live (it has live cells or the timestamp we described above) even if it has no actual values for the queried columns. We currently deal with that by querying entire rows all the time, but the patch changes that. This does mean that even when we query only some columns, we still need to have the information on whether the row itself is live or not. And because we'll merge results from different sources (which can include deletions), a boolean is not enough, so we include in a Row object the maximum live timestamp known for this row. This currently means that in practice, we do scan the full row on disk but filter out cells we're not interested in right away (and in fact, we don't even deserialize the value of such cells). This might be optimizable later on, but expiring data makes that harder (we typically can't just pre-compute that max live timestamp when writing the sstable since it can depend on the time of the query).
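A minimal sketch, under simplified assumptions of my own, of why a timestamp survives merging where a boolean wouldn't:

// A row from one source may be covered by a deletion from another source, so a
// per-source "live" boolean can't be re-evaluated after the merge; a timestamp
// can be compared against the merged deletion.
final class RowLivenessSketch
{
    // hasLiveCells is assumed to already account for the same deletion.
    static boolean isLiveAfterMerge(long maxLiveTimestamp, boolean hasLiveCells, long deletionTimestamp)
    {
        return hasLiveCells || maxLiveTimestamp > deletionTimestamp;
    }
}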
The patch makes relatively heavy use of a "flyweight"-like pattern. Typically, Row and Cell data are stored in arrays, and a given AtomIterator will only use a single Row object (and Cell object) that points into those arrays (and thus changes at each call to hasNext()/next()). This does mean that the objects returned that way shouldn't be aliased (have a reference taken to them) without care. The patch uses an Aliasable interface to mark objects that may use this pattern and should thus potentially be copied in the rare cases where a reference should be kept to them.
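A hedged sketch of the hazard and the copy-before-keeping rule (the takeAlias() name is an assumption of mine):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stubs: the iterator reuses one Row object across next() calls.
interface Row { Row takeAlias(); } // returns a copy that is safe to retain
interface RowIterator extends Iterator<Row>, AutoCloseable { void close(); }

final class AliasingExample
{
    static List<Row> materialize(RowIterator iter)
    {
        List<Row> rows = new ArrayList<>();
        try (RowIterator it = iter)
        {
            while (it.hasNext())
            {
                // WRONG: rows.add(it.next()) would add the same mutating object N times.
                rows.add(it.next().takeAlias()); // copy before keeping a reference
            }
        }
        return rows;
    }
}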
The patch slightly changes the way reversed queries are handled. First, while a reversed query currently reverses its slices in SliceQueryFilter, this is not the case anymore: SlicePartitionFilter always keeps its slices in clustering order and simply handles them from the end to the beginning on reversed queries. Further, if a query is reversed, the results are returned in this reversed order all the way through, which differs from the current code where ColumnFamily actually always holds cells in forward order (making it a mess for paging and forcing us to re-reverse everything in the result set).
As the storage engine works iteratively, compaction simply has to get iterators on the sstables, merge them and write the result back, along with a simple filter that skips purgeable tombstones in the process. So there isn't really a need for AbstractCompactedRow/LazilyCompactedRow anymore, and the compaction/purging code is now in CompactionIterable directly.
The current way of handling short reads would require us to consume the whole result before deciding on a retry (and we currently retry the whole command), which doesn't work too well in an iterative world. So the patch moves short read protection into DataResolver (where it kind of belongs anyway) and we don't retry the full command anymore. Instead, if we realize that a given node has a short read while its result is consumed, we simply query that node (and that node only) for a continuation of the result. On top of avoiding the retry of the whole read (and limiting the number of nodes queried on the retry), this also makes it trivial to solve CASSANDRA-8933.
Given that the main abstractions have changed, the existing on-wire ColumnFamily format is not appropriate anymore and the patch switches to a new format. The same can be said of the on-disk format, and while it is not an objective of CASSANDRA-8099 to get fancy on the on-disk format, using the on-wire format as the on-disk format was actually relatively simple (until more substantial changes land with CASSANDRA-7447) and the patch does that too.
For a given partition, the format simply serializes rows one after another (atoms in practice). For the on-disk format, this means that it is now rows that are indexed, not cells. The format uses a header that is written at the beginning of each partition for the on-wire format (in AtomIteratorSerializer) and is kept as a new sstable Component for sstables. The details of the format are described in the javadoc of AtomIteratorSerializer (only used by the on-wire format) and of AtomSerializer, so let me just point out the following differences compared to the current format:
- Clustering values are only serialized once per row (we even skip serializing the number of elements since that is fixed for a given table).
- Column names are not written for every row. Instead they are written once in the header. For a given row, we support two small variants: dense and sparse. When dense, cells come in the order the columns have in the header, meaning that if a row doesn't have a particular column, this column will still use a byte. When sparse, we store nothing if the column doesn't have a cell, but each cell has an additional 2 bytes which point into the header (so with vints it should rarely take more than 1 byte in practice). The variant used is automatically decided based on stats on how many columns a row has set on average for the source we serialize.
- Fixed-width cell values are serialized without a size.
- If a cell has the same timestamp as its row, that timestamp is not repeated for the cell. Same for the ttl (if applicable).
- Timestamps, ttls and local deletion times are delta encoded so that they are ripe for vint encoding. The current version of the patch does not yet activate vint encoding however (neither on-wire nor on-disk).
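For instance, a simplified sketch of my own of the delta encoding idea for timestamps (the base the actual serializer deltas against is an implementation detail):

// Encode each cell timestamp as its difference from the row timestamp: cells
// written together get small deltas (often 0, in which case the previous rule
// lets us skip the timestamp entirely), which vint-encode to very few bytes.
final class TimestampDeltaSketch
{
    static long encode(long rowTimestamp, long cellTimestamp)
    {
        return cellTimestamp - rowTimestamp;
    }

    static long decode(long rowTimestamp, long delta)
    {
        return rowTimestamp + delta;
    }
}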