I am evaluating Cassandra. I am using the DataStax driver and CQL.
I would like to store some data with the following internal structure, where the names are different for each update.
+-------+-------+-------+-------+-------+-------+
|       | name1 | name2 | name3 | ...   | nameN |
| time  +-------+-------+-------+-------+-------+
|       | val1  | val2  | val3  | ...   | valN  |
+-------+-------+-------+-------+-------+-------+
So time should be the row key, and name should be the column key. The CQL statement I use to create this table is:
CREATE TABLE IF NOT EXISTS test.wide (
  time varchar,
  name varchar,
  value varchar,
  PRIMARY KEY (time,name))
WITH COMPACT STORAGE
I want the schema to be this way for ease of querying. I also occasionally have to store updates with more than 65000 name/value pairs, so using the Cassandra list/set/map data types is not an option.
I have to be able to handle at least 1000 wide row inserts per second, with a varying but large (~1000) number of name/value pairs.
The problem is the following: I have written a simple benchmark that does 1000 wide row inserts of 10000 name/value pairs each. I am getting very slow performance with CQL and the DataStax driver, whereas the version that does not use CQL (using Astyanax) performs well on the same test cluster.
I have read this related question, and the accepted answer suggests that you should be able to atomically and quickly create a new wide row by using batched prepared statements, which are available in Cassandra 2.
So I tried using those, but I still get slow performance (two inserts per second for a small three-node cluster running on localhost). Am I missing something obvious, or do I have to use the lower-level Thrift API? I have implemented the same insert with a ColumnListMutation in Astyanax, and I get about 30 inserts per second.
If I have to use the lower-level Thrift API:
is it actually deprecated, or is it just inconvenient to use because it is lower level?
will I be able to query a table created with the Thrift API using CQL?
Below is a self-contained code example in Scala. It simply creates a batch statement for inserting a wide row with 10000 columns and times the insertion performance repeatedly.
I played with the options of BatchStatement and with the consistency level, but nothing could get me better performance.
The only explanation I have is that despite the batch consisting of prepared statements, the entries are added to the row one by one.
package cassandra
import com.datastax.driver.core._
object CassandraTestMinimized extends App {

  val keyspace = "test"
  val table = "wide"
  val tableName = s"$keyspace.$table"

  def createKeyspace = s"""
    CREATE KEYSPACE IF NOT EXISTS ${keyspace}
    WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
    """

  def createWideTable = s"""
    CREATE TABLE IF NOT EXISTS ${tableName} (
      time varchar,
      name varchar,
      value varchar,
      PRIMARY KEY (time,name))
    WITH COMPACT STORAGE
    """

  def writeTimeNameValue(time: String) = s"""
    INSERT INTO ${tableName} (time, name, value)
    VALUES ('$time', ?, ?)
    """

  val cluster = Cluster.builder.addContactPoints("127.0.0.1").build
  val session = cluster.connect()

  session.execute(createKeyspace)
  session.execute(createWideTable)

  for (i <- 0 until 1000) {
    val entries =
      for {
        i <- 0 until 10000
        name = i.toString
        value = name
      } yield name -> value
    val batchPreparedStatement = writeMap(i, entries)
    val t0 = System.nanoTime()
    session.execute(batchPreparedStatement)
    val dt = System.nanoTime() - t0
    println(i + " " + (dt / 1.0e9))
  }

  def writeMap(time: Long, update: Seq[(String, String)]): BatchStatement = {
    val template = session
      .prepare(writeTimeNameValue(time.toString))
      .setConsistencyLevel(ConsistencyLevel.ONE)
    val batch = new BatchStatement(BatchStatement.Type.UNLOGGED)
    for ((k, v) <- update)
      batch.add(template.bind(k, v))
    batch
  }
}
Here is the Astyanax code (modified from an Astyanax example) that does essentially the same thing 15 times faster. Note that this also does not use asynchronous calls, so it is a fair comparison. This requires the column family to already exist, since I did not yet figure out how to create it using Astyanax and the example did not have any code for creating the column family.
package cassandra;
import java.util.Iterator;
import com.netflix.astyanax.ColumnListMutation;
import com.netflix.astyanax.serializers.AsciiSerializer;
import com.netflix.astyanax.serializers.LongSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
import com.netflix.astyanax.connectionpool.OperationResult;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.thrift.ThriftFamilyFactory;
public class AstClient {
    private static final Logger logger = LoggerFactory.getLogger(AstClient.class);

    private AstyanaxContext<Keyspace> context;
    private Keyspace keyspace;
    private ColumnFamily<Long, String> EMP_CF;

    private static final String EMP_CF_NAME = "employees2";

    public void init() {
        logger.debug("init()");

        context = new AstyanaxContext.Builder()
                .forCluster("Test Cluster")
                .forKeyspace("test1")
                .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
                        .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE)
                )
                .withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("MyConnectionPool")
                        .setPort(9160)
                        .setMaxConnsPerHost(1)
                        .setSeeds("127.0.0.1:9160")
                )
                .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
                        .setCqlVersion("3.0.0")
                        .setTargetCassandraVersion("2.0.5"))
                .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
                .buildKeyspace(ThriftFamilyFactory.getInstance());

        context.start();
        keyspace = context.getClient();

        EMP_CF = ColumnFamily.newColumnFamily(
                EMP_CF_NAME,
                LongSerializer.get(),
                AsciiSerializer.get());
    }

    public void insert(long time) {
        MutationBatch m = keyspace.prepareMutationBatch();

        ColumnListMutation<String> x =
                m.withRow(EMP_CF, time);
        for (int i = 0; i < 10000; i++)
            x.putColumn(Integer.toString(i), Integer.toString(i));

        try {
            @SuppressWarnings("unused")
            Object result = m.execute();
        } catch (ConnectionException e) {
            logger.error("failed to write data to C*", e);
            throw new RuntimeException("failed to write data to C*", e);
        }
        logger.debug("insert ok");
    }

    public void createCF() {
    }

    public void read(long time) {
        OperationResult<ColumnList<String>> result;
        try {
            result = keyspace.prepareQuery(EMP_CF)
                    .getKey(time)
                    .execute();

            ColumnList<String> cols = result.getResult();
            // process data

            // a) iterate over columns
            for (Iterator<Column<String>> i = cols.iterator(); i.hasNext(); ) {
                Column<String> c = i.next();
                String v = c.getStringValue();
                System.out.println(c.getName() + " " + v);
            }

        } catch (ConnectionException e) {
            logger.error("failed to read from C*", e);
            throw new RuntimeException("failed to read from C*", e);
        }
    }

    public static void main(String[] args) {
        AstClient c = new AstClient();
        c.init();
        long t00 = System.nanoTime();
        for (int i = 0; i < 1000; i++) {
            long t0 = System.nanoTime();
            c.insert(i);
            long dt = System.nanoTime() - t0;
            System.out.println((1.0e9 / dt) + " " + i);
        }
        long dtt = System.nanoTime() - t00;

        c.read(0);
        System.out.println(dtt / 1e9);
    }
}
Update: I found this thread on the cassandra-user mailing list. It seems that there is a performance problem with CQL when doing large wide row inserts. There is a ticket CASSANDRA-6737 to track this issue.
Update 2: I have tried out the patch attached to CASSANDRA-6737, and I can confirm that it completely fixes the issue. Thanks to Sylvain Lebresne from DataStax for fixing this so quickly!
You have a mistake in your code that I think explains a lot of the performance problems you're seeing: for each batch you prepare the statement again. Preparing a statement isn't super expensive, but doing it once per batch adds a lot of latency. The time you spend waiting for that statement to be prepared is time during which you're not building the batch and Cassandra isn't processing it. A prepared statement only needs to be prepared once and should be reused.
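As a minimal sketch of that change against the Scala code above (it assumes the same session and tableName, and it binds the time as a parameter instead of interpolating it into the statement text, so a single prepared statement can be reused for every batch):

val template = session
  .prepare(s"INSERT INTO $tableName (time, name, value) VALUES (?, ?, ?)")
  .setConsistencyLevel(ConsistencyLevel.ONE)

def writeMap(time: Long, update: Seq[(String, String)]): BatchStatement = {
  // nothing is prepared here anymore; the statement is only bound
  val batch = new BatchStatement(BatchStatement.Type.UNLOGGED)
  for ((k, v) <- update)
    batch.add(template.bind(time.toString, k, v))
  batch
}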
I think much of the bad performance can be explained by latency problems. The bottleneck is most likely your application code, not Cassandra. Even if you only prepare that statement once, you still spend most of the time either CPU bound in the application (building a big batch) or not doing anything (waiting for the network and Cassandra).
There are two things you can do: first, use the async API of the CQL driver and build the next batch while the network and Cassandra are busy with the one you just sent; second, try running multiple threads doing the same thing. You'll have to experiment with the exact number of threads; it will depend on the number of cores you have and whether you're running one or three nodes on the same machine.
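A rough sketch of the first suggestion, reusing the writeMap from the sketch above and building entries the same way as in your benchmark: executeAsync returns a ResultSetFuture, so the application can build the next batch while the previous one is still on the network or inside Cassandra.

val entries = (0 until 10000).map(j => j.toString -> j.toString)
var pending: Option[ResultSetFuture] = None
for (i <- 0 until 1000) {
  val batch = writeMap(i, entries)        // CPU-bound work, overlapped with the in-flight batch
  pending.foreach(_.getUninterruptibly)   // wait for the previous batch only now
  pending = Some(session.executeAsync(batch))
}
pending.foreach(_.getUninterruptibly)     // drain the last batch

This keeps exactly one batch in flight; the multi-threaded variant is the same idea with more batches outstanding at once.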
Running a three-node cluster on the same machine makes the cluster slower than running a single node, while running on different machines makes it faster. Running the application on the same machine doesn't exactly help either. If you want to test performance, either run only one node or run a real cluster on separate machines.
Batches can give you extra performance, but not always. They can lead to the kind of problem you're seeing in your test code: buffer bloat. Once batches get too big your application spends too much time building them, then too much time pushing them out on the network, and too much time waiting for Cassandra to process them. You need to experiment with batch sizes and see what works best (but do that with a real cluster, otherwise you won't see the effects of the network, which will be a big factor when your batches get bigger).
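For example, one way to experiment with that in the Scala benchmark above is to split each 10000-pair update into several smaller unlogged batches; the chunk size of 100 here is only a starting point, and template is the single prepared statement from the sketch further up.

def writeChunked(time: Long, update: Seq[(String, String)], chunkSize: Int = 100): Seq[BatchStatement] =
  update.grouped(chunkSize).map { chunk =>
    val batch = new BatchStatement(BatchStatement.Type.UNLOGGED)
    for ((k, v) <- chunk)
      batch.add(template.bind(time.toString, k, v))
    batch
  }.toSeq

Each of the resulting batches can then be sent with executeAsync as sketched above.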
And if you use batches, use compression. Compression makes no difference in most request loads (responses are another matter), but when you send huge batches it can make a big difference.
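With the DataStax Java driver, compression is a cluster-level option; a sketch of enabling it (LZ4 requires the lz4 jar on the classpath, SNAPPY is the other choice):

val cluster = Cluster.builder
  .addContactPoints("127.0.0.1")
  .withCompression(ProtocolOptions.Compression.LZ4)
  .build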
There's nothing special about wide-row writes in Cassandra. With some exceptions, the schema doesn't change the time it takes to process a write. I run applications that do tens of thousands of non-batched mixed wide-row and non-wide-row writes per second. The clusters aren't big, just three or four m1.xlarge EC2 nodes each. The trick is never to wait for a request to return before sending the next (that doesn't mean fire and forget, just handle the responses in the same asynchronous manner). Latency is a performance killer.
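A sketch of that pattern for the benchmark above, again reusing writeMap and entries from the earlier sketches: the semaphore caps how many batches are outstanding (the limit of 8 is arbitrary and needs tuning), and FutureCallback/Futures come from Guava, which the driver already pulls in.

import java.util.concurrent.Semaphore
import com.google.common.util.concurrent.{FutureCallback, Futures}

val inFlight = new Semaphore(8)
for (i <- 0 until 1000) {
  val batch = writeMap(i, entries)
  inFlight.acquire()                      // block only when too many batches are outstanding
  Futures.addCallback(session.executeAsync(batch), new FutureCallback[ResultSet] {
    def onSuccess(rs: ResultSet): Unit = inFlight.release()
    def onFailure(t: Throwable): Unit = { inFlight.release(); t.printStackTrace() }
  })
}
inFlight.acquire(8)                       // wait for all remaining batches to complete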