Cassandra Heap Issues

Cassandra Out Of Memory (OOM) exceptions can impact performance and reduce availability. An OOM exception is reported when Cassandra is unable to allocate heap space. Sometimes the application workload alone can cause Cassandra to quickly use all the available heap space and throw OOM exceptions. More commonly, the application workload combined with background jobs such as repair or compaction increases heap usage and eventually leads to OOM exceptions. Some of the components and background jobs that use a lot of heap space are described below.

Memtables

Cassandra by default allocates 1/4th of the max heap to Memtables. The following settings in cassandra.yaml can be adjusted to reduce the heap used by Memtable allocations.

# memtable_heap_space_in_mb: 2048
# memtable_offheap_space_in_mb: 2048

When Cassandra is under severe heap pressure it starts flushing aggressively, which results in smaller SSTables and creates high background compaction activity.

Reads

CQL queries that read many columns or rows per query can consume a lot of heap space, especially when the rows are wide (contain a huge number of columns per row) or contain tombstones. When rows are replicated to multiple nodes, the coordinator node needs to hold the data longer to meet the consistency constraints. When rows are spread across multiple SSTables, the results need to be merged from multiple SSTables, which increases heap usage. Cassandra needs to filter out the tombstones to read the live columns from a row, so the number of tombstones scanned per query also affects heap usage. The Cassandra nodetool cfstats command provides some metrics on these factors. The following sample cfstats output collected from a live cluster shows that the application is using wide rows with a huge number of columns and that there are a huge number of tombstones per row:

                Average live cells per slice (last five minutes): 8.566489361702128
                Maximum live cells per slice (last five minutes): 1109
                Average tombstones per slice (last five minutes): 1381.0478723404256
                Maximum tombstones per slice (last five minutes): 24601

The Cassandra nodetool cfhistograms command also provides some metrics on the SSTables. It displays statistics on partition size, cell count, and the number of SSTables scanned per read.

Percentile   SSTables   Write Latency   Read Latency   Partition Size   Cell Count
                         (micros)        (micros)       (bytes)
50%          1.00        51.01           182.79         1331             4
75%          1.00        61.21           219.34         2299             7
95%          3.00        73.46           2816.16        42510            179
98%          3.00        88.15           4055.27        182785           770
99%          3.00        105.78          4866.32        454826           1916
Min          0.00        6.87            9.89           104              0
Max          3.00        379.02          74975.55       44285675122      129557750

Concurrent Read threads

Cassandra by default uses 32 concurrent threads in the read stage. These threads serve read queries. If more read requests are received than can be served, they go into a pending queue. The Cassandra nodetool tpstats command can be used to check the active and pending read stage tasks. If each read query consumes a high amount of heap space, reducing this setting can lower heap usage.

Repair 

Cassandra repair creates Merkle trees to detect inconsistencies in row data among multiple replicas. By default Cassandra builds each Merkle tree to a depth of 15, which gives 32768 leaf nodes per tree and consumes a lot of heap space. Repair running concurrently with application load can cause high heap usage.

Compaction

Background compactions can also cause high heap usage. By default compaction_throughput_mb_per_sec is set to 16 MB/s.

Other factors

Other factors that can cause high heap usage are Cassandra Hints,  Row Cache, Tombstones and  space used for Index entries. 

Workarounds

If enough system memory is available, increasing the max heap size can help. Sometimes the min heap and max heap are set to the same value; reducing the min heap size can help reduce heap usage. If the application is allocating bigger chunks of memory, increasing the G1GC region size can reduce heap fragmentation and improve heap utilization. Reducing the number of concurrent read threads can also lower heap usage, although it can impact read performance.
 

Cassandra compaction

Cassandra compaction is a background process which merges multiple SSTables of a Column Family into one or more SSTables to improve read performance and to reclaim the space occupied by deleted data. Cassandra supports different compaction strategies to accommodate different workloads. In this blog post, the size tiered compaction strategy is described.

In the size tiered compaction strategy, SSTables are grouped into buckets based on their size, so SSTables with similar sizes end up in the same bucket. For each bucket an average size is calculated from the sizes of the SSTables in it. Two options, bucketLow and bucketHigh, define the bucket width: if an SSTable's size falls between (bucketLow * average bucket size) and (bucketHigh * average bucket size) then it is included in that bucket, and the bucket's average size is updated with the newly added SSTable's size. The default value for bucketLow is 0.5 and for bucketHigh is 1.5.
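
The bucketing can be illustrated with a small Python sketch. This is a simplified illustration of the grouping described above, not the actual Cassandra implementation; the function name and the example sizes are made up for readability.

BUCKET_LOW, BUCKET_HIGH = 0.5, 1.5

def bucket_sstables(sstable_sizes):
    """Group SSTable sizes into buckets of similar size (simplified sketch)."""
    buckets = []   # each bucket is [average_size, [member sizes]]
    for size in sorted(sstable_sizes):
        for bucket in buckets:
            avg = bucket[0]
            if avg * BUCKET_LOW <= size <= avg * BUCKET_HIGH:
                bucket[1].append(size)
                bucket[0] = sum(bucket[1]) / len(bucket[1])   # refresh the average
                break
        else:
            buckets.append([size, [size]])                    # start a new bucket
    return [members for _, members in buckets]

# e.g. four SSTables around 160 MB land in one bucket, the 1 GB one in another
print(bucket_sstables([150, 160, 165, 170, 1024]))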

Next, the buckets with SSTable count greater than or equal to the minimum compaction threshold are selected. The thresholds are defined as part of Compaction settings in Column Family schema. 

compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'} 

Finally, Cassandra selects the bucket that contains the SSTables with the highest read activity. Each bucket is assigned a read hotness score and the bucket with the highest score is selected for compaction. The read hotness score for an SSTable is the number of reads per second per key, calculated using a two hour read rate. For a bucket, the read hotness score is the combined read hotness score of all the SSTables in it.

If two buckets have the same read hotness score then the bucket with smaller SSTables is selected. If the selected bucket contains more SSTables than the maximum compaction threshold, the coldest SSTables are dropped from the bucket to meet the max threshold. The SSTables in a bucket are sorted in descending order of read hotness score, so the coldest SSTables fall at the end of the sorted order.
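
A rough sketch of this selection step, under the assumption that a per-SSTable reads-per-second-per-key hotness value is available (the SSTable tuple and field names below are illustrative, not Cassandra's own types):

from collections import namedtuple

SSTable = namedtuple("SSTable", "name size hotness")   # hotness = reads/sec/key

def select_compaction_candidate(buckets, min_threshold=4, max_threshold=32):
    """Pick the hottest eligible bucket and trim it to the max threshold (sketch)."""
    eligible = [b for b in buckets if len(b) >= min_threshold]
    if not eligible:
        return None
    # a bucket's hotness is the sum of its SSTables' read hotness scores
    hottest = max(eligible, key=lambda b: sum(t.hotness for t in b))
    # keep the hottest SSTables; the coldest ones fall off the end of the sort
    return sorted(hottest, key=lambda t: t.hotness, reverse=True)[:max_threshold]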

Before performing compaction, the available disk space is checked against the estimated compaction output size. If the available disk space is not enough, the largest SSTables are dropped from the input list of SSTables to reduce the storage space needed. During compaction, the input SSTables are scanned and the same rows from multiple SSTables are merged. Compaction can cause heavy disk I/O. To limit the disk I/O, the compaction_throughput_mb_per_sec setting throttles compaction activity, and the concurrent_compactors setting limits the number of simultaneous compactions.

How the actual compaction is performed

A scanner object is created for each input SSTable, which can sequentially iterate the partitions in it. The input SSTable scanners are added to a min-heap based priority queue to merge the partitions. Partitions are consumed from the priority queue; the scanner at the root of the priority queue contains the least element in the sorted order of partitions. If multiple SSTables contain the partition that matches the root of the priority queue, the matching partition is consumed from all of those SSTables and their scanners are advanced to the next partition. When a scanner's current partition is consumed, the scanner is re-inserted into the priority queue and the data structure re-heapifies to bubble the smallest element up to the root. During compaction, deleted rows with a deletion timestamp older than the GC grace period (gc_before) are also removed.
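
The merge itself behaves like a k-way merge of sorted iterators. The sketch below mimics the idea with Python's heapq, assuming each scanner yields (partition_key, partition) pairs in sorted key order; it is an illustration, not the Cassandra code.

import heapq

def merge_partitions(scanners):
    """K-way merge of sorted SSTable scanners by partition key (sketch)."""
    iters = [iter(s) for s in scanners]
    heap = []
    for idx, it in enumerate(iters):
        entry = next(it, None)
        if entry is not None:
            heapq.heappush(heap, (entry[0], idx, entry[1]))

    while heap:
        key, idx, part = heapq.heappop(heap)
        versions = [part]
        # consume the same partition from every other scanner that has it
        while heap and heap[0][0] == key:
            _, j, other = heapq.heappop(heap)
            versions.append(other)
            nxt = next(iters[j], None)
            if nxt is not None:
                heapq.heappush(heap, (nxt[0], j, nxt[1]))
        nxt = next(iters[idx], None)
        if nxt is not None:
            heapq.heappush(heap, (nxt[0], idx, nxt[1]))
        yield key, versions   # the caller merges the versions into one output partition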

Compaction Statistics

The progress of a running compaction can be monitored using the nodetool compactionstats command. It displays the number of pending compactions, the keyspace and column family being compacted, and the remaining compaction time.

At the end of a compaction, Cassandra stores various statistics collected during the compaction into the compaction_history table. The statistics can be viewed using the nodetool compactionhistory command.

$ ./bin/nodetool compactionhistory

Compaction History:
id            keyspace_name   columnfamily_name compacted_at            bytes_in bytes_out rows_merged    
f0382bb0-c692-11eb-badd-7377db2be2d0 system  local 2021-06-05T23:46:36.266 10384 5117 {5:1}          
Stop Active Compaction

To stop an active compaction use

$ nodetool stop COMPACTION

Disable Auto Compactions

Cassandra nodetool provides a command to disable auto compactions.

$ /opt/cassandra/bin/nodetool disableautocompaction

To check the compaction status

$ /opt/cassandra/bin/nodetool compactionstats

pending tasks: 0

Tombstone Compaction

Sometimes we want to run a single-SSTable compaction to drop the tombstones which are older than the gc_grace period. This can be done using the JMX interface, which can be accessed using the jmxterm jar. The allowed tombstone threshold can be set in the Column Family compaction properties using the ALTER TABLE command. This setting is allowed only for SizeTieredCompactionStrategy.

$ java -jar jmxterm-1.0-alpha-4-uber.jar
$>open <Cassandra PID>
$>run -b org.apache.cassandra.db:type=CompactionManager forceUserDefinedCompaction <SSTable file path> 

We can choose the SSTable on which to perform the tombstone compaction by checking the SSTable metadata:

$ /opt/cassandra/tools/bin/sstablemetadata <SSTable path> | grep "Estimated droppable tombstones"
Estimated droppable tombstones: 0.1  

Major Compaction

By default, auto compaction selects the input SSTables based on the thresholds defined as part of the Column Family definition. To run compaction on all the SSTables for a given Column Family, the nodetool compact command can be used to force a major compaction. A major compaction can result in two compaction runs: one for repaired SSTables and another for unrepaired SSTables. If tombstones and live columns are spread across these two different sets of SSTables, the major compaction may not be able to drop the tombstones. The tool /opt/cassandra/tools/bin/sstablerepairedset can be used to reset repairedAt to 0 so that all SSTables are included in a single set.

Typical Problems

Compaction causes high CPU usage. This could be due to many pending compactions or a low throughput limit. Sometimes increasing the throughput limit (compaction_throughput_mb_per_sec) from the default 16 MB/s to a higher value helps. In other cases reducing concurrent_compactors to 1 can help ease the load.



Cassandra Commit Log

The Cassandra Commit Log is an append-only log that is used to track the mutations made to Column Families and provide durability for those mutations. Every mutation is appended to the Commit Log first, before being applied to an in-memory write-back cache called the Memtable. If a Memtable is not flushed to disk to create an SSTable, the mutations stored in it can be lost if Cassandra suddenly shuts down or crashes. At startup, Cassandra replays the Commit Log to recover the data that was not stored to disk.

A mutation can be an insert, update, or delete operation on a partition, and can involve a single column or multiple columns. Each mutation is associated with a partition key and a timestamp. If multiple mutations occur on the same partition key, Cassandra keeps the latest one. The Memtable also combines different mutations occurring on the same partition and groups them into a single row mutation.

The Commit Log provides a fast way to persist the changes applied to Memtables before they are flushed to disk. SSTables are stored in separate files for each Column Family, and Cassandra can receive mutations for multiple Column Families at the same time. If these mutations had to be persisted directly, Cassandra would need to issue multiple disk writes, one for each mutation applied to a Column Family, causing more disk seeks and random I/O and hurting performance. The Commit Log solves this by converting the random I/O into sequential I/O: mutations of different Column Families are written to the same file. It also batches mutations, so a single disk write syncs multiple mutations to disk, reducing disk I/O while making the changes durable. This allows the mutations of each Column Family to be buffered in a separate Memtable and later flushed to disk using sequential I/O.

The Commit Log cannot grow forever. It needs to be truncated regularly so that Cassandra spends less time replaying unsaved changes the next time it starts after a sudden shutdown. Cassandra provides a few configuration settings to tune the Commit Log size. Mutations in the Commit Log are also cleaned up when a Memtable is flushed to an SSTable. For each Memtable flush, Cassandra records the latest offset of the Commit Log, called the replay position. When a Memtable flush completes successfully, all the mutations for that Column Family stored before the replay position are discarded from the Commit Log.

Commit Log Configuration

The setting commitlog_segment_size_in_mb controls the max size of an individual Commit log segment file. A new Commit Log segment is created when the current active segment size reaches this threshold. It defaults to 32 MB.

Commit Log disk sync frequency can be controlled using the commitlog_sync setting. If it is set to batch mode, Cassandra will not acknowledge writes until the Commit Log is synced to disk. Instead of executing an fsync for every write, the commitlog_sync_batch_window_in_ms setting indicates the number of milliseconds between successive fsyncs; it is set to 2 milliseconds by default. The alternative is setting commitlog_sync to periodic, in which case Cassandra acknowledges writes immediately and the Commit Log is synced every commitlog_sync_period_in_ms milliseconds. The default value for commitlog_sync is periodic with a sync period of 10000 milliseconds (10 seconds). The Commit Log protects the data stored locally, so the interval between fsyncs has a direct impact on the amount of unsaved data if Cassandra goes down suddenly. Having a replication factor greater than one protects the data if one of the replicas goes down.

The total disk space used for Commit Log files is controlled using the commitlog_total_space_in_mb setting. When the disk space reaches this threshold, Cassandra flushes all the Column Families that have mutations in the oldest Commit Log segment and removes that segment. For this purpose a list of dirty Column Families is maintained in each Commit Log segment. The threshold defaults to the smaller of 8 GB (8192 MB) and 1/4 of the total space of the Commit Log volume.

There can be multiple segments storing the Commit Log at any time. All Commit Log Segments are managed in a queue and the current active segment is at the tail of the queue. A Commit Log segment can be recycled when all the mutations in it are flushed successfully.

Commit Log Segment

A Commit Log segment represents a single Commit Log file on disk and stores mutations of different Column Families. The file name format is CommitLog-<Version>-<Msec Timestamp>.log (for example: CommitLog-6-1621835590940.log). It maintains a map of the dirty Column Families and their last mutation positions in the file. The Commit Log segment is memory mapped; operations are applied to the memory mapped buffer and synced to disk periodically.


 

The contents of a Commit Log segment are memory mapped to a buffer; mutations are applied to this buffer and persisted to disk on sync. At the beginning of the Commit Log a file header is written, which includes the version, the Commit Log ID, and parameters associated with compressed or encrypted Commit Logs. A CRC checksum computed over the header fields is written at the end of the header.

Mutations are allocated in the active segment. If there is not enough space in the current active segment to write a mutation, a new segment is created. Commit Log segments are managed as a queue and the active segment is at the tail of the queue.

The Commit Log tracks which mutations have been synced to disk and which have not using two pointers. The last synced offset is the position in the Commit Log up to which all previously written mutations have been synced to disk. The last marker offset points to the position of the last mutation that has not yet been synced to disk. All mutations between these two pointers are not yet synced to disk.

A Commit Log is divided into sections separated by sync markers. The sync markers are chained: each marker points to the position of the next one. Every time a sync is performed on the Commit Log, a new marker is created. A sync marker consists of two integers. The first integer stores the Commit Log file position where the next sync marker will be written, which indicates the section length. The second integer stores a CRC computed from the Commit Log ID and the file position where the sync marker is written.
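
As a rough illustration of the chained markers, the sketch below walks a segment using the layout described above (two 4-byte integers per marker). The exact on-disk encoding, header size, and CRC handling are assumptions for illustration only, not the authoritative file format.

import struct

def iter_synced_sections(segment, header_size):
    """Walk the chain of sync markers in a commit log segment (illustrative sketch)."""
    pos = header_size
    while pos + 8 <= len(segment):
        next_marker, crc = struct.unpack_from(">ii", segment, pos)
        if next_marker == 0:
            break                                  # no further synced section
        yield (pos + 8, next_marker)               # mutations live between these offsets
        pos = next_marker                          # jump to the next sync marker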

 


Cassandra Memtable


Memtables store the current mutations applied to Column Families. They function as a write-back cache and provide faster write performance as well as faster read performance for recently written data. Mutations are organized in sorted order using a skip list data structure in the Memtable. Each Column Family is associated with its own Memtable. In this blog, the on-heap based Memtable type is described.
 
Memtable Size

There are two types of settings to limit the Memtable size, depending on whether the Memtable is stored in heap space or off-heap space. Both are specified in units of MB. By default these settings are commented out in cassandra.yaml.

# memtable_heap_space_in_mb: 2048
# memtable_offheap_space_in_mb: 2048

If the Memtable size is not configured, Cassandra assigns 1/4th of the max heap size allocated to the Cassandra process. For example, if the max heap size is 64 GB then 16 GB is set as the Memtable size.

Memtable Allocation Types

The allocation type in cassandra.yaml specifies how to store Memtable data. The supported allocation types are

unslabbed_heap_buffers
heap_buffers
offheap_buffers
offheap_objects

By default Cassandra is configured to store Memtable data in heap space. It uses a Slab allocator to reduce the heap fragmentation.

memtable_allocation_type: heap_buffers

 

The above figure shows the association between the different components of a Memtable. Memtables are created during Cassandra startup. Based on the Memtable storage type configured in cassandra.yaml, a pool is allocated either on heap or on both on-heap and off-heap space. All Memtables use the same Memtable pool, which controls the overall Memtable storage usage. Each Memtable has its own allocator which serves its allocation requests. Allocator objects hold a reference to the parent Memtable pool for the configured limits.

A Memtable contains a reference to its Column Family, the Memtable pool, the positions into the Commit Log segments where its data is persisted, and the mutations. The Commit Log lower bound is set when the Memtable is created and points to the position of the active Commit Log segment in which this Memtable's mutations are stored. The Commit Log upper bound is set to the position of the last mutation in the Commit Log. When a Memtable is flushed, these two bounds are used to discard the Commit Log segments which store the mutations in this Memtable. The bounds can refer to the same Commit Log segment or to different segments: a single Commit Log segment can contain mutations from multiple Memtables, and a single Memtable can persist mutations into multiple Commit Log segments. Please review the Cassandra Commit Log section for additional details.

The Min timestamp field stores the smallest timestamp of all partitions stored in this Memtable. Live Data Size stores the size of all mutations applied to this Memtable. Partitions contain the actual Column Family mutations applied in this Memtable.  The Current Operations metric tracks the number of columns updated in the stored mutations.

 

Slab Allocator

The Slab allocator tries to combine smaller memory allocations (less than 128KB) into a bigger region of size 1MB to avoid heap fragmentation. Allocations bigger than 128KB are allocated directly from the JVM's heap. The basic idea is that more space in the older generation of the JVM heap can be reclaimed when the allocations in a slab region have a similar lifetime. For example, if a column of integer type is updated, it needs 4 bytes of heap space: the value is allocated in the current slab region, the next free offset pointer is bumped by 4 bytes, and the allocation count is incremented. If an application allocates chunks larger than 128KB, those allocations spill all over the heap, cause heap fragmentation, and eventually force the JVM to de-fragment the heap space.
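
The bump-pointer behavior can be sketched as follows. This is a toy Python illustration of the idea (the real allocator is part of Cassandra's Java code); the class and constant names are made up, and the constants mirror the 1 MB region size and 128 KB cutoff mentioned above.

REGION_SIZE = 1024 * 1024        # 1 MB slab region
MAX_CLONED_SIZE = 128 * 1024     # larger allocations go straight to the heap

class SlabRegion:
    def __init__(self):
        self.buffer = bytearray(REGION_SIZE)
        self.next_free = 0
        self.allocation_count = 0

    def allocate(self, size):
        """Bump-pointer allocation inside the region; None means 'start a new region'."""
        if size > MAX_CLONED_SIZE:
            return bytearray(size)               # allocate directly on the heap
        if self.next_free + size > REGION_SIZE:
            return None                          # region is full
        offset = self.next_free
        self.next_free += size                   # bump the free pointer
        self.allocation_count += 1
        return memoryview(self.buffer)[offset:offset + size]

region = SlabRegion()
cell = region.allocate(4)                        # e.g. a 4-byte integer column value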

Each Memtable has its own Slab allocator. Multiple Memtables are managed using a single slab pool, which holds the configured Memtable size settings and ensures that the overall data stored in all Memtables stays below the threshold.


 


Partitions

Partitions in a Memtable are managed using a skip list data structure. Each partition stores the mutations applied to a particular row. A mutation can be an insert, update, or delete operation applied to a row and can include zero or more columns.

cqlsh:flowerskeyspace> describe iris;

CREATE TABLE flowerskeyspace.iris (
    id int PRIMARY KEY,
    class text,
    petallength float,
    petalwidth float,
    sepallength float,
    sepalwidth float
)

cqlsh:flowerskeyspace> update iris set sepalwidth=6.4 where id=4;

cqlsh:flowerskeyspace> update iris set sepalwidth=6.5 where id=5;

cqlsh:flowerskeyspace> update iris set sepalwidth=3.3 where id=3;


In the above example, there are three updates to three different rows.
These are stored as three partitions in the Memtable

partitions size = 3

[0] "DecoratedKey(-7509452495886106294, 00000005)" -> "[flowerskeyspace.iris]
key=5 partition_deletion=deletedAt=-9223372036854775808, localDeletion=2147483647
columns=[[] | [sepalwidth]]
    Row[info=[ts=-9223372036854775808] ]:  | [sepalwidth=6.5 ts=1622222969042456]"

[1] "DecoratedKey(-2729420104000364805, 00000004)" -> "[flowerskeyspace.iris]
key=4 partition_deletion=deletedAt=-9223372036854775808, localDeletion=2147483647
columns=[[] | [sepalwidth]]
    Row[info=[ts=-9223372036854775808] ]:  | [sepalwidth=6.5 ts=1622222995765139]"

[2] "DecoratedKey(9010454139840013625, 00000003)" -> "[flowerskeyspace.iris]
key=3 partition_deletion=deletedAt=-9223372036854775808, localDeletion=2147483647
columns=[[] | [sepalwidth]]
    Row[info=[ts=-9223372036854775808] ]:  | [sepalwidth=3.3 ts=1622218525529221]"

If we update another column of the row with id = 3, the result will be merged with the previous mutation.

cqlsh:flowerskeyspace> update iris set petalwidth=1.5 where id=3;

The partition with index 2 will be updated

[2] "DecoratedKey(9010454139840013625, 00000003)" -> "[flowerskeyspace.iris]
key=3 partition_deletion=deletedAt=-9223372036854775808, localDeletion=2147483647
columns=[[] | [petalwidth sepalwidth]]
Row[info=[ts=-9223372036854775808] ]: | [petalwidth=1.5 ts=1622232283101650],
[sepalwidth=3.3 ts=1622218525529221]" 

 

 

Memtable Flush

Each Column Family has its own Memtable. All live mutations to that Column Family are appended to the Commit Log and also applied to the Memtable. During a read, data from the Memtables and the SSTables is read and the results are merged. When the Memtable heap threshold is exceeded, the Memtable expires, or the Commit Log threshold is reached, the Memtable is flushed to disk to create an SSTable.

The expiry is controlled using a flush period setting which is set as a schema property (memtable_flush_period_in_ms) on Column family definition. By default flush period is set to zero. 

The Memtable heap space exceeded condition is determined by the configured Memtable heap size limit and an internally calculated Memtable cleanup threshold. The cleanup threshold is calculated from memtable_flush_writers, which defaults to 2 for a single Cassandra data directory, so memtable_cleanup_threshold is 0.33.

memtable_cleanup_threshold = 1 / (memtable_flush_writers + 1)

If the configured heap space limit is 2 GB then the Memtable size threshold is 2 GB * 0.33333334, which is around 683 MB. When the combined size of all unflushed Memtables exceeds 683 MB, the Memtable with the largest live data size is flushed.
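
The numbers above follow directly from the formula; a quick check in Python, using the defaults mentioned above (2 flush writers, 2 GB on-heap limit):

memtable_flush_writers = 2
memtable_cleanup_threshold = 1 / (memtable_flush_writers + 1)         # ~0.333
memtable_heap_space_mb = 2048
flush_trigger_mb = memtable_heap_space_mb * memtable_cleanup_threshold
print(round(memtable_cleanup_threshold, 2), round(flush_trigger_mb))  # 0.33 683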

When the total space used by Commit Logs exceeds the configured commitlog_total_space_in_mb, Cassandra selects the Memtables associated with the oldest segments and flushes them. If the oldest Commit Log segment contains data for different Column Families, the Memtables of all those Column Families are flushed. By default commitlog_total_space_in_mb is set to the smaller of 8 GB (8192 MB) and 1/4 of the total space of the Commit Log directory.

Other operations such as repair, nodetool drain, or nodetool flush can also cause Memtables to be flushed to disk. A Column Family can have more than one Memtable when a flush is pending on its previous Memtable.

Cassandra Memtable Metrics

Cassandra provides various metrics for Memtable as part of Column family metrics.

Metric                      Description
memtableColumnsCount        Number of columns present in the Memtable
memtableOnHeapSize          The amount of data stored in the Memtable that is allocated in heap memory, including the overhead associated with columns
memtableOffHeapSize         The amount of data stored in off-heap space by this Memtable, including the overhead associated with columns
memtableLiveDataSize        The amount of live data stored in this Memtable
AllMemtableOnHeapSize       The amount of heap memory used by all Memtables
AllMemtableOffHeapSize      The amount of data stored in off-heap space by all Memtables
AllMemtableLiveDataSize     The amount of live data stored in all Memtables
memtableSwitchCount         Number of times a flush has resulted in the Memtable being switched out

The Commit Log provides durability for the mutations in a Memtable; see the Commit Log section above for more details.

Cassandra: how to repair a row if regular repair fails

Recently we had a strange issue with Cassandra repair. A table was replicated across three datacenters and some of its rows were missing on all the nodes in a newly added datacenter. We have hundreds of production Cassandra clusters of various sizes, from a few nodes to tens of nodes per cluster. Most of the clusters have multiple datacenters and replicate across them, and we hadn't seen this kind of issue before. This article describes the various ways we tried to fix the problem. These steps are useful for recovering data in Cassandra 2.2.*.

The first thing we tried was listing rows by running cqlsh on each node and executing a select query on this particular table. All the nodes returned the rows except the nodes in the recently added DC, let's call it DC3, where the rows were missing from the result. This is an obvious problem when the replication settings for the Keyspace containing the table do not include DC3.

Keyspace: Test: Replication Strategy: org.apache.cassandra.locator.NetworkTopologyStrategy Durable Writes: true Options: [DC1:3, DC2:3]

We manually added the DC3 to the Keyspace replication strategy and ran Cassandra nodetool repair on DC3 nodes for this Keyspace and Table. After running repair, we verified the table rows on DC3 nodes but still the rows were missing.

Keyspace: Test: Replication Strategy: org.apache.cassandra.locator.NetworkTopologyStrategy Durable Writes: true Options: [DC1:3, DC2:3, DC3:3]

The second thing we tried was running Cassandra nodetool repair with different DC options and on nodes in different datacenters, but the rows were still missing. We tried a full repair and an entire-keyspace repair, but neither was able to repair the rows.

The third thing we tried was removing all SSTables for the problematic table and running repair. The repair restored the SSTables but the rows were still missing.

The fourth thing we tried was resetting the repairedAt status to unrepaired for all SSTables using the sstablerepairedset tool and running the repair again. Still the rows were missing.

Finally we used Cassandra read repair to fix the missing rows. We ran cqlsh on the DC3 nodes, set the consistency level to QUORUM, and read each missing row one by one with a select query; this fixed the issue and repaired the data.

# cat commands 
CONSISTENCY
CONSISTENCY QUORUM
CONSISTENCY
select * from "Test"."SampleTable" WHERE key = 60b8b2e1-5866-4a4a-b6f8-3e521c44f43c;
 

Hope this helps if someone is stuck with a similar issue.


Cassandra 3.x SSTable Storage Format

The Cassandra SSTable storage format changed in 3.0 to support the higher level CQL structure directly at the storage engine level. The older SSTable format was designed around a very simple model of storing basic key/value pairs, which was adequate for the Thrift API. In Cassandra 2.* and earlier, Thrift was predominantly used to define the database schema, but Thrift's column family definition is not sufficient to create tables that store structured application data. CQL was introduced in Cassandra 2.* to let applications define structured data using SQL-like semantics and query the data by primary key. However, the simple SSTable storage format available in 2.* was not well suited to the relational characteristics of row keys and columns. To query CQL data by primary key, redundant information had to be stored along with the CQL columns in the SSTable to associate them with the primary key. This caused a lot of duplicate data in the SSTables and inefficiencies in querying the data.

In Cassandra 3.*, the SSTable format was redesigned to support CQL data types and their relational mappings natively at the storage engine level, reducing SSTable size and allowing CQL queries to execute efficiently. The new storage engine treats rows as first class citizens, whereas in the old format rows were represented as a sequence of cells with no row semantics. The row is the primary building block of the new storage engine.


In the new format, an SSTable stores a set of partitions and each partition stores a sequence of rows or range tombstone markers. The partitions are ordered by their partition keys and the rows or markers in a partition are indexed by their clustering keys. The Flags field in a partition determines the type of the row that follows it. The flags are stored in one byte; if the extended flags bit is set in the first byte, another byte is added to hold the extended flags.

The rest of the article uses an example CQL table with a partitioning key and a clustering key to describe the details.

CREATE TABLE flowerskeyspace.irisplot (
    petallength float,
    sepallength float,
    id int,
    color text,
    PRIMARY KEY (petallength, sepallength, id)
)


insert into irisplot(petallength, sepallength, id, color) values (6,6.3,5,'blue');
insert into irisplot(petallength, sepallength, id, color) values (5.1,5.8,6,'blue');
insert into irisplot(petallength, sepallength, id, color) values (1.4,5.1,1,'red');
insert into irisplot(petallength, sepallength, id, color) values (1.4,4.9,2,'red');
insert into irisplot(petallength, sepallength, id, color) values (4.5,6.4,4,'green');
insert into irisplot(petallength, sepallength, id, color) values (4.7,7,3,'green');




In the above irisplot table the primary key is composed of three CQL columns (petallength, sepallength, id). The first part of the primary key, petallength, is automatically chosen as the partitioning key. The partitioning key controls which node is responsible for storing each CQL row. The rest of the CQL columns in the primary key form the clustering key (sepallength, id), which is used to select specific rows within a partition.


Our example table irisplot has a single secondary CQL column, "color", which is prefixed with the clustering key value. If multiple secondary CQL columns are defined, the data representing the clustering key is repeated for each of the secondary CQL columns in Cassandra 2.*.

JSON dump of SSTable partition in Cassandra 2.*:
{"key": "1.4",
 "cells": [["4.9:2:","",1582054358578646],
           ["4.9:2:color","red",1582054358578646],
           ["5.1:1:","",1582054337118746],
           ["5.1:1:color","red",1582054337118746]]}

JSON dump of SSTable in Cassandra 3.*:
[
  {
    "partition" : {
      "key" : [ "4.7" ],
      "position" : 0
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 18,
        "clustering" : [ 7.0, 3 ],
        "liveness_info" : { "tstamp" : "2020-03-03T22:25:59.655787Z" },
        "cells" : [
          { "name" : "color", "value" : "green" }
        ]
      }
    ]
  },
  {
    "partition" : {
      "key" : [ "1.4" ],
      "position" : 41
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 59,
        "clustering" : [ 4.9, 2 ],
        "liveness_info" : { "tstamp" : "2020-03-03T22:25:57.948204Z" },
        "cells" : [
          { "name" : "color", "value" : "red" }
        ]
      },
      {
        "type" : "row",
        "position" : 79,
        "clustering" : [ 5.1, 1 ],
        "liveness_info" : { "tstamp" : "2020-03-03T22:25:57.946200Z" },
        "cells" : [
          { "name" : "color", "value" : "red" }
        ]
      }
    ]
  },
  {
    "partition" : {
      "key" : [ "5.1" ],
      "position" : 100
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 118,
        "clustering" : [ 5.8, 6 ],
        "liveness_info" : { "tstamp" : "2020-03-03T22:25:57.943769Z" },
        "cells" : [
          { "name" : "color", "value" : "blue" }
        ]
      }
    ]
  },
  {
    "partition" : {
      "key" : [ "6.0" ],
      "position" : 140
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 158,
        "clustering" : [ 6.3, 5 ],
        "liveness_info" : { "tstamp" : "2020-03-03T22:25:57.921656Z" },
        "cells" : [
          { "name" : "color", "value" : "blue" }
        ]
      }
    ]
  },
  {
    "partition" : {
      "key" : [ "4.5" ],
      "position" : 178
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 196,
        "clustering" : [ 6.4, 4 ],
        "liveness_info" : { "tstamp" : "2020-03-03T22:25:57.950560Z" },
        "cells" : [
          { "name" : "color", "value" : "green" }
        ]
      }
    ]
  }
]

In the new storage engine, the partitioning key is still used to distribute the data to nodes in the cluster, while the clustering key is used to index a row within a partition.
For example, in the above table the partition with key [ "1.4" ] contains two rows with different clustering keys, [ 4.9, 2 ] and [ 5.1, 1 ].
{
    "partition" : {
      "key" : [ "1.4" ],
      "position" : 41
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 59,
        "clustering" : [ 4.9, 2 ],
        "liveness_info" : { "tstamp" : "2020-03-03T22:25:57.948204Z" },
        "cells" : [
          { "name" : "color", "value" : "red" }
        ]
      },
      {
        "type" : "row",
        "position" : 79,
        "clustering" : [ 5.1, 1 ],
        "liveness_info" : { "tstamp" : "2020-03-03T22:25:57.946200Z" },
        "cells" : [
          { "name" : "color", "value" : "red" }
        ]
      }
    ]
  }

The SSTable size in Cassandra 3.* is smaller compared to Cassandra 2.*

$ ls -al cassandra-3.11/data/data/flowerskeyspace/irisplot-ecde46205d9d11eaa60a47b84e452526/md-1-big-Data.db
-rw-r--r--  1 bharat  staff  161 Mar  3 14:26 cassandra-3.11/data/data/flowerskeyspace/irisplot-ecde46205d9d11eaa60a47b84e452526/md-1-big-Data.db

$ ls -al cassandra-2.2/data/data/flowerskeyspace/irisplot-dbf35720528411eabbb8b16d9d604ffd/lb-1-big-Data.db
-rw-r--r--  1 bharat  staff  286 Feb 18 11:34 cassandra-2.2/data/data/flowerskeyspace/irisplot-dbf35720528411eabbb8b16d9d604ffd/lb-1-big-Data.db

SSTable hex dump

Single Partition Details



The storage format also includes the details of the CQL table schema in the Statistics.db file:
$ ./tools/bin/sstablemetadata data/data/flowerskeyspace/irisplot-ecde46205d9d11eaa60a47b84e452526/md-1-big-Data.db

KeyType: org.apache.cassandra.db.marshal.FloatType
ClusteringTypes: [org.apache.cassandra.db.marshal.FloatType, org.apache.cassandra.db.marshal.Int32Type]
StaticColumns: {}
RegularColumns: {color:org.apache.cassandra.db.marshal.UTF8Type}

Varint Encoding


For encoding data compactly, the new storage format uses a varint type to store integers. It is a similar concept to Google's Protocol Buffers encoding, but the Cassandra implementation is different.
In the varint encoding format, integers are stored using a variable number of bytes instead of four bytes for a 32-bit integer or eight bytes for a 64-bit integer. The most significant bit of the first byte indicates whether the integer is stored in a single byte or in multiple bytes: if it is set, the integer continues into extra bytes; otherwise its value is stored in the same byte. This encoding scheme can take one to nine bytes to represent an integer. The number of leading bits set in the first byte, starting from the most significant bit, indicates the number of extra bytes needed to store the integer. A zero bit separates the length-indicating bits from the bits that store the actual value of the integer. Python code to decode the varint data is available at unpack_vint()
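
A minimal decoder sketch based on the description above (the real implementation is Cassandra's Java VIntCoding; the helper below is illustrative and handles only unsigned values). The three example bytes are an assumption that packs the 21 value bits '110100111010111110011' used in the delta encoding example below.

def decode_unsigned_vint(buf, offset=0):
    """Decode one unsigned vint: leading 1-bits in the first byte = number of extra bytes."""
    first = buf[offset]
    extra = 0
    mask = 0x80
    while extra < 8 and (first & mask):
        extra += 1
        mask >>= 1
    value = first & (0xFF >> extra)          # value bits remaining in the first byte
    for i in range(1, extra + 1):
        value = (value << 8) | buf[offset + i]
    return value, offset + 1 + extra

print(decode_unsigned_vint(bytes([0x05]))[0])              # 5 (single byte)
print(decode_unsigned_vint(bytes([0xDA, 0x75, 0xF3]))[0])  # 1734131 (two extra bytes)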

In the above example partition, the row timestamp is encoded as a varint. The timestamp is a 64-bit integer but it is compactly stored in three bytes using varint encoding.

 Delta Encoding


You can also notice that the timestamp is not an absolute value. The row timestamp is stored as a delta from the encoding minimum timestamp in the Statistics file.

$ tools/bin/sstablemetadata data/data/flowerskeyspace/irisplot-ecde46205d9d11eaa60a47b84e452526/md-1-big-Statistics.db
EncodingStats minTimestamp: 1583274357921656

To get the absolute value of the row timestamp we need to add the integer value of the varint bits ('110100111010111110011') to the encoding minimum timestamp 1583274357921656.

We can use Python to convert it back to absolute value

int('110100111010111110011', 2) =  1734131
Row timestamp = 1583274357921656 + 1734131 = 1583274359655787

$ python -c "import datetime,pytz; print datetime.datetime.fromtimestamp(1583274359655787/1000000.0, pytz.utc)"
2020-03-03 22:25:59.655787+00:00

We can validate this using the timestamp from the JSON dump of the SSTable:
 "type" : "row",
 "position" : 18,
 "clustering" : [ 7.0, 3 ],
 "liveness_info" : { "tstamp" : "2020-03-03T22:25:59.655787Z" },
 "cells" : [ { "name" : "color", "value" : "green" ]

Columns


There are two types of columns: simple and complex. A simple column is encoded as a single cell, while a complex column is encoded as a sequence of cells preceded by a deletion time and the number of cells in it.



For further details on storage engine changes please review the references.


References

putting-some-structure-storage-engine

Storage engine change Patch description

Cassandra CQL Storage Format



In Cassandra 2.*, a CQL table is stored using the same storage format that is used for Thrift-based column families. Cassandra stores extra information as part of the column names to identify CQL clustering keys and other CQL columns.

The CQL row format includes the partitioning key and the clustering key, followed by a sequence of CQL columns. Each CQL column is prefixed with the clustering key value. A clustering key can be defined as a combination of multiple CQL columns.

A new term, Cell, is introduced to represent CQL columns. A cell consists of a cell name and a cell value. A cell name is a composite type, which is a sequence of individual components. A component is encoded in three parts: the first part is the value length, the second part is the value, and the last part is a byte representing the end of component (EOC). The end of component byte is always set to 0 for CQL columns. Each CQL column that is part of the clustering key is stored as a separate component in the cell, in the order defined in the schema.
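
A hedged sketch of that layout: the helper below packs components as <2-byte length><value><EOC byte>, following the description above. The helper name and the big-endian float/int serialization of the clustering values are assumptions for illustration, not Cassandra's actual code.

import struct

def encode_composite(components, eoc=0):
    """Pack composite cell-name components as <len><value><end-of-component>."""
    out = bytearray()
    for value in components:
        out += struct.pack(">H", len(value))   # 2-byte big-endian length
        out += value
        out.append(eoc)                        # EOC byte, always 0 for CQL columns
    return bytes(out)

# e.g. the cell name "4.9:2:color" from the irisplot example is three components:
# the clustering values 4.9 (float) and 2 (int), then the column name "color"
name = encode_composite([struct.pack(">f", 4.9), struct.pack(">i", 2), b"color"])
print(name.hex())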


Example1: CQL Table schema with only partitioning key and no clustering key

CREATE KEYSPACE flowerskeyspace WITH REPLICATION = { 'class' : 'SimpleStrategy' , 'replication_factor' : 1 };
  CREATE TABLE flowerskeyspace.iris (
    id int PRIMARY KEY,
    class text,
    petallength float,
    petalwidth float,
    sepallength float,
    sepalwidth float
)



CQL Table Rows:

insert into iris(id, sepallength, sepalwidth, petallength, petalwidth, class) values (2,4.9,3.0,1.4,0.2,'Iris-setosa');
insert into iris(id, sepallength, sepalwidth, petallength, petalwidth, class) values (1,5.1,3.5,1.4,0.2,'Iris-setosa');
insert into iris(id, sepallength, sepalwidth, petallength, petalwidth, class) values (3,7.0,3.2,4.7,1.4,'Iris-versicolor');
insert into iris(id, sepallength, sepalwidth, petallength, petalwidth, class) values (4,6.4,3.2,4.5,1.5,'Iris-versicolor');
insert into iris(id, sepallength, sepalwidth, petallength, petalwidth, class) values (5,6.3,3.3,6.0,2.5,'Iris-virginica');
insert into iris(id, sepallength, sepalwidth, petallength, petalwidth, class) values (6,5.8,2.7,5.1,1.9,'Iris-virginica');




CQL data in JSON format:

{"key": "5",
 "cells": [["","",1581757206044154],
           ["class","Iris-virginica",1581757206044154],
           ["petallength","6.0",1581757206044154],
           ["petalwidth","2.5",1581757206044154],
           ["sepallength","6.3",1581757206044154],
           ["sepalwidth","3.3",1581757206044154]]},


Uncompressed SSTable Data:



CQL Table Row Storage Format:





Example2: CQL Table schema with partitioning key and clustering key

CREATE TABLE flowerskeyspace.irisplot (
    petallength float,
    sepallength float,
    id int,
    color text,
    PRIMARY KEY (petallength, sepallength, id)
)


insert into irisplot(petallength, sepallength, id, color) values (6,6.3,5,'blue');
insert into irisplot(petallength, sepallength, id, color) values (5.1,5.8,6,'blue');
insert into irisplot(petallength, sepallength, id, color) values (1.4,5.1,1,'red');
insert into irisplot(petallength, sepallength, id, color) values (1.4,4.9,2,'red');
insert into irisplot(petallength, sepallength, id, color) values (4.5,6.4,4,'green');
insert into irisplot(petallength, sepallength, id, color) values (4.7,7,3,'green');





JSON data:

[
{"key": "4.7",
 "cells": [["7.0:3:","",1582054414657067],
           ["7.0:3:color","green",1582054414657067]]},
{"key": "1.4",
 "cells": [["4.9:2:","",1582054358578646],
           ["4.9:2:color","red",1582054358578646],
           ["5.1:1:","",1582054337118746],
           ["5.1:1:color","red",1582054337118746]]},
{"key": "5.1",
 "cells": [["5.8:6:","",1582054298177996],
           ["5.8:6:color","blue",1582054298177996]]},
{"key": "6.0",
 "cells": [["6.3:5:","",1582054268167535],
           ["6.3:5:color","blue",1582054268167535]]},
{"key": "4.5",
 "cells": [["6.4:4:","",1582054399453891],
           ["6.4:4:color","green",1582054399453891]]}
]

Uncompressed SSTable data:


For every CQL row an empty component is added as a marker, which allows inserting rows whose secondary columns are NULL. For the empty component, two bytes store the length of the name, which is set to 0, followed by a one byte end of component (EOC) field also set to 0. This empty component marker (00 00 00) appears at the beginning of the row if no clustering key is defined, or right after the clustering key if there is one. For example, in the irisplot table we add a row with only the clustering key:


JSON data:

[
{"key": "4.0",
 "cells": [["7.0:3:","",1582057689702366]]}
]

SSTable uncompressed data:

0000000 00 04 40 80 00 00 7f ff ff ff 80 00 00 00 00 00
0000010 00 00 00 11 00 04 40 e0 00 00 00 00 04 00 00 00
0000020 03 00 00 00 00 00 00 05 9e df 82 9b df de 00 00
0000030 00 00 00 00



Row Tombstone

When a row is deleted, only the row key and the deletion info, which consists of an 8-byte markedForDeleteAt timestamp and a 4-byte localDeletionTime, are stored.


JSON data:
[
{"key": "6.0",
 "metadata": {"deletionInfo": {"markedForDeleteAt":1582065526802267,"localDeletionTime":1582065526}},
 "cells": []}
]

Uncompressed SSTable Data:

0000000 00 04 40 c0 00 00 5e 4c 67 76 00 05 9e e1 55 bc
0000010 87 5b 00 00                       

1582065526802267 = 0x00059ee155bc875b
1582065526 = 0x5e4c6776
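
The two values can be pulled straight out of those bytes with Python's struct module (big-endian 8-byte and 4-byte integers, matching the layout described above):

import struct

marked_for_delete_at = struct.unpack(">q", bytes.fromhex("00059ee155bc875b"))[0]
local_deletion_time = struct.unpack(">i", bytes.fromhex("5e4c6776"))[0]
print(marked_for_delete_at, local_deletion_time)   # 1582065526802267 1582065526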

Partitioning key and Clustering key

Basically, in Cassandra 2.* the CQL data model was fitted onto the existing Thrift storage engine, which was designed for storing Thrift column families. This caused redundant storage of the clustering key data for each secondary column. For example, in the above irisplot table the primary key is composed of three CQL columns (petallength, sepallength, id). The first part of the primary key, petallength, is automatically chosen as the partitioning key. The partitioning key controls which node is responsible for storing each CQL row. The rest of the CQL columns in the primary key form the clustering key (sepallength, id).

The two sample rows below both have the same petallength of 1.4 cm. They are stored in the same partition but have different clustering keys: the first row's clustering key is "4.9:2:" and the second row's clustering key is "5.1:1:".


{"key": "1.4",
 "cells": [["4.9:2:","",1582054358578646],
           ["4.9:2:color","red",1582054358578646],
           ["5.1:1:","",1582054337118746],
           ["5.1:1:color","red",1582054337118746]]}

Our example table irisplot has a single secondary CQL column, "color", which is prefixed with the clustering key value. If multiple secondary CQL columns are defined, the data representing the clustering key is repeated for each of the secondary CQL columns. In Cassandra 3.* the storage format was changed to carry the CQL semantics at the storage level, to store CQL data efficiently and also to simplify CQL queries.


To understand the internal storage format, please use the Python script at Cassandra tools.



Cassandra Tombstones

In an eventually consistent system like Cassandra, information about deleted keys must be stored to avoid reading deleted data. When a row or column is deleted, this information is stored as a tombstone. Tombstones are kept until the gc grace period associated with the Column Family has elapsed; tombstones older than the gc grace period are removed during compaction. Tombstones are propagated to all replicas when repair is performed, so it is important to run repair regularly to prevent deleted data from resurrecting.

A row tombstone is created when a row is deleted, a column tombstone is created when a specific column is deleted from a row, and a range tombstone is created when a user deletes a super column. A tombstone includes the application's deletion timestamp (MarkedForDeleteAt) and a local deletion timestamp (LocalDeletionTime). A range tombstone also includes the start and end column names.

MarkedForDeleteAt

It is a timestamp in microseconds specified by the application in the delete request. For a live column or row it is stored as the minimum long value 0x8000000000000000 (-9223372036854775808).

LocalDeletionTime

It is a timestamp in seconds set by the Cassandra server when it receives a request to delete a column or super column; the value is set to the current time in seconds. For a live column or row it is stored as the maximum integer value 0x7FFFFFFF (2147483647).

Tombstone thresholds

tombstone_warn_threshold (default 1000) logs a warning message when the scanned tombstone count exceeds this limit, but query execution continues. tombstone_failure_threshold (default 100000) aborts a slice query if the scanned tombstone count exceeds this limit, and an error (TombstoneOverwhelmingException) is logged. These settings can be changed in cassandra.yaml.

Tombstones can impact the performance of slice queries, especially for wider rows. When Cassandra executes a column slice query it needs to read columns from all the SSTables that include the given row and filter out tombstones. All of these tombstones need to be kept in memory until the row fragments from all SSTables are merged, which increases heap space usage.

 Estimated droppable tombstones

The sstablemetadata tool can be used to estimate the amount of tombstones in a given SSTable. The value is the ratio of tombstones to the estimated column count in the SSTable. Tombstones are cleared during compaction based on the gc_grace period. By default the tool uses the current time as the GC time to decide whether a tombstone can be dropped. To count only the tombstones created before a given time, use the command line argument --gc_grace_seconds to pass the GC time in seconds.

Usage: sstablemetadata [--gc_grace_seconds n] <sstable filenames>

$ ./tools/bin/sstablemetadata data/data/ycsb/usertable-8dd787404e2c11e8b1a22f4e9082bb4e/mc-1-big-Data.db | grep "Estimated droppable tombstones"
Estimated droppable tombstones: 0.0

Paxos Consensus Algorithm

The Paxos consensus algorithm is used to achieve consensus across a set of unreliable processes that run independently. Consensus is required to agree on a data value proposed by one of the processes. Reaching consensus becomes difficult when processes fail or messages can be lost or delayed, which is exactly the situation in an asynchronous distributed system.

The Paxos consensus algorithm is used in some popular, highly scalable systems such as Google Spanner and Amazon DynamoDB. Most modern distributed systems use commodity hardware and fall into the asynchronous distributed system category. They use multiple machines, in some cases hosted in multiple data centers, to provide high availability and scalability. Machine outages, disk failures, and network partitions are common in these environments. Consensus is required for common functions such as designating one process as a leader to perform some computation, ordering input events so that the same output is calculated by multiple processes, acquiring a lock or lease on a shared resource, replicating state across multiple machines, or providing distributed transactions.

An example use case of the Paxos algorithm is replicating the state of a key-value store to multiple instances to make it fault tolerant and to improve read latency. In a key-value store, the data is managed as a sorted map of key-value records; each record is associated with a unique key which is used to access its associated value. A reliable technique to replicate data is the state machine replication approach.

In state machine replication, a set of server processes called replicas, each execute a finite state machine. Each replica starts in the same initial state. The input to the state machine is client requests. The state machine processes the input request and transitions to an output state and produces output which is sent back to client. State machine maps (input, state) pair to (output, state) pair.

Clients can send requests to any replica. Each replica executes client requests based on its current state and produces results. If every replica starts from the same initial state and receives the inputs in the same order, all replicas transition to the same state and the data stays consistent.

In our example the inputs are key-value store commands such as put or delete. Commands are first appended to a replication log, and data snapshots are created from it. The replication log at each replica needs to contain the input commands in the same order; in other words, all replicas need to agree on the order of inputs in the replication log. This can be accomplished using the Paxos consensus algorithm.

A variant of the Paxos algorithm called Multi-Paxos, which uses instance numbers, is used to sequence the commands in the replication log. One of the replicas is designated as a leader which proposes values. Client commands are forwarded to the leader replica, which assigns them sequence numbers in the replication log file.

The replication log contains a sequence of log records. Each log record is uniquely identified by a sequence number and contains an input command to the data store; for example, a log record for an insert operation contains the key and value being inserted. The replication log file is stored on disk, and only read and append operations are supported on it. Database snapshots are created by replaying the records in the replication log.
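
To make the idea concrete, here is a toy Python illustration (the function name and record format are made up): any replica that replays the same ordered log ends up with the same key-value state, which is exactly what agreeing on the log order buys us.

def replay(log):
    """Rebuild the key-value snapshot by replaying an ordered replication log."""
    store = {}
    for seq, op, key, value in log:       # records are ordered by sequence number
        if op == "put":
            store[key] = value
        elif op == "delete":
            store.pop(key, None)
    return store

log = [(1, "put", "a", 1), (2, "put", "b", 2), (3, "delete", "a", None)]
# every replica that agrees on this order computes the same state: {'b': 2}
print(replay(log))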


A simple way to understand Shamir's secret sharing scheme

Shamir's secret sharing method provides a technique to secure data by decomposing it into multiple blocks and storing them on different machines. It transforms the data into n blocks such that access to even k - 1 blocks reveals no information about the original data. This technique is called a (k, n) threshold scheme, and it can be used to secure data without encryption keys.

It is based on polynomial interpolation over a finite field. The data is set as the constant term (coefficient a0) in a polynomial of degree k - 1. The polynomial is constructed by choosing k - 1 random coefficients (a1, a2, ..., a(k-1)) from the field, and the data is converted to a number D which is set as coefficient a0.

f(x) = a0 + a1*x + a2*x^2 + ... + a(k-1)*x^(k-1)

The field is generated by choosing a prime number greater than D and n. This polynomial is evaluated at n points, D1 = f(1), D2 = f(2), ..., Dn = f(n), and these n data blocks are stored. So the original data D is not stored; it is reconstructed from any k blocks.

The way I understood this technique is from a geometric perspective, using a line. For example, given a single point (x1, y1) in 2-dimensional space, an infinite number of lines pass through it. To calculate the y-intercept we need at least two points, which give us the slope of the line. Without two points there are infinitely many possible y-intercepts.



A line is represented by the first degree polynomial f(x) = a1 * x + a0. The y-intercept is obtained by setting x to 0. If we choose a (2, n) threshold scheme to secure the data, n blocks of data (D1, D2, ..., Dn) are calculated by evaluating f(x) at x = 1 to n. We need at least two blocks and their indexes to calculate the slope: the index gives the x coordinate and the data block value is the y value. With two points (x1, y1) and (x2, y2), the slope can be calculated as

m = (y2 - y1)  / (x2 - x1)



The slope is the coefficient a1. Once we know a1 we can calculate a0 as y1 - a1*x1 (or equivalently y2 - a1*x2). In the above graph, the slope of the line is 0.5 and a0 is 1.
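
The same line-based (2, n) scheme can be written in a few lines of Python; the prime and helper names below are illustrative, and Python 3.8+ is assumed for the modular inverse via pow(x, -1, P).

import random

P = 2**127 - 1                                # a Mersenne prime larger than D and n

def split(secret, n):
    """(2, n) threshold: pick a random slope and hand out points on the line."""
    a1 = random.randrange(1, P)
    return [(x, (secret + a1 * x) % P) for x in range(1, n + 1)]

def reconstruct(p1, p2):
    """Any two shares recover the y-intercept, i.e. the secret."""
    (x1, y1), (x2, y2) = p1, p2
    slope = (y2 - y1) * pow(x2 - x1, -1, P) % P
    return (y1 - slope * x1) % P

shares = split(42, 5)
print(reconstruct(shares[0], shares[3]))      # 42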