Understanding Map Reduce

Jan 20, 2011

I'm in the process of rolling out a feature which will let developers see some basic usage statistics on their games via mogade.com. The first phase was to make it so data could be collected; the next phase is to build the reporting UI. Of course, there's a phase in the middle, which is to transform our data from raw statistics into actual reportable information. Since we are using MongoDB, we are doing our transformation via map reduce. Though our reports are basic, I thought I'd share some of the code since I know the topic is still exotic to a lot of developers.

As far as the actual transformation goes, it doesn't really matter whether you do it in real time (when the user asks for the report) or as part of a job. The way we do it, though, is to run a job which caches the data (permanently). The first report that we generate, which is really the Hello World of map reduce, is the number of times the game is started per day. Data comes into a single collection (think of it as a table if you aren't familiar with MongoDB) which basically has 2 fields: game_id and date. The collection is called hits. We want to transform this information into a daily_hits collection with the following fields: game_id, year, month, day, count.

Essentially, given the following raw data:

game_id | date
1         Jan 20 2010 4:30
1         Jan 20 2010 5:30
2         Jan 20 2010 6:00
1         Jan 20 2010 7:00
2         Jan 21 2010 8:00
2         Jan 21 2010 8:30
1         Jan 21 2010 8:30
2         Jan 21 2010 9:00
1         Jan 21 2010 9:30
1         Jan 22 2010 5:00

We'd expect the following output:

game_id | year | month | day | count
1         2010   1       20    3
2         2010   1       20    1
2         2010   1       21    3
1         2010   1       21    2
1         2010   1       22    1

I like this approach because it's blazing fast, clear, and keeps data growth under control (each game adds at most 1 row of data per day).

In the relational world you'd solve this problem using a GROUP BY. We're going to use a map reduce. Map reduce is a two-step process. The mapping step transforms each object in hits and emits a key=>value pair (the key and/or value can be complex). The reduce step gets a key and the array of values emitted with that key, and produces the final result. Stepping through the process should help clarify.

It's possible for map to emit 0 or more times. We'll always make it emit once (which is common). Imagine map as looping through each object in hits (we can apply a filter to only map-reduce over some objects, but that's beside the point). For each object we want to emit a key with game_id, year, month and day, and a simple value of 1:

function()
{
	var key = {game_id: this.game_id, year: this.date.getFullYear(), month: this.date.getMonth(), day: this.date.getDate()};
	emit(key, {count: 1});
}

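Map reduce in MongoDB is written in JavaScript, and this refers to the current object being inspected. Note that JavaScript's getMonth() is zero-based, so January shows up as month 0 in the output below (add 1 in the map function if you want the 1-12 values shown in the table above). What should help make this all clear is seeing the output of the mapping step. Using our above data, the complete output would be: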

{game_id: 1, year: 2010, month: 0, day: 20} => [{count: 1}, {count: 1}, {count:1}]
{game_id: 2, year: 2010, month: 0, day: 20} => [{count: 1}]
{game_id: 2, year: 2010, month: 0, day: 21} => [{count: 1}, {count: 1}, {count:1}]
{game_id: 1, year: 2010, month: 0, day: 21} => [{count: 1}, {count: 1}]
{game_id: 1, year: 2010, month: 0, day: 22} => [{count: 1}]

I think understanding this intermediary step is really the key to understanding map reduce. The values from emit are grouped together, as arrays, by key. I don't know if it helps, but .NET developers could think of the data structure as being of type IDictionary<object, IList<object>>.
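If it helps to watch that grouping happen, here is a minimal sketch in plain JavaScript that simulates the map and grouping steps outside of MongoDB. The hits array, the grouped Map and this version of emit are stand-ins assumed for illustration only; MongoDB supplies its own emit and does the grouping internally.

```javascript
// Plain-JavaScript simulation of the map and grouping steps (not MongoDB itself).
// `hits` mirrors the sample data above; Date months are zero-based (0 = January).
const hits = [
  { game_id: 1, date: new Date(2010, 0, 20, 4, 30) },
  { game_id: 1, date: new Date(2010, 0, 20, 5, 30) },
  { game_id: 2, date: new Date(2010, 0, 20, 6, 0) },
  { game_id: 1, date: new Date(2010, 0, 20, 7, 0) },
  { game_id: 2, date: new Date(2010, 0, 21, 8, 0) },
  { game_id: 2, date: new Date(2010, 0, 21, 8, 30) },
  { game_id: 1, date: new Date(2010, 0, 21, 8, 30) },
  { game_id: 2, date: new Date(2010, 0, 21, 9, 0) },
  { game_id: 1, date: new Date(2010, 0, 21, 9, 30) },
  { game_id: 1, date: new Date(2010, 0, 22, 5, 0) },
];

// Hypothetical stand-in for MongoDB's grouping: emitted values are collected per key.
const grouped = new Map();
function emit(key, value) {
  // Objects compare by identity, so serialize the key to group equal keys together.
  const id = JSON.stringify(key);
  if (!grouped.has(id)) grouped.set(id, []);
  grouped.get(id).push(value);
}

// Same logic as the map function above; `this` is the current hit.
function map() {
  const key = {
    game_id: this.game_id,
    year: this.date.getFullYear(),
    month: this.date.getMonth(),
    day: this.date.getDate(),
  };
  emit(key, { count: 1 });
}

// Run map once per object, binding `this` the way MongoDB does.
hits.forEach((hit) => map.call(hit));

for (const [id, values] of grouped) {
  console.log(id, "=>", JSON.stringify(values));
}
```

Each printed line is one key followed by the array of values emitted for it, which is exactly what reduce will later receive.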

Let's change our map function in some contrived way:

function()
{
	var key = {game_id: this.game_id, year: this.date.getFullYear(), month: this.date.getMonth(), day: this.date.getDate()};
	if (this.game_id == 1 && this.date.getHours() == 4)
	{
		emit(key, {count: 5});
	}
	else
	{
		emit(key, {count: 1});
	}

}

Then the first intermediary output would change to:

{game_id: 1, year: 2010, month: 0, day: 20} => [{count: 5}, {count: 1}, {count:1}]

Now, the reduce function takes each of these intermediary results and outputs a final result. Here's what ours looks like:

function(key, values)
{
   var sum = 0;
   values.forEach(function(value)
   {
     sum += value['count'];
   });
   return {count: sum};
};

The final output from this is:

{game_id: 1, year: 2010, month: 0, day: 20} => {count: 3}
{game_id: 2, year: 2010, month: 0, day: 20} => {count: 1}
{game_id: 2, year: 2010, month: 0, day: 21} => {count: 3}
{game_id: 1, year: 2010, month: 0, day: 21} => {count: 2}
{game_id: 1, year: 2010, month: 0, day: 22} => {count: 1}

Technically, the output in MongoDB is:

_id: {game_id: 1, year: 2010, month: 0, day: 20}, value: {count: 3}

Hopefully you've noticed that this is the final result we were after.

If you've really been paying attention, you might be asking yourself: why didn't we simply use sum = values.length? This would seem like an efficient approach when you are essentially summing an array of 1s. The fact is that reduce isn't always called with a full and perfect set of intermediate data. For example, instead of being called with:

{game_id: 1, year: 2010, month: 0, day: 20} => [{count: 1}, {count: 1}, {count:1}]

Reduce could be called with:

{game_id: 1, year: 2010, month: 0, day: 20} => [{count: 1}, {count: 1}]
{game_id: 1, year: 2010, month: 0, day: 20} => [{count: 2}, {count: 1}]

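In the second call, {count: 2} is the output of the first call, fed back in alongside the remaining value. The final output is the same (3); the path taken is simply different. As such, reduce must always be idempotent: it has to be able to consume its own output, which is also why the value it returns has the same shape as the values emitted by map.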
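To make the difference concrete, here is a small sketch in plain JavaScript that calls the reduce function from above both ways. The name reduceByLength and the way the values are split into two calls are assumptions for illustration; MongoDB decides on its own when and how to re-reduce.

```javascript
// The reduce function from above, named so it can be called directly.
function reduce(key, values) {
  let sum = 0;
  values.forEach(function (value) {
    sum += value.count;
  });
  return { count: sum };
}

// The tempting shortcut: counts array entries instead of summing their counts.
function reduceByLength(key, values) {
  return { count: values.length };
}

const key = { game_id: 1, year: 2010, month: 0, day: 20 };
const emitted = [{ count: 1 }, { count: 1 }, { count: 1 }];

// Path 1: everything reduced in a single call.
const oneShot = reduce(key, emitted);

// Path 2: reduce the first two values, then feed that partial result back in.
const partial = reduce(key, emitted.slice(0, 2));
const twoStep = reduce(key, [partial, emitted[2]]);

// The same two-step path with the shortcut version.
const partialBad = reduceByLength(key, emitted.slice(0, 2));
const twoStepBad = reduceByLength(key, [partialBad, emitted[2]]);

console.log(oneShot.count, twoStep.count); // both paths sum to 3
console.log(twoStepBad.count); // 2: the partial {count: 2} was counted as one entry
```

The summing version gives 3 on either path, while the values.length version silently loses a hit as soon as a partial result is reduced again.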

So what advantage does map reduce hold? The oft-cited benefit is that both the map and reduce operations can be distributed. So the code I've written above could be executed by multiple threads, multiple CPUs, or even thousands of servers as-is. This is key when dealing with millions and billions of records, or smaller sets with more complex logic. For the rest of us though, I think the real benefit is the power of being able to write these types of transforms using actual programming languages, with variables, conditional statements, methods and so on. It is a mind shift from the traditional approach, but I do think even slightly complex queries are cleaner and easier to write with map reduce. We didn't look at it here, but you'll commonly feed the output of a reduce function into another reduce function, each function further transforming it towards the end result.

I do want to point out that MongoDB doesn't currently scale map-reduce across multiple threads. Hopefully that's something that'll change in the future. Also, the current stable version can pretty much only store the output into a collection of its own (which can be permanent or temporary). We want to add the data to our daily_hits collection. This means pulling the data down to code and then inserting it (again, this isn't a huge deal since it's, at most, 1 row per game per day).

To deal with this MongoDB limitation we can leverage one of its great features: upserts. The problem we have is that if we run this transform multiple times per day, the key may or may not already exist in daily_hits. I've done this in SQL Server in the past, and the end result is always executing an UPDATE for existing values and an INSERT for new ones (typically requiring a join/subselect on the destination table to see whether the key exists). Here's what the Ruby code looks like:

#map and reduce are strings containing our above methods, blah!
db['hits'].map_reduce(map, reduce).find.each do |row|
  key = {:game_id => row['_id']['game_id'], :year => row['_id']['year'], :month => row['_id']['month'], :day => row['_id']['day']}
  daily_hits.update(key, {'$inc' => {:count => row['value']['count']}}, {:upsert => true})
end

If the key is found, then the count field is incremented by the specified value; otherwise the entry is inserted and count is incremented from 0 by the value.
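The behaviour described here can be sketched in plain JavaScript with an in-memory Map standing in for daily_hits. The dailyHits Map and the upsertIncrement helper are hypothetical names made up for this illustration, not part of any MongoDB driver; they only mimic what an upsert combined with $inc does.

```javascript
// In-memory stand-in for the daily_hits collection (hypothetical, not a driver API).
const dailyHits = new Map();

// Mimics update(key, {$inc: {count: n}}, {upsert: true}):
// create the entry if it is missing, then add n to its count.
function upsertIncrement(key, n) {
  const id = JSON.stringify(key);
  const existing = dailyHits.get(id) || { ...key, count: 0 };
  existing.count += n;
  dailyHits.set(id, existing);
}

const key = { game_id: 1, year: 2010, month: 0, day: 20 };
upsertIncrement(key, 3); // first run of the day: entry is created with count 3
upsertIncrement(key, 2); // later run: the same entry is incremented to 5

console.log(dailyHits.get(JSON.stringify(key)).count);
```

Either way the caller issues the same single operation, which is what removes the UPDATE-or-INSERT branching from our code.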

In the next release of MongoDB (1.8) or the current development release (1.7.4) map reduce can take an output collection to insert into directly or a reduce operation to merge (upsert). This will avoid the need to pull the data down to code.

For completeness, we need to either delete the processed rows or flag them (and use the flag to filter them) so that when we rerun the transformation we don't double-count our results. This can actually be a bit tricky, since new hits might have come in during our processing and we can't simply delete them or mark them as processed. A simple approach is to run the query only against records that are at least X minutes old. So our map reduce now looks like:

#only process hits older than this cutoff, so hits arriving mid-run wait for the next run
#(5.minutes.ago relies on ActiveSupport)
limit = 5.minutes.ago
db['hits'].map_reduce(map, reduce, {:query => {:date => {'$lte' => limit}, 'processed' => {'$exists' => false}}}).find.each do |row|
  key = {:game_id => row['_id']['game_id'], :year => row['_id']['year'], :month => row['_id']['month'], :day => row['_id']['day']}
  daily_hits.update(key, {'$inc' => {:count => row['value']['count']}}, {:upsert => true})
end
#update those records we just processed
db['hits'].update({:date => {'$lte' => limit}}, {'$set' => {'processed' => true}}, {:multi => true })