Moving Data [Spark Streaming - simple data files]

Spark Streaming a local directory

Introduction

 For our streaming exercise, we will create two classes (one of which is a utility class that initially only handles the log4j properties for our application). We will manually compile our classes and use spark-submit to run our application.

Note: For this example, we are defining the logging utility inline, nested inside our main class.

 Therefore this example helps you get to grips with the dynamics of Spark Scala programming step by step, and gradually grows your knowledge. We will not use the "spark/bin/run-example" script that most examples use, because I want you to understand exactly what you are doing at each step. Doing it this way also makes it easier for you to alter the code yourself.

Use Case

 We are creating a streaming Apache Spark Scala program that watches a directory for new files and counts the number of words in each new file.

Example

 Create our class FileStream.scala (which also contains the StreamingUtility object), which for this exercise will be our complete code:


import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.Logging

object FileStream {
    def main(args: Array[String]) {
        if (args.length < 1) {
            System.err.println("Usage: FileStream <directory>")
            System.exit(1)
        }

        StreamingUtility.setStreamingLogLevels()
        val sparkConf = new SparkConf().setAppName("hdfsWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(2))

        //Create the FileInputDStream on the directory and use the stream to count new file words
        val lines = ssc.textFileStream(args(0)) //A Discretized Stream (DStream)
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1)).reduceByKey(_+_)
        wordCounts.print()
        ssc.start()
        ssc.awaitTermination()
    }


    /** Logging utility object. */
    object StreamingUtility extends Logging {

      /** Set reasonable logging levels for streaming if the user has not configured log4j. */
      def setStreamingLogLevels() {
        val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
        if (!log4jInitialized) {
          // We first log something to initialize Spark's default logging, then we override the
          // logging level.

          logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
          Logger.getRootLogger.setLevel(Level.WARN)
        }
      }
    }
}

 After you have saved this file to your /spark/spark-application/ folder, open a terminal and navigate to that directory. Then run the following command to compile your class:
  • $ scalac -classpath "spark-core_2.10-1.6.1.jar:/usr/share/spark/lib/spark-assembly-1.6.0-hadoop2.6.0.jar" FileStream.scala

Note: /usr/share/spark/lib is the lib folder of the Spark installation on your system; adjust the path if Spark is installed elsewhere.
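
 If you prefer a build tool over calling scalac directly, a minimal build.sbt along the lines of the sketch below should also work (the Scala and Spark version numbers shown are illustrative and must match your installation):

// build.sbt - minimal sketch for building this example with sbt
name := "SparkStream"

version := "0.1"

scalaVersion := "2.10.6"

// marked "provided" because spark-submit supplies these jars at runtime
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.6.1" % "provided"
)

 Running "sbt package" in the project directory would then produce the application jar for you.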


 The next thing you want to do is package the compiled class files into a jar file for easier deployment. The Spark jars themselves do not need to be bundled, since spark-submit provides them at runtime. Run the following line to create your jar file named SparkStream.jar:
  • $ jar -cvf SparkStream.jar FileStream*.class
 As the next step, we will execute our little streamer with spark-submit. All you need to do is run the command below:
  • $ spark-submit --class FileStream --master local SparkStream.jar /usr/share/spark/spark-application/files/
 The above command executes the class FileStream with a single worker thread (i.e. --master local). The class accepts a single argument: the folder path that new files will be dropped into for processing by this application (i.e. /usr/share/spark/spark-application/files/).
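
 Note that the master is supplied on the spark-submit command line rather than in the code. For quick local experiments you could also hard-code it when building the SparkConf, as in the sketch below (for cluster deployment you would leave the master to spark-submit):

// Sketch: set the master in code for local testing only.
// "local[2]" runs two local threads; plain "local" (one thread) also works for this file-based stream.
val sparkConf = new SparkConf()
  .setAppName("hdfsWordCount")
  .setMaster("local[2]")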

Running
 Looking at the terminal where spark-submit is running, you should see logging text printed to the screen every 2 seconds (our batch interval). However, if you haven't placed any files in your read folder, that is the only text you will see.
 Now comes the fun part. Place some text files in your read folder (i.e. /usr/share/spark/spark-application/files/), and see what is printed in the console. Yippeeeeeee!!!!!

Conclusion

 I hope you liked this example. What I like about it is that it's clear and concise. This Scala program polls a directory every 2 seconds and reads any new files. The word counts for the new files are computed and printed on the screen. From this example you can extend it to whatever your heart desires.
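
 As one possible extension (a sketch only, not part of the example above), you could count words over a sliding window and write each batch's counts to files instead of printing them. The output prefix /tmp/wordcounts below is just an illustrative placeholder; you would replace the wordCounts lines in main with something like:

// Sketch: word counts over a sliding 60-second window, recomputed every 10 seconds
// (both durations must be multiples of the 2-second batch interval).
val windowedCounts = words
  .map(x => (x, 1))
  .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(60), Seconds(10))

// Each batch is written out as a directory named /tmp/wordcounts-<timestamp>.txt
windowedCounts.saveAsTextFiles("/tmp/wordcounts", "txt")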

 Happy Spark'ing

www.silvafox.co.za
