MongoDB: Terrible MapReduce Performance

mellowsoon picture mellowsoon · Oct 16, 2010 · Viewed 26.1k times · Source

I have a long history with relational databases, but I'm new to MongoDB and MapReduce, so I'm almost positive I must be doing something wrong. I'll jump right into the question. Sorry if it's long.

I have a database table in MySQL that tracks the number of member profile views for each day. For testing it has 10,000,000 rows.

CREATE TABLE `profile_views` (
  `id` int(10) unsigned NOT NULL auto_increment,
  `username` varchar(20) NOT NULL,
  `day` date NOT NULL,
  `views` int(10) unsigned default '0',
  PRIMARY KEY  (`id`),
  UNIQUE KEY `username` (`username`,`day`),
  KEY `day` (`day`)
) ENGINE=InnoDB;

Typical data might look like this.

+--------+----------+------------+------+
| id     | username | day        | hits |
+--------+----------+------------+------+
| 650001 | Joe      | 2010-07-10 |    1 |
| 650002 | Jane     | 2010-07-10 |    2 |
| 650003 | Jack     | 2010-07-10 |    3 |
| 650004 | Jerry    | 2010-07-10 |    4 |
+--------+----------+------------+------+

I use this query to get the top 5 most viewed profiles since 2010-07-16.

SELECT username, SUM(hits)
FROM profile_views
WHERE day > '2010-07-16'
GROUP BY username
ORDER BY hits DESC
LIMIT 5\G

This query completes in under a minute. Not bad!

Now moving onto the world of MongoDB. I setup a sharded environment using 3 servers. Servers M, S1, and S2. I used the following commands to set the rig up (Note: I've obscured the IP addys).

S1 => 127.20.90.1
./mongod --fork --shardsvr --port 10000 --dbpath=/data/db --logpath=/data/log

S2 => 127.20.90.7
./mongod --fork --shardsvr --port 10000 --dbpath=/data/db --logpath=/data/log

M => 127.20.4.1
./mongod --fork --configsvr --dbpath=/data/db --logpath=/data/log
./mongos --fork --configdb 127.20.4.1 --chunkSize 1 --logpath=/data/slog

Once those were up and running, I hopped on server M, and launched mongo. I issued the following commands:

use admin
db.runCommand( { addshard : "127.20.90.1:10000", name: "M1" } );
db.runCommand( { addshard : "127.20.90.7:10000", name: "M2" } );
db.runCommand( { enablesharding : "profiles" } );
db.runCommand( { shardcollection : "profiles.views", key : {day : 1} } );
use profiles
db.views.ensureIndex({ hits: -1 });

I then imported the same 10,000,000 rows from MySQL, which gave me documents that look like this:

{
    "_id" : ObjectId("4cb8fc285582125055295600"),
    "username" : "Joe",
    "day" : "Fri May 21 2010 00:00:00 GMT-0400 (EDT)",
    "hits" : 16
}

Now comes the real meat and potatoes here... My map and reduce functions. Back on server M in the shell I setup the query and execute it like this.

use profiles;
var start = new Date(2010, 7, 16);
var map = function() {
    emit(this.username, this.hits);
}
var reduce = function(key, values) {
    var sum = 0;
    for(var i in values) sum += values[i];
    return sum;
}
res = db.views.mapReduce(
    map,
    reduce,
    {
        query : { day: { $gt: start }}
    }
);

And here's were I run into problems. This query took over 15 minutes to complete! The MySQL query took under a minute. Here's the output:

{
        "result" : "tmp.mr.mapreduce_1287207199_6",
        "shardCounts" : {
                "127.20.90.7:10000" : {
                        "input" : 4917653,
                        "emit" : 4917653,
                        "output" : 1105648
                },
                "127.20.90.1:10000" : {
                        "input" : 5082347,
                        "emit" : 5082347,
                        "output" : 1150547
                }
        },
        "counts" : {
                "emit" : NumberLong(10000000),
                "input" : NumberLong(10000000),
                "output" : NumberLong(2256195)
        },
        "ok" : 1,
        "timeMillis" : 811207,
        "timing" : {
                "shards" : 651467,
                "final" : 159740
        },
}

Not only did it take forever to run, but the results don't even seem to be correct.

db[res.result].find().sort({ hits: -1 }).limit(5);
{ "_id" : "Joe", "value" : 128 }
{ "_id" : "Jane", "value" : 2 }
{ "_id" : "Jerry", "value" : 2 }
{ "_id" : "Jack", "value" : 2 }
{ "_id" : "Jessy", "value" : 3 }

I know those value numbers should be much higher.

My understanding of the whole MapReduce paradigm is the task of performing this query should be split between all shard members, which should increase performance. I waited till Mongo was done distributing the documents between the two shard servers after the import. Each had almost exactly 5,000,000 documents when I started this query.

So I must be doing something wrong. Can anyone give me any pointers?

Edit: Someone on IRC mentioned adding an index on the day field, but as far as I can tell that was done automatically by MongoDB.

Answer

nonopolarity picture nonopolarity · Oct 17, 2010

excerpts from MongoDB Definitive Guide from O'Reilly:

The price of using MapReduce is speed: group is not particularly speedy, but MapReduce is slower and is not supposed to be used in “real time.” You run MapReduce as a background job, it creates a collection of results, and then you can query that collection in real time.

options for map/reduce:

"keeptemp" : boolean 
If the temporary result collection should be saved when the connection is closed. 

"output" : string 
Name for the output collection. Setting this option implies keeptemp : true.