Cassandra Commit Log

The Cassandra Commit Log is an append-only log that tracks the mutations made to Column Families and provides durability for those mutations. Every mutation is appended to the Commit Log first, before it is applied to an in-memory write-back cache called the Memtable. If a Memtable is not flushed to disk to create an SSTable, the mutations stored in it can be lost if Cassandra suddenly shuts down or crashes. At startup, Cassandra replays the Commit Log to recover the data that was not stored to disk.

A mutation can be an insert, update, or delete operation on a partition, and can involve a single column or multiple columns. Each mutation is associated with a partition key and a timestamp. If multiple mutations occur on the same partition key, Cassandra keeps the one with the latest timestamp. The Memtable also combines different mutations occurring on the same partition and groups them as a single row mutation.
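The last-write-wins merge by timestamp can be sketched as follows. This is an illustrative model, not Cassandra's actual classes; `merge_mutation` and the dict-based partition map are hypothetical names:

```python
# Sketch of last-write-wins merging of column mutations for one partition.
# Names are illustrative, not Cassandra's real implementation.

def merge_mutation(partitions, partition_key, columns):
    """Apply a mutation (a dict of column -> (value, timestamp)) to the
    in-memory partition map, keeping the value with the latest timestamp."""
    row = partitions.setdefault(partition_key, {})
    for name, (value, ts) in columns.items():
        if name not in row or row[name][1] < ts:
            row[name] = (value, ts)
    return partitions

partitions = {}
merge_mutation(partitions, "k1", {"sepalwidth": (3.1, 100)})
merge_mutation(partitions, "k1", {"sepalwidth": (3.3, 200)})  # newer timestamp wins
merge_mutation(partitions, "k1", {"petalwidth": (1.5, 150)})  # merged into the same row
```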

The Commit Log provides a faster way to persist the changes applied to Memtables before they are flushed to disk. SSTables are stored in separate files for each Column Family, and Cassandra can receive mutations to multiple Column Families at the same time. If those mutations were persisted directly, Cassandra would need to issue multiple disk writes, one for each mutation applied to a Column Family, causing more disk seeks and random I/O, which would hurt performance. The Commit Log solves this performance issue by converting the random I/O to sequential I/O: mutations of different Column Families are written to the same file. It also batches mutations, so a single disk write syncs multiple mutations to disk, reducing disk I/O while making the changes durable. This lets Cassandra buffer the mutations of each Column Family in a separate Memtable and flush them to disk later using sequential I/O.

The Commit Log cannot grow forever. It needs to be truncated regularly to reduce its size, so that Cassandra spends less time replaying unsaved changes the next time it starts after a sudden shutdown. Cassandra provides a few configuration settings to tune the Commit Log size. Mutations in the Commit Log are also cleaned up when a Memtable is flushed to an SSTable. For each Memtable flush, Cassandra records the latest offset of the Commit Log, called the Replay position. When a Memtable flush completes successfully, all the mutations for that particular Column Family stored before the Replay position are discarded from the Commit Log.

Commit Log Configuration

The setting commitlog_segment_size_in_mb controls the max size of an individual Commit log segment file. A new Commit Log segment is created when the current active segment size reaches this threshold. It defaults to 32 MB.

Commit Log disk sync frequency is controlled by the commitlog_sync setting. If it is set to batch, Cassandra will not acknowledge writes until the Commit Log is synced to disk. Instead of executing an fsync for every write, the commitlog_sync_batch_window_in_ms setting specifies the number of milliseconds between successive fsyncs; it defaults to 2 milliseconds. The alternative is to set commitlog_sync to periodic, in which case Cassandra acknowledges writes immediately and syncs the Commit Log every commitlog_sync_period_in_ms milliseconds. The default for commitlog_sync is periodic with a sync period of 10 seconds (10000 milliseconds). The Commit Log protects only the data stored locally, so the interval between fsyncs has a direct impact on the amount of unsaved data if Cassandra goes down suddenly. Using a replication factor greater than one protects the data if one of the replicas goes down.
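The two sync modes map onto cassandra.yaml settings like these (the values shown are the stock cassandra.yaml defaults):

```yaml
# Periodic mode (default): writes are acknowledged immediately,
# and the Commit Log is fsynced every 10 seconds.
commitlog_sync: periodic
commitlog_sync_period_in_ms: 10000

# Batch mode alternative: writes are acknowledged only after fsync,
# with successive fsyncs grouped into 2 ms windows.
# commitlog_sync: batch
# commitlog_sync_batch_window_in_ms: 2
```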

The total disk space used for Commit Log files is controlled by the commitlog_total_space_in_mb setting. When the disk space reaches this threshold, Cassandra flushes all the Column Families that have mutations in the oldest Commit Log segment and removes that segment. For this purpose, a list of dirty Column Families is maintained in each Commit Log segment. The threshold defaults to the smaller of 8 GB and 1/4th of the total space of the Commit Log volume.
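The dirty-Column-Family bookkeeping described above can be sketched as a segment queue where each segment remembers which Column Families still have unflushed mutations in it. The class and function names here are illustrative, not Cassandra's:

```python
# Sketch: a queue of Commit Log segments, each tracking its dirty Column
# Families. When total space is exceeded, the CFs dirty in the oldest
# segment must be flushed so that segment can be removed.
from collections import deque

class Segment:
    def __init__(self, seg_id):
        self.seg_id = seg_id
        self.dirty = set()   # Column Families with unflushed mutations here

    def mark_dirty(self, cf):
        self.dirty.add(cf)

def cfs_to_flush_when_full(segments):
    """Return the Column Families that must be flushed to free the
    oldest segment (the head of the queue)."""
    oldest = segments[0]
    return set(oldest.dirty)

segments = deque([Segment(1), Segment(2)])   # segment 2 is the active tail
segments[0].mark_dirty("iris")
segments[0].mark_dirty("irisplot")
segments[1].mark_dirty("iris")
```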

There can be multiple segments storing the Commit Log at any time. All Commit Log segments are managed in a queue, with the current active segment at the tail of the queue. A Commit Log segment can be recycled when all the mutations in it have been flushed successfully.

Commit Log Segment

A Commit Log segment represents a single Commit Log file on disk and stores mutations of different Column Families. The file name format is CommitLog-<Version>-<Msec Timestamp>.log (for example: CommitLog-6-1621835590940.log). Each segment maintains a map of the dirty Column Families and their last mutation positions in the file. The segment is memory mapped: operations are applied to the memory-mapped buffer and synced to disk periodically.


 

The contents of a Commit Log segment are memory mapped to a buffer; mutations are applied to this buffer and persisted to disk on sync. At the beginning of the Commit Log, a file header is written which includes the version, the Commit Log ID, and the parameters associated with compressed or encrypted Commit Logs. A CRC checksum computed over the header fields is written at the end of the header.

Mutations are allocated on the active segment. If there is not enough space to write a mutation in the current active segment, a new segment is created. Commit Log segments are managed as a queue, with the active segment at the tail of the queue.

The Commit Log tracks which mutations are synced to disk and which are not using two pointers. The Last Synced Offset is the position in the Commit Log before which all mutations have been synced to disk. The Last Marker Offset points to the position of the last mutation that has not yet been synced to disk. All the mutations between these two pointers are not yet synced.

A Commit Log is divided into sections separated by sync markers. These sync markers are chained: each marker points to the next one, and a new marker is created every time the Commit Log is synced. A sync marker consists of two integers. The first integer stores the Commit Log file position where the next sync marker will be written, which indicates the section length. The second integer stores the CRC of the Commit Log ID and the file position where the sync marker is written.
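A sync marker along these lines could be packed and unpacked as below. This is a sketch based only on the description above; the exact field widths, byte order, and the precise input layout of the CRC are assumptions, not Cassandra's on-disk definition:

```python
# Hypothetical sketch of a sync marker: a 4-byte pointer to the next
# marker's file position, followed by a 4-byte CRC computed over the
# segment id and this marker's own position (assumed layout).
import struct
import zlib

def make_sync_marker(segment_id, marker_pos, next_marker_pos):
    crc = zlib.crc32(struct.pack(">qi", segment_id, marker_pos)) & 0xFFFFFFFF
    return struct.pack(">iI", next_marker_pos, crc)

def read_sync_marker(buf):
    next_marker_pos, crc = struct.unpack(">iI", buf)
    return next_marker_pos, crc

marker = make_sync_marker(segment_id=1621835590940, marker_pos=28,
                          next_marker_pos=4096)
```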

 


Cassandra Memtable


Memtables store the current mutations applied to Column Families. They function as a write-back cache and provide faster write performance, as well as faster read performance for recently written data. Mutations in the Memtable are kept in sorted order using a skip list data structure. Each Column Family is associated with its own Memtable. This blog describes the on-heap Memtable type.
 
Memtable Size

There are two settings to limit the Memtable size, depending on whether the Memtable is stored in heap space or off-heap space. Both are specified in MB. By default these settings are commented out in cassandra.yaml.

# memtable_heap_space_in_mb: 2048
# memtable_offheap_space_in_mb: 2048

If the Memtable size is not configured, Cassandra assigns 1/4th of the max heap size allocated to the Cassandra process. For example, if the max heap size is 64 GB, then 16 GB is used as the Memtable size.

Memtable Allocation Types

The allocation type in cassandra.yaml specifies how to store Memtable data. The supported allocation types are

unslabbed_heap_buffers
heap_buffers
offheap_buffers
offheap_objects

By default Cassandra is configured to store Memtable data in heap space. It uses a Slab allocator to reduce heap fragmentation.

memtable_allocation_type: heap_buffers

 

The figure above shows the association between the different components of a Memtable. Memtables are created during Cassandra startup. Based on the Memtable storage type configured in cassandra.yaml, a pool is allocated either on heap or on both heap and off-heap. All Memtables use the same Memtable Pool, which controls overall Memtable storage usage. Each Memtable has its own Allocator which serves its allocation requests. Allocator objects hold a reference to the parent Memtable Pool for the configured limits.

A Memtable contains a reference to its Column Family, the Memtable Pool, the positions in the Commit Log segments where its data is persisted, and the mutations themselves. The Commit Log Lower Bound is set when the Memtable is created and points to the position in the active Commit Log segment where this Memtable's mutations start. The Commit Log Upper Bound is set to the position of the last mutation in the Commit Log. When a Memtable is flushed, these two bounds are used to discard the Commit Log segments that store this Memtable's mutations. The bounds can refer to the same Commit Log segment or to different segments: a single Commit Log segment can contain mutations from multiple Memtables, and a single Memtable can persist mutations into multiple Commit Log segments. Please review the Cassandra Commit Log section for additional details.

The Min timestamp field stores the smallest timestamp of all partitions stored in this Memtable. Live Data Size stores the size of all mutations applied to this Memtable. Partitions contain the actual Column Family mutations applied in this Memtable.  The Current Operations metric tracks the number of columns updated in the stored mutations.

 

Slab Allocator

The Slab allocator combines smaller memory allocations (less than 128 KB) into a bigger region of size 1 MB to avoid heap fragmentation. Allocations bigger than 128 KB are allocated directly from the JVM's heap. The basic idea is that the JVM can reclaim more old-generation heap space when the allocations in a slab region have similar lifetimes. For example, if a column of integer type is updated, it needs 4 bytes of heap space: it is allocated in the current slab region, the next-free offset pointer is bumped by 4 bytes, and the allocation count is incremented. Allocations bigger than 128 KB instead spill all over the heap, cause heap fragmentation, and eventually force the JVM to de-fragment the heap space.
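The bump-pointer behaviour described above can be sketched as follows; `SlabRegion` and its fields are illustrative names, not Cassandra's implementation:

```python
# Bump-pointer slab allocation sketch: allocations under 128 KB come from
# the current 1 MB region; larger ones would go straight to the heap.
REGION_SIZE = 1024 * 1024      # 1 MB slab region
MAX_SLAB_ALLOC = 128 * 1024    # larger allocations bypass the slab

class SlabRegion:
    def __init__(self):
        self.next_free = 0     # bump pointer: next free offset in the region
        self.alloc_count = 0

    def allocate(self, size):
        if size > MAX_SLAB_ALLOC:
            return None        # caller allocates directly on the heap
        if self.next_free + size > REGION_SIZE:
            return None        # region full: caller starts a new region
        offset = self.next_free
        self.next_free += size # bump the free offset by the allocation size
        self.alloc_count += 1
        return offset

region = SlabRegion()
off1 = region.allocate(4)      # e.g. one integer column update: 4 bytes
off2 = region.allocate(8)
```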

Each Memtable has its own Slab allocator. Multiple Memtables are managed using a single Slab Pool, which holds the configured Memtable size settings and ensures that the overall data stored in all Memtables stays below the threshold.


 


Partitions

Partitions in a Memtable are managed using a skip list data structure. Each partition stores the mutations applied to a particular row. A mutation can be an insert, update, or delete operation applied to a row and can include zero or more columns.

cqlsh:flowerskeyspace> describe iris;

CREATE TABLE flowerskeyspace.iris (
    id int PRIMARY KEY,
    class text,
    petallength float,
    petalwidth float,
    sepallength float,
    sepalwidth float
)

cqlsh:flowerskeyspace> update iris set sepalwidth=6.5 where id=4;

cqlsh:flowerskeyspace> update iris set sepalwidth=6.5 where id=5;

cqlsh:flowerskeyspace> update iris set sepalwidth=3.3 where id=3;


In the above example, there are three updates to three different rows.
These are stored as three partitions in the Memtable:

partitions size = 3

[0] "DecoratedKey(-7509452495886106294, 00000005)" -> "[flowerskeyspace.iris]
key=5 partition_deletion=deletedAt=-9223372036854775808, localDeletion=2147483647
columns=[[] | [sepalwidth]]
    Row[info=[ts=-9223372036854775808] ]:  | [sepalwidth=6.5 ts=1622222969042456]"

[1] "DecoratedKey(-2729420104000364805, 00000004)" -> "[flowerskeyspace.iris]
key=4 partition_deletion=deletedAt=-9223372036854775808, localDeletion=2147483647
columns=[[] | [sepalwidth]]
    Row[info=[ts=-9223372036854775808] ]:  | [sepalwidth=6.5 ts=1622222995765139]"

[2] "DecoratedKey(9010454139840013625, 00000003)" -> "[flowerskeyspace.iris]
key=3 partition_deletion=deletedAt=-9223372036854775808, localDeletion=2147483647
columns=[[] | [sepalwidth]]
    Row[info=[ts=-9223372036854775808] ]:  | [sepalwidth=3.3 ts=1622218525529221]"

If we update another column of the row with id = 3, the result will be
merged with the previous mutation:

cqlsh:flowerskeyspace> update iris set petalwidth=1.5 where id=3;

The partition with index 2 will be updated

[2] "DecoratedKey(9010454139840013625, 00000003)" -> "[flowerskeyspace.iris]
key=3 partition_deletion=deletedAt=-9223372036854775808, localDeletion=2147483647
columns=[[] | [petalwidth sepalwidth]]
Row[info=[ts=-9223372036854775808] ]: | [petalwidth=1.5 ts=1622232283101650],
[sepalwidth=3.3 ts=1622218525529221]" 

 

 

Memtable Flush

Each Column Family has its own Memtable. All live mutations to that Column Family are appended to the Commit Log and also applied to the Memtable. During a read, data from both the Memtables and the SSTables is read and the results are merged. A Memtable is flushed to disk to create an SSTable when its heap threshold is exceeded, when its flush period expires, or when the Commit Log space threshold is reached.

The expiry is controlled by a flush period setting, specified as a schema property (memtable_flush_period_in_ms) on the Column Family definition. By default the flush period is set to zero.

The heap-space-exceeded condition is determined by the configured Memtable heap size limit and an internally calculated Memtable cleanup threshold. The cleanup threshold is calculated from memtable_flush_writers, which defaults to 2 for a single Cassandra data directory, so memtable_cleanup_threshold is 0.33.

memtable_cleanup_threshold = 1 / (memtable_flush_writers + 1)

If the configured heap space limit is 2 GB, then the Memtable size threshold is 2 GB * 0.33333334, which is around 683 MB. When the combined size of all unflushed Memtables exceeds 683 MB, the Memtable with the largest live data size is flushed.
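The threshold arithmetic above, written out:

```python
# Flush threshold calculation from the cleanup-threshold formula.
memtable_flush_writers = 2
memtable_cleanup_threshold = 1 / (memtable_flush_writers + 1)    # ~0.333

heap_space_mb = 2048                                             # 2 GB limit
flush_threshold_mb = heap_space_mb * memtable_cleanup_threshold  # ~683 MB
```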

When the total space used by Commit Logs exceeds the configured commitlog_total_space_in_mb, Cassandra selects the Memtables associated with the oldest segments and flushes them. If the oldest Commit Log segment contains data for multiple Column Families, the Memtables of all those Column Families are flushed. By default, commitlog_total_space_in_mb is set to the smaller of 8 GB and 1/4th of the total space of the Commit Log directory.

Other operations such as repair, nodetool drain, or nodetool flush can also cause Memtables to be flushed to disk. A Column Family can have more than one Memtable when a flush is still pending on its previous Memtable.

Cassandra Memtable Metrics

Cassandra provides various metrics for Memtable as part of Column family metrics.

memtableColumnsCount: Number of columns present in the Memtable
memtableOnHeapSize: The amount of data stored in the Memtable that is allocated in heap memory, including the overhead associated with columns
memtableOffHeapSize: The amount of data stored in off-heap space by this Memtable, including the overhead associated with columns
memtableLiveDataSize: The amount of live data size
AllMemtableOnHeapSize: The amount of heap memory used by all the Memtables
AllMemtableOffHeapSize: The amount of data stored in off-heap space by all Memtables
AllMemtableLiveDataSize: The amount of live data stored in all Memtables
memtableSwitchCount: Number of times a flush has resulted in the Memtable being switched out

The Commit Log provides durability to the mutations in a Memtable. See the Cassandra Commit Log section for more details.

Cassandra how to repair a row if regular repair fails

Recently we had a strange issue with Cassandra repair. A table is replicated across three datacenters, and some of the rows were missing on all the nodes in one newly added datacenter. We have hundreds of production Cassandra clusters of various sizes, from a few nodes to tens of nodes per cluster. Most of the clusters have multiple datacenters and use replication across them. We hadn't seen this kind of issue before. This article describes the various ways we tried to fix the problem. These steps should be useful for recovering data in Cassandra 2.2.*.

The first thing we tried was listing the rows: we ran cqlsh on each node and executed a select query against this particular table. The rows were returned on all nodes except those in the recently added DC, let's call it DC3, where the rows were missing from the result. This is the obvious outcome when the replication settings for the Keyspace containing this table do not include DC3.

Keyspace: Test: Replication Strategy: org.apache.cassandra.locator.NetworkTopologyStrategy Durable Writes: true Options: [DC1:3, DC2:3]

We manually added DC3 to the Keyspace replication strategy and ran nodetool repair on the DC3 nodes for this Keyspace and table. After running the repair, we verified the table rows on the DC3 nodes, but the rows were still missing.

Keyspace: Test: Replication Strategy: org.apache.cassandra.locator.NetworkTopologyStrategy Durable Writes: true Options: [DC1:3, DC2:3, DC3:3]

The second thing we tried was running nodetool repair with different DC options and on nodes in different datacenters, but the rows were still missing. We tried a full repair and an entire-keyspace repair, but neither was able to repair the rows.

The third thing we tried was removing all SSTables for the problematic table and running repair again. The repair restored the SSTables, but the rows were still missing.

The fourth thing we tried was resetting the SSTable repairedAt status to unrepaired for all SSTables using the sstablerepairedset tool and running the repair again. The rows were still missing.

Finally, we used Cassandra read repair to fix the missing rows. We ran cqlsh on the DC3 nodes, set the consistency level to QUORUM, and read each missing row one by one with a select query. This fixed the issue and repaired the data.

# cat commands 
CONSISTENCY
CONSISTENCY QUORUM
CONSISTENCY
select * from "Test"."SampleTable" WHERE key = 60b8b2e1-5866-4a4a-b6f8-3e521c44f43c;
 

Hope this helps if someone is stuck with a similar issue.


Cassandra 3.x SSTable Storage Format

The Cassandra SSTable storage format changed in 3.0 to support the higher-level CQL structure directly at the storage engine level. The older SSTable format was designed for a very simple model of basic key/value pairs, which was adequate for the Thrift API. In Cassandra 2.* and earlier, Thrift was predominantly used to define the database schema, but Thrift's column family definition is not sufficient to create tables that store structured application data. CQL was introduced in Cassandra 2.* to let applications define structured data with SQL-like semantics and query it by primary key. However, the simple SSTable storage format of 2.* was not suitable for the relational characteristics of row keys and columns: to query CQL data by primary key, redundant information had to be stored along with the CQL columns in the SSTable to associate them with the primary key. This caused a lot of duplicate data in the SSTables and made querying inefficient.

In Cassandra 3.*, the SSTable format was redesigned to support CQL data types and relational mappings natively at the storage engine level, reducing SSTable size and allowing CQL queries to execute efficiently. The new storage engine recognizes rows as first-class citizens, whereas the old format represented rows as a sequence of cells with no row semantics. The row is the primary building block of the new storage engine.


In the new format, an SSTable stores a set of partitions, and each partition stores a sequence of rows or range tombstone markers. The partitions are ordered by partition key, and the rows or markers within a partition are indexed by their clustering keys. The Flags field in the partition determines the type of the row that follows it. The flags are stored as one byte; if the extended-flags bit is set in the first byte, another byte is added for the extended flags.

The rest of the article uses an example CQL table with a partitioning key and a clustering key to describe the details.

CREATE TABLE flowerskeyspace.irisplot (
    petallength float,
    sepallength float,
    id int,
    color text,
    PRIMARY KEY (petallength, sepallength, id)
)


insert into irisplot(petallength, sepallength, id, color) values (6,6.3,5,'blue');
insert into irisplot(petallength, sepallength, id, color) values (5.1,5.8,6,'blue');
insert into irisplot(petallength, sepallength, id, color) values (1.4,5.1,1,'red');
insert into irisplot(petallength, sepallength, id, color) values (1.4,4.9,2,'red');
insert into irisplot(petallength, sepallength, id, color) values (4.5,6.4,4,'green');
insert into irisplot(petallength, sepallength, id, color) values (4.7,7,3,'green');




In the irisplot table above, the primary key is composed of three CQL columns (petallength, sepallength, id). The first part of the primary key, petallength, is automatically chosen as the partitioning key. The partitioning key controls which node is responsible for storing each CQL row. The rest of the CQL columns in the primary key form the clustering key (sepallength, id), which is used to select a specific row's columns.


Our example table irisplot has a single secondary CQL column, "color", which is prefixed with the clustering key value. If multiple secondary CQL columns are defined, then the data representing the clustering key is repeated for each one of the secondary CQL columns in Cassandra 2.*.

JSON dump of SSTable partition in Cassandra 2.*:
{"key": "1.4",
 "cells": [["4.9:2:","",1582054358578646],
           ["4.9:2:color","red",1582054358578646],
           ["5.1:1:","",1582054337118746],
           ["5.1:1:color","red",1582054337118746]]}

JSON dump of SSTable in Cassandra 3.*:
[
  {
    "partition" : {
      "key" : [ "4.7" ],
      "position" : 0
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 18,
        "clustering" : [ 7.0, 3 ],
        "liveness_info" : { "tstamp" : "2020-03-03T22:25:59.655787Z" },
        "cells" : [
          { "name" : "color", "value" : "green" }
        ]
      }
    ]
  },
  {
    "partition" : {
      "key" : [ "1.4" ],
      "position" : 41
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 59,
        "clustering" : [ 4.9, 2 ],
        "liveness_info" : { "tstamp" : "2020-03-03T22:25:57.948204Z" },
        "cells" : [
          { "name" : "color", "value" : "red" }
        ]
      },
      {
        "type" : "row",
        "position" : 79,
        "clustering" : [ 5.1, 1 ],
        "liveness_info" : { "tstamp" : "2020-03-03T22:25:57.946200Z" },
        "cells" : [
          { "name" : "color", "value" : "red" }
        ]
      }
    ]
  },
  {
    "partition" : {
      "key" : [ "5.1" ],
      "position" : 100
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 118,
        "clustering" : [ 5.8, 6 ],
        "liveness_info" : { "tstamp" : "2020-03-03T22:25:57.943769Z" },
        "cells" : [
          { "name" : "color", "value" : "blue" }
        ]
      }
    ]
  },
  {
    "partition" : {
      "key" : [ "6.0" ],
      "position" : 140
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 158,
        "clustering" : [ 6.3, 5 ],
        "liveness_info" : { "tstamp" : "2020-03-03T22:25:57.921656Z" },
        "cells" : [
          { "name" : "color", "value" : "blue" }
        ]
      }
    ]
  },
  {
    "partition" : {
      "key" : [ "4.5" ],
      "position" : 178
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 196,
        "clustering" : [ 6.4, 4 ],
        "liveness_info" : { "tstamp" : "2020-03-03T22:25:57.950560Z" },
        "cells" : [
          { "name" : "color", "value" : "green" }
        ]
      }
    ]
  }
]

In the new storage engine, the partitioning key is still used to distribute the data to nodes in the cluster, but the clustering key is used to index a row within a partition.
For example, in the above table the partition with key [ "1.4" ] contains two rows with different clustering keys, [ 4.9, 2 ] and [ 5.1, 1 ]:
{
    "partition" : {
      "key" : [ "1.4" ],
      "position" : 41
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 59,
        "clustering" : [ 4.9, 2 ],
        "liveness_info" : { "tstamp" : "2020-03-03T22:25:57.948204Z" },
        "cells" : [
          { "name" : "color", "value" : "red" }
        ]
      },
      {
        "type" : "row",
        "position" : 79,
        "clustering" : [ 5.1, 1 ],
        "liveness_info" : { "tstamp" : "2020-03-03T22:25:57.946200Z" },
        "cells" : [
          { "name" : "color", "value" : "red" }
        ]
      }
    ]
  }

The SSTable size in Cassandra 3.* is smaller compared to Cassandra 2.*

$ ls -al cassandra-3.11/data/data/flowerskeyspace/irisplot-ecde46205d9d11eaa60a47b84e452526/md-1-big-Data.db
-rw-r--r--  1 bharat  staff  161 Mar  3 14:26 cassandra-3.11/data/data/flowerskeyspace/irisplot-ecde46205d9d11eaa60a47b84e452526/md-1-big-Data.db

$ ls -al cassandra-2.2/data/data/flowerskeyspace/irisplot-dbf35720528411eabbb8b16d9d604ffd/lb-1-big-Data.db
-rw-r--r--  1 bharat  staff  286 Feb 18 11:34 cassandra-2.2/data/data/flowerskeyspace/irisplot-dbf35720528411eabbb8b16d9d604ffd/lb-1-big-Data.db

SSTable hex dump

Single Partition Details



The storage format also includes the details of the CQL table schema in the Statistics.db file:
$ ./tools/bin/sstablemetadata data/data/flowerskeyspace/irisplot-ecde46205d9d11eaa60a47b84e452526/md-1-big-Data.db

KeyType: org.apache.cassandra.db.marshal.FloatType
ClusteringTypes: [org.apache.cassandra.db.marshal.FloatType, org.apache.cassandra.db.marshal.Int32Type]
StaticColumns: {}
RegularColumns: {color:org.apache.cassandra.db.marshal.UTF8Type}

Varint Encoding


To encode data compactly, the new storage format stores integers as varints. The concept is similar to Google's Protocol Buffers encoding, but the Cassandra implementation is different.
In the varint encoding format, integers are stored using a variable number of bytes instead of four bytes for a 32-bit integer or eight bytes for a 64-bit integer. The most significant bit of the first byte indicates whether the integer is stored in a single byte or in multiple bytes: if it is set, the integer is stored in extra bytes; otherwise the value is stored in that same byte. This encoding scheme can take one to nine bytes to represent an integer. The number of leading set bits in the first byte indicates the number of extra bytes needed to store the integer, and a zero bit separates these length bits from the bits that store the actual value. Python code to decode varint data is available at unpack_vint()
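A minimal decoder along those lines is sketched below. It follows the description above (leading set bits in the first byte give the number of extra bytes); it is an illustrative sketch, not Cassandra's exact `VIntCoding` implementation:

```python
def decode_vint(buf, pos=0):
    """Decode one unsigned varint: the number of leading set bits in the
    first byte gives the number of extra bytes; a zero bit separates the
    length bits from the value bits remaining in the first byte."""
    first = buf[pos]
    extra = 0
    mask = 0x80
    while extra < 8 and (first & mask):   # count leading set bits
        extra += 1
        mask >>= 1
    # value bits left in the first byte after the length prefix and zero bit
    value = first & (0xFF >> (extra + 1)) if extra < 8 else 0
    for i in range(extra):                # append one extra byte at a time
        value = (value << 8) | buf[pos + 1 + i]
    return value, pos + 1 + extra

# One-byte case: high bit clear, value stored directly in the first byte.
small, _ = decode_vint(bytes([0x05]))
# Three-byte case: under this scheme, the delta 1734131 from the example
# below would be encoded as DA 75 F3 (two leading set bits = two extra bytes).
delta, _ = decode_vint(bytes([0xDA, 0x75, 0xF3]))
```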

In the example partition above, the row timestamp is encoded as a varint. The timestamp is a 64-bit integer, but it is compactly stored in three bytes using varint encoding.

 Delta Encoding


You can also notice that the timestamp is not stored as an absolute value. The row timestamp is stored as a delta from the encoding minimum timestamp in the Statistics file.

$ tools/bin/sstablemetadata data/data/flowerskeyspace/irisplot-ecde46205d9d11eaa60a47b84e452526/md-1-big-Statistics.db
EncodingStats minTimestamp: 1583274357921656

To get the absolute value of the row timestamp, we need to add the integer value of the varint encoding ('110100111010111110011') to the encoding minimum timestamp 1583274357921656.

We can use Python to convert it back to absolute value

int('110100111010111110011', 2) =  1734131
Row timestamp = 1583274357921656 + 1734131 = 1583274359655787

$ python -c "import datetime,pytz; print datetime.datetime.fromtimestamp(1583274359655787/1000000.0, pytz.utc)"
2020-03-03 22:25:59.655787+00:00

We can validate this using the timestamp from the JSON dump of the SSTable:
 "type" : "row",
 "position" : 18,
 "clustering" : [ 7.0, 3 ],
 "liveness_info" : { "tstamp" : "2020-03-03T22:25:59.655787Z" },
 "cells" : [ { "name" : "color", "value" : "green" } ]

Columns


There are two types of columns: simple and complex. A simple column is encoded as a single cell; a complex column is encoded as a sequence of cells preceded by a deletion time and the number of cells in it.



For further details on storage engine changes please review the references.


References

putting-some-structure-storage-engine

Storage engine change patch description

Cassandra CQL Storage Format



In Cassandra 2.*, a CQL table is stored using the same storage format used for Thrift-based column families. Cassandra stores extra information as part of the column names to identify CQL clustering keys and other CQL columns.

The CQL row format includes its partitioning key and clustering key, followed by a sequence of CQL columns. Each CQL column is prefixed with the clustering key value. A clustering key can be defined as a combination of multiple CQL columns.

A new term, Cell, is introduced to represent CQL columns. A Cell consists of a cell name and a cell value. A cell name is a composite type: a sequence of individual components. A component is encoded in three parts: the first part is the value length, the second part is the value, and the last part is a byte marking the end of the component. The end-of-component byte is always set to 0 for CQL columns. Each CQL column that is part of the clustering key is stored as a separate component in the cell, in the order defined in the schema.
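One component of a composite cell name can be sketched as below, assuming a 2-byte big-endian length prefix (consistent with the 00-byte pairs visible in the hex dumps later in this article); `encode_component` is an illustrative helper, not a Cassandra API:

```python
import struct

def encode_component(value: bytes) -> bytes:
    """Encode one composite component: 2-byte big-endian length,
    the value bytes, then a one-byte end-of-component (EOC) field,
    which is always 0 for CQL columns."""
    return struct.pack(">H", len(value)) + value + b"\x00"

# A single component for the column name "color": 00 05 'color' 00
component = encode_component(b"color")

# The empty component used as the CQL row marker is just 00 00 00.
row_marker = encode_component(b"")
```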


Example1: CQL Table schema with only partitioning key and no clustering key

CREATE KEYSPACE flowerskeyspace WITH REPLICATION = { 'class' : 'SimpleStrategy' , 'replication_factor' : 1 };
  CREATE TABLE flowerskeyspace.iris (
    id int PRIMARY KEY,
    class text,
    petallength float,
    petalwidth float,
    sepallength float,
    sepalwidth float
)



CQL Table Rows:

insert into iris(id, sepallength, sepalwidth, petallength, petalwidth, class) values (2,4.9,3.0,1.4,0.2,'Iris-setosa');
insert into iris(id, sepallength, sepalwidth, petallength, petalwidth, class) values (1,5.1,3.5,1.4,0.2,'Iris-setosa');
insert into iris(id, sepallength, sepalwidth, petallength, petalwidth, class) values (3,7.0,3.2,4.7,1.4,'Iris-versicolor');
insert into iris(id, sepallength, sepalwidth, petallength, petalwidth, class) values (4,6.4,3.2,4.5,1.5,'Iris-versicolor');
insert into iris(id, sepallength, sepalwidth, petallength, petalwidth, class) values (5,6.3,3.3,6.0,2.5,'Iris-virginica');
insert into iris(id, sepallength, sepalwidth, petallength, petalwidth, class) values (6,5.8,2.7,5.1,1.9,'Iris-virginica');




CQL data in Json format:

{"key": "5",
 "cells": [["","",1581757206044154],
           ["class","Iris-virginica",1581757206044154],
           ["petallength","6.0",1581757206044154],
           ["petalwidth","2.5",1581757206044154],
           ["sepallength","6.3",1581757206044154],
           ["sepalwidth","3.3",1581757206044154]]},


Uncompressed SSTable Data:



CQL Table Row Storage Format:





Example2: CQL Table schema with partitioning key and clustering key

CREATE TABLE flowerskeyspace.irisplot (
    petallength float,
    sepallength float,
    id int,
    color text,
    PRIMARY KEY (petallength, sepallength, id)
)


insert into irisplot(petallength, sepallength, id, color) values (6,6.3,5,'blue');
insert into irisplot(petallength, sepallength, id, color) values (5.1,5.8,6,'blue');
insert into irisplot(petallength, sepallength, id, color) values (1.4,5.1,1,'red');
insert into irisplot(petallength, sepallength, id, color) values (1.4,4.9,2,'red');
insert into irisplot(petallength, sepallength, id, color) values (4.5,6.4,4,'green');
insert into irisplot(petallength, sepallength, id, color) values (4.7,7,3,'green');





JSON data:

[
{"key": "4.7",
 "cells": [["7.0:3:","",1582054414657067],
           ["7.0:3:color","green",1582054414657067]]},
{"key": "1.4",
 "cells": [["4.9:2:","",1582054358578646],
           ["4.9:2:color","red",1582054358578646],
           ["5.1:1:","",1582054337118746],
           ["5.1:1:color","red",1582054337118746]]},
{"key": "5.1",
 "cells": [["5.8:6:","",1582054298177996],
           ["5.8:6:color","blue",1582054298177996]]},
{"key": "6.0",
 "cells": [["6.3:5:","",1582054268167535],
           ["6.3:5:color","blue",1582054268167535]]},
{"key": "4.5",
 "cells": [["6.4:4:","",1582054399453891],
           ["6.4:4:color","green",1582054399453891]]}
]
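With a clustering key, every cell name is prefixed by the clustering column values. The grouping shown in the JSON above can be sketched like this (a toy helper with a single shared timestamp; real cells carry their own write timestamps):

```python
# Hypothetical sketch: group CQL rows by partition key (petallength) and
# build composite cell names "<sepallength>:<id>:" for each row.
def clustered_cells(rows, timestamp):
    partitions = {}
    for r in rows:
        prefix = "%s:%s:" % (r["sepallength"], r["id"])
        cells = partitions.setdefault(str(r["petallength"]), [])
        cells.append([prefix, "", timestamp])             # row marker cell
        cells.append([prefix + "color", r["color"], timestamp])
    return partitions

rows = [{"petallength": 1.4, "sepallength": 4.9, "id": 2, "color": "red"},
        {"petallength": 1.4, "sepallength": 5.1, "id": 1, "color": "red"}]
print(clustered_cells(rows, 1582054358578646))
```

Both rows land in the same "1.4" partition, each contributing a marker cell and a color cell, mirroring the JSON above.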

Uncompressed SSTable data:


For every CQL row, an empty component is added as a marker to allow inserting rows with NULL secondary column values. For the empty component, two bytes store the length of the name (set to 0), followed by a one byte end of component (EOC) field (also set to 0). This empty component marker (00 00 00) appears at the beginning of the row if no clustering key is defined, or at the end of the clustering key if there is one. For example, in the irisplot table we insert a row that sets only the primary key columns:


JSON data:

[
{"key": "4.0",
 "cells": [["7.0:3:","",1582057689702366]]}
]

SSTable uncompressed data:

0000000 00 04 40 80 00 00 7f ff ff ff 80 00 00 00 00 00
0000010 00 00 00 11 00 04 40 e0 00 00 00 00 04 00 00 00
0000020 03 00 00 00 00 00 00 05 9e df 82 9b df de 00 00
0000030 00 00 00 00
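The 17-byte composite column name in that dump (starting after the 00 11 length field) can be parsed with a short sketch. The layout assumed here, per the description above, is a sequence of <2-byte length><bytes><1-byte EOC> components; this is an illustration, not Cassandra's actual parser.

```python
import struct

# Parse a Cassandra 2.x-style composite column name into its components.
# The trailing zero-length component is the empty marker described above.
def parse_composite(name):
    parts, i = [], 0
    while i < len(name):
        (ln,) = struct.unpack(">H", name[i:i + 2])  # 2-byte component length
        comp = name[i + 2:i + 2 + ln]               # component bytes
        eoc = name[i + 2 + ln]                      # 1-byte end-of-component
        parts.append((comp, eoc))
        i += 2 + ln + 1
    return parts

# The 17 column-name bytes from the dump above:
# 00 04 40 e0 00 00 00 | 00 04 00 00 00 03 00 | 00 00 00
name = bytes.fromhex("000440e000000000040000000300000000")
for comp, eoc in parse_composite(name):
    print(comp.hex() or "<empty marker>", eoc)
```

The first component decodes as the float 7.0 (sepallength), the second as the int 3 (id), and the third is the empty component marker.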



Row Tombstone

When a row is deleted, only the row key and the deletion info, which consists of an 8 byte markedForDeleteAt timestamp and a 4 byte localDeletionTime, are stored.


JSON data:
[
{"key": "6.0",
 "metadata": {"deletionInfo": {"markedForDeleteAt":1582065526802267,"localDeletionTime":1582065526}},
 "cells": []}
]

Uncompressed SSTable Data:

0000000 00 04 40 c0 00 00 5e 4c 67 76 00 05 9e e1 55 bc
0000010 87 5b 00 00                       

1582065526802267 = 0x00059ee155bc875b
1582065526 = 0x5e4c6776
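These conversions can be checked directly. A small sketch decoding the deletion info bytes from the dump above: after the 2-byte key length and the 4-byte key (float 6.0) come the 4-byte localDeletionTime (seconds) and the 8-byte markedForDeleteAt (microseconds).

```python
import struct

# The 12 deletion-info bytes from the dump: 5e 4c 67 76 | 00 05 9e e1 55 bc 87 5b
raw = bytes.fromhex("5e4c677600059ee155bc875b")
local_deletion_time, marked_for_delete_at = struct.unpack(">iq", raw)
print(local_deletion_time)   # 1582065526
print(marked_for_delete_at)  # 1582065526802267
```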

Partitioning key and Clustering key

In Cassandra 2.*, the CQL data model was fitted onto the existing Thrift storage engine, which was designed for storing Thrift column families. This caused redundant storage of clustering key data for each secondary column. For example, in the above irisplot table the primary key is composed of three CQL columns (petallength, sepallength, id). The first part of the primary key, petallength, is automatically chosen as the partitioning key. The partitioning key controls which node is responsible for storing each CQL row. The rest of the CQL columns in the primary key (sepallength, id) are called the clustering key.

The above two sample rows both have the same petallength of 1.4 cm. These two rows are stored in the same partition but have different clustering keys: the first row's clustering key is "4.9:2:" and the second row's is "5.1:1:"


{"key": "1.4",
 "cells": [["4.9:2:","",1582054358578646],
           ["4.9:2:color","red",1582054358578646],
           ["5.1:1:","",1582054337118746],
           ["5.1:1:color","red",1582054337118746]]}

Our example table irisplot has a single secondary CQL column, "color", and it is prefixed with the clustering key value. If multiple secondary CQL columns are defined, the data representing the clustering key is repeated for each of the secondary CQL columns. In Cassandra 3.*, the storage format was changed to apply CQL semantics at the storage level, to store CQL data efficiently and also to simplify CQL queries.


To understand the internal storage format, please use the Python script in Cassandra tools



Cassandra Tombstones

In an eventually consistent system like Cassandra, information about deleted keys must be stored to avoid reading deleted data. When a row or column is deleted, this information is stored as a tombstone. Tombstones are kept until the gc grace period associated with the column family has passed. Only major compaction removes tombstones that are older than the gc grace period. Tombstones are spread to all replicas when repair is performed. It is important to run repair regularly to prevent deleted data from resurrecting.

A Row tombstone is created when a row is deleted, a Column tombstone is created when a specified column is deleted from a row, and a Range tombstone is created when a user deletes a super column. A tombstone includes the application's deletion timestamp (MarkedForDeleteAt) and the local deletion timestamp (LocalDeletionTime). A Range tombstone also includes the start and end column names.

MarkedForDeleteAt

It is a timestamp in microseconds specified by the application in the delete request. For a live column or row it is stored as the minimum long value 0x8000000000000000.

LocalDeletionTime

It is a timestamp in seconds set by the Cassandra server when it receives a request to delete a column or super column; the value is the current time in seconds. For a live column or row it is stored as the max integer value 0x7FFFFFFF.

Tombstone thresholds

tombstone_warn_threshold (default 1000) is used to log a warning message when the scanned tombstone count exceeds this limit; the query execution still continues. tombstone_failure_threshold (default 100000) is used to abort a slice query when the scanned tombstone count exceeds this limit; an error message (TombstoneOverwhelmingException) is also logged. These settings can be changed in cassandra.yaml
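These thresholds are plain entries in cassandra.yaml, shown here with their default values:

```yaml
# cassandra.yaml tombstone thresholds (defaults)
tombstone_warn_threshold: 1000
tombstone_failure_threshold: 100000
```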

Tombstones can impact the performance of slice queries, especially for wider rows. When Cassandra executes a column slice query it needs to read columns from all the SSTables that include the given row and filter out tombstones. All these tombstones need to be kept in memory until the row fragments from all SSTables are merged, which increases heap space usage.

Estimated droppable tombstones

The sstablemetadata tool can be used to estimate the amount of tombstones in a given SSTable. The value is the ratio of tombstones to the estimated column count in the SSTable. Tombstones are cleared during compaction based on the gc_grace period. By default, the tool uses the current time as the GC time to decide whether a tombstone can be dropped. To find the tombstones created before a given time, use the command line argument --gc_grace_seconds to pass the GC time in seconds.

Usage: sstablemetadata [--gc_grace_seconds n] <sstable filenames>

$ ./tools/bin/sstablemetadata data/data/ycsb/usertable-8dd787404e2c11e8b1a22f4e9082bb4e/mc-1-big-Data.db | grep "Estimated droppable tombstones"
Estimated droppable tombstones: 0.0

Paxos Consensus Algorithm

The Paxos consensus algorithm is used to achieve consensus across a set of unreliable processes which are running independently. Consensus is required to agree on a data value proposed by one of the processes. Reaching consensus becomes difficult when processes fail or messages can be lost or delayed. An example of this kind of environment is an asynchronous distributed system.

The Paxos consensus algorithm is used in some popular, highly scalable systems such as Google Spanner and Amazon DynamoDB. Most modern distributed systems use commodity hardware and fall into the asynchronous distributed system category. They use multiple machines, in some cases hosted in multiple data centers, to provide high availability and scalability. Machine outages, disk failures and network partitions are common in these kinds of environments. Consensus is required for common functions such as designating one of the processes as a leader to perform some computation, ordering input events so that the same output is calculated by multiple processes, acquiring a lock/lease on a shared resource, replicating state across multiple machines, or providing distributed transactions.

An example use case of Paxos algorithm is to replicate the state of a key-value store to multiple instances to make it fault tolerant and to improve read latency. In a key-value store, the data is managed as a sorted map of key-value records. Each record is associated with a unique key and it is used to access its associated value. A reliable technique to replicate data is using state machine replication approach.

In state machine replication, a set of server processes called replicas, each execute a finite state machine. Each replica starts in the same initial state. The input to the state machine is client requests. The state machine processes the input request and transitions to an output state and produces output which is sent back to client. State machine maps (input, state) pair to (output, state) pair.

Clients can send requests to any replica. Each replica executes client requests based on its current state and produces results. If each replica starts in the same initial state and receives inputs in the same order, all replicas transition through the same states and the data stays consistent.

In our example, the input is key-value store commands such as put or delete. Commands are first appended to a replication log, and data snapshots are created from it. The replication log at each replica needs to contain the input commands in the same order; in other words, all replicas need to agree on the order of inputs in the replication log. This can be accomplished using the Paxos consensus algorithm.

A variant of Paxos called Multi-Paxos, which uses instance numbers, is used to sequence the commands in the replication log. One of the replicas is designated as a leader which proposes values. Client commands are forwarded to the leader replica, which assigns them sequence numbers in the replication log file.

The replication log contains a sequence of log records. Each log record is uniquely identified by a sequence number and contains an input command to the data store. For example, a log record for an insert operation contains the key and value being inserted. The replication log file is stored on disk, and only read and append operations are supported on it. Database snapshots are created by replaying the records in the replication log.
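The replication idea above can be sketched in a few lines (a toy model, not a real key-value store): as long as every replica applies the same log records in sequence-number order, all replicas compute the same state, regardless of the order in which records were delivered.

```python
# Toy state machine replication: apply replication-log records
# (sequence number, command, key, value) in sequence order.
def apply_log(log):
    state = {}
    for seq, cmd, key, value in sorted(log):  # sequence numbers fix the order
        if cmd == "put":
            state[key] = value
        elif cmd == "delete":
            state.pop(key, None)
    return state

log = [(1, "put", "a", "1"), (2, "put", "b", "2"), (3, "delete", "a", None)]
replica1 = apply_log(log)
replica2 = apply_log(list(reversed(log)))  # delivery order differs, result doesn't
print(replica1 == replica2, replica1)
```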


A simple way to understand Shamir's secret sharing scheme

Shamir's secret sharing method provides a technique to secure data by decomposing it into multiple blocks stored on different machines. It transforms the data into n blocks such that access to even k - 1 blocks doesn't reveal any information about the original data. This technique is called a (k, n) threshold scheme, and it can be used to secure data without using encryption keys.

It is based on polynomial interpolation over a finite field. The data is converted to a number D and set as the constant term (coefficient a0) of a polynomial of degree k - 1. The polynomial is constructed by choosing k - 1 random coefficients (a1, a2, ..., ak-1) from the field.

f(x) = a0 + a1x + a2x^2 + ... + ak-1x^(k-1)

The field is generated by choosing a prime number greater than both D and n. The polynomial is evaluated at n points, D1 = f(1), D2 = f(2), ..., Dn = f(n), and these n data blocks are stored. The original data D is never stored directly; it is reconstructed from any k blocks.

The way I understood this technique is from a geometry perspective, using a line. If we are given a single point (x1, y1) in 2-dimensional space, an infinite number of lines pass through it. To find the y-intercept we need at least two points, because two points determine the slope of the line. With only one point there are infinitely many possible y-intercepts.



A line is represented by f(x) = a1x + a0, a first degree polynomial. The y-intercept is found by setting x to 0. If we choose a (2, n) threshold scheme to secure data, the n blocks of data (D1, D2, ..., Dn) are calculated by evaluating f(x) at x = 1 to n. We need at least two blocks and their indexes to calculate the slope: the index gives the x coordinate and the data block gives the y value. With two points (x1, y1) and (x2, y2), the slope is calculated as

m = (y2 - y1)  / (x2 - x1)



The slope is the coefficient a1. Once we have a1, we can calculate a0 as y1 - a1x1 (or y2 - a1x2). In the above graph, the slope of the line is 0.5 and a0 is 1.
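The (2, n) scheme described above can be sketched directly. This sketch uses plain integers so the slope arithmetic stays exact; the real scheme works over a finite field modulo a prime.

```python
# (2, n) threshold scheme: shares are points on the line f(x) = a1*x + secret.
def split(secret, a1, n):
    return [(x, a1 * x + secret) for x in range(1, n + 1)]

# Any two shares recover the secret: compute the slope (a1), then the
# y-intercept (a0), which is the secret.
def recover(share_a, share_b):
    (x1, y1), (x2, y2) = share_a, share_b
    slope = (y2 - y1) // (x2 - x1)
    return y1 - slope * x1

shares = split(secret=42, a1=7, n=5)
print(recover(shares[0], shares[3]))  # 42, from shares at x=1 and x=4
```

Any single share reveals nothing about the secret, since infinitely many lines pass through one point.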


Cassandra nodetool repair

Cassandra nodetool provides a command to run repair on a specified endpoint. The repair command uses the Anti-Entropy service, which detects inconsistencies and repairs data across all replicas of the specified endpoint. Cassandra provides high availability and durability by storing multiple copies (replicas) of data. Data inconsistencies can occur due to server failures, disk failures or network partitions.

nodetool repair is executed on a specified endpoint in the cluster and provides several options to control the amount of data to be repaired. Data is organized into keyspaces and column families and distributed to the nodes using the data partitioning scheme. Users can execute repair on a specified column family, on all column families in a specified keyspace, or on all column families in all keyspaces.

At the storage layer, each column family's data is stored separately in SSTables. A column family contains a set of rows. Each row is mapped to a token using its key and assigned to a node using the data partitioning scheme. Rows in a column family are stored in SSTables sorted in token order. Rows are replicated to other nodes using the replication strategy, and the number of replicas is determined by the replication factor. Each keyspace can have a different replication strategy, so Cassandra repairs each keyspace's data separately.

In order to perform repair, we need to identify the replicas of a row. Each row can have a different set of replicas based on its token, but all rows in a token range have the same set of replicas. So instead of repairing each row separately, Cassandra repairs the rows of a token range together.
Cassandra uses token ranges to split a node's repair activity into small repair jobs. By default, all token ranges assigned to a given node are repaired; these include primary partition ranges as well as replication ranges. The nodetool repair -pr command can be used to repair only the primary partition ranges of a node.


The target node which receives the repair request from the nodetool repair command is also called the repair coordinator node. The repair coordinator creates a repair session for each token range being repaired, and each repair session is assigned a unique UUID.
 
A repair session (RepairSession) is responsible for repairing all the column families in a specified keyspace. It creates a repair job (RepairJob) for each column family to be repaired and adds the jobs to a queue. A repair job repairs a single token range across all replicas of that range. A repair job has two phases. The first is the validation phase, in which Merkle trees are built at each replica. The second is the sync phase, where each replica's Merkle tree is compared with every other replica's tree and the differing rows are synced. Repair jobs in a repair session can be executed either sequentially or in parallel.

Validation Phase

In the validation phase, the repair coordinator node requests a Merkle tree from each replica for the token range being repaired. Each replica builds a Merkle tree containing hash values of rows from the requested column family and token range. A validation compaction is triggered for the requested column family; for each row, an MD5 hash of its contents is computed and added to the Merkle tree. It is not feasible to construct a Merkle tree to its maximum hash depth so that each row's hash gets a separate leaf node, so Cassandra builds a Merkle tree of variable depth based on the estimated number of keys in the requested token range, limiting the maximum depth to 20. Each leaf node's hash is computed by mixing the hash values of the rows that belong to its sub token range. If a leaf node's sub token range does not contain any rows, an empty hash value is used.
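The tree-building step can be sketched as follows. This is a simplified model: the bucketing, fixed depth, and hash mixing here are assumptions, not Cassandra's exact scheme.

```python
import hashlib

# Simplified Merkle tree: bucket rows into 2**depth sub token ranges,
# mix MD5 row hashes per leaf, then hash child pairs up to the root.
def build_merkle_tree(rows, depth=3, token_space=2**16):
    buckets = [[] for _ in range(2**depth)]
    for token, row in sorted(rows):
        buckets[token * (2**depth) // token_space].append(row)
    # leaf hash mixes row hashes in its range (hash of empty bytes if no rows)
    level = [hashlib.md5(b"".join(hashlib.md5(r).digest() for r in bucket)).digest()
             for bucket in buckets]
    while len(level) > 1:
        level = [hashlib.md5(level[i] + level[i + 1]).digest()
                 for i in range(0, len(level), 2)]
    return level[0]  # root hash

a = build_merkle_tree([(100, b"row1"), (40000, b"row2")])
b = build_merkle_tree([(100, b"row1"), (40000, b"row2")])
print(a == b)  # same data on two replicas => same root hash
```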

Sync Phase

In this phase, the received Merkle trees are compared. Each replica's Merkle tree is compared with that of every other replica, and the differing sub token ranges between replicas are identified. If the root hash values of two Merkle trees match, the data in the token range being compared is consistent. If the root hash values are not equal, the hash values of the left branches are compared recursively, followed by the hash values of the right branches, until all nodes in the trees are traversed. For each differing sub token range, a streaming repair is performed between the replicas to sync up the data.
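The recursive comparison can be sketched with toy trees (the tuple layout and string "hashes" here are assumptions for illustration):

```python
# A node is (hash, left, right); a leaf is (hash, None, None, token_range).
# Matching hashes mean the whole subtree is consistent; otherwise recurse.
def diff_ranges(a, b):
    if a[0] == b[0]:
        return []            # subtrees are consistent
    if a[1] is None:
        return [a[3]]        # differing leaf: this sub token range needs sync
    return diff_ranges(a[1], b[1]) + diff_ranges(a[2], b[2])

def leaf(h, rng): return (h, None, None, rng)
def node(l, r):   return (l[0] + r[0], l, r)  # toy parent "hash"

t1 = node(leaf("x", (0, 50)), leaf("y", (50, 100)))
t2 = node(leaf("x", (0, 50)), leaf("z", (50, 100)))
print(diff_ranges(t1, t2))  # [(50, 100)]
```

Only the range (50, 100) would be streamed between the two replicas.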

Cassandra Data Replication

In a distributed system like Cassandra, data replication enables high availability and durability. Cassandra replicates the rows of a column family onto multiple endpoints based on the replication strategy associated with its keyspace. The endpoints which store a row are called replicas or natural endpoints for that row. The number of replicas and their locations are determined by the replication factor and the replication strategy: the replication strategy controls how replicas are chosen, and the replication factor determines the number of replicas for a key. The replication strategy is defined when creating a keyspace, and the replication factor is configured differently depending on the chosen replication strategy.

Two kinds of replication strategies are available in Cassandra. The first is SimpleStrategy, which is rack unaware and data center unaware. It is commonly used when all nodes are in a single data center. The second is NetworkTopologyStrategy, which is both rack aware and data center aware. Even with a single data center, if the nodes are in different racks it is better to use NetworkTopologyStrategy, which enables fault tolerance by choosing replicas from different racks. Also, if you plan to expand the cluster to more than one data center in the future, choosing NetworkTopologyStrategy from the beginning avoids data migration.

The data partitioner determines the coordinator node for each key. The coordinator node is the first replica for a key, also called the primary replica. If the replication factor is N, a key's coordinator node replicates it to the other N-1 replicas.

In SimpleStrategy, the successor nodes, i.e. the nodes on the ring immediately following the coordinator node in clockwise direction, are selected as replicas.
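This selection rule can be sketched on a toy ring (a simplified model, not Cassandra's implementation):

```python
# SimpleStrategy sketch: walk the ring clockwise from the key's
# coordinator and take the next rf nodes, wrapping around if needed.
def simple_strategy_replicas(ring, key_token, rf):
    tokens = sorted(ring)  # ring maps token -> node
    # coordinator: first node whose token >= key_token (wrap to start)
    start = next((i for i, t in enumerate(tokens) if t >= key_token), 0)
    return [ring[tokens[(start + i) % len(tokens)]] for i in range(rf)]

ring = {0: "n1", 100: "n2", 200: "n3", 300: "n4"}
print(simple_strategy_replicas(ring, 150, 3))  # ['n3', 'n4', 'n1']
```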



In NetworkTopologyStrategy, nodes from distinct available racks in each data center are chosen as replicas. In a multiple data center environment, users need to specify a per data center replication factor. In each data center, successor nodes to the coordinator node which are from distinct racks are chosen.
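A single data center sketch of the rack-aware selection (simplified; the real strategy also handles multiple data centers and unavailable nodes):

```python
# Rack-aware sketch: walk the ring clockwise, first taking nodes from
# racks not used yet, then filling any remaining replicas from any rack.
def nts_replicas(ring, racks, key_token, rf):
    tokens = sorted(ring)
    start = next((i for i, t in enumerate(tokens) if t >= key_token), 0)
    order = [ring[tokens[(start + i) % len(tokens)]] for i in range(len(tokens))]
    replicas, seen_racks = [], set()
    for n in order:                    # first pass: distinct racks only
        if racks[n] not in seen_racks:
            replicas.append(n); seen_racks.add(racks[n])
        if len(replicas) == rf: return replicas
    for n in order:                    # second pass: fill the remainder
        if n not in replicas:
            replicas.append(n)
        if len(replicas) == rf: return replicas
    return replicas

ring = {0: "n1", 100: "n2", 200: "n3", 300: "n4"}
racks = {"n1": "r1", "n2": "r1", "n3": "r2", "n4": "r2"}
print(nts_replicas(ring, racks, 350, 2))  # ['n1', 'n3']
```

Note that n2, the immediate successor of n1, is skipped for the second replica because it shares rack r1 with n1; SimpleStrategy would have picked it.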



Cassandra nodetool getendpoints command can be used to retrieve the replicas of a key.