This post continues from where we left off here. I will describe how to install Spark and how to develop a Spark application for extracting bus journey time data correlated with the local weather. In the following post, I will first analyze the results with R and then show how to run the application on a cluster, first locally and then on Amazon Elastic MapReduce. The companion Github project provides the source code and more detailed instructions.
The sample project uses Scala as its programming language. Scala is the native language of Spark, and also well suited for parallel programming. Spark also supports Java and Python. Check out the Spark programming guide for further details.
The examples have been tested on Mac OS X and Ubuntu 14, and should work as such on any *nix system. They should also work on Windows with minimal modifications to the shell commands.
About the case study targets
As described earlier, the objective of this case study is to analyze Tampere city bus transportation journey times on specific bus lines and routes, and correlate the variations with the following parameters:
- Day classification: the month and season, the weekday, whether it is a working day, weekend or school holiday.
- Local weather: the temperature, the amount of rain
The bundled data sample contains such a limited amount of data that no far-reaching conclusions can be made from it, but hopefully it will be enough to illustrate the concepts.
Choice of the programming language
Spark runs on the Java Virtual Machine (JVM) and is, therefore, compatible with the vast number of open source and commercial libraries available in the Java ecosystem. As mentioned above, there are three choices for programming Spark applications.
Scala is the native language of Spark. It combines the best of object-oriented and functional programming and is very well suited to solving parallel computation problems. Scala has some very advanced features, which can feel a bit overwhelming in the beginning. However, Java programmers in particular can learn Scala in baby steps (though they naturally should not stick to Java idioms only). Some rationale for the choice can be found here.
Python is ubiquitous and also extremely popular among data scientists. Spark’s Python support is called PySpark. Spark’s Python documentation is here.
Java is also supported. Even with Java 8, Java programs are more verbose than their Python and Scala counterparts. Java nevertheless provides a solid and familiar foundation for writing Spark applications.
I chose Scala for this project because I have a strong Java background, because it is the Spark team's own choice, and because learning Scala had long been overdue on my personal todo list.
Spark installation and initial project set-up
First, we need to make sure all the pre-requisites are in place. Then I will show how to build and run the initial sample application.
Pre-requisites
- Java 7 or greater. OpenJDK is fine.
- Scala 2.10 (used instead of the current 2.11 since it’s the Amazon default): Download page
- SBT (Scala build tool): Set-up instructions
- Apache Spark 1.5.2: Download page
- 1) Spark release: 1.5.2
- 2) Package type: pre-built for Hadoop 2.6
- A good IDE with Scala support will help. I use IntelliJ Idea with the Scala plugin. Scala-IDE is another good alternative.
Building and running the sample application
The sample can be obtained by cloning the repository:
git clone https://github.com/nuvostaq/BigDataSpark.git
Next, check out the initial sample by running
cd BigDataSpark
git checkout p2-initial
The sample can be built by running (ensure sbt is on your path)
sbt assembly
The project uses the SBT assembly plugin to bundle the necessary dependencies into an über-JAR. However, dependencies are defined as “provided” whenever they are supplied by Spark itself. More details about the project files can be found on the project wiki; please consult it also in case of build issues.
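For illustration, marking the Spark dependency as “provided” in build.sbt looks roughly like this (an illustrative excerpt, not necessarily the exact line from the repository's build definition):

// Spark is supplied by the runtime, so it is excluded from the assembly JAR
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.2" % "provided"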
Now that we have installed all the pre-requisites and built our JAR, it’s time to run the first application:
export MAIN=com.nuvostaq.bigdataspark.BusDataDriver
export APPJAR=target/scala-2.10/BigDataSpark-assembly-0.1.0.jar
export DATADIR=src/test/resources
$SPARKHOME/bin/spark-submit --class $MAIN $APPJAR $DATADIR/BusRoute.16784.21.5.gz $DATADIR/localweather.16786.4.0.gz $DATADIR/BusActivity.16785.8.0.gz /tmp/sparkout
spark-submit1 is used to run Spark applications either locally, on a Spark-managed cluster, or on a cluster managed by YARN or Apache Mesos. In the example above, I specify the main class of the driver program and the JAR. The rest of the parameters are command-line arguments for our driver. In this case:
- BusRoute containing the raw location data of the bus routes2
- localweather containing the temperature and rain amount
- BusActivity containing the bus locations recorded at 5-second intervals
- Output directory name template, where the results will be written (e.g., /tmp/sparkout.buses/part-00000)
At the moment, the driver program (com.nuvostaq.bigdataspark.BusDataDriver) is still very simple; a rough sketch of what it does is shown below.
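The following sketch is only an illustration of the structure, not the actual source from the repository; the argument handling and the way the entry counts are printed are my assumptions:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.io.compress.GzipCodec

// Illustrative driver sketch; the real BusDataDriver differs in its details.
object BusDataDriverSketch {
  def main(args: Array[String]): Unit = {
    // args: bus route file, weather file, bus activity file, output name template
    val Array(routeFile, weatherFile, activityFile, outputPrefix) = args
    val sc = new SparkContext(new SparkConf().setAppName("BusDataDriver"))

    // textFile transparently decompresses gzipped input
    val routes   = sc.textFile(routeFile)
    val weather  = sc.textFile(weatherFile)
    val activity = sc.textFile(activityFile)

    println(s"# route entries = ${routes.count()}")
    println(s"# weather entries = ${weather.count()}")
    println(s"# bus activity entries = ${activity.count()}")

    // Save the (still essentially unprocessed) activity data as gzipped text
    activity.saveAsTextFile(outputPrefix + ".dist", classOf[GzipCodec])
    sc.stop()
  }
}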
It simply loads the input data (Spark automatically handles gzipped files) into RDDs and saves the result. Spark handles input on a per-file and per-line basis. The content of each gzipped file is always processed by a single machine (gzip is a non-splittable compression format). As long as the files are not too big, this shouldn’t become an issue; if it does, some other compression scheme, e.g. Snappy, should be selected. Among the various log messages3 produced by spark-submit, you should be able to spot the following three lines:
# route entries = 2
# weather entries = 1
# bus activity entries = 115
The output from the final stage of the driver can be viewed like this:
gzcat /tmp/sparkout.dist/part-00000.gz
(16785-12-Hallila-0725,RoutePoint(12,Hallila - Keskustori P,0725,1450245589025,9330.227504218083))
The output is a pair RDD4, keyed by a combination of the epoch day, line number, start point, and bus id (the scheduled start time as HHMM). The value is a RoutePoint object containing partially overlapping information plus the actual payload: the timestamp and the current distance from the start for each sample.
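Read against the sample line above, the record shape is roughly the following (the field names are my guesses based on that line; the actual case class is defined in the repository):

// Approximate record shape; field names are guesses, not the repository's definition
case class RoutePoint(line: String, routeName: String, scheduledStart: String,
                      timestampMs: Long, distanceFromStart: Double)

// Key: "<epochDay>-<line>-<startPoint>-<scheduledStartHHMM>", e.g. "16785-12-Hallila-0725"
val sample = ("16785-12-Hallila-0725",
  RoutePoint("12", "Hallila - Keskustori P", "0725", 1450245589025L, 9330.227504218083))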
We have now managed to build and run our first application. In the next post, I will show how Spark can be used to extract data from the bus timing and weather data sources.
Footnotes
2: Note that the file names contain the "epoch day" (the number of calendar days since the Unix epoch) on which the file was created. Weather data is read on the following day, hence its epoch day is greater by one. And since the bus routes don't change that often, their epoch day is also different.
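For reference, the epoch day can be computed like this (a trivial illustration, not code from the project):

// Days since 1970-01-01 (UTC)
val epochDay = System.currentTimeMillis() / (24L * 60 * 60 * 1000)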
3: The log level (logging is provided by log4j) can be set in $SPARKHOME/conf/log4j.properties; log4j.properties.template can be used as the starting point. The line 'log4j.rootCategory=INFO, console' controls the log level (WARN, INFO, and DEBUG are valid levels).
4: Pair RDDs consist of key-value pairs. Spark provides a number of useful operations on pair RDDs.
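As a minimal, stand-alone illustration (not project code), counting entries per key with a pair RDD could look like this, reusing the SparkContext sc from the driver sketch above:

// Minimal pair-RDD example: count activity entries per bus line
val events = sc.parallelize(Seq(("line12", 1), ("line12", 1), ("line25", 1)))
val countsPerLine = events.reduceByKey(_ + _)   // RDD[(String, Int)]
countsPerLine.collect().foreach(println)        // (line12,2), (line25,1)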