Cassandra Heap Issues

Cassandra Out Of Memory (OOM) exceptions can impact performance and reduce availability. An OOM exception is reported when Cassandra is unable to allocate heap space. Sometimes the application workload alone can cause Cassandra to quickly use all the available heap space and throw OOM exceptions. More commonly, the application workload combined with background jobs such as repair or compaction increases heap usage and eventually leads to OOM exceptions. Some of the components and background jobs that use a lot of heap space are described below.

Memtables

Cassandra by default allocates 1/4th of the max heap to Memtables. The following settings in cassandra.yaml can be used to reduce the heap used by Memtable allocations.

# memtable_heap_space_in_mb: 2048
# memtable_offheap_space_in_mb: 2048

When Cassandra is under severe heap pressure it starts flushing aggressively, which results in smaller SSTables and creates high background compaction activity.

Reads

CQL queries that read many columns or rows per query can consume a lot of heap space, especially when the rows are wide (contain a huge number of columns per row) or contain tombstones. When rows are replicated to multiple nodes, the coordinator node needs to hold the data longer to meet the consistency constraints. When rows are spread across multiple SSTables, the results need to be merged from multiple SSTables, which increases heap usage. Cassandra also needs to filter out tombstones to read the live columns from a row, so the number of tombstones scanned per query affects heap usage as well. The Cassandra nodetool cfstats command provides some metrics on these factors. The following sample cfstats output, collected from a live cluster, shows the application using wide rows with a huge number of columns and a huge number of tombstones per row:

                Average live cells per slice (last five minutes): 8.566489361702128
                Maximum live cells per slice (last five minutes): 1109
                Average tombstones per slice (last five minutes): 1381.0478723404256
                Maximum tombstones per slice (last five minutes): 24601

The Cassandra nodetool cfhistograms command also provides metrics on the SSTables. It displays percentile statistics on partition size, cell count, and the number of SSTables scanned per read.

Percentile   SSTables   Write Latency   Read Latency   Partition Size   Cell Count
                        (micros)        (micros)       (bytes)
50%          1.00       51.01           182.79         1331             4
75%          1.00       61.21           219.34         2299             7
95%          3.00       73.46           2816.16        42510            179
98%          3.00       88.15           4055.27        182785           770
99%          3.00       105.78          4866.32        454826           1916
Min          0.00       6.87            9.89           104              0
Max          3.00       379.02          74975.55       44285675122      129557750

Concurrent Read threads

Cassandra by default uses 32 concurrent threads in the read stage. These threads serve read queries. If more read requests are received than can be served, they go into a pending queue. The Cassandra nodetool tpstats command can be used to check the active and pending read stage tasks. If each read query consumes a large amount of heap space, reducing this setting can lower heap usage.
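The size of the read stage is controlled by the concurrent_reads setting in cassandra.yaml; the value below is the default and is shown only for reference.

concurrent_reads: 32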

Repair 

Cassandra repair creates Merkle trees to detect inconsistencies in row data among multiple replicas. By default Cassandra builds each Merkle tree to a depth of 15, so a tree holds 2^15 = 32768 leaf nodes and consumes a lot of heap space. Repair running concurrently with the application load can cause high heap usage.
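One way to limit the number of Merkle trees held in memory at once is to repair only a node's primary ranges, one keyspace at a time; a hedged example, where the keyspace name is a placeholder:

$ nodetool repair -pr my_keyspace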

Compaction

Background compactions can also cause high heap usage. By default compaction_throughput_mb_per_sec is set to 16 MB/s.
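Both compaction limits come from cassandra.yaml; the values below are the defaults (concurrent_compactors is commented out by default) and are shown only for reference.

compaction_throughput_mb_per_sec: 16
# concurrent_compactors: 2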

Other factors

Other factors that can cause high heap usage are Cassandra hints, the row cache, tombstones, and the space used for index entries.

Workarounds

If enough system memory is available, increasing the max heap size can help. Sometimes the min heap and max heap are set to the same value; reducing the min heap size can help reduce heap usage. If the application allocates bigger chunks of memory, increasing the G1GC region size can reduce heap fragmentation and improve heap utilization. Reducing the number of concurrent read threads can also reduce heap usage, though it can impact read performance.
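As a rough sketch, heap sizing is typically adjusted in cassandra-env.sh (or jvm.options on newer releases); the sizes and region size below are illustrative placeholders, not recommendations.

# cassandra-env.sh
MAX_HEAP_SIZE="16G"
HEAP_NEWSIZE="4G"

# jvm.options (when running with G1GC)
-XX:+UseG1GC
-XX:G1HeapRegionSize=16m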
 

Cassandra compaction

Cassandra compaction is a background process which merges multiple SSTables of a Column Family into one or more SSTables to improve read performance and to reclaim the space occupied by deleted data. Cassandra supports different compaction strategies to accommodate different workloads. In this blog post, the size tiered strategy is described.

In the size tiered compaction strategy, SSTables are grouped into buckets based on their size, so that SSTables with similar sizes end up in the same bucket. For each bucket an average size is calculated from the sizes of the SSTables in it. Two options, bucketLow and bucketHigh, define the bucket width: if an SSTable size falls between (bucketLow * average bucket size) and (bucketHigh * average bucket size) then it is included in that bucket, and the bucket's average size is updated with the newly added SSTable size. The default value for bucketLow is 0.5 and for bucketHigh is 1.5.
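The grouping logic can be sketched in a few lines of Java; this is only an illustration of the bucketing described above, with made-up class and method names rather than Cassandra's actual code.

import java.util.*;

class SizeTieredBuckets {
    static final double BUCKET_LOW = 0.5, BUCKET_HIGH = 1.5;

    // Groups SSTable sizes (in bytes) into buckets of similar size.
    static List<List<Long>> bucket(List<Long> sstableSizes) {
        List<List<Long>> buckets = new ArrayList<>();
        List<Double> averages = new ArrayList<>();
        List<Long> sorted = new ArrayList<>(sstableSizes);
        Collections.sort(sorted);
        for (long size : sorted) {
            boolean placed = false;
            for (int i = 0; i < buckets.size(); i++) {
                double avg = averages.get(i);
                // An SSTable joins a bucket when its size is within [bucketLow*avg, bucketHigh*avg].
                if (size >= BUCKET_LOW * avg && size <= BUCKET_HIGH * avg) {
                    buckets.get(i).add(size);
                    // The bucket average is updated with the newly added SSTable size.
                    averages.set(i, (avg * (buckets.get(i).size() - 1) + size) / buckets.get(i).size());
                    placed = true;
                    break;
                }
            }
            if (!placed) {
                buckets.add(new ArrayList<>(List.of(size)));
                averages.add((double) size);
            }
        }
        return buckets;
    }

    public static void main(String[] args) {
        // Three similarly sized small SSTables, two medium ones, and one large one.
        System.out.println(bucket(List.of(10L, 12L, 11L, 100L, 110L, 5000L)));
    }
}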

Next, the buckets whose SSTable count is greater than or equal to the minimum compaction threshold are selected. The thresholds are defined as part of the compaction settings in the Column Family schema.

compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'} 
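These thresholds can be changed per table with an ALTER TABLE statement; a minimal sketch, where the keyspace and table names are placeholders:

ALTER TABLE my_keyspace.my_table
  WITH compaction = {'class': 'SizeTieredCompactionStrategy',
                     'min_threshold': '4', 'max_threshold': '32'};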

Finally, Cassandra selects the bucket that contains the SSTables with the highest read activity. Each bucket is assigned a read hotness score and the bucket with the highest score is selected for compaction. The read hotness score for an SSTable is the number of reads per second per key, calculated over a two-hour read rate. For a bucket, the read hotness score is the combined score of all the SSTables in it.

If two buckets have the same read hotness score, the bucket with smaller SSTables is selected. If the selected bucket contains more SSTables than the maximum compaction threshold, the coldest SSTables are dropped from the bucket to meet the max threshold. The SSTables in a bucket are sorted in descending order of read hotness score, so the coldest SSTables are at the end of the sorted order.

Before performing compaction, the available disk space is checked against the estimated compaction output size. If the available disk space is not enough, the largest SSTables are dropped from the input list to reduce the storage space needed. During compaction, the input SSTables are scanned and the same rows from multiple SSTables are merged. Compaction can cause heavy disk I/O. To limit the disk I/O, the compaction_throughput_mb_per_sec setting throttles compaction activity, and the concurrent_compactors setting limits the number of simultaneous compactions.

How the actual compaction is performed

A scanner object is created for each input SSTable, which can sequentially iterate the partitions in it. The input SSTable scanners are added to a min-heap based priority queue to merge the partitions. Partitions are consumed from the priority queue; the scanner at the root of the queue holds the smallest partition in sorted order. If multiple SSTables contain the partition that matches the root of the priority queue, the matching partition is consumed from all of those SSTables and their scanners are advanced to point to their next partitions. Once a scanner is consumed from the priority queue, it is removed from the top and added back, and the data structure re-heapifies to bubble the smallest element up to the root. During compaction, deleted rows with a delete timestamp older than the GC grace period (gc_before) are also dropped.
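A simplified sketch of this merge in Java, assuming each scanner is just a sorted iterator of partition keys; the class names are illustrative, not Cassandra's actual scanner classes.

import java.util.*;

class CompactionMergeSketch {
    public static void main(String[] args) {
        // Three "SSTables", each yielding partition keys in sorted order.
        List<Iterator<Integer>> scanners = List.of(
                List.of(1, 3, 5).iterator(),
                List.of(1, 2, 5, 9).iterator(),
                List.of(4, 5, 6).iterator());

        // Min-heap ordered by each scanner's current partition key.
        PriorityQueue<PeekingScanner> heap =
                new PriorityQueue<>(Comparator.comparingInt((PeekingScanner s) -> s.current));
        for (Iterator<Integer> it : scanners) {
            if (it.hasNext()) heap.add(new PeekingScanner(it));
        }
        while (!heap.isEmpty()) {
            int key = heap.peek().current;
            List<PeekingScanner> matching = new ArrayList<>();
            // Consume the same partition from every scanner whose head matches the root.
            while (!heap.isEmpty() && heap.peek().current == key) matching.add(heap.poll());
            System.out.println("merged partition " + key + " from " + matching.size() + " sstables");
            // Advance the consumed scanners and re-add them so the heap re-orders itself.
            for (PeekingScanner s : matching) {
                if (s.advance()) heap.add(s);
            }
        }
    }

    static class PeekingScanner {
        final Iterator<Integer> it;
        int current;
        PeekingScanner(Iterator<Integer> it) { this.it = it; this.current = it.next(); }
        boolean advance() { if (!it.hasNext()) return false; current = it.next(); return true; }
    }
}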

Compaction Statistics

The progress of running compactions can be monitored using the nodetool compactionstats command. It displays the number of pending compactions, the keyspace and column family being compacted, and the remaining time of the compaction.

At the end of a compaction, Cassandra stores various statistics collected during the compaction into the compaction_history table. The statistics can be viewed using the nodetool compactionhistory command.

$ ./bin/nodetool compactionhistory

Compaction History:
id            keyspace_name   columnfamily_name compacted_at            bytes_in bytes_out rows_merged    
f0382bb0-c692-11eb-badd-7377db2be2d0 system  local 2021-06-05T23:46:36.266 10384 5117 {5:1}          

Stop Active Compaction

To stop an active compaction, use

$ nodetool stop COMPACTION

Disable Auto Compactions

Cassandra nodetool provides a command to disable auto compactions.

$ /opt/cassandra/bin/nodetool disableautocompaction

To check the compaction status

$ /opt/cassandra/bin/nodetool compactionstats

pending tasks: 0

Tombstone Compaction

Sometimes we want to run a single-SSTable compaction to drop the tombstones that are older than the gc_grace period. This can be done using the JMX interface, which can be accessed using the jmxterm jar. The allowed tombstone threshold can be set in the Column Family compaction properties using an ALTER TABLE command. This setting is allowed only for the SizeTieredCompactionStrategy type.

$ java -jar jmxterm-1.0-alpha-4-uber.jar
$>open <Cassandra PID>
$>run -b org.apache.cassandra.db:type=CompactionManager forceUserDefinedCompaction <SSTable file path> 

We can choose the SSTable to perform tombstone compaction on by checking the SSTable metadata:

$>/opt/cassandra/tools/bin/sstablemetadata <SSTable path> | grep "Estimated droppable tombstones"
Estimated droppable tombstones: 0.1  
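The tombstone threshold itself is a compaction sub-property that can be set with ALTER TABLE; a hedged example with placeholder keyspace and table names:

ALTER TABLE my_keyspace.my_table
  WITH compaction = {'class': 'SizeTieredCompactionStrategy',
                     'tombstone_threshold': '0.1'};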

Major Compaction

By default, auto compaction selects the input SSTables for compaction based on the thresholds defined as part of the Column Family definition. To run compaction on all the SSTables for a given column family, the nodetool compact command can be used to force a major compaction. A major compaction can result in two compaction runs: one for repaired SSTables and one for unrepaired SSTables. If tombstones and live columns are spread across these two different sets of SSTables, major compaction may not be able to drop the tombstones. The tool /opt/cassandra/tools/bin/sstablerepairedset can be used to reset repairedAt to 0 so that all SSTables are included in a single set.
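As a sketch, the two commands look like the following; the keyspace, table, and SSTable path are placeholders:

$ /opt/cassandra/bin/nodetool compact my_keyspace my_table
$ /opt/cassandra/tools/bin/sstablerepairedset --really-set --is-unrepaired /path/to/my_table-Data.db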

Typical Problems

Compaction causing high CPU usage: this can be due to many pending compactions or to a low throughput limit. Sometimes increasing the throughput limit (compaction_throughput_mb_per_sec) from the default 16 MB/s to a higher value helps. In other cases, reducing concurrent_compactors to 1 can help ease the load.
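The throughput limit can also be changed at runtime without editing cassandra.yaml; for example (the value 64 is only an illustration):

$ nodetool getcompactionthroughput
$ nodetool setcompactionthroughput 64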



Cassandra Commit Log

The Cassandra Commit Log is an append-only log used to track the mutations made to Column Families and provide durability for those mutations. Every mutation is appended to the Commit Log first, before being applied to an in-memory write-back cache called the Memtable. If a Memtable has not been flushed to disk to create an SSTable, the mutations stored in it can be lost if Cassandra suddenly shuts down or crashes. At startup, Cassandra replays the Commit Log to recover the data that was not stored to disk.

A mutation can be an insert, update, or delete operation on a partition and can involve a single column or multiple columns. Each mutation is associated with a partition key and a timestamp. If multiple mutations occur on the same partition key, Cassandra maintains the latest one. The Memtable also combines different mutations occurring on the same partition and groups them as a single row mutation.

The Commit Log provides a fast way to persist the changes applied to Memtables before they are flushed to disk. SSTables are stored in separate files for each Column Family, and Cassandra can receive mutations to multiple Column Families at the same time. If each of these mutations had to be persisted individually, Cassandra would need to issue multiple disk writes, one for each mutation applied to a Column Family. This would cause more disk seeks and random disk I/O, which would hurt performance. The Commit Log solves this by converting the random I/O into sequential I/O: mutations of different Column Families are written to the same file. It also batches mutations, so a single disk write syncs multiple mutations to disk, reducing disk I/O while making the changes durable. This allows the mutations of each Column Family to be buffered in a separate Memtable and flushed to disk using sequential I/O.

The Commit Log cannot grow forever. It needs to be truncated regularly to keep its size down, so that Cassandra spends less time replaying unsaved changes the next time it starts after a sudden shutdown. Cassandra provides a few configuration settings to tune the Commit Log size. Mutations in the Commit Log are also cleaned up when a Memtable is flushed to an SSTable. For each Memtable flush, Cassandra maintains the latest offset of the Commit Log, called the replay position. When a Memtable flush completes successfully, all the mutations for that particular Column Family stored before the replay position are discarded from the Commit Log.

Commit Log Configuration

The commitlog_segment_size_in_mb setting controls the maximum size of an individual Commit Log segment file. A new Commit Log segment is created when the current active segment reaches this threshold. It defaults to 32 MB.

The Commit Log disk sync frequency can be controlled using the commitlog_sync setting. If it is set to batch, Cassandra will not acknowledge writes until the Commit Log has been synced to disk. Instead of executing an fsync for every write, the commitlog_sync_batch_window_in_ms setting indicates the number of milliseconds between successive fsyncs; it is set to 2 milliseconds by default. The alternative is setting commitlog_sync to periodic, in which case Cassandra acknowledges writes immediately and the Commit Log is synced every commitlog_sync_period_in_ms milliseconds. The default for commitlog_sync is periodic with a sync period of 10 seconds (10000 ms). The Commit Log only protects the data stored locally, so the interval between fsyncs has a direct impact on the amount of unsaved data if Cassandra goes down suddenly. Having a replication factor greater than one protects the data if one of the replicas goes down.

The total disk space used for Commit Log files is controlled by the commitlog_total_space_in_mb setting. When the disk space reaches this threshold, Cassandra flushes all the Column Families that have mutations in the oldest Commit Log segment and removes that segment. For this purpose, a list of dirty Column Families is maintained in each Commit Log segment. The threshold defaults to the smaller of 8 GB and 1/4 of the total space of the Commit Log volume.
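For reference, these Commit Log settings live in cassandra.yaml; the values below reflect the defaults described above:

commitlog_segment_size_in_mb: 32
commitlog_sync: periodic
commitlog_sync_period_in_ms: 10000
commitlog_total_space_in_mb: 8192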

There can be multiple segments storing the Commit Log at any time. All Commit Log segments are managed in a queue, with the current active segment at the tail. A Commit Log segment can be recycled once all the mutations in it have been flushed successfully.

Commit Log Segment

A Commit Log segment represents a single Commit Log file on disk and stores mutations of different Column Families. The file name format is CommitLog-<Version>-<Msec Timestamp>.log (for example: CommitLog-6-1621835590940.log). The segment maintains a map of dirty Column Families and their last mutation positions in the file. The Commit Log segment is memory mapped: operations are applied to the memory-mapped buffer and synced to disk periodically.

The contents of a Commit Log segment are memory mapped to a buffer. Mutations are applied to this buffer and persisted to disk on sync. At the beginning of the Commit Log a file header is written, which includes the version, the Commit Log ID, and parameters associated with compressed or encrypted Commit Logs. A CRC checksum computed over the header fields is written at the end of the header.

Mutations are allocated on the active segment. If there is not enough space in the current active segment to write a mutation, a new segment is created. Commit Log segments are managed as a queue, with the active segment at the tail of the queue.

The Commit Log tracks which mutations have been synced to disk and which have not using two pointers. The Last Synced Offset is the position in the Commit Log before which all mutations have been synced to disk. The Last Marker Offset points to the position of the last mutation, which has not yet been synced to disk. All the mutations between these two pointers are not yet synced to disk.

A Commit Log is divided into sections separated by sync markers. These sync markers are chained: each marker points to the next one, and every time a sync is performed on the Commit Log a new marker is created. The sync marker consists of two integers. The first integer stores the Commit Log file position where the next sync marker will be written, which indicates the section length. The second integer stores the CRC of the Commit Log ID and the file position where the sync marker is written.

 


Cassandra Memtable


Memtables store the current mutations applied to Column Families. They function as a write-back cache, providing faster write performance and faster read performance for recently written data. Mutations are organized in sorted order using a skip list data structure in the Memtable. Each Column Family is associated with its own Memtable. In this blog, the on-heap based Memtable type is described.
 
Memtable Size

There are two settings to limit the Memtable size, depending on whether the Memtable is stored in heap space or off-heap space. Both are specified in MB. By default these settings are commented out in cassandra.yaml.

# memtable_heap_space_in_mb: 2048
# memtable_offheap_space_in_mb: 2048

If the Memtable size is not configured, Cassandra assigns 1/4th of the max heap size allocated to the Cassandra process. For example: if the max heap size is 64 GB then 16 GB is used as the Memtable size.

Memtable Allocation Types

The allocation type in cassandra.yaml specifies how to store Memtable data. The supported allocation types are:

unslabbed_heap_buffers
heap_buffers
offheap_buffers
offheap_objects

By default Cassandra is configured to store Memtable data in heap space. It uses a slab allocator to reduce heap fragmentation.

memtable_allocation_type: heap_buffers

 

The figure above shows the association between the different components of a Memtable. Memtables are created during Cassandra startup. Based on the Memtable storage type configured in cassandra.yaml, a pool is allocated either on heap or on both heap and off-heap. All Memtables use the same Memtable pool, which controls the overall Memtable storage usage. Each Memtable has its own allocator which serves the allocation requests for it. Allocator objects hold a reference to the parent Memtable pool for the configured limits.

A Memtable contains a reference to its Column Family, the Memtable pool, the positions in the Commit Log segments where its data is persisted, and the mutations. The Commit Log Lower Bound is set when the Memtable is created and points to the position of the active Commit Log segment in which this Memtable's mutations are stored. The Commit Log Upper Bound is set to the last mutation position in the Commit Log. When a Memtable is flushed, these two bounds are used to discard the Commit Log segments which store the mutations in this Memtable. The bounds can refer to the same Commit Log segment or to different segments. A single Commit Log segment can contain mutations from multiple Memtables, and a single Memtable can persist mutations into multiple Commit Log segments. Please review the Cassandra Commit Log section for additional details.

The Min Timestamp field stores the smallest timestamp of all partitions stored in this Memtable. Live Data Size stores the size of all mutations applied to this Memtable. Partitions contain the actual Column Family mutations applied in this Memtable. The Current Operations metric tracks the number of columns updated in the stored mutations.

 

Slab Allocator

The slab allocator tries to combine smaller memory allocations (less than 128 KB) into a bigger region of size 1 MB to avoid heap fragmentation. Allocations bigger than 128 KB are allocated directly from the JVM's heap. The basic idea is that more old-generation heap space can be reclaimed when the allocations in a slab region have similar lifetimes. For example, if a column of integer type is updated, it needs 4 bytes of heap space. It is allocated in the current slab region, the next free offset pointer is bumped by 4 bytes, and the allocation count is incremented. If an application allocates chunks larger than 128 KB, those allocations spill all over the heap, cause heap fragmentation, and eventually force the JVM to de-fragment the heap space.
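A rough sketch of the bump-allocation idea in Java, assuming a 1 MB region and a 128 KB spill limit; this is only an illustration, not Cassandra's actual SlabAllocator.

import java.nio.ByteBuffer;

class SlabRegionSketch {
    static final int REGION_SIZE = 1 << 20;        // 1 MB region shared by many small allocations
    static final int MAX_CLONED_SIZE = 128 * 1024; // larger allocations go straight to the heap

    private ByteBuffer region = ByteBuffer.allocate(REGION_SIZE);
    private int nextOffset = 0;

    ByteBuffer allocate(int size) {
        if (size > MAX_CLONED_SIZE) {
            // Too big for a slab region: allocate directly from the JVM heap.
            return ByteBuffer.allocate(size);
        }
        if (nextOffset + size > REGION_SIZE) {
            // Current region is full: start a new 1 MB region.
            region = ByteBuffer.allocate(REGION_SIZE);
            nextOffset = 0;
        }
        // Bump allocation: hand out the next 'size' bytes and advance the free offset.
        ByteBuffer slice = region.duplicate();
        slice.position(nextOffset);
        slice.limit(nextOffset + size);
        nextOffset += size;
        return slice.slice();
    }

    public static void main(String[] args) {
        SlabRegionSketch slab = new SlabRegionSketch();
        System.out.println(slab.allocate(4).capacity());          // a 4-byte int column value
        System.out.println(slab.allocate(256 * 1024).capacity()); // spills to a direct allocation
    }
}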

Each Memtable has its own slab allocator. Multiple Memtables are managed using a single slab pool, which holds the configured Memtable size settings and ensures the overall data stored in all Memtables stays below the threshold.


Partitions

Partitions in a Memtable are managed using a skip list data structure. Each partition stores the mutations applied to a particular row. A mutation can be an insert, update, or delete operation applied to a row and can include zero or more columns.

cqlsh:flowerskeyspace> describe iris;

CREATE TABLE flowerskeyspace.iris (
    id int PRIMARY KEY,
    class text,
    petallength float,
    petalwidth float,
    sepallength float,
    sepalwidth float
)

cqlsh:flowerskeyspace> update iris set sepalwidth=6.4 where id=4;

cqlsh:flowerskeyspace> update iris set sepalwidth=6.5 where id=5;

cqlsh:flowerskeyspace> update iris set sepalwidth=3.3 where id=3;


In the above example, there are three updates to three different rows. These are stored as three partitions in the Memtable:

partitions size = 3

[0] "DecoratedKey(-7509452495886106294, 00000005)" -> "[flowerskeyspace.iris]
key=5 partition_deletion=deletedAt=-9223372036854775808, localDeletion=2147483647
columns=[[] | [sepalwidth]]
    Row[info=[ts=-9223372036854775808] ]:  | [sepalwidth=6.5 ts=1622222969042456]"

[1] "DecoratedKey(-2729420104000364805, 00000004)" -> "[flowerskeyspace.iris]
key=4 partition_deletion=deletedAt=-9223372036854775808, localDeletion=2147483647
columns=[[] | [sepalwidth]]
    Row[info=[ts=-9223372036854775808] ]:  | [sepalwidth=6.5 ts=1622222995765139]"

[2] "DecoratedKey(9010454139840013625, 00000003)" -> "[flowerskeyspace.iris]
key=3 partition_deletion=deletedAt=-9223372036854775808, localDeletion=2147483647
columns=[[] | [sepalwidth]]
    Row[info=[ts=-9223372036854775808] ]:  | [sepalwidth=3.3 ts=1622218525529221]"

If we update another column of the row with id = 3, the result will be merged with the previous mutation:

cqlsh:flowerskeyspace> update iris set petalwidth=1.5 where id=3;

The partition with index 2 will be updated

[2] "DecoratedKey(9010454139840013625, 00000003)" -> "[flowerskeyspace.iris]
key=3 partition_deletion=deletedAt=-9223372036854775808, localDeletion=2147483647
columns=[[] | [petalwidth sepalwidth]]
Row[info=[ts=-9223372036854775808] ]: | [petalwidth=1.5 ts=1622232283101650],
[sepalwidth=3.3 ts=1622218525529221]" 

 

 

Memtable Flush

Each Column Family has its own Memtable. All live mutations to that Column Family are appended to the Commit Log and also applied to the Memtable. During a read, data from the Memtables and the SSTables is merged. When the Memtable heap threshold is exceeded, the flush period expires, or the Commit Log threshold is reached, the Memtable is flushed to disk to create an SSTable.

The expiry is controlled using a flush period, set as a schema property (memtable_flush_period_in_ms) on the Column Family definition. By default the flush period is set to zero.
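The flush period is a regular table option and can be changed with ALTER TABLE; a hedged example with placeholder names that flushes roughly once an hour:

ALTER TABLE my_keyspace.my_table WITH memtable_flush_period_in_ms = 3600000;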

The Memtable heap space exceeded condition is determined by the configured Memtable heap size limit and an internally calculated Memtable cleanup threshold. The cleanup threshold is calculated using memtable_flush_writers, which defaults to 2 for a single Cassandra data directory, so memtable_cleanup_threshold is 0.33:

memtable_cleanup_threshold = 1 / (memtable_flush_writers + 1)

If the configured heap space limit is 2 GB, the Memtable size threshold is 2 GB * 0.33333334, which is around 683 MB. When the size of all unflushed Memtables added together exceeds 683 MB, the Memtable with the largest live data size is flushed.

When the total space used by Commit Logs exceeds the configured commitlog_total_space_in_mb, Cassandra selects the Memtables associated with the oldest segments and flushes them. If the oldest Commit Log segment contains data for several Column Families, the Memtables of all those Column Families are flushed. By default commitlog_total_space_in_mb is set to the smaller of 8 GB and 1/4 of the total space of the Commit Log volume.

Other operations such as repair, nodetool drain, or nodetool flush can also cause Memtables to be flushed to disk. A Column Family can have more than one Memtable when a flush is pending on its previous Memtable.

Cassandra Memtable Metrics

Cassandra provides various metrics for Memtable as part of Column family metrics.

Metric                     Description
memtableColumnsCount       Number of columns present in the Memtable
memtableOnHeapSize         The amount of data stored in the Memtable which is allocated in heap memory, including overhead associated with columns
memtableOffHeapSize        The amount of data stored in off-heap space by this Memtable, including overhead associated with columns
memtableLiveDataSize       The amount of live data stored in the Memtable
AllMemtableOnHeapSize      The amount of heap memory used by all the Memtables
AllMemtableOffHeapSize     The amount of data stored in off-heap space by all Memtables
AllMemtableLiveDataSize    The amount of live data stored in all Memtables
memtableSwitchCount        Number of times a flush has resulted in the Memtable being switched out

The Commit Log provides durability to the mutations in a Memtable. See the Cassandra Commit Log section above for more details.

Cassandra: how to repair a row when regular repair fails

Recently we had a strange issue with Cassandra repair. A table is replicated across three Datacenters, and some of the rows were missing on all the nodes in a newly added Datacenter. We have hundreds of production Cassandra clusters of various sizes, ranging from a few nodes to tens of nodes per cluster. Most of the clusters have multiple Datacenters and use replication across them. We had not seen this kind of issue before. This article describes the various ways we tried to fix the problem. These steps should be useful for recovering data in Cassandra 2.2.*.

The first thing we tried was listing the rows by running cqlsh on each node and executing a select query against this particular table. All the nodes returned the rows except the nodes in the recently added DC, let's call it DC3, where the rows were missing from the results. This is an obvious problem when the replication settings for the Keyspace where this table resides do not include DC3.

Keyspace: Test: Replication Strategy: org.apache.cassandra.locator.NetworkTopologyStrategy Durable Writes: true Options: [DC1:3, DC2:3]

We manually added DC3 to the Keyspace replication strategy and ran nodetool repair on the DC3 nodes for this Keyspace and Table. After running repair, we verified the table rows on the DC3 nodes, but the rows were still missing.

Keyspace: Test: Replication Strategy: org.apache.cassandra.locator.NetworkTopologyStrategy Durable Writes: true Options: [DC1:3, DC2:3, DC3:3]

The second thing we tried was running nodetool repair with different DC options, running it on nodes in different Datacenters, but the rows were still missing. We tried a full repair and an entire-keyspace repair, but neither was able to repair the rows.

The third thing we tried was removing all SSTables for this problematic Table and running repair again. The repair restored the SSTables but the rows were still missing.

The fourth thing we tried was resetting the repairedAt status of all SSTables to unrepaired using the sstablerepairedset tool and running the repair again. The rows were still missing.

Finally we used Cassandra read repair to fix the missing rows. We ran cqlsh on the DC3 nodes, set the consistency level to QUORUM, and read each missing row one by one by executing a select query. This fixed the issue and repaired the data.

# cat commands 
CONSISTENCY
CONSISTENCY QUORUM
CONSISTENCY
select * from "Test"."SampleTable" WHERE key = 60b8b2e1-5866-4a4a-b6f8-3e521c44f43c;
 

Hope this helps if someone is stuck with a similar issue.