Memory efficient parallel repeated rarefaction with subsequent matrix addition of large data set

101 Views Asked by At

I am trying to speed up repeatedly rarefying a data frame and the subsequent addition of generated matrices. Some background information: The data set I want to repeatedly rarefy is very large (about 200 rows and >> 100 000 columns). So far, I have tried the following:

i) Repeated rarefaction and storing rarefied data frames in a list for subsequent matrix addition. Unfortunately, memory becomes the limiting factor pretty rapidly if many repeated rarefactions are performed.

ii) I also parallelized the repeated rarefaction through foreach, wrote the rarefied data frames to disk and retrieved them latter for matrix addition. While the first part was pretty fast, the latter negated all speed benefits.

Sadly, the fastest solution is currently the non-parallelized one:

library(vegan)
data <- matrix(sample(1:20, 10000, replace = TRUE), ncol = 100)
min_sample <- min(rowSums(data)[rowSums(data) > 0])
for (i in 1:100) {
  if (i == 1) {
    suppressWarnings(dataRF1 <- rrarefy(data, min_sample))
  }
  else if (i > 2) {
    suppressWarnings(dataRF2 <- rrarefy(data, min_sample))
    dataRF1 <- dataRF1 + dataRF2
  }
}

I also tried foreach with .combine = "+". However it seems that the rarefied data frames are held in memory as well and summed the earliest after all rarefactions have taken place. Due to memory limitations, I need a solution that performs matrix addition immediately after the construction of individual rarefied data frames or a comparable memory-efficient and fast approach.

Thanks for your help.

Update: Thanks @SamR and the other one. In fact, from parallelizing the rarefactioning through foreach, I know that the speed gains through parallel processing are not completely negated by the overhead but by the import of files from disk and downstream matrix addition (see approach ii). I am well aware that through parallelizing through sockets, each worker runs in strictly isolated fashion and hence, the matrix addition with a shared starting matrix is not possible. However, I am thinking about providing such a starting matrix to each of the workers separately.

For instance, the processing is performed through foreach on three workers. Within each worker, the rarefied data frames are summed by matrix addition, resulting in a total of three matrices that are summed themselves after the parallel processing has finished. To do so, I would need to provide this starting matrix (a matrix with the same dimension as the input data frame) to each of the worker separately. Is this possible?

2

There are 2 best solutions below

0
jay.sf On BEST ANSWER

Based on your comments, we could rrarefy R times. Next, we transform the resulting list in a 3D-array, where we easily may apply a function, e.g. mean on MARGINs c(1, 2), referring to rows and columns, i.e. mean is calculated accross the 3rd dimension of Replications.

> library(tictoc)  ## for timings
> ncpu <- 7L
> R <- 100
> 
> ## on Linux
> tic()
> rr <- parallel::mclapply(seq_len(R), \(i) rrarefy(data, min_sample), mc.cores=ncpu)
> res <- unlist(rr) |> array(c(dim(data), R)) |> apply(MARGIN=1:2, mean)
> res[1:5, 1:5]
      [,1]  [,2]  [,3]  [,4]  [,5]
[1,]  5.90 10.07 16.14 11.56  4.88
[2,]  7.11  9.66  4.28  4.33 17.40
[3,]  6.35  4.89  6.49 14.83  2.48
[4,] 13.01  3.42 11.26  2.46 16.15
[5,] 16.20  5.19  8.90  3.38  3.38
> toc()
0.534 sec elapsed
> 
> ## on Windows
> tic()
> cl <- parallel::makeForkCluster(ncpu)
> rr <- parallel::parLapply(cl, seq_len(R), \(i) rrarefy(data, min_sample))
> res <- unlist(rr) |> array(c(dim(data), R)) |> apply(1:2, mean)
> parallel::stopCluster(cl)
> res[1:5, 1:5]
      [,1]  [,2]  [,3]  [,4]  [,5]
[1,]  5.74 10.49 16.08 11.30  5.00
[2,]  7.05  9.51  4.72  4.37 16.95
[3,]  6.54  4.77  6.84 14.72  2.14
[4,] 13.33  3.72 10.79  2.54 16.32
[5,] 17.09  4.95  8.60  3.37  3.35
> toc()
0.66 sec elapsed

Perhaps, if RAM is too weak, we need parallel::makePSOCKcluster.


Data:

> set.seed(42)
> data <- matrix(sample(1:20, 10000, replace=TRUE), ncol=100)
> min_sample <- min(rowSums(data)[rowSums(data) > 0])
4
jblood94 On

Inspecting the code of vegan::rrarefy reveals that it is taking an independent (by row) sample from a multivariate hypergeometric distribution. The rmvhyper implementation from extraDistr is much faster (possibly multiple orders of magnitude, depending on the value of min_sample).

Performing the samples in parallel is straightforward and can be made relatively memory-efficient. Timing 100 sums of rarefied samples of a 200 x 2e5 matrix:

library(parallel)

data <- matrix(as.numeric(sample(1:20, 4e7, replace = TRUE)), ncol = 2e5)
system.time({
  rs <- rowSums(data)
  min_sample <- min(rs[rs > 0])
  # set up the parallel cluster
  cl <- makeCluster(detectCores() - 1L)
  invisible(clusterEvalQ(cl, library(extraDistr)))
  clusterExport(cl, "min_sample")
  # sum 100 random rarefied samples
  dataRF <- matrix(
    unlist(
      parLapply(
        cl, lapply(1:nrow(data), \(i) data[i,]),
        \(x) as.integer(colSums(rmvhyper(100, x, min_sample)))
      )
    ), nrow(data), byrow = TRUE
  )
  stopCluster(cl)
})
#>    user  system elapsed 
#>    1.42    0.55   91.95

For the matrix dimensions described, the memory limitation will be on the number of samples to be summed. rmvhyper(100, x, min_sample) results in an intermediate matrix of dimensions c(100, ncol(x)) in each cluster node. For many samples, this process could be repeated multiple times (e.g., for 1000 samples, iteratively call parLapply another 9 times and add it to the previous cumulative sum).


Check that vegan::rrarefy is equivalent to a multivariate hypergeometric random sample

Simulated data

library(vegan)
library(extraDistr)

N <- 1e5 # number of multivariate hypergeometric samples to take
n <- 10 # number of balls to sample from the "urn"
# example data: each row represents a different "urn" containing 
# varying numbers of balls of three different colors
m <- matrix(c(3,4,12,4,6,16), 2)
# sample using `vegan::rrarefy` and `extraDistr::rmvhyper`
system.time(rare <- replicate(N, c(suppressWarnings(rrarefy(m, n)))))
#>    user  system elapsed 
#>   35.59    0.35   38.77
system.time(hyper <- array(aperm(array(c(rmvhyper(N, m[1,], n), rmvhyper(N, m[2,], n)), c(N, 3:2)), 3:1), dim(rare)))
#>    user  system elapsed 
#>    0.14    0.02    0.15

Compare the marginal distributions.

for (i in 1:6) print(rbind(tabulate(rare[i,]), tabulate(hyper[i,])))
#>       [,1]  [,2] [,3]
#> [1,] 41213 37086 9034
#> [2,] 41211 37242 9194
#>       [,1]  [,2]  [,3] [,4]
#> [1,] 34259 38343 16102 1985
#> [2,] 34208 38508 15979 1939
#>      [,1] [,2] [,3]  [,4]  [,5]  [,6]  [,7] [,8] [,9] [,10]
#> [1,]    4  165 2220 11714 28083 33076 19152 4998  571    17
#> [2,]    2  174 2271 11704 28210 33069 18865 5153  538    14
#>       [,1]  [,2]  [,3] [,4]
#> [1,] 34532 38327 15765 1976
#> [2,] 34299 38562 15803 1988
#>      [,1]  [,2]  [,3]  [,4] [,5] [,6]
#> [1,] 8489 27497 36580 21237 4992  384
#> [2,] 8613 27714 36313 21132 5027  396
#>      [,1] [,2] [,3] [,4]  [,5]  [,6]  [,7]  [,8] [,9] [,10]
#> [1,]    0    1  198 2614 12524 28745 32632 18145 4712   429
#> [2,]    0    5  212 2582 12619 28620 32633 18294 4623   412

Compare the joint distributions.

x <- do.call(table, lapply(1:3, \(i) rare[i,]))
y <- do.call(table, lapply(1:3, \(i) hyper[i,]))
cbind(i <- which(x | y, TRUE), x[i], y[i])
#>   dim1 dim2 dim3          
#> 3    4    2    1    2    1
#> 3    4    3    1    1    0
#> 3    4    4    1    1    1
#> 2    3    1    2    5    4
#> 3    4    1    2   13   11
#> 2    3    2    2   19   19
#> 3    4    2    2   42   33
#> 2    3    3    2   27   21
#> 3    4    3    2   35   46
#> 2    3    4    2    8   17
#> 3    4    4    2   14   20
#> 3    4    5    2    2    3
#> 1    2    1    3   20   14
#> 2    3    1    3   76   99
#> 3    4    1    3   78   91
#> 1    2    2    3   55   75
#> 2    3    2    3  389  388
#> 3    4    2    3  326  339
#> 1    2    3    3   67   73
#> 2    3    3    3  418  423
#> 3    4    3    3  339  371
#> 1    2    4    3   26   43
#> 2    3    4    3  207  182
#> 3    4    4    3  167  137
#> 1    2    5    3    2    0
#> 2    3    5    3   30   17
#> 3    4    5    3   20   19
#> 0    1    1    4   13   15
#> 1    2    1    4  202  241
#> 2    3    1    4  639  559
#> 3    4    1    4  241  284
#> 0    1    2    4   60   43
#> 1    2    2    4  822  908
#> 2    3    2    4 2159 2148
#> 3    4    2    4  990 1010
#> 0    1    3    4   53   56
#> 1    2    3    4  940  917
#> 2    3    3    4 2455 2432
#> 3    4    3    4 1064 1060
#> 0    1    4    4   23   11
#> 1    2    4    4  390  365
#> 2    3    4    4  981  974
#> 3    4    4    4  449  463
#> 0    1    5    4    2    3
#> 1    2    5    4   55   48
#> 2    3    5    4  117  111
#> 3    4    5    4   59   56
#> 0    1    1    5  120  143
#> 1    2    1    5  905  964
#> 2    3    1    5 1292 1223
#> 3    4    1    5  304  335
#> 0    1    2    5  480  447
#> 1    2    2    5 3381 3369
#> 2    3    2    5 4605 4518
#> 3    4    2    5 1159 1184
#> 0    1    3    5  504  485
#> 1    2    3    5 3792 3841
#> 2    3    3    5 5104 5206
#> 3    4    3    5 1303 1362
#> 0    1    4    5  219  231
#> 1    2    4    5 1627 1596
#> 2    3    4    5 2156 2199
#> 3    4    4    5  556  566
#> 0    1    5    5   34   20
#> 1    2    5    5  205  198
#> 2    3    5    5  276  263
#> 3    4    5    5   61   60
#> 0    1    1    6  362  341
#> 1    2    1    6 1481 1449
#> 2    3    1    6 1047 1127
#> 3    4    1    6  140  161
#> 0    1    2    6 1428 1391
#> 1    2    2    6 5325 5373
#> 2    3    2    6 4015 4021
#> 3    4    2    6  543  555
#> 0    1    3    6 1530 1519
#> 1    2    3    6 6134 6033
#> 2    3    3    6 4513 4599
#> 3    4    3    6  627  544
#> 0    1    4    6  641  657
#> 1    2    4    6 2462 2434
#> 2    3    4    6 1953 1927
#> 3    4    4    6  241  250
#> 0    1    5    6   85   75
#> 1    2    5    6  308  339
#> 2    3    5    6  217  240
#> 3    4    5    6   24   34
#> 0    1    1    7  456  436
#> 1    2    1    7  993  959
#> 2    3    1    7  361  367
#> 3    4    1    7   21   16
#> 0    1    2    7 1582 1471
#> 1    2    2    7 3492 3496
#> 2    3    2    7 1421 1436
#> 3    4    2    7   99   71
#> 0    1    3    7 1732 1692
#> 1    2    3    7 3946 3959
#> 2    3    3    7 1498 1555
#> 3    4    3    7   73   87
#> 0    1    4    7  762  720
#> 1    2    4    7 1658 1564
#> 2    3    4    7  646  666
#> 3    4    4    7   38   23
#> 0    1    5    7  102   84
#> 1    2    5    7  204  184
#> 2    3    5    7   66   78
#> 3    4    5    7    2    1
#> 0    1    1    8  199  192
#> 1    2    1    8  245  235
#> 2    3    1    8   45   44
#> 0    1    2    8  724  693
#> 1    2    2    8  823  886
#> 2    3    2    8  123  148
#> 0    1    3    8  813  848
#> 1    2    3    8 1001 1021
#> 2    3    3    8  162  150
#> 0    1    4    8  298  363
#> 1    2    4    8  422  406
#> 2    3    4    8   48   74
#> 0    1    5    8   42   35
#> 1    2    5    8   45   51
#> 2    3    5    8    8    7
#> 0    1    1    9   37   47
#> 1    2    1    9   11    9
#> 0    1    2    9  132  121
#> 1    2    2    9   58   58
#> 0    1    3    9  136  127
#> 1    2    3    9   69   75
#> 0    1    4    9   72   63
#> 1    2    4    9   37   26
#> 0    1    5    9    9   10
#> 1    2    5    9   10    2
#> 0    1    1   10    5    0
#> 0    1    2   10    5    6
#> 0    1    3   10    7    6
#> 0    1    4   10    0    1
#> 0    1    5   10    0    1

vegan::rrarefy does appear to be sampling each row without replacement, which is what extraDistr::rmvhyper does. The difference (other than a huge speed difference) is that rrarefy takes a single sample from each row of a matrix all at once, while rmvhyper can be used to take many samples from a single row.