Moving Data [Apache Spark]

I decided to use a real-world example and run some transformations against it. I settled on a Wikipedia page counts file (pagecounts-20151222), which gives me a nicely sized file where I can really see the advantages Apache Spark brings to the table.

My system is loaded with Apache Spark 1.6.0 and Scala 2.10.5.

Let's do this:
  1. First open the spark shell:
    • spark-shell
  2. Next load an SQLContext:
    • val sqlContext = new org.apache.spark.sql.SQLContext(sc) //sc an existing Spark context
  3.  Next import the following packages into the shell session:
    • import sqlContext.implicits._
    • import org.apache.spark.sql._
  4. Now you can start by loading the data from the "pagecounts-20151222" file into a Resilient Distributed Dataset (RDD). RDDs support transformations and actions; for example, the first() action returns
    the first element in the RDD.
  5. Load the data from the file into a new RDD:
    • val wikiHits = sc.textFile("/home/osboxes/Downloads/pagecounts-20151222")
  6. Do some actions on the RDD:
    • wikiHits.count() // counts the records in the RDD. My count turns up 7,5 million rows
    • wikiHits.first()
  7. Next we need a Scala case class that matches the structure of the pagecounts-20151222 file we loaded into the RDD. A map() transformation is then applied to each element of the wikiHits RDD
    to create an RDD of WikiHit objects.
  8. Define the schema using a case class:
    •  case class WikiHit(lang: String, str: String, cnt: Integer, pageNr: BigDecimal)
  9. Create an RDD of WikiHit objects. As the code below illustrates, the columns in the file are delimited by a space. We also cast the non-string columns when doing the mapping:
    • val wiki = wikiHits.map(_.split(" ")).map(p => WikiHit(p(0),p(1),p(2).toInt,p(3).toFloat))
  10. Performing an action like first() will return the first element in this RDD:
    • wiki.first()
    • wiki.count()
  11. A DataFrame is a distributed collection of data organized into named columns.
    Spark SQL supports automatically converting an RDD containing case
    classes to a DataFrame with the method toDF(). Below we change the RDD to a DataFrame:
    • val hit = wiki.toDF()
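
The mapping in step 9 assumes every line has exactly four space-delimited fields and that the numeric fields always parse. Here is a minimal sketch of a more defensive parser, written in plain Scala so you can try it in any Scala REPL. The parseLine helper and the sample lines are my own assumptions for illustration, not part of the file or the Spark API:

```scala
import scala.util.Try

// Same shape as the case class defined in step 8.
case class WikiHit(lang: String, str: String, cnt: Integer, pageNr: BigDecimal)

// Hypothetical helper: returns None for lines that do not have four
// space-separated fields or whose numeric fields fail to parse.
def parseLine(line: String): Option[WikiHit] =
  line.split(" ") match {
    case Array(lang, str, cnt, pageNr) =>
      Try(WikiHit(lang, str, cnt.toInt, BigDecimal(pageNr))).toOption
    case _ => None
  }

// Made-up sample lines in the pagecounts layout; two of them are malformed.
val sample = Seq("aa Main_Page 1 21408", "broken line", "aa Some_Page x 10")

// Keep only the lines that parsed cleanly.
val parsed = sample.flatMap(line => parseLine(line))
println(parsed)
```

In the shell session something like wikiHits.flatMap(line => parseLine(line).toList) should give the same kind of RDD as step 9, just without failing on a bad line. I have not run that against the full file, so treat it as a starting point.
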
Now we would like to explore the DataFrame we have set up. Let's have some fun:
  1. First, let's show the DataFrame:
    • hit.show()
    • The results show the first 20 rows in the DataFrame (excerpt):
      • +----+--------------------+---+--------------------+
        |lang|                 str|cnt|              pageNr|
        |  aa|%22/ru/%D0%9A%D0%...|  1|4739.000000000000...|
        |  aa|    1377_m._pr._m._e|  1|4700.000000000000...|
        |  aa|Alpini_d%27Arrest...|  1|4724.000000000000...|
        |  aa|   Claudio_Pollastri|  1|4684.000000000000...|
        |  aa|File:Barnstar_of_...|  1|9156.000000000000...|
        |  aa|File:Dyrh%C3%B3la...|  1|14982.00000000000...|
        |  aa|File:Erik_Zachte_...|  1|7741.000000000000...|
        |  aa|Lichten%C5%A1tein...|  1|4793.000000000000...|
        |  aa|           Main_Page|  1|21408.00000000000...|
        |  aa|MediaWiki:Geodata...|  1|4927.000000000000...|
  2. Notice the pageNr field. We declared it as a BigDecimal without specifying a precision and scale, so Spark SQL gives the column its default decimal type, decimal(38,18).
  3. Using the Scala DataFrame API, the DataFrame can now be queried:
    • hit.select("cnt").distinct.count //my result is 908 rows
    • hit.select("str").distinct.count  //my result is 6,8 million rows
  4. Maybe not that valuable, but how many rows share each cnt value when we group by it:
    • hit.groupBy("cnt").count().show()
  5. Now, what is the minimum number of rows per lang value? What are the average and max:
    • hit.groupBy("lang").count.agg(min("count"), avg("count"), max("count")).show
    • Results:
      • +----------+-----------------+----------+                                       
        |min(count)|       avg(count)|max(count)|
        |         1|6613.508268059182|   2417377|
  6. Filter the hit DataFrame to return only the rows where cnt is greater than 10,000:
    • val highhit = hit.filter("cnt > 10000")
  7. Now trigger it with an action:
    • highhit.show() //the results will be those whose cnt is more than 10000
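
The filter and groupBy steps above can also be written with Column expressions instead of SQL strings. Below is a small sketch on a tiny hand-made DataFrame, so the results are easy to check by eye. It assumes a spark-shell session on Spark 1.6 where sc and sqlContext already exist; the SampleHit case class and its rows are made up for illustration:

```scala
// Assumes a spark-shell session (Spark 1.6) where sc and sqlContext exist.
import sqlContext.implicits._
import org.apache.spark.sql.functions._

// Hypothetical stand-in for the WikiHit rows, small enough to verify by hand.
case class SampleHit(lang: String, str: String, cnt: Int)

val sampleHits = sc.parallelize(Seq(
  SampleHit("en", "Main_Page", 25000),
  SampleHit("en", "Apache_Spark", 120),
  SampleHit("de", "Hauptseite", 11000),
  SampleHit("aa", "Main_Page", 1)
)).toDF()

// Same filter as step 6, written as a Column expression.
val highSample = sampleHits.filter($"cnt" > 10000)
highSample.show()

// Total hits per language, largest total first.
val perLang = sampleHits
  .groupBy("lang")
  .agg(sum("cnt").as("total"))
  .orderBy(desc("total"))
perLang.show()
```

The Column form is a matter of taste here. The compiler still will not catch a misspelled column name, but it does catch a malformed expression before the job runs.
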

If you love your SQL like me, you can register this DataFrame as a temporary table and then run ANSI SQL against it:
  1. hit.registerTempTable("hit")
  2. val results = sqlContext.sql("SELECT lang, sum(cnt) FROM hit GROUP BY lang")
  3. results.show()
  4. Results:
      • +------+-----+                                                                  
        |  lang|  _c1|
        |  az.s|  229|
        |  km.b|   14|
        |    si| 1039|
        |  uk.n|   82|
        || 1923|
        || 2693|
        |  et.v|    1|
        |  fi.b|  487|
        |  is.s|   61|
        |   lez|  311|  
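
The _c1 header in the results above is the name Spark generates for an aggregate that has no alias. Here is a small sketch of the same kind of query with a named and sorted total. It assumes a spark-shell session on Spark 1.6 where sc and sqlContext exist; the LangHit case class, its rows and the lang_hits table name are made up for illustration:

```scala
// Assumes a spark-shell session (Spark 1.6) where sc and sqlContext exist.
import sqlContext.implicits._

// Hypothetical two-column stand-in for the hit DataFrame.
case class LangHit(lang: String, cnt: Int)

val langHits = sc.parallelize(Seq(
  LangHit("si", 1000),
  LangHit("si", 39),
  LangHit("lez", 311)
)).toDF()

// Make the DataFrame visible to SQL under a table name.
langHits.registerTempTable("lang_hits")

// Alias the aggregate so the column gets a readable name, then sort on it.
val totals = sqlContext.sql(
  "SELECT lang, SUM(cnt) AS total FROM lang_hits GROUP BY lang ORDER BY total DESC")
totals.show()
```
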
And voilà: you have loaded data into an RDD, and then into a Spark DataFrame. You have explored this data in tabular format with Spark SQL.

Now it's your turn to extend this use case.

Happy Spark 'ing :)

