hadoop {hmr}                                                R Documentation

Experimental Hadoop chunk-processing code


Description:

hmr runs a chunk-wise Hadoop job.

hpath and hinput define an HDFS file path and an input source, respectively.


Usage:

hmr(input, output, map = identity, reduce = identity, job.name,
    aux, formatter, packages = loadedNamespaces(), reducers,
    remote, wait=TRUE, hadoop.conf, hadoop.opt, R="R",
    verbose = TRUE, persistent = FALSE, overwrite = FALSE,
    use.kinit = !is.null(getOption("hmr.kerberos.realm")))
hinput(path, formatter = .default.formatter)



Arguments:

input: input data - see details


output: output path (optional)


map: chunk compute function (map is a misnomer)


reduce: chunk combine function


job.name: name of the job to pass to Hadoop


aux: either a character vector of symbol names or a named list of values to push to the compute nodes


formatter: formatter to use. It is optional in hmr if the input source already contains a formatter definition. See below for details on how to specify separate formatters.


packages: character vector of package names to attach on the compute nodes


reducers: optional integer specifying the number of parallel jobs in the combine step. It is a hint in the sense that any number greater than one implies independence of the chunks in the combine step. The default is to not assume independence.


remote: optional, an active Rserve connection or a host name. If specified, the job is submitted remotely using the Hadoop installation on the target machine (experimental)


wait: logical, if TRUE the command returns after the job has finished, otherwise it returns as soon as the job has been submitted


hadoop.conf: optional string, path to the Hadoop configuration directory for submission


hadoop.opt: additional Java options to pass to the job - named character vectors are passed as -D<name>=<value>, unnamed vectors are collapsed. Note: this is only a transitional interface to work around deficiencies in the job generation and should only be used as a last resort, since the semantics are implementation-specific and thus not portable across systems.


R: command to call to run R on the Hadoop cluster


verbose: logical, indicating whether output sent to standard error and standard output by Hadoop should be printed to the console.


persistent: logical, if TRUE an ROctopus job is started and the mapper is executed in "hot" ROctopus instances instead of regular R. The results in that case are ROctopus URLs.


overwrite: logical, if TRUE the output directory is deleted before the job is started.


use.kinit: logical, if TRUE, kinit(realm=getOption("hmr.kerberos.realm")) is invoked automatically before any Hadoop commands are run.


path: HDFS path


Details:

hmr creates and runs a Hadoop job to perform chunk-wise compute + combine. The input is read using chunk.reader, processed using the formatter function and passed to the map function. The result is converted using as.output before going back to Hadoop. The chunk-wise results are combined using the reduce function - the flow is the same as in the map case. The result is returned as an HDFS path. Either map or reduce can be identity (the default).
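The flow described above can be sketched as a minimal job. This is an illustrative sketch only: the HDFS paths and the field used for counting are placeholders, and the job requires a running Hadoop cluster.

```r
## minimal sketch - paths and the counted field are illustrative;
## each chunk arrives as text, the formatter turns it into a matrix,
## and the map result goes back to Hadoop via as.output
r <- hmr(hinput("/user/me/logs"),          ## input with the default formatter
         output = hpath("/user/me/out"),   ## optional output location
         map    = function(x) table(x[, 1]),  ## per-chunk counts of field 1
         reduce = function(x) ctapply(as.numeric(x), names(x), sum))
```

Since both map and reduce default to identity, passing only one of them is enough for a pure chunk-transform or pure aggregation job.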

If the formatter is omitted, the format is taken from the input object (if it has one), otherwise the default formatter (mstrsplit with '\t' as the key separator and '|' as the column separator) is used. If formatter is a function, the same formatter is used for both the map and reduce steps. If separate formatters are required, formatter can be a list with entries map and/or reduce specifying the corresponding formatter functions.
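The default formatter behaves like iotools' mstrsplit; a small local sketch of the parsing it performs (the input string is illustrative):

```r
library(iotools)

## the default formatter splits the key on '\t' and fields on '|',
## producing a character matrix with keys as row names
m <- mstrsplit("key\tval1|val2", sep = "|", nsep = "\t")
## m is a 1 x 2 character matrix with row name "key"

## separate formatters per step would be passed as a list, e.g.
## hmr(..., formatter = list(map    = function(x) mstrsplit(x, sep = "|", nsep = "\t"),
##                           reduce = function(x) mstrsplit(x, sep = "|", nsep = "\t")))
```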

hpath tags a string as an HDFS path. Its sole purpose is to distinguish local and HDFS paths.

hinput creates a subclass of HDFSpath which also contains the definition of the formatter for that path. The default formatter honors default Hadoop settings of '\t' as the key/value separator and '|' as the field separator.
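A short sketch of the tagging behavior, following the class and attribute structure described above (the path is illustrative):

```r
## hpath merely tags a character vector so hmr can tell it apart
## from a local path
p <- hpath("/user/me/data")
class(p)               ## "HDFSpath"

## hinput additionally attaches the formatter definition for that path
h <- hinput("/user/me/data")
class(h)               ## c("hinput", "HDFSpath")
attr(h, "formatter")   ## the formatter used when this path is read
</imports>
```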


Value:

hmr returns the HDFS path to the result when finished.

hpath returns a character vector of class "HDFSpath"

hinput returns a subclass "hinput" of "HDFSpath" containing the additional "formatter" attribute.


Note:

Requires a properly installed Hadoop client. The installation must either be in /usr/lib/hadoop, or one of the HADOOP_HOME or HADOOP_PREFIX environment variables must be set accordingly.


Author(s):

Simon Urbanek


Examples:

## Not run: 
## map points to ZIP codes and count the number of points per ZIP
## uses Tiger/LINE 2010 census data shapefiles
## we can use ctapply because Hadoop guarantees contiguous input

## require(fastshp); require(tl2010)
r <- hmr(hinput("points"),  ## input path is illustrative
  map = function(x)
    table(inside(zcta2010.shp(), x[,4], x[,5])),
  reduce = function(x) ctapply(as.numeric(x), names(x), sum))

## End(Not run)

[Package hmr version 1.0-8 Index]