Issue with a full table scan in Cassandra

bcfurtado · Apr 24, 2015

First: I know it isn't a good idea to do a full scan in Cassandra; however, at the moment, that is what I need.

When I started looking into how to do something like this, I read people saying it wasn't possible to do a full scan in Cassandra and that it wasn't made for this type of thing.

Not satisfied, I kept looking until I found this article: http://www.myhowto.org/bigdata/2013/11/04/scanning-the-entire-cassandra-column-family-with-cql/

It looked pretty reasonable, so I gave it a try. Since I will do this full scan only once, and time and performance aren't an issue, I wrote the query and put it in a simple job to look up all the records I want. Out of 2 billion rows, I expected something like 1000 records as output; however, I got only 100.

My job:

public void run() {
    Cluster cluster = getConnection();
    Session session = cluster.connect("db");

    LOGGER.info("Starting ...");

    boolean run = true;
    int print = 0;

    while ( run ) {
        if (maxTokenReached(actualToken)) {
            LOGGER.info("Max Token Reached!");
            break;
        }
        ResultSet resultSet = session.execute(queryBuilder(actualToken));

        Iterator<Row> rows = resultSet.iterator();
        if ( !rows.hasNext()){
            break;
        }

        List<String> rowIds = new ArrayList<String>();

        while (rows.hasNext()) {
            Row row = rows.next();

            Long leadTime = row.getLong("my_column");
            if (myCondition(leadTime)) {
                String rowId = row.getString("key");
                rowIds.add(rowId);
            }

            if (!rows.hasNext()) {
                Long token = row.getLong(0); // column 0 is token(key) in the select list
                if (!rowIds.isEmpty()) {
                    LOGGER.info(String.format("Keys found! RowId's: %s ", rowIds));
                }
                actualToken = nextToken(token);
            }

        }

    }
    LOGGER.info("Done!");
    cluster.shutdown();
}

public boolean maxTokenReached(Long actualToken){
    return actualToken >= maxToken;
}

public String queryBuilder(Long nextRange) {
    return String.format("select token(key), key, my_column from mytable where token(key) >= %s limit 10000;", nextRange.toString());
}

public Long nextToken(Long token){
    return token + 1;
}

Basically, what I do is start from the minimum token allowed and incrementally move forward until the last one.

I don't know, but it looks as if the job did not complete the full scan, or my query only accessed one node, or something. I don't know if I'm doing something wrong or if it's really not possible to do a full scan.

Today I have almost 2 TB of data in a single table, in one cluster of seven nodes.

Has anyone been in this situation already, or does anyone have a recommendation?

Answer

Jeff Jirsa · Apr 28, 2015

It's definitely possible to do a full table scan in Cassandra - indeed, it's quite common for things like Spark. However, it's not typically "fast", so it's discouraged unless you know why you're doing it. For your actual questions:

1) If you're using CQL, you're almost certainly using the Murmur3 partitioner, so your minimum token is -9223372036854775808 (and your maximum token is 9223372036854775807).
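
As a quick sanity check on those bounds: Murmur3 tokens are signed 64-bit values, so in Java they fit exactly in a long. The sketch below is only an illustration (the class and method names are made up, not part of any driver); it also shows why a plain token + 1 needs care at the top of the range.

```java
// Hypothetical helper for illustration; not part of any Cassandra driver.
final class TokenBounds {
    // Murmur3Partitioner tokens are signed 64-bit integers.
    static final long MIN_TOKEN = Long.MIN_VALUE; // -9223372036854775808
    static final long MAX_TOKEN = Long.MAX_VALUE; //  9223372036854775807

    // True when there is no token left after this one.
    static boolean isLast(long token) {
        return token == MAX_TOKEN;
    }

    // Next token to query from; throws ArithmeticException instead of wrapping around.
    static long next(long token) {
        return Math.addExact(token, 1L);
    }

    public static void main(String[] args) {
        System.out.println(MIN_TOKEN);
        System.out.println(MAX_TOKEN);
        // Plain addition silently wraps from the maximum back to the minimum.
        System.out.println(MAX_TOKEN + 1 == MIN_TOKEN);
    }
}
```

With a wrapping token + 1, a scan that reaches the maximum token could start over from the minimum, so checking isLast before advancing is the safer order.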

2) You're using session.execute(), which will use a default consistency of ONE. That may not return all of the results in your cluster, especially if you're also writing at ONE, which I suspect you may be. Raise that to ALL, and consider prepared statements to avoid re-parsing the CQL on every iteration (the snippet below sets the consistency level on a SimpleStatement for brevity):

 public void run() {
     Cluster cluster = getConnection();
     Session session = cluster.connect("db");
     LOGGER.info("Starting ...");
     actualToken = Long.MIN_VALUE; // Murmur3 minimum token, -9223372036854775808
     boolean run = true;
     int print = 0;

     while ( run ) {
         if (maxTokenReached(actualToken)) {
             LOGGER.info("Max Token Reached!");
             break;
         }
         SimpleStatement stmt = new SimpleStatement(queryBuilder(actualToken));
         stmt.setConsistencyLevel(ConsistencyLevel.ALL);
         ResultSet resultSet = session.execute(stmt);

         Iterator<Row> rows = resultSet.iterator();
         if ( !rows.hasNext()){
             break;
         }

         List<String> rowIds = new ArrayList<String>();

         while (rows.hasNext()) {
             Row row = rows.next();

             Long leadTime = row.getLong("my_column");
             if (myCondition(leadTime)) {
                 String rowId = row.getString("key");
                 rowIds.add(rowId);
             }

             if (!rows.hasNext()) {
                 Long token = row.getLong(0); // column 0 is token(key) in the select list
                 if (!rowIds.isEmpty()) {
                     LOGGER.info(String.format("Keys found! RowId's: %s ", rowIds));
                 }
                 actualToken = nextToken(token);
             }
         }
      }
     LOGGER.info("Done!");
     cluster.shutdown(); 
  }

public boolean maxTokenReached(Long actualToken){
     return actualToken >= maxToken; 
 }

 public String queryBuilder(Long nextRange) {
     return String.format("select token(key), key, my_column from mytable where token(key) >= %s limit 10000;", nextRange.toString()); 
 }

 public Long nextToken(Long token) {
     return token + 1; 
 }
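
To look at the paging logic of this loop in isolation, here is a minimal sketch that runs it against an in-memory sorted map instead of a cluster. Everything in it is an assumption for illustration: the TreeMap stands in for the table ordered by token, fetchPage is a hypothetical stand-in for the "token(key) >= ? limit N" query, and one key per token is assumed. It only checks that the loop visits every entry and stops cleanly at the maximum token.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Illustrative only: simulates token paging without a Cassandra cluster.
final class TokenPagingSketch {

    // Stand-in for "select ... where token(key) >= ? limit ?": entries in token order.
    static List<Map.Entry<Long, String>> fetchPage(
            NavigableMap<Long, String> table, long fromToken, int limit) {
        List<Map.Entry<Long, String>> page = new ArrayList<>();
        for (Map.Entry<Long, String> e : table.tailMap(fromToken, true).entrySet()) {
            if (page.size() == limit) break;
            page.add(e);
        }
        return page;
    }

    // Walks the whole token range page by page and returns every key seen.
    static List<String> scanAll(NavigableMap<Long, String> table, int pageSize) {
        List<String> keys = new ArrayList<>();
        long from = Long.MIN_VALUE; // start at the minimum token
        while (true) {
            List<Map.Entry<Long, String>> page = fetchPage(table, from, pageSize);
            if (page.isEmpty()) break; // nothing left at or after 'from'
            for (Map.Entry<Long, String> e : page) keys.add(e.getValue());
            long last = page.get(page.size() - 1).getKey();
            if (last == Long.MAX_VALUE) break; // last + 1 would overflow
            from = last + 1; // resume just past the last token of this page
        }
        return keys;
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> table = new TreeMap<>();
        table.put(Long.MIN_VALUE, "a");
        table.put(-5L, "b");
        table.put(0L, "c");
        table.put(7L, "d");
        table.put(Long.MAX_VALUE, "e");
        System.out.println(scanAll(table, 2)); // prints [a, b, c, d, e]
    }
}
```

The explicit check before last + 1 is the one deliberate difference from the loop above: it keeps the scan from wrapping around when the final page ends exactly on the maximum token.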