Monday, March 19, 2012

Integrating Cassandra with Hadoop


Book Review and Excerpt
Cassandra: The Definitive Guide by Eben Hewitt
Chapter 12


In this chapter, we explore how Cassandra and Hadoop fit together.

Hadoop is a set of open source projects that deal with large amounts of data in a distributed way.


Note: The "word count" example given in this section is also found in the Cassandra source download in its contrib module. It can be compiled and run using instructions found there.



Cassandra has a Java source package for Hadoop integration code, called org.apache.cassandra.hadoop.

Running the Word Count Example

The word count example takes a body of text and counts the occurrences of each distinct word. Here we provide some code to perform a word count over data contained in Cassandra.


The Mapper Class
IColumn column = columns.get(columnName.getBytes());
String value = new String(column.value());
StringTokenizer itr = new StringTokenizer(value);
while (itr.hasMoreTokens()) {
 word.set(itr.nextToken()); // Text word
 context.write(word, one); // IntWritable one
}

The Reducer Class
int sum = 0;
for (IntWritable val : values) {
 sum += val.get();
}
result.set(sum);
context.write(key, result);
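Stripped of the Hadoop scaffolding, the mapper and reducer above boil down to tokenizing text and summing per-word counts. The following self-contained sketch (the class and method names are our own, purely illustrative) shows that core logic in plain Java:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

public class WordCountSketch {
    // Tokenize a value and count each distinct word: the mapper emits
    // (word, 1) pairs and the reducer sums them; here we do both at once.
    public static Map<String, Integer> countWords(String value) {
        Map<String, Integer> counts = new HashMap<>();
        StringTokenizer itr = new StringTokenizer(value); // same tokenizer the mapper uses
        while (itr.hasMoreTokens()) {
            counts.merge(itr.nextToken(), 1, Integer::sum); // the reducer's sum step
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts =
            countWords("the quick brown fox jumps over the lazy dog the end");
        System.out.println(counts.get("the")); // 3
    }
}
```

In the real job, the same work is split across the cluster: each map task runs the tokenize step over its slice of the column family, and reduce tasks sum the partial counts.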

Our MapReduce Program
Job job = new Job(getConf(), "wordcount");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setInputFormatClass(ColumnFamilyInputFormat.class);

FileOutputFormat.setOutputPath(job, new Path("/tmp/word_count"));

ConfigHelper.setThriftContact(job.getConfiguration(), "localhost", 9160);
ConfigHelper.setInputColumnFamily(job.getConfiguration(), "Keyspace1", "Standard1");
SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
job.waitForCompletion(true);
return 0;

Outputting Data to Cassandra
For updates on the built-in output format, see http://wiki.apache.org/cassandra/HadoopSupport

It is possible, however, to write directly to Cassandra via Thrift (or a higher-level client) in the Reducer step. In the previous example, this means that instead of writing to the context, the reducer could write its key and value to Cassandra directly.

Tools Above MapReduce
MapReduce is a great abstraction for developers so that they can worry less about the details of distributed computing and more about the problems they are trying to solve. Over time, an even more abstracted toolset has emerged. Pig and Hive operate at a level above MapReduce and allow developers to perform more complex analytics more easily. Both of these frameworks can operate against data in Cassandra.

Pig
Pig is a platform for data analytics developed at Yahoo!.
Included in the platform is a high-level language called Pig Latin and a compiler that translates programs written in Pig Latin into sequences of MapReduce jobs.

To write our word count example using Pig Latin:
rows = LOAD 'cassandra://Keyspace1/Standard1' USING CassandraStorage() \
as (key:chararray, cols:bag{col:tuple(name:bytearray, value:bytearray)});
cols = FOREACH rows GENERATE flatten(cols) as (name, value);
words = FOREACH cols GENERATE flatten(TOKENIZE((chararray) value)) as word;
grouped = GROUP words BY word;
counts = FOREACH grouped GENERATE group, COUNT(words) as count;
ordered = ORDER counts BY count DESC;
topten = LIMIT ordered 10;
dump topten;

Line 1 gets all the data in the Standard1 column family, describing that data with aliases and data types. We extract the name/value pairs in each of the rows. In line 3, we have to cast the value to a character array in order to tokenize it with the built-in TOKENIZE function. We next group by and count each word instance. Finally, we order our data by count and output the top 10 words found.

Pig provides an abstraction that makes our code significantly more concise. Pig also allows programmers to express operations such as joins much more simply than by using MapReduce alone.

Hive
Like Pig, Hive is a platform for data analytics. Instead of a scripting language, queries are written in Hive-QL, a query language similar to the familiar SQL. Hive was developed by Facebook to allow large data sets to be abstracted into a common structure.

Cluster Configuration: Cassandra & Hadoop
Because Hadoop has some unfamiliar terminology, here are some useful definitions:
HDFS
Hadoop distributed filesystem.
Namenode
The master node for HDFS. It has locations of data blocks stored in several datanodes and often runs on the same server as the jobtracker in smaller clusters.
Datanode
Nodes for storing data blocks for HDFS. Datanodes run on the same servers as tasktrackers.
Jobtracker
The master process for scheduling MapReduce jobs. The jobtracker accepts new jobs, breaks them into map and reduce tasks, and assigns those tasks to tasktrackers in the cluster. It is responsible for job completion. It often runs on the same server as the namenode in smaller clusters.
Tasktracker
The process responsible for running map or reduce tasks from the jobtracker. Tasktrackers run on the same servers as datanodes.


Like Cassandra, Hadoop is a distributed system. The MapReduce jobtracker spreads tasks across the cluster, preferably near the data it needs. When a jobtracker initiates tasks, it looks to HDFS to provide it with information about where that data is stored. Similarly, Cassandra’s built-in Hadoop integration provides the jobtracker with data locality information so that tasks can be close to the data. In order to achieve this data locality, Cassandra nodes must also be part of a Hadoop cluster. The namenode and jobtracker can reside on a server outside your Cassandra cluster. Cassandra nodes will need to be part of the cluster by running a tasktracker process on each node. Then, when a MapReduce job is initiated, the jobtracker can query Cassandra for locality of the data when it splits up the map and reduce tasks.


A four-node Cassandra cluster with tasktracker processes running on each Cassandra node is shown in Figure 12-1. At least one node in the cluster needs to be running the datanode process. There is a light dependency on HDFS for small amounts of data (the distributed cache), and a single datanode should suffice. External to the cluster is the server running the Hadoop namenode and jobtracker.






When a job is initiated from a client machine, it goes to the jobtracker. The jobtracker receives information about the data source when the job is submitted, via the configuration options mentioned earlier. At that point, it can use Cassandra’s ColumnFamilyRecordReader and ColumnFamilySplit to get the physical locations of different segments of data in the cluster. Then, it can use those locations to split up the tasks among the nodes in the cluster, preferring to run tasks on nodes where the associated data resides.


Finally, when creating jobs for MapReduce to execute (either directly or via Pig), the Hadoop configuration needs to point to the namenode/jobtracker (in the Hadoop configuration files) and the Cassandra configuration options. The cluster will be able to handle the integration from there.


Note: As far as configuration, Keith Thornhill from Raptr.com has a separate namenode/jobtracker and installed the datanode/tasktracker on each of his Cassandra nodes. He notes that a nice side effect of this is that the analytics engine scales with the data.



In this chapter, we examined Hadoop, the open source implementation of the Google MapReduce algorithm. We took a brief look at Hadoop basics and how to integrate it with Cassandra, and also saw how to use Pig, a simple and concise language to express MapReduce jobs.


If this post has whetted your appetite for the subject, you can find much more information in the book itself: Cassandra: The Definitive Guide

The excerpt is from the book, 'Cassandra: The Definitive Guide', authored by Eben Hewitt, published November 2010 by O’Reilly Media, Copyright 2011 Eben Hewitt.

Performance Tuning in Cassandra

Book Review and Excerpt
Cassandra: The Definitive Guide by Eben Hewitt
Chapter 11



New goodies for today! Performance is always a challenging subject, and with Cassandra we target performance in the context of hundreds of terabytes :)
Let's go!

As a general rule, it’s important to note that simply adding nodes to a cluster will not improve performance on its own. You need to replicate the data appropriately, then send traffic to all the nodes from your clients. If you aren’t distributing client requests, the new nodes could just stand by somewhat idle.

Data Storage
There are two sets of files that Cassandra writes to as part of handling update operations: the commit log and the datafile. Their different purposes need to be considered in order to understand how to treat them during configuration.
The commit log can be thought of as short-term storage. As Cassandra receives updates,
every write value is written immediately to the commit log in the form of raw sequential file appends. If you shut down the database or it crashes unexpectedly, the commit log can ensure that data is not lost. That’s because the next time you start the node, the commit log gets replayed. In fact, that’s the only time the commit log is read; clients never read from it. But the normal write operation to the commit log blocks, so it would damage performance to require clients to wait for the write to finish.
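The append-and-replay behavior described above can be sketched in a few lines of plain Java. The class below is a toy model of ours, not Cassandra's actual commit log implementation; it just illustrates that writes are sequential appends and that replaying them in order rebuilds the latest state:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CommitLogSketch {
    // Each entry is a {key, value} pair; the log is append-only.
    private final List<String[]> log = new ArrayList<>();

    public void write(String key, String value) {
        log.add(new String[] {key, value}); // raw sequential append, never updated in place
    }

    // On restart, the log is replayed front to back; later appends win.
    public Map<String, String> replay() {
        Map<String, String> state = new HashMap<>();
        for (String[] entry : log) {
            state.put(entry[0], entry[1]);
        }
        return state;
    }

    public static void main(String[] args) {
        CommitLogSketch log = new CommitLogSketch();
        log.write("k1", "v1");
        log.write("k1", "v2"); // a later write to the same key supersedes the first
        System.out.println(log.replay().get("k1")); // v2
    }
}
```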

The datafile represents the Sorted String Tables (SSTables). Unlike the commit log, data
is written to this file asynchronously. The SSTables are periodically merged during major compactions to free up space. To do this, Cassandra will merge keys, combine columns, and delete tombstones.
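As a rough illustration of that merge step, here is a hedged toy model (ours, not Cassandra's actual compaction code) in which a tombstone is represented as a null column value: newer columns replace older ones, and tombstoned columns are discarded from the merged result.

```java
import java.util.HashMap;
import java.util.Map;

public class CompactionSketch {
    // Merge an older and a newer set of columns for the same row.
    public static Map<String, String> merge(Map<String, String> older, Map<String, String> newer) {
        Map<String, String> merged = new HashMap<>(older);
        merged.putAll(newer);                        // newer values win for the same column
        merged.values().removeIf(v -> v == null);    // discard tombstones
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> older = new HashMap<>();
        older.put("a", "1");
        older.put("b", "2");
        Map<String, String> newer = new HashMap<>();
        newer.put("b", null); // tombstone for column b
        newer.put("c", "3");
        System.out.println(merge(older, newer)); // column b is gone; a and c remain
    }
}
```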
Read operations can refer to the in-memory cache and in this case don’t need to go directly to the datafiles on disk. If you can allow Cassandra a few gigabytes of memory, you can improve performance dramatically when the row cache and the key cache are hit.
The commit logs are periodically removed, following a successful flush of all their appended
data to the dedicated datafiles. For this reason, the commit logs will not grow to anywhere near the size of the datafiles, so the disks don’t need to be as large; this is something to consider during hardware selection.
If Cassandra runs a flush, you’ll see something in the server logs like this:
INFO 18:26:11,732 Discarding obsolete commit log: CommitLogSegment(/var/lib/cassandra/commitlog/CommitLog-1278894011530.log)
Then, if you check the commit log directory, that file has been deleted.
By default, the commit log and the datafile are stored in the following locations:
<CommitLogDirectory>/var/lib/cassandra/commitlog</CommitLogDirectory>
<DataFileDirectories>
 <DataFileDirectory>/var/lib/cassandra/data</DataFileDirectory>
</DataFileDirectories>
You can change these values to store the datafiles or commit log in different locations.
You can specify multiple datafile directories if you wish.

Note: You don’t need to update these values for Windows, even if you leave them in the default location, because Windows will automatically adjust the path separator and place them under C:\. Of course, in a real environment, it’s a good idea to specify them separately, as indicated.

It’s recommended that you store the datafiles and the commit logs on separate hard disks for maximum performance. Cassandra, like many databases, is particularly dependent on
the speed of the hard disk and the speed of the CPUs (it’s best to have four or eight cores, to take advantage of Cassandra’s highly concurrent construction). So make sure for QA and production environments to get the fastest disks you can, and get at least two separate ones so that the commit logfiles and the datafiles are not competing for
I/O time. It’s more important to have several processing cores than one or two very fast ones.

Reply Timeout
The reply timeout is a setting that indicates how long Cassandra will wait for other nodes to respond before deciding that the request is a failure. This is a common setting in relational databases and messaging systems. This value is set by the RpcTimeoutInMillis element (rpc_timeout_in_ms in YAML). By default, this is 5,000, or five seconds.
Commit Logs
You can set the value for how large the commit log is allowed to grow before it stops appending new writes to a file and creates a new one. This is similar to setting log rotation on Log4J.
This value is set with the CommitLogRotationThresholdInMB element (commitlog_rotation_threshold_in_mb in YAML). By default, the value is 128MB.
Another setting related to commit logs is the sync operation, represented by the commitlog_sync element. There are two possible settings for this: periodic and batch. periodic is the default, and it means that the server will make writes durable only at specified intervals. When the server is set to make writes durable periodically, you can potentially lose the data that has not yet been synced to disk from the write-behind cache.
In order to guarantee durability for your Cassandra cluster, you may want to examine this setting.
If your commit log is set to batch, it will block until the write is synced to disk (Cassandra will not acknowledge write operations until the commit log has been completely synced to disk). This clearly will have a negative impact on performance.
You can change the value of the configuration attribute from periodic to batch to specify that Cassandra must flush to disk before it acknowledges a write. Changing this value will require taking some performance metrics, as there is a necessary trade-off here: forcing Cassandra to write more immediately constrains its freedom to manage its own resources. If you do set commitlog_sync to batch, you need to provide a suitable value for CommitLogSyncBatchWindowInMS, where MS is the number of milliseconds between each sync effort. Moreover, this is not generally needed in a multinode cluster when using write replication, because replication by definition means that the write isn’t acknowledged until another node has it.
If you decide to use batch mode, you will probably want to split the commit log onto a separate disk from the SSTables (data) to mitigate the performance impact.

Memtables
Each column family has a single memtable associated with it. There are a few settings around the treatment of memtables. The size that the memtable can grow to before it is flushed to disk as an SSTable is specified with the MemtableSizeInMB element (binary_memtable_throughput_in_mb in YAML). Note that this value is based on the size of the memtable itself in memory, and not heap usage, which will be larger because of the overhead associated with column indexing.
You’ll want to balance this setting with MemtableObjectCountInMillions (memtable_operations_in_millions in YAML), which sets a threshold for the number of column values that will be stored in a memtable before it is flushed. The default value is 0.3, which is approximately 333,000 columns. 

You can also configure how long to keep memtables in memory after they’ve been flushed to disk. This value can be set with the memtable_flush_after_mins element.
When the flush is performed, it will write to a flush buffer, and you can configure the size of that buffer with flush_data_buffer_size_in_mb.
Another element related to tuning the memtables is memtable_flush_writers. This setting, which is 1 by default, indicates the number of threads used to write out the memtables when it becomes necessary. If you have a very large heap, it can improve performance to set this count higher, as these threads are blocked during disk I/O.
Concurrency
Cassandra differs from many data stores in that it offers much faster write performance than read performance. There are two settings related to how many threads can perform read and write operations: concurrent_reads and concurrent_writes. In general, the defaults provided by Cassandra out of the box are very good. But you might want to update the concurrent_reads setting immediately before you start your server. That’s because the concurrent_reads setting is optimal at two threads per processor core. By default, this setting is 8, assuming a four-core box. If that’s what you have, you’re in business. If you have an eight-core box, tune it up to 16.
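The two-threads-per-core rule of thumb is simple arithmetic; the following helper (a purely illustrative class of our own, not part of Cassandra) makes it explicit:

```java
public class ReadConcurrency {
    // Rule of thumb from the text: concurrent_reads should be about
    // two threads per processor core.
    public static int recommendedConcurrentReads(int cores) {
        return cores * 2;
    }

    public static void main(String[] args) {
        System.out.println(recommendedConcurrentReads(4)); // 8, the shipped default
        System.out.println(recommendedConcurrentReads(8)); // 16, for an eight-core box
    }
}
```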
The concurrent_writes setting behaves somewhat differently. This should match the number of clients that will write concurrently to the server. If Cassandra is backing a web application server, you can tune this setting from its default of 32 to match the number of threads the application server has available to connect to Cassandra. It is common in Java application servers such as WebLogic to prefer database connection pools no larger than 20 or 30, but if you’re using several application servers in a cluster, you’ll need to factor that in as well.

Caching
There are several settings related to caching, both within Cassandra and at the operating system level. Caches can use considerable memory, and it’s a good idea to tune them carefully once you understand your usage patterns.
There are two primary caches built into Cassandra: a row cache and a key cache. The row cache caches complete rows (all of their columns), so it is a superset of the key cache. If you are using a row cache for a given column family, you will not need to use a key cache on it as well.
Your caching strategy should therefore be tuned in accordance with a few factors:
• Consider your queries, and use the cache type that best fits your queries.
• Consider the ratio of your heap size to your cache size, and do not allow the cache to overwhelm your heap.
• Consider the size of your rows against the size of your keys. Typically keys will be much smaller than entire rows.
The keys_cached setting indicates the number of key locations—not key values—that will be saved in memory. This can be specified as a fractional value (a number between 0 and 1) or as an integer. If you use a fraction, you’re indicating a percentage of keys to cache, and an integer value indicates an absolute number of keys whose locations will be cached.
Note: The keys_cached setting is a per-column family setting, so different column families can have different numbers of key locations cached if some are used more frequently than others.
This setting will consume considerable memory, but can be a good trade-off if your locations are not hot already.
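The fraction-versus-integer interpretation described above can be sketched as follows. This is an illustrative model of ours, not Cassandra's actual cache-sizing code: a value between 0 and 1 is treated as a fraction of all keys, and anything at or above 1 as an absolute count.

```java
public class KeyCacheSize {
    // Interpret a keys_cached-style value against the total key count.
    public static long keysToCache(double keysCached, long totalKeys) {
        if (keysCached < 1.0) {
            return (long) (keysCached * totalKeys); // fraction of all keys
        }
        return (long) keysCached;                   // absolute number of keys
    }

    public static void main(String[] args) {
        System.out.println(keysToCache(0.5, 1_000_000));     // 500000
        System.out.println(keysToCache(200_000, 1_000_000)); // 200000
    }
}
```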

You can also populate the row cache when the server starts up. To do this, use the preload_row_cache element. The default setting for this is false, but you may want to set it to true to improve read performance. The cost is that bootstrapping can take longer if there is considerable data in the column family to preload.
The rows_cached setting specifies the number of rows that will be cached. By default, this value is set to 0, meaning that no rows will be cached, so it’s a good idea to turn this on. If you use a fraction, you’re indicating a percentage of rows to cache, and an integer value indicates an absolute number of rows to cache.
You’ll want to use this setting carefully, however, as this can easily get out of hand. If your column family gets far more writes than reads, then setting this number very high will needlessly consume considerable server resources. If your column family has a lower ratio of reads to writes, but has rows with lots of data in them (hundreds of columns), then you’ll need to do some math before setting this number very high. And unless you have certain rows that get hit a lot and others that get hit very little, you’re not going to see much of a boost here.
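The "math" alluded to here is essentially the row count times the average row size; a hypothetical back-of-the-envelope helper (names are ours, for illustration only):

```java
public class RowCacheEstimate {
    // Caching N whole rows costs roughly N times the average row size in memory
    // (a lower bound: real caches also carry per-entry overhead).
    public static long cacheBytes(long rowsCached, long avgRowBytes) {
        return rowsCached * avgRowBytes;
    }

    public static void main(String[] args) {
        // 100,000 cached rows of ~2 KB each is already roughly 200 MB of heap.
        System.out.println(cacheBytes(100_000, 2_048)); // 204800000
    }
}
```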

Buffer Sizes
The buffer sizes represent the memory allocation when performing certain operations:
flush_data_buffer_size_in_mb
By default, this is set to 32 megabytes and indicates the size of the buffer to use when memtables get flushed to disk.
flush_index_buffer_size_in_mb
By default, this is set to 8 megabytes. If each key defines only a few columns, then it’s a good idea to increase the index buffer size. Alternatively, if your rows have many columns, then you’ll want to decrease the size of the buffer.
sliced_buffer_size_in_kb
Depending on how variable your queries are, this setting is unlikely to be very useful. It allows you to specify the size, in kilobytes, of the buffer to use when executing slices of adjacent columns. If there is a certain slice query that you perform far more than others, or if your data is laid out with a relatively consistent number of columns per family, then this setting could be moderately helpful on read operations. But note that this setting is defined globally.

The Python Stress Test
Cassandra ships with a popular utility called py_stress that you can use to run a stress
test on your Cassandra cluster. To run py_stress, navigate to the <cassandra-home>/
contrib directory. You might want to check out the README.txt file, as it will have the
list of dependencies required to run the tool.
Running the Python Stress Test
Navigate to the <cassandra-home>/contrib/py_stress directory. In a terminal, type stress.py to run the test against localhost. Or indicate which node you want to connect to:
$ stress.py -d 192.168.1.5
Note: You can execute stress.py -h to get usage on the stress test script. 
The test will run until it inserts one million values, and then stop. I ran this test on a single regular workstation with an Intel i7 processor (which is similar to eight cores) with 4GB of RAM available and a lot of other processes running. Here was my output:
eben@morpheus$ ./stress.py -d 192.168.1.5 -o insert
total,interval_op_rate,avg_latency,elapsed_time
196499,19649,0.0024959407711,10
370589,17409,0.00282591440216,20
510076,13948,0.00295883878841,30
640813,13073,0.00438663874102,40
798070,15725,0.00312562838215,50
950489,15241,0.0029109908417,60
1000000,4951,0.00444872583334,70
Let’s unpack this a bit. What we’ve done is generated and inserted one million values into a completely untuned single node Cassandra server in about 70 seconds. You can see that in the first 10 seconds we inserted 196,499 randomly generated values. The average latency per operation is 0.0025 seconds, or 2.5 milliseconds. But this is using the defaults, and is on a Cassandra server that already has 1GB of data in the database to manage before running the test. Let’s give the test more threads to see if we can squeeze a little better performance out of it:
eben@morpheus$ ./stress.py -d 192.168.1.5 -o insert -t 10
total,interval_op_rate,avg_latency,elapsed_time
219217,21921,0.000410911544945,10
427199,20798,0.000430060066223,20
629062,20186,0.000443717396772,30
832964,20390,0.000437958271074,40
1000000,16703,0.000463042383339,50
What we’ve done here is used the -t flag to use 10 threads at once. This means that in 50 seconds, we wrote 1,000,000 records—about 2 milliseconds latency per write, with a totally untuned database that is already managing 1.5GB of data.
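The throughput figures in these runs are just total operations divided by elapsed time; this small sketch (our own, for illustration) confirms the arithmetic behind the two runs:

```java
public class StressMath {
    // Overall throughput: total operations over elapsed seconds (integer division).
    public static long opsPerSecond(long totalOps, long elapsedSeconds) {
        return totalOps / elapsedSeconds;
    }

    public static void main(String[] args) {
        // First run: one million inserts in about 70 seconds.
        System.out.println(opsPerSecond(1_000_000, 70)); // 14285
        // Ten-thread run: one million inserts in about 50 seconds.
        System.out.println(opsPerSecond(1_000_000, 50)); // 20000
    }
}
```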
You should run the test several times to get an idea of the right number of threads given
your hardware setup. Depending on the number of cores on your system, you’re going
to see worse—not better—performance if you set the number of threads arbitrarily
high, because the processor will devote more time to managing the threads than doing
your work. You want this to be a rough match between the number of threads and the
number of cores available to get a reasonable test.
Now that we have all of this data in the database, let’s use the test to read some values
too:
$ ./stress.py -d 192.168.1.5 -o read
total,interval_op_rate,avg_latency,elapsed_time
103960,10396,0.00478858081549,10
225999,12203,0.00406984714627,20
355129,12913,0.00384438665076,30
485728,13059,0.00379976526221,40
617036,13130,0.00378045491559,50
749154,13211,0.00375620621777,60
880605,13145,0.00377542658007,70
1000000,11939,0.00374060139004,80
As you can see, Cassandra doesn’t read nearly as fast as it writes; it takes about 80 seconds to read one million values. Remember, though, that this is out of the box, untuned, single-threaded, on a regular workstation running other programs, and the database is 2GB in size. Regardless, this is a great tool to help you do performance tuning for your environment and to get a set of numbers that indicates what to expect in your cluster.

Startup and JVM Settings
Cassandra allows you to configure a variety of options for how the server should start up, how much Java memory should be allocated, and so forth. In this section we look at how to tune the startup.
If you’re using Windows, the startup script is called cassandra.bat, and on Linux it’s cassandra.sh. You can start the server by simply executing this file, which sets several defaults. But there’s another file in the bin directory that allows you to configure a variety of settings related to how Cassandra starts. This file is called cassandra.in.sh and it separates certain options, such as the JVM settings, into a different file to make it easier to update.
Tuning the JVM
Heap Min and Max 
By default, these are set to 256MB and 1GB, respectively. To tune these, set them higher (see following note) and to the same value.
Assertions 
By default, the JVM is passed the -ea option to enable assertions. Changing this option from -ea to -da (disable assertions) can have a positive effect on performance.
Survivor Ratio 
The Java heap is broadly divided into two object spaces: young and old. The young space is subdivided into an area for new object allocation (called “eden space”) and survivor space for young objects that are still referenced after surviving a garbage collection. The Survivor Ratio is the ratio of eden space to survivor space in the young part of the heap.
Increasing the ratio makes sense for applications with lots of new object creation and low object preservation; decreasing it makes sense for applications with longer-living objects. Cassandra sets this value to 8 by default, meaning that the ratio of eden to each survivor space is 8:1 (each survivor space will be 1/8 the size of eden). This is fairly low, because the objects are living longer in the memtables. Tune this setting along with MaxTenuringThreshold.
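Under HotSpot's sizing rules, a survivor ratio of R means eden is R times the size of each of the two survivor spaces, so each survivor space is young/(R + 2). This illustrative sketch (a class of our own) shows the arithmetic:

```java
public class SurvivorSpaces {
    // With -XX:SurvivorRatio=R, the young generation is eden plus two
    // survivor spaces, and eden is R times each survivor space:
    // survivor = young / (R + 2).
    public static long survivorSizeMB(long youngGenMB, int survivorRatio) {
        return youngGenMB / (survivorRatio + 2);
    }

    public static void main(String[] args) {
        // With Cassandra's default ratio of 8, a 100 MB young generation
        // gives each survivor space 10 MB, leaving 80 MB for eden.
        System.out.println(survivorSizeMB(100, 8)); // 10
    }
}
```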
MaxTenuringThreshold
Every Java object has an age field in its header, indicating how many times it has been copied within the young generation space. They’re copied (into a new space) when they survive a young generation garbage collection, and this copying has a cost. Because long-living objects may be copied many times, tuning this value can improve performance. By default, Cassandra has this value set at 1. Set it to 0 to immediately move an object that survives a young generation collection to the tenured generation.
UseConcMarkSweepGC

This instructs the JVM on what garbage collection (GC) strategy to use; specifically, it enables the ConcurrentMarkSweep algorithm. This setting uses more RAM, and uses more CPU power to do frequent garbage collections while the application is running in order to keep the GC pause time to a minimum. When using this strategy, it’s important to set the heap min and max values to the same value, in order to prevent the JVM from having to spend a lot of time growing the heap initially. It is possible to tune this to -XX:+UseParallelGC, which also takes advantage of multiprocessor machines; this will give you peak application performance, but with occasional pauses. Do not use the Serial GC with Cassandra.

Note: If you want to change any of these values, simply open the cassandra.in.sh file in a text editor, change the values, and restart.

The majority of options in the include configuration file surround the Java settings. For
example, the default setting for the maximum size of the Java heap memory usage is 1GB. If you’re on a machine capable of using more, you may want to tune this setting.
Try setting the -Xmx and -Xms options to the same value to keep Java from having to
manage heap growth.

Note: The maximum theoretical heap size for a 32-bit JVM is 4GB. However, do not simply set your JVM to use as much memory as you have available up to 4GB. There are many factors involved here, such as the amount of swap space and memory fragmentation. Simply increasing the size of the heap using -Xmx will not help if you don’t have any swap available.
Typically it is possible to get approximately 1.6GB of heap on a 32-bit JVM on Windows and closer to 2GB on Solaris. Using a 64-bit JVM on a 64-bit system will allow more space. See http://java.sun.com/docs/hotspot/HotSpotFAQ.html for more information.

Tuning some of these options will make stress tests perform better. For example, I saw
a 15% performance improvement using the following settings over the defaults:
JVM_OPTS=" \
-da \
-Xms1024M \
-Xmx1024M \
-XX:+UseParallelGC \
-XX:+CMSParallelRemarkEnabled \
-XX:SurvivorRatio=4 \
-XX:MaxTenuringThreshold=0"
When performance tuning, it’s a good idea to set only the heap min and max options, and nothing else at first. Only after real-world usage in your environment and some performance benchmarking with the aid of heap analysis tools and observation of your specific application’s behavior should you dive into tuning the more advanced JVM settings. If you tune your JVM options and see some success using a load-testing tool or something like the Python stress test in contrib, don’t get too excited. You need to test under real-world conditions; don’t simply copy these settings.

Note: For more information on Java 6 performance tuning (Java 6 operates differently than previous versions), see 
http://java.sun.com/javase/technologies/hotspot/gc/gc_tuning_6.html

In general, you’ll probably want to make sure that you’ve instructed the heap to dump its state if it hits an out-of-memory error. This is just good practice if you’ve been getting out-of-memory errors. You can also instruct the heap to print garbage-collection details. Also, if you have a lot of data in Cassandra and you’re noticing that garbage collection is causing long pauses, you can lower the heap-occupancy threshold at which a collection is initiated, so that collection starts before the heap fills up. All of these parameters are shown here:
-XX:CMSInitiatingOccupancyFraction=88 \
-XX:+HeapDumpOnOutOfMemoryError \
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -verbose:gc \

You may want to check out Jonathan Ellis’s blog entry on using a variety of Linux performance monitoring tools at http://spyced.blogspot.com/2010/01/linux-performance-basics.html
In this chapter we looked at the settings available in Cassandra to aid in performance tuning, including caching settings, memory settings, and hardware concerns. We also set up and used the Python stress test tool to write and then read one million rows of data.

If this post has whetted your appetite for the subject, you can find much more information in the book itself: Cassandra: The Definitive Guide

The excerpt is from the book, 'Cassandra: The Definitive Guide', authored by Eben Hewitt, published November 2010 by O’Reilly Media, Copyright 2011 Eben Hewitt.

Cassandra: Reading and Writing Data

Book Review and Excerpt
Cassandra: The Definitive Guide by Eben Hewitt
Chapter 7

My previous post on Cassandra discussed various configuration challenges for a better performance. Let's turn now to the basics: read and write data!


Query Differences Between RDBMS and Cassandra

No Update Query
Cassandra has no first-class update operation. You can achieve the same effect, however, by simply performing an insert using an existing row key.

Record-Level Atomicity on Writes
Cassandra automatically gives you record-level atomicity on every write operation. In RDBMS, you would have to specify row-level locking. Although Cassandra offers atomicity at the column family level, it does not guarantee isolation.

No Server-Side Transaction Support
If it is applicable for your use case, you’ll need to manually “roll back” writes by issuing a delete if one of the insert operations fails.
More to the point, perhaps, is that Cassandra doesn’t have transactions, because it just
wasn’t built with that goal in mind.

No Duplicate Keys
If you write a new record with a key that already exists in a column family, the values for any existing columns will be overwritten, and any columns that previously were not present for that row will be added to the row.
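The overwrite-and-merge behavior described above can be sketched with an in-memory stand-in for a column family (plain Java maps, not the Cassandra API; the class and method names here are invented for illustration):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a column family: row key -> (column name -> value).
public class UpsertSketch {
    private final Map<String, Map<String, String>> rows = new HashMap<>();

    // "Inserting" with an existing row key overwrites matching columns
    // and adds any columns the row did not previously have.
    public void insert(String rowKey, Map<String, String> columns) {
        rows.computeIfAbsent(rowKey, k -> new HashMap<>()).putAll(columns);
    }

    public Map<String, String> get(String rowKey) {
        return rows.getOrDefault(rowKey, Map.of());
    }

    public static void main(String[] args) {
        UpsertSketch cf = new UpsertSketch();
        cf.insert("1", Map.of("name", "George Clinton"));
        cf.insert("1", Map.of("name", "Bootsy Collins", "band", "Funkadelic"));
        // The second insert acted as an update plus a new column (order may vary):
        System.out.println(cf.get("1"));
    }
}
```

The second insert behaves exactly like an update in an RDBMS, which is why Cassandra gets away without an update operation.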

Basic Write Properties
Writing data is very fast in Cassandra, because its design does not require performing disk reads or seeks. The memtables and SSTables save Cassandra from having to perform these operations on writes, which slow down many databases. All writes in Cassandra are append-only.
Because of the database commit log and hinted handoff design, the database is always
writeable, and within a column family, writes are always atomic.

Consistency Levels
A higher consistency level means that more nodes need to respond to the query, giving you more assurance that the values present on each replica are the same. If two nodes respond with different timestamps, the newest value
wins, and that’s what will be returned to the client. In the background, Cassandra will then perform what’s called a read repair (replicas synchronization).
There are several consistency levels that you can specify, and they mean something different for read operations than for write operations.

Read consistency levels
ZERO - Unsupported. You cannot specify CL.ZERO for read operations because it doesn’t make sense. This would amount to saying “give me the data from no nodes.”
ANY - Unsupported. Use CL.ONE instead.
ONE - Immediately return the record held by the first node that responds to the query. A background thread is created to check that record against the same record on other replicas. If any are out of date, a read repair is then performed to sync them all to the most recent value.
QUORUM - Query all nodes. Once a majority of replicas ((replication factor / 2) + 1) respond, return to the client the value with the most recent timestamp. Then, if necessary, perform a read repair in the
background on all remaining replicas.
ALL - Query all nodes. Wait for all nodes to respond, and return to the client the record with the most recent timestamp. Then, if necessary, perform a read repair in the background. If any nodes fail to respond, fail the read operation.

Note: A node is considered unresponsive if it does not respond to a query within the time specified by rpc_timeout_in_ms in the configuration file. The default is 10 seconds.

Write consistency levels
ZERO - The write operation will return immediately to the client before the write is recorded; the write will happen asynchronously in a background thread, and there are no guarantees of success.
ANY - Ensure that the value is written to a minimum of one node, allowing hints to count as a write.
ONE - Ensure that the value is written to the commit log and memtable of at least one node before returning to the client.
QUORUM - Ensure that the write was received by at least a majority of replicas ((replication factor / 2) + 1).
ALL - Ensure that the number of nodes specified by replication factor received the write before returning to the client. If even one replica is unresponsive to the write operation, fail the operation.

The most notable consistency level for writes is the ANY level. This level means that the write is guaranteed to reach at least one node, but it allows a hint to count as a successful write.
Note: In many cases, the node that makes the hint actually isn’t the node that stores it; instead, it sends it off to one of the nonreplica neighbors of the node that is down.

Using the consistency level of ONE on writes means that the write operation will be written to both the commit log and the memtable. That means that writes at CL.ONE are durable, so this level is the minimum level to use to achieve fast performance and durability.
For both reads and writes, the consistency levels of ZERO, ANY, and ONE are considered weak, whereas QUORUM and ALL are considered strong. Consistency is tuneable in Cassandra because clients can specify the desired consistency level on both reads and writes. There is an equation that is popularly used to represent the way to achieve strong consistency in Cassandra: R(read replica count) + W(write replica count) > N(replication factor) = strong consistency. All client reads will see the most recent write in this scenario, and you will have strong consistency.
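The two formulas above, the quorum size ((replication factor / 2) + 1) and the strong-consistency rule R + W > N, can be checked with a few lines of arithmetic (a sketch; these helper names are not part of any Cassandra API):

```java
public class ConsistencyMath {
    // Majority of replicas: (replication factor / 2) + 1, using integer division.
    static int quorum(int replicationFactor) {
        return replicationFactor / 2 + 1;
    }

    // Strong consistency holds when read replicas + write replicas > replication factor.
    static boolean isStrong(int r, int w, int n) {
        return r + w > n;
    }

    public static void main(String[] args) {
        int n = 3;
        int q = quorum(n); // 2
        System.out.println("QUORUM of " + n + " replicas = " + q);
        System.out.println("QUORUM reads + QUORUM writes strong? " + isStrong(q, q, n)); // true
        System.out.println("ONE read + ONE write strong? " + isStrong(1, 1, n));         // false
    }
}
```

With a replication factor of 3, quorum reads plus quorum writes give 2 + 2 > 3, so every read overlaps with every write on at least one replica.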

Basic Read Properties
If a client connects to a node that doesn’t have the data it’s trying to read, the node it’s connected to will act as coordinator node to read the data from a node that does have it, identified by token ranges.

To fulfill read operations, Cassandra does have to perform seeks, but you can speed these up by adding RAM. Adding RAM will help you if you find the OS doing a lot of paging on reads (in general, it is better to enable the various caches Cassandra has).

So reads are clearly slower than writes, for these various reasons. The partitioner doesn’t influence the speed of reads. In the case of range queries, using OPP is significantly faster because it allows you to easily determine which nodes don’t have your data. The partitioner’s responsibility is to perform the consistent hash operation that maps keys to nodes. In addition, you can choose row caching and key caching strategies to give you a performance boost.

Note: Columns are sorted by their type (as specified by CompareWith), and rows are sorted by their partitioner.

Ranges and Slices
Ranges typically refer to ranges of keys (rows). The term slice is used to refer to a range of columns within a row.

When specifying a range query and using Random Partitioner, there’s really no way to specify a range narrower than “all”. This is obviously an expensive proposition, because you might incur additional network operations. It can also potentially result in missed keys, because an update running concurrently with your row scan may land earlier in the scan order than the point you are currently processing.

There is another thing that can be confusing at first. When you are using Random Partitioner, you must recall that range queries first hash the keys. So if you are using a range of “Alice” to “Alison”, the query will first run a hash on each of those keys and return not simply the natural values between Alice and Alison, but rather the values between the hashes of those values.

Here is the basic flow of a read operation that looks for a specific key when using Random Partitioner. First, the key is hashed, and then the client connects to any node in the cluster. That node will route the request to the node with that key. The memtable is consulted to see whether your key is present; if it’s not found, then a scan is performed on the Bloom filter for each file, starting with the newest one. Once the key is found in the Bloom filter, it is used to consult the corresponding datafile and find the column values.
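The read flow above can be mimicked with plain collections standing in for the memtable, the per-file Bloom filters, and the datafiles. This is a toy model under stated assumptions: the class names are invented, and a real Bloom filter is probabilistic (it can report false positives), whereas the exact set used here cannot.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ReadPathSketch {
    // Stand-in for one SSTable: a "Bloom filter" (here an exact set) plus its datafile.
    static class SSTable {
        final Set<String> bloom = new HashSet<>();
        final Map<String, String> data = new HashMap<>();
        void put(String key, String value) { bloom.add(key); data.put(key, value); }
    }

    final Map<String, String> memtable = new HashMap<>();
    final Deque<SSTable> sstables = new ArrayDeque<>(); // newest first

    String read(String key) {
        // 1. Consult the memtable first.
        String v = memtable.get(key);
        if (v != null) return v;
        // 2. Scan each file's Bloom filter, newest first; only on a hit touch the datafile.
        for (SSTable t : sstables) {
            if (t.bloom.contains(key)) {
                v = t.data.get(key);
                if (v != null) return v;
            }
        }
        return null; // key not present on this node
    }

    public static void main(String[] args) {
        ReadPathSketch node = new ReadPathSketch();
        SSTable older = new SSTable();
        older.put("alice", "v1");
        node.sstables.addFirst(older);
        SSTable newer = new SSTable();
        newer.put("alice", "v2");
        node.sstables.addFirst(newer); // newest consulted first
        System.out.println(node.read("alice")); // v2
        node.memtable.put("alice", "v3");
        System.out.println(node.read("alice")); // v3
    }
}
```

The point of the Bloom filter step is to avoid a disk seek into datafiles that cannot possibly contain the key.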

Setup for a Sample Application
First, download Cassandra from http://cassandra.apache.org. It’s easiest to get started with the binary version.
For now, we’ll use a Java project in Eclipse. First, create a new project, and then add a few JARs to your classpath: a Log4J JAR to output log statements (don’t forget to set the properties file for your own classes); the Thrift library called libthrift-r917130.jar, which contains the org.apache.thrift classes; and the Cassandra JAR apache-cassandra-x.x.x.jar, which contains the org.apache.cassandra classes. We also need to add the SLF4J logger (both the API and the implementation JAR), which Cassandra requires. Finally, add an argument to your JVM to add the log4j.properties file to your classpath:
-Dlog4j.configuration=file:///home/eben/books/cassandra/log4j.properties
In Eclipse, you can add the log4j.properties file by creating a new Run Configuration. Click the Arguments tab, and then in the VM Arguments text field, enter the parameter specified in the previous code sample - of course using the actual path to your properties file.
The log4j.properties file looks like this:
# output messages into a rolling log file as well as stdout
log4j.rootLogger=DEBUG,stdout,R
# stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n
# rolling log file
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.maxFileSize=5MB
log4j.appender.R.maxBackupIndex=5
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%5p [%t] %d{ISO8601} %C %F (line %L) %m%n
# This points to your logs directory
log4j.appender.R.File=cass-client.log

Simple Write Read Examples

//create a representation of the Name column
ColumnPath colPathName = new ColumnPath(cfName); // "Standard1"
colPathName.setColumn("name".getBytes(UTF8));
ColumnParent cp = new ColumnParent(cfName);
//insert the name column
LOG.debug("Inserting row for key " + new String(userIDKey)); // "1".getBytes()
client.insert(userIDKey, cp, new Column("name".getBytes(UTF8), "George Clinton".getBytes(),
 clock), CL);

// read just the Name column
LOG.debug("Reading Name Column:");
Column col = client.get(userIDKey, colPathName,
CL).getColumn();
LOG.debug("Column name: " + new String(col.name, UTF8));
LOG.debug("Column value: " + new String(col.value, UTF8));
LOG.debug("Column timestamp: " + col.clock.timestamp);

//create a slice predicate representing the columns to read
//start and finish are the range of columns--here, all
SlicePredicate predicate = new SlicePredicate();
SliceRange sliceRange = new SliceRange();
sliceRange.setStart(new byte[0]);
sliceRange.setFinish(new byte[0]);
predicate.setSlice_range(sliceRange);
LOG.debug("Complete Row:");
// read all columns in the row
ColumnParent parent = new ColumnParent(cfName);
List<ColumnOrSuperColumn> results = client.get_slice(userIDKey, parent, predicate, CL);
//loop over columns, outputting values
for (ColumnOrSuperColumn result : results) {
 Column column = result.column;
 LOG.debug(new String(column.name, UTF8) + " : " + new String(column.value, UTF8));
} 


So each column value has its own timestamp (wrapped in a clock) because it’s the
column, not the row, that is the atomic unit. This can be confusing when you first come
to Cassandra if you’re used to adding a timestamp column to a relational table to indicate
when it was last updated. But there’s no such thing in Cassandra as the last time
a row was updated; it’s granular at the column level.

The column parent is the path to the parent of a set of columns.

Get Example
ColumnPath path = new ColumnPath();
path.column_family = cfName;
path.column = "name".getBytes(); // sets the name of the column that we’re looking for
ColumnOrSuperColumn cosc = client.get(userIDKey, path, CL);


Slice Predicate
You can specify the predicate one of two ways: with a list of column names or with a slice range. If you know the names of a few columns that you want to retrieve, then you can specify them explicitly by name. If you don’t know their names, or for another reason want to retrieve a range of columns, you use a slice range object to specify the range.
Note: Cassandra is a big data store and can hold two billion columns per row in version 0.7.

You can get a set of columns contained in a column parent using a get_slice operation.
It will retrieve values by column name or a range of names. Example:
SlicePredicate predicate = new SlicePredicate();
List<byte[]> colNames = new ArrayList<byte[]>();
colNames.add("a".getBytes());
colNames.add("b".getBytes());
predicate.column_names = colNames;
ColumnParent parent = new ColumnParent("Standard1");
byte[] key = "k1".getBytes();
List<ColumnOrSuperColumn> results =
client.get_slice(key, parent, predicate, ConsistencyLevel.ONE);


Or, in order to read a range of the columns in a row, you can specify the start and finish columns, and Cassandra will give you the start and finish columns as well as any columns in
between, according to the sorting order based on the comparator for that column family.
SlicePredicate predicate = new SlicePredicate();
SliceRange sliceRange = new SliceRange();
sliceRange.setStart("age".getBytes());
sliceRange.setFinish("name".getBytes());

sliceRange.count = 10; // limit the number of columns returned
predicate.setSlice_range(sliceRange);


You must consider the column names according to their comparator, and specify start
and finish in the proper order. For example, trying to set a start of name and a finish of
age will throw an exception like this:
InvalidRequestException (why:range finish must come after start in the order of traversal)

Note: You can also reverse the order in which the columns are fetched by setting the reversed = true attribute on the slice range.

Recall that “returning a column” doesn’t mean that you get the value, as in SQL; it means that you get the complete column data structure, which is the name, the value, and the timestamp.
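The comparator-ordered slice behavior can be imitated client-side with a sorted set (a sketch only; Cassandra performs slicing server-side, and natural string order here stands in for the column family's comparator):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.TreeSet;

public class SliceSketch {
    // Return the column names between start and finish, inclusive, in comparator order.
    static List<String> slice(Collection<String> columnNames, String start, String finish) {
        TreeSet<String> ordered = new TreeSet<>(columnNames);
        return new ArrayList<>(ordered.subSet(start, true, finish, true));
    }

    public static void main(String[] args) {
        List<String> cols = List.of("name", "age", "email"); // "email" is a made-up column
        System.out.println(slice(cols, "age", "name")); // [age, email, name]

        // A start that sorts after the finish is invalid, which is why Cassandra
        // raises InvalidRequestException; TreeSet raises IllegalArgumentException.
        try {
            slice(cols, "name", "age");
        } catch (IllegalArgumentException e) {
            System.out.println("range finish must come after start");
        }

        // reversed = true on a SliceRange corresponds to walking the same slice backward.
        List<String> reversed = slice(cols, "age", "name");
        Collections.reverse(reversed);
        System.out.println(reversed); // [name, email, age]
    }
}
```

Note how "email" is returned even though only the endpoints were named: a slice range returns everything between start and finish in sort order.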

Get Range Slices
In the same way that you access a set of columns by using a range, you can also access
a range of keys or tokens. Using the get_range_slices operation, you can pass it a
KeyRange object that defines the range of keys you want.
One major distinction here is that you can get either keys or tokens in a range using
the same KeyRange data structure as the parameter to get_range_slices. Key ranges are
start-inclusive, whereas token ranges are start-exclusive; because of ring distribution,
tokens can wrap around so that the end token is less than the start token.
SlicePredicate predicate = new SlicePredicate();
List<byte[]> colNames = new ArrayList<byte[]>();
colNames.add("a".getBytes());
colNames.add("b".getBytes());
predicate.column_names = colNames;
ColumnParent parent = new ColumnParent("Standard1");
KeyRange keyRange = new KeyRange();
keyRange.start_key = "k1".getBytes();
keyRange.end_key = "k2".getBytes();
//a key slice is returned
List<KeySlice> results = client.get_range_slices(parent, predicate, keyRange,
 ConsistencyLevel.ONE);
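The wrap-around behavior of token ranges (start-exclusive, end-inclusive, with the end token allowed to be less than the start token) can be sketched as follows; the helper is hypothetical, not a Cassandra API:

```java
import java.math.BigInteger;

public class TokenRangeSketch {
    // True if token lies in the ring range (start, end]; ranges may wrap around.
    static boolean inRange(BigInteger token, BigInteger start, BigInteger end) {
        if (start.compareTo(end) < 0) {
            // Ordinary range: strictly after start, up to and including end.
            return token.compareTo(start) > 0 && token.compareTo(end) <= 0;
        }
        // Wrapped range: everything after start, or anything up to and including end.
        return token.compareTo(start) > 0 || token.compareTo(end) <= 0;
    }

    public static void main(String[] args) {
        BigInteger start = BigInteger.valueOf(80), end = BigInteger.valueOf(10);
        System.out.println(inRange(BigInteger.valueOf(90), start, end)); // true  (after start)
        System.out.println(inRange(BigInteger.valueOf(5), start, end));  // true  (wrapped past zero)
        System.out.println(inRange(BigInteger.valueOf(50), start, end)); // false
    }
}
```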


Multiget Slice
With get_slice, we saw how to get a set of column names for a single specified row key. multiget_slice lets you retrieve a subset of columns for a set of row keys.
//instead of one row key, we specify many
List<byte[]> rowKeys = new ArrayList<byte[]>();
rowKeys.add("k1".getBytes());
rowKeys.add("k2".getBytes());
//instead of a simple list, we get a map, where the keys are row keys
//and the values the list of columns returned for each
Map<byte[],List<ColumnOrSuperColumn>> results =
client.multiget_slice(rowKeys, parent, predicate, CL);


Note: Because we’re using Thrift as the underlying RPC mechanism to communicate
with Cassandra, the results will come back to you unordered and you’ll have to sort them on the client. That’s because Thrift can’t preserve order. A multiget is actually just a wrapper over a series of get requests.
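Sorting the returned row keys on the client might look like this (a sketch using plain byte arrays; the comparator below is an assumption about what ordering you want, here unsigned lexicographic byte order):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;

public class ClientSort {
    // Lexicographic comparator for raw row keys (unsigned byte order).
    static final Comparator<byte[]> BYTES = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int d = (a[i] & 0xff) - (b[i] & 0xff);
            if (d != 0) return d;
        }
        return a.length - b.length; // shorter key sorts first on a tie
    };

    static List<byte[]> sortedKeys(Collection<byte[]> unorderedKeys) {
        List<byte[]> keys = new ArrayList<>(unorderedKeys);
        keys.sort(BYTES);
        return keys;
    }

    public static void main(String[] args) {
        List<byte[]> keys = List.of("k2".getBytes(), "k10".getBytes(), "k1".getBytes());
        for (byte[] k : sortedKeys(keys)) System.out.println(new String(k));
        // prints k1, k10, k2 (lexicographic, not numeric, order)
    }
}
```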

Deleting
Deleting data is not the same in Cassandra as it is in a relational database. In RDBMS,
you simply issue a delete statement that identifies the row or rows you want to delete.
In Cassandra, a delete does not actually remove the data immediately. There’s a simple
reason for this: Cassandra’s durable, eventually consistent, distributed design. If Cassandra
had a straightforward design for deletes and a node goes down, that node would
therefore not receive the delete. Once that node comes back online, it would mistakenly
think that all of the nodes that had received the delete had actually missed a write (the
data that it still has because it missed the delete), and it would start repairing all of the
other nodes. So Cassandra needs a more sophisticated mechanism to support deletes.
That mechanism is called a tombstone.
A tombstone is a special marker issued in a delete that overwrites the deleted values,
acting as a placeholder. If any replica did not receive the delete operation, the tombstone
can later be propagated to those replicas when they are available again. The net effect
of this design is that your data store will not immediately shrink in size following a
delete. Each node keeps track of the age of all its tombstones. Once they reach the age
as configured in gc_grace_seconds (which is 10 days by default), then a compaction is
run, the tombstones are garbage-collected, and the corresponding disk space is
recovered.
Note: Remember that SSTables are immutable, so the data is not deleted from the SSTable. On compaction, tombstones are accounted for, merged data is sorted, a new index is created over the sorted data, and the freshly merged, sorted, and indexed data is written to a single new file.

The assumption is that 10 days is plenty of time for you to bring a failed node back online before compaction runs. If you feel comfortable doing so, you can reduce that grace period to reclaim disk space more quickly.
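The tombstone lifecycle can be modeled in a few lines (a toy sketch; the class and field names are invented, and real compaction involves merging whole SSTables, not single columns):

```java
public class TombstoneSketch {
    static final long GC_GRACE_SECONDS = 10L * 24 * 60 * 60; // default: 10 days (864,000 s)

    // Toy column: a value or a tombstone marker, stamped with a write time.
    static class Column {
        final String value; // null means tombstone
        final long writeTimeSecs;
        Column(String value, long writeTimeSecs) {
            this.value = value;
            this.writeTimeSecs = writeTimeSecs;
        }
    }

    // Reconciling two versions: the highest timestamp wins, tombstone or not.
    static Column reconcile(Column a, Column b) {
        return a.writeTimeSecs >= b.writeTimeSecs ? a : b;
    }

    // At compaction, a tombstone older than gc_grace_seconds is garbage-collected.
    static boolean keepAfterCompaction(Column c, long nowSecs) {
        if (c.value != null) return true;
        return nowSecs - c.writeTimeSecs < GC_GRACE_SECONDS;
    }

    public static void main(String[] args) {
        long now = 1_000_000_000L;
        Column write = new Column("v1", now - 100);
        Column delete = new Column(null, now - 50);
        Column winner = reconcile(write, delete);
        System.out.println(winner.value == null);                               // true: newer tombstone wins
        System.out.println(keepAfterCompaction(winner, now));                   // true: still within grace
        System.out.println(keepAfterCompaction(winner, now + GC_GRACE_SECONDS)); // false: collected
    }
}
```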

Note: Because a remove operation is really a tombstone write, you still have to supply a timestamp with the operation, because if there are multiple clients writing, the highest timestamp wins—and those writes might include a tombstone or a new value. Cassandra doesn’t discriminate here; whichever operation has the highest timestamp will win.

client.remove(key, colPath, clock, ConsistencyLevel.ALL);

Batch Mutates
To perform many insert or update operations at once, use the batch_mutate method
instead of the insert method. Like a batch update in the relational world, the
batch_mutate operation allows grouping calls on many keys into a single call in order
to save on the cost of network round trips. If batch_mutate fails in the middle of its list
of mutations, there will be no rollback, so any updates that have already occurred up to
this point will remain intact. In the case of such a failure, the client can retry the
batch_mutate operation.

Batch Deletes
You use remove to delete a single column, but you can use a Deletion structure with a
batch_mutate operation to perform a set of complex delete operations at once.
You can create a list of column names that you want to delete, and then indirectly pass
it to batch_mutate:
First, create the list of column names to delete. Pass that list to a SlicePredicate, pass
the SlicePredicate to a Deletion object, pass that to a Mutation object, and finally,
pass that to a batch_mutate.
Example:
SlicePredicate delPred = new SlicePredicate();
List<byte[]> delCols = new ArrayList<byte[]>();
//let's delete the column named 'b', though we could add more
delCols.add("b".getBytes());
delPred.column_names = delCols;
Deletion deletion = new Deletion();
deletion.predicate = delPred;
deletion.clock = clock;
Mutation mutation = new Mutation();
mutation.deletion = deletion;
Map<byte[], Map<String, List<Mutation>>> mutationMap =
new HashMap<byte[], Map<String, List<Mutation>>>();
List<Mutation> mutationList = new ArrayList<Mutation>();
mutationList.add(mutation);
Map<String, List<Mutation>> m = new HashMap<String, List<Mutation>>();
m.put(columnFamily, mutationList);

//just for this row key, though we could add more
mutationMap.put(key, m);
client.batch_mutate(mutationMap, ConsistencyLevel.ALL);


There is a second way to specify items to delete using the Deletion structure: you can
use a SliceRange instead of a List of columns, so you can delete by range instead of
explicitly listing column names.

Range Ghosts
You may sometimes hear people refer to “range ghosts” in Cassandra. This means that
even if you have deleted all of the columns for a given row, you will still see a result
returned for that row in a range slice, but the column data will be empty. This is valid,
and is just something to keep in mind as you iterate result sets on the client.
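Skipping range ghosts while iterating a result set might look like this (toy types; KeySlice here is an invented stand-in, not the Thrift class of the same name):

```java
import java.util.ArrayList;
import java.util.List;

public class RangeGhostSketch {
    // Toy stand-in for a returned key slice: a row key plus its column names.
    static class KeySlice {
        final String key;
        final List<String> columns;
        KeySlice(String key, List<String> columns) {
            this.key = key;
            this.columns = columns;
        }
    }

    // Keep only rows that still carry column data, skipping range ghosts.
    static List<String> liveKeys(List<KeySlice> slices) {
        List<String> keys = new ArrayList<>();
        for (KeySlice s : slices) {
            if (!s.columns.isEmpty()) keys.add(s.key);
        }
        return keys;
    }

    public static void main(String[] args) {
        List<KeySlice> results = List.of(
                new KeySlice("k1", List.of("name")),
                new KeySlice("k2", List.of()),       // all columns deleted: a range ghost
                new KeySlice("k3", List.of("name")));
        System.out.println(liveKeys(results)); // [k1, k3]
    }
}
```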

Programmatically Defining Keyspaces and Column Families
KsDef ksdef = new KsDef();
ksdef.name = "ProgKS";
ksdef.replication_factor = 1;
ksdef.strategy_class =
"org.apache.cassandra.locator.RackUnawareStrategy";
List<CfDef> cfdefs = new ArrayList<CfDef>();
CfDef cfdef1 = new CfDef();
cfdef1.name = "ProgCF1";
cfdef1.keyspace = ksdef.name;
cfdefs.add(cfdef1);

ksdef.cf_defs = cfdefs;
client.system_add_keyspace(ksdef);
System.out.println("Defining new cf.");
CfDef cfdef2 = new CfDef();
cfdef2.keyspace = ksdef.name;
cfdef2.column_type = "Standard";
cfdef2.name = "ProgCF";
client.system_add_column_family(cfdef2);


In this chapter we explored reading and writing data using a variety of operations offered
by Cassandra’s rich API.

If this post has whetted your appetite for the subject, you can find much more information in the book itself: Cassandra: The Definitive Guide

The excerpt is from the book, 'Cassandra: The Definitive Guide', authored by Eben Hewitt, published November 2010 by O’Reilly Media, Copyright 2011 Eben Hewitt.

Monday, March 5, 2012

Configuring Cassandra

Book Review and Excerpt
Cassandra: The Definitive Guide by Eben Hewitt
Chapter 5

My previous post on Cassandra discussed its architecture.
But Cassandra's architecture is no more interesting than learning how to tune its configuration for better performance.

Keyspaces
Keyspaces used to be defined statically in an XML configuration file, but as of 0.7, you can use the API to create keyspaces and column families: system_add_keyspace,  system_rename_keyspace, system_drop_keyspace, system_add_column_family, system_drop_column_family, system_rename_column_family.

Creating a Column Family
To create a column family, you also use the CLI or the API. Here are the options available
when creating a column family: column_type (Super or Standard), clock_type (Timestamp),
comparator (AsciiType, BytesType, LexicalUUIDType, LongType, TimeUUIDType, and UTF8Type), subcomparator (for subcolumns), reconciler (Timestamp), comment, rows_cached, preload_row_cache, key_cache_size, read_repair_chance.

Replicas
Every Cassandra node is a replica for something. If your replication factor is set to N, then each node will act as a replica for N ranges, even if N is set to 1.
A Cassandra cluster, or collection of hosts, is typically referred to as a ring. Each node in the ring is assigned a single, unique token. Each node claims ownership of the range of values from its token to the token of the previous node.
When creating a replica, the first replica is always placed in the node claiming the key
range of its token. All remaining replicas are distributed according to a configurable
replication strategy, which we’ll look at now.
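The basic ownership rule above (each node claims the range from the previous node's token up to and including its own) can be sketched as follows; the helper is hypothetical, using small long-valued tokens for readability:

```java
import java.util.List;
import java.util.TreeSet;

public class RingSketch {
    // Given the sorted node tokens of the ring, return the token of the node that
    // owns the key's token: the first node token >= the key token, wrapping
    // around to the smallest token when the key falls past the largest one.
    static long owner(long keyToken, TreeSet<Long> nodeTokens) {
        Long t = nodeTokens.ceiling(keyToken);
        return t != null ? t : nodeTokens.first();
    }

    public static void main(String[] args) {
        TreeSet<Long> tokens = new TreeSet<>(List.of(10L, 40L, 70L));
        System.out.println(owner(25L, tokens)); // 40: the range (10, 40] belongs to node 40
        System.out.println(owner(80L, tokens)); // 10: wraps around the ring
    }
}
```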

Replica Placement Strategies
Choosing the right replication strategy is important because the strategy determines which nodes are responsible for which key ranges. The implication is that you’re also determining which nodes should receive which write operations, which can have a big impact on efficiency in different scenarios. If you set up your cluster such that all writes are going to two data centers—one in Australia and one in Reston, Virginia—you will see a matching performance degradation.

Simple Strategy (or Rack-Unaware Strategy)
This strategy places replicas in a single data center, in a manner that is not aware of their placement on a data center rack. This means that the implementation is theoretically fast, but not necessarily so if the next node holding the given key range happens to sit in a different rack than the others.
What’s happening here is that the next N nodes on the ring are chosen to hold replicas,
and the strategy has no notion of data centers.

Old Network Topology Strategy (or Rack-Aware Strategy)
Say you have two data centers, DC1 and DC2, and a set of Cassandra servers. This strategy will place some replicas in DC1, putting each in a different available rack. It will also ensure that another replica is placed in DC2. The Rack-Aware Strategy is specifically for when you have nodes in the same Cassandra cluster spread over two data centers and you are using a replication factor of 3.
Use this strategy when you want to ensure higher availability at the potential cost of some additional latency while the third node in the alternate data center is contacted.
There is no point in using Rack-Aware Strategy if you are only running Cassandra in a single data center. Cassandra is optimized for running in different data centers, however, and taking advantage of its high availability may be an important consideration for you.
If you use the Rack-Aware Strategy, you must also use the Rack-Aware Snitch (explained in a moment).

Network Topology Strategy
This strategy allows you to specify how replicas should be placed across data centers. To use it, you supply parameters in which you indicate the desired replication strategy for each data center.
This strategy used to employ a file called datacenter.properties. But as of 0.7, this metadata is attached directly to the keyspace and strategy as a map of configuration options.

Replication Factor
The replication factor specifies how many copies of each piece of data will be stored and distributed throughout the Cassandra cluster. It is specified by the replication_factor setting.
It may seem intuitive that the more nodes you have in your cluster, the more you want to increase the replication factor. However, you don’t want to follow this rule of thumb alone; instead, increase the replication factor in order to satisfy your required service level.
With a replication factor of one, your data will exist only in a single node in the cluster. Losing that node means that data becomes unavailable. It also means that Cassandra will have to do more work as coordinator among nodes; if all the data for a given key is on node B, every client request for that key that enters through node A will need to be forwarded.
Cassandra will achieve high consistency when the read replica count plus the write
replica count is greater than the replication factor.
So if you have 10 nodes in your cluster, you could set the replication factor to 10 as the maximum. But you might not want to do that, as it robs Cassandra of what it’s good at and stymies its availability, because if a single node goes down, you won’t be able to have high consistency. Instead, set the replication factor to a reasonable number and then tune the consistency level up or down. The consistency level never allows you to write to more than the number of nodes specified by the replication factor. A “reasonable number” here is probably fairly low. ONE seems like the lowest number; however, ANY is similar to ONE but even less consistent, since you might not see the written value until a long time after you wrote it. If any node in the cluster is alive, ANY should succeed.

If you’re new to Cassandra, the replication factor can sometimes be confused with the consistency level. The replication factor is set per keyspace, and is specified in the server’s config file. The consistency level is specified per query, by the client. The replication factor indicates how many nodes you want to use to store a value during each write operation. The consistency level specifies how many nodes the client has decided must respond in order to feel confident of a successful read or write operation. The confusion arises because the consistency level is based on the replication factor, not on the number of nodes in the system.

Increasing the Replication Factor
As your application grows and you need to add nodes, you can increase the replication factor. There are some simple guidelines to follow when you do this. First, keep in mind that you’ll have to restart the nodes after a simple increase in the replication factor value. A repair (replicas synchronization) will then be performed after you restart the nodes, as Cassandra will have to redistribute some data in order to account for the increased replication factor. For as long as the repair takes, it is possible that some clients will receive a notice that data does not exist if they connect to a replica that doesn’t have the data yet.
A faster way of increasing the replication factor from 1 to 2 is to use the node tool. First, execute a drain on the original node to ensure that all the data is flushed to the SSTables. Then, stop that node so it doesn’t accept any more writes. Next, copy the datafiles from your keyspaces (the files under the directory that is your value for the DataFile Directory element in the config). Make sure not to copy the values in the internal Cassandra keyspace. Paste those datafiles to the new node. Change the settings in the configuration of both nodes so that the replication factor is set to 2. Make sure that autobootstrap is set to false in both nodes. Then, restart both nodes and run node tool repair. These steps will ensure that clients will not have to endure the potential of false empty reads for as long.

Partitioners
The purpose of the partitioner is to allow you to specify how row keys should be sorted, which has a significant impact on how data will be distributed across your nodes. It also has an effect on the options available for querying ranges of rows. There are a few different partitioners you can use.
Note: The choice of partitioner does not apply to column sorting, only row key sorting.
You set the partitioner by updating the value of the Partitioner element in the config file or in the API.
Note: Keep in mind that the partitioner can modify the on-disk SSTable representations. So if you change the partitioner type, you will have to delete your data directories.

Random Partitioner
It uses a BigIntegerToken with an MD5 hash applied to it to determine where to place the keys on the node ring. This has the advantage of spreading your keys evenly across your cluster, because the distribution is random. It has the disadvantage of causing inefficient range queries, because keys within a specified range might be placed in a variety of disparate locations in the ring, and key range queries will return data in an essentially random order.
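The MD5-to-BigInteger mapping can be approximated like this (a sketch of the idea only; Cassandra's own token computation may differ in detail, such as how it normalizes the sign):

```java
import java.math.BigInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class Md5TokenSketch {
    // Map a row key to a position on the ring by MD5-hashing it.
    static BigInteger token(byte[] key) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(key);
            return new BigInteger(digest).abs(); // a 128-bit, effectively random position
        } catch (NoSuchAlgorithmException e) {
            throw new AssertionError(e); // MD5 is always available in the JDK
        }
    }

    public static void main(String[] args) {
        // Nearby keys land far apart on the ring, which is why range queries
        // under Random Partitioner return keys in essentially random order.
        System.out.println(token("Alice".getBytes()));
        System.out.println(token("Alison".getBytes()));
    }
}
```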

Order-Preserving Partitioner
Using this type of partitioner, the token is a UTF-8 string, based on a key. Rows are therefore stored by key order, aligning the physical structure of the data with your sort order.
This allows you to perform range slices.
It’s worth noting that OPP isn’t more efficient for range queries than random partitioning—
it just provides ordering. It has the disadvantage of creating a ring that is potentially very lopsided, because real-world data typically is not written to evenly. As an example, consider the value assigned to letters in a Scrabble game. Q and Z are rarely used, so they get a high value. With OPP, you’ll likely eventually end up with lots of data on some nodes and much less data on other nodes. The nodes on which lots of data is stored, making the ring lopsided, are often referred to as “hot spots.” Because of the ordering aspect, users are commonly attracted to OPP early on. However, using OPP means that your operations team will need to manually rebalance nodes periodically using Nodetool’s loadbalance or move operations.
If you want to perform range queries from your clients, you must use an order-preserving
partitioner or a collating order-preserving partitioner.

Collating Order-Preserving Partitioner
This partitioner orders keys according to a United States English locale (EN_US). Like OPP, it requires that the keys are UTF-8 strings. 
This partitioner is rarely employed, as its usefulness is limited.

Byte-Ordered Partitioner
It is an order-preserving
partitioner that treats the data as raw bytes, instead of converting them to strings the way the order-preserving partitioner and collating order-preserving partitioner do. If you need an order-preserving partitioner that doesn’t validate your keys as being strings, BOP is recommended for the performance improvement.

Snitches
The job of a snitch is simply to determine relative host proximity. Snitches gather some information about your network topology so that Cassandra can efficiently route requests.
The snitch will figure out where nodes are in relation to other nodes. But inferring data centers is the job of the replication strategy.
You configure which endpoint snitch implementation to use by updating the value of the endpoint_snitch element in the configuration file.
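For example, assuming a 0.7-era cassandra.yaml and the default class names shipped with that release (an assumption; check the config file for your version), the setting looks like:

```yaml
# cassandra.yaml -- which snitch implementation to use (class name assumed)
endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
```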

Simple Snitch
If two hosts have the same value in the second octet of their IP addresses, they are determined to be in the same data center; if they have the same value in the third octet, they are determined to be in the same rack. “Determined to be” really means that Cassandra has to guess, based on an assumption about how your servers are located across VLANs or subnets.
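The octet rule can be sketched as follows (a toy illustration, not Cassandra's actual snitch code; the DC/RAC naming is made up for the example):

```java
// Toy version of the octet rule: the second octet names the data center,
// the third octet names the rack.
public class OctetInference {
    public static String dataCenter(String ip) {
        return "DC" + ip.split("\\.")[1];
    }

    public static String rack(String ip) {
        return "RAC" + ip.split("\\.")[2];
    }
}
```

So 10.1.2.3 and 10.1.9.8 would be guessed to share a data center, while only hosts matching in the third octet would share a rack.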

PropertyFileSnitch
This snitch helps Cassandra know for certain if two IPs are in the same data center or on the same rack—because you tell it that they are. This is perhaps most useful if you move servers a lot, as operations often need to, or if you have inherited an unwieldy IP scheme.
The default configuration of cassandra-rack.properties looks like this:
 
# Cassandra Node IP=Data Center:Rack
10.0.0.10=DC1:RAC1
10.0.0.11=DC1:RAC1
10.0.0.12=DC1:RAC2
 
10.20.114.10=DC2:RAC1
10.20.114.11=DC2:RAC1
10.20.114.15=DC2:RAC2
 
# default for unknown nodes
default=DC1:r1

Here we see that there are two data centers, each with two racks. Given this information, Cassandra can place replicas for an even distribution and good performance.
Update the values in this file to record each node in your cluster to specify which rack contains the node with that IP and which data center it’s in. Although this may seem difficult to maintain if you expect to add or remove nodes with some frequency, remember that it’s one alternative, and it trades away a little flexibility and ease of maintenance in order to give you more control and better runtime performance, as Cassandra doesn’t have to figure out where nodes are. Instead, you just tell it where they are.

Adding Nodes to a Cluster
Let's add a second node in a ring to share the load. 
First, make sure that all the nodes in the cluster have the same cluster name and the same keyspace definitions so that the new node can accept data. Edit the configuration file on the second node to indicate that the first one will act as the seed.
Then, set auto_bootstrap to true.
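On the new node, the relevant configuration entries might look like this (a sketch; the IP address and the 0.7-era element names are assumptions, so verify them against your own cassandra.yaml):

```yaml
# cassandra.yaml on the joining node
seeds:
    - 192.168.1.5       # existing node acting as seed
auto_bootstrap: true    # fetch a token and stream data on startup
```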
When the second node is started up, it immediately recognizes the first node, but then sleeps for 90 seconds to allow time for nodes to gossip information about how much data they are storing locally. It then gets its bootstrap token from the first node, so it knows which section of the data it should accept. The bootstrap token is chosen so that it splits the load of the most heavily loaded node in half. The second node then sleeps for another 30 seconds and starts bootstrapping.

Depending on how much data you have, you could see your new node in this state for some time.
When the transfer is done, we can run nodetool again to make sure that everything is set up properly:

$ bin/nodetool -h 192.168.1.5 ring
Address         Status  Load               Range    Ring
192.168.1.7   Up         229.56 MB      ...           |<--|
192.168.1.5   Up         459.26 MB      ...           |-->|

Multiple Seed Nodes
Cassandra allows you to specify multiple seed nodes. A seed node is used as a contact point for other nodes, so Cassandra can learn the topology of the cluster, that is, what hosts have what ranges.
By default, the configuration file will have only a single seed entry:
seeds:
 - 127.0.0.1
To add more seed nodes to your ring, just add another seed element:
seeds:
 - 192.168.1.5
 - 192.168.1.7
 Make sure that the auto_bootstrap element is set to false if you’re using a node’s own IP address as a seed. One approach is to make the first three or five nodes in a cluster seeds without autobootstrapping; that is, by manually choosing tokens or by allowing Cassandra to pick random tokens. The rest of your nodes will not be seeds, so they can be added using auto_bootstrap.
Next, we need to update the listen address. This is how nodes identify each other, and it’s used for all internal communication.
listen_address: 192.168.1.5
Finally, we need to change the rpc_address, which is the address clients use to connect to this node.
rpc_address: 192.168.1.5
The rpc_address setting is used only for connections made directly from clients to Cassandra nodes.
Now you can restart Cassandra and start the installations on the other machines.

Dynamic Ring Participation
Nodes in a Cassandra cluster can be brought down and brought back up without disruption to the rest of the cluster (assuming a reasonable replication factor and consistency level).

Security
The authenticator that’s plugged in by default is org.apache.cassandra.auth.AllowAllAuthenticator. If you want to force clients to provide credentials, another alternative ships with Cassandra: org.apache.cassandra.auth.SimpleAuthenticator.

There are two files included in the config directory: 
access.properties - specifies which users are allowed access to which keyspaces:
 Keyspace1=jsmith,Elvis Presley,dilbert
passwd.properties - contains a list of these users and specifies the password for each of them:
 jsmith=havebadpass
 Elvis\ Presley=graceland4evar
 dilbert=nomoovertime
To use the Simple Authenticator, replace the value for the authenticator element in cassandra.yaml.
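Assuming the 0.7-era cassandra.yaml layout, the change is a one-line swap:

```yaml
# cassandra.yaml -- replace AllowAllAuthenticator with SimpleAuthenticator
authenticator: org.apache.cassandra.auth.SimpleAuthenticator
```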

We also have to tell Cassandra the location of the access and passwords files, using the bin/cassandra.in.sh include script.
JVM_OPTS="
 -Dpasswd.properties=/home/eben/books/cassandra/dist/apache-cassandra-0.7.0-beta1/conf/passwd.properties
 -Daccess.properties=/home/eben/books/cassandra/dist/apache-cassandra-0.7.0-beta1/conf/access.properties"

Now we can log into the command-line interface with a username and password combination:
[default@unknown] use Keyspace1 jsmith 'havebadpass'

We can also specify the username and password combination when we connect on the CLI:
eben@morpheus: bin/cassandra-cli --host localhost --port 9160 --username jsmith --password havebadpass --keyspace Keyspace1

If you have set up authentication on your keyspace, your client application code will need to log in.

import java.util.HashMap;
import java.util.Map;
import org.apache.cassandra.thrift.AccessLevel;
import org.apache.cassandra.thrift.AuthenticationRequest;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

// Open a framed Thrift connection to the local node
TTransport tr = new TSocket("localhost", 9160);
TFramedTransport tf = new TFramedTransport(tr);
TProtocol proto = new TBinaryProtocol(tf);
Cassandra.Client client = new Cassandra.Client(proto);
tr.open();
// Build the credentials map expected by SimpleAuthenticator
AuthenticationRequest authRequest = new AuthenticationRequest();
Map<String, String> credentials = new HashMap<String, String>();
credentials.put("username", "jsmith");
credentials.put("password", "havebadpass");
authRequest.setCredentials(credentials);
// Log in before selecting the protected keyspace
AccessLevel access = client.login(authRequest);
client.set_keyspace("Keyspace1");

Note: The SimpleAuthenticator class has two modes for specifying passwords: plain text and MD5 encrypted.

You enable MD5 in the cassandra.in.sh file by passing the passwd.mode switch to the JVM:
 JVM_OPTS=" \
 -da \
 //other stuff...
 -Dpasswd.mode=MD5"
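To generate a digest for an MD5-mode passwd.properties entry, something like the following sketch can be used (the assumption that SimpleAuthenticator compares hex-encoded MD5 digests should be verified against your version):

```java
import java.math.BigInteger;
import java.security.MessageDigest;

// Sketch: compute the MD5 hex digest of a password so it can be stored
// in passwd.properties when -Dpasswd.mode=MD5 is in effect (format assumed).
public class Md5Password {
    public static String md5Hex(String password) {
        try {
            byte[] d = MessageDigest.getInstance("MD5")
                    .digest(password.getBytes("UTF-8"));
            return String.format("%032x", new BigInteger(1, d));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

The hex string replaces the plain-text password in the properties file, e.g. jsmith=&lt;digest&gt;.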

You can also provide your own method of authenticating to Cassandra by implementing the IAuthenticator interface.
There are a few other settings that you may want to investigate further: column_index_size_in_kb, in_memory_compaction_limit_in_mb, gc_grace_seconds, and phi_convict_threshold.

Viewing Keys
You can see which keys are in your SSTables using a script called sstablekeys.
eben@morpheus$ bin/sstablekeys /var/lib/cassandra/data/Hotelier/Hotel-1-Data.db

Starting with version 0.7, the user keyspaces defined in cassandra.yaml are not loaded by default when the server starts.
In order to load them, you need to invoke the loadSchemaFromYaml JMX operation.

Note: If you need to import and export data, Cassandra comes with two scripts for working with JSON: json2sstable, which imports JSON into Cassandra SSTables, and sstable2json, which exports existing SSTables into JSON.

Note: If you prefer to use a graphical interface instead of the CLI, you can get information regarding your Cassandra setup using Chiton, which is a Python GTK-based Cassandra
browser written by Brandon Williams. You can find it at http://github.com/driftx/chiton.

In this chapter we looked at how to configure Cassandra. We saw how to set the replication factor and the snitch, and how to use a good replica placement strategy.

If this post has whetted your appetite for the subject, you can find much more information in the book itself: Cassandra: The Definitive Guide

The excerpt is from the book, 'Cassandra: The Definitive Guide', authored by Eben Hewitt, published November 2010 by O’Reilly Media, Copyright 2011 Eben Hewitt.