The first thing to do is to run your code locally (in non-distributed mode). This does not require you to be running on a Hadoop cluster or even to have Hadoop installed on your local machine. The way to do that is simply to run your application normally under the JVM. You can do that directly using java, but specifying the right class path is tricky; as a result, it's often easier to run using sbt run-main. For example, if your main class is called mypackage.myapp.RunMyApp, run something like the following command from the top-level directory of your SBT project:
sbt "run-main mypackage.myapp.RunMyApp input-files output"
Only when your application runs and works locally should you try running it on a Hadoop cluster. In order to do so you will probably need to build an assembly, i.e. a self-contained JAR file containing all of the required libraries. This can be done with the sbt-assembly plugin for SBT. See the Quick Start page in this User Guide for more information.
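As a rough sketch, enabling the plugin looks like the following (the version number is illustrative only; pick one that is compatible with your version of SBT, and see the Quick Start page for the exact setup):
// project/plugins.sbt -- the sbt-assembly version shown here is only an example
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")
Running sbt assembly then produces the assembly JAR under the target directory.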
Once you have built the assembly, you can log into the job tracker machine and run it using hadoop, e.g. as follows:
hadoop jar ./target/MyApp-assembly-0.1.jar mypackage.myapp.RunMyApp input-files output
Scoobi provides testing support to make your coding experience as productive as possible. It is tightly integrated with specs2 out-of-the-box, but you can reuse the testing traits with your own favorite testing library.
First of all you need to add the specs2 dependency to your build.sbt file (use the same specs2 version that Scoobi itself depends on).
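For example (the version number below is only a placeholder; check Scoobi's dependencies for the version to use):
// build.sbt -- the specs2 version shown here is only an example
libraryDependencies += "org.specs2" %% "specs2" % "1.12.3" % "test"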
The abstract class com.nicta.scoobi.testing.mutable.HadoopSpecification is the base class for testing Hadoop jobs. Here's an example showing how to use it:
import com.nicta.scoobi.Scoobi._
import com.nicta.scoobi.testing.mutable._

class WordCountSpec extends HadoopSpecification {

  "Counting word frequencies must return the frequency for each word" >> { conf: ScoobiConfiguration =>
    // your Scoobi code goes here
  }
}
This specification does several things for you:
it creates a new ScoobiConfiguration for each example so that you should be able to run your examples concurrently
the configuration is set up with the properties found in the $HADOOP_HOME/conf directory. If the $HADOOP_HOME variable is not set or the properties are not found, you will get an exception at runtime
the examples are executed locally and, if you add the scoobi cluster argument to the command line arguments, on the cluster as well. If you want some cluster execution but no local execution, use scoobi !local.cluster.
the dependent jars are uploaded to a directory on the cluster (~/libjars by default). This upload is only done for jars missing on the cluster
You can change every step in the process above and create your own Specification trait with a different behavior:
the fs and jobTracker properties come from the application.Cluster trait and you can override them with hardcoded values so that you don't depend on what's on your build server
you can change the execution context of the examples by overriding the context method and returning local or cluster instead of localThenCluster, which is the default specs2 context. The same thing can be achieved on the sbt command line with the exclude argument: test-only *WordCount* -- exclude cluster will only run locally.
the directory for loading the jars is defined by the libjarsDirectory method which you can override; more generally you can change the loading and distribution of jars by overriding methods of the application.LibJars trait (see the sketch after this list)
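A minimal sketch of such overrides, based on the method names described above (the exact signatures and return types may differ between Scoobi versions):

class LocalOnlySpec extends HadoopSpecification {
  // never execute the examples on the cluster
  override def context = local
  // upload the dependent jars to a custom directory on the cluster
  override def libjarsDirectory = "./libjars/"
}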
The command-line arguments which you can use to execute a Scoobi application are also applicable when running specs2 specifications. Here are a few examples:
// enable logging with all the categories
sbt> test-only *WordCountSpec* -- scoobi verbose.all
// enable logging but filter the categories with a regular expression
sbt> test-only *WordCountSpec* -- scoobi verbose.all.[scoobi.impl.*]
// only run in memory
sbt> test-only *WordCountSpec* -- scoobi !local
// only run in memory and on the cluster
sbt> test-only *WordCountSpec* -- scoobi !local.cluster
// only run one example but not in memory, locally only (with all log categories)
sbt> test-only *WordCountSpec* -- -ex my first example -- scoobi !inmemory.verbose.all
// only run the examples with the tag 'amazon EC2', on the cluster
sbt> test-only *WordCountSpec* -- -include amazon EC2 -- scoobi !inmemory.!local.cluster
By default, all the examples of a specification are executed concurrently, which is why each example needs to be passed its own ScoobiConfiguration instance. If you prefer having a sequential execution (with the sequential specs2 argument) you can omit the explicit passing of the ScoobiConfiguration object:
class WordCountSpec extends HadoopSpecification {
  sequential

  "Counting word frequencies must return the frequency for each word" >> {
    // your Scoobi code goes here
  }
}
If you only have one cluster for your testing you can hardcode the fs and jobTracker properties by overriding the corresponding methods:
class WordCountSpec extends HadoopSpecification {
  override def fs = "hdfs://svm-hadoop1.ssrg.nicta.com.au"
  override def jobTracker = "svm-hadoop1.ssrg.nicta.com.au:8021"
  ...
}
This will be especially useful if you execute your specifications on a build server where Hadoop is not installed or configured.
The display of Hadoop and Scoobi logs can be controlled by passing command-line arguments. By default logs are turned off (unlike for a ScoobiApp), but they can be turned on by using the verbose arguments. See the Application page in this User Guide to learn how to set log levels and log categories.
Passing the configuration to each example is a bit verbose, so you can use a type alias to shorten it:
class WordCountSpec extends HadoopSpecification {
  type SC = ScoobiConfiguration

  "Counting word frequencies must return the frequency for each word" >> { implicit conf: SC =>
    // your Scoobi code goes here
  }
}
The HadoopSpecification class allows you to create any kind of job and execute it either locally or on the cluster. The SimpleJobs trait is an additional trait which you can use to:
write some strings to a temporary input text file and get back a DList representing this data
execute some transformations based on the DList API
"run" the DList and get the results as a Seq[T]
class WordCountSpec extends HadoopSpecification with SimpleJobs {
  "getting the size of words" >> { implicit c: SC =>
    val list: DList[String] = fromInput("hello", "world")
    list.map(_.size).run must_== Seq("5", "5")
  }
}
fromInput creates a temporary file and a new DList from a TextInput. Then the run method persists the DList and retrieves the results. At the end of the tests the temporary files are deleted unless the keepFiles argument is set on the command line.
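For example, something like the following should keep the temporary files around for inspection (the argument name follows the keepFiles argument mentioned above and is passed like the other scoobi arguments):
// keep the temporary test files for inspection
sbt> test-only *WordCountSpec* -- scoobi keepfiles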
Other jobs might be slightly more complex and require inputs coming from several files:
"Numbers can be partitioned into even and odd numbers" >> { implicit sc: SC =>
val numbers = fromInput((1 to count).map(i => r.nextInt(count * 2).toString):_*).map((_:String).toInt)
val (evens, odds) = numbers.partition(_ % 2 == 0).run
forall(evens.map(_.toInt))(i => i must beEven)
forall(odds.map(_.toInt))(i => i must beOdd)
}
Some of the functionality described above has been extracted into traits in the application package, which you can reuse with your own test framework:
ScoobiAppConfiguration provides a ScoobiConfiguration configured from the HADOOP_HOME/conf files
LocalHadoop provides the onLocal method to execute Hadoop code locally
Hadoop extends LocalHadoop with the onCluster method to execute a Hadoop job on the cluster
ScoobiUserArgs parses command line arguments to extract meaningful values (quiet, showTimes,...) for the Hadoop trait
LibJars distributes the dependent jars to the cluster
Here are a few tips to help you in debugging your application:
allow some tasks to fail without the entire job failing by using the settings mapred.max.map.failures.percent and mapred.max.reduce.failures.percent. For example, the following command
hadoop jar ./myapp.jar mypackage.RunMyApp -Dmapred.max.map.failures.percent=20 -Dmapred.max.reduce.failures.percent=20 input output
will allow up to 20% of the map and reduce tasks to fail while still allowing the job as a whole to proceed.
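The same properties can also be set programmatically on the underlying Hadoop Configuration. A minimal sketch (how you obtain the Configuration from your ScoobiConfiguration depends on the Scoobi version):

import org.apache.hadoop.conf.Configuration

def allowPartialFailures(conf: Configuration): Unit = {
  // let up to 20% of the map and reduce tasks fail without failing the whole job
  conf.set("mapred.max.map.failures.percent", "20")
  conf.set("mapred.max.reduce.failures.percent", "20")
}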
Scoobi logs additional debugging information at the DEBUG logging level. Unfortunately, Apache Commons Logging provides no means of programmatically changing the logging level. To do this you have to reach down directly into log4j, the underlying logging implementation used in Hadoop, as follows:
import org.apache.log4j.{Level => JLevel, _}

{
  ...
  LogManager.getRootLogger().setLevel(JLevel.DEBUG.asInstanceOf[JLevel])
  ...
}