Showing posts with label Gossip Protocol. Show all posts
Showing posts with label Gossip Protocol. Show all posts

Saturday, March 3, 2012

Cassandra Architecture

Book Review and Excerpt
Cassandra: The Definitive Guide by Eben Hewitt
Chapter 5

 

My previous post on Cassandra discussed a sample application.
Let's turn now to some architectural details! I bet you are eager to find some inside tricks (just like I am eager to discover them).


System Keyspace
The system keyspace stores metadata for a local node like token, cluster name, schema definitions, etc. But you cannot modify the system keyspace.

Peer-to-Peer
Cassandra has a peer-to-peer distribution model, such that any given node is structurally identical to any other node—that is, there is no “master” node that acts differently than a “slave” node. The aim of Cassandra’s design is overall system availability and ease of scaling.

Note: In a master/slave setup, any changes are written to the master and then passed on to slaves. This model is optimized for reading data, as it allows data to be read from any slave.
But in a Cassandra setup, the model is optimized for writing data:
• It updates only a configurable subset of nodes (the replicas only) and
• Replicas synchronization is done in background unless we configure it to be done on read operations.
So remember: The ability to handle application workloads that require high performance at significant write volumes with many concurrent client threads is one of the primary features of Cassandra.

Gossip and Failure Detection
To support decentralization and partition tolerance, Cassandra uses a gossip protocol for intra-ring communication so that each node can have state information about other nodes. The gossiper runs every second on a timer. 

Gossip protocols (sometimes called “epidemic protocols”) generally assume a faulty network, are commonly employed in very large, decentralized network systems, and are often used as an automatic mechanism for replication in distributed databases.

Because Cassandra gossip is used for failure detection, the Gossiper class maintains a list of nodes that are alive and dead. Here is how the gossiper works:
Periodically (according to the settings in its TimerTask), the G=gossiper will choose a random node in the ring and initialize a gossip session with it.
When the gossiper determines that another endpoint is dead, it “convicts” that endpoint by marking it as dead in its local list and logging that fact.

Anti-Entropy (or Replica Synchronization) and Read Repair
Where you find gossip protocols, you will often find their counterpart, anti-entropy, which is also based on an epidemic theory of computing. Anti-entropy is the replica synchronization mechanism in Cassandra for ensuring that data on different nodes is updated to the newest version.
Here’s how it works. During a major compaction, the server initiates a TreeRequest/TreeReponse conversation to exchange Merkle trees (or hash trees) with neighboring nodes. The Merkle tree is a hash representing the data in that column family. If the trees from the different nodes don’t match, they have to be reconciled (or “repaired”) in order to determine the latest data values they should all be set to.
After each update, the anti-entropy algorithm kicks in. 
To keep the operation fast, nodes internally maintain an inverted index keyed by timestamp and only exchange the most recent updates.

In Cassandra, you have multiple nodes that make up your cluster, and one or more of the nodes act as replicas for a given piece of data. To read data, a client connects to any node in the cluster and, based on the consistency level specified by the client, a number of nodes are read. The read operation blocks until the client-specified consistency level is met. If it is detected that some of the nodes responded with an out-of-date value, Cassandra will return the most recent value to the client. After returning, Cassandra will perform what’s called a read repair in the background. This operation brings the replicas with stale values up to date. This design acts as a performance improvement because the client does not block until all nodes are read, but the read repair stage manages the task of keeping the data fresh in the background. If you have lots of clients, it’s important to read from a quorum of nodes in order to ensure that at least one will have the most recent value.
If the client specifies a weak consistency level (such as ONE), then the read repair is performed in the background after returning to the client. If you are using one of the two stronger consistency levels (QUORUM or ALL), then the read repair happens before data is returned to the client.

Memtables, SSTables, and Commit Logs
When you perform a write operation, it’s immediately written to the commit log. The commit log is a crash-recovery mechanism that supports Cassandra’s durability goals.
A write will not count as successful until it’s written to the commit log, to ensure that if a write operation does not make it to the in-memory store (the memtable, discussed in a moment), it will still be possible to recover the data. 
After it’s written to the commit log, the value is written to a memory-resident data structure called the memtable. When the number of objects stored in the memtable reaches a threshold, the contents of the memtable are flushed to disk in a file called an SSTable. A new memtable is then created. This flushing is a nonblocking operation; multiple memtables may exist for a single column family, one current and the rest waiting to be flushed. They typically should not have to wait very long, as the node should flush them very quickly unless it is overloaded.
The SSTable is a concept borrowed from Google’s Bigtable. Once a memtable is flushed to disk as an SSTable, it is  immutable and cannot be changed by the application. Despite the fact that SSTables are compacted, this compaction changes only their on-disk representation; it essentially performs the “merge” step of a mergesort into new files
and removes the old files on success.
Each SSTable also has an associated Bloom filter, which is used as an additional performance enhancer (we'll come back to this topic in a few moments).
All writes are sequential, which is the primary reason that writes perform so well in Cassandra. No reads or seeks of any kind are required for writing a value to Cassandra because all writes are append operations. 
Compaction is intended to amortize the reorganization of data, but it uses sequential IO to do so. So the performance benefit is gained by splitting; the write operation is just an immediate append, and then compaction helps to organize for better future read performance. If Cassandra naively inserted values where they ultimately belonged, writing clients would pay for seeks up front.
On reads, Cassandra will check the memtable first to find the value.

Hinted Handoff
Consider the following scenario. A write request is sent to Cassandra, but the node where the write properly belongs is not available due to network partition, hardware failure, or some other reason. In order to ensure general availability of the ring in such a situation, Cassandra implements a feature called hinted handoff. You might think of a hint as a little post-it note that contains the information from the write request. If the node where the write belongs has failed, the Cassandra node that receives the write will create a hint, which is a small reminder that says, “I have the write information that is intended for node B. I’m going to hang onto this write, and I’ll notice when node B comes back online; when it does, I’ll send it the write request.” That is, node A will “hand off” to node B the “hint” regarding the write.
This allows Cassandra to be always available for writes, and reduces the time that a failed node will be inconsistent after it does come back online. 
We discussed consistency levels previously, and you may recall that consistency level ANY, which was added in 0.6, means that a hinted handoff alone will count as sufficient toward the success of a write operation. That is, even if only a hint was able to be recorded, the write still counts as successful.

Compaction
A compaction operation in Cassandra is performed in order to merge SSTables. During
compaction, the data in SSTables is merged: the keys are merged, columns are combined,
tombstones are discarded, and a new index is created.
Compaction is the process of freeing up space by merging large accumulated datafiles.
This is roughly analogous to rebuilding a table in the relational world. But the primary difference in Cassandra is that it is intended as a transparent operation that is amortized across the life of the server.
On compaction, the merged data is sorted, a new index is created over the sorted data,
and the freshly merged, sorted, and indexed data is written to a single new SSTable
(each SSTable consists of three files: Data, Index, and Filter). 
Another important function of compaction is to improve performance by reducing the  number of required seeks. 

You can increase overall performance by reducing the priority of compaction threads.
To do so, use the following flag:  -Dcassandra.compaction.priority=1
This will affect CPU usage, not IO.

Bloom Filters
Bloom filters are used as a performance booster. They are very fast, nondeterministic algorithms for testing whether an element is a member of a set. They are nondeterministic because it is possible to get a false-positive read from a Bloom filter, but not a false-negative. 
The filters are stored in memory and are used to improve performance by reducing disk access on key lookups. Disk access is typically much slower than memory access. So, in a way, a Bloom filter is a special kind of cache.
When a query is performed, the Bloom filter is checked first before accessing disk.
Because false-negatives are not possible, if the filter indicates that the element does not exist in the set, it certainly doesn’t; but if the filter thinks that the element is in the set, the disk is accessed to make sure.

Tombstones
When you execute a delete operation, the data is not immediately deleted. Instead, it’s treated as an update operation that places a tombstone on the value. A tombstone is a deletion marker that is required to suppress older data in SSTables until compaction can run.
There’s a related setting called Garbage Collection Grace Seconds. This is the amount of time that the server will wait to garbage-collect a tombstone. By default, it’s set to 864,000 seconds, the equivalent of 10 days. Cassandra keeps track of tombstone age, and once a tombstone is older than GCGraceSeconds, it will be garbage-collected. The purpose of this delay is to give a node that is unavailable time to recover; if a node is down longer than this value, then it is treated as failed and replaced.
As of 0.7, this setting is configurable per column family (it used to be for the whole keyspace).

Staged Event-Driven Architecture (SEDA)
Cassandra implements a Staged Event-Driven Architecture (SEDA): a general architecture for highly concurrent Internet services.
In a typical application, a single unit of work is often performed within the confines of a single thread. A write operation, for example, will start and end within the same thread. Cassandra, however, is different: its concurrency model is based on SEDA, so a single  operation may start with one thread, which then hands off the work to another thread, which may hand it off to other threads. But it’s not up to the current thread to hand off the work to another thread. Instead, work is subdivided into what are called stages, and the thread pool (really, a java.util.concurrent.ExecutorService) associated with the stage  determines execution. A stage is a basic unit of work, and a single operation may internally state-transition from one stage to the next. Because each stage can be handled by a different thread pool, Cassandra experiences a massive performance improvement. This SEDA design also means that Cassandra is better able to manage its own resources internally because different operations might require disk IO, or they might be CPU-bound, or they might be network operations, and so on, so the pools can manage their work according to the  availability of these resources.
The following operations are represented as stages in Cassandra: Read, Mutation, Gossip, Response, Anti-Entropy, Load Balance, Migration, Streaming.
A few additional operations are also implemented as stages. There are stages for units
of work performed on memtables and the ConsistencyManager is a stage in the StorageService.
SEDA is a powerful architecture. Because it is event-driven, as its name states, work
can be performed with exceptional concurrency and minimal coupling.

In this chapter, we examined the main pillars of Cassandra’s construction, including peer-to-peer design and its corresponding gossip protocol, anti-entropy, accrual failure detection, and how the use of a Staged Event-Driven Architecture maximizes performance.
 
If this post has open your apatite for the subject, you can find much more information in the book itself: Cassandra: The Definitive Guide

The excerpt is from the book, 'Cassandra: The Definitive Guide', authored by Eben Hewit, published November 2010 by O’Reilly Media, Copyright 2011 Eben Hewitt.