Monday, March 19, 2012

Cassandra: Reading and Writing Data

Book Review and Excerpt
Cassandra: The Definitive Guide by Eben Hewitt
Chapter 7

My previous post on Cassandra discussed various configuration challenges for a better performance. Let's turn now to the basics: read and write data!


Query Differences Between RDBMS and Cassandra

No Update Query
You can achieve the same effect, however, by simply performing an insert using an existing row key.

Record-Level Atomicity on Writes
Cassandra automatically gives you record-level atomicity on every write operation. In RDBMS, you would have to specify row-level locking. Although Cassandra offers atomicity at the column family level, it does not guarantee isolation.

No Server-Side Transaction Support
If it is applicable for your use case, you’ll need to manually “roll back” writes by issuing a delete if one of the insert operations fails.
More to the point, perhaps, is that Cassandra doesn’t have transactions, because it just
wasn’t built with that goal in mind.

No Duplicate Keys
If you write a new record with a key that already exists in a column family, the values for any existing columns will be overwritten, and any columns that previously were not present for that row will be added to the row.

Basic Write Properties
Writing data is very fast in Cassandra, because its design does not require performing disk reads or seeks. The memtables and SSTables save Cassandra from having to perform these operations on writes, which slow down many databases. All writes in Cassandra are append-only.
Because of the database commit log and hinted handoff design, the database is always
writeable, and within a column family, writes are always atomic.

Consistency Levels
A higher consistency level means that more nodes need to respond to the query, giving you more assurance that the values present on each replica are the same. If two nodes respond with different timestamps, the newest value
wins, and that’s what will be returned to the client. In the background, Cassandra will then perform what’s called a read repair (replicas synchronization).
There are several consistency levels that you can specify, and they mean something different for read operations than for write operations.

Read consistency levels
ZERO - Unsupported. You cannot specify CL.ZERO for read operations because it doesn’t make sense. This would amount to saying “give me the data from no nodes.”
ANY - Unsupported. Use CL.ONE instead.
ONE - Immediately return the record held by the first node that responds to the query. A background thread is created to check that record against the same record on other replicas. If any are out of date, a read repair is then performed to sync them all to the most recent value.
QUORUM - Query all nodes. Once a majority of replicas ((replication factor / 2) + 1) respond, return to the client the value with the most recent timestamp. Then, if necessary, perform a read repair in the
background on all remaining replicas.
ALL - Query all nodes. Wait for all nodes to respond, and return to the client the record with the most recent timestamp. Then, if necessary, perform a read repair in the background. If any nodes fail to respond, fail the read operation.

Note: A node is considered unresponsive if it does not respond to a query before the value specified by   pc_timeout_in_ms in the configuration file. The default is 10 seconds.

Write consistency levels
ZERO - The write operation will return immediately to the client before the write is recorded; the write will happen asynchronously in a background thread, and there are no guarantees of success.
ANY - Ensure that the value is written to a minimum of one node, allowing hints to count as a write.
ONE - Ensure that the value is written to the commit log and memtable of at least one node before returning to the client.
QUORUM - Ensure that the write was received by at least a majority of replicas ((replication factor / 2) + 1).
ALL - Ensure that the number of nodes specified by replication factor received the write before returning to the client. If even one replica is unresponsive to the write operation, fail the operation.

The most notable consistency level for writes is the ANY level. This level means that the write is guaranteed to reach at least one node, but it allows a hint to count as a successful write.
Note: In many cases, the node that makes the hint actually isn’t the node that stores it; instead, it sends it off to one of the nonreplica neighbors of the node that is down.

Using the consistency level of ONE on writes means that the write operation will be written to both the commit log and the memtable. That means that writes at CL.ONE are durable, so this level is the minimum level to use to achieve fast performance and durability.
For both reads and writes, the consistency levels of ZERO, ANY, and ONE are considered weak, whereas QUORUM and ALL are considered strong. Consistency is tuneable in Cassandra because clients can specify the desired consistency level on both reads and writes. There is an equation that is popularly used to represent the way to achieve strong consistency in Cassandra: R(read replica count) + W(write replica count) > N(replication factor) = strong consistency. All client reads will see the most recent write in this scenario, and you will have strong consistency.

Basic Read Properties
If a client connects to a node that doesn’t have the data it’s trying to read, the node it’s connected to will act as coordinator node to read the data from a node that does have it, identified by token ranges.

To fulfill read operations, Cassandra does have to perform seeks, but you can speed these up by adding RAM. Adding RAM will help you if you find the OS doing a lot of paging on reads (in general, it is better to enable the various caches Cassandra has).

So reads are clearly slower than writes, for these various reasons. The partitioner doesn’t influence the speed of reads. In the case of range queries, using OPP is significantly faster because it allows you to easily determine which nodes don’t have your data. The partitioner’s responsibility is to perform the consistent hash operation that maps keys to nodes. In addition, you can choose row caching and key caching strategies to give you a performance boost.

Note: Columns are sorted by their type (as specified by CompareWith), and rows are sorted by their partitioner.

Ranges and Slices
Ranges typically refer to ranges of keys (rows). The term slice is used to refer to a range of columns within a row.

When specifying a range query and using Random Partitioner, there’s really no way to specify a range more narrow than “all”. This is obviously an expensive proposition, because you might incur additional network operations. It can also potentially result in missed keys. That’s because it’s possible that an update happening at the same time as your row scan will miss the updates made earlier in the index than what you are currently processing.

There is another thing that can be confusing at first. When you are using Random Partitioner, you must recall that range queries first hash the keys. So if you are using a range of “Alice” to “Alison”, the query will first run a hash on each of those keys and return not simply the natural values between Alice and Alison, but rather the values between the hashes of those values.

Here is the basic flow of a read operation that looks for a specific key when using Random Partitioner. First, the key is hashed, and then the client connects to any node in the cluster. That node will route the request to the node with that key. The memtable is consulted to see whether your key is present; if it’s not found, then a scan is performed on the Bloom filter for each file, starting with the newest one. Once the key is found in the Bloom filter, it is used to consult the corresponding datafile and find the column values.

Setup for a Sample Application
First, download Cassandra from http://cassandra.apache.org. It’s easiest to get started with the binary version.
For now, we’ll use a Java project in Eclipse. First, create a new project, and then add a few JARs to your classpath: a Log4J JAR to output log statements (don’t forget to set the properties file for your own classes); the Thrift library called libthrift-r917130.jar, which contains the org.apache.thrift classes; and the Cassandra JAR apache-cassandra-x.x.x.jar, which contains the org.apache.cassandra classes. We also need to add the SLF4J logger (both the API and the implementation JAR), which Cassandra requires. Finally, add an argument to your JVM to add the log4j.properties file to your classpath:
-Dlog4j.configuration=file:///home/eben/books/cassandra/log4j.properties
In Eclipse, you can add the log4j.properties file by creating a new Run Configuration. Click the Arguments tab, and then in the VM Arguments text field, enter the parameter specified in the previous code sample - of course using the actual path to your properties file.
The log4j.properties file looks like this:
# output messages into a rolling log file as well as stdout
log4j.rootLogger=DEBUG,stdout,R
# stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n
# rolling log file
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.file.maxFileSize=5MB
log4j.appender.file.maxBackupIndex=5
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%5p [%t] %d{ISO8601} %C %F (line %L) %m%n
# This points to your logs directory
log4j.appender.R.File=cass-client.log

Simple Write Read Examples

//create a representation of the Name column
ColumnPath colPathName = new ColumnPath(cfName); // "Standard1"
colPathName.setColumn("name".getBytes(UTF8));
ColumnParent cp = new ColumnParent(cfName);
//insert the name column
LOG.debug("Inserting row for key " + new String(userIDKey)); // "1".getBytes()
client.insert(userIDKey, cp, new Column("name".getBytes(UTF8), "George Clinton".getBytes(),
 clock), CL);

// read just the Name column
LOG.debug("Reading Name Column:");
Column col = client.get(userIDKey, colPathName,
CL).getColumn();
LOG.debug("Column name: " + new String(col.name, UTF8));
LOG.debug("Column value: " + new String(col.value, UTF8));
LOG.debug("Column timestamp: " + col.clock.timestamp);

//create a slice predicate representing the columns to read
//start and finish are the range of columns--here, all
SlicePredicate predicate = new SlicePredicate();
SliceRange sliceRange = new SliceRange();
sliceRange.setStart(new byte[0]);
sliceRange.setFinish(new byte[0]);
predicate.setSlice_range(sliceRange);
LOG.debug("Complete Row:");
// read all columns in the row
ColumnParent parent = new ColumnParent(cfName);
List<ColumnOrSuperColumn> results = client.get_slice(userIDKey, parent, predicate, CL);
//loop over columns, outputting values
for (ColumnOrSuperColumn result : results) {
 Column column = result.column;
 LOG.debug(new String(column.name, UTF8) + " : " + new String(column.value, UTF8));
} 


So each column value has its own timestamp (wrapped in a clock) because it’s the
column, not the row, that is the atomic unit. This can be confusing when you first come
to Cassandra if you’re used to adding a timestamp column to a relational table to indicate
when it was last updated. But there’s no such thing in Cassandra as the last time
a row was updated; it’s granular at the column level.

The column parent is the path to the parent of a set of columns.

Get Example
ColumnPath path = new ColumnPath();
path.column_family = cfName;
path.column = "name".getBytes(); // sets the name of the column that we’re looking for
ColumnOrSuperColumn cosc = client.get(userIDKey, path, CL);


Slice Predicate
You can specify the predicate one of two ways: with a list of column names or with a slice range. If you know the names of a few columns that you want to retrieve, then you can specify them explicitly by name. If you don’t know their names, or for another reason want to retrieve a range of columns, you use a slice range object to specify the range.
Note: Cassandra is a big data store and can hold two billion columns per row in version 0.7.

You can get a set of columns contained in a column parent using a get_slice operation.
It will retrieve values by column name or a range of names. Example:
SlicePredicate predicate = new SlicePredicate();
List<byte[]> colNames = new ArrayList<byte[]>();
colNames.add("a".getBytes());
colNames.add("b".getBytes());
predicate.column_names = colNames;
ColumnParent parent = new ColumnParent("Standard1");
byte[] key = "k1".getBytes();
List<ColumnOrSuperColumn> results =
client.get_slice(key, parent, predicate, ConsistencyLevel.ONE);


Or, in order to read a range of the columns in a row, you can specify the start and finish columns, and Cassandra will give you the start and finish columns as well as any columns in
between, according to the sorting order based on the comparator for that column family.
SlicePredicate predicate = new SlicePredicate();
SliceRange sliceRange = new SliceRange();
sliceRange.setStart("age".getBytes());
sliceRange.setFinish("name".getBytes());

sliceRange.count = 10; // limit the number of columns returned
predicate.setSlice_range(sliceRange);


You must consider the column names according to their comparator, and specify start
and finish in the proper order. For example, trying to set a start of name and a finish of
age will throw an exception like this:
InvalidRequestException (why:range finish must come after start in the order of traversal)

Note: You can also reverse the order in which the columns are fetched by setting the reversed = true attribute on the slice range.

Recall that “returning a column” doesn’t mean that you get the value, as in SQL; it means that you get the complete column data structure, which is the name, the value, and the timestamp.

Get Range Slices
In the same way that you access a set of columns by using a range, you can also access
a range of keys or tokens. Using the get_range_slices operation, you can pass it a
KeyRange object that defines the range of keys you want.
One major distinction here is that you can get either keys or tokens in a range using
the same KeyRange data structure as the parameter to get_range_slices. Key ranges are
start-inclusive, whereas token ranges are start-exclusive; because of ring distribution,
tokens can wrap around so that the end token is less than the start token.
SlicePredicate predicate = new SlicePredicate();
List<byte[]> colNames = new ArrayList<byte[]>();
colNames.add("a".getBytes());
colNames.add("b".getBytes());
predicate.column_names = colNames;
ColumnParent parent = new ColumnParent("Standard1");
KeyRange keyRange = new KeyRange();
keyRange.start_key = "k1".getBytes();
keyRange.end_key = "k2".getBytes();
//a key slice is returned
List<KeySlice> results = client.get_range_slices(parent, predicate, keyRange,
 ConsistencyLevel.ONE);


Multiget Slice
With get_slice, we saw how to get a set of column names for a single specified row key. multiget_slice lets you retrieve a subset of columns for a set of row keys.
//instead of one row key, we specify many
List<byte[]> rowKeys = new ArrayList<byte[]>();
rowKeys.add("k1".getBytes());
rowKeys.add("k2".getBytes());
//instead of a simple list, we get a map, where the keys are row keys
//and the values the list of columns returned for each
Map<byte[],List<ColumnOrSuperColumn>> results =
client.multiget_slice(rowKeys, parent, predicate, CL);


Note: Because we’re using Thrift as the underlying RPC mechanism to communicate
with Cassandra, the results will come back to you unordered and you’ll have to sort them on the client. That’s because Thrift can’t preserve order. A multiget is actually just a wrapper over a series of get requests.

Deleting
Deleting data is not the same in Cassandra as it is in a relational database. In RDBMS,
you simply issue a delete statement that identifies the row or rows you want to delete.
In Cassandra, a delete does not actually remove the data immediately. There’s a simple
reason for this: Cassandra’s durable, eventually consistent, distributed design. If Cassandra
had a straightforward design for deletes and a node goes down, that node would
therefore not receive the delete. Once that node comes back online, it would mistakenly
think that all of the nodes that had received the delete had actually missed a write (the
data that it still has because it missed the delete), and it would start repairing all of the
other nodes. So Cassandra needs a more sophisticated mechanism to support deletes.
That mechanism is called a tombstone.
A tombstone is a special marker issued in a delete that overwrites the deleted values,
acting as a placeholder. If any replica did not receive the delete operation, the tombstone
can later be propagated to those replicas when they are available again. The net effect
of this design is that your data store will not immediately shrink in size following a
delete. Each node keeps track of the age of all its tombstones. Once they reach the age
as configured in gc_grace_seconds (which is 10 days by default), then a compaction is
run, the tombstones are garbage-collected, and the corresponding disk space is
recovered.
Note: Remember that SSTables are immutable, so the data is not deleted from the SSTable. On compaction, tombstones are accounted for, merged data is sorted, a new index is created over the sorted data, and the freshly merged, sorted, and indexed data is written to a single new file.

The assumption is that 10 days is plenty of time for you to bring a failed node back online before compaction runs. If you feel comfortable doing so, you can reduce that grace period to reclaim disk space more quickly.

Note: Because a remove operation is really a tombstone write, you still have to supply a timestamp with the operation, because if there are multiple clients writing, the highest timestamp wins—and those writes might include a tombstone or a new value. Cassandra doesn’t discriminate here; whichever operation has the highest timestamp will win.

client.remove(key, colPath, clock, ConsistencyLevel.ALL);

Batch Mutates
To perform many insert or update operations at once, use the batch_mutate method
instead of the insert method. Like a batch update in the relational world, the
batch_mutate operation allows grouping calls on many keys into a single call in order
to save on the cost of network round trips. If batch_mutate fails in the middle of its list
of mutations, there will be no rollback, so any updates that have already occured up to
this point will remain intact. In the case of such a failure, the client can retry the
batch_mutate operation.

Batch Deletes
You use remove to delete a single column, but you can use a Deletion structure with a
batch_mutate operation to perform a set of complex delete operations at once.
You can create a list of column names that you want to delete, and then indirectly pass
it to batch_mutate:
First, create the list of column names to delete. Pass that list to a SlicePredicate, pass
the SlicePredicate to a Deletion object, pass that to a Mutation object, and finally,
pass that to a batch_mutate.
Example:
SlicePredicate delPred = new SlicePredicate();
List<byte[]> delCols = new ArrayList<byte[]>();
//let's delete the column named 'b', though we could add more
delCols.add("b".getBytes());
delPred.column_names = delCols;
Deletion deletion = new Deletion();
deletion.predicate = delPred;
deletion.clock = clock;
Mutation mutation = new Mutation();
mutation.deletion = deletion;
Map<byte[], Map<String, List<Mutation>>> mutationMap =
new HashMap<byte[], Map<String, List<Mutation>>>();
List<Mutation> mutationList = new ArrayList<Mutation>();
mutationList.add(mutation);
Map<String, List<Mutation>> m = new HashMap<String, List<Mutation>>();
m.put(columnFamily, mutationList);

//just for this row key, though we could add more
mutationMap.put(key, m);
client.batch_mutate(mutationMap, ConsistencyLevel.ALL);


There is a second way to specify items to delete using the Deletion structure: you can
use a SliceRange instead of a List of columns, so you can delete by range instead of
explicitly listing column names.

Range Ghosts
You may sometimes hear people refer to “range ghosts” in Cassandra. This means that
even if you have deleted all of the columns for a given row, you will still see a result
returned for that row in a range slice, but the column data will be empty. This is valid,
and is just something to keep in mind as you iterate result sets on the client.

Programmatically Defining Keyspaces and Column Families
KsDef ksdef = new KsDef();
ksdef.name = "ProgKS";
ksdef.replication_factor = 1;
ksdef.strategy_class =
"org.apache.cassandra.locator.RackUnawareStrategy";
List<CfDef> cfdefs = new ArrayList<CfDef>();
CfDef cfdef1 = new CfDef();
cfdef1.name = "ProgCF1";
cfdef1.keyspace = ksdef.name;
cfdefs.add(cfdef1);

ksdef.cf_defs = cfdefs;
client.system_add_keyspace(ksdef);
System.out.println("Defining new cf.");
CfDef cfdef2 = new CfDef();
cfdef2.keyspace = ksdef.name;
cfdef2.column_type = "Standard";
cfdef2.name = "ProgCF";
client.system_add_column_family(cfdef2);


In this chapter we experienced data reading and writing using a variety of operations offered
by Cassandra’s rich API.

If this post has open your apatite for the subject, you can find much more information in the book itself: Cassandra: The Definitive Guide

The excerpt is from the book, 'Cassandra: The Definitive Guide', authored by Eben Hewit,

 published November 2010 by O’Reilly Media, Copyright 2011 Eben Hewitt.

No comments:

Post a Comment