I have a large codebase, and the aggregation step is the current speed bottleneck.
I'd like to speed up the data-grouping step. A SNOTE (simple non-trivial example) of my data looks like this:
library(data.table)
a = sample(1:10000000, 50000000, replace = TRUE)
b = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), 50000000, replace = TRUE)
d = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), 50000000, replace = TRUE)
e = a
dt = data.table(a = a, b = b, d = d, e = e)
system.time(c.dt <- dt[, list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[1]), by=a])
user system elapsed
60.107 3.143 63.534
This is quite fast for such a large example, but I am still looking for further speed-up. I have multiple cores available, so I am almost sure there must be a way to put them to use.
I am open to changing my data type to data.frame or idata.frame objects (in theory, idata.frame is supposedly faster than data.frame).
I did some research, and it seems the plyr package has some parallel capabilities that could help, but I am still struggling with how to apply them to the grouping I am trying to do. Another SO post discusses some of these ideas. I am still unsure how much I would gain from this parallelization, since it uses the foreach function. In my experience, foreach is not a good idea for millions of fast operations, because the communication overhead between cores ends up outweighing the gains from parallelization.
Can you parallelize aggregation with data.table? Yes.
Is it worth it? NO. This is a key point that the previous answer failed to highlight.
As Matt Dowle explains in data.table and parallel computing, copies ("chunks") need to be made before being distributed when running operations in parallel. This slows things down. In some cases, when you cannot use data.table (e.g. running many linear regressions), it is worth splitting up tasks between cores. But not aggregation, at least when data.table is involved.
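To make the "many linear regressions" case concrete, here is a minimal sketch of splitting independent model fits across cores. The table `reg_dt`, its columns `g`, `x`, `y`, and the core count are made up for illustration, and `split()` with `by=` assumes a data.table version that provides that method. `mclapply` forks, so the sketch falls back to one core on Windows.

```r
library(data.table)
library(parallel)

set.seed(1)
# Hypothetical data: 4 groups of 25 rows, with y roughly equal to 2 * x
reg_dt <- data.table(g = rep(1:4, each = 25), x = rnorm(100))
reg_dt[, y := 2 * x + rnorm(100, sd = 0.1)]

# One chunk per group; each chunk is a copy handed to a worker
groups <- split(reg_dt, by = "g")

# Forking is not available on Windows, so use a single core there
n_cores <- if (.Platform$OS.type == "windows") 1L else 2L

# Fit one regression per chunk and keep only the coefficients
fits <- mclapply(groups, function(chunk) coef(lm(y ~ x, data = chunk)),
                 mc.cores = n_cores)

# Collect the slope of x from each fit
slopes <- vapply(fits, function(cf) cf[["x"]], numeric(1))
print(slopes)
```

Each fit here does enough work per chunk that the cost of copying can pay off, which is not the case for a cheap per-group `paste`.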
In short (and until proven otherwise), aggregate using data.table and stop worrying about potential speed increases using doMC. data.table is already blazing fast compared to anything else available when it comes to aggregation, even if it's not multicore!
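On the "not multicore" point: later data.table releases expose `getDTthreads()` and `setDTthreads()` to control the threads the package uses internally for some operations. Whether that helps a `paste`-based aggregation like the one in the question is not something the benchmarks here measure. The sketch below, on a made-up three-row table, only shows how to inspect and temporarily change the setting, assuming a data.table version that has these functions.

```r
library(data.table)

# Hypothetical toy table, only for illustration
toy <- data.table(a = c(1L, 1L, 2L), b = c("3m", "5m", "1m"))

# How many threads data.table is currently allowed to use
print(getDTthreads())

# Restrict to one thread; the previous setting is returned so it can be restored
old_threads <- setDTthreads(1L)
res_single <- toy[, list(b = paste(b, collapse = "")), by = a]

# Restore the previous setting and aggregate again
setDTthreads(old_threads)
res_default <- toy[, list(b = paste(b, collapse = "")), by = a]

# The thread setting should affect speed only, never the result
print(res_single)
```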
Here are some benchmarks you can run for yourself comparing data.table's internal aggregation using by with foreach and mclapply.
The results are listed first.
#-----------------------------------------------
# TL;DR FINAL RESULTS (Best to Worst)
# 3 replications, N = 10000:
# (1) 0.007 -- data.table using `by`
# (2) 3.548 -- mclapply with rbindlist
# (3) 5.557 -- foreach with rbindlist
# (4) 5.959 -- foreach with .combine = "rbind"
# (5) 14.029 -- lapply
# ----------------------------------------------
library(data.table)
library(parallel)  # provides mclapply, used below
## And used the following to create the dt
N <- 1e4
set.seed(1)
a = sample(1:N, N*2, replace = TRUE)
b = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
d = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
e = a
dt = data.table(a = a, b = b, d = d, e = e, key="a")
setkey(dt, "a")
# TEST AGGREGATION WITHOUT PARALLELIZATION ---------------------------
## using data.table's `by` to aggregate
round(rowMeans(replicate(3, system.time({
  dt[, list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[1]), by=a]
}))), 3)
# [1] 0.007 elapsed for N == 10,000, length(unique(dt[["a"]])) == 8617
## using `lapply`
round(rowMeans(replicate(3, system.time({
  results <- lapply(unique(dt[["a"]]), function(x) {
    dt[.(x), list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[1])]
  })
  rbindlist(results)
}))), 3)
# [1] 14.029 elapsed for N == 10,000
# USING `mclapply` FORKING ---------------------------------
## use mclapply
round(rowMeans(replicate(3, system.time({
  results <- mclapply(unique(dt[["a"]]),
    function(x) {
      dt[.(x), list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
    }, mc.cores=4)
  rbindlist(results)
}))), 3)
# [1] 3.548 elapsed for N == 10,000
# PARALLELIZATION USING `doMC` PACKAGE ---------------------------------
library(doMC)
mc = 4
registerDoMC(cores=mc)
getDoParWorkers()
# [1] 4
## (option a) by Ricardo Saporta
round(rowMeans(replicate(3, system.time({
  foreach(x=unique(dt[["a"]]), .combine="rbind", .inorder=FALSE) %dopar%
    dt[.(x), list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
}))), 3)
# [1] 5.959 elapsed for N == 10,000
## (option b) by Ricardo Saporta
round(rowMeans(replicate(3, system.time({
  results <-
    foreach(x=unique(dt[["a"]])) %dopar%
      dt[.(x), list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
  rbindlist(results)
}))), 3)
# [1] 5.557 elapsed for N == 10,000
registerDoSEQ()
getDoParWorkers()
# [1] 1
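The timings are only comparable if the approaches return the same answer. As a sanity check, this sketch compares grouping with `by` against the per-key `lapply` loop on a small made-up table (`small`, with only columns `a` and `b`). The loop adds `a = a[1]` so both results carry the grouping column, which the benchmark versions above do not do.

```r
library(data.table)

set.seed(1)
# Hypothetical small table, keyed on `a` like the benchmark table
small <- data.table(a = sample(1:5, 20, replace = TRUE),
                    b = sample(c("3m", "5m", "1m"), 20, replace = TRUE),
                    key = "a")

# Grouped aggregation in a single call
by_res <- small[, list(b = paste(b, collapse = "")), by = a]

# Same aggregation, one keyed lookup per distinct value of `a`
loop_res <- rbindlist(lapply(unique(small[["a"]]), function(x) {
  small[.(x), list(a = a[1], b = paste(b, collapse = ""))]
}))

# Both approaches should agree column by column
stopifnot(identical(by_res$a, loop_res$a),
          identical(by_res$b, loop_res$b))
```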