Friday, March 7, 2014

Spark on Cassandra (w/ Calliope)


We all know that reporting off of data stored in NoSQL repositories can be cumbersome.  Either you built the queries into your data model, or you didn't.  If you are lucky, you've paired Cassandra with an indexer like Solr or Elasticsearch, but sometimes an index isn't enough to perform complex analytics on your data.  Alternatively, maybe you just want to do a simple transformation on the data.  That is often easier said than done.

What we all need is a generic way to run functions over data stored in Cassandra.   Sure, you could go grab Hadoop, and be locked into articulating analytics/transformations as MapReduce constructs.  But that just makes people sad.  Instead, I'd recommend Spark.  It makes people happy.

When I set out to run Spark against Cassandra, however, I found relatively little information.  This is my attempt to rectify that.  If you are impatient, you can just go clone the repo I made:
https://github.com/boneill42/spark-on-cassandra-quickstart

Get Calliope

First stop, Calliope.
http://tuplejump.github.io/calliope/
Then go here so you know how to pronounce it. =)

Again, for reasons I've mentioned before, I wanted to access Cassandra via CQL.  Unfortunately, at the time of this writing, the CQL version of Calliope wasn't generally available; you need to request early access.  Fortunately, Rohit and crew are very responsive, and once you have access, you can create a new project that uses it.  Drop the dependency into your pom:

<dependency>
    <groupId>com.tuplejump</groupId>
    <artifactId>calliope_2.9.3</artifactId>
    <version>0.8.1-EA</version>
</dependency>

Get Scala'd Up

Now, if you want to fully immerse yourself in the Spark experience, you'll want to develop in Scala.  For me, that meant switching over to IntelliJ because I had some challenges using Eclipse with specific (older) versions of Scala.  Calliope 0.8.1 early access was compiled against Scala 2.9.3, so you'll want an IDE that supports that version.  To get Maven support for Scala, drop the following into your pom:

<pluginRepositories>
   <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
   </pluginRepository>
</pluginRepositories>

<dependency>
   <groupId>org.scalatest</groupId>
   <artifactId>scalatest_2.9.3</artifactId>
   <version>2.0.M5b</version>
</dependency>

<plugin>
   <groupId>org.scala-tools</groupId>
   <artifactId>maven-scala-plugin</artifactId>
   <version>2.15.2</version>
   <executions>
      <execution>
         <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
         </goals>
      </execution>
   </executions>
   <configuration>
      <scalaVersion>2.9.3</scalaVersion>
      <launchers>
         <launcher>
            <id>run-scalatest</id>
            <mainClass>org.scalatest.tools.Runner</mainClass>
            <args>
               <arg>-p</arg>
               <arg>${project.build.testOutputDirectory}</arg>
            </args>
            <jvmArgs>
               <jvmArg>-Xmx512m</jvmArg>
            </jvmArgs>
         </launcher>
      </launchers>
      <jvmArgs>
         <jvmArg>-Xms64m</jvmArg>
         <jvmArg>-Xmx1024m</jvmArg>
      </jvmArgs>
   </configuration>
</plugin>


Get Spark

Now, the simple part.  Add Spark. =)
<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-core_2.9.3</artifactId>
   <version>0.8.1-incubating</version>
</dependency>
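
Before wiring in Cassandra, it's worth a quick sanity check that Spark resolves and runs locally.  Here's a minimal sketch (the object name and the toy job are mine, not something from the quickstart repo):

import org.apache.spark.SparkContext

// Toy job: sum the numbers 1..100 on a local, single-threaded Spark context.
object SparkSmokeTest extends App {
  val sc = new SparkContext("local[1]", "smoketest")
  println(sc.parallelize(1 to 100).reduce(_ + _))  // should print 5050
  sc.stop()
}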

Sling Code

Now that we have our project stood up, let's race over a column family and do some processing!

All of the code to perform a Spark job is contained in FindChildrenTest.  There are two components: a transformer and the job.  The transformer is very similar to the Mapper concept that we have in storm-cassandra-cql.  The transformer translates CQL rows into objects that can be used in the job.  Here is the code for the transformer:

private object Cql3CRDDTransformers {
  // RichByteBuffer provides the implicit conversions from the raw ByteBuffers
  // Cassandra returns into Strings (and other primitives).
  import RichByteBuffer._

  // Translate a Thrift row into the list of its column names.
  implicit def row2String(key: ThriftRowKey, row: ThriftRowMap): List[String] = {
    row.keys.toList
  }

  // Translate a CQL row (key columns plus value columns) into a Child object.
  implicit def cql3Row2Child(keys: CQLRowKeyMap, values: CQLRowMap): Child = {
    Child(keys.get("child_id").get,
      values.get("first_name").get, values.get("last_name").get,
      values.get("country").get, values.get("state").get, values.get("zip").get)
  }
}


The only really important part is the function that translates a row (keys and values) into a Child object.
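
The Child case class itself isn't shown in this post.  As a rough sketch, assuming field names based on the columns being read (check the quickstart repo for the real definition), it might look like:

// Hypothetical: a simple case class to hold one row from the children table.
case class Child(childId: String, firstName: String, lastName: String,
                 country: String, state: String, zip: String)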

With a transformer in place, it is quite simple to create a job:

class FindChildrenTest extends FunSpec {
  import Cql3CRDDTransformers._

  // Local Spark context with a single worker thread.
  val sc = new SparkContext("local[1]", "castest")

  describe("Find All Children in IRL") {
    it("should be able to find children in IRL") {
      // Point Calliope at the northpole keyspace and the children column family.
      val cas = CasBuilder.cql3.withColumnFamily("northpole", "children")
      // Build an RDD of Child objects, using the implicit transformer above.
      val cqlRdd = sc.cql3Cassandra[Child](cas)
      // Pull the rows back to the driver, then filter and print them.
      val children = cqlRdd.collect().toList
      children.filter((child) => child.country.equals("IRL")).map((child) => println(child))
      sc.stop()
    }
  }
}


The first line connects to a keyspace and table.  For this example, I reused a schema from my webinar a few years ago.  You can find the cql here.  The second line creates a Resilient Distributed Dataset (RDD) containing Child objects.  An RDD is the primary dataset abstraction in Spark.  Once you have an RDD, you can operate on it as if it were any other collection.  (pretty powerful stuff)

In the code above, we filter the RDD for children in Ireland.  We then race over the result, and print the children out.  If all goes well, you should end up with the following output:

Child(collin.oneill,Collin,O'Neill,IRL,D,EI33)
Child(owen.oneill,Owen,O'Neill,IRL,D,EI33)
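
Since an RDD behaves like any other distributed collection, the same cqlRdd supports plenty of other operations.  Here's a hedged sketch (the lastName field comes from the hypothetical Child definition above, and these lines would slot into the job before sc.stop()):

// Count the Irish children without collecting the whole table to the driver.
val irishCount = cqlRdd.filter(_.country == "IRL").count()
println(irishCount)

// Gather the distinct last names across the whole table.
val lastNames = cqlRdd.map(_.lastName).distinct().collect().toList
println(lastNames)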

OK -- That should be enough to make you dangerous.  I have to give a *HUGE* pile of kudos to Rohit Rai and his team at TupleJump for developing the Calliope project. They are doing great things at TupleJump.  I'm keeping an eye on Stargate and Cash as well.  In fact, next time, I'll take this a step further and show Shark against Cassandra using Cash.
