chunk.apply processes input in chunks and applies FUN
to each chunk, collecting the results.
chunk.apply(input, FUN, ..., CH.MERGE = rbind, CH.MAX.SIZE = 33554432,
CH.PARALLEL=1L, CH.SEQUENTIAL=TRUE, CH.BINARY=FALSE,
CH.INITIAL=NULL)
chunk.tapply(input, FUN, ..., sep, CH.MERGE = rbind, CH.MAX.SIZE = 33554432)

input: Either a chunk reader, or a file name or connection that will
be used to create a chunk reader.

FUN: Function to apply to each chunk.

...: Additional parameters passed to FUN.

sep: single character string. For tapply, gives the
separator for the key over which to apply. Each line is split at the
first separator, and the preceding value is treated as the key over
which to apply the function. If the input is a chunk reader,
then this value is ignored (and can be missing) and the key separator
of the chunk reader is always used; otherwise it defaults to "\t"
and the corresponding chunk reader with that key separator is
created internally.

CH.MERGE: Function called to merge results from all
chunks. Common values are list for lapply-like
behavior, rbind for table-like output, or c for a long
vector.

CH.MAX.SIZE: maximal size of each chunk in bytes.

CH.PARALLEL: the number of parallel processes to use in the calculation (unix only).

CH.SEQUENTIAL: logical, only relevant for parallel
processing. If TRUE then the chunks are guaranteed to be
processed in sequential order. If FALSE then the chunks may
be processed in any order to gain better performance.

CH.BINARY: logical, if TRUE then CH.MERGE is a
binary function used to update the result object for each chunk,
effectively acting like the Reduce function. If FALSE
then the results from all chunks are accumulated first and
CH.MERGE is then called with all chunks as arguments. See below
for performance considerations.

CH.INITIAL: Function which will be applied to the first chunk if
CH.BINARY=TRUE. If NULL then CH.MERGE(NULL,
chunk) is called instead.
The input to FUN is the raw chunk, so it is typically
advisable to use mstrsplit or a similar function as the
first step in FUN.
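As a minimal sketch of this pattern (the sample file and its contents are created here purely for illustration), each raw chunk is parsed with mstrsplit and CH.MERGE = rbind stacks the per-chunk matrices:

```r
library(iotools)

## small sample file, created only for illustration
tf <- tempfile()
writeLines(c("1|2", "3|4", "5|6"), tf)

## parse each raw chunk into a numeric matrix; rbind merges the pieces
m <- chunk.apply(tf,
                 function(chunk) mstrsplit(chunk, sep = "|", type = "numeric"),
                 CH.MERGE = rbind)
## m is a 3 x 2 numeric matrix
unlink(tf)
```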
The result of calling CH.MERGE with all chunk results as
arguments (CH.BINARY=FALSE) or the result of the last call to
the binary CH.MERGE (CH.BINARY=TRUE).
Because chunk-wise processing is typically used when the
input data is too large to fit in memory, there are additional
considerations depending on whether the results of applying
FUN are themselves large or not. If they are not, then the approach
of accumulating them and then applying CH.MERGE to all results
at once is typically the most efficient, and it is what
CH.BINARY=FALSE will do.
However, in some situations where the results are reasonably big or
the number of chunks is very high, it may be more efficient to update
a kind of state based on each arriving chunk instead of collecting all
results. This can be achieved by setting CH.BINARY=TRUE, in which
case the process is equivalent to:
res <- CH.INITIAL(FUN(chunk1))
res <- CH.MERGE(res, FUN(chunk2))
res <- CH.MERGE(res, FUN(chunk3))
...
res

If CH.INITIAL is NULL then the first line is
res <- CH.MERGE(NULL, FUN(chunk1)).
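A sketch of this reduce-style use (file and numbers are illustrative; CH.INITIAL = identity is used so the first chunk's result becomes the initial state, avoiding a `+`(NULL, x) call that would yield an empty vector):

```r
library(iotools)

## sample file, for illustration only
tf <- tempfile()
writeLines(c("1|2", "3|4", "5|6"), tf)

## keep only running column sums instead of accumulating all chunk results
totals <- chunk.apply(tf,
                      function(chunk)
                        colSums(mstrsplit(chunk, sep = "|", type = "numeric")),
                      CH.MERGE = `+`,         # binary merge: add the new sums
                      CH.BINARY = TRUE,
                      CH.INITIAL = identity)  # first chunk's sums start the state
totals  # c(9, 12)
unlink(tf)
```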
The parameter CH.SEQUENTIAL is only used if parallel
processing is requested. It allows the system to process chunks out of
order for performance reasons. If it is TRUE then the order of
the chunks is respected, but merging can only proceed when the result
of the next chunk is available. With CH.SEQUENTIAL=FALSE the workers
will continue processing further chunks as they become available, not
waiting for the results of the preceding calls. This is more
efficient, but the order of the chunks in the result is not
deterministic.
Note that if parallel processing is requested then all calls to
FUN should be considered independent. However, CH.MERGE
is always run in the current session and is thus allowed to have
side-effects.
chunk.tapply requires that the input is sharded by key, i.e.
records with the same key must be adjacent (similar to
ctapply). The function FUN is then guaranteed to
be called for all values of exactly one key at a time (unlike
chunk.apply which always processes the entire chunk
which may contain multiple keys).
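A sketch of chunk.tapply on key-sorted input (the sample file is illustrative; here mstrsplit with nsep = "\t" is assumed to split the key prefix off into row names, leaving only the values to be summed):

```r
library(iotools)

## records with the same key must be adjacent (here the file is sorted)
tf <- tempfile()
writeLines(c("a\t1", "a\t2", "b\t5"), tf)

## sum the values belonging to each key; nsep splits the key off
res <- chunk.tapply(tf,
                    function(chunk)
                      sum(mstrsplit(chunk, sep = "|", nsep = "\t",
                                    type = "numeric")),
                    sep = "\t", CH.MERGE = c)
unlink(tf)
```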
The support for CH.PARALLEL is considered experimental and may
change in the future.