DList objects are merely nodes in a graph describing a series of data computations we want to perform. At some point, however, we need to specify what the inputs and outputs to those computations are. In the WordCount example we simply use in-memory data and print out the result of the computations. However, the data used by Hadoop jobs is generally loaded from files and the results persisted to files. Let's see how to specify this.

Most of the time when we create DList objects, they are the result of calling a method on another DList object (e.g. map). Loading, on the other hand, is the only way to create a DList object that is not based on any others. It is the means by which we associate a DList object with some data files on HDFS. Scoobi provides functions to create DList objects associated with text files on HDFS, which are implemented in the object com.nicta.scoobi.io.text.TextInput.

There are a number of ways to construct a DList object from a text file. The simplest is fromTextFile. It takes one or more paths (globs are supported) to text files on HDFS (or whichever file system Hadoop has been configured for) and returns a DList[String] object, where each element of the distributed list refers to one of the lines of text from the files:
// load a single text file
val lines1: DList[String] = fromTextFile("hdfs://path/to/file")
// load multiple text files
val lines2: DList[String] = fromTextFile("hdfs://path/to/file1", "hdfs://path/to/file2")
// load from a list of text files
val lines3: DList[String] = fromTextFile(Seq("hdfs://path/to/file1", "hdfs://path/to/file2"):_*)
Whilst some problems involve working with entire lines of text, it is often the case that we are interested in loading delimited text files, for example comma-separated value (CSV) or tab-separated value (TSV) files, and want to extract values from fields. In this case, we could use fromTextFile followed by a map that pulls out the fields of interest:
// load CSV with schema "id,first_name,second_name,age"
val lines: DList[String] = fromTextFile("hdfs://path/to/CSV/files/*")
// pull out id and second_name
val names: DList[(Int, String)] = lines map { line =>
val fields = line.split(",")
(fields(0).toInt, fields(2))
}
Given that this type of field extraction from delimited text files is such a common task, Scoobi provides a more convenient mechanism for achieving it:
// load CSV and pull out id and second_name
val names: DList[(Int, String)] = fromDelimitedTextFile("hdfs://path/to/CSV/files/*", ",") {
case AnInt(id) :: first_name :: second_name :: age :: _ => (id, second_name)
}
As this example illustrates, the call to fromDelimitedTextFile takes a number of arguments. The first argument specifies the path and the second the delimiter, in this case a comma. This is followed by a second parameter list that specifies how to extract fields once they have been separated out. It is specified by supplying a partial function that takes a list of separated String fields as its input and returns a value whose type sets the type of the resulting DList - i.e. a PartialFunction[List[String], A] will create a DList[A] (where A is (Int, String) above). In this example, we use Scala's pattern matching feature to pull out the four fields and return the first and third.
In addition, Scoobi provides a number of extractors for automatically checking and converting fields to an expected type. In the above example, the AnInt extractor is used to specify that the id field must be an integer in order for the case statement to match. In the case of a match, it also has the effect of typing id as an Int. Field extractors are provided for Int, Long, Double and Float (called AnInt, ALong, ADouble and AFloat).
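As a quick sketch (assuming a hypothetical "id,name,score" schema), the other extractors are used in exactly the same way:

// hypothetical schema "id,name,score": id must parse as a Long and score as a Double
val scores: DList[(Long, Double)] = fromDelimitedTextFile("hdfs://path/to/CSV/files/*", ",") {
  case ALong(id) :: name :: ADouble(score) :: _ => (id, score)
}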
One of the advantages of using fromDelimitedTextFile is that we have at our disposal all of the Scala pattern matching features, and because we are providing a partial function, any fields that don't match against the supplied pattern will not be present in the returned DList. This allows us to implement simple filtering inline with the extraction:
// load CSV and pull out id and second_name if first_name is "Harry"
val names = fromDelimitedTextFile("hdfs://path/to/CSV/files/*", ",") {
case AnInt(id) :: "Harry" :: second_name :: age :: _ => (id, second_name)
}
We can of course supply multiple patterns:
// load CSV and pull out id and second_name if first_name is "Harry" or "Lucy"
val names: DList[(Int, String)] = fromDelimitedTextFile("hdfs://path/to/CSV/files/*", ",") {
case AnInt(id) :: "Harry" :: second_name :: age :: _ => (id, second_name)
case AnInt(id) :: "Lucy" :: second_name :: age :: _ => (id, second_name)
}
A more interesting case is when the value of one field influences the interpretation of another. For example:
val thisYear: Int = 2013
// load CSV with schema "event,year,year_designation" and pull out event and how many years ago it occurred
val yearsAgo: DList[(String, Int)] = fromDelimitedTextFile("hdfs://path/to/CSV/files/*", ",") {
case event :: AnInt(year) :: "BC" :: _ => (event, thisYear + year - 1) // No 0 AD
case event :: AnInt(year) :: "AD" :: _ => (event, thisYear - year)
}
Files accessed with Hive are typically stored in date-partitioned directories:
+ out
|
+ year=2014
|
+ month=01
|
+ day=30
| part.txt
+ day=31
| part.txt
+ month=02
|
+ day=01
| part.txt
+ day=02
| part.txt
In this case, part of the file path is meaningful and we want to associate each value read from the file with its corresponding path:
// read the values
val list: DList[(String, String)] = fromTextFileWithPath("out/*/*/*")
// parse the path component to extract a date
val toDate: String => java.util.Date = ??? // date parsing function
val dated: DList[(java.util.Date, String)] = list.mapKeys(toDate)
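For example, a minimal sketch of such a date-parsing function (assuming the path string handed to it contains the year=/month=/day= components shown above; the exact shape of that string is an assumption here):

// a sketch: pull the "year=2014/month=01/day=30" components out of the path
val DatePath = """.*year=(\d{4})/month=(\d{2})/day=(\d{2}).*""".r
val toDate: String => java.util.Date = {
  case DatePath(y, m, d) =>
    // GregorianCalendar months are 0-based
    new java.util.GregorianCalendar(y.toInt, m.toInt - 1, d.toInt).getTime
}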
Sequence files are the built-in binary file format used in Hadoop. Scoobi provides a number of ways to load existing Sequence files as DLists, as well as to persist DLists as Sequence files. For more detail, refer to the API docs for both Sequence file input and output.

A Sequence file contains key-value pairs where the types of the key and value must be Writable (i.e. classes that implement the Writable interface). Given a Sequence file of Writable key-value pairs, a DList can be constructed:
// load a sequence file
val events1: DList[(TimestampWritable, TransactionWritable)] = fromSequenceFile("hdfs://path/to/transactions")
// alternatively, you can specify the key and value types
val events2 = fromSequenceFile[TimestampWritable, TransactionWritable]("hdfs://path/to/transactions")
In this example, a Sequence file is being loaded where the key is of type TimestampWritable and the value is of type TransactionWritable. The result is a DList parameterised by the same key-value types. Note that whilst the classes associated with the key and value are specified within the header of a Sequence file, they must also be specified when using fromSequenceFile. The signature of fromSequenceFile enforces that the key and value types implement the Writable interface; however, there are no static checks to ensure that the specified types actually match the contents of the Sequence file. It is the responsibility of the user to ensure there is a match, otherwise a run-time error will result.

Like fromTextFile, fromSequenceFile can also be passed multiple input paths, as long as all files contain keys and values of the same type:
// load multiple sequence files
val events1: DList[(TimestampWritable, TransactionWritable)] =
fromSequenceFile("hdfs://path/to/transactions1", "hdfs://path/to/transaction2")
// load from a list of sequence files
val transactionFiles = List("hdfs://path/to/transactions1", "hdfs://path/to/transaction2")
val events2: DList[(TimestampWritable, TransactionWritable)] = fromSequenceFile(transactionFiles)
In some situations only the key or only the value needs to be loaded. To make this use case more convenient, Scoobi provides two additional methods: keyFromSequenceFile and valueFromSequenceFile. When using keyFromSequenceFile or valueFromSequenceFile, Scoobi ignores the value or key, respectively, assuming it is just some Writable type:
// load keys only from an IntWritable-Text Sequence file
val ints: DList[IntWritable] = keyFromSequenceFile("hdfs://path/to/file")
// load values only from an IntWritable-Text Sequence file
val strings: DList[Text] = valueFromSequenceFile("hdfs://path/to/file")
Hadoop's Sequence files provide a convenient mechanism for persisting data of custom types (so long as they implement Writable) in a binary file format. Hadoop also includes a number of common Writable types, such as IntWritable and Text, that can be used within an application. For Sequence files containing keys and/or values of these common types, Scoobi provides additional convenience methods for constructing a DList and automatically converting values to common Scala types:

// load an IntWritable-Text Sequence file
val data: DList[(Int, String)] = fromSequenceFile("hdfs://path/to/file")
In the above code, a Sequence file of IntWritable-Text pairs is being loaded as a DList of Int-String pairs. Just as with fromSequenceFile, type annotations are necessary, but in this case the (Int, String) annotation signals that the Sequence file contains IntWritable-Text pairs, not Int-String pairs. The table below lists the Writable conversions supported by fromSequenceFile:
Writable type | Scala type |
---|---|
BooleanWritable | Boolean |
IntWritable | Int |
FloatWritable | Float |
LongWritable | Long |
DoubleWritable | Double |
Text | String |
ByteWritable | Byte |
BytesWritable | Traversable[Byte] |
Conversion support for BytesWritable is interesting, as the type of Scala collection it converts to is not fixed and can be controlled by the user. For example, it is possible to specify conversion to List[Byte] or Seq[Byte]:
// load a DoubleWritable-BytesWritable sequence file
val data1: DList[(Double, List[Byte])] = fromSequenceFile("hdfs://path/to/file")
// also ok
val data2: DList[(Double, Seq[Byte])] = fromSequenceFile("hdfs://path/to/file")
Avro is a language-agnostic specification for data serialization. From a Hadoop perspective it has a lot of the attributes of Sequence files, with the addition of features such as evolvable schemas.

Avro schemas describe the structure of data and are the key to creating or loading an Avro file. Scoobi provides a mechanism for mapping between Avro schemas and Scala types such that an Avro file can be easily loaded as a DList with the correct type parameterization, and a DList can be easily persisted as an Avro file with the correct schema.

The mechanism for mapping between Avro schemas and Scala types is the AvroSchema type class. Instances are provided for all Scala types that have sensible mappings to Avro schema elements:
Scala type | Avro Schema |
---|---|
Boolean | boolean |
Int | int |
Float | float |
Long | long |
Double | double |
String | string |
Traversable[_] | array |
Array[_] | array |
Map[_,_] | map |
Tuple2[_,_] | record |
Tuple3[_,_,_] | record |
Tuple4[_,_,_,_] | record |
Tuple5[_,_,_,_,_] | record |
Tuple6[_,_,_,_,_,_] | record |
Tuple7[_,_,_,_,_,_,_] | record |
Tuple8[_,_,_,_,_,_,_,_] | record |
Note that, like Avro schemas, the Scala types can be fully nested. For example, the Scala type:
(Int, Seq[(Float, String)], Map[String, Int])
would map to the Avro schema:
{
"type": "record",
"name": "tup74132vn1nc193418", // Scoobi-generated UUID
"fields" : [
{
"name": "v0",
"type": "int"
},
{
"name": "v1",
"type": {
"type": "array",
"items": {
"type": {
"type": "record",
"name": "tup44132vr1ng198419",
"fields": [
{
"name": "v0",
"type": "float"
},
{
"name": "v1",
"type": "string"
}
]
}
}
}
},
{
"name": "v2",
"type": {
"type": "map",
"values": "int"
}
}
]
}
The method fromAvroFile is used to load an Avro file as a DList:
val xs = fromAvroFile[(Int, Seq[(Float, String)], Map[String, Int])]("hdfs://path/to/file")
As with fromSequenceFile, the compiler needs to know the type of the Avro file you are loading. If the file doesn't match this schema, a runtime error will occur. fromAvroFile has a default argument, checkSchemas, that tries to fail fast by verifying that the schemas match.

Note that for compilation to succeed, there must be an AvroSchema instance for the particular type you are using. For example, the following will fail unless an AvroSchema type class instance for Person is implemented and in scope:

// assuming case class Person(name: String, age: Int)
// will not compile, unless you provide an AvroSchema
val people = fromAvroFile[Person]("hdfs://path/to/file")

However, there is a scala-avro plugin to make this pretty painless (see examples/avro for an example).
And naturally, fromAvroFile supports loading from multiple files:
// load multiple Avro files
val xs1: DList[(Int, String, Float)] = fromAvroFile("hdfs://path/to/file1", "hdfs://path/to/file2")
// load from a list of Avro files
val files = Seq("hdfs://path/to/file1", "hdfs://path/to/file2")
val xs2: DList[(Int, String, Float)] = fromAvroFile(files)
Scoobi knows how to generate a WireFormat for any type that extends org.apache.avro.generic.GenericContainer. This means that Scoobi is capable of seamlessly interoperating with the Java classes, including auto-generated ones (and sbt-avro is capable of generating a Java class for a given Avro record/protocol; see examples/avro for an example of this plugin in action).

It is also possible to load and persist GenericRecords even if you don't know the schema. You can access the schema and all the fields at run-time like this:
// load generic records
fromAvroFile[GenericRecord]("path").map { record =>
// you can get the schema and do the mapping based on its structure
if (record.getSchema.getFields.size == 1) record.get(0).asInstanceOf[Int]
else 1 // default value
}
Because Scoobi is a library for constructing Hadoop applications, data input and output is typically synonymous with file input and output. Whilst Scoobi provides numerous mechanisms for creating new DList objects from files (and multiple file types), it also has some simple ways of constructing a DList without files.

The simplest way of creating a new DList object is to use the DList companion object's apply method. This behaves just like the Scala List version:
// create a DList[Int] object
val ints = DList(1, 2, 3, 4)
// create a DList[String] object
val strings = DList("bob", "mary", "jane", "fred")
// create a DList[(String, Int)] object
val ages = DList(("bob", 12), ("mary", 33), ("jane", 61), ("fred", 24))
As a convenience, the apply method is also overloaded to handle the special case of integer ranges. This allows a DList of Int values spanning a range to be constructed:
// all integers from 0 to 1023
val manyInts: DList[Int] = DList(0 to 1023)
Whilst using apply is simple, it is typically not all that useful in practice. The purpose of a DList is to abstract over large volumes of data, and with the apply method only memory-bound data sizes can be handled. As an alternative, the tabulate method can be used to create much larger DList objects, where each element's value is specified by a function applied to the element's index. This is particularly useful for creating randomized DList objects:
// random integer values
val randomInts = DList.tabulate(1000 * 1000)(_ => Random.nextInt)
// word pairs taken randomly from a bag of words
val words: Seq[String] = Seq(???)
def hash(i: Int) = (i * 314 + 56) % words.size
val randomWords: DList[(String, String)] = DList.tabulate(1000 * 1000)(ix => (words(hash(ix)), words(hash(ix + 1))))
Alternatively, if you want the sequence of elements to be evaluated not when the DList is created but only when the MapReduce jobs are executed, you can use the fromLazySeq method:
val dontEvaluateNow_! : Seq[Int] = ???
val list = fromLazySeq(dontEvaluateNow_!)
// since we evaluate the sequence only at the last moment, we cannot know its size in advance to compute the number of splits
// so you can specify this number if you know it
val listWithSize = fromLazySeq(dontEvaluateNow_!, seqSize = 1000000)
Finally, for pure convenience, with Scoobi all Scala Traversable collections can be converted to DList objects with the toDList method:
val wordList = List("hello", "big", "data", "world")
val wordDList: DList[String] = wordList.toDList
val numbersMap = Map("one" -> 1, "two" -> 2, "three" -> 3)
val numbersDList: DList[(String, Int)] = numbersMap.toDList
Scoobi is not locked to loading and persisting only the data sources and sinks that have been described. Instead, the Scoobi API is designed to make it relatively simple to implement support for custom data sources and sinks.

We have seen that Scoobi provides many factory methods for creating DList objects, for example fromTextFile and fromAvroFile. At their heart, all of these methods are built upon a single primitive mechanism: the DList companion object's fromSource factory method:
def fromSource[K, V, A : WireFormat](source: DataSource[K, V, A]): DList[A] = ???
fromSource takes as input an object implementing the DataSource trait. Implementing the DataSource trait is all that is required to create a DList from a custom data source. If we look at the DataSource trait, we can see that it is tightly coupled with the Hadoop InputFormat interface:
trait DataSource[K, V, A] extends Source {
def inputFormat: Class[_ <: InputFormat[K, V]]
def inputConverter: InputConverter[K, V, A]
def inputCheck(implicit sc: ScoobiConfiguration)
def inputConfigure(job: Job)(implicit sc: ScoobiConfiguration)
def inputSize(implicit sc: ScoobiConfiguration): Long
}
trait InputConverter[K, V, A] {
type InputContext = MapContext[K, V, _, _]
def fromKeyValue(context: InputContext, key: K, value: V): A
}
The core role of a DataSource is to provide a mechanism for taking the key-value records produced by an InputFormat and converting them into the values contained within a DList. Following the type parameters is a good way to understand this:
- inputFormat specifies an InputFormat class
- The InputFormat class will produce key-value records of type K-V
- inputConverter specifies an InputConverter object
- The InputConverter object implements fromKeyValue, which converts a key of type K and a value of type V (as produced by the InputFormat) to a value of type A
- Calling fromSource with this DataSource object will produce a DList parameterised on type A
The other methods that must be implemented in the DataSource trait provide hooks for configuration and give Scoobi some visibility of the data source:

- inputCheck: This method is called before any MapReduce jobs are run. It is provided as a hook to check the validity of the data source input. For example, it could check that the input exists and, if not, throw an exception.
- inputConfigure: This method is provided as a hook to configure the DataSource. Typically it is used to configure the InputFormat by adding or modifying properties in the job's Configuration. It is called prior to running the specific MapReduce job this DataSource provides input data to.
- inputSize: This method should return an estimate of the size in bytes of the input data source. It does not need to be exact. Scoobi will use this value as one metric in determining how to configure the execution of MapReduce jobs.

The following Scala objects provide great working examples of DataSource implementations in Scoobi:
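To make the trait concrete, here is a minimal, self-contained sketch (not one of the built-in sources) that wraps Hadoop's TextInputFormat to produce a DList[String] of lines. It is written against the DataSource and InputConverter traits exactly as printed above; the real Source trait and the built-in implementations handle more details (proper input checks, realistic size estimates, and so on), so treat this as an illustration only:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}

// sketch only: each line of the input files becomes one DList element
class LineSource(path: String) extends DataSource[LongWritable, Text, String] {
  def inputFormat = classOf[TextInputFormat]

  def inputConverter = new InputConverter[LongWritable, Text, String] {
    // TextInputFormat produces (byte offset, line) pairs; keep only the line
    def fromKeyValue(context: InputContext, offset: LongWritable, line: Text): String = line.toString
  }

  def inputCheck(implicit sc: ScoobiConfiguration) {}  // e.g. verify that `path` exists

  def inputConfigure(job: Job)(implicit sc: ScoobiConfiguration) {
    FileInputFormat.addInputPath(job, new Path(path))
  }

  def inputSize(implicit sc: ScoobiConfiguration): Long = 0L  // a rough estimate is sufficient
}

val customLines: DList[String] = DList.fromSource(new LineSource("hdfs://path/to/file"))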
It is also possible to load and persist DObjects. A DObject, when persisted, is stored either as a DList[A] if it is a DObject[Iterable[A]], or as a DList[A] containing just one element if it is a DObject[A]. In the first case, you can load the DObject by loading the file as a DList[A] and materialising it:
val sums: DObject[Iterable[Int]] = fromAvroFile[Int]("hdfs://path/to/average").materialise
In the second case you can use methods which are very similar to DList methods, having object prepended to them:
val average1: DObject[String] = objectFromTextFile("hdfs://path/to/text/average")
val average2: DObject[Int] = objectKeyFromSequenceFile[Int]("hdfs://path/to/seq/average")
val average3: DObject[Int] = objectFromAvroFile[Int]("hdfs://path/to/avro/average")
Note however that those methods are unsafe. They are merely a shortcut to access the first element of a persisted DList. A safer possibility is to load a DList and use the headOption method to create a DObject:
val average: DObject[Option[Int]] = fromAvroFile[Int]("hdfs://path/to/avro/average").headOption
DObjects can also be created from a single value with the DObject.apply method or the Scoobi.lazyObject method:
val o1 = DObject("start")
val o2 = lazyObject("don't evaluate now, but only on the cluster!")
Persisting is the mechanism Scoobi uses for specifying that the result of executing the computational graph associated with a DList object is to be associated with a particular data file on HDFS. There are two parts to persisting:

1. specifying how a DList is to be persisted, using the numerous toXXX methods available (toTextFile, toAvroFile, ...)
2. persisting the DList(s) by calling persist

This is an example of persisting a single DList:
val rankings: DList[(String, Int)] = DList(???)
rankings.toTextFile("hdfs://path/to/output").persist
And now with several DLists:
val rankings: DList[(String, Int)] = DList(???)
val rankingsReverse: DList[(Int, String)] = rankings map (_.swap)
val rankingsExample: DList[(Int, String)] = rankingsReverse.groupByKey.map { case (ranking, items) => (ranking, items.head) }
persist(rankings.toTextFile("hdfs://path/to/output"),
rankingsReverse.toTextFile("hdfs://path/to/output-reverse"),
rankingsExample.toTextFile("hdfs://path/to/output-example"))
As mentioned previously, persist is the trigger for executing the computational graph associated with its DList objects. By bundling DList objects together, persist is able to determine computations that are shared by those outputs and ensure that they are only performed once.
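For example (a small sketch using only the operations introduced so far), two outputs derived from the same loaded input cause the shared loading work to be performed only once when they are persisted together:

val loadedWords: DList[String] = fromTextFile("hdfs://path/to/words")

// both outputs are derived from the same loaded DList
val lower = loadedWords map (_.toLowerCase)
val upper = loadedWords map (_.toUpperCase)

// persisting them together lets Scoobi share the common loading work
persist(lower.toTextFile("hdfs://path/to/lower"),
        upper.toTextFile("hdfs://path/to/upper"))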
The simplest mechanism for persisting a DList of any type is to store it as a text file using toTextFile. This will simply invoke the toString method of the type that the DList is parameterised on:
/** output text file of the form:
* 34
* 3984
* 732
*/
val ints: DList[Int] = DList(34, 3984, 732)
ints.toTextFile("hdfs://path/to/output").persist
/** output text file of the form:
* (foo, 6)
* (bar, 23)
* (joe, 91)
*/
val stringsAndInts: DList[(String, Int)] = DList(("foo", 6), ("bar", 23), ("joe", 91))
stringsAndInts.toTextFile("hdfs://path/to/output").persist
In the same way that toString is used primarily for debugging purposes, toTextFile is best used for the same purpose. The reason is that the string representation for any reasonably complex type is generally not convenient for input parsing. For cases where text file output is still important, and the output must be easily parsed, there are two options.

The first is to simply map the DList elements to formatted strings that are easily parsed. For example:
/** output text file of the form:
* foo, 6
* bar, 23
* joe, 91
*/
val stringsAndInts: DList[(String, Int)] = DList(("foo", 6), ("bar", 23), ("joe", 91))
val formatted: DList[String] = stringsAndInts map { case (s, i) => s + "," + i }
formatted.toTextFile("hdfs://path/to/output").persist
The second option is for cases when the desired output is a delimited text file, for example, a CSV or TSV. In this case, if the DList is parameterised on a Tuple, case class, or any Product type, toDelimitedTextFile can be used:
/** output text file of the form:
* foo, 6
* bar, 23
* joe, 91
*/
val stringsAndInts: DList[(String, Int)] = DList(("foo", 6), ("bar", 23), ("joe", 91))
stringsAndInts.toDelimitedTextFile("hdfs://path/to/output", ",").persist
/** the default separator is a tab (\t), so in this case the output text file is of the form:
* foo 6
* bob 23
* joe 91
*/
stringsAndInts.toDelimitedTextFile("hdfs://path/to/output").persist
/** output text file of the form:
* foo, 6
* bar, 23
* joe, 91
*/
val peopleAndAges: DList[Person] = DList(Person("foo", 6), Person("bar", 23), Person("joe", 91))
peopleAndAges.toDelimitedTextFile("hdfs://path/to/output", ",").persist
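A nice property of delimited output is that it can later be re-loaded with fromDelimitedTextFile, as introduced earlier. A sketch:

// re-load the comma-delimited "name,age" output written above
val reloaded: DList[(String, Int)] = fromDelimitedTextFile("hdfs://path/to/output", ",") {
  case name :: AnInt(age) :: _ => (name, age)
}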
It is possible to output a DList[(K, V)] to text files in different directories where the name of the directory depends on the value of the key:
val list: DList[(java.util.Date, String)] = ???
// implement a function which will encode a date into a path name, for example as year=yy/month=dd/day=dd
val partitionFunction: java.util.Date => String = ???
list.toPartitionedTextFile("hdfs://path/to/output", partitionFunction)
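For example, the partitioning function could be implemented along these lines (a sketch, assuming we want the year=/month=/day= directory layout shown earlier):

import java.text.SimpleDateFormat

// format a date as "year=2014/month=01/day=30" so each day gets its own directory
val partitionFunction: java.util.Date => String =
  date => new SimpleDateFormat("'year='yyyy'/month='MM'/day='dd").format(date)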
The available mechanisms for persisting a DList to a Sequence file mirror those for loading. The toSequenceFile method can be used to persist a DList of a Writable pair:
val intText: DList[(IntWritable, Text)] = DList[(IntWritable, Text)](???)
intText.toSequenceFile("hdfs://path/to/output").persist
In cases where we want to persist a DList to a Sequence file but its type parameter is not a Writable pair, a single Writable can be stored as the key or the value, the other being NullWritable:
// persist as IntWritable-NullWritable Sequence file
val ints: DList[IntWritable] = DList[IntWritable](???)
ints.keyToSequenceFile("hdfs://path/to/output").persist
// persist as NullWritable-IntWritable Sequence file
ints.valueToSequenceFile("hdfs://path/to/output").persist
Like loading, DLists of simple Scala types can be automatically converted to Writable types and persisted as Sequence files. The extent of these automatic conversions is limited to the types listed in the table above. Value- and key-only versions are also provided:
// persist as Int-String Sequence file
val intString: DList[(Int, String)] = DList[(Int, String)](???)
intString.toSequenceFile("hdfs://path/to/output").persist
// persist as Int-NullWritable Sequence file
intString.keyToSequenceFile("hdfs://path/to/output").persist
// persist as NullWritable-Int Sequence file
intString.valueToSequenceFile("hdfs://path/to/output").persist
To persist a DList to an Avro file, Scoobi provides the method toAvroFile. Again, in order for compilation to succeed, the DList must be parameterised on a type that has an AvroSchema type class instance implemented:
val xs: DList[(Int, Seq[(Float, String)], Map[String, Int])] = DList(???)
xs.toAvroFile("hdfs://path/to/file").persist
We have seen that to persist a DList object we use the persist method:
val (dogs, names) = (DList("Labrador retriever", "Poodle", "Boxer"), DList("Max", "Molly", "Toby"))
persist(dogs.toTextFile("hdfs://path/to/dogs"), names.toAvroFile("hdfs://path/to/names"))
But what exactly do toTextFile, toAvroFile and the other output methods do? These methods simply add Sinks to the DList. Those sinks implement the DataSink trait. The DataSink trait is, not surprisingly, the reverse of the DataSource trait. It is tightly coupled with the Hadoop OutputFormat interface and requires the specification of an OutputConverter that converts values contained within the DList into key-value records to be persisted by the OutputFormat:
trait DataSink[K, V, B] extends Sink {
def outputFormat(implicit sc: ScoobiConfiguration): Class[_ <: OutputFormat[K, V]]
def outputKeyClass(implicit sc: ScoobiConfiguration): Class[K]
def outputValueClass(implicit sc: ScoobiConfiguration): Class[V]
def outputConverter: OutputConverter[K, V, B]
def outputCheck(implicit sc: ScoobiConfiguration)
def outputConfigure(job: Job)(implicit sc: ScoobiConfiguration)
}
trait OutputConverter[K, V, B] {
def toKeyValue(x: B): (K, V)
}
Again, we can follow the types through to get a sense of how it works:
- persist is called with a DList object that specifies Sinks implementing the trait DataSink[K, V, B]
- The DataSink object specifies the class of an OutputFormat that can persist or write key-values of type K-V, which are specified by outputKeyClass and outputValueClass, respectively
- An OutputConverter[K, V, B] is specified by outputConverter, which converts values of type B to (K, V)
, some additional methods are included in the DataSink
trait that provide configuration hooks:
outputCheck
: This method is called before any MapReduce jobs are run. It is provided as a hook to check the validity of the target data output. For example, it could check if the output already exists and if so throw an exceptionoutputConfigure
: This method is provided as a hook for configuring the DataSink
. Typically it is used to configure the OutputFormat
by adding or modifying properties in the job's Configuration
. It is called prior to running the specific MapReduce job this DataSink
consumes output data fromoutputSetup
method which is called right before output data is created (doing nothing by default). This allows to do some last-minute cleanup before outputing the data.The following Scala objects provided great working examples of DataSink
implementations in Scoobi:
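As with DataSource, here is a minimal, self-contained sketch (not the built-in text sink) written against the DataSink and OutputConverter traits exactly as printed above: it writes each String element as one line via Hadoop's TextOutputFormat. The real Sink trait and the built-in sinks handle more details (compression, overwrite checks, and so on), so treat this as an illustration only:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}

// sketch only: persist a DList[String], one element per line
class LineSink(path: String) extends DataSink[NullWritable, Text, String] {
  def outputFormat(implicit sc: ScoobiConfiguration) = classOf[TextOutputFormat[NullWritable, Text]]
  def outputKeyClass(implicit sc: ScoobiConfiguration) = classOf[NullWritable]
  def outputValueClass(implicit sc: ScoobiConfiguration) = classOf[Text]

  def outputConverter = new OutputConverter[NullWritable, Text, String] {
    // each DList element becomes a (NullWritable, Text) record
    def toKeyValue(line: String) = (NullWritable.get, new Text(line))
  }

  def outputCheck(implicit sc: ScoobiConfiguration) {}  // e.g. fail if `path` already exists

  def outputConfigure(job: Job)(implicit sc: ScoobiConfiguration) {
    FileOutputFormat.setOutputPath(job, new Path(path))
  }
}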
DObjects are results of distributed computations and can be accessed in memory with the run method:
val list: DList[Int] = DList(1, 2, 3)
// the sum of all values
val sum: DObject[Int] = list.sum
// execute the computation graph and collect the result
println(sum.run)
The call to run above is equivalent to calling persist on the DObject to execute the computation, then collecting the result. If you call:
val sum = DList(1, 2, 3).sum
persist(sum)
sum.run
then the first persist executes the computation and run merely retrieves the result.
Similarly, if you want to access the value of a DList after computation, you can call run on that list:
val list: DList[Int] = DList(1, 2, 3)
// returns Seq(1, 2, 3)
list.run
The code above is merely a shorthand for:
val list: DList[Int] = DList(1, 2, 3)
val materialisedList = list.materialise
// returns Seq(1, 2, 3)
materialisedList.run
Finally, when you have several DObjects and DLists which are part of the same computation graph, you can persist them all at once:
val list: DList[Int] = DList(1, 2, 3)
val plusOne: DList[Int] = list.map(_ + 1)
// the sum of all values
val sum: DObject[Int] = list.sum
// the max of all values
val max: DObject[Int] = list.max
// execute the computation graph for the 2 DObjects and one DList
persist(sum, max, plusOne)
// collect results
// (6, 3, Seq(2, 3, 4))
(sum.run, max.run, plusOne.run)
DObjects can also be persisted to files by specifying sinks so that they can be re-loaded later. If the DObject represents a single value, like a sum, you can write:
val sum = DList(1, 2, 3).sum
sum.toAvroFile("hdfs://path/to/avro").persist
val reloaded: DObject[Int] = objectFromAvroFile[Int]("hdfs://path/to/avro")
And if the DObject stores an Iterable you can either load it as a DList or a DObject:
val even: DObject[Iterable[Int]] = DList(0, 2, 4, 6, 8).materialise
even.toAvroFile("hdfs://path/to/even").persist
val evenAsDList: DList[Int] = fromAvroFile[Int]("hdfs://path/to/even")
val evenAsDObject: DObject[Iterable[Int]] = fromAvroFile[Int]("hdfs://path/to/even").materialise
Many distributed algorithms (such as PageRank) require iterating over DList computations: you evaluate the results of a DList computation and, based on that, decide whether to go on with more computations.

For example, let's say that for each integer in the list we want to subtract 1 if the integer is positive (and leave it alone if it is 0 or less), and that we want to do this until the maximum of the list is 10.

There are several ways to write this, which we are going to evaluate:
val ints = DList(12, 5, 8, 13, 11)
def iterate1(list: DList[Int]): DList[Int] = {
if (list.max.run > 10) iterate1(list.map(i => if (i <= 0) i else i - 1))
else list
}
def iterate2(list: DList[Int]): DList[Int] = {
persist(list)
if (list.max.run > 10) iterate2(list.map(i => if (i <= 0) i else i - 1))
else list
}
def iterate3(list: DList[Int]): DList[Int] = {
persist(list, list.max)
if (list.max.run > 10) iterate3(list.map(i => if (i <= 0) i else i - 1))
else list
}
def iterate4(list: DList[Int]): DList[Int] = {
val maximum = list.max
persist(list, maximum)
if (maximum.run > 10) iterate4(list.map(i => if (i <= 0) i else i - 1))
else list
}
iterate1(ints).toTextFile("path").persist
iterate2(ints).toTextFile("path").persist
iterate3(ints).toTextFile("path").persist
iterate4(ints).toTextFile("path").persist
1. No intermediate persist

In that case we get the smallest number of generated MapReduce jobs, 5 jobs only: 4 jobs for the 4 main iterations, to do the mapping + maximum, plus one job to write out the data to a text file.

The big disadvantage of this method is that the DList being computed gets bigger and bigger and is re-computed all over again for each new iteration.
2. Persisting the DList

Here, before trying to evaluate the maximum value of the list, we save the mapped list first, because we know we will later want to resume the computations from that stage; then we compute the maximum.

This generates 8 MapReduce jobs: 4 jobs to map the list each time we enter the loop + 4 jobs to compute the maximum. However, compared with 1., the computations in each job are reduced to a minimum because we reuse previously saved data.
3. Persisting the DList and the maximum

This variation creates 12 MapReduce jobs: 4 to map the list on each iteration, 4 to compute the maximum on each iteration (because even if the list and its maximum are persisted at the same time, one depends on the other), and 4 to recompute the maximum and bring it into memory! The issue here is that we call list.max twice, thereby effectively creating 2 similar but duplicate DObjects.
4. Persisting the DList and the maximum as a variable

In this case we get a handle on the maximum DObject, and accessing its value with run is just a matter of reading the persisted information, hence the number of MapReduce jobs is 8, as in case 2.
It might be useful, for debugging purposes, to save the output of each intermediate step. Here is how to do it:
def iterate5(list: DList[Int]): DList[Int] = {
list.persist
if (list.max.run > 10) iterate5(list.map(i => if (i <= 0) i else i - 1).toAvroFile("out", overwrite = true))
else list
}
// no need to persist to a Text file since there is already an Avro file storing the results
iterate5(ints).persist
With the code above, the intermediate results will be written to the same output directory. You can also create one output directory per iteration:
def iterate6(list: DList[Int], n: Int = 0): DList[Int] = {
list.persist
if (list.max.run > 10) iterate6(list.map(i => if (i <= 0) i else i - 1).toAvroFile("out"+n, overwrite = true), n+1)
else list
}
iterate6(ints).persist
When you have a big pipeline of consecutive computations, it can be very time-consuming to start the process all over again when you've just changed some function down the track. To avoid this you can create checkpoints, that is, sinks which will persist data in between executions:
// before
val list1 = DList(1, 2, 3).map(_ + 1).
filter(isEven)
// after
val list2 = DList(1, 2, 3).map(_ + 1).toAvroFile("path", overwrite = true, checkpoint = true).
filter(isEven)
If you run the after program twice, then the second time only the filter operation will be executed, taking its input data from the saved Avro file.

Important limitation: you can't use a Text sink as a checkpoint, because text file sinks cannot be used as source files.
It is not necessary to declare a file type in order to create a checkpoint:

val list = DList(1, 2, 3).map(_ + 1).checkpoint("path").filter(isEven)

In this case a SequenceFile will be created under the hood to hold the checkpoint data.
By default the checkpoint files will always persist on disk and be reused the next time you run your application. However you can, if you want, specify an expiry time:
import scala.concurrent.duration._
val list = DList(1, 2, 3).map(_ + 1).checkpoint("path", expiryPolicy = ExpiryPolicy(expiryTime = 1.day)).filter(_ > 2)
When a file has expired, it will simply be overwritten. If this is a concern for you, there is another parameter, archive, which can be used to specify what to do with the old checkpoint file:
import scala.concurrent.duration._
// append a counter to the old checkpoint file name
val list1 = DList(1, 2, 3).map(_ + 1).checkpoint("path",
expiryPolicy = ExpiryPolicy(expiryTime = 1.day, archive = ExpiryPolicy.incrementCounterFile)).filter(_ > 2)
// append a counter to the old checkpoint file name and remove the 5 oldest files
val list2 = DList(1, 2, 3).map(_ + 1).checkpoint("path",
expiryPolicy = ExpiryPolicy(expiryTime = 1.day, archive = ExpiryPolicy.incrementCounterAndRemoveLast(5))).filter(_ > 2)
// use a custom archiving function which renames the old checkpoint file
val customArchiving = (path: Path, sc: ScoobiConfiguration) => {
sc.fileSystem.rename(path, new Path(path.toUri+"-old")); ()
}
val list3 = DList(1, 2, 3).map(_ + 1).checkpoint("path",
expiryPolicy = ExpiryPolicy(expiryTime = 1.day, archive = customArchiving)).filter(_ > 2)