Grouping

## Grouping

The sort and shuffle phase of MapReduce is abstracted by `DList.groupByKey`, it allows us to have a `DList[(K, V)]` and obtain a `DList[(K, Iterable[V])]` where all the values "with the same" K are grouped together. Grouping is a scala trait, that defines exactly what constitutes "with the same K" means.

### The Grouping trait

The Grouping type class is automatically provided for anything with a `scala.math.Ordering`, or that implements Java's `java.lang.Comparable` interface. This means all common types (e.g. String, Int etc.) can be grouped out of the box. If you have a more complex type (or complex grouping requirements) you will need to write some code to group by the type. The three options are:

Since the third option is the only one Scoobi specific, and a little more powerful, we'll focus on that.

### Basic grouping

Let's say we have a user-defined type, `case class Point(x: Integer, y: Integer)` and are using it in a `DList[(Point, String)]`. Now if want to group all the same points together, so as to end up with a: `DList[(Point, Iterable[String])` we will need to write a function that defines what makes two points equal. It would not be enough to merely provide a pure equality function, as that would require every point to be check against every point (quadratic time).

Instead, we need to provide strict total ordering. That is, provide a function that can reliably create an ordering for Points. It doesn't really matter how this ordering is done, as long as it is. The scheme normally used is to break the type down into all its members, and provide ordering based on it. So in the `Point` case, we can order by `Point.x`, if they're the same, order by `Point.y` and finally, if they're the same -- we can say the two points are the same.

The `Grouping[T]` trait has a method called `sortCompare` that we will override. It takes two T's, and like Java's `Comparable` it returns an Int. A negative number to indicate the first `T` is less than the second. Zero if the two `T`s are equal. And a positive number if the second `T` is after the first `T`.

Sample code for our `Point`:

``````import scalaz.Ordering._

implicit val pointGrouping = new Grouping[Point] {
override def groupCompare(first: Point, second: Point) = {
val xResult = first.x.compareTo(second.x)
val result =
if (xResult != 0) xResult
else {
val yResult = first.y.compareTo(second.y)
if (yResult != 0) yResult
else              0
}
fromInt(result)
}
}
``````

### Secondary sort

Assuming you have read and understood the previous sections, we will move on to some advanced usage of Grouping by example. Let's start some type alias's to make the code more understandable:

``````type FirstName = String
type LastName = String
``````

And let's start with a DList with some easily understandable data:

``````val names: DList[(FirstName, LastName)] = DList.apply(
("Michael", "Jackson"),
("Leonardo", "Da Vinci"),
("John", "Kennedy"),
("Mark", "Twain"),
("Bat", "Man"),
("Michael", "Jordan"),
("Mark", "Edison"),
("Michael", "Landon"),
("Leonardo", "De Capro"),
("Michael", "J. Fox"))
``````

Based on the previous sections, you should know that we could simply do `names.groupByKey` to obtain a `DList[(FirstName, Iterable[String])]. However, there's no ordering associated with the`Iterable[LastName]` this means, if order is required (e.g. we're processing a time series) or outputting the last names in alphabetical order -- we'd have to use a parallelDo to load the entire reducers collection to memory, then sort it there. This is both slow, and going to likely to use too much memory.

So this is where a "Secondary Sort", comes into play. A secondary sort allows us to make sure the `Iterable[LastName]` comes to us in a defined ordering, so we can efficiently process it.

In hadoop (and thus scoobi) sorting only happens on the Key, so what we need to do, is make our Key contain enough information to sort the last names for any given first name.

Your first instinct might be to move the information to the key, e.g. make the type: `DList[((FirstName, LastName), Unit)]` however this will not work. We need to instead duplicate the information to the key, otherwise while things arrive in the correct order, it will not be possible to get the value!

``````val bigKey: DList[((FirstName, LastName), LastName)] = names.map(a => ((a._1, a._2), a._2))
``````

Now the key part of `bigKey` is `(FirstName, LastName)` so this is what we need to provide a `Grouping` object for. Our goal is to make sure that two keys with the same FirstName evaluate to being the same, so `groupCompare` and `partition` should only consider the FirstName part. However, `sortCompare` should put everything in the correct order (so it should consider both parts).

``````import scalaz.Ordering._

val grouping = new Grouping[(FirstName, LastName)] {

override def partition(key: (FirstName, LastName), howManyReducers: Int): Int = {
// This function says what reducer this particular 'key' should go to. We must override the
// default impl, because it looks at the entire key, and makes sure all the same
// keys go to the same reducer. But we want to only 'look' at the 'FirstName' part
// so that everything with the same FirstName goes to the same reducer (even if it has a different LastName)

// So we'll just use the default (string) partition, and only on the first name
implicitly[Grouping[FirstName]].partition(key._1, howManyReducers)
}

override def sortCompare(a: (FirstName, LastName), b: (FirstName, LastName)): scalaz.Ordering = {
// Ok, here's where the fun is! Everything that is sent to the same reducer now needs
// an ordering. So this function is called. Here we return `scalaz.Ordering.LT` if 'a' should be before 'b',
// `scalaz.Ordering.EQ` if they're they same, and `scalaz.Ordering.GT` if they're different.

// So the first thing we want to do, is look at first names
val firstNameOrdering = a._1.compareTo(b._1)

firstNameOrdering match {
case 0 => {
// Interesting! Here the firstName's are the same. So what we want to do, is order by
// the lastNames

fromInt(a._2.compareTo(b._2))
}
case x => fromInt(x) // otherwise, just return the result for which of the FirstName's is first
}
}

override def groupCompare(a: (FirstName, LastName), b: (FirstName, LastName)): scalaz.Ordering = {
// So now everything going to the reducer has a proper ordering (thanks to our 'sortCompare' function)
// now hadoop allows us to "collapse" everything that is logically the same. So two keys are logically
// the same if the FirstName's are equal
fromInt(a._1.compareTo(b._1))
}

}
``````

Now calling `bigKey.groupByKeyWith(grouping)` will work as intended, with all lastNames arriving in order. Note, that instead of explicitly passing in `grouping` it's possible to use implicits and the plain `groupByKey`, just be careful the correct one is being picked up.