chunk.apply processes input in chunks and applies FUN to each chunk, collecting the results.

chunk.apply(input, FUN, ..., CH.MERGE = rbind, CH.MAX.SIZE = 33554432,
            CH.PARALLEL=1L, CH.SEQUENTIAL=TRUE, CH.BINARY=FALSE,
            CH.INITIAL=NULL)

chunk.tapply(input, FUN, ..., sep = "\t", CH.MERGE = rbind, CH.MAX.SIZE = 33554432)

Arguments

input

Either a chunk reader or a file name or connection that will be used to create a chunk reader

FUN

Function to apply to each chunk

...

Additional parameters passed to FUN

sep

For chunk.tapply only: the separator that delimits the key. Each line is split at the first occurrence of the separator, and the part before it is treated as the key over which the function is applied.

CH.MERGE

Function to call to merge results from all chunks. Common values are list to get lapply-like behavior, rbind for table-like output or c for a long vector.

CH.MAX.SIZE

Maximum size of each chunk in bytes (the default, 33554432, is 32 MiB)

CH.PARALLEL

the number of parallel processes to use in the calculation (unix only).

CH.SEQUENTIAL

logical, only relevant for parallel processing. If TRUE then the chunks are guaranteed to be processed in sequential order. If FALSE then the chunks may be processed in any order to gain better performance.

CH.BINARY

logical, if TRUE then CH.MERGE is a binary function used to update the result object for each chunk, effectively acting like the Reduce function. If FALSE then the results from all chunks are accumulated first and then CH.MERGE is called with all chunks as arguments. See below for performance considerations.

CH.INITIAL

Function which will be applied to the first chunk if CH.BINARY=TRUE. If NULL then CH.MERGE(NULL, chunk) is called instead.
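
As an illustration of the sep argument used by chunk.tapply, the following base R sketch splits each line at the first separator and groups the remainder by key. It assumes the key is the text before the first separator; the sample lines and all variable names are hypothetical, and this is not the package's implementation.

```r
# Hypothetical input lines: key, then tab-separated values
lines <- c("a\t1\t2", "a\t3\t4", "b\t5\t6")

# Position of the first separator in each line
pos <- regexpr("\t", lines, fixed = TRUE)

# Text before the first separator is taken as the key (assumption)
keys <- substr(lines, 1, pos - 1)
rest <- substr(lines, pos + 1, nchar(lines))

# Group the remaining text by key; a function could then be applied per group
groups <- split(rest, keys)
counts <- lengths(groups)
```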

Note

The input to FUN is the raw chunk, so typically it is advisable to use mstrsplit or similar function as the first step in FUN.
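
To make the note above concrete, here is a minimal sketch of a FUN that parses its input before computing anything. It assumes the chunk arrives as a raw vector of newline-terminated, tab-separated lines, and it uses base R parsing as a stand-in for mstrsplit so that it runs without the package; parse_chunk and the sample data are hypothetical.

```r
# Hypothetical stand-in for one chunk: raw bytes of tab-separated lines
chunk <- charToRaw("1\t10\n2\t20\n3\t30\n")

# Base R stand-in for the parsing step that mstrsplit would normally do
parse_chunk <- function(o) {
  lines <- strsplit(rawToChar(o), "\n", fixed = TRUE)[[1]]
  fields <- strsplit(lines, "\t", fixed = TRUE)
  do.call(rbind, lapply(fields, as.numeric))  # one row per line
}

# FUN first parses the raw chunk, then works on the numeric matrix
FUN <- function(o) {
  m <- parse_chunk(o)
  quantile(m[, 1], c(0.25, 0.5, 0.75))
}

m <- parse_chunk(chunk)
q <- FUN(chunk)
```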

Value

The result of calling CH.MERGE with all chunk results as arguments (CH.BINARY=FALSE), or the result of the last call to the binary CH.MERGE (CH.BINARY=TRUE).

Details

Because chunk-wise processing is typically used when the input data is too large to fit in memory, there are additional considerations depending on whether the results of applying FUN are themselves large. If they are not, then accumulating them and applying CH.MERGE to all results at once is typically the most efficient approach, and it is what CH.BINARY=FALSE does.
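
The accumulate-then-merge behaviour can be sketched in base R as a single call that passes every chunk result as a separate argument to CH.MERGE. The per-chunk results below are hypothetical, and the sketch only mimics the described semantics; it is not the package's code.

```r
# Hypothetical results of FUN for three chunks
results <- list(c(a = 1, b = 2), c(a = 3, b = 4), c(a = 5, b = 6))

# CH.MERGE = rbind: one row per chunk (table-like output)
merged_rbind <- do.call(rbind, results)

# CH.MERGE = c: one long vector
merged_c <- do.call(c, results)

# CH.MERGE = list: lapply-like output, one element per chunk
merged_list <- do.call(list, results)
```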

However, when the results are reasonably large or the number of chunks is very high, it may be more efficient to update a running state as each chunk result arrives instead of collecting all results. This can be achieved by setting CH.BINARY=TRUE, in which case the process is equivalent to:

res <- CH.INITIAL(FUN(chunk1))
res <- CH.MERGE(res, FUN(chunk2))
res <- CH.MERGE(res, FUN(chunk3))
...
res

If CH.INITIAL is NULL then the first line is res <- CH.MERGE(NULL, FUN(chunk1)).
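
The binary update can be emulated in base R with a simple loop. The sketch below assumes CH.INITIAL is NULL, so the merge function has to cope with a NULL accumulator on its first call; FUN, the merge function and the list of chunks are hypothetical stand-ins.

```r
# Hypothetical per-chunk function: the sum of the chunk
FUN <- function(chunk) sum(chunk)

# Binary merge that tolerates a NULL accumulator, as needed when
# CH.INITIAL is NULL and the first call is CH.MERGE(NULL, FUN(chunk1))
CH.MERGE <- function(acc, x) if (is.null(acc)) x else acc + x

chunks <- list(1:3, 4:6, 7:9)  # stand-ins for successive chunks

# Only the running state is kept, never the full list of results
res <- NULL
for (ch in chunks) res <- CH.MERGE(res, FUN(ch))

# The same fold over precomputed results, for comparison
res_reduce <- Reduce(`+`, lapply(chunks, FUN))
```

Merge functions such as c or rbind already accept NULL as their first argument, so they need no special handling; an arithmetic merge like the one above does.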

The parameter CH.SEQUENTIAL is only used if parallel processing is requested. If it is TRUE then the order of the chunks is respected, but merging can only proceed once the result of the next chunk in sequence is available. Setting it to FALSE allows the system to process chunks out of order for performance reasons: the workers continue processing further chunks as they become available, without waiting for the results of the preceding calls. This is more efficient, but the order of the chunks in the result is not deterministic.
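
One practical consequence is that an order-insensitive merge gives the same answer regardless of arrival order, while an order-preserving merge such as rbind does not. The base R sketch below simulates this with hypothetical chunk results and a hypothetical arrival order; it does not run anything in parallel.

```r
# Hypothetical per-chunk results (count and sum for each chunk)
results <- list(c(n = 2, s = 10), c(n = 3, s = 30), c(n = 1, s = 5))

# Hypothetical order in which parallel workers deliver their results
arrival <- c(2, 3, 1)

# A commutative, associative merge is unaffected by arrival order
in_order <- Reduce(`+`, results)
out_of_order <- Reduce(`+`, results[arrival])

# rbind keeps arrival order, so the rows differ between the two runs
rows_in_order <- do.call(rbind, results)
rows_out_of_order <- do.call(rbind, results[arrival])
```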

Note that if parallel processing is required then all calls to FUN should be considered independent. However, CH.MERGE is always run in the current session and thus is allowed to have side-effects.

Author

Simon Urbanek

Note

The support for CH.PARALLEL is considered experimental and may change in the future.

Examples

if (FALSE) {
## compute quantiles of the first variable for each chunk
## of at most 100kB size (CH.MAX.SIZE is given in bytes)
chunk.apply("input.file.txt",
            function(o) {
              m = mstrsplit(o, type='numeric')
              quantile(m[,1], c(0.25, 0.5, 0.75))
            }, CH.MAX.SIZE=1e5)
}
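
A runnable variant of this kind of call is sketched below. It assumes the iotools package is installed (the code is skipped otherwise) and writes a temporary file of "|"-separated numbers, matching the default separator of mstrsplit; the file contents and variable names are made up for illustration.

```r
n <- 20000L
res <- NULL

if (requireNamespace("iotools", quietly = TRUE)) {
  # Temporary input: two numeric columns separated by "|"
  tmp <- tempfile(fileext = ".txt")
  writeLines(paste(seq_len(n), seq_len(n) %% 7L, sep = "|"), tmp)

  # Column sums per chunk of at most 100kB; the default
  # CH.MERGE = rbind stacks them into one row per chunk
  res <- iotools::chunk.apply(tmp,
            function(o) {
              m <- iotools::mstrsplit(o, type = "numeric")
              colSums(m)
            }, CH.MAX.SIZE = 1e5)

  unlink(tmp)
}
```

Summing the rows of res afterwards gives the totals for the whole file, which is a typical two-stage pattern: a cheap summary per chunk, then a final reduction in the current session.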